In [37]:
# Train the CNN Emotion Regression Model with MIDI data#
# Last editted by Pu Zeng, 18/10/2023 #

In [2]:
import pandas as pd
from tqdm import tqdm
import os
from sklearn.model_selection import KFold
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

from torch.utils.data import TensorDataset
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import Dataset, DataLoader,TensorDataset,random_split,SubsetRandomSampler, ConcatDataset
import string
import numpy as np
import pickle
from sklearn import preprocessing
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import warnings
import json
warnings.filterwarnings('ignore')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [3]:
# Calculate the mean Arousal and Valence score from raw json data
def read_json_av(fileName):
    file = open(fileName,'r')
    lines = json.load(file)
    file.close()
    file = pd.DataFrame(pd.DataFrame(lines['pieces']).T['midi'])
    ann = pd.DataFrame(lines['annotations']).T.reset_index()
    file['arousal'] = np.nan
    file['valence'] = np.nan
    i = 1
    for i in range(len(file)):
        f = file.iloc[i]
        raw = ann[ann['index'].str.contains(f.name+'_')]
        ar = []
        va = []
        for j in range(len(raw)):
            row = raw.iloc[j]
            ar.append(np.mean(row['arousal']))
            va.append(np.mean(row['valence']))
        file['arousal'].iloc[i] = np.mean(ar)
        file['valence'].iloc[i] = np.mean(va)
    return file.reset_index()[['midi','arousal','valence']]

In [4]:
j1 = read_json_av('../Toy_Dataset/VGG/vgmidi_raw_1.json');
j2 = read_json_av('../Toy_Dataset/VGG/vgmidi_raw_2.json');
data = pd.concat([j1,j2],ignore_index=True)

In [5]:
# Transform MIDI into MFCCs

res1 = []
from midi2audio import FluidSynth
import librosa
import librosa.display
import pretty_midi
import numpy as np
def parse_piano_most(file_path):
    mean_intensity = []
    midi_data = pretty_midi.PrettyMIDI(file_path)
    for instrument in midi_data.instruments:
        mean_intensity.append(instrument.get_piano_roll().mean())
    instruments_to_remove_index = np.argsort(mean_intensity)[:-1]
    instruments_to_remove = []
    for i in instruments_to_remove_index:
        instruments_to_remove.append(midi_data.instruments[i])
    for i in instruments_to_remove:
        midi_data.instruments.remove(i)
    midi_data.instruments[0].program=0
    return midi_data

for i in tqdm(range(len(data))):
    try:
        m = data.iloc[i]
        filename = '../Toy_Dataset/VGG/midi/'+ m['midi']
        midi_data = parse_piano_most(filename)
        if midi_data == None:
            continue
        midi_data.write('1.mid')
        fs = FluidSynth();
        fs.midi_to_audio('1.mid', 'output.wav');
        y, sr = librosa.load('output.wav');
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20)
        res1.append([mfccs, m['arousal'],m['valence']])
    except:
        pass

100%|████████████████████████████████████████████████████████████████████████████████| 200/200 [04:24<00:00,  1.32s/it]


In [6]:
# Organize the dataset
import numpy as np
midi = []
for m in res1:
    midi.append([m[0], [m[1],m[2]]])

In [7]:
# Find the minimum length to uniform all MFCCs to the same shape
dim = np.min([m[0].shape[1] for m in res1])
dim

359

In [12]:
# Final dataset
from tqdm import tqdm
X = []
y = []
for m in res1:
    # X.append(m[0][:, :dim])
    X.append(m[0].T[0:dim,:])
    y.append([m[1], m[2]])
X = np.array(X)

