---
# **Emotion Recognition - wav2vec2-XLSR-53 large - Spanish**
---

***References***

[Wav2Vec2 - Spanish](https://huggingface.co/jonatasgrosman/wav2vec2-large-xlsr-53-spanish)

[Wav2Vec2 - Transfer learning](https://github.com/amansyayf/wav2vec2_emotion_recognition/blob/main/training_module.ipynb)


## **Data Preparation**
---

In [None]:
from google.colab import drive
import zipfile

# Mount Google Drive so the archives referenced below are reachable under /content/drive.
drive.mount('/content/drive')

# Paths to the zip archives
mesd_zip =  '/content/drive/MyDrive/MESD.zip'
ems_zip  =  '/content/drive/MyDrive/EmoMatchSpanishDB/EmoMatchSpanishDB.zip'
smc_zip =   '/content/drive/MyDrive/Spanish MeaCorpus/sp.zip'

# Suppress every Python warning from this point on in the session.
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Unpack the MESD archive into /content/mesd. The context manager closes the
# archive handle even if extraction raises part-way through.
with zipfile.ZipFile(mesd_zip, 'r') as zip_ref:
    zip_ref.extractall('/content/mesd')
!ls

In [None]:
# Stack the three corpus tables into one frame with a fresh 0..n-1 index.
# NOTE(review): pd, mesd_df, smc_df and ems_df are not defined in the visible cells — confirm where they come from.
df = pd.concat([mesd_df, smc_df, ems_df], ignore_index=True)
df.head()

In [None]:
# Rebinds df from mesd_df and smc_df only: when this cell runs, any earlier df
# that also contained ems_df is replaced.
# NOTE(review): confirm that leaving ems_df out is intentional.
df = pd.concat([mesd_df, smc_df], ignore_index=True)
df.head()

In [None]:
# Show the distinct values currently present in the label column.
df.label.unique()

In [None]:
# Canonical Spanish label -> alternative spellings that should collapse onto it.
_label_aliases = {
    'Ira': ('Anger', 'anger'),
    'Asco': ('Disgust', 'disgust'),
    'Miedo': ('Fear', 'fear'),
    'Alegria': ('Happiness', 'joy', 'happiness'),
    'Neutro': ('Neutral', 'neutral'),
    'Tristeza': ('Sadness', 'sadness'),
    'Sorpresa': (),
}

# Flatten to a plain lookup table; each canonical label also maps to itself.
label_mapping = {}
for _canonical, _aliases in _label_aliases.items():
    label_mapping[_canonical] = _canonical
    for _alias in _aliases:
        label_mapping[_alias] = _canonical

# Series.map leaves NaN for any label that is not a key of the table.
df['label'] = df['label'].map(label_mapping)
df.head()

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows, then split that hold-out in half:
# 80% train / 10% validation / 10% test. random_state is fixed on both calls.
df_train, df_val = train_test_split(df, test_size=0.2, random_state=42)
df_val, df_test = train_test_split(df_val, test_size=0.5, random_state=42)

In [None]:
# Count of distinct values in the 'label_class' column.
# NOTE(review): 'label_class' is not created in the visible cells — presumably an
# integer encoding of 'label'; confirm.
num_emotions = len(df['label_class'].unique())

## **Set-up training**
---

In [None]:
import torch
# Use CUDA when torch reports it as available, otherwise the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
device

In [None]:
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader
import random

class AudioDataset(Dataset):
    """Dataset yielding fixed-length waveforms read from audio files.

    Args:
        df: Table-like object; ``df[data_col]`` and ``df[label_col]`` must
            support ``.tolist()``.
        data_col: Name of the column holding audio file paths.
        label_col: Name of the column holding the labels.
        max_length: Number of samples every returned waveform is padded or
            trimmed to (default: 10 seconds at 16 kHz).
        new_sr: Sampling rate, in Hz, the audio is loaded at.

    Each item is a tuple ``(audio, label, file_path)``.
    """

    def __init__(self, df, data_col, label_col, max_length=10*16000, new_sr=16000):
        self.file_path_list = df[data_col].tolist()
        self.label_list = df[label_col].tolist()
        self.max_length = max_length
        self.new_sr = new_sr

    def __len__(self):
        return len(self.file_path_list)

    def __getitem__(self, idx):
        path = self.file_path_list[idx]
        # Ask librosa for the target rate directly. Without ``sr`` it first
        # resamples to its own default rate, which forced a second resample
        # here and degraded the signal for no benefit.
        audio, _ = librosa.load(path, sr=self.new_sr)
        audio = self._fit_length(audio, self.max_length)
        return audio, self.label_list[idx], path

    @staticmethod
    def _fit_length(audio, length):
        """Zero-pad at the end, or truncate, so ``audio`` has ``length`` samples."""
        missing = length - len(audio)
        if missing > 0:
            return np.pad(audio, (0, missing), 'constant')
        return audio[:length]

In [None]:
from collections import Counter
from tqdm.notebook import tqdm
from torch.utils.data.sampler import WeightedRandomSampler

def get_dataloaders(df_train, df_val, df_test, BATCH_SIZE=8):
    """Build one DataLoader per split.

    Every split is wrapped in an AudioDataset that reads file paths from the
    'path' column and labels from the 'label_class' column. Only the training
    loader shuffles.

    Returns:
        dict with keys 'train', 'val' and 'test' mapping to DataLoaders.
    """
    frames = (('train', df_train), ('val', df_val), ('test', df_test))

    loaders = {}
    for split_name, frame in frames:
        dataset = AudioDataset(frame, 'path', 'label_class')
        loaders[split_name] = DataLoader(
            dataset,
            batch_size=BATCH_SIZE,
            shuffle=(split_name == 'train'),
        )
    return loaders

In [None]:
import torch
import torchaudio
from transformers import HubertModel, Wav2Vec2FeatureExtractor, Wav2Vec2Tokenizer, Wav2Vec2Model
import torch.nn as nn
import torch.nn.functional as F

class AudioClassifier(nn.Module):
    """wav2vec2 encoder followed by a small convolutional/linear classification head.

    The output size of the last layer is read from the module-level
    ``num_emotions``; ``forward`` also reads the module-level ``device``.
    """
    def __init__(self,path):
        """Load the feature extractor and wav2vec2 weights from ``path`` and build the head.

        ``path`` is passed unchanged to both ``from_pretrained`` calls.
        """
        super().__init__()

        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(path)
        self.wav2vec2 = Wav2Vec2Model.from_pretrained(path)
        # Freeze the parameters of the wav2vec2 feature-extractor submodule.
        self.wav2vec2.feature_extractor._freeze_parameters()

        # Adjust the convolutional layers as needed.
        # NOTE(review): conv1 takes 499 input channels, so dim 1 of the encoder output
        # must be exactly 499 — presumably the frame count for the fixed-length clips; confirm.
        self.conv1 = nn.Conv1d(499, 256, 1)
        self.dropout1 = torch.nn.Dropout(0.5)
        self.conv2 = nn.Conv1d(256, 1, 1)
        self.fc1 = torch.nn.Linear(1024,256)  # Input size 1024 must equal the last dim of the encoder output — presumably its hidden size; confirm
        self.dropout2 = torch.nn.Dropout(0.5)
        self.fc2 = torch.nn.Linear(256, num_emotions)


    def forward(self, input, spec_aug=False, mixup_lambda=None):
        """Return per-class probabilities (softmax over dim 1) for a batch of waveforms.

        ``spec_aug`` and ``mixup_lambda`` are accepted but not used in this body.
        Shape comments below are inferred from the layer sizes, not measured.
        """
        input = self.feature_extractor(input, return_tensors="pt", sampling_rate=16000).to(device)
        input = input.input_values.squeeze(dim=0) # drops a leading size-1 dim; presumably leaves (N, num_samples) — confirm
        wav2feature = self.wav2vec2(input).last_hidden_state  # must be (N, 499, 1024) for conv1/fc1 to accept it

        # Earlier alternatives kept for reference: mean-pool over dim 1, or
        # permute to (N, features, frames) before the convolutions.

        x = self.dropout1(F.relu(self.conv1(wav2feature))) # (N, 256, 1024): conv1 maps 499 -> 256 channels
        x = self.conv2(x) # (N, 1, 1024): conv2 maps 256 -> 1 channel
        x = torch.mean(x, dim=1) # (N, 1024): averaging over the single remaining channel
        x = self.dropout2(F.relu(self.fc1(x))) # (N, 256)
        x = self.fc2(x) # (N, num_emotions)
        x = torch.nn.functional.softmax(x, dim=1)
        return x

In [None]:
def save_checkpoint(checkpoint_path, model, optimizer):
    """Write the model and optimizer state dicts to ``checkpoint_path``.

    The file holds a dict with the keys 'state_dict' (model) and
    'optimizer'. A confirmation line is printed afterwards.
    """
    torch.save(
        {
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        },
        checkpoint_path,
    )
    print('model saved to %s' % checkpoint_path)

def load_checkpoint(checkpoint_path, model, optimizer, map_location=None):
    """Restore model and optimizer state from a checkpoint file.

    Args:
        checkpoint_path: File holding a dict with the keys 'state_dict'
            and 'optimizer'.
        model: Module whose ``load_state_dict`` receives 'state_dict'.
        optimizer: Optimizer whose ``load_state_dict`` receives 'optimizer'.
        map_location: Forwarded to ``torch.load``. Pass e.g. ``'cpu'`` to load
            a checkpoint that was saved from a GPU on a machine without one.
            The default ``None`` keeps the previous behaviour.
    """
    state = torch.load(checkpoint_path, map_location=map_location)
    model.load_state_dict(state['state_dict'])
    optimizer.load_state_dict(state['optimizer'])
    print('model loaded from %s' % checkpoint_path)


In [None]:
from IPython.display import clear_output
import numpy as np
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
import matplotlib.pyplot as plt
from torch.cuda.amp import GradScaler, autocast

def compute_metrics(y_true, y_pred, num_classes):
    """Compute per-class evaluation metrics for integer class labels.

    Args:
        y_true: Ground-truth class indices.
        y_pred: Predicted class indices.
        num_classes: Number of classes; labels ``0 .. num_classes - 1`` are
            always reported, including classes that never occur.

    Returns:
        dict with the keys 'confusion_matrix' (rows are true classes),
        'precision', 'recall', 'fscore' and 'class_accuracy'. The last four
        are arrays with one entry per class.
    """
    labels = list(range(num_classes))

    cm = confusion_matrix(y_true, y_pred, labels=labels)

    # zero_division=0 makes the score for undefined cases an explicit 0.
    precision, recall, fscore, _ = precision_recall_fscore_support(
        y_true, y_pred, labels=labels, average=None, zero_division=0
    )

    # Correct predictions divided by the number of true samples of each class.
    # A class with no true samples has an empty row; report 0 for it instead
    # of dividing by zero and propagating NaN.
    support = cm.sum(axis=1)
    class_accuracy = np.divide(
        np.diag(cm),
        support,
        out=np.zeros(len(labels), dtype=float),
        where=support > 0,
    )

    return {
        "confusion_matrix": cm,
        "precision": precision,
        "recall": recall,
        "fscore": fscore,
        "class_accuracy": class_accuracy,
    }

class Learner():
    """Training and evaluation driver for a classifier that outputs probabilities.

    Args:
        model: Module returning per-class probabilities for a batch.
        opt: Optimizer over the model parameters.
        dataloaders: dict with 'train', 'val' and 'test' loaders; each batch
            is a tuple ``(X, y, file_path)``.
        loss_fn: Loss applied to the log of the model's probabilities.
        device: Device the batches are moved to and checkpoints are mapped to.
        checkpoint_path: File used by ``save_checkpoint`` / ``load_checkpoint``.
    """

    def __init__(self, model, opt, dataloaders, loss_fn, device, checkpoint_path):
        self.model = model
        self.opt = opt
        self.data_loader = dataloaders
        self.loss_fn = loss_fn
        self.device = device
        self.checkpoint_path = checkpoint_path
        self.scaler = GradScaler()

    def save_checkpoint(self):
        """Write the model and optimizer state dicts to ``self.checkpoint_path``."""
        state = {
            'state_dict': self.model.state_dict(),
            'optimizer': self.opt.state_dict()}
        torch.save(state, self.checkpoint_path)
        print('model saved to %s' % self.checkpoint_path)

    def load_checkpoint(self):
        """Restore model and optimizer state from ``self.checkpoint_path``."""
        # map_location lets a checkpoint written on a GPU load on a CPU-only host.
        state = torch.load(self.checkpoint_path, map_location=self.device)
        self.model.load_state_dict(state['state_dict'])
        self.opt.load_state_dict(state['optimizer'])
        print('model loaded from %s' % self.checkpoint_path)

    def accuracy_fn(self, y_true, y_pred):
        """Return the fraction of positions where ``y_true`` equals ``y_pred``."""
        correct = torch.eq(y_true, y_pred).sum().item()
        return correct / len(y_pred)

    @staticmethod
    def _safe_log(probs):
        """Log of probabilities with exact zeros clamped so the result stays finite."""
        return torch.log(probs.clamp_min(torch.finfo(probs.dtype).tiny))

    def train_step(self, train_losses=None, train_accuracies=None):
        """Run one training epoch with gradient accumulation.

        The mean batch loss and mean batch accuracy of the epoch are appended
        to ``train_losses`` and ``train_accuracies`` when those lists are given.
        """
        # Fresh lists per call: a mutable default would be shared across calls.
        train_losses = [] if train_losses is None else train_losses
        train_accuracies = [] if train_accuracies is None else train_accuracies

        train_loss, train_acc = 0, 0
        accumulation_steps = 4  # number of batches whose gradients are accumulated
        loader = self.data_loader['train']
        num_batches = len(loader)

        self.model.train()
        # Start every epoch from clean gradients.
        self.opt.zero_grad()

        for batch, (X, y, file_path) in enumerate(tqdm(loader, desc="Training", leave=False)):
            X, y = X.to(self.device), y.to(self.device)
            with autocast():
                y_prob = self.model(X)
                loss = self.loss_fn(self._safe_log(y_prob), y)
                loss = loss / accumulation_steps  # scale the loss for accumulation
            self.scaler.scale(loss).backward()

            # Also step on the final batch so a trailing partial group is not
            # dropped or carried over into the next epoch.
            is_last_batch = (batch + 1) == num_batches
            if (batch + 1) % accumulation_steps == 0 or is_last_batch:
                self.scaler.step(self.opt)
                self.scaler.update()
                self.opt.zero_grad()

            y_pred = torch.argmax(y_prob, dim=1)
            train_loss += loss.item() * accumulation_steps  # undo the scaling for reporting
            train_acc += self.accuracy_fn(y_true=y, y_pred=y_pred)

        train_loss /= num_batches
        train_acc /= num_batches

        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

    def val_step(self, val_losses=None, val_accuracies=None, key='val'):
        """Evaluate the model on ``self.data_loader[key]`` without gradients.

        For ``key == 'val'`` the mean loss and accuracy are appended to the
        given lists, and a checkpoint is saved when the accuracy beats every
        earlier entry of ``val_accuracies`` (or when there is none yet).

        Returns:
            For ``key == 'test'``, a dict with 'model_loss', 'model_acc' and
            'metrics'; otherwise ``None``.
        """
        val_losses = [] if val_losses is None else val_losses
        val_accuracies = [] if val_accuracies is None else val_accuracies

        y_trues, y_preds = [], []
        val_loss, val_acc = 0, 0
        loader = self.data_loader[key]

        self.model.eval()
        with torch.no_grad():
            for batch, (X, y, file_path) in enumerate(tqdm(loader, desc="Validating", leave=False)):
                X, y = X.to(self.device), y.to(self.device)
                with autocast():
                    val_prob = self.model(X)
                    val_pred = torch.argmax(val_prob, dim=1)
                    loss = self.loss_fn(self._safe_log(val_prob), y)
                val_loss += loss.item()
                val_acc += self.accuracy_fn(y_true=y, y_pred=val_pred)
                y_trues.extend(y.cpu().numpy())
                y_preds.extend(val_pred.cpu().numpy())

        val_loss /= len(loader)
        val_acc /= len(loader)

        if key == 'val':
            # The first evaluation counts as the best so far; otherwise a run
            # whose first epoch is its best would never write a checkpoint.
            if not val_accuracies or val_acc > max(val_accuracies):
                self.save_checkpoint()

            val_losses.append(val_loss)
            val_accuracies.append(val_acc)

        if key == 'test':
            metrics = compute_metrics(np.array(y_trues), np.array(y_preds), num_classes=num_emotions)
            return {"model_loss": val_loss, "model_acc": val_acc, "metrics": metrics}

    def test(self):
        """Evaluate on the 'test' loader, loading the saved checkpoint first if it exists."""
        import os  # local import: the module is not imported in the visible cells

        if os.path.isfile(self.checkpoint_path):
            self.load_checkpoint()
        return self.val_step(key='test')

    def fit(self, epochs=15):
        """Train for ``epochs`` epochs, validating and re-plotting the curves after each one."""
        train_losses, val_losses = [], []
        train_accuracies, val_accuracies = [], []

        for epoch in range(epochs):
            self.train_step(train_losses=train_losses, train_accuracies=train_accuracies)
            self.val_step(val_losses=val_losses, val_accuracies=val_accuracies, key='val')

            # Replace the previous epoch's output instead of stacking plots.
            clear_output(True)

            fig, axes = plt.subplots(1, 2, figsize=(15, 7))
            axes[0].set_title('Loss')
            axes[0].plot(train_losses, label='train')
            axes[0].plot(val_losses, label='val')
            axes[0].legend(loc='upper right')
            axes[0].grid()

            axes[1].set_title('Accuracy')
            axes[1].plot(train_accuracies, label='train')
            axes[1].plot(val_accuracies, label='val')
            axes[1].legend(loc='upper right')
            axes[1].grid()

            plt.show()

            print(f"Epoch {epoch} || train_loss: {train_losses[-1]}, val_loss: {val_losses[-1]}, train_accuracy: {train_accuracies[-1]}, val_accuracy: {val_accuracies[-1]}")

## **Training**
---

In [None]:
BATCH_SIZE=8

# One DataLoader per split, keyed 'train' / 'val' / 'test'.
dataloaders = get_dataloaders(df_train, df_val, df_test, BATCH_SIZE=BATCH_SIZE)

# Build the classifier from the named pretrained checkpoint and move it to the selected device.
model = AudioClassifier("facebook/wav2vec2-large-xlsr-53-spanish").to(device)
# Debug aid: next(model.parameters()).device shows where the model lives.

# Multi-class loss. The Learner passes torch.log of the model's softmax output to it.
loss_fn = nn.NLLLoss()

optimizer = torch.optim.Adam(params=model.parameters(), lr=0.00005, betas=(0.5, 0.9))

# File the Learner writes its checkpoint to and later reloads in test().
checkpoint_path = '/content/drive/MyDrive/wav2vec/ser.pth'

In [None]:
# Wire model, optimizer, loaders and loss into the Learner and train for 15 epochs;
# fit() validates and re-plots the loss/accuracy curves after every epoch.
learner =  Learner(model, optimizer, dataloaders, loss_fn, device, checkpoint_path = checkpoint_path)
learner.fit(epochs = 15)

In [None]:
# Evaluate on the 'test' loader; Learner.test() reloads the checkpoint file first when it exists.
learner.test()