# Instalación de Dependencias e Importación de Librerías
Instalación de PyWavelets, fCWT y otras dependencias necesarias. Importación de todas las bibliotecas requeridas para procesamiento de señales, visualización, construcción de espectrogramas de Mel y aprendizaje profundo.

In [None]:
# Instalación de dependencias necesarias
!pip install PyWavelets
!pip install librosa
!git clone https://github.com/fastlib/fCWT.git
!pip install fCWT
!apt-get update
!apt-get install libfftw3-single3 -y
!pip install opencv-python

# Importación de bibliotecas necesarias
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns
import random
import h5py
import cv2
import pywt
import fcwt
import librosa
import librosa.display
import scipy
from scipy import signal
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow import keras
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tqdm.notebook import tqdm
from IPython.display import display

# Configuraciones globales para visualización
plt.style.use('seaborn-v0_8-whitegrid')

# Carga y Visualización de Datos
Carga de datos de acelerómetro desde Google Drive, con opciones para clasificación binaria o multiclase (cinco tipos). Visualización de las señales originales para explorar las diferencias entre distintos tipos de fugas.

In [None]:
# Definir la ruta de los datos en Google Drive
data_dir = '/content/drive/MyDrive/Tesis/Accelerometer_Dataset/Branched'
original_sr = 25600  # Frecuencia de muestreo original en Hz
signal_sr = 25600  # Frecuencia de muestreo deseada en Hz
downsample_factor = original_sr // signal_sr

# Modo de clasificación: 'five_classes' o 'binary'
classification_mode = 'five_classes'  # Cambiar según necesidad

# Diccionario de etiquetas según el modo de clasificación
if classification_mode == 'five_classes':
    label_codes_dict = {
        'Circumferential Crack': 0,
        'Gasket Leak': 1,
        'Longitudinal Crack': 2,
        'No-leak': 3,
        'Orifice Leak': 4
    }
else:  # binary
    label_codes_dict = {'Leak': 0, 'No-leak': 1}

# Función para eliminar archivos .DS_Store si existen
def remove_DS_store_file(path):
    for ds_name in ['.DS_Store', '.DS_store']:
        ds_store_file_location = os.path.join(path, ds_name)
        if os.path.isfile(ds_store_file_location):
            os.remove(ds_store_file_location)

# Función para cargar datos de acelerómetro
def load_accelerometer_data(data_dir, sample_rate, downsample_factor, label_codes, mode='five_classes', fraction_to_include=1):
    remove_DS_store_file(data_dir)
    signals = []
    labels = []

    for label in os.listdir(data_dir):
        label_dir = os.path.join(data_dir, label)
        if not os.path.isdir(label_dir):
            print(f"Omitiendo {label_dir} porque no es un directorio")
            continue

        remove_DS_store_file(label_dir)

        for file in os.listdir(label_dir):
            file_path = os.path.join(label_dir, file)
            if not os.path.isfile(file_path):
                continue

            accelerometer_signal_df = pd.read_csv(file_path, index_col=False)
            accelerometer_signal_df = accelerometer_signal_df.iloc[::downsample_factor, :].reset_index(drop=True)
            accelerometer_signal = accelerometer_signal_df['Value'][0:(sample_rate * 30)]

            sample_indexes = np.linspace(0, len(accelerometer_signal) - sample_rate, len(accelerometer_signal) // sample_rate)
            signal_frames_number = fraction_to_include * len(sample_indexes)
            signal_frames_counter = 0

            for signal_frame in sample_indexes:
                accelerometer_signal_frame = accelerometer_signal[int(signal_frame):int(signal_frame + sample_rate)]
                signal_frames_counter += 1

                if signal_frames_counter > signal_frames_number:
                    break

                if len(accelerometer_signal_frame) != sample_rate:
                    continue

                if mode == 'five_classes':
                    signals.append(accelerometer_signal_frame)
                    labels.append(label_codes[label])
                else:  # binary
                    if label == 'No-leak':
                        signals.append(accelerometer_signal_frame)
                        labels.append(1)  # 1 para No-leak
                    else:
                        signals.append(accelerometer_signal_frame)
                        labels.append(0)  # 0 para Leak

    return signals, labels

# Cargar los datos desde Google Drive
signals_lst, labels_lst = load_accelerometer_data(
    data_dir,
    signal_sr,
    downsample_factor,
    label_codes_dict,
    mode=classification_mode,
    fraction_to_include=1
)

# Dividir los datos en conjuntos de entrenamiento y prueba
signals_dict = {'training': [], 'testing': []}
labels_dict = {'training': [], 'testing': []}

signals_dict['training'], signals_dict['testing'], labels_dict['training'], labels_dict['testing'] = train_test_split(
    signals_lst,
    labels_lst,
    test_size=0.2,
    random_state=53
)

# Imprimir información sobre el dataset
print(f'Data Directory: {data_dir}')
print(f'Sample Rate: {signal_sr} Hz')
print(f'Classification Mode: {classification_mode}')
print(f'Number of signals (training, testing): ({len(signals_dict["training"])}, {len(signals_dict["testing"])})')
print(f'Number of labels (training, testing): ({len(labels_dict["training"])}, {len(labels_dict["testing"])})')
print(f'Number of samples per signal: {len(signals_dict["training"][0])}')

# Visualizar algunas señales
plt.figure(figsize=(20, 20))
rows, cols = 5, 2
n = rows * cols
random_index = []

for i in range(n):
    plt.subplot(rows, cols, i + 1)
    random_index.append(np.random.randint(0, len(signals_dict['training'])))
    plt.plot(signals_dict['training'][random_index[i]])

    if classification_mode == 'five_classes':
        label_name = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(labels_dict['training'][random_index[i]])]
    else:
        label_name = 'Leak' if labels_dict['training'][random_index[i]] == 0 else 'No-leak'

    plt.title(label_name)
    plt.grid()
