In [None]:
# !pip install transformers

In [19]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

import os
import librosa
import librosa.display
import IPython.display as ipd
from datetime import datetime

from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold

from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import LinearSVC
from sklearn.kernel_approximation import Nystroem
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier, HistGradientBoostingClassifier, VotingClassifier
from sklearn.preprocessing import LabelEncoder

import torch
import torchaudio
from transformers import HubertModel, Wav2Vec2FeatureExtractor, Wav2Vec2Tokenizer, Wav2Vec2Model
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import WeightedRandomSampler
import torchvision
import torchvision.transforms as T
import librosa
import librosa.display
import IPython.display as ipd

from sklearn.utils import shuffle
import random
from IPython.display import clear_output
from torchaudio import transforms
from collections import Counter
from tqdm.notebook import tqdm

In [20]:
# Mount Google Drive at /content/drive (Colab-only); the dataset and checkpoint
# paths used later in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [21]:
# Select the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Bare expression: shows the selected device as the cell output.
device

'cuda'

In [22]:
# Seed PyTorch's, Python's and NumPy's global random generators with the same
# fixed value so that runs started from a fresh kernel are repeatable.
for _seed_fn in (torch.manual_seed, random.seed, np.random.seed):
    _seed_fn(1)
del _seed_fn

In [23]:
def process_df(range, RAVDESS, dir_list):
  """Build a table of labelled audio files for a subset of sub-directories.

  Args:
    range: iterable of integer indices into ``dir_list`` selecting which
      sub-directories to scan. (The name shadows the builtin; it is kept so
      existing callers keep working.)
    RAVDESS: dataset root used as a plain string prefix. It is concatenated
      directly with the directory name, so it must end with a path separator.
    dir_list: list of sub-directory names located under ``RAVDESS``.

  Returns:
    pandas.DataFrame with one row per recognised file and the columns
    ``label_class`` (emotion code from the file name minus 1), ``label``
    (emotion name) and ``path`` (``RAVDESS + directory + '/' + file``).
  """
  emotion_names = {1: 'neutral', 2: 'calm', 3: 'happy', 4: 'sad',
                   5: 'angry', 6: 'fear', 7: 'disgust', 8: 'surprise'}

  label_classes, labels, paths = [], [], []
  for i in range:
    directory = dir_list[i]
    for f in os.listdir(RAVDESS + directory):
      # The part of the name before the first '.' is split on '-';
      # the third field is the integer emotion code.
      part = f.split('.')[0].split('-')
      try:
        code = int(part[2])
      except (IndexError, ValueError):
        # Entries that do not follow the naming scheme (hidden/system files,
        # notes, ...) carry no label, so skip them instead of crashing.
        continue
      label_classes.append(code - 1)
      # Codes outside the known table are kept as-is rather than dropped.
      labels.append(emotion_names.get(code, code))
      paths.append(RAVDESS + directory + '/' + f)

  return pd.DataFrame({'label_class': label_classes,
                       'label': labels,
                       'path': paths})

In [35]:
def fetch_dataset(path, n_train=20, n_val=2, n_test=2):
  """Split the dataset's sub-directories into train / val / test tables.

  The sub-directories of ``path`` are sorted by name and assigned, in that
  order, to the three splits, so every sub-directory ends up in exactly one
  split.

  Args:
    path: dataset root directory.
    n_train: number of leading sub-directories used for training.
    n_val: number of sub-directories that follow and are used for validation.
    n_test: number of sub-directories after those that are used for testing.

  Returns:
    Tuple ``(df_train, df_val, df_test)`` of DataFrames produced by
    ``process_df``.

  Raises:
    IndexError: propagated from ``process_df`` when ``path`` holds fewer
      sub-directories than ``n_train + n_val + n_test``.
  """
  RAVDESS = path+'/'
  # Keep only real sub-directories: a stray file in the root would shift the
  # index-based split and make the os.listdir call in process_df fail.
  dir_list = sorted(d for d in os.listdir(RAVDESS) if os.path.isdir(RAVDESS + d))

  val_start = n_train
  test_start = n_train + n_val
  test_stop = test_start + n_test

  # make train dataset
  df_train = process_df(range(val_start), RAVDESS, dir_list)

  # make val dataset
  df_val = process_df(range(val_start, test_start), RAVDESS, dir_list)

  # make test dataset
  df_test = process_df(range(test_start, test_stop), RAVDESS, dir_list)

  return df_train, df_val, df_test


