In [None]:
import torch as th
from torch import nn

import numpy as np
import matplotlib.pyplot as plt

import librosa
import scipy.io.wavfile as wav

# import speechbrain as sb
# from speechbrain.lobes.models.Xvector import Xvector


# Seed the NumPy and PyTorch global generators for reproducibility
np.random.seed(42)
th.manual_seed(42)

# Compute device: the first CUDA GPU when CUDA is available, otherwise the CPU
DC = 'cpu' if not th.cuda.is_available() else 'cuda:0'

In [None]:
# funciones aleatorias
import random
# tomar n elementos de una secuencia
from itertools import islice as take

# audio
import librosa
import librosa.display
import IPython as ip

# redes neuronales
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# redes audio
import torchaudio
import torchaudio.transforms as T
# redes visión
# import torchvision.models as tvm

# redes neuronales
from torch.utils.data import DataLoader
# from torchaudio.datasets import SPEECHCOMMANDS
# inspección de arquitectura
from torchinfo import summary

# barras de progreso
from tqdm.auto import trange

#Counter
import collections

# Files
from os.path import join


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
from moviepy.editor import *
from torch.utils.data import Dataset
from torch.nn import functional as F
from torchaudio.transforms import MelSpectrogram

Dataloader

In [None]:
import pandas as pd

# Read the CSV file into a DataFrame with pd.read_csv()
# NOTE(review): absolute, machine-specific Windows path; consider a configurable data directory
df = pd.read_csv('D:\\CommonVoice2\\clean_data_all.csv')

In [None]:
# Numeric value assigned to each age-bracket label
# NOTE(review): most brackets map to their midpoint (35, 45, ...), but 'teens',
# 'twenties', 'eighties' and 'nineties' do not; confirm this is intended
mapeo_edades = {
    'teens': 18,
    'twenties': 20,
    'thirties': 35,
    'fourties': 45,
    'fifties': 55,
    'sixties': 65,
    'seventies': 75,
    'eighties': 80,
    'nineties': 90,
}

# New column 'age_numerico': every label of 'age' replaced by its mapped value
df['age_numerico'] = df['age'].replace(to_replace=mapeo_edades)

In [None]:
# Keep only the rows whose gender is different from 'other'
df = df.loc[df['gender'].ne('other')]

In [None]:
# Encode the gender labels as integers: 'male' -> 0, 'female' -> 1
df['gender'] = df['gender'].replace(to_replace={'male': 0, 'female': 1})

In [None]:
# window size and hop size (the hop is half of the window)
n_fft, hop_length = 1024, 1024 // 2

In [None]:
# batch size
BATCH_SIZE = 40

# audio parameters
SECS, SAMPLE_RATE = 1, 16000

# FFT parameters (the hop is half of the FFT size)
N_FFT = 400
HOP_LENGTH = N_FFT // 2

# age-bracket labels
CLASSES_AGE = (
    'teens', 'twenties', 'thirties',
    'fourties', 'fifties', 'sixties',
    'seventies', 'eighties', 'nineties',
)

# gender labels
CLASSES_GENDER = ('male', 'female')

# number of age classes and age label -> index lookup
NUM_CLASSES = len(CLASSES_AGE)
CLASS_IDX = dict(zip(CLASSES_AGE, range(NUM_CLASSES)))
print(CLASS_IDX)

# number of gender classes and gender label -> index lookup
NUM_CLASSES2 = len(CLASSES_GENDER)
CLASS_IDX2 = dict(zip(CLASSES_GENDER, range(NUM_CLASSES2)))
print(CLASS_IDX2)


def set_seed(seed=0):
    """Seed the `random`, NumPy and PyTorch pseudo-random number generators.

    Args:
        seed: Seed handed to each of the three generators (defaults to 0).
    """
    for sembrar in (random.seed, np.random.seed, torch.manual_seed):
        sembrar(seed)


# reproducibility: seed every generator with the default seed
set_seed()

