In [None]:
'''

Author: Luca Pedrelli
l.pedrelli@deeplearningitalia.com
lucapedrelli@gmail.com

Exercise: Multivariate Prediction on Musical Tasks

NB: This file was created exclusively for educational purposes

'''

# MUSICAL TASKS INFO: 
# C.Gallicchio, A.Micheli, L.Pedrelli: Comparison between DeepESNs and gated RNNs on multivariate time-series prediction
# https://arxiv.org/pdf/1812.11527.pdf

import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"

import tensorflow
from keras import backend as K
import numpy as np
import keras
from keras.models import Sequential, Input, Model
from keras.callbacks import ModelCheckpoint
from keras.layers.core import Dense, Activation
from keras.layers import TimeDistributed, Masking
from keras.optimizers import SGD, Adam
from keras import utils
from keras.layers import LSTM, SimpleRNN, GRU
from scipy.io import loadmat


def pad_to_right(inputs):
    """Transpose each 2-D array and zero-pad it at the end to a common length.

    Every element of ``inputs`` is transposed first, so that its leading axis
    becomes the one that is padded. Rows of zeros are then appended until all
    arrays have as many rows as the longest one.

    Args:
        inputs: non-empty sequence of 2-D arrays that share the same size on
            their first axis (before the transpose).

    Returns:
        A tuple ``(padded, lengths)`` where ``padded`` is the stacked array of
        shape ``(len(inputs), max_len, width)`` and ``lengths`` lists the
        number of rows of each transposed array before padding.
    """
    sequences = [seq.T for seq in inputs]
    lengths = [seq.shape[0] for seq in sequences]
    longest = max(lengths)

    padded = []
    for seq, n_rows in zip(sequences, lengths):
        # Zero block that brings this sequence up to the longest length.
        filler = np.zeros((longest - n_rows, seq.shape[1]))
        padded.append(np.concatenate([seq, filler], axis=0))

    return np.stack(padded, axis=0), lengths

def load_pianomidi(path):
    """Load the Piano-midi.de task from ``pianomidi.mat`` and pad its sequences.

    Args:
        path: directory that contains ``pianomidi.mat``.

    Returns:
        A tuple ``(dataset, Nu, optimization_problem, TR_indexes, VL_indexes,
        TS_indexes)``:

        * ``dataset`` -- plain attribute container with ``name``, the raw
          ``inputs`` / ``targets`` lists, their right-padded versions
          ``inputs_padding`` / ``targets_padding`` (samples, steps, features)
          and the matching ``inputs_lengths`` / ``targets_lengths``.
        * ``Nu`` -- input dimension (number of features per time step).
        * ``optimization_problem`` -- ``np.argmax``; model selection picks the
          configuration with the maximum validation accuracy.
        * ``TR_indexes``, ``VL_indexes``, ``TS_indexes`` -- ``range`` objects
          selecting the training, validation and test sequences.
    """
    data = loadmat(os.path.join(path, 'pianomidi.mat'))  # load dataset

    class Struct(object): pass
    dataset = Struct()
    dataset.name = data['dataset'][0][0][0][0]
    dataset.inputs = data['dataset'][0][0][1][0].tolist()
    dataset.targets = data['dataset'][0][0][2][0].tolist()
    dataset.inputs_padding, dataset.inputs_lengths = pad_to_right(dataset.inputs)
    dataset.targets_padding, dataset.targets_lengths = pad_to_right(dataset.targets)

    # Input dimension. The raw matrices are transposed by pad_to_right before
    # padding, so the feature axis is the last axis of the padded tensor
    # (axis 1 of a raw matrix is its length, not its feature count).
    Nu = dataset.inputs_padding.shape[2]

    # select the model that achieves the maximum accuracy on validation set
    optimization_problem = np.argmax

    # indexes for training, validation and test set in Piano-midi.de task
    TR_indexes = range(87)
    VL_indexes = range(87, 99)
    TS_indexes = range(99, 124)

    return dataset, Nu, optimization_problem, TR_indexes, VL_indexes, TS_indexes

def music_accuracy(y_true, y_pred):
    """Accuracy TP / (TP + FP + FN) over all entries of the batch.

    Predictions and targets are binarised at 0.5. An entry counts as a true
    positive when both are active, a false positive when only the prediction
    is active, and a false negative when only the target is active.

    Args:
        y_true: tensor of target activations.
        y_pred: tensor of predicted activations, same shape as ``y_true``.

    Returns:
        Scalar tensor with the accuracy. It is 0 (instead of NaN) when neither
        the predictions nor the targets contain any active entry.
    """
    threshold = 0.5

    predicted_on = K.cast(K.greater(y_pred, threshold), y_pred.dtype)
    target_on = K.cast(K.greater(y_true, threshold), y_pred.dtype)

    # Only global totals are needed, so no transpose / per-axis reduction is
    # required: the sums are invariant to the order of the axes.
    TP = K.sum(predicted_on * target_on)
    FP = K.sum(predicted_on) - TP
    FN = K.sum(target_on) - TP

    # The counts are whole numbers, so clamping the denominator to epsilon only
    # changes the degenerate 0/0 case (which would otherwise yield NaN).
    denominator = K.maximum(TP + FP + FN, K.epsilon())
    ACCURACY = TP / denominator
    return ACCURACY