plt.show()

# Normalización y Denoising con Wavelet
Aplicación de técnicas de denoising basadas en wavelet para eliminar el ruido en las señales. Normalización posterior para preparar los datos para la generación de espectrogramas Mel.

In [None]:
# Función para aplicar denoising con wavelet
def wavelet_denoise(signals_dict, labels_dict):
    """
    Normaliza y aplica denoising wavelet a las señales.

    Args:
        signals_dict: Diccionario con señales de entrenamiento y prueba.
        labels_dict: Diccionario con etiquetas de entrenamiento y prueba.

    Returns:
        wavelet_denoised_signals: Diccionario con señales procesadas.
        labels_dict: Diccionario con etiquetas.
    """
    # Crear un objeto de la clase WaveletDenoising
    wd = WaveletDenoising(
        normalize=True,
        wavelet='sym3',
        level=4,
        thr_mode='soft',
        method="universal"
    )

    # Crear un nuevo diccionario para almacenar las señales procesadas
    wavelet_denoised_signals = {'training': [], 'testing': []}

    for key, signals_subset in signals_dict.items():
        for signal_element in tqdm(signals_subset, desc=f"Denoising {key} signals"):
            # Aplicar denoising a la señal
            denoised_signal = wd.fit(signal_element)
            wavelet_denoised_signals[key].append(denoised_signal)

    return wavelet_denoised_signals, labels_dict

# Aplicar denoising a las señales
wavelet_denoised_signals_dict, labels_dict = wavelet_denoise(signals_dict=signals_dict, labels_dict=labels_dict)

# Imprimir información sobre el dataset resultante
print(f'Number of signals (training, testing): ({len(wavelet_denoised_signals_dict["training"])}, {len(wavelet_denoised_signals_dict["testing"])})')
print(f'Number of labels (training, testing): ({len(labels_dict["training"])}, {len(labels_dict["testing"])})')
print(f'Number of samples per signal: {len(wavelet_denoised_signals_dict["training"][0])}')

# Visualizar algunas señales procesadas
plt.figure(figsize=(20, 20))
rows, cols = 5, 2
n = rows * cols

for i in range(n):
    plt.subplot(rows, cols, i + 1)
    plt.plot(wavelet_denoised_signals_dict['training'][random_index[i]])

    if classification_mode == 'five_classes':
        label_name = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(labels_dict['training'][random_index[i]])]
    else:
        label_name = 'Leak' if labels_dict['training'][random_index[i]] == 0 else 'No-leak'

    plt.title(label_name)
    plt.grid()
plt.show()

# Función para normalizar señales
def normalize_signals(signals_dict):
    """
    Normaliza cada señal del diccionario a un rango de 0 a 1.

    Args:
        signals_dict: Diccionario con señales a normalizar.

    Returns:
        normalized_signals: Diccionario con señales normalizadas.
    """
    normalized_signals = {'training': [], 'testing': []}

    for key, signals_subset in signals_dict.items():
        for signal in tqdm(signals_subset, desc=f"Normalizando {key} signals"):
            min_val = np.min(signal)
            max_val = np.max(signal)

            if max_val > min_val:
                normalized_signal = (signal - min_val) / (max_val - min_val)
            else:
                normalized_signal = np.ones_like(signal) * 0.5

            normalized_signals[key].append(normalized_signal)

    return normalized_signals

# Normalizar las señales procesadas
print("Normalizando señales procesadas con wavelet denoising...")
normalized_signals_dict = normalize_signals(wavelet_denoised_signals_dict)

# Imprimir información sobre el dataset normalizado
print(f'Number of signals (training, testing): ({len(normalized_signals_dict["training"])}, {len(normalized_signals_dict["testing"])})')
print(f'Number of samples per signal: {len(normalized_signals_dict["training"][0])}')

# Comprobar rango de valores
for key in normalized_signals_dict:
    sample_signal = normalized_signals_dict[key][0]
    print(f"Rango de valores en {key}: [{np.min(sample_signal):.4f}, {np.max(sample_signal):.4f}]")

