# Clasificación de acentos latinos

Comenzaremos revisando al archivo de metadata 

In [1]:
import pandas as pd
from pathlib import Path

download_path = Path.cwd()/'data'
metadata_file = download_path/'Train.csv'
df = pd.read_csv(metadata_file)
df.columns = df.columns.str.lower().str.strip().str.replace(' ', '_')
df.head()


Unnamed: 0,id,expected
0,01216683570.wav,0 2
1,00433588573.wav,0 2
2,00381534896.wav,0 2
3,01635825413.wav,0 2
4,00325117692.wav,0 2


Notemos que el problema es de clasificación multiclase, i.e., debemos clasificar el genero (que es el primer número de la coluna `expected`) y el origen del acento (el segundo). Veamos cuales son las categorias.

In [3]:
import numpy as np

cats = np.load(download_path/'dict.npy', allow_pickle=True)
cats

array({'0': 'Femenino', '1': 'Masculino', '2': 'Argentina', '3': 'Chile', '4': 'Colombia', '5': 'Perú', '6': 'Venezuela'},
      dtype=object)

Como primer intento vamos a tratar el dataset como si solo tuviera 10 categorias.

In [None]:
from sklearn import preprocessing
le = preprocessing.LabelEncoder()
le.fit(['0 2', '0 3', '0 4', '0 5', '0 6', '1 2', '1 3', '1 4', '1 5', '1 6'])


df['target'] = le.transform(df['expected'])
df.drop('expected', axis=1, inplace=True)


# construimos el path necesario
df['relative_path'] =  '\\' + df['id'].astype(str)


# guardamos una copia para experimentar
old_df = df.copy() 


# nos quedamos con las columnas importantes
df = df[['relative_path', 'target']]


Ahora comenzamos con la extracción del audio, para esto crearemos una clase 

In [None]:

import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio

class AudioUtil():
  # ----------------------------
  # Cargamos el audio, esto nos entrega el tensor y la frecuencia de muestreo
  # ----------------------------
  @staticmethod
  def open(audio_file):
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)



  # ----------------------------
  # Covertimos el audio a la cantidad deseada de canales (mono o stereo)
  # ----------------------------
  @staticmethod
  def rechannel(aud, new_channel):
    sig, sr = aud

    if (sig.shape[0] == new_channel):
      # nada que hacer
      return aud

    if (new_channel == 1):
      # convertimos a mono quedandonos con el primer canal
      resig = sig[:1, :]
    else:
      # convertimos a stereo duplicando el primer canal
      resig = torch.cat([sig, sig])

    return ((resig, sr))



  # ----------------------------
  # de tener que hacer un resample, lo hacemos a la frecuencia deseada en ambos canales 
  # ----------------------------
  @staticmethod
  def resample(aud, newsr):
    sig, sr = aud

    if (sr == newsr):
      # nada que hacer
      return aud

    num_channels = sig.shape[0]
    # Resample primer canal
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:])
    if (num_channels > 1):
      # Resample del segundo canal y lo resampleamos
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:])
      resig = torch.cat([resig, retwo])

    return ((resig, newsr))


  # ----------------------------
  # rellenar o truncar el audio a la longitud deseada 'max_ms' en milisegundos
  # ----------------------------
  @staticmethod
  def pad_trunc(aud, max_ms):
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms

    if (sig_len > max_len):
      # truncamos
      sig = sig[:,:max_len]

    elif (sig_len < max_len):
      # preparamos el largo del relleno
      pad_begin_len = random.randint(0, max_len - sig_len)
      pad_end_len = max_len - sig_len - pad_begin_len

      # rellenamos con 0s
      pad_begin = torch.zeros((num_rows, pad_begin_len))
      pad_end = torch.zeros((num_rows, pad_end_len))

      sig = torch.cat((pad_begin, sig, pad_end), 1)
      
    return (sig, sr)

  # ----------------------------
  # Mueve la señal a la izquierda o derecha por un porcentaje, 
  # los valores de los extremos "aparecen al otro lado"
  # ----------------------------
  @staticmethod
  def time_shift(aud, shift_limit):
    sig,sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    return (sig.roll(shift_amt), sr)


     # ----------------------------
  # generamos un espectromagrma 
  # ----------------------------
  @staticmethod
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig,sr = aud
    top_db = 80

    # este tiene forma [channel, n_mels, time], donde el canal es mono, stereo etc
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

    # Convert to decibels
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)


  # ----------------------------
  # tapamos un sector del espectrograma en ambas direcciones con tal de prevenir
  # overfitting y ayudar a generalizar, las partes se tapan con el valor promedio.
  # ----------------------------
  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

Ya con esto tenemos lo que necesitamos para procesar nuestro audio, ahora generemos el objeto de base de datos de audio. 

