In [162]:
import os
import numpy as np
import librosa
import librosa.display
import torch
import soundfile
from utils import torch_device_seed, check_gpu, clear_device_cache

from tqdm.notebook import tqdm

In [163]:
def check_gpu():
    # torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
    is_cuda = torch.cuda.is_available()
    
    is_mps = torch.backends.mps.is_available()

    # If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
    if is_cuda:
        device = torch.device("cuda")
        print("GPU is available")
    else:
        if is_mps:
            device = torch.device("mps")
        else:
            device = torch.device("cpu")
            print("GPU not available, CPU used")
    return device

In [164]:
device = check_gpu()
torch_device_seed(3407)
print(device)

GPU is available
cuda


In [165]:
def get_waveforms(file):
    wave,_= librosa.load(file,duration=3,offset=0.5,sr=sample_rate)
    waveform, _ = librosa.effects.trim(wave,top_db=60) # tops at 60 decibels

    # make sure waveform vectors are homogenous by defining explicitly
    waveform_homo = np.zeros((int(sample_rate*3,)))
    waveform_homo[:len(waveform)] = waveform
    
    # return a single file's waveform    
    return waveform_homo

In [166]:
emotions_dict = {
    0:'surprised',
    1:'neutral',
    2:'calm',
    3:'happy',
    4:'sad',
    5:'angry',
    6:'fearful',
    7:'disgust'
}
# Additional attributes from RAVDESS to play with
emotion_intensities = {
    1: 'normal',
    2: 'strong'
}

# RAVDESS native sample rate is 48k
sample_rate = 48000

In [167]:
# Function to extract features
def extract_feature(file_name, mfcc, chroma, mel):
    with soundfile.SoundFile(file_name) as sound_file:
        X = sound_file.read(dtype="float32")
        sample_rate = sound_file.samplerate
        stft = np.abs(librosa.stft(X)) if chroma else None
        result=np.array([])
        
        if mfcc:
            mfcc_vector = librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40)
            if len(mfcc_vector.shape) > 2:
                mfcc_vector = mfcc_vector.squeeze(2)
                mfcc_vector = mfcc_vector.reshape(40, -1) # n_mfcc parameter
            mfccs = np.mean(mfcc_vector.T, axis=0)
            result=np.hstack((result, mfccs))
        if chroma:
            chroma_vector = librosa.feature.chroma_stft(S=stft, sr=sample_rate)
            if len(chroma_vector.shape) > 2:
                chroma_vector = chroma_vector.squeeze(2)
                chroma_vector = chroma_vector.reshape(12, -1) # n_chroma_bins parameter
            chroma_feature = np.mean(chroma_vector.T, axis=0)
            result=np.hstack((result, chroma_feature))
        if mel:
            mel_vector = librosa.feature.melspectrogram(y=X, sr=sample_rate)
            if len(mel_vector.shape) > 2:
                mel_vector = mel_vector.squeeze(2)
                mel_vector = mel_vector.reshape(128, -1) # n_mels parameter
            mel_feature = np.mean(mel_vector.T, axis=0)
            result=np.hstack((result, mel_feature))
        # print(mfccs.shape, chroma_feature.shape, mel_feature.shape)
    return result

In [168]:
RAV = "ravdess-emotional-speech-audio-augmented/"

dir_list = os.listdir(RAV)
dir_list.sort()
# features and labels
emotions = []
features = []
# raw waveforms to augment later
# waveforms = []
# extra labels
intensities, genders = [],[]

for i in tqdm(dir_list, total=len(dir_list)):
    if i == ".DS_Store":
        continue

    if not i.startswith("Actor_"):
        continue

    fname = os.listdir(RAV+i)
    for f in fname:
        # Modality (01 = full-AV, 02 = video-only, 03 = audio-only). We only have 03
        # Vocal channel (01 = speech, 02 = song). We only have 01
        # Emotion (01 = neutral, 02 = calm, 03 = happy, 04 = sad, 05 = angry, 06 = fearful, 07 = disgust, 08 = surprised).
        # Emotional intensity (01 = normal, 02 = strong). NOTE: There is no strong intensity for the 'neutral' emotion.
        # Statement (01 = "Kids are talking by the door", 02 = "Dogs are sitting by the door").
        # Repetition (01 = 1st repetition, 02 = 2nd repetition).
        # Actor (01 to 24. Odd numbered actors are male, even numbered actors are female).
        part = f.split('.')[0].split('-')
        
        emotion = int(part[2])
        
        file_dir = RAV + i +'/'+ f
        
        #  move surprise to 0 for cleaner behaviour with PyTorch/0-indexing
        if emotion == 8: 
            emotion = 0 # surprise is now at 0 index; other emotion indeces unchanged
        elif emotion == 2: 
            continue # Skip calm emotion
        
        intensity = int(part[3])
        
        gender = int(part[6])
        if gender%2==0:
            gender = 'female'
        else:
            gender = 'male'
        
        # get waveform from the sample
        # waveform = get_waveforms(file_dir)
        
        feature = extract_feature(file_dir, mfcc = True, chroma = True, mel = True)
        # store waveforms and labels
        features.append(feature)
        emotions.append(emotion) # no need for the onehot encoded
        intensities.append(intensity) # store intensity in case we wish to predict
        genders.append(gender)

  0%|          | 0/24 [00:00<?, ?it/s]