# Visualizar comparación de señales originales y normalizadas
plt.figure(figsize=(20, 15))
rows, cols = 3, 2
sample_indices = random_index[:3]

for i, idx in enumerate(sample_indices):
    # Señal con denoising (sin normalizar)
    plt.subplot(rows, cols, i * 2 + 1)
    plt.plot(wavelet_denoised_signals_dict['training'][idx])

    if classification_mode == 'five_classes':
        label_name = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(labels_dict['training'][idx])]
    else:
        label_name = 'Leak' if labels_dict['training'][idx] == 0 else 'No-leak'

    plt.title(f"Denoised: {label_name}")
    plt.grid()

    # Señal normalizada entre 0 y 1
    plt.subplot(rows, cols, i * 2 + 2)
    plt.plot(normalized_signals_dict['training'][idx])
    plt.title(f"Normalized: {label_name}")
    plt.ylim([-0.1, 1.1])
    plt.grid()

plt.tight_layout()
plt.show()

# Usar las señales normalizadas para los pasos siguientes
wavelet_denoised_signals_dict = normalized_signals_dict

# Segmentación de Señales
División de las señales normalizadas en segmentos de 512 muestras, similar al enfoque utilizado en la notebook de escalogramas CWT. Creación de etiquetas correspondientes para cada segmento.

In [None]:
# Función para segmentar señales en fragmentos de 512 muestras
def segment_signals(signals_dict, labels_dict, segment_size=512):
    """
    Segmenta las señales en fragmentos de tamaño fijo y crea etiquetas correspondientes.

    Args:
        signals_dict: Diccionario con señales de entrenamiento y prueba.
        labels_dict: Diccionario con etiquetas de entrenamiento y prueba.
        segment_size: Tamaño de cada segmento en muestras.

    Returns:
        segmented_signals_dict: Diccionario con segmentos de señales.
        segmented_labels_dict: Diccionario con etiquetas correspondientes.
    """
    segmented_signals_dict = {'training': [], 'testing': []}
    segmented_labels_dict = {'training': [], 'testing': []}

    for key in signals_dict.keys():
        for signal, label in tqdm(zip(signals_dict[key], labels_dict[key]), desc=f"Segmentando {key} signals", total=len(signals_dict[key])):
            num_segments = len(signal) // segment_size
            for i in range(num_segments):
                start_idx = i * segment_size
                end_idx = start_idx + segment_size
                segment = signal[start_idx:end_idx]
                if len(segment) == segment_size:
                    segmented_signals_dict[key].append(segment)
                    segmented_labels_dict[key].append(label)

    return segmented_signals_dict, segmented_labels_dict

# Segmentar las señales normalizadas
print("Segmentando señales normalizadas en fragmentos de 512 muestras...")
segmented_signals_dict, segmented_labels_dict = segment_signals(wavelet_denoised_signals_dict, labels_dict, segment_size=512)

# Imprimir información sobre los segmentos generados
print(f'Número de segmentos (training, testing): ({len(segmented_signals_dict["training"])}, {len(segmented_signals_dict["testing"])})')
print(f'Número de etiquetas (training, testing): ({len(segmented_labels_dict["training"])}, {len(segmented_labels_dict["testing"])})')
print(f'Tamaño de cada segmento: {len(segmented_signals_dict["training"][0])} muestras')

# Visualizar algunos segmentos
plt.figure(figsize=(20, 20))
rows, cols = 5, 2
n = rows * cols
random_segment_indices = random.sample(range(len(segmented_signals_dict['training'])), n)

for i, idx in enumerate(random_segment_indices):
    plt.subplot(rows, cols, i + 1)
    plt.plot(segmented_signals_dict['training'][idx])

    if classification_mode == 'five_classes':
        label_name = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(segmented_labels_dict['training'][idx])]
    else:
        label_name = 'Leak' if segmented_labels_dict['training'][idx] == 0 else 'No-leak'

    plt.title(label_name)
    plt.grid()

plt.tight_layout()
plt.show()

# Generación de Espectrogramas Mel por Segmentos
Implementación del proceso de generación de espectrogramas logarítmicos de Mel para cada segmento de 512 muestras. Ajuste de los parámetros STFT y bancos de filtros para adaptarse a la menor duración de los segmentos.

