<a href="https://colab.research.google.com/github/Ahmed-L/CSE465-/blob/main/Ravdess_resnet50.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Unzip dataset files and Generate CSVs

Ravdess download, unzip, and generate csv

In [1]:

!gdown --id 1UBZLKSiAJyKoIy_cvIH94ad0EhJLO1vY # Ravdess
!gdown --id 1a3R72CZ7IYNa68dQ9UDBQGNr8CWGnHrl # Ravdess csv
import zipfile
dataset_directory = '/content/Audio_Speech_Actors_01-24.zip' # RAVDESS
zip_ref = zipfile.ZipFile(dataset_directory, 'r')
zip_ref.extractall('Unzipped_Data')

import pandas as pd
from sklearn.model_selection import train_test_split
csv =  pd.read_csv('/content/ravdess_data.csv')
for index, row in csv.iterrows():
  if (row['Emotion_ID']==0):
    csv.drop(index, axis=0, inplace = True)

csv['Emotion_ID'].replace({1:0, 2:1, 3:2, 4:3, 5:4, 6:5, 7:6}, inplace=True)
y = pd.DataFrame(csv[['Emotion_ID']])

train_csv, test_csv = train_test_split(csv, random_state = 0, stratify=y, test_size = 0.2097, shuffle = True)


y = pd.DataFrame(train_csv[['Emotion_ID']])
train_csv, val_csv = train_test_split(train_csv, random_state = 0, stratify=y, train_size = 0.886, shuffle = True)
val_csv.to_csv(r'val_csv.csv', index=False)
train_csv.to_csv(r'train_csv.csv', index=False)
test_csv.to_csv(r'test_csv.csv', index=False)
print(len(train_csv))
print(len(test_csv))
print(len(val_csv))

Downloading...
From: https://drive.google.com/uc?id=1UBZLKSiAJyKoIy_cvIH94ad0EhJLO1vY
To: /content/Audio_Speech_Actors_01-24.zip
100% 208M/208M [00:01<00:00, 117MB/s]
Downloading...
From: https://drive.google.com/uc?id=1a3R72CZ7IYNa68dQ9UDBQGNr8CWGnHrl
To: /content/ravdess_data.csv
100% 94.0k/94.0k [00:00<00:00, 34.0MB/s]
940
282
122


In [2]:
import os
import torch
from torch.utils.data import Dataset
import pandas as pd
import torchaudio
from torch.utils.data import DataLoader
from torch.nn.functional import normalize
import librosa
import numpy


class CustomDataset(Dataset):
    def __init__(self, annotations_file, audio_dir, transformation = None, target_sample_rate = 16000, num_samples = 48000, device = 'cuda'):
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        audio_sample_path = self._get_audio_sample_path(index)
        #print(audio_sample_path)
        label = self._get_audio_sample_label(index)
        #print(label)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        if self.transformation:
          signal = self.transformation(signal)
          delta = torchaudio.functional.compute_deltas(signal)
          delta2 = torchaudio.functional.compute_deltas(delta)
          signal = torch.cat((signal, delta, delta2), 1) # concatenating deltas
          #signal = torch.mean(signal, 2, True)
          #signal = (torch.sum(signal, 2))
        
        means = signal.mean(dim=1, keepdim=True)
        stds = signal.std(dim=1, keepdim=True)
        signal = (signal - means) / stds #NORMALIZED
        
        #signal = normalize(signal, p=2.0)

        return signal, label

    def _cut_if_necessary(self, signal):
        if signal.shape[1] > self.num_samples:
            # print(signal.shape[1]) # print sample size
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            resampler = resampler.to(device)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    # for any audio index this function returns the audio path
    def _get_audio_sample_path(self, index):
        path = os.path.join(self.audio_dir, self.annotations.iloc[index,0])
        return path
    
    # for specified audio index this will return the label from the data csv
    def _get_audio_sample_label(self, index):
        return self.annotations.iloc[index, 2]

In [3]:
SAMPLE_RATE = 48000
NUM_SAMPLES = SAMPLE_RATE * 4
AUDIO_DIR = ''

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device {device}")

train_csv_file = '/content/train_csv.csv'
test_csv_file = '/content/test_csv.csv'
val_csv_file = '/content/val_csv.csv'

