In [1]:
import librosa
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from sklearn import metrics

import shutil
import os
import zipfile
import tarfile

import time
import random

import wave

import torch
from torch import nn

import pickle

from torchsummary import summary

import IPython.display as ipd

%matplotlib inline

In [2]:
dataset_type = input('Pick a dataset from the following\nby writing a letter in the upper case\nI (IEMOCAP), R (RAVDESS), C (CREMA-D):')

dataset_type = dataset_type.upper()

dataset_type_dict = {
    'Name': ['CREMA-D', 'IEMOCAP', 'RAVDESS'],
    'Archive name': ['crema_d-audio_np-sr_22050.tar.xz', 'iemocap_numpy_audio_sr22050.tar.xz', 'ravdess-speech-sr22050.tar.xz']}
datasets_info = pd.DataFrame(dataset_type_dict, index=['C', 'I', 'R'])
 

print('You have picked {} dataset'.format(datasets_info['Name'][dataset_type]))

You have picked CREMA-D dataset


In [0]:
# process the dataset
path_to_packed_dataset = os.path.join('/content/drive/My Drive/datasets', datasets_info['Archive name'][dataset_type])
target_path = os.path.join('/content', datasets_info['Name'][dataset_type])

if not os.path.isdir(target_path):
    os.mkdir(target_path)

path_to_packed_dataset = shutil.copy2(path_to_packed_dataset, target_path)

print('The archive with {} audio files is copied'.format(datasets_info['Name'][dataset_type]))

with tarfile.open(path_to_packed_dataset) as tar:
    tar.extractall(target_path)

print('The archive with {} audio files is unpacked'.format(datasets_info['Name'][dataset_type]))################

The archive with RAVDESS audio files is copied
The archive with RAVDESS audio files is unpacked


In [4]:
def get_paths_to_wavs(path_to_dataset):
    file_paths_list = []

    for root, dirs, files in os.walk(path_to_dataset):
        if len(files) != 0:
            file_paths_list += [os.path.join(root, f) for f in files if f.endswith('.wav')]

    return file_paths_list

def get_paths_to_npys(path_to_dataset):
    # get a list with all absolute paths to each file
    file_paths_list = []

    for root, dirs, files in os.walk(path_to_dataset):
        if len(files) != 0:
            file_paths_list += [os.path.join(root, f) for f in files if f.endswith('.npy')]
            #file_paths_list += [os.path.join(root, f) for f in files if os.path.isdir(os.path.join(root, f))]

    return file_paths_list

class numpy_ravdess_dataset(torch.utils.data.Dataset):
    '''
    Due to librosa reads wav-files very slow it is more preferable to read the
    numpy representations of the original wavs
    '''

    emotion_dict = {
        0: 'neutral',
        1: 'calm',
        2: 'happy',
        3: 'sad',
        4: 'angry',
        5: 'fearful',
        6: 'disgust',
        7: 'surprised'
        }

    def __init__(self, paths_to_wavs_list, spectrogram_shape):
        super(numpy_ravdess_dataset, self).__init__()

        self.paths_to_wavs_list = paths_to_wavs_list

        self.mfcc_rows = spectrogram_shape[0]
        self.mfcc_cols = spectrogram_shape[1]

    def __len__(self):
        return len(self.paths_to_wavs_list)

    def read_audio(self, path_to_wav):
        return np.load(path_to_wav, allow_pickle=True)

    def get_class_label(self, path_to_file):
        # Parse the filename, which has the following pattern:
        # modality-vocal_channel-emotion-intensity-statement-repetition-actor.wav
        # e.g., '02-01-06-01-02-01-12.wav'
        file_name = os.path.split(path_to_file)[1]
        file_name = file_name[:-4]
        class_label = int(file_name.split('-')[2]) - 1 # 2 is a number of emotion code
        return class_label
        

    def __getitem__(self, idx):
        path_to_wav = self.paths_to_wavs_list[idx]
        # debug
        #print(path_to_wav)

        # read the wav file
        wav, sr = self.read_audio(path_to_wav)
        #wav, sr = librosa.load(path_to_wav)

        # get mfcc coefficients
        mfccs = librosa.feature.mfcc(wav, sr=sr, n_mfcc=self.mfcc_rows).astype(np.float32)

        actual_mfcc_cols = mfccs.shape[1]

        # prmitive time-shifting augmentation
        target_real_diff = actual_mfcc_cols - self.mfcc_cols
        if target_real_diff > 0:
            beginning_col = np.random.randint(target_real_diff)
            mfccs = mfccs[..., beginning_col:beginning_col + self.mfcc_cols]
        elif target_real_diff < 0:
            mfccs = np.pad(mfccs, ((0, 0), (0, np.abs(target_real_diff))), constant_values=(0), mode='constant')

        # make the data compatible to pytorch 1-channel CNNs format
        mfccs = np.expand_dims(mfccs, axis=0)

        # Parse the filename, which has the following pattern:
        # modality-vocal_channel-emotion-intensity-statement-repetition-actor.wav
        # e.g., '02-01-06-01-02-01-12.wav'
        #file_name = os.path.split(path_to_wav)[1]
        #file_name = file_name[:-4]
        #class_label = int(file_name.split('-')[2]) - 1 # 2 is a number of emotion code
        #class_label = np.array(class_label)
        class_label = self.get_class_label(path_to_wav)

        return torch.from_numpy(mfccs), class_label

