In [1]:
import zipfile
import os
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import optimizers
from keras import layers
from keras.regularizers import l1
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
from keras.models import Sequential
from keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten, concatenate
from tensorflow.keras import regularizers
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from keras.utils.vis_utils import plot_model
import pylab as pl
import numpy as np
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from keras.utils import np_utils
from sklearn import preprocessing
import math


# audio lib
import librosa
import librosa.display
from librosa.util import fix_length

import IPython.display as ipd

In [2]:
# Silence every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation warnings raised by the libraries
# used below; consider narrowing the filter once the notebook is stable.
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Dataset root, and the sub-folder (derived from it) that holds the chorus clips.
path = '../input/PMEmo2019'
chorus_path = path + '/chorus'

In [4]:
# Read the static annotation table and store the song identifier as text,
# then show the table dimensions.
annotations = (
    pd.read_csv(path + '/annotations/static_annotations.csv')
    .astype({'musicId': str})
)
annotations.shape

In [5]:
# Preview the first rows of the annotation table.
annotations.head()

In [6]:
# Assign every song to one of four classes by thresholding the annotation
# columns found at positions 1 and 2 of the table:
#   0 = low / low   (LL)
#   1 = low / high  (LH)
#   2 = high / low  (HL)
#   3 = high / high (HH)
# The first letter refers to the column at position 1, the second to the column
# at position 2 (presumably arousal and valence -- TODO confirm against the CSV
# header).
QUADRANT_THRESHOLD = 0.5

# Explicit positional access: plain `row[1]` on a label-indexed Series relies on
# a deprecated pandas fallback.
first_dim = annotations.iloc[:, 1]
second_dim = annotations.iloc[:, 2]

# A missing value matches none of the four quadrants; fail loudly instead of
# silently producing fewer labels than there are songs.
if first_dim.isna().any() or second_dim.isna().any():
    raise ValueError("annotation columns contain missing values; cannot assign quadrant labels")

# "value > threshold" contributes 2 for the first column and 1 for the second,
# which reproduces the 0..3 coding listed above.
labels = (
    2 * (first_dim > QUADRANT_THRESHOLD).astype(int)
    + (second_dim > QUADRANT_THRESHOLD).astype(int)
).tolist()

len(labels)

In [7]:
# Add the class of each song as the `labels` column and preview the result.
annotations = annotations.assign(labels=labels)
annotations.head()

# CNN SPECTROGRAM APPROACH

## Data exploration

In [8]:
# Load one example clip from the chorus folder (resampled to 22050 Hz) and plot
# its waveform. The path is built from `chorus_path` so that moving the dataset
# only requires changing the path configuration cell.
example_clip = os.path.join(chorus_path, '945.mp3')
signal, sample_rate = librosa.load(example_clip, sr=22050)
librosa.display.waveshow(signal, sr=sample_rate)

In [9]:
# Short-time analysis settings, both expressed in samples.
n_fft = 2048      # analysis window length
hop_length = 512  # step between consecutive windows

# 13 MFCCs of the example clip loaded in the previous cell.
MFCCs = librosa.feature.mfcc(
    y=signal,
    sr=sample_rate,
    n_mfcc=13,
    n_fft=n_fft,
    hop_length=hop_length,
)

# Draw the coefficient matrix with a colour bar on an explicit figure/axes pair.
fig, ax = plt.subplots()
mfcc_image = librosa.display.specshow(MFCCs, sr=sample_rate, hop_length=hop_length, ax=ax)
ax.set_xlabel("Time")
ax.set_ylabel("MFCC coefficients")
fig.colorbar(mfcc_image, ax=ax)
ax.set_title("MFCCs")

# show plots
plt.show()

## Data preparation
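I file audio vengono caricati troncandoli a una durata massima di 60 secondi. Successivamente i file più corti vengono portati a 60 secondi aggiungendo zeri in coda (zero-padding). Ogni traccia viene poi divisa in 10 segmenti da 6 secondi e da ciascun segmento vengono estratti 13 MFCC.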

In [10]:
# --- Audio settings ----------------------------------------------------------
SAMPLE_RATE = 22050                                # Hz; every clip is resampled to this rate
TRACK_DURATION = 60                                # seconds loaded (at most) and padded to
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION

# --- Segmentation ------------------------------------------------------------
num_segments = 10                                  # equal-length slices per clip
samples_per_segment = int(SAMPLES_PER_TRACK / num_segments)

# --- MFCC settings -----------------------------------------------------------
num_mfcc = 13
n_fft = 2048
hop_length = 512

# Number of MFCC frames a complete segment is expected to produce; segments
# yielding a different count are discarded below.
num_mfcc_vectors_per_segment = math.ceil(samples_per_segment / hop_length)

# One entry per retained segment.
data = {
    'ID': [],      # "<song id>.<segment number>"
    'MFCC': [],    # nested list, frames x coefficients
    'Label': [],   # value taken from the column at position 3 of `annotations`
}

for index, row in annotations.iterrows():
    # Explicit positional access: plain `row[0]` / `row[3]` on a label-indexed
    # Series relies on a deprecated pandas fallback.
    music_id = str(row.iloc[0])
    label = row.iloc[3]  # presumably the `labels` column added earlier -- TODO confirm column order

    print("Processando canzone ID: " + music_id)

    file_name = music_id + '.mp3'
    file_path = os.path.join(chorus_path, file_name)

    # Load at most TRACK_DURATION seconds, then force the array to exactly
    # TRACK_DURATION * sample_rate samples.
    signal, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE, duration=TRACK_DURATION)
    padded_audio = fix_length(signal, size=TRACK_DURATION * sample_rate)

    for d in range(num_segments):
        # sample range covered by the current segment
        start = samples_per_segment * d
        finish = start + samples_per_segment

        # MFCCs of the segment, transposed so that rows are time frames
        mfcc = librosa.feature.mfcc(
            y=padded_audio[start:finish],
            sr=sample_rate,
            n_mfcc=num_mfcc,
            n_fft=n_fft,
            hop_length=hop_length,
        )
        mfcc = mfcc.T

        # keep only segments with the expected number of frames
        if len(mfcc) == num_mfcc_vectors_per_segment:
            data["ID"].append(music_id + '.' + str(d))
            data["MFCC"].append(mfcc.tolist())
            data["Label"].append(label)


    

In [11]:
# Turn the collected lists into a table with one row per segment, then display it.
df = pd.DataFrame.from_dict(data)

df

In [12]:
# MFCC matrix of the row labelled 703, transposed back to coefficients x frames.
segment_mfcc = np.asarray(df.loc[703, 'MFCC']).T

fig, ax = plt.subplots()
segment_image = librosa.display.specshow(segment_mfcc, sr=sample_rate, hop_length=hop_length, ax=ax)
ax.set_xlabel("Time")
ax.set_ylabel("MFCC coefficients")
fig.colorbar(segment_image, ax=ax)
ax.set_title("MFCCs")

# show plots
plt.show()

In [13]:
# MFCC matrix of the row labelled 706, transposed back to coefficients x frames.
segment_mfcc = np.asarray(df.loc[706, 'MFCC']).T

fig, ax = plt.subplots()
segment_image = librosa.display.specshow(segment_mfcc, sr=sample_rate, hop_length=hop_length, ax=ax)
ax.set_xlabel("Time")
ax.set_ylabel("MFCC coefficients")
fig.colorbar(segment_image, ax=ax)
ax.set_title("MFCCs")

# show plots
plt.show()

Rimuovo tutte le righe (segmenti) composte interamente da padding, cioè quelle il cui MFCC coincide con quello di un segmento di soli zeri.

In [14]:
# Reference MFCC of a segment made only of zero padding, computed with the same
# settings used during extraction. This replaces the previous lookup of row 706,
# which depended on row order and raised a KeyError when the cell was re-run
# (row 706 removes itself).
# NOTE(review): assumes row 706 of the original table was an all-padding
# segment, as the accompanying text states -- confirm.
silence_mfcc = librosa.feature.mfcc(
    y=np.zeros(samples_per_segment, dtype=np.float32),
    sr=SAMPLE_RATE,
    n_mfcc=num_mfcc,
    n_fft=n_fft,
    hop_length=hop_length,
).T

# Kept under its previous name (as a nested list) for any cell that still reads it.
zero = silence_mfcc.tolist()

# Flag the rows whose MFCC matches the silence reference and drop them in one
# pass; running the cell again leaves the table unchanged.
is_padding = np.array(
    [np.allclose(segment, silence_mfcc) for segment in df['MFCC']],
    dtype=bool,
)
df = df[~is_padding]

df

In [15]:
# Persist the segment table to `mfcc.csv` without the index column.
# NOTE(review): the MFCC column holds nested lists, which CSV presumably stores
# as their text representation -- confirm before reading the file back.
df.to_csv('mfcc.csv',index=False)

In [38]:
# Stack the per-segment MFCC matrices along a new leading axis and show the shape.
mfcc_matrices = df["MFCC"].to_list()
X = np.stack(mfcc_matrices, axis=0)
X.shape

In [39]:
# Class of every segment as a NumPy vector (copied out of the table).
Y = df["Label"].to_numpy(copy=True)
Y.shape

In [103]:
# Hold out 25% of the *songs* (not of the individual segments) for testing.
# Every song contributes several segments whose ID is "<song id>.<segment>";
# splitting segment by segment would put parts of the same song on both sides
# and inflate the test score. The seed makes the split reproducible.
SPLIT_SEED = 42

# Song id of every row of X / Y (both were built from `df` in row order).
segment_song = df["ID"].str.rsplit(".", n=1).str[0].to_numpy()

train_songs, test_songs = train_test_split(
    np.unique(segment_song), test_size=0.25, random_state=SPLIT_SEED
)

# Position of each training song in the shuffled order returned above;
# rows belonging to test songs get NaN.
song_rank = pd.Series(np.arange(len(train_songs), dtype=float), index=train_songs)
segment_rank = song_rank.reindex(segment_song).to_numpy()

test_idx = np.flatnonzero(np.isnan(segment_rank))
train_idx = np.flatnonzero(~np.isnan(segment_rank))
# Keep the segments of one song adjacent, with songs in shuffled order, so that
# a validation split taken from the tail of the training data is made of whole
# songs drawn at random (presumably how Keras' `validation_split` selects its
# samples -- confirm against the Keras documentation).
train_idx = train_idx[np.argsort(segment_rank[train_idx], kind="stable")]

X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = Y[train_idx], Y[test_idx]

In [104]:
# Append a trailing channel axis so each sample has the
# (frames, coefficients, 1) layout that the Conv2D input shape below is built
# from. Guarded on the number of dimensions so that re-running the cell does not
# keep adding axes.
if X_train.ndim == 3:
    X_train = X_train[..., np.newaxis]
if X_test.ndim == 3:
    X_test = X_test[..., np.newaxis]

# Shape of a single sample, without the batch dimension.
input_shape = (X_train.shape[1], X_train.shape[2], 1)

In [135]:
# Convolutional classifier over the MFCC matrices.
#
# Each convolutional block is Conv2D(relu) -> MaxPooling2D(stride 2, 'same')
# -> BatchNormalization -> Dropout(0.2); the blocks differ only in the values
# listed below. All layers are taken from `keras.layers` (the module bound by
# `from tensorflow import keras`) so that the model does not mix layer classes
# coming from two different Keras packages.
conv_blocks = [
    # (filters, kernel size, pooling window)
    (32, (3, 3), (3, 3)),    # 1st conv block
    (64, (3, 3), (3, 3)),    # 2nd conv block
    (128, (2, 2), (2, 2)),   # 3rd conv block
    (256, (1, 1), (2, 2)),   # 4th conv block
]

# Fully connected head: (units, dropout rate applied after the layer)
dense_blocks = [
    (512, 0.4),
    (128, 0.3),
    (64, 0.2),
]

model = keras.Sequential()

for block_no, (filters, kernel_size, pool_size) in enumerate(conv_blocks):
    # only the very first layer declares the input shape
    first_layer_kwargs = {'input_shape': input_shape} if block_no == 0 else {}
    model.add(keras.layers.Conv2D(filters, kernel_size, activation='relu', **first_layer_kwargs))
    model.add(keras.layers.MaxPooling2D(pool_size, strides=(2, 2), padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.Dropout(0.2))

# flatten output and feed it into the dense layers
model.add(keras.layers.Flatten())
for units, drop_rate in dense_blocks:
    model.add(keras.layers.Dense(units, activation='relu'))
    model.add(keras.layers.Dropout(drop_rate))

# output layer: one probability per class (labels are the integers 0..3)
model.add(keras.layers.Dense(4, activation='softmax'))

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.summary()

In [136]:
# Render a diagram of the network architecture defined above.
plot_model(model)

In [137]:
# Checkpoint file: it is overwritten only when the validation loss reaches a
# new minimum.
filepath_cnn = "weights.hdf5"
checkpoint = ModelCheckpoint(
    filepath_cnn,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

# Train for 15 epochs, holding out 20% of the training data for validation.
history = model.fit(
    X_train,
    y_train,
    epochs=15,
    batch_size=32,
    validation_split=0.2,
    callbacks=[checkpoint],
)

# Go back to the weights stored by the checkpoint callback.
model.load_weights(filepath_cnn)

In [138]:
def plot_history(network_history):
    """Plot training and validation curves, one figure for loss and one for accuracy.

    The epoch axis is derived from the length of the recorded curves, so the
    function works for any number of completed epochs.

    Parameters
    ----------
    network_history
        Object exposing a ``history`` dict with the keys ``'loss'``,
        ``'val_loss'``, ``'accuracy'`` and ``'val_accuracy'``, each holding one
        value per epoch (e.g. the object returned by ``model.fit``).
    """
    curves = network_history.history
    panels = (
        # (y label, training key, validation key, extra legend options)
        ('Loss', 'loss', 'val_loss', {}),
        ('Accuracy', 'accuracy', 'val_accuracy', {'loc': 'lower right'}),
    )
    for y_label, train_key, val_key, legend_options in panels:
        epochs = range(1, len(curves[train_key]) + 1)
        plt.figure()
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.plot(epochs, curves[train_key])
        plt.plot(epochs, curves[val_key])
        plt.legend(['Training', 'Validation'], **legend_options)
    plt.show()


# Epoch numbers of the run above; kept as a notebook-level name for any other
# cell that reads it, but no longer hard-coded to 15.
x_plot = list(range(1, len(history.history['loss']) + 1))

plot_history(history)

In [139]:
# Score the model on the test split; the result unpacks into the loss followed
# by the accuracy metric.
evaluation = model.evaluate(X_test, y_test, verbose=2)
test_loss, test_acc = evaluation
print('\nTest accuracy:', test_acc)