In [None]:
# Función para generar espectrogramas logarítmicos de Mel por segmentos
def generate_mel_spectrograms_by_segments(segmented_signals_dict, segmented_labels_dict, fs=25600, n_mels=40, fmin=1, fmax=12800):
    """
    Genera espectrogramas logarítmicos de Mel para cada segmento de señales.

    Args:
        segmented_signals_dict: Diccionario con segmentos de señales de entrenamiento y prueba.
        segmented_labels_dict: Diccionario con etiquetas correspondientes.
        fs: Frecuencia de muestreo en Hz.
        n_mels: Número de bancos de filtros Mel.
        fmin: Frecuencia mínima para los filtros Mel.
        fmax: Frecuencia máxima para los filtros Mel.

    Returns:
        mel_spectrograms_dict: Diccionario con espectrogramas logarítmicos de Mel.
        labels_dict: Diccionario con etiquetas correspondientes.
    """
    # Parámetros para STFT
    frame_length = int(0.02 * fs)  # 20 ms
    frame_shift = int(0.01 * fs)  # 10 ms
    nfft = 512  # Tamaño de FFT

    # Crear diccionario para almacenar espectrogramas
    mel_spectrograms_dict = {'training': [], 'testing': []}

    for key, signals_subset in segmented_signals_dict.items():
        for i, signal in enumerate(tqdm(signals_subset, desc=f"Generando espectrogramas Mel para {key}")):
            # Calcular STFT
            stft = librosa.stft(
                signal,
                n_fft=nfft,
                hop_length=frame_shift,
                win_length=frame_length,
                window='hann'
            )

            # Calcular espectrograma de potencia
            power_spectrogram = np.abs(stft) ** 2

            # Aplicar bancos de filtros Mel
            mel_filterbank = librosa.filters.mel(
                sr=fs,
                n_fft=nfft,
                n_mels=n_mels,
                fmin=fmin,
                fmax=fmax
            )
            mel_spectrogram = np.dot(mel_filterbank, power_spectrogram)

            # Convertir a escala logarítmica
            log_mel_spectrogram = np.log1p(mel_spectrogram)

            # Normalizar a rango [0, 1]
            log_mel_spectrogram = (log_mel_spectrogram - np.min(log_mel_spectrogram)) / (np.max(log_mel_spectrogram) - np.min(log_mel_spectrogram))

            # Almacenar en el diccionario
            mel_spectrograms_dict[key].append(log_mel_spectrogram)

    return mel_spectrograms_dict, segmented_labels_dict

# Generar espectrogramas logarítmicos de Mel por segmentos
mel_spectrograms_dict, mel_labels_dict = generate_mel_spectrograms_by_segments(
    segmented_signals_dict,
    segmented_labels_dict,
    fs=signal_sr,
    n_mels=40,
    fmin=1,
    fmax=12800
)

# Imprimir información sobre los espectrogramas generados
print(f'Número de espectrogramas Mel (training, testing): ({len(mel_spectrograms_dict["training"])}, {len(mel_spectrograms_dict["testing"])})')
print(f'Tamaño de un espectrograma Mel (ejemplo): {mel_spectrograms_dict["training"][0].shape}')

# Visualizar algunos espectrogramas logarítmicos de Mel
plt.figure(figsize=(20, 20))
rows, cols = 5, 2
n = rows * cols
random_mel_indices = random.sample(range(len(mel_spectrograms_dict['training'])), n)

for i, idx in enumerate(random_mel_indices):
    plt.subplot(rows, cols, i + 1)
    librosa.display.specshow(
        mel_spectrograms_dict['training'][idx],
        sr=signal_sr,
        hop_length=int(0.01 * signal_sr),
        x_axis='time',
        y_axis='mel',
        fmin=1,
        fmax=12800,
        cmap='viridis'
    )
    plt.colorbar(format='%+2.0f dB')
    if classification_mode == 'five_classes':
        label_name = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(mel_labels_dict['training'][idx])]
    else:
        label_name = 'Leak' if mel_labels_dict['training'][idx] == 0 else 'No-leak'
    plt.title(label_name)

plt.tight_layout()
plt.show()

# Preparación de Datos para el Modelo CNN
Preparación de los espectrogramas logarítmicos de Mel para ser utilizados en el modelo CNN. Conversión a arrays NumPy, normalización, expansión de dimensiones y codificación one-hot de las etiquetas. Guardado del dataset en formato HDF5.

In [None]:
# Función para preparar los datos para el modelo CNN
def prepare_data_for_model(mel_spectrograms_dict, mel_labels_dict, classification_mode):
    """
    Prepara los datos para el modelo CNN.

    Args:
        mel_spectrograms_dict: Diccionario con espectrogramas logarítmicos de Mel.
        mel_labels_dict: Diccionario con etiquetas correspondientes.
        classification_mode: Modo de clasificación ('five_classes' o 'binary').

    Returns:
        x_train, y_train, x_test, y_test: Datos preparados para entrenamiento.
        num_classes: Número de clases.
    """
    # Convertir a arrays numpy
    x_train = np.array(mel_spectrograms_dict['training'])
    y_train = np.array(mel_labels_dict['training'])
    x_test = np.array(mel_spectrograms_dict['testing'])
    y_test = np.array(mel_labels_dict['testing'])

    # Asegurar que los espectrogramas tengan valores normalizados
    x_train = np.clip(x_train, 0, 1)
    x_test = np.clip(x_test, 0, 1)

    # Determinar número de clases según el modo
    if classification_mode == 'five_classes':
        num_classes = 5
    else:  # binary
        num_classes = 2

    # One-hot encoding de las etiquetas
    y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
    y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

    return x_train, y_train, y_train_onehot, x_test, y_test, y_test_onehot, num_classes