class numpy_crema_dataset(numpy_ravdess_dataset):
    emotions_dict = {
        'ANG': 0,
        'DIS': 1,
        'FEA': 2,
        'SAD': 3,
        'HAP': 4,
        'NEU': 5
    }

    label2str = {
        0: 'ANG',
        1: 'DIS',
        2: 'FEA',
        3: 'SAD',
        4: 'HAP',
        5: 'NEU'
    }
    
    def get_class_label(self, path_to_file):
        file_name = os.path.split(path_to_file)[1]
        file_name = file_name[:-4]
        emotion_name = file_name.split('_')[2] # 2 is a number of emotion code
        return self.emotions_dict[emotion_name]

class numpy_iemocap_dataset(numpy_ravdess_dataset):
    emotions_dict = {
        'exc': 0,
        'sad': 1,
        'fru': 2,
        'hap': 3,
        'neu': 4,
        'sur': 5,
        'ang': 6,
        'fea': 7,
        'dis': 8,
        #'oth': 9
    }

    def get_class_label(self, path_to_file):
        file_name = os.path.split(path_to_file)[1]
        file_name = file_name[:-4]
        emotion_name = file_name.split('_')[-1] # the last is a position of emotion code
        return self.emotions_dict[emotion_name]

In [37]:
# define a class that describes an audio CNN
class audio_cnn(nn.Module):
    def __init__(self, rows, cols, num_classes):
        super(audio_cnn, self).__init__()

        self.rows = rows
        self.cols = cols
        self.num_classes = num_classes

        self.conv_extractor = nn.Sequential(nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(3,3), padding=1),
            nn.BatchNorm2d(num_features=32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2,2)),

            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3,3), padding=1),
            nn.BatchNorm2d(num_features=64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2,2)),
            
            nn.Conv2d(in_channels=64, out_channels=128,kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(num_features=128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2,2)),

            nn.Conv2d(in_channels=128, out_channels=128,kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(num_features=128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2,2)),
            
            nn.Conv2d(in_channels=128, out_channels=64, kernel_size=(1,1)),
            nn.BatchNorm2d(num_features=64),
            nn.ReLU(inplace=True)
        )

        self.fc1 = nn.Sequential(
            nn.Linear(in_features=2048, out_features=512),
            nn.Dropout(),
            nn.ReLU(inplace=True)
        )

        self.fc2 = nn.Sequential(
            nn.Linear(in_features=512, out_features=128),
            nn.Dropout(),
            nn.ReLU(inplace=True)
        )
        self.fc3 = nn.Sequential(
            nn.Linear(in_features=128, out_features=32),
            nn.Dropout(),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=32, out_features=num_classes)
        )
        
    def forward(self, x):
        x = self.conv_extractor(x)
        #print(x.shape, end='\n\n')
        x = torch.flatten(x, start_dim=1)
        #print(x.shape, end='\n\n')
        x = self.fc1(x)
        #print(x.shape, end='\n\n')
        x = self.fc2(x)
        #print(x.shape, end='\n\n')
        x = self.fc3(x)
        return x

def index_dataset(paths_list):
    freq_dict = {}
    for path in paths_list:
        file_name = os.path.split(path)[1]
        file_name = file_name[:-4]
        emotion_name = file_name.split('_')[2] # 2 is a number of emotion code
        try:
            freq_dict[emotion_name] += 1
        except KeyError:
            freq_dict[emotion_name] = 1

    for key in feq_dict:
        freq_dict[key] /= len(paths_list)
    return freq_dict