In [None]:

from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

# ----------------------------
# Dataset para el audio
# ----------------------------
class SoundDS(Dataset):
  def __init__(self, df, data_path):
    self.dataframe  = df
    self.data_path = str(data_path)
    self.duration = 4000
    self.sr = 44100
    self.channel = 1
    self.shift_pct = 0.4
            
  # ----------------------------
  # numero de items
  # ----------------------------
  def __len__(self):
    return len(self.dataframe)    
    
  # ----------------------------
  # obtwe el item en la posicion idx
  # ----------------------------
  def __getitem__(self, idx):
    # generamos el nombre del archivo
    audio_file = self.data_path + self.dataframe.loc[idx, 'relative_path']
    # tener el target
    class_id = self.dataframe.loc[idx, 'target']

    aud = AudioUtil.open(audio_file)
    # hacemos que todos los datos tengan el mismo largo
    reaud = AudioUtil.resample(aud, self.sr)
    rechan = AudioUtil.rechannel(reaud, self.channel)

    # procesamos el audio para obtener el espectrograma 
    dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
    shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
    sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
    aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
    
    return aug_sgram, class_id



In [None]:
ya con esto podemos crear el dataset y- el dataloader

In [None]:

from torch.utils.data import random_split

# tener el camino al archivo de datos
data_path = download_path/'Train'


# cargamos los datos
myds = SoundDS(df, data_path)



# separación aletoria de 80:20 between training and idation

num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])
print(len(train_ds), len(val_ds))
# creamos un dataset de validación y de entrenamoento 

# creamos el dataloader

train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
print('Training data loader created with', len(train_dl), 'batches')
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=False)


Ya casi listos, creamemos la arquitectura con pytorch.

In [None]:

import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init



# ----------------------------
# modelo de clasificación de audio
# ----------------------------
class AudioClassifier (nn.Module):
    # ----------------------------
    # contruimos la aquitectura del modelo
    # ----------------------------
    def __init__(self):
        super().__init__()
        conv_layers = []

        # Primer bloque de convolución con Relu y Batch Norm. Usamos inicialización de  Kaiming 
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        # Segundo bloque de convolución
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        # tercer bloque de convolución
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        # cuarto bloque de convolución
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]

        # clasificador lineal 
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=10)

        # envolver los bloques de convolución en una capa secuencial
        self.conv = nn.Sequential(*conv_layers)
 
    # ----------------------------
    # computación del forward pass 
    # ----------------------------
    def forward(self, x):
        # correr bloques de convolución
        x = self.conv(x)

        # Adaptive pool y flatten para meter en la capa lineal
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # capa lineal
        x = self.lin(x)

        # output
        return x  

# creamos el modelo
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = myModel.to(device)
# revisamos que este en  Cuda
next(myModel.parameters()).device

Ya con esto comenzamos con el entrenamiento

In [None]:
# ----------------------------
# Loop de entrenamiento 
# ----------------------------
import numpy as np
def training(model, train_dl, num_epochs):
  # funcion de perdida, optimizador y Scheduler
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

  # repetir para cada epoch
  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    # repetir par acada batch

    for i, data in enumerate(train_dl):
      # obeter los inputs y labels, cargar en GPU
      inputs, labels = data[0].to(device), torch.from_numpy(np.array(data[1])).long().to(device)
      # normalizar los inputs
      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s

      # poner el gradiente a cero
      optimizer.zero_grad()

      # forward + backward + optimize
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      scheduler.step()

      # guardar estadísticas para perdida y precisión
      #  Keep stats for Loss and Accuracy
      running_loss += loss.item()

      # obtener la predicción
      _, prediction = torch.max(outputs,1)
      # contar las predicciones correctas
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]

      #if i % 10 == 0:    # print cada 10 mini-batches
      #    print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))
  
    # print el loss y la precisión por epoch
    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

  print('Finished Training')
  
num_epochs=7   # numero de epochs
training(myModel, train_dl, num_epochs)\



Veamos como nos fue

In [None]:

# Inference
# ----------------------------
def inference (model, val_dl):
  correct_prediction = 0
  total_prediction = 0

  # Disable gradient updates
  with torch.no_grad():

    for data in val_dl:
      # Get the input features and target labels, and put them on the GPU
      inputs, labels = data[0].to(device), data[1].to(device)

      # Normalize the inputs
      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s

      # Get predictions
      outputs = model(inputs)

      # Get the predicted class with the highest score
      _, prediction = torch.max(outputs,1)
      # Count of predictions that matched the target label
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]
    
  acc = correct_prediction/total_prediction
  print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

# Run inference on trained model with the validation set
inference(myModel, val_dl)