In [None]:
# number of gender classes and number of age classes
NUM_CLASSES_GEN, NUM_CLASES_AGE = len(CLASSES_GENDER), len(CLASSES_AGE)

In [None]:
def identity(x):
    """Return the argument unchanged (no-op transform)."""
    return x

def label2index_age(label):
    """Return the integer index stored for an age label in CLASS_IDX.

    Raises:
        KeyError: if `label` is not a key of CLASS_IDX.
    """
    indice = CLASS_IDX[label]
    return indice

def label2index_gender(label):
    """Return the integer index stored for a gender label in CLASS_IDX2.

    Raises:
        KeyError: if `label` is not a key of CLASS_IDX2.
    """
    indice = CLASS_IDX2[label]
    return indice

In [None]:
class CommonVoice2(Dataset):
    """Dataset of audio clips with age and gender labels, described by a DataFrame.

    Every row of `df` must provide the columns 'path', 'age', 'age_numerico'
    and 'gender'.

    Args:
        df: DataFrame with one row per audio clip.
        waveform_tsfm: Callable applied to the loaded waveform (a NumPy array
            returned by `librosa.load`).
        label_tsfm: Callable applied to the value of the 'age' column.
        cut: When True, at most the first `cut_sec` seconds of each clip are loaded.
        cut_sec: Maximum duration, in seconds, loaded when `cut` is True.
        sample_rate: Sampling rate the audio is resampled to while loading.
    """

    def __init__(self, df, waveform_tsfm=identity, label_tsfm=identity, cut=False, cut_sec=1,
                 sample_rate=16000):
        self.waveform_tsfm = waveform_tsfm
        self.label_tsfm = label_tsfm
        self.df = df
        self.cut = cut
        self.cut_sec = cut_sec
        self.sample_rate = sample_rate

    def __getitem__(self, i):
        """Load the clip at position `i`.

        Returns:
            Tuple `(x, edad, genero, edad_num)`: the transformed waveform, the
            'age' value after `label_tsfm`, and the 'gender' and 'age_numerico'
            values as stored in the DataFrame.
        """
        dato = self.df.iloc[i]

        # librosa reads at most `duration` seconds from the start of the file, so
        # trimming needs neither a temporary file nor a second decoding pass.
        # Clips shorter than `cut_sec` come back whole; `duration=None` loads
        # the complete clip.
        duracion = self.cut_sec if self.cut else None
        waveform, _ = librosa.load(dato['path'], sr=self.sample_rate, duration=duracion)

        x = self.waveform_tsfm(waveform)
        # the label transform was previously stored but never applied
        edad = self.label_tsfm(dato['age'])
        return x, edad, dato['gender'], dato['age_numerico']

    def __len__(self):
        """Number of clips, i.e. number of rows of the DataFrame."""
        return len(self.df)