mfcc = torchaudio.transforms.MFCC(sample_rate=SAMPLE_RATE, n_mfcc=44, melkwargs={"n_fft": 1500, "hop_length": 500, 'power':2})

train_dataset = CustomDataset(train_csv_file, AUDIO_DIR, mfcc, SAMPLE_RATE, NUM_SAMPLES, device)
val_dataset = CustomDataset(val_csv_file, AUDIO_DIR, mfcc, SAMPLE_RATE, NUM_SAMPLES, device)
test_dataset = CustomDataset(test_csv_file, AUDIO_DIR, mfcc, SAMPLE_RATE, NUM_SAMPLES, device)

print(f"There are {len(train_dataset)} samples in the dataset.")
signal, label = train_dataset[1]
print(signal.size()) 
print(len(train_dataset))
print(len(test_dataset))
print(len(val_dataset))
 

Using device cuda
There are 940 samples in the dataset.
torch.Size([1, 132, 385])
940
282
122


In [7]:
import math
#Calculate outputs:
def CalculateConv2dOutput(W, F, P, S):
  return math.floor((W - F + 2 * P) / S ) + 1

def MaxPool2d(W, F, P, S):
  return math.floor((W - F) / S ) + 1


#layer 1
a = CalculateConv2dOutput(132, 3, 2, 1)
b = CalculateConv2dOutput(385, 3, 2, 1)

a = MaxPool2d(a, 2, 0, 2)
b = MaxPool2d(b, 2, 0, 2)
#layer 2
a = CalculateConv2dOutput(a, 3, 2, 1)
b = CalculateConv2dOutput(b, 3, 2, 1)

a = MaxPool2d(a, 2, 0, 2)
b = MaxPool2d(b, 2, 0, 2)

#layer 3
a = CalculateConv2dOutput(a, 3, 2, 1)
b = CalculateConv2dOutput(b, 3, 2, 1)

a = MaxPool2d(a, 2, 0, 2)
b = MaxPool2d(b, 2, 0, 2)
# layer 4
a = CalculateConv2dOutput(a, 3, 2, 1)
b = CalculateConv2dOutput(b, 3, 2, 1)

a = MaxPool2d(a, 2, 0, 2)
b = MaxPool2d(b, 2, 0, 2)

print(a)
print(b)

10
25


In [10]:
from torch import nn
from torchsummary import summary