In [36]:
# Build the train / val / test file tables from the dataset copy on the mounted Drive.
df_train, df_val, df_test = fetch_dataset("/content/drive/MyDrive/wav2vec/RAVDESS/")

In [37]:
class AudioDataset(Dataset):
    """Dataset yielding fixed-length waveforms together with label and path.

    Each item is a tuple ``(audio, label, file_path)`` where ``audio`` is a
    NumPy array of exactly ``max_length`` samples loaded at ``new_sr`` Hz.
    """

    def __init__(self, df, data_col, label_col, max_length=4*16000, new_sr=16000, use_aug=False):
        """
        Args:
            df: DataFrame holding one row per audio file.
            data_col: name of the column with the file paths.
            label_col: name of the column with the labels.
            max_length: number of samples every returned waveform is padded
                or trimmed to (default: 4 * 16000).
            new_sr: sampling rate, in Hz, the audio is loaded at.
            use_aug: when True, randomly apply noise and time-stretch
                augmentation in ``__getitem__``.
        """
        self.file_path_list = df[data_col].tolist()
        self.label_list = df[label_col].tolist()
        self.max_length = max_length
        self.new_sr = new_sr
        self.use_aug = use_aug

    def __len__(self):
        """Return the number of audio files in the dataset."""
        return len(self.file_path_list)

    def __getitem__(self, idx):
        """Load, optionally augment, and length-normalise the ``idx``-th file.

        Returns:
            Tuple ``(audio, label, file_path)``.
        """
        file_path = self.file_path_list[idx]
        # Ask librosa for the target rate directly: loading at its default
        # rate and resampling afterwards would resample the signal twice.
        audio, _ = librosa.load(file_path, sr=self.new_sr)
        label = self.label_list[idx]

        # data augmentation: each transform is applied independently whenever
        # its random draw exceeds 0.8
        if self.use_aug:
            if random.random() > 0.8:
                audio = self.noise(audio)
            if random.random() > 0.8:
                audio = self.stretch(audio)

        desired_length = self.max_length

        # Zero-pad at the end or truncate so that every waveform has exactly
        # `max_length` samples and items can be stacked into a batch.
        if len(audio) < desired_length:
            padding = desired_length - len(audio)
            audio = np.pad(audio, (0, padding), 'constant')
        elif len(audio) > desired_length:
            audio = audio[:desired_length]

        return audio, label, file_path

    def noise(self, data):
        """Add Gaussian noise scaled by a random fraction (up to 1%) of the peak value.

        The result keeps the dtype of ``data``; without the cast NumPy would
        promote it to float64, so augmented and non-augmented items would no
        longer share a dtype.
        """
        noise_amp = 0.01*np.random.uniform()*np.amax(data)
        noisy = data + noise_amp*np.random.normal(size=data.shape[0])
        return noisy.astype(data.dtype, copy=False)

    def stretch(self, data):
        """Time-stretch ``data`` by a random rate drawn from [0.9, 1.1)."""
        rate = 1+0.1*np.random.uniform(-1, 1)
        return librosa.effects.time_stretch(data, rate=rate)

    def targets(self):
        """Return the list of labels, in dataset order."""
        return self.label_list