In [None]:
class WaveformPadTruncate(nn.Module):
    """Fix a waveform to a set number of samples and optionally extract spectral features.

    Waveforms shorter than the target are zero-padded at the end; longer ones
    are cropped to a randomly placed window of the target length.

    Args:
        secs: Target duration in seconds.
        sample_rate: Sampling rate of the incoming waveforms.
        transform_type: 1 returns a mel spectrogram, 2 returns MFCCs, any
            other value returns the fixed-length waveform itself.
        n_fft: FFT size of the mel spectrogram (transform_type 1).
        hop_length: Hop size of the mel spectrogram (transform_type 1).
        n_mfcc: Number of MFCC coefficients (transform_type 2).
    """

    def __init__(self, secs=1, sample_rate=16000, transform_type=0,
                 n_fft=1024, hop_length=512, n_mfcc=23):
        super().__init__()
        # int() keeps the sample count usable as a size when `secs` is fractional
        self.samples = int(secs * sample_rate)
        self.transform_type = transform_type
        self.sample_rate = sample_rate

        # The feature extractor is built once here instead of on every call.
        if transform_type == 1:
            self.transform = T.MelSpectrogram(sample_rate=sample_rate,
                                              n_fft=n_fft,
                                              hop_length=hop_length)
        elif transform_type == 2:
            self.transform = T.MFCC(sample_rate=sample_rate, n_mfcc=n_mfcc)
        else:
            self.transform = None

    def forward(self, waveform):
        """Pad or crop `waveform` along its last dimension, then apply the transform.

        Args:
            waveform: NumPy array or tensor whose last dimension indexes samples.

        Returns:
            The fixed-length float32 waveform, or the selected spectral features.
        """
        waveform = torch.as_tensor(waveform, dtype=torch.float32)
        samples = waveform.shape[-1]

        if samples < self.samples:
            # zero-pad at the end of the last dimension
            waveform = torch.nn.functional.pad(waveform, (0, self.samples - samples))
        elif samples > self.samples:
            start = random.randint(0, samples - self.samples)
            # slicing the last dimension also works for 1-D waveforms,
            # where narrow(1, ...) raised an IndexError
            waveform = waveform[..., start:start + self.samples]

        if self.transform is None:
            return waveform
        return self.transform(waveform)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows, after shuffling, as the validation split
df_ent, df_val = train_test_split(df, shuffle=True, test_size=0.2)

In [None]:
# Renumber both splits from 0, discarding the index inherited from the full table
df_ent, df_val = (particion.reset_index(drop=True) for particion in (df_ent, df_val))

In [None]:
# Training dataset built from the training split
ds_ent = CommonVoice2(
    # table describing the clips
    df=df_ent,
    # load at most the first second of each clip
    cut=True,
    cut_sec=1,
    # waveform transform (transform_type=2 selects the MFCC branch)
    waveform_tsfm=WaveformPadTruncate(transform_type=2),
    # label transform
    label_tsfm=label2index_age,
)


