In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!mkdir weights

In [1]:
# get max audio len from wavs in directory AudioWAV/
import os
from os import listdir
from os.path import isfile, join

import torch
import torchaudio
import librosa
from transformers import Wav2Vec2Processor, Wav2Vec2Model
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
import pytorch_lightning as pl
from torch import nn, optim
import wandb
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [2]:
# create full data df
mypath = '/content/AudioWAV/'
data = []
for full_fname in os.listdir(mypath):
    fname = full_fname[:-4]
    tags = fname.split('_')
    tags.append(mypath + full_fname)
    data.append(tags)

full_df = pd.DataFrame(data, columns=['subject_ID', 'sentence', 'emotion', 'intensity', 'path'])

In [3]:
# split data into train, test, val based on unique subject IDs
# also todo: try splitting on ID + some other demographic feature (e.g. sex, race, country etc.)

unique_subjects = full_df['subject_ID'].unique()

# Step 2: Shuffle and split subjects into train, test, and validation sets
train_subjects, temp_subjects = train_test_split(unique_subjects, test_size=0.3, random_state=42)
test_subjects, val_subjects = train_test_split(temp_subjects, test_size=0.5, random_state=42)

# Step 3: Create DataFrames for each split based on the subject IDs
train_df = full_df[full_df['subject_ID'].isin(train_subjects)]
test_df = full_df[full_df['subject_ID'].isin(test_subjects)]
val_df = full_df[full_df['subject_ID'].isin(val_subjects)]

In [4]:
# some statistics:

# Print the number of samples in each split to verify
print("Train set samples:", len(train_df))
print("Test set samples:", len(test_df))
print("Validation set samples:", len(val_df))
print('\n')

# get percentage of different emotions in train set
emotion_counts = train_df['emotion'].value_counts()
emotion_counts = emotion_counts / emotion_counts.sum()
print(emotion_counts)

# same for test and val
emotion_counts = test_df['emotion'].value_counts()
emotion_counts = emotion_counts / emotion_counts.sum()
print(emotion_counts)

emotion_counts = val_df['emotion'].value_counts()
emotion_counts = emotion_counts / emotion_counts.sum()
print(emotion_counts)

Train set samples: 5147
Test set samples: 1147
Validation set samples: 1148


emotion
DIS    0.170779
SAD    0.170779
FEA    0.170779
ANG    0.170779
HAP    0.170779
NEU    0.146105
Name: count, dtype: float64
emotion
DIS    0.170881
FEA    0.170881
HAP    0.170881
SAD    0.170881
ANG    0.170881
NEU    0.145597
Name: count, dtype: float64
emotion
HAP    0.170732
ANG    0.170732
FEA    0.170732
SAD    0.170732
DIS    0.170732
NEU    0.146341
Name: count, dtype: float64


# Dataset

In [5]:
class CustomAudioDataset(Dataset):
    def __init__(self, file_paths, labels_metainfo, max_len, target='emotion_label', processor_name="facebook/wav2vec2-large-960h-lv60-self", sampling_rate=16000, fp16=True):
        self.file_paths = file_paths
        self.labels = list(labels_metainfo[target])
        self.max_len = max_len
        self.sampling_rate = sampling_rate
        self.processor = Wav2Vec2Processor.from_pretrained(processor_name)
        self.fp16 = fp16

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        # Load audio file using librosa
        audio, sr = librosa.load(self.file_paths[idx], sr=self.sampling_rate)
        audio = torch.from_numpy(audio).float()

        # Pad/Truncate the audio to max_len
        if audio.shape[0] > self.max_len:
            audio = audio[:self.max_len]
        else:
            pad_len = self.max_len - audio.shape[0]
            audio = torch.nn.functional.pad(audio, (0, pad_len))

        # Use processor to preprocess the audio
        inputs = self.processor(audio, sampling_rate=self.sampling_rate, return_tensors="pt", padding=True)
        input_values = inputs.input_values.squeeze(0)  # Remove batch dimension
        if self.fp16:
          input_values = input_values.to(torch.float16)

        # Get the corresponding label
        label = self.labels[idx]

        return input_values, label

# Model

In [6]:
class AudioClassifier(nn.Module):
    def __init__(self, config):
        super(AudioClassifier, self).__init__()
        self.config = config
        if config['fp16']:
          self.body = Wav2Vec2Model.from_pretrained(config['model_name'], torch_dtype=torch.float16)
        else:
          self.body = Wav2Vec2Model.from_pretrained(config['model_name'])
        if self.config['freeze_cnn']:
            for param in self.body.feature_extractor.parameters():
                param.requires_grad = False
        self.avg_pool = nn.AvgPool1d(kernel_size=config['in_seq_len'])
        self.classifier = nn.Sequential(
            nn.Linear(config['in_dim'], config['hidden_dim']),
            nn.LeakyReLU(),
            nn.BatchNorm1d(config['hidden_dim']),
            nn.Dropout(p=config['dropout']),
            nn.Linear(config['hidden_dim'], config['num_classes'])
        )


    def forward(self, x):
        x = self.body(x).last_hidden_state
        x = x.permute(0, 2, 1)
        x = self.avg_pool(x)
        x = x.squeeze(2)
        x = self.classifier(x)
        return x