In [6]:
def validate(model, criterion, testloader, device):

    dataset_size = len(testloader.dataset)  
        
    correct = 0
    total = 0

    model.eval()

    epoch_loss = 0.0
    
    for i, (data, target) in enumerate(testloader):
        t0 = time.time()
        data = data.to(device)
        target = target.to(device)
        
        with torch.no_grad():
            # run forward step
            predicted = model(data)

            loss = criterion(predicted, target)

            epoch_loss += loss.item() * data.size(0)

        _, pred_labels = torch.max(predicted.data, 1)

        total += target.size(0)
        correct += (pred_labels == target).sum().item()


    return epoch_loss/dataset_size, correct/total


def train_num_epochs(model, trainloader, testloader, device, criterion, optimizer, starting_epoch, ending_epoch):
    '''
    model - neural network
    trainloader - pytorch dataloader for training set
    testloader - pytorch dataloader for test set
    device - cpu / cuda
    criterion - loss function (nn.CrossEntropyLoss())
    optimizer - (Adam)
    starting_epoch - 
    ending_epoch - 
    '''
    dataset_size = len(trainloader.dataset)  

    correct = 0
    total = 0

    # iterate over epochs
    for epoch_num in range(starting_epoch, ending_epoch):
        print('Epoch #%d' % (epoch_num))

        # iterate over batches
        epoch_loss = 0.0

        model.train()

        t = 0.0

        for i, (data, target) in enumerate(trainloader):
            t0 = time.time()
            data = data.to(device)
            target = target.to(device)

            # zero all the gradient tensors
            optimizer.zero_grad()
            # run forward step
            predicted = model(data)

            # compute loss
            loss = criterion(predicted, target)

            # compute gradient tensors
            loss.backward()

            # update parameters
            optimizer.step()

            # compute the loss value
            epoch_loss += loss.item() * data.size(0)

            t += time.time() - t0
            
            total += target.size(0)
            _, pred_labels = torch.max(predicted.data, 1)

            correct += (pred_labels == target).sum().item()
            
        
        epoch_loss /=  dataset_size
        print('# Time passed: %.0f s' % (t))
        print('# Epoch loss = %.4f' % (epoch_loss))
        print('# Train acc = {}'.format(correct/total))
        print('# Validation process on validation set')
        val_loss, val_acc = validate(model, criterion, testloader, device)
        print('# Validation loss = {}'.format(val_loss))
        print('# Validation acc = {}'.format(val_acc))

    return model, val_loss, val_acc

In [44]:
# prepare dataloaders
if dataset_type == 'C':
    target_path = '/media/mikhail/files/datasets/emotion_recognition/CREMA-D/audio_npy'
elif dataset_type == 'R':
    target_path = '/media/mikhail/files/datasets/emotion_recognition/RAVDESS/npy'
elif dataset_type == 'I':
    target_path = ''
npys_list = get_paths_to_npys(target_path)

# shuffle the dataset to for the learning process stability
random.seed(0)
random.shuffle(npys_list)

dataset_size = len(npys_list)

print(dataset_size)

# index dataset
print(index_dataset(npys_list))
print(index_dataset(npys_list[:train_size]))
print(index_dataset(npys_list[train_size:]))

7442
{'FEA': 0.1707874227358237, 'DIS': 0.1707874227358237, 'ANG': 0.1707874227358237, 'HAP': 0.1707874227358237, 'SAD': 0.1707874227358237, 'NEU': 0.1460628863208815}
{'FEA': 0.17436586594994122, 'DIS': 0.1723500755921384, 'ANG': 0.17201411053250462, 'HAP': 0.16882244246598355, 'SAD': 0.16882244246598355, 'NEU': 0.14362506299344868}
{'NEU': 0.15580926796507724, 'FEA': 0.15648085963734049, 'DIS': 0.16453995970449967, 'SAD': 0.1786433848220282, 'HAP': 0.1786433848220282, 'ANG': 0.16588314304902618}


In [39]:
index_dataset(npys_list)

{'FEA': 0.1707874227358237,
 'DIS': 0.1707874227358237,
 'ANG': 0.1707874227358237,
 'HAP': 0.1707874227358237,
 'SAD': 0.1707874227358237,
 'NEU': 0.1460628863208815}

In [25]:
emotion_name

'HAP'

In [15]:
lst = [1, 2, 3, 4]
random.seed(1)
random.shuffle(lst)
lst

[4, 1, 3, 2]