class CNNNetwork(nn.Module):

    def __init__(self):
        super().__init__()
        # 4 conv blocks / flatten / linear / softmax
        self.conv1 = nn.Sequential(            # shape = 66,132
            nn.Conv2d(
                in_channels=1,
                out_channels=16,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)      # shape after maxpool = 33, 66
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(
                in_channels=16,
                out_channels=32,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(
                in_channels=32,
                out_channels=64,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(
                in_channels=64,
                out_channels=128,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(128 * 10 * 25, 512)
        self.linear2 = nn.Linear(512, 256)
        self.linear3 = nn.Linear(256, 128)
        self.linear4 = nn.Linear(128, 7) # 8 outputs # added linear layers
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_data):
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.flatten(x)
        x = self.linear(x)
        x = self.linear2(x)
        x = self.linear3(x)
        logits = self.linear4(x)
        predictions = self.softmax(logits)
        return predictions


if __name__ == "__main__":
    model = CNNNetwork()

In [11]:

model = model.to(device)
summary(model, (signal.shape))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 16, 134, 387]             160
              ReLU-2         [-1, 16, 134, 387]               0
         MaxPool2d-3          [-1, 16, 67, 193]               0
            Conv2d-4          [-1, 32, 69, 195]           4,640
              ReLU-5          [-1, 32, 69, 195]               0
         MaxPool2d-6           [-1, 32, 34, 97]               0
            Conv2d-7           [-1, 64, 36, 99]          18,496
              ReLU-8           [-1, 64, 36, 99]               0
         MaxPool2d-9           [-1, 64, 18, 49]               0
           Conv2d-10          [-1, 128, 20, 51]          73,856
             ReLU-11          [-1, 128, 20, 51]               0
        MaxPool2d-12          [-1, 128, 10, 25]               0
          Flatten-13                [-1, 32000]               0
           Linear-14                  [

In [12]:
from torchvision import models
model = models.resnet50(pretrained=True)

model = model.to(device)
model.conv1=nn.Conv2d(1, model.conv1.out_channels, 
                      kernel_size=model.conv1.kernel_size[0], 
                      stride=model.conv1.stride[0], 
                      padding=model.conv1.padding[0])
num_ftrs = model.fc.in_features
model.fc = nn.Sequential(*[nn.Dropout(p=0.25), nn.Linear(num_ftrs, 7)])

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

In [15]:
BATCH_SIZE = 32
EPOCHS = 200
LEARNING_RATE = 0.0001
import torch.utils.data as data
from torch.optim.lr_scheduler import ReduceLROnPlateau

def create_data_loader(train_data, batch_size):
    train_dataloader = DataLoader(train_data, batch_size=batch_size)
    return train_dataloader

def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    model.train()
    for input, target in data_loader:
        input, target = input.to(device), target.to(device)
        # calculate loss
        prediction = model(input)
        loss = loss_fn(prediction, target)
        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
    print(f"loss: {loss.item()}")


def test_single_epoch(model, dataloader, loss_fn, optimiser, device):
  correct = 0
  size = len(dataloader.dataset)
  #model.eval()
  with torch.no_grad():
    for input, target in dataloader:
          input, target = input.to(device), target.to(device)
          # calculate loss
          #loss = loss_fn(prediction, target)
          prediction = model(input)
          loss = loss_fn(prediction, target)
          correct += (prediction.argmax(1) == target).type(torch.float).sum().item()
    correct /= size
    print(f"Validation loss: {loss.item()}")
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}% \n")
    return loss.item()

# original train function
def train(model, data_loader, loss_fn, optimiser, device, epochs):
    for i in range(epochs):
        print(f"Epoch {i+1}")
        train_single_epoch(model, data_loader, loss_fn, optimiser, device)
        print("---------------------------")
    print("Finished training")


def train_test(model, train_dataloader, test_dataloader, loss_fn, optimiser, device, epochs):
    for i in range(epochs):
        print(f"Epoch {i+1}")
        train_single_epoch(model, train_dataloader, loss_fn, optimiser, device)
        validation_loss = test_single_epoch(model, test_dataloader, loss_fn, optimiser, device)
        #scheduler.step(validation_loss)
        print("---------------------------")
    print("Finished training")

train_dataloader = create_data_loader(train_dataset, BATCH_SIZE)
test_dataloader = create_data_loader(test_dataset, BATCH_SIZE)
val_dataloader = create_data_loader(val_dataset, BATCH_SIZE)
#model = model.to(device)

# initialise loss funtion + optimiser
loss_fn = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(model.parameters(),lr=LEARNING_RATE)
#scheduler = ReduceLROnPlateau(optimiser, 'min', verbose = True, cooldown = 5, patience = 20)


In [16]:
model.to(device)
train_test(model, train_dataloader, val_dataloader, loss_fn, optimiser, device, EPOCHS)

Epoch 1
loss: 1.4741967916488647
Validation loss: 1.2968744039535522
Test Error: 
 Accuracy: 55.7% 

---------------------------
Epoch 2
loss: 0.49214640259742737
Validation loss: 0.8989935517311096
Test Error: 
 Accuracy: 64.8% 

---------------------------
Epoch 3
loss: 0.10194418579339981
Validation loss: 0.9554879069328308
Test Error: 
 Accuracy: 65.6% 

---------------------------
Epoch 4
loss: 0.04746444150805473
Validation loss: 0.8545631170272827
Test Error: 
 Accuracy: 70.5% 

---------------------------
Epoch 5
loss: 0.045461636036634445
Validation loss: 0.9780051112174988
Test Error: 
 Accuracy: 68.9% 

---------------------------
Epoch 6
loss: 0.16891039907932281
Validation loss: 0.7061783075332642
Test Error: 
 Accuracy: 73.0% 

---------------------------
Epoch 7
loss: 0.09441106766462326
Validation loss: 0.9260916113853455
Test Error: 
 Accuracy: 72.1% 

---------------------------
Epoch 8
loss: 0.034882914274930954
Validation loss: 0.9970343708992004
Test Error: 
 Accur

KeyboardInterrupt: ignored

In [None]:
torch.save(model, '/content/Resnet_high_model.pth')

In [17]:
model.eval()
test_loss = test_single_epoch(model, test_dataloader, loss_fn, optimiser, device)

Validation loss: 1.4923886060714722
Test Error: 
 Accuracy: 71.6% 