In [13]:
# Define CNN regression model
class CNNModel(nn.Module):
    def __init__(self, hiddenSize, outChannels, dropoutRate, activate):
        super().__init__()
        self.outChannels = outChannels
        self.activate = nn.Sigmoid() if activate == "Sigmoid" else nn.ReLU()
        self.conv1 = nn.Conv2d(1, 24, (10,1))
        self.pool = nn.MaxPool2d((2, 1))
        
        self.conv2 = nn.Conv2d(24, 48, (10,1))
        self.bn1 = nn.BatchNorm2d(48)
        self.conv3 = nn.Conv2d(48, 96, (10,1))
        self.bn2 = nn.BatchNorm2d(96)
        self.conv4 = nn.Conv2d(96, 192, (10,2))
        self.bn3 = nn.BatchNorm2d(192)
        self.conv5 = nn.Conv2d(192, 96, (5,2))
        self.bn4 = nn.BatchNorm2d(96)
        self.conv6 = nn.Conv2d(96, 96, (5,2))
        self.dense1 = nn.Linear(1632, hiddenSize)
        self.dropout = nn.Dropout(dropoutRate)
        self.dense2 = nn.Linear(hiddenSize, 2)

    def forward(self, x):
        x = self.pool(self.activate(self.conv1(x)))
        x = self.pool(self.bn1(self.activate(self.conv2(x))))
        # print(x.shape)
        x = self.dropout(self.pool(self.bn2(self.activate(self.conv3(x)))))
        x = self.pool(self.bn3(self.activate(self.conv4(x))))
        x = self.pool(self.bn4(self.activate(self.conv5(x))))
        
        x = self.activate(self.conv6(x))
        # print(x.shape)
        x = x.view(-1, 1632)
        
        x = self.dropout(self.activate(self.dense1(x)))
        # print(self.dense2(x))
        return self.dense2(x)

# Number of neurons in the first fully-connected layer
hiddenSize = 64
# Number of feature filters in second convolutional layer
numFilters = 25
# Dropout rate
dropoutRate = 0.3
# Activation function
activation = "ReLU"
# Learning rate
learningRate = 0.001
# Momentum for SGD optimizer
momentum = 0.8
# Number of training epochs
numEpochs = 200

In [14]:
# Define Train and Validation function
from tqdm import tqdm
def train_epoch(cnn,device,dataloader,loss_fn,optimizer, m, std):
    cnn.train()
    cnnRunningLoss = 0
    total = 0
    R2 = 0
    for i, (inputs, labels) in enumerate(dataloader, 0):
        optimizer.zero_grad()
        # Normalization
        inputs = ((inputs.reshape(-1,1,dim,20)-m)/std).to(device)
        labels = labels.to(device)
        # Forward propagation
        cnnOutputs = cnn(inputs)

        l2_lambda = 0.01
        l2_reg = torch.tensor(0.).to(device)

        for param in cnn.parameters():
            l2_reg += torch.norm(param)

        # Backpropagation
        cnnLoss = criterion(cnnOutputs, labels)+l2_reg*l2_lambda
        cnnLoss.backward()

        # Gradient update
        optimizer.step()

        cnnRunningLoss += cnnLoss.item()
        total += 1
        # if (i+1) % 20 == 0:    # print every 2000 mini-batches
        #     print(cnnLoss)
        R2 += sklearn.metrics.r2_score(labels.to('cpu').detach().numpy(), cnnOutputs.to('cpu').detach().numpy())
    print('Training Loss {}, Trainging R-squared {}'.format(str(cnnRunningLoss/total), str(R2/total)))
    return cnnRunningLoss/total, R2
def valid_epoch(cnn,device,dataloader,loss_fn, m, std):
    cnn.eval()
    totalLoss = 0
    total = 0
    R2 = 0
    for inputs, labels in dataloader:
        inputs = ((inputs.reshape(-1,1,dim,20)-m)/std).to(device)
        labels = labels.to(device)
        cnnOutputs = cnn(inputs)
        cnnLoss = criterion(cnnOutputs, labels)#+l2_reg*l2_lambda
        # print(cnnLoss)
        totalLoss += cnnLoss.item()
        total +=1
        R2 += sklearn.metrics.r2_score(labels.to('cpu').detach().numpy(), cnnOutputs.to('cpu').detach().numpy())
    print('\nCNN validation R2: {}\n '.format(str(R2/total)))
    cnn.train()
    return totalLoss/total, R2