In [38]:
def get_dataloaders(df_train, df_val, df_test, BATCH_SIZE=8, use_aug = False):
  """Create the train / val / test DataLoaders.

  Args:
    df_train, df_val, df_test: DataFrames with a ``path`` column (audio file
      path) and a ``label_class`` column (label).
    BATCH_SIZE: batch size used by all three loaders.
    use_aug: when True the training set applies augmentation and is drawn
      through a class-balancing ``WeightedRandomSampler`` (with replacement);
      when False the training set is simply shuffled.

  Returns:
    Dict with the keys ``'train'``, ``'val'`` and ``'test'`` mapping to the
    corresponding DataLoader.
  """
  if use_aug:
    train_dataset = AudioDataset(df_train, 'path', 'label_class', use_aug=True)
    targets = train_dataset.targets()
    class_count = Counter(targets)
    # Inverse-frequency weight per class.
    class_weights = {label: 1/count for label, count in class_count.items()}
    # Take the weights straight from the label list. Iterating the dataset
    # itself would decode (and randomly augment) every audio file only to
    # read a label that is already in memory.
    sample_weights = [class_weights[label] for label in targets]

    N = int(3*max(class_count.values()) * len(class_count)/2)  # fit to 1.5*max
    train_sampler = WeightedRandomSampler(sample_weights, num_samples=N, replacement=True)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=train_sampler, num_workers=2)
  else:
    train_dataset = AudioDataset(df_train, 'path', 'label_class')
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

  # Validation and test data are read in order, without augmentation.
  val_dataset = AudioDataset(df_val, 'path', 'label_class')
  val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

  test_dataset = AudioDataset(df_test, 'path', 'label_class')
  test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

  dataloaders = {'train': train_dataloader, 'val': val_dataloader, 'test': test_dataloader}

  return dataloaders

In [39]:
class AudioClassifier(nn.Module):
    """Pretrained wav2vec2 encoder followed by a small conv + linear head.

    ``forward`` returns class probabilities (softmax output), not logits.
    """

    def __init__(self, path, num_frames=199, num_classes=8):
        """
        Args:
            path: name or path of the pretrained checkpoint passed to
                ``from_pretrained`` for both the feature extractor and the
                encoder.
            num_frames: size of dimension 1 of the encoder's
                ``last_hidden_state``. ``conv1`` uses that dimension as its
                input channels, so this must match the fixed input length
                the model is fed with (default 199, the value the notebook
                was written for).
            num_classes: number of output classes (default 8).
        """
        super().__init__()

        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(path)
        self.wav2vec2 = Wav2Vec2Model.from_pretrained(path)
        self.wav2vec2.feature_extractor._freeze_parameters()
        # Read the embedding width from the checkpoint's config instead of
        # hard-coding it, so checkpoints with another hidden size also work.
        hidden_size = self.wav2vec2.config.hidden_size
        self.conv1 = nn.Conv1d(num_frames, 256, 1)
        self.dropout1 = torch.nn.Dropout(0.5)
        self.conv2 = nn.Conv1d(256, 1, 1)
        self.fc1 = torch.nn.Linear(hidden_size, 256)
        self.dropout2 = torch.nn.Dropout(0.5)
        self.fc2 = torch.nn.Linear(256, num_classes)

    def forward(self, input, spec_aug=False, mixup_lambda=None):
        """Classify a batch of waveforms.

        Args:
            input: batch of raw waveforms, sampled at 16 kHz (the rate passed
                to the feature extractor).
            spec_aug: unused; kept for interface compatibility.
            mixup_lambda: unused; kept for interface compatibility.

        Returns:
            Tensor of class probabilities, softmax-normalised over dim 1.
        """
        # Follow the module's own device rather than a notebook-level global,
        # so the model still works after being moved with `.to(...)`.
        model_device = next(self.parameters()).device
        input = self.feature_extractor(input, return_tensors="pt", sampling_rate=16000).to(model_device)
        # Drop the leading singleton dimension of the extractor output.
        input = input.input_values.squeeze(dim=0)
        wav2feature = self.wav2vec2(input).last_hidden_state
        # 1x1 convolutions mix dimension 1 of the encoder output down to a
        # single channel.
        x = self.dropout1(F.relu(self.conv1(wav2feature)))
        x = self.conv2(x)
        # conv2 has one output channel, so this mean just removes dimension 1.
        x = torch.mean(x, dim=1)
        x = self.dropout2(F.relu(self.fc1(x)))
        x = self.fc2(x)
        x = torch.nn.functional.softmax(x, dim=1)
        return x

In [40]:
# facebook/wav2vec2-large-xlsr-53