In [None]:
# Directory that holds 'pianomidi.mat'; an empty string resolves to the
# current working directory.
path = ''
(dataset, Nu, optimization_problem,
 TR_indexes, VL_indexes, TS_indexes) = load_pianomidi(path)

In [None]:
# Display the shapes of the padded input and target arrays.
dataset.inputs_padding.shape, dataset.targets_padding.shape

In [None]:
# Keep only the first 100 positions along axis 1 (the padded length axis)
# of both the inputs and the targets.
dataset.inputs_padding = dataset.inputs_padding[:, :100, :]
dataset.targets_padding = dataset.targets_padding[:, :100, :]

In [None]:
# Display the shapes of the input and target arrays again, after the slicing above.
dataset.inputs_padding.shape, dataset.targets_padding.shape

In [None]:

# Training settings.
NB_EPOCH = 1000    # upper bound on the number of training epochs
BATCH_SIZE = 100   # sequences per batch
VERBOSE = 2        # Keras logging verbosity

# Number of units of the output layer.
NB_CLASSES = 88

# Per-sample input shape: every axis of the padded inputs except the
# leading (sample) axis.
INPUT_SHAPE = tuple(dataset.inputs_padding.shape[1:])

In [None]:
# Step-by-step build of the network: input -> masking of all-zero steps ->
# GRU with 10 units that returns its full output sequence.
inputs = Input(shape=INPUT_SHAPE)
mask = Masking(mask_value=0.0)(inputs)
layer_h = GRU(units=10, return_sequences=True)(mask)

In [None]:
# Apply the same sigmoid Dense layer to every time step of the GRU output.
outputs = TimeDistributed(
    Dense(units=NB_CLASSES, activation='sigmoid')
)(layer_h)

In [None]:
# Display the output tensor produced by the TimeDistributed Dense layer.
outputs

In [None]:
# try: clip norm, dropout, recurrent dropout, lr, Nr, epochs

# Network: input -> masking -> GRU (10 units, full sequence) -> per-step
# sigmoid Dense layer with NB_CLASSES units.
# NOTE(review): Masking(mask_value=0.0) skips every time step whose features
# are all zero; that covers the right-padding but presumably also any
# all-zero step inside a sequence -- confirm this is intended.
inputs = Input(shape=INPUT_SHAPE)
mask = Masking(mask_value=0.0)(inputs)
layer_h = GRU(10, return_sequences=True)(mask)
outputs = TimeDistributed(Dense(NB_CLASSES, activation='sigmoid'))(layer_h)
model = Model(inputs=inputs, outputs=outputs)

# Each of the NB_CLASSES sigmoid outputs is an independent on/off decision
# and several can be active at the same step (multi-label), so the matching
# loss is binary cross-entropy. Categorical cross-entropy assumes the outputs
# form a single probability distribution over mutually exclusive classes.
model.compile(loss='binary_crossentropy', metrics=[music_accuracy],
              optimizer=Adam())

model.summary()

# Keep the model with the best validation accuracy on disk, and stop training
# when the validation loss has not improved for 30 epochs.
filepath = "model.best_.hdf5"
mcp = ModelCheckpoint(filepath, monitor="val_music_accuracy", verbose=1,
                      save_best_only=True, save_weights_only=False, mode='max')
es = keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=30, mode='min')


In [None]:
# Train on the training split and validate on the validation split after
# every epoch; checkpointing and early stopping run as callbacks.
history = model.fit(
    x=dataset.inputs_padding[TR_indexes, :, :],
    y=dataset.targets_padding[TR_indexes, :, :],
    batch_size=BATCH_SIZE,
    epochs=NB_EPOCH,
    verbose=VERBOSE,
    callbacks=[mcp, es],
    validation_data=(
        dataset.inputs_padding[VL_indexes, :, :],
        dataset.targets_padding[VL_indexes, :, :],
    ),
    shuffle=False,
)


In [None]:
# Restore the weights stored in the checkpoint file before scoring the test split.
model.load_weights(filepath)

# evaluate returns the loss first, followed by the compiled metrics.
score = model.evaluate(
    x=dataset.inputs_padding[TS_indexes, :, :],
    y=dataset.targets_padding[TS_indexes, :, :],
    batch_size=BATCH_SIZE,
)

print("Test Accuracy: ", score[1])