In [15]:
import sklearn
X_train_v, X_test, y_train_v, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
X_train_v = torch.as_tensor(X_train_v, dtype=torch.float) # an alternative to torch.from_numpy
y_train_v = torch.as_tensor(y_train_v, dtype=torch.float)
X_test = torch.as_tensor(X_test, dtype=torch.float)
y_test = torch.as_tensor(y_test, dtype=torch.float)
splits=KFold(n_splits=5,shuffle=True,random_state=42)
train_dataset = TensorDataset(X_train_v, y_train_v)
test_dataset = TensorDataset(X_test, y_test)
batch_size=128
models = []
history = {'fold':[], 'train_loss': [], 'test_loss': [],'train_R2':[],'test_R2':[]}


for fold, (train_idx,val_idx) in enumerate(splits.split(np.arange(len(X_train_v)))):
    print('Fold {}'.format(fold + 1))
    train_sampler = SubsetRandomSampler(train_idx)
    test_sampler = SubsetRandomSampler(val_idx)
    m = np.vstack(X_train_v[train_idx]).mean()
    std = np.vstack(X_train_v[train_idx]).std()
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
    test_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=test_sampler)
    
    model = CNNModel(hiddenSize, numFilters, dropoutRate, activation).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(list(model.parameters()), lr=learningRate, momentum=momentum)
    # optimizer = torch.optim.Adam(list(model.parameters()), lr=learningRate)
    best_test = -np.inf
    best_model = None
    for epoch in range(numEpochs):
        train_loss, train_R2=train_epoch(model,device,train_loader,criterion,optimizer, m, std)
        test_loss, test_R2=valid_epoch(model,device,test_loader,criterion, m, std)
        history['fold'].append(fold)
        history['train_loss'].append(train_loss)
        history['test_loss'].append(test_loss)
        history['train_R2'].append(train_R2)
        history['test_R2'].append(test_R2)   
        if test_R2>best_test:
            best_test = test_R2
            best_model = model
    models.append([best_model, m, std])

Fold 1
Training Loss 0.8883683979511261, Trainging R-squared -0.71922868559512

CNN validation R2: -0.5924767279498604
 
Training Loss 0.8730755746364594, Trainging R-squared -0.46519375148667735

CNN validation R2: -0.589629197142566
 
Training Loss 0.8814115226268768, Trainging R-squared -0.6475037131048493

CNN validation R2: -0.5847355371007656
 
Training Loss 0.8754326403141022, Trainging R-squared -0.4633856073084207

CNN validation R2: -0.5844088804760241
 
Training Loss 0.8712138831615448, Trainging R-squared -0.25573721914072456

CNN validation R2: -0.5858422163586051
 
Training Loss 0.8569222688674927, Trainging R-squared -0.22603838852008198

CNN validation R2: -0.5875088330500474
 
Training Loss 0.863812267780304, Trainging R-squared -0.20696149183040297

CNN validation R2: -0.5892590263389372
 
Training Loss 0.8614230155944824, Trainging R-squared -0.34204292082364024

CNN validation R2: -0.5919845242585575
 
Training Loss 0.8478294909000397, Trainging R-squared -0.0726407

In [16]:
r2 = []
r2_r = []
r2_v = []
for best_model, m, std in models:
    i = 0
    # best_model=models[1]
    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    best_model.eval()
    for inputs, labels in test_loader:
        inputs = ((inputs.reshape(-1,1,dim,20)-m)/std).to('cuda')
        labels = labels.to('cuda')
        if i == 0:
            cnnOutputs = best_model(inputs).to('cpu')
            l = labels.to('cpu')
        else:
            cnnOutputs = torch.cat((cnnOutputs, best_model(inputs).to('cpu')), 0)
            l = torch.cat((l, labels.to('cpu')), 0)
        i+=1
    r2_v.append(sklearn.metrics.r2_score(l.to('cpu').detach().numpy()[:,1], cnnOutputs.to('cpu').detach().numpy()[:,1]))
    r2_r.append(sklearn.metrics.r2_score(l.to('cpu').detach().numpy()[:,0], cnnOutputs.to('cpu').detach().numpy()[:,0]))
    r2.append(sklearn.metrics.r2_score(l.to('cpu').detach().numpy(), cnnOutputs.to('cpu').detach().numpy()))

In [17]:
np.mean(r2)

0.03192516909693901

In [18]:
np.mean(r2_v)

0.04082095171521418

In [19]:
np.mean(r2_r)

0.023029369542872047