# Preparar los datos para el modelo
x_train, y_train, y_train_onehot, x_test, y_test, y_test_onehot, num_classes = prepare_data_for_model(
    mel_spectrograms_dict,
    mel_labels_dict,
    classification_mode
)

# Imprimir información sobre los datos preparados
print(f'x_train shape: {x_train.shape}')
print(f'y_train_onehot shape: {y_train_onehot.shape}')
print(f'x_test shape: {x_test.shape}')
print(f'y_test_onehot shape: {y_test_onehot.shape}')
print(f'Number of classes: {num_classes}')

# Guardar en formato HDF5
def save_dataset_to_h5(x_train, y_train, y_train_onehot, x_test, y_test, y_test_onehot,
                       classification_mode, num_classes, label_codes_dict,
                       file_path=None):
    """
    Guarda el dataset en formato HDF5.
    """
    if file_path is None:
        file_path = f'/content/drive/MyDrive/Tesis/Accelerometer_Dataset/mel_log_spectrogram_dataset_{classification_mode}.h5'

    print(f"Guardando dataset en {file_path}...")
    with h5py.File(file_path, 'w') as hf:
        # Crear grupos para training y testing
        train_group = hf.create_group('train')
        test_group = hf.create_group('test')

        # Guardar espectrogramas y etiquetas procesados
        train_group.create_dataset('spectrograms', data=x_train)
        train_group.create_dataset('labels', data=y_train)
        train_group.create_dataset('labels_onehot', data=y_train_onehot)

        test_group.create_dataset('spectrograms', data=x_test)
        test_group.create_dataset('labels', data=y_test)
        test_group.create_dataset('labels_onehot', data=y_test_onehot)

        # Guardar metadatos
        metadata = hf.create_group('metadata')
        metadata.create_dataset('num_classes', data=num_classes)
        metadata.create_dataset('shape', data=np.array(x_train.shape[1:]))
        metadata.attrs['classification_mode'] = classification_mode

        # Guardar diccionario de etiquetas
        import json
        if classification_mode == 'five_classes':
            label_codes_json = json.dumps({k: int(v) for k, v in label_codes_dict.items()})
        else:  # binary
            label_codes_json = json.dumps({k: int(v) for k, v in {'Leak': 0, 'No-leak': 1}.items()})
        metadata.attrs['label_codes_dict'] = label_codes_json

    print(f"Dataset procesado y guardado en {file_path}")
    print(f"Datos de entrenamiento: {len(x_train)} muestras")
    print(f"Datos de prueba: {len(x_test)} muestras")

    # Verificar la existencia del archivo guardado
    if os.path.exists(file_path):
        print(f"Archivo guardado correctamente. Tamaño: {os.path.getsize(file_path) / (1024*1024):.2f} MB")
    else:
        print("Error: No se pudo guardar el archivo")

# Guardar el dataset
save_dataset_to_h5(
    x_train, y_train, y_train_onehot,
    x_test, y_test, y_test_onehot,
    classification_mode, num_classes, label_codes_dict
)

# Aumento de Datos para Espectrogramas
Implementación de técnicas de aumento de datos específicas para espectrogramas Mel, incluyendo desplazamiento de tiempo, enmascaramiento de frecuencia, adición de ruido gaussiano y estiramientos temporales para mejorar la generalización del modelo.