class ClassifierModel(pl.LightningModule):
    def __init__(self, config):
        super(ClassifierModel, self).__init__()
        self.config = config
        self.model = AudioClassifier(config)

        if self.config['kaiming_init']:
            self.model.classifier.apply(self.init_weights)

        self.criterion = nn.CrossEntropyLoss()
        self.learning_rate = config['learning_rate']
        self.batch_size = config['batch_size']
        self.val_progress = []
        self.train_progress = []
        self.test_progress = []
        self.val_loss = 0.0
        self.train_loss = 0.0
        self.best_acc = 0
        self.best_f1 = 0
        self.best_epoch = 0

    def configure_optimizers(self):
        # initialize Adam optimizer and scheduler with warmup
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate, weight_decay=self.config['weight_decay'])
        if self.config['scheduler'] == 'cycle':
            self.scheduler = optim.lr_scheduler.CyclicLR(self.optimizer, base_lr=self.config['min_lr'], max_lr=self.config['max_lr'], step_size_up=self.config['step_size'], cycle_momentum=False)
            return [self.optimizer], [self.scheduler]
        elif self.config['scheduler'] == 'step':
            self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=self.config['step_size'], gamma=self.config['gamma'])
        else:
            return [self.optimizer]


    def forward(self, x):
        return self.model(x)


    def init_weights(self, m):
        if isinstance(m, nn.Linear):
            nn.init.kaiming_uniform_(m.weight, nonlinearity=self.config['init_nonlinearity'])
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def training_step(self, batch, batch_idx):
        x, y = batch
        probs = self(x)
        if self.config['fp16']:
          probs = probs.float()
        loss = self.criterion(probs, y)
        self.train_loss += loss.item()
        self.log('train_loss', loss)
        probs = nn.functional.softmax(probs, dim=1)
        self.train_progress.append((probs, y))
        return loss


    def validation_step(self, batch, batch_idx):
        x, y = batch
        probs = self(x)
        if self.config['fp16']:
          probs = probs.float()
        loss = self.criterion(probs, y)
        self.val_loss += loss.item()
        self.log('val_loss', loss)
        # save predictions and true labels to calculate accuracy on full validation after epoch end
        probs = nn.functional.softmax(probs, dim=1)
        self.val_progress.append((probs, y))
        return loss

    def on_validation_epoch_end(self):
        # calculate accuracy on full validation set
        preds = torch.cat([pred for pred, y in self.val_progress], dim=0)
        y = torch.cat([y for pred, y in self.val_progress], dim=0)
        preds = torch.argmax(preds, dim=1)
        acc = accuracy_score(y.cpu(), preds.cpu())
        precision, recall, f1, _ = precision_recall_fscore_support(y.cpu(), preds.cpu(), average='macro')

        if len(self.train_progress) > 0:
            train_preds = torch.cat([pred for pred, y in self.train_progress], dim=0)
            train_y = torch.cat([y for pred, y in self.train_progress], dim=0)
            train_preds = torch.argmax(train_preds, dim=1)
            train_acc = accuracy_score(train_y.cpu(), train_preds.cpu())
        else:
            train_acc = 0.0

        # log metrics based on configuration settings
        if self.config['log'] in ['wandb', 'all']:
            log_dict = {'train_loss': self.train_loss, 'train acc': train_acc, 'val_loss': self.val_loss, 'val_acc': acc, 'val_precision': precision, 'val_recall': recall, 'val_f1': f1}
            if self.config['scheduler'] == 'step':
                log_dict['lr'] = self.scheduler.get_last_lr()[0]
            wandb.log(log_dict)
        if self.config['log'] in ['stdout', 'all']:
            print(f'Training loss: {self.train_loss}')
            print(f'Validation accuracy: {acc}')
            print(f'Validation loss: {self.val_loss}')
            print(f'Validation precision: {precision}')
            print(f'Validation recall: {recall}')
            print(f'Validation f1: {f1}')
            if self.config['scheduler'] == 'step':
                print(f'Learning rate: {self.scheduler.get_last_lr()[0]}')

        # saved best model, based on chosen target metric if save_best is set to True
        if self.config['save_best']:
            if self.config['target_metric'] == 'accuracy' and acc >= self.best_acc:
                self.best_acc = acc
                self.save_checkpoint('/content/weights/'+self.config['run_name']+'.pth')
                self.best_epoch = self.current_epoch
                self.best_f1 = f1
            elif self.config['target_metric'] == 'f1' and f1 >= self.best_f1:
                self.best_f1 = f1
                self.save_checkpoint('/content/weights/'+self.config['run_name']+'.pth')
                self.best_epoch = self.current_epoch
                self.best_acc = acc

        # reset variables for next epoch
        self.val_progress = []
        self.val_loss = 0.0
        self.train_loss = 0.0
        self.train_progress = []


    def test_step(self, batch, batch_idx):
        x, y = batch
        probs = self(x)
        if self.config['fp16']:
          probs = probs.float()
        loss = self.criterion(probs, y)
        self.log('test_loss', loss)
        probs = nn.functional.softmax(probs, dim=1)
        self.test_progress.append((probs, y))
        return loss

    def on_test_epoch_end(self):
        # calculate accuracy on full test set
        preds = torch.cat([pred for pred, y in self.test_progress], dim=0)
        y = torch.cat([y for pred, y in self.test_progress], dim=0)
        preds = torch.argmax(preds, dim=1)
        acc = accuracy_score(y.cpu(), preds.cpu())
        precision, recall, f1, _ = precision_recall_fscore_support(y.cpu(), preds.cpu(), average='macro')

        # log metrics based on configuration settings
        if self.config['log'] in ['wandb', 'all']:
            wandb.log({'test_acc': acc, 'test_precision': precision, 'test_recall': recall, 'test_f1': f1})
        if self.config['log'] in ['stdout', 'all']:
            print(f'Test accuracy: {acc}')
            print(f'Test precision: {precision}')
            print(f'Test recall: {recall}')
            print(f'Test f1: {f1}')

        # reset variables for next epoch
        self.test_progress = []


    def save_checkpoint(self, path):
        """
        Saves a checkpoint, optimizer, and scheduler from a specified path.
        :param path: The file path from which to load the checkpoint.
        """
        model_state = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict()
        }
        if self.config['scheduler'] is not None:
            model_state['scheduler_state_dict'] = self.scheduler.state_dict()
        torch.save(model_state, path)

    def load_checkpoint(self, path):
        """
        Loads a checkpoint, optimizer, and scheduler from a specified path.
        :param path: The file path from which to load the checkpoint.
        """
        checkpoint = torch.load(path)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        if self.config['scheduler'] is not None:
            self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])