In [41]:
def save_checkpoint(checkpoint_path, model, optimizer):
    """Serialise the model and optimizer state dicts to ``checkpoint_path``.

    The file holds a dict with the keys ``'state_dict'`` (model) and
    ``'optimizer'``.
    """
    torch.save(
        {'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict()},
        checkpoint_path,
    )
    print('model saved to %s' % checkpoint_path)

def load_checkpoint(checkpoint_path, model, optimizer, map_location=None):
    """Restore model and optimizer state from ``checkpoint_path``.

    Args:
        checkpoint_path: file holding a dict with the keys ``'state_dict'``
            and ``'optimizer'``.
        model: module whose parameters are overwritten in place.
        optimizer: optimizer whose state is overwritten in place.
        map_location: forwarded to ``torch.load``; pass e.g. ``'cpu'`` to
            open a checkpoint saved on a GPU from a CPU-only runtime. The
            default ``None`` keeps torch's default device placement.
    """
    state = torch.load(checkpoint_path, map_location=map_location)
    model.load_state_dict(state['state_dict'])
    optimizer.load_state_dict(state['optimizer'])
    print('model loaded from %s' % checkpoint_path)

In [42]:
class Learner():
  """Training / evaluation loop around a probability-returning classifier.

  The model is expected to return class probabilities; the logarithm is taken
  here before the result is handed to ``loss_fn``. The data loaders must yield
  ``(X, y, file_path)`` batches.
  """

  # Lower bound applied to probabilities before the log so that a probability
  # of exactly zero cannot produce an infinite loss.
  _LOG_EPS = 1e-12

  def __init__(self, model, opt, dataloaders, loss_fn, device, checkpoint_path):
    """
    Args:
      model: module mapping a batch ``X`` to class probabilities.
      opt: optimizer updating ``model``'s parameters.
      dataloaders: dict with the keys ``'train'``, ``'val'`` and ``'test'``.
      loss_fn: loss applied to ``(log_probabilities, targets)``.
      device: device the batches are moved to.
      checkpoint_path: file used to save / restore the best model.
    """
    self.model = model
    self.opt = opt
    self.data_loader = dataloaders
    self.loss_fn = loss_fn
    self.device = device
    self.checkpoint_path = checkpoint_path

  def save_checkpoint(self):
    """Write the model and optimizer state dicts to ``checkpoint_path``."""
    state = {
        'state_dict': self.model.state_dict(),
        'optimizer' : self.opt.state_dict()}
    torch.save(state, self.checkpoint_path)
    print('model saved to %s' % self.checkpoint_path)

  def load_checkpoint(self):
    """Restore model and optimizer state from ``checkpoint_path``."""
    # map_location lets a checkpoint written on a GPU be restored on whatever
    # device this learner runs on.
    state = torch.load(self.checkpoint_path, map_location=self.device)
    self.model.load_state_dict(state['state_dict'])
    self.opt.load_state_dict(state['optimizer'])
    print('model loaded from %s' % self.checkpoint_path)

  def accuracy_fn(self, y_true, y_pred):
    """Return the fraction of positions where ``y_true`` equals ``y_pred``."""
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred))
    return acc

  def _log_prob(self, prob):
    """Return ``log(prob)`` with probabilities clamped away from zero."""
    return torch.log(prob.clamp_min(self._LOG_EPS))

  def train_step(self, train_losses=None, train_accuracies=None):
    """Run one training epoch.

    Args:
      train_losses: list the epoch's mean loss (CPU tensor) is appended to.
      train_accuracies: list the epoch's mean batch accuracy is appended to.
      A fresh list is used for either argument when it is omitted.
    """
    # None instead of `[]` defaults: a mutable default would be shared by
    # every call that omits the argument.
    if train_losses is None:
      train_losses = []
    if train_accuracies is None:
      train_accuracies = []

    train_loss, train_acc = 0, 0

    self.model.train()

    for batch, (X, y, file_path) in enumerate(self.data_loader['train']):
      X, y = X.to(self.device), y.to(self.device)

      y_prob = self.model(X)
      y_pred = torch.argmax(y_prob, dim=1)

      loss = self.loss_fn(self._log_prob(y_prob), y)
      # Accumulate a detached copy; summing the live loss would keep every
      # batch's autograd graph in memory until the end of the epoch.
      train_loss += loss.detach()
      train_acc += self.accuracy_fn(y_true=y, y_pred=y_pred)

      self.opt.zero_grad()

      loss.backward()
      # Step the optimizer this learner was given, not a notebook global.
      self.opt.step()

    num_batches = len(self.data_loader['train'])
    train_loss /= num_batches
    train_acc /= num_batches

    train_losses.append(train_loss.detach().cpu())
    train_accuracies.append(train_acc)

  def val_step(self, val_losses=None, val_accuracies=None, key='val'):
    """Evaluate the model on ``self.data_loader[key]``.

    Args:
      val_losses: list the mean loss is appended to (``key == 'val'`` only).
      val_accuracies: list the mean accuracy is appended to
        (``key == 'val'`` only); it is also the history used to decide
        whether this evaluation is the best so far.
      key: which data loader to evaluate, ``'val'`` or ``'test'``.

    Returns:
      For ``key == 'test'`` a dict with ``"model_loss"`` (CPU tensor) and
      ``"model_acc"`` (float); otherwise None.
    """
    if val_losses is None:
      val_losses = []
    if val_accuracies is None:
      val_accuracies = []

    val_loss, val_acc = 0, 0

    self.model.eval()

    with torch.inference_mode():
      for batch, (X, y, file_path) in enumerate(self.data_loader[key]):

        X, y = X.to(self.device), y.to(self.device)

        val_prob = self.model(X)
        val_pred = torch.argmax(val_prob, dim=1)

        loss = self.loss_fn(self._log_prob(val_prob), y)
        val_loss += loss
        val_acc += self.accuracy_fn(y_true=y, y_pred=val_pred)

      num_batches = len(self.data_loader[key])
      val_loss /= num_batches
      val_acc /= num_batches

      if key == 'val':
        # Save on the first evaluation as well as on every improvement;
        # otherwise a run whose best epoch is the first one would leave no
        # checkpoint (or a stale one from an earlier run) for `test()`.
        if not val_accuracies or val_acc > max(val_accuracies):
          self.save_checkpoint()

        val_losses.append(val_loss.detach().cpu())
        val_accuracies.append(val_acc)

      if key == 'test':
        return {"model_loss": val_loss.detach().cpu(),
                "model_acc": val_acc}

  def test(self):
    """Evaluate on the test loader, restoring the saved checkpoint if one exists."""
    if os.path.isfile(self.checkpoint_path):
      self.load_checkpoint()
    return self.val_step(key = 'test')

  def fit(self, epochs = 15):
    """Train for ``epochs`` epochs, redrawing the loss/accuracy curves after each one."""

    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(epochs):
      self.train_step(train_losses = train_losses, train_accuracies =train_accuracies)

      self.val_step(val_losses = val_losses, val_accuracies = val_accuracies, key = 'val')

      # Replace the previous epoch's figure instead of stacking outputs.
      clear_output(True)

      fig, axes = plt.subplots(1, 2, figsize=(12, 7))

      axes[0].set_title('loss')
      axes[0].plot(train_losses, label='train')
      axes[0].plot(val_losses, label='val')
      axes[0].legend(loc='upper right')
      axes[0].grid()

      axes[1].set_title('acc')
      axes[1].plot(train_accuracies, label='train')
      axes[1].plot(val_accuracies, label='val')
      axes[1].legend(loc='upper right')
      axes[1].grid()

      plt.show()
      # Release the figure so that one is not kept alive per epoch.
      plt.close(fig)



In [None]:
# Build the train / val / test loaders without augmentation or weighted sampling.
dataloaders = get_dataloaders(df_train, df_val, df_test, use_aug = False)

# Instantiate the classifier from the "facebook/wav2vec2-base" checkpoint and
# move it to the selected device.
model = AudioClassifier("facebook/wav2vec2-base").to(device)
# NOTE(review): this expression is not the last statement of the cell, so its
# value is discarded and nothing is displayed.
next(model.parameters()).device

# NLLLoss is applied by Learner to torch.log of the model's softmax output.
loss_fn = nn.NLLLoss() # Multi-category loss

# Plain SGD over all model parameters; the best checkpoint is written to Drive.
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.005)
checkpoint_path = '/content/drive/MyDrive/wav2vec/model1.pth'

In [None]:
# Wire model, optimizer, data and loss into a Learner and train for 10 epochs;
# fit() redraws the loss / accuracy curves after every epoch.
learner =  Learner(model, optimizer, dataloaders, loss_fn, device, checkpoint_path = checkpoint_path)
learner.fit(epochs = 10)

In [None]:
# Evaluate on the test split; test() first restores the checkpoint file if it exists.
learner.test()