In [45]:
# get the size of the training set
train_size = int(dataset_size * 0.8)

# set up train dataset and train dataloader
#
# pick a dataset type
if dataset_type == 'C': # CREMA-D case
    numpy_audio_dataset = numpy_crema_dataset
elif dataset_type == 'I':
    numpy_audio_dataset = numpy_iemocap_dataset
elif dataset_type == 'R':
    numpy_audio_dataset = numpy_ravdess_dataset

train_dataset = numpy_audio_dataset(npys_list[:train_size], (64, 128))

train_dataloader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=256, shuffle=True, num_workers=4)

# set up test dataset and test dataloader
test_dataset = numpy_audio_dataset(npys_list[train_size:], (64, 128))

test_dataloader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=256, shuffle=True, num_workers=4)

In [19]:
train_dataset.emotion_dict

{0: 'neutral',
 1: 'calm',
 2: 'happy',
 3: 'sad',
 4: 'angry',
 5: 'fearful',
 6: 'disgust',
 7: 'surprised'}

In [47]:
# set-up devices
cuda = torch.device('cuda:1')
cpu = torch.device('cpu')

mfcc_emotion_cnn = audio_cnn(rows=64, cols=128, num_classes=len(train_dataset.emotion_dict))

summary(mfcc_emotion_cnn, input_size=(1, 64, 128), device='cpu')

mfcc_emotion_cnn.to(cuda)

cross_entropy = nn.CrossEntropyLoss()
# define an optimization algorithm and bind it with the NN parameters
optimizer = torch.optim.Adam(params=mfcc_emotion_cnn.parameters())

starting_epoch = 0
ending_epoch = 300
epoch_step = 300

path_to_weights = 'weights'

if not os.path.isdir(path_to_weights):
    os.mkdir(path_to_weights)

basic_name = '{}_mfcc_emotion_cnn'.format(datasets_info['Name'][dataset_type])

basic_name

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 32, 64, 128]             320
       BatchNorm2d-2          [-1, 32, 64, 128]              64
              ReLU-3          [-1, 32, 64, 128]               0
         MaxPool2d-4           [-1, 32, 32, 64]               0
            Conv2d-5           [-1, 64, 32, 64]          18,496
       BatchNorm2d-6           [-1, 64, 32, 64]             128
              ReLU-7           [-1, 64, 32, 64]               0
         MaxPool2d-8           [-1, 64, 16, 32]               0
            Conv2d-9          [-1, 128, 16, 32]          73,856
      BatchNorm2d-10          [-1, 128, 16, 32]             256
             ReLU-11          [-1, 128, 16, 32]               0
        MaxPool2d-12           [-1, 128, 8, 16]               0
           Conv2d-13           [-1, 128, 8, 16]         147,584
      BatchNorm2d-14           [-1, 128

'CREMA-D_mfcc_emotion_cnn'

In [48]:
min_val_acc = 0
for epoch in range(starting_epoch, ending_epoch, epoch_step):
    print('###################################################################')
    print('#\tEpoch number is %d' % (epoch))
    print('###################################################################')
    mfcc_emotion_cnn, val_loss, val_acc = train_num_epochs(model=mfcc_emotion_cnn,
                                                           trainloader=train_dataloader,
                                                           testloader=test_dataloader,
                                                           device=cuda,
                                                           criterion=cross_entropy,
                                                           optimizer=optimizer,
                                                           starting_epoch=epoch,
                                                           ending_epoch=epoch+epoch_step)
                                                           

    if val_acc > min_val_acc:
        min_val_acc = val_acc
        model_name = basic_name + '_ep-{}_loss-{:.3}_acc-{:.3}.pth'.format(epoch + epoch_step, val_loss, val_acc)
        path_to_saving_model = os.path.join(path_to_weights, model_name)
        torch.save(mfcc_emotion_cnn.state_dict(), path_to_saving_model)
        print('model %s have been saved' % (path_to_saving_model))

###################################################################
#	Epoch number is 0
###################################################################
Epoch #0
# Time passed: 3 s
# Epoch loss = 1.9379
# Train acc = 0.21736939358306737
# Validation process on validation set
# Validation loss = 1.6491444700912312
# Validation acc = 0.3498992612491605
Epoch #1
# Time passed: 3 s
# Epoch loss = 1.7090
# Train acc = 0.2592810347723837
# Validation process on validation set
# Validation loss = 1.5629244825838395
# Validation acc = 0.38146406984553394
Epoch #2


KeyboardInterrupt: 