# misc processing

In [7]:
emotions_label_encoder = {
    'ANG':0,
    'DIS':1,
    'FEA':2,
    'HAP':3,
    'NEU':4,
    'SAD':5
}
emotions_label_decoder = {v: k for k, v in emotions_label_encoder.items()}

In [8]:
max_len = 0

mypath = '/content/AudioWAV/'
onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]
for file in onlyfiles:
    audio, sr = librosa.load(mypath + file, sr=16000)
    if audio.shape[0] > max_len:
        max_len = audio.shape[0]

In [9]:
train_df['emotion_label'] = train_df['emotion'].map(emotions_label_encoder)
test_df['emotion_label'] = test_df['emotion'].map(emotions_label_encoder)
val_df['emotion_label'] = val_df['emotion'].map(emotions_label_encoder)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['emotion_label'] = train_df['emotion'].map(emotions_label_encoder)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['emotion_label'] = test_df['emotion'].map(emotions_label_encoder)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  val_df['emotion_label'] = val_df['emotion'].map(emotion

# Experiment

In [10]:
config = {
    "processor_name": "audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim",
    "model_name": "audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim",
    "sampling_rate": 16000,
    'batch_size': 32,
    'learning_rate': 5e-5,
    'kaiming_init': True,
    'init_nonlinearity': 'leaky_relu',
    'freeze_cnn': True,
    'in_seq_len': 250,
    'in_dim': 1024,
    'hidden_dim': 512,
    'num_classes': 6,
    'fp16': False,
    'wandb_api_key': 'your-key',
    'scheduler': None,
    'log': 'wandb',
    'save_best': True,
    'target_metric': 'accuracy',
    'dropout': 0.4,
    'weight_decay': 1e-4,
}

In [11]:
wandb.init(project='audio-emotion-classification', config=config)
wandb.login(key = config['wandb_api_key'])
config['run_name'] = wandb.run.name

[34m[1mwandb[0m: Currently logged in as: [33mtimothy-senchenko[0m. Use [1m`wandb login --relogin`[0m to force relogin




In [12]:
train_set = CustomAudioDataset(list(train_df['path']), train_df.drop('path', axis=1), max_len, processor_name=config['processor_name'], fp16=config['fp16'])
val_set = CustomAudioDataset(list(val_df['path']), val_df.drop('path', axis=1), max_len, processor_name=config['processor_name'], fp16=config['fp16'])
test_set = CustomAudioDataset(list(test_df['path']), test_df.drop('path', axis=1), max_len, processor_name=config['processor_name'], fp16=config['fp16'])

# Create DataLoader
trainloader = torch.utils.data.DataLoader(train_set, batch_size=config['batch_size'], shuffle=True)
valloader = torch.utils.data.DataLoader(val_set, batch_size=config['batch_size'], shuffle=False)
testloader = torch.utils.data.DataLoader(test_set, batch_size=config['batch_size'], shuffle=False)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [13]:
cls = ClassifierModel(config)
if config['fp16']:
  cls = cls.to(torch.float16)

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [14]:
if config['fp16']:
  trainer = pl.Trainer(max_epochs=10, check_val_every_n_epoch=1, precision='16-true')
else:
  trainer = pl.Trainer(max_epochs=10, check_val_every_n_epoch=1)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [None]:
trainer.fit(cls, trainloader, valloader)