In [169]:
emotions[0]

1

In [170]:
intensities[0]

1

In [171]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [172]:
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)

In [173]:
X_train, X_valid_test, y_train, y_valid_test = train_test_split(features, emotions, test_size=0.2, stratify=emotions)

In [174]:
from torch.utils.data import Dataset, DataLoader

In [175]:
class WaveformDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index], self.labels[index]

In [176]:
train_ds = WaveformDataset(X_train, y_train)
train_dataloader = DataLoader(train_ds, batch_size=32, num_workers=0, shuffle=True)

In [177]:
for x in train_dataloader:
    print(x)
    break

[tensor([[-5.6153e+02,  4.4960e+01, -1.3757e+01,  ...,  3.6342e-05,
          3.6531e-05,  3.7116e-05],
        [-5.6030e+02,  4.4190e+01, -1.4187e+01,  ...,  1.3121e-08,
          5.2733e-09,  4.4597e-09],
        [-6.2928e+02,  2.8251e+01,  2.6097e+00,  ...,  3.3833e-05,
          2.9757e-05,  4.6808e-06],
        ...,
        [-4.2834e+02,  3.2971e+01, -1.4513e+01,  ...,  5.6308e-04,
          5.7494e-04,  5.8092e-04],
        [-7.2574e+02,  6.4371e+01, -8.0500e+00,  ...,  5.5177e-09,
          5.2601e-09,  5.2591e-09],
        [-4.5256e+02,  4.1647e+01, -1.2695e+01,  ...,  3.6180e-07,
          3.5527e-08,  7.4517e-09]], dtype=torch.float64), tensor([3, 1, 7, 4, 6, 0, 5, 3, 0, 4, 1, 0, 5, 5, 5, 0, 5, 6, 7, 0, 7, 4, 6, 6,
        1, 1, 0, 4, 1, 4, 0, 7])]


In [178]:
valid_ds = WaveformDataset(X_valid_test, y_valid_test)
valid_dataloader = DataLoader(valid_ds, batch_size=32, num_workers=0, shuffle=False)

In [179]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [180]:
class SimpleLinearModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(SimpleLinearModel, self).__init__()

        self.lstm = nn.LSTM(input_size, 128, num_layers=1, dropout=0.1, bidirectional=True, batch_first=True)

        self.linear1 = nn.Linear(128 * 2, 256) # for bidirectional
        self.linear2 = nn.Linear(256, 128)
        self.linear3 = nn.Linear(128, output_size)

    
    def forward(self, waveform):
        output, _ = self.lstm(waveform)
        output = F.relu(self.linear1(output))
        output = F.relu(self.linear2(output))
        output = self.linear3(output)

        return output

In [181]:
class EnhancedLinearModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(EnhancedLinearModel, self).__init__()
        self.lstm = nn.LSTM(input_size, 128, num_layers=3, dropout=0.1, bidirectional=True, batch_first=True)
        self.linear1 = nn.Linear(128 * 2, 256)  # for bidirectional
        self.batch_norm1 = nn.BatchNorm1d(256)
        self.dropout1 = nn.Dropout(0.2)

        self.linear2 = nn.Linear(256, 128)
        self.batch_norm2 = nn.BatchNorm1d(128)
        self.dropout2 = nn.Dropout(0.2)

        self.linear3 = nn.Linear(128, output_size)
    
    def forward(self, waveform):
        output, _ = self.lstm(waveform)
        output = F.relu(self.linear1(output))
        output = output.view(-1, 256)
        output = self.batch_norm1(output)
        output = self.dropout1(output)

        output = F.relu(self.linear2(output))
        output = output.view(-1, 128)
        output = self.batch_norm2(output)
        output = self.dropout2(output)

        output = self.linear3(output)
        return output

In [182]:
import gc

clear_device_cache()

gc.collect()

0