In [None]:
# Función para aplicar técnicas de aumento de datos a los espectrogramas Mel
def apply_spectrogram_augmentation(x_train, y_train, augmentation_factor=2):
    """
    Aplica técnicas de aumento de datos a los espectrogramas logarítmicos de Mel.

    Args:
        x_train: Espectrogramas de entrenamiento.
        y_train: Etiquetas de entrenamiento.
        augmentation_factor: Número de veces que se aumentará el dataset original.

    Returns:
        augmented_x_train: Espectrogramas aumentados.
        augmented_y_train: Etiquetas correspondientes a los espectrogramas aumentados.
    """
    augmented_x_train = []
    augmented_y_train = []

    for i in tqdm(range(len(x_train)), desc="Augmenting spectrograms"):
        spectrogram = x_train[i].copy()
        label = y_train[i]

        # Agregar el espectrograma original
        augmented_x_train.append(spectrogram)
        augmented_y_train.append(label)

        for _ in range(augmentation_factor - 1):
            # Crear una copia para manipular
            augmented_spectrogram = spectrogram.copy()

            # Desplazamiento de tiempo (adaptado para segmentos cortos)
            time_shift = np.random.randint(-5, 5)  # Reducido para segmentos pequeños
            augmented_spectrogram = np.roll(augmented_spectrogram, time_shift, axis=1)

            # Enmascaramiento de frecuencia
            freq_mask = np.random.randint(0, augmented_spectrogram.shape[0] // 4)
            freq_start = np.random.randint(0, augmented_spectrogram.shape[0] - freq_mask)
            augmented_spectrogram[freq_start:freq_start + freq_mask, :] = 0

            # Ruido gaussiano
            noise = np.random.normal(0, 0.01, augmented_spectrogram.shape)
            augmented_spectrogram += noise

            # Normalizar a rango [0, 1]
            augmented_spectrogram = np.clip(augmented_spectrogram, 0, 1)

            # Agregar el espectrograma aumentado
            augmented_x_train.append(augmented_spectrogram)
            augmented_y_train.append(label)

    return np.array(augmented_x_train), np.array(augmented_y_train)

# Dividir el conjunto de entrenamiento para crear un conjunto de validación
x_train_split, x_val, y_train_split, y_val = train_test_split(
    x_train,  # Datos originales sin aumentar
    y_train,  # Etiquetas originales sin aumentar
    test_size=0.2,
    stratify=y_train,
    random_state=42
)

# Aplicar aumento de datos solo al conjunto de entrenamiento
augmented_x_train, augmented_y_train = apply_spectrogram_augmentation(x_train_split, y_train_split)

# Los datos finales para entrenar son los aumentados
x_train_final = augmented_x_train
y_train_final = augmented_y_train

# Verificar tamaños
print(f"Datos originales de entrenamiento: {len(x_train_split)}")
print(f"Datos aumentados de entrenamiento: {len(x_train_final)}")
print(f"Datos de validación (sin aumentar): {len(x_val)}")
print(f"Datos de prueba: {len(x_test)}")

# Visualizar algunos ejemplos de espectrogramas originales y aumentados
plt.figure(figsize=(15, 10))
rows, cols = 3, 2
for i in range(rows):
    # Índice aleatorio para seleccionar un ejemplo
    idx = np.random.randint(0, len(x_train_split))
    
    # Espectrograma original
    plt.subplot(rows, cols, i*2 + 1)
    librosa.display.specshow(
        x_train_split[idx],
        sr=signal_sr,
        hop_length=int(0.01 * signal_sr),
        x_axis='time',
        y_axis='mel',
        fmin=1,
        fmax=12800,
        cmap='viridis'
    )
    plt.colorbar(format='%+2.0f dB')
    plt.title(f"Original: {list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_train_split[idx])]}")
    
    # Espectrograma aumentado
    idx_aug = idx * augmentation_factor  # Índice en la versión aumentada
    plt.subplot(rows, cols, i*2 + 2)
    librosa.display.specshow(
        x_train_final[idx_aug+1],  # +1 para tomar la primera versión aumentada
        sr=signal_sr,
        hop_length=int(0.01 * signal_sr),
        x_axis='time',
        y_axis='mel',
        fmin=1,
        fmax=12800,
        cmap='viridis'
    )
    plt.colorbar(format='%+2.0f dB')
    plt.title(f"Aumentado: {list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_train_final[idx_aug+1])]}")

plt.tight_layout()
plt.show()

# Implementación y Entrenamiento del Modelo ResNet18
Construcción de un modelo ResNet18 adaptado para la clasificación de espectrogramas Mel de segmentos cortos. Implementación con regularización avanzada para prevenir el sobreajuste y callbacks especializados para monitorear y optimizar el proceso de entrenamiento.

In [None]:
# Definición de bloques residuales para el modelo ResNet18
def residual_block(x, filters, kernel_size=3, stride=1, conv_shortcut=False, l2_reg=1e-4):
    """
    Bloque residual básico para ResNet con regularización L2.

    Args:
        x: Entrada al bloque residual.
        filters: Número de filtros para las capas convolucionales.
        kernel_size: Tamaño del kernel de las capas convolucionales.
        stride: Stride para la primera capa convolucional.
        conv_shortcut: Si True, aplica un atajo convolucional.
        l2_reg: Regularización L2.

    Returns:
        Salida del bloque residual.
    """
    shortcut = x

    if conv_shortcut:
        shortcut = layers.Conv2D(filters, 1, strides=stride, padding='same',
                               kernel_regularizer=regularizers.l2(l2_reg))(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    x = layers.Conv2D(filters, kernel_size, strides=stride, padding='same',
                    kernel_regularizer=regularizers.l2(l2_reg))(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    x = layers.Conv2D(filters, kernel_size, padding='same',
                    kernel_regularizer=regularizers.l2(l2_reg))(x)
    x = layers.BatchNormalization()(x)

    x = layers.add([shortcut, x])
    x = layers.ReLU()(x)

    return x

def build_segment_mel_resnet18(input_shape, num_classes, l2_reg=2e-4, dropout_rate=0.5):
    """
    Modelo ResNet18 adaptado para espectrogramas Mel de segmentos cortos.
    Incluye regularización aumentada para prevenir sobreajuste.
    """
    inputs = keras.Input(shape=input_shape)

    # Primera capa convolucional - adaptada para espectrogramas más pequeños
    x = layers.Conv2D(64, 5, strides=1, padding='same',  # Kernel más pequeño y stride=1 para mantener resolución
                    kernel_regularizer=regularizers.l2(l2_reg))(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling2D(2, strides=2, padding='same')(x)  # Reducción moderada de dimensionalidad

    # Bloques residuales - adaptados para espectrogramas de segmentos
    x = residual_block(x, 64, conv_shortcut=True, l2_reg=l2_reg)
    x = residual_block(x, 64, l2_reg=l2_reg)

    x = residual_block(x, 128, stride=2, conv_shortcut=True, l2_reg=l2_reg)
    x = residual_block(x, 128, l2_reg=l2_reg)

    x = residual_block(x, 256, stride=2, conv_shortcut=True, l2_reg=l2_reg)
    x = residual_block(x, 256, l2_reg=l2_reg)

    x = residual_block(x, 512, stride=2, conv_shortcut=True, l2_reg=l2_reg)
    x = residual_block(x, 512, l2_reg=l2_reg)

    # Capas finales con mayor regularización
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(l2_reg))(x)
    x = layers.Dropout(dropout_rate)(x)  # Dropout alto para prevenir sobreajuste
    outputs = layers.Dense(num_classes, activation='softmax',
                         kernel_regularizer=regularizers.l2(l2_reg))(x)

    model = keras.Model(inputs, outputs, name="segment_mel_resnet18")
    return model

# Preparar los datos para el entrenamiento
# Asegurarse de que y_train_final y y_val estén en formato one-hot
y_train_final_onehot = keras.utils.to_categorical(y_train_final, num_classes)
y_val_onehot = keras.utils.to_categorical(y_val, num_classes)

# Añadir dimensión de canal si no existe
if len(x_train_final.shape) == 3:
    x_train_final = x_train_final[..., np.newaxis]
if len(x_val.shape) == 3:
    x_val = x_val[..., np.newaxis]
if len(x_test.shape) == 3:
    x_test = x_test[..., np.newaxis]

print(f"Forma de datos de entrenamiento: {x_train_final.shape}")
print(f"Forma de datos de validación: {x_val.shape}")
print(f"Forma de datos de prueba: {x_test.shape}")

# Construir el modelo
input_shape = x_train_final.shape[1:]  # Forma de entrada
model = build_segment_mel_resnet18(
    input_shape=input_shape,
    num_classes=num_classes,
    l2_reg=2e-4,  # Regularización L2 moderada
    dropout_rate=0.5  # Dropout alto
)

# Compilar el modelo
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Resumen del modelo
model.summary()

# Función para warmup de la tasa de aprendizaje
def warmup_schedule(epoch, lr):
    warmup_epochs = 5
    init_lr = 1e-6
    target_lr = 0.0005

    if epoch < warmup_epochs:
        return init_lr + (target_lr - init_lr) * epoch / warmup_epochs
    return lr

# Callbacks para monitoreo y optimización
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=20,
        restore_best_weights=True
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=5,
        min_lr=1e-7
    ),
    keras.callbacks.ModelCheckpoint(
        filepath='/content/drive/MyDrive/Tesis/Models/segment_mel_spect_model.h5',
        monitor='val_accuracy',
        save_best_only=True
    ),
    keras.callbacks.LearningRateScheduler(warmup_schedule)
]

# Entrenamiento del modelo
history = model.fit(
    x_train_final, y_train_final_onehot,
    validation_data=(x_val, y_val_onehot),
    epochs=200,
    batch_size=32,
    callbacks=callbacks,
    verbose=1
)

# Guardar el modelo entrenado
model.save('/content/drive/MyDrive/Tesis/Models/segment_mel_spect_model_final.h5')

# Visualizar la evolución del entrenamiento
plt.figure(figsize=(15, 5))

# Gráfico de pérdida
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Entrenamiento')
plt.plot(history.history['val_loss'], label='Validación')
plt.title('Pérdida durante el entrenamiento')
plt.xlabel('Época')
plt.ylabel('Pérdida')
plt.legend()

# Gráfico de precisión
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Entrenamiento')
plt.plot(history.history['val_accuracy'], label='Validación')
plt.title('Precisión durante el entrenamiento')
plt.xlabel('Época')
plt.ylabel('Precisión')
plt.legend()

plt.tight_layout()
plt.show()

# Evaluación del Rendimiento del Modelo
Evaluación exhaustiva del modelo entrenado utilizando métricas como precisión, recall, F1-score y matriz de confusión. Análisis detallado del rendimiento por clase y visualización de predicciones en ejemplos específicos para comprender mejor el comportamiento del modelo.

In [None]:
# Evaluación del modelo en el conjunto de prueba
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)
test_loss, test_accuracy = model.evaluate(x_test, y_test_onehot, verbose=1)
print(f"Pérdida en test: {test_loss:.4f}")
print(f"Precisión en test: {test_accuracy:.4f}")