In [None]:
# Training loader: batches of BATCH_SIZE, reshuffled, final incomplete batch dropped
dl_ent = DataLoader(ds_ent, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

In [None]:
# Validation dataset built from the validation split
ds_val = CommonVoice2(
    # table describing the clips
    df=df_val,
    # load at most the first second of each clip
    cut=True,
    cut_sec=1,
    # waveform transform (transform_type=2 selects the MFCC branch)
    waveform_tsfm=WaveformPadTruncate(transform_type=2),
    # label transform
    label_tsfm=label2index_age,
)

In [None]:
# Validation loader. It is not shuffled: combined with drop_last=True, a reshuffle
# would discard a different remainder every epoch, so the validation metrics of
# two epochs would not be computed on the same samples.
dl_val = DataLoader(
    ds_val,
    # batch size
    batch_size=BATCH_SIZE,
    shuffle=False,
    drop_last=True
)

ENTRENAMIENTO

In [None]:
def exactitud(y_hat, y):
  """Fraction of predictions whose arg-max over the last dimension equals the target.

  Args:
    y_hat: Tensor of scores; the prediction is the arg-max over its last dimension.
    y: Targets, compared element-wise with the predictions.

  Returns:
    Tensor holding the number of matches divided by the size of the first
    dimension of the comparison.
  """
  predicciones = y_hat.argmax(dim=-1)
  coincidencias = predicciones == y
  return coincidencias.sum() / coincidencias.shape[0]

In [None]:
from tqdm import tqdm
from sklearn.metrics import f1_score

In [None]:
def paso_ent(modelo,
             fp_edades,
             fp_genero,
             fp_reg,
             metrica_edades,
             metrica_genero,
             metrica_edadN,
             opt,
             X,
             y_edad,
             y_genero,
             y_reg):
  """Run one optimisation step of the multi-task model on a single batch.

  The total loss is a weighted sum of the binary cross-entropy of the gender
  output, the cross-entropy of the age-class output and the mean squared error
  of the age-regression output.

  Args:
    modelo: Callable returning `(y_hat_edad, y_hat_genero, y_hat_reg)` for `X`.
    fp_edades, fp_genero, fp_reg: Accepted for interface compatibility; the
      three losses are fixed inside this function and these are not used.
    metrica_edades: Callable `(y_hat_edad, y_edad)` for the age-class output.
    metrica_genero: Callable `(y_hat_genero, y_genero)` for the gender output;
      the target is passed as float.
    metrica_edadN: Callable `(y_hat_reg, y_reg)` for the regression output;
      the target is passed as float.
    opt: Optimizer updating the parameters of `modelo`.
    X: Input batch.
    y_edad: Age-class targets (class indices).
    y_genero: Gender targets (0/1).
    y_reg: Numeric age targets.

  Returns:
    Tuple `(perdida_paso, metrica_edad_paso, metrica_genero_paso,
    metrica_reg_paso, weightedf1)`; `perdida_paso` is the total loss as a
    NumPy value and `weightedf1` the weighted F1 of the age classes.
  """
  # weights of the three task losses in the total loss; adjust as needed
  w_genero = 0.01
  w_edad = 0.01
  w_reg = 0.01

  opt.zero_grad()  # reset the gradients of the parameters to update

  # forward pass
  y_hat_edad, y_hat_genero, y_hat_reg = modelo(X)

  perdida_genero = F.binary_cross_entropy(y_hat_genero, y_genero.float())
  perdida_edad = F.cross_entropy(y_hat_edad, y_edad)
  perdida_reg = F.mse_loss(y_hat_reg, y_reg.float())

  # total loss: weighted sum of the individual losses
  perdida_total = (w_genero * perdida_genero.float()
                   + w_edad * perdida_edad.float()
                   + w_reg * perdida_reg.float()).float()

  perdida_total.backward()  # compute the gradients
  opt.step()  # update the parameters of the model

  with th.no_grad():
    # detach() first: calling numpy() on a tensor that requires grad raises
    # whenever the loss already lives on the CPU
    perdida_paso = perdida_total.detach().cpu().numpy()

    metrica_genero_paso = metrica_genero(y_hat_genero, y_genero.float())
    metrica_edad_paso = metrica_edades(y_hat_edad, y_edad)
    metrica_reg_paso = metrica_edadN(y_hat_reg, y_reg.float())

    # Predicted age class: the arg-max of the logits (softmax does not change it).
    y_pred = y_hat_edad.detach().argmax(dim=1).cpu().numpy()
    # F1 of the age classes against the TRUE age labels, as in the validation loop
    weightedf1 = f1_score(y_edad.detach().cpu().numpy(), y_pred, average='weighted')

  return perdida_paso, metrica_edad_paso, metrica_genero_paso, metrica_reg_paso, weightedf1

In [None]:
import copy

def entrena(modelo,
            fp_edades,
            fp_genero,
            fp_reg,
            metrica_edades,
            metrica_genero,
            metrica_edadN,
            opt,
            entdl,
            valdl,
            disp,
            ckptpath,
            n_epocas = 10,
            tbdir = 'runs/'):
  """Train and validate the multi-task model, keeping the best weights.

  Each epoch runs `paso_ent` on every batch of `entdl`, evaluates every batch
  of `valdl`, prints a summary and, when the validation loss improves on the
  best value seen so far, copies the weights into `mejor_modelo` and calls
  `guarda_ckpt`.

  Args:
    modelo: Model returning `(age logits, gender probabilities, age regression)`.
    fp_edades, fp_genero, fp_reg: Forwarded to `paso_ent`; the validation loss
      is fixed inside this function (gender BCE, age cross-entropy, age MSE).
    metrica_edades, metrica_genero, metrica_edadN: Metric callables
      `(prediction, target)` for the age-class, gender and regression outputs.
    opt: Optimizer, forwarded to `paso_ent` and `guarda_ckpt`.
    entdl: Iterable of training batches `(X, y_edad, y_genero, y_reg, ...)`.
    valdl: Iterable of validation batches with the same layout.
    disp: Device every batch is moved to.
    ckptpath: Path forwarded to `guarda_ckpt`.
    n_epocas: Number of epochs.
    tbdir: Unused (TensorBoard logging is disabled).

  Returns:
    Tuple `(modelo, mejor_modelo, hist)`. `hist` maps each tracked quantity to
    an array with one value per epoch: the per-batch average, multiplied by 100
    for every quantity except the weighted F1.
  """
  # weights of the three task losses in the validation loss; adjust as needed
  w_genero = 0.01
  w_edad = 0.01
  w_reg = 0.01

  n_lotes_ent = len(entdl)
  n_lotes_val = len(valdl)

  hist = {'perdida_ent':np.zeros(n_epocas),
          'weightedF1_ent': np.zeros(n_epocas),
          'weightedF1_val': np.zeros(n_epocas),
          'perdida_val': np.zeros(n_epocas),
          'Accuracy_Edades_ent': np.zeros(n_epocas),
          'Accuracy_Edades_val': np.zeros(n_epocas),

          'Accuracy_Genero_ent': np.zeros(n_epocas),
          'Accuracy_Genero_val': np.zeros(n_epocas),

          'MSE_Edad_ent': np.zeros(n_epocas),
          'MSE_Edad_val': np.zeros(n_epocas)}

  perdida_min = th.inf
  mejor_modelo = copy.deepcopy(modelo)

  for e in range(n_epocas):
    # training loop
    modelo.train()
    for Xlote, ylote_edades, ylote_genero, ylote_reg, *_ in entdl:
      Xlote = Xlote.to(disp)
      ylote_edades = ylote_edades.to(disp)
      ylote_genero = ylote_genero.to(disp)
      ylote_reg = ylote_reg.to(disp)

      perdida_paso, metrica_edad_paso, metrica_genero_paso, metrica_reg_paso, weightedf1 = paso_ent(modelo,
                                            fp_edades,
                                            fp_genero,
                                            fp_reg,
                                            metrica_edades,
                                            metrica_genero,
                                            metrica_edadN,
                                            opt,
                                            Xlote,
                                            ylote_edades,
                                            ylote_genero,
                                            ylote_reg)

      # float() converts tensors (on any device) and NumPy values to plain
      # numbers before they are accumulated into the NumPy history arrays
      hist['perdida_ent'][e] += float(perdida_paso)
      hist['weightedF1_ent'][e] += float(weightedf1)
      hist['Accuracy_Edades_ent'][e] += float(metrica_edad_paso)
      hist['Accuracy_Genero_ent'][e] += float(metrica_genero_paso)
      hist['MSE_Edad_ent'][e] += float(metrica_reg_paso)

    # validation loop
    modelo.eval()
    with th.no_grad():
      for Xlote, ylote_edades, ylote_genero, ylote_reg, *_  in valdl:
        Xlote = Xlote.to(disp)
        ylote_edades = ylote_edades.to(disp)
        # the BCE and MSE losses need floating-point targets
        ylote_genero = ylote_genero.to(disp).float()
        ylote_reg = ylote_reg.to(disp).float()

        y_hat_edades, y_hat_genero, y_hat_reg = modelo(Xlote)

        # reshape(-1) keeps one value per sample even for a one-sample batch,
        # where squeeze() would collapse the batch dimension
        y_hat_genero = y_hat_genero.reshape(-1).float()
        y_hat_reg = y_hat_reg.reshape(-1).float()

        # predicted age class: arg-max of the logits
        y_pred = y_hat_edades.argmax(dim=1)
        weightedf1 = f1_score(ylote_edades.cpu().numpy(), y_pred.cpu().numpy(), average='weighted')

        perdida_genero = F.binary_cross_entropy(y_hat_genero, ylote_genero)
        perdida_edad = F.cross_entropy(y_hat_edades, ylote_edades)
        perdida_reg = F.mse_loss(y_hat_reg, ylote_reg)

        # total loss: weighted sum of the individual losses
        perdida_total = w_genero * perdida_genero.float() + w_edad * perdida_edad.float() + w_reg * perdida_reg.float()

        metrica_genero_val = metrica_genero(y_hat_genero, ylote_genero)
        metrica_edad_val = metrica_edades(y_hat_edades, ylote_edades)
        metrica_reg_val = metrica_edadN(y_hat_reg, ylote_reg)

        hist['perdida_val'][e] += float(perdida_total)
        hist['weightedF1_val'][e] += float(weightedf1)
        hist['Accuracy_Edades_val'][e] += float(metrica_edad_val)
        hist['Accuracy_Genero_val'][e] += float(metrica_genero_val)
        hist['MSE_Edad_val'][e] += float(metrica_reg_val)

    # per-batch averages of the epoch
    for clave in ('weightedF1', 'perdida', 'Accuracy_Edades', 'Accuracy_Genero', 'MSE_Edad'):
      hist[f'{clave}_ent'][e] /= n_lotes_ent
      hist[f'{clave}_val'][e] /= n_lotes_val

    # every quantity except the weighted F1 is reported multiplied by 100
    for clave in ('perdida', 'Accuracy_Edades', 'Accuracy_Genero', 'MSE_Edad'):
      hist[f'{clave}_ent'][e] *= 100
      hist[f'{clave}_val'][e] *= 100

    # save a checkpoint and copy the current weights only when the monitored
    # validation loss improves on the best value seen so far
    if hist['perdida_val'][e] < perdida_min:
      perdida_min = hist['perdida_val'][e]
      mejor_modelo.load_state_dict(modelo.state_dict())
      guarda_ckpt(ckptpath, modelo, e, opt)

    print(f'\nÉpoca {e}:\n '
          'ENTRENAMIENTO: \n'
          f'weighted_F1(E) = {hist["weightedF1_ent"][e]:.3f},\n '
          f'Perdida(E) = {hist["perdida_ent"][e]:.3f}, \n'
          f'Accuracy_Edades(E) = {hist["Accuracy_Edades_ent"][e]:.3f},\n '
          f'Accuracy_Genero(E) = {hist["Accuracy_Genero_ent"][e]:.3f},\n'
          f'MSE_Edad(E) = {hist["MSE_Edad_ent"][e]:.3f},\n '
          'VALIDACIÓN: \n'
          f'weighted_F1(V) = {hist["weightedF1_val"][e]:.3f},\n  '
          f'Perdida(V) = {hist["perdida_val"][e]:.3f},\n  '
          f'Accuracy_Edades(V) = {hist["Accuracy_Edades_val"][e]:.3f}\n '
          f'Accuracy_Genero(V) = {hist["Accuracy_Genero_val"][e]:.3f}\n '
          f'MSE_Edad(V) = {hist["MSE_Edad_val"][e]:.3f}\n '
          '---------------------------------------------------------------------')

  return modelo, mejor_modelo, hist

MODELO

In [None]:
# Compute device: the second GPU only when it exists (a hard-coded 'cuda:1' is an
# invalid device on single-GPU machines), otherwise the first GPU, otherwise the CPU
if th.cuda.is_available():
    DC = 'cuda:1' if th.cuda.device_count() > 1 else 'cuda:0'
else:
    DC = 'cpu'
LOGDIR = './logs/'
N_EPOCAS = 200

In [None]:
class FrotEnd(nn.Module):
    """Three output heads sharing one input vector: age class, gender and numeric age.

    Every head is Linear -> ReLU -> BatchNorm1d -> Linear; the gender head
    additionally ends with a Sigmoid.

    Args:
        dim_inicial: Size of the input vector (also the hidden size of each head).
        clases_age: Number of age classes produced by the classification head.
        dropout: Currently unused.
        extract: Currently unused.
    """
    def __init__(self,dim_inicial, clases_age=NUM_CLASES_AGE, dropout=0.0, extract=False):
        super(FrotEnd, self).__init__()

        # age classification head: outputs raw logits (no softmax)
        self.age_classification = nn.Sequential(
            nn.Linear(dim_inicial,dim_inicial),
            nn.ReLU(),
            nn.BatchNorm1d(dim_inicial),
            nn.Linear(dim_inicial,clases_age),
        )

        # gender head: outputs one probability per sample
        self.gender = nn.Sequential(
            nn.Linear(dim_inicial,dim_inicial),
            nn.ReLU(),
            nn.BatchNorm1d(dim_inicial),
            nn.Linear(dim_inicial,1),
            nn.Sigmoid()
        )

        # age regression head: outputs one value per sample
        self.age_regression = nn.Sequential(
            nn.Linear(dim_inicial,dim_inicial),
            nn.ReLU(),
            nn.BatchNorm1d(dim_inicial),
            nn.Linear(dim_inicial,1)
        )

    def forward(self, x):
        """Apply the three heads to `x`.

        Returns:
            Tuple `(age_classes, gender, age_regression)`: the age logits, and
            the gender and regression outputs with their trailing singleton
            dimension removed.
        """
        age_classes = self.age_classification(x)

        # squeeze(-1) removes only the trailing singleton dimension, so a
        # one-sample batch keeps its batch dimension (squeeze() dropped it too)
        gender = self.gender(x).squeeze(-1).float()
        age_regression = self.age_regression(x).squeeze(-1).float()

        return age_classes , gender , age_regression

In [None]:
class LSTMDvector(nn.Module):
    """LSTM-based d-vector followed by the age/gender output heads.

    Three LSTM blocks are applied in sequence; the output at the last time step
    is projected to the d-vector, which is fed to a `FrotEnd` module.

    Args:
        input_size: Number of features per time step of the input.
        hidden_size: Hidden size of every LSTM block.
        embedding_size: Size of the d-vector.
        num_layers: Number of stacked layers inside each LSTM block.
    """
    def __init__(self, input_size, hidden_size,embedding_size, num_layers=1):
        super(LSTMDvector, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.embedding_size = embedding_size
        self.num_layers = num_layers

        self.lstm1 = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        # the second and third blocks consume the hidden_size-wide outputs of
        # the previous block, not the raw input features
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.lstm3 = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)

        # linear layer producing the d-vector
        self.linear = nn.Linear(hidden_size, embedding_size)

        # output heads applied to the d-vector
        self.frontEnd = FrotEnd(embedding_size)

    def forward(self, x):
        """Compute the three outputs for a batch-first input sequence.

        Args:
            x: Tensor of shape (batch, sequence length, input_size), as
                required by the batch-first LSTMs.

        Returns:
            Tuple `(class_edad, genero, edad_num)` of float tensors produced by
            the output heads.
        """
        # nn.LSTM starts from zero hidden and cell states when none are passed,
        # so no explicit initial states (or device handling) are needed
        lstm_out1, _ = self.lstm1(x)
        lstm_out2, _ = self.lstm2(lstm_out1)
        lstm_out3, _ = self.lstm3(lstm_out2)

        # the output at the last time step is projected to the d-vector
        d_vector = self.linear(lstm_out3[:, -1, :])

        class_edad, genero, edad_num = self.frontEnd(d_vector)

        return class_edad.float(), genero.float(), edad_num.float()


In [None]:
# Architecture smoke test. LSTMDvector requires hidden_size and embedding_size in
# addition to input_size; the two sizes below are placeholders.
# TODO(review): confirm the intended hidden and embedding sizes.
test_layer = LSTMDvector(input_size=16000, hidden_size=256, embedding_size=256)
summary(test_layer, (40, 1, 16000), device='cpu', col_names=['input_size', 'output_size', 'num_params'])