In [201]:
linear_model = SimpleLinearModel(features[0].shape[0], len(emotions_dict)).to(device)
enhanced_model = EnhancedLinearModel(features[0].shape[0], len(emotions_dict)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(linear_model.parameters(), lr=0.001)
# optimizer = optim.Adam(enhanced_model.parameters(), lr=0.001)



In [202]:
def weights_init(m):
    if isinstance(m, nn.Linear):
        nn.init.kaiming_uniform_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

linear_model.apply(weights_init)

SimpleLinearModel(
  (lstm): LSTM(180, 128, batch_first=True, dropout=0.1, bidirectional=True)
  (linear1): Linear(in_features=256, out_features=256, bias=True)
  (linear2): Linear(in_features=256, out_features=128, bias=True)
  (linear3): Linear(in_features=128, out_features=8, bias=True)
)

In [196]:
EPOCHS = 50

In [203]:
def fit(model, criterion, optimizer, epochs=EPOCHS, clip_value=1.0):
    train_accuracies = []
    val_accuracies = []
    train_losses = []
    val_losses = []

    for epoch in range(epochs):  # Loop over the dataset multiple times
        running_loss = 0.0
        total = 0
        correct = 0
        model.train()
        for inputs, labels in train_dataloader:
            inputs = inputs.to(device).float()
            labels = labels.to(device)
    
            optimizer.zero_grad()
    
            outputs = model(inputs)
            # print('outputs',outputs.shape)
            # print('labels',labels.shape)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Gradient clipping
            # torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)
            
            optimizer.step()
    
            running_loss += loss.item() 

            _, predicted = torch.max(outputs.data, axis=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
        # get train loss and accuracy
        train_loss = running_loss / len(train_dataloader.dataset)
        train_accuracy = correct / total
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
    
        # get test loss and accuracy
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            model.eval()
            for data in valid_dataloader:
                inputs, labels = data
                inputs = inputs.to(device).float()
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, axis=1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_loss /= len(valid_dataloader)
        val_accuracy = val_correct / val_total
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch}: train_loss: {train_loss:.4f}; train_accuracy: {train_accuracy:.4f}; val_loss: {val_loss:.4f}; val_accuracy: {val_accuracy:.4f}")

    return {
        "train_loss": train_losses,
        "train_accuracy": train_accuracies,
        "val_loss": val_losses,
        "val_accuracy": val_accuracies
    }

In [205]:
train_result = fit(linear_model, criterion, optimizer)

Epoch 0: train_loss: 0.0242; train_accuracy: 0.7345; val_loss: 1.5806; val_accuracy: 0.4760
Epoch 1: train_loss: 0.0252; train_accuracy: 0.6874; val_loss: 1.5352; val_accuracy: 0.4880
Epoch 2: train_loss: 0.0260; train_accuracy: 0.6964; val_loss: 1.4852; val_accuracy: 0.4560
Epoch 3: train_loss: 0.0247; train_accuracy: 0.6964; val_loss: 1.6025; val_accuracy: 0.4880
Epoch 4: train_loss: 0.0239; train_accuracy: 0.7214; val_loss: 1.5859; val_accuracy: 0.4920
Epoch 5: train_loss: 0.0245; train_accuracy: 0.7134; val_loss: 1.6045; val_accuracy: 0.4760
Epoch 6: train_loss: 0.0230; train_accuracy: 0.7385; val_loss: 1.7892; val_accuracy: 0.4560
Epoch 7: train_loss: 0.0224; train_accuracy: 0.7265; val_loss: 1.5598; val_accuracy: 0.5120
Epoch 8: train_loss: 0.0212; train_accuracy: 0.7495; val_loss: 1.6790; val_accuracy: 0.4640
Epoch 9: train_loss: 0.0212; train_accuracy: 0.7555; val_loss: 1.5851; val_accuracy: 0.5000
Epoch 10: train_loss: 0.0221; train_accuracy: 0.7415; val_loss: 1.7801; val_accu

In [206]:
from IPython.display import Audio

In [207]:
sr = 22050

y_sweep = librosa.chirp(fmin=librosa.note_to_hz('C#4'),
                        fmax=librosa.note_to_hz('Eb5'),
                        sr=sr,
                        duration=1)

display(Audio(data=y_sweep, rate=sr))

In [190]:
for x in train_dataloader:
    print(x[0].shape)
    break

torch.Size([32, 180])


In [208]:
def predict(model, filename):
    f = filename.split("/")[-1]
    part = f.split('.')[0].split('-')
    emotion = int(part[2])

    emotion_label = emotions_dict[emotion]
    this_audio, sr = librosa.load(filename)

    feature = extract_feature(filename, mfcc = True, chroma = True, mel = True)

    with torch.no_grad():
        model.eval()
        feature_tensor = torch.tensor(np.array([feature]), dtype=torch.float).to(device)
        output = model(feature_tensor)
        _, prediction = torch.max(output, axis=1)
        prediction_label = emotions_dict[prediction.cpu().numpy()[0]]

    # show
    display(Audio(data=this_audio, rate=sr))
    print(f"Label     : {emotion_label}")
    print(f"Prediction: {prediction_label}")

In [211]:
predict(linear_model, "ravdess-emotional-speech-audio/Actor_24/03-01-07-01-01-02-24.wav")

Label     : disgust
Prediction: disgust