# Obtener predicciones del modelo
y_pred_probs = model.predict(x_test)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = y_test if len(y_test.shape) == 1 else np.argmax(y_test_onehot, axis=1)

# Calcular matriz de confusión
conf_matrix = confusion_matrix(y_true, y_pred)

# Visualizar matriz de confusión
plt.figure(figsize=(10, 8))
sns.heatmap(
    conf_matrix, 
    annot=True, 
    fmt='d', 
    cmap='Blues',
    xticklabels=list(label_codes_dict.keys()),
    yticklabels=list(label_codes_dict.keys())
)
plt.xlabel('Predicción')
plt.ylabel('Verdadero')
plt.title('Matriz de confusión')
plt.show()

# Generar informe de clasificación
class_report = classification_report(
    y_true, 
    y_pred, 
    target_names=list(label_codes_dict.keys()),
    output_dict=True
)

# Convertir el informe a un DataFrame de pandas para mejor visualización
class_report_df = pd.DataFrame(class_report).transpose()
print("Informe de clasificación:")
display(class_report_df)

# Visualizar ejemplos de predicciones correctas e incorrectas
def plot_predictions(x_data, y_true, y_pred, num_examples=5):
    # Encontrar ejemplos correctos e incorrectos
    correct_indices = np.where(y_true == y_pred)[0]
    incorrect_indices = np.where(y_true != y_pred)[0]
    
    plt.figure(figsize=(20, 12))
    
    # Visualizar predicciones correctas
    for i in range(min(num_examples, len(correct_indices))):
        idx = correct_indices[i]
        plt.subplot(2, num_examples, i + 1)
        
        # Eliminar la dimensión del canal si es necesario
        spec_to_plot = x_data[idx].squeeze() if x_data[idx].ndim > 2 else x_data[idx]
        
        librosa.display.specshow(
            spec_to_plot,
            sr=signal_sr,
            hop_length=int(0.01 * signal_sr),
            x_axis='time',
            y_axis='mel',
            fmin=1,
            fmax=12800,
            cmap='viridis'
        )
        
        true_label = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_true[idx])]
        plt.title(f"Correcto: {true_label}")
    
    # Visualizar predicciones incorrectas
    for i in range(min(num_examples, len(incorrect_indices))):
        idx = incorrect_indices[i]
        plt.subplot(2, num_examples, num_examples + i + 1)
        
        # Eliminar la dimensión del canal si es necesario
        spec_to_plot = x_data[idx].squeeze() if x_data[idx].ndim > 2 else x_data[idx]
        
        librosa.display.specshow(
            spec_to_plot,
            sr=signal_sr,
            hop_length=int(0.01 * signal_sr),
            x_axis='time',
            y_axis='mel',
            fmin=1,
            fmax=12800,
            cmap='viridis'
        )
        
        true_label = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_true[idx])]
        pred_label = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_pred[idx])]
        plt.title(f"Incorrecto: Real {true_label}, Pred {pred_label}")
    
    plt.tight_layout()
    plt.show()

# Visualizar ejemplos de predicciones
plot_predictions(x_test, y_true, y_pred, num_examples=5)

# Análisis de rendimiento por clase
class_accuracy = conf_matrix.diagonal() / conf_matrix.sum(axis=1)
plt.figure(figsize=(12, 6))
sns.barplot(x=list(label_codes_dict.keys()), y=class_accuracy)
plt.title('Precisión por clase')
plt.xlabel('Clase')
plt.ylabel('Precisión')
plt.xticks(rotation=45)
plt.ylim(0, 1)
plt.tight_layout()
plt.show()

# Analizar los casos más difíciles (predicciones con menor confianza)
prediction_confidence = np.max(y_pred_probs, axis=1)
lower_confidence_indices = np.argsort(prediction_confidence)[:10]  # 10 predicciones con menor confianza

print("Casos con menor confianza en la predicción:")
for idx in lower_confidence_indices:
    true_label = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_true[idx])]
    pred_label = list(label_codes_dict.keys())[list(label_codes_dict.values()).index(y_pred[idx])]
    confidence = prediction_confidence[idx]
    print(f"Índice: {idx}, Real: {true_label}, Predicción: {pred_label}, Confianza: {confidence:.4f}")

# Guardar resultados de evaluación
evaluation_results = {
    'test_loss': test_loss,
    'test_accuracy': test_accuracy,
    'confusion_matrix': conf_matrix.tolist(),
    'classification_report': class_report
}

import json
with open('/content/drive/MyDrive/Tesis/Models/segment_mel_spect_evaluation.json', 'w') as f:
    json.dump(evaluation_results, f)

print("Evaluación completa. Resultados guardados.")