In [None]:
import numpy as np
import os
from os.path import isfile
import keras
from keras.models import Sequential, Model
from keras.layers import Input, Dense, Bidirectional, LSTM, Dropout, Activation, GRU, BatchNormalization
from keras.layers import Conv2D, concatenate, MaxPooling2D, Flatten, Embedding, Lambda


from keras.callbacks import ModelCheckpoint, TensorBoard, ReduceLROnPlateau, EarlyStopping
from keras import backend as K
from keras.utils import np_utils
from keras.optimizers import Adam, RMSprop

from keras import regularizers


import librosa
import librosa.display
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd drive/"My Drive"/FMA_Music

In [None]:
!ls

### Load training and Validation arrays

In [None]:
dict_genres = {'Electronic':0,  'Folk':1,  'Pop' :2, 'Instrumental':3 }

reverse_map = {v: k for k, v in dict_genres.items()}
print(reverse_map)

In [None]:
npzfile = np.load('loaddata/suf_train_arr.npz')
print(npzfile.files)
X_train = npzfile['arr_0']
y_train = npzfile['arr_1']
print(X_train.shape, y_train.shape)

In [None]:
npzfile = np.load('loaddata/suf_valid_arr.npz')
print(npzfile.files)
X_valid = npzfile['arr_0']
y_valid = npzfile['arr_1']
print(X_valid.shape, y_valid.shape)

In [None]:
y_train = np_utils.to_categorical(y_train)
y_valid = np_utils.to_categorical(y_valid)

### Parallel CNN - RNN Model

In [None]:
batch_size = 32
num_classes = 4
n_features = X_train.shape[2]
n_time = X_train.shape[1]

In [None]:
nb_filters1=16 
nb_filters2=32 
nb_filters3=64
nb_filters4=64
nb_filters5=64
ksize = (3,3)
pool_size_1= (2,2) 
pool_size_2 = (4,2)

lstm_count = 64
num_units = 120

BATCH_SIZE = 32
EPOCH_COUNT = 70
L2_regularization = 0.001

def conv_recurrent_model_build(model_input):
    print('Building model...')
    layer = model_input
    
        ### Convolutional blocks
    conv_1 = Conv2D(filters = nb_filters1, kernel_size = ksize, strides=1,
                      padding= 'valid', activation='relu', name='conv_1')(layer)
    batch_1 = BatchNormalization()(conv_1)
    pool_1 = MaxPooling2D(pool_size_1)(batch_1)
    dropout_1=Dropout(0.25)(pool_1)

    conv_2 = Conv2D(filters = nb_filters2, kernel_size = ksize, strides=1,
                      padding= 'valid', activation='relu', name='conv_2')(dropout_1)
    batch_2 = BatchNormalization()(conv_2)
    pool_2 = MaxPooling2D(pool_size_1)(conv_2)
    dropout_2=Dropout(0.25)(pool_2)

    conv_3 = Conv2D(filters = nb_filters3, kernel_size = ksize, strides=1,
                      padding= 'valid', activation='relu', name='conv_3')(dropout_2)
    batch_3 = BatchNormalization()(conv_3)
    pool_3 = MaxPooling2D(pool_size_1)(conv_3)
    dropout_3=Dropout(0.25)(pool_3)
    
    conv_4 = Conv2D(filters = nb_filters4, kernel_size = ksize, strides=1,
                      padding= 'valid', activation='relu', name='conv_4')(dropout_3)
    batch_4 = BatchNormalization()(conv_4)
    pool_4 = MaxPooling2D(pool_size_1)(conv_4)
    dropout_4=Dropout(0.25)(pool_4)
    
    conv_5 = Conv2D(filters = nb_filters5, kernel_size = ksize, strides=1,
                      padding= 'valid', activation='relu', name='conv_5')(dropout_4)
    batch_5 = BatchNormalization()(conv_5)
    pool_5 = MaxPooling2D(pool_size_1)(conv_5)
    dropout_5=Dropout(0.25)(pool_5)
    flatten1 = Flatten()(dropout_5)
    ### Recurrent Block
    
    # Pooling layer
    pool_lstm1 = MaxPooling2D(pool_size_2, name = 'pool_lstm')(layer)
    
    # Embedding layer

    squeezed = Lambda(lambda x: K.squeeze(x, axis= -1))(pool_lstm1)
    
    # Bidirectional GRU
    lstm = Bidirectional(GRU(lstm_count))(squeezed)  #default merge mode is concat
    
    # Concat Output
    concat = concatenate([flatten1, lstm], axis=-1, name ='concat')
    dropout_6=Dropout(0.25)(concat)
    ## Softmax Output
    output = Dense(num_classes, activation = 'softmax', name='preds')(dropout_6)
    
    model_output = output
    model = Model(model_input, model_output)
    
    opt = Adam(lr=0.001)
    #opt = RMSprop(lr=0.0005)  # Optimizer
    model.compile(
            loss='categorical_crossentropy',
            optimizer=opt,
            metrics=['accuracy']
        )
    
    print(model.summary())
    return model

In [None]:
def train_model(x_train, y_train, x_val, y_val):
    
    n_frequency = 128
    n_frames = 420
    #reshape and expand dims for conv2d
#     x_train = x_train.reshape(-1, n_frequency, n_frames)
    x_train = np.expand_dims(x_train, axis = -1)
    
#     x_val = x_val.reshape(-1, n_frequency, n_frames)
    x_val = np.expand_dims(x_val, axis = -1)
    
    
    input_shape = (n_frames, n_frequency, 1)
    model_input = Input(input_shape, name='input')
    
    model = conv_recurrent_model_build(model_input)
    
#     tb_callback = TensorBoard(log_dir='./logs/4', histogram_freq=1, batch_size=32, write_graph=True, write_grads=False,
#                               write_images=False, embeddings_freq=0, embeddings_layer_names=None,
#                               embeddings_metadata=None)
    checkpoint_callback = ModelCheckpoint('./models/parallel/weights.best.h5', monitor='val_acc', verbose=1,
                                          save_best_only=True, mode='max')
  
    #earlyStopping = EarlyStopping(monitor='val_acc', patience=70, verbose=1, mode='max')
    
    reducelr_callback = ReduceLROnPlateau(
                monitor='val_acc', factor=0.5, patience=10, min_delta=0.01,
                verbose=1
            )
    callbacks_list = [checkpoint_callback, reducelr_callback]

    # Fit the model and get training history.
    print('Training...')
    history = model.fit(x_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCH_COUNT,
                        validation_data=(x_val, y_val), verbose=1, callbacks=callbacks_list)

    return model, history


In [None]:
def show_summary_stats(history):
    # List all data in history
    print(history.history.keys())

    # Summarize history for accuracy
    plt.plot(history.history['acc'])
    plt.plot(history.history['val_acc'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

    # Summarize history for loss
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [None]:
model, history  = train_model(X_train, y_train, X_valid, y_valid)

In [None]:
show_summary_stats(history)

In [None]:
from sklearn.metrics import classification_report

y_true = np.argmax(y_valid, axis = 1)
X_valid = np.expand_dims(X_valid, axis = -1)
y_pred = model.predict(X_valid)
y_pred = np.argmax(y_pred, axis=1)
labels = [0,1,2,3,4]
target_names = dict_genres.keys()

print(y_true.shape, y_pred.shape)
print(classification_report(y_true, y_pred, target_names=target_names))


In [None]:
from sklearn.metrics import accuracy_score

print(accuracy_score(y_true, y_pred))

### Look at the Test Set

In [None]:
npzfile = np.load('loaddata/test_arr.npz')
print(npzfile.files)
X_test = npzfile['arr_0']
y_test = npzfile['arr_1']
print(X_test.shape, y_test.shape)
te_idx = np.random.permutation(len(X_test))
X_test = X_test[te_idx]
y_test = y_test[te_idx]

In [None]:
from keras.models import load_model

weights_path = 'models/parallel/weights.best.h5'
model = load_model(weights_path)

In [None]:
print(np.amin(y_test), np.amax(y_test), np.mean(y_test))

In [None]:
from sklearn.metrics import classification_report

y_true = y_test
X_test = np.expand_dims(X_test, axis = -1)
y_pred = model.predict(X_test)
y_pred = np.argmax(y_pred, axis=1)
labels = [0,1,2,3]
target_names = dict_genres.keys()

print(y_true.shape, y_pred.shape)
print(classification_report(y_true, y_pred, target_names=target_names))

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns


mat = confusion_matrix(y_true, y_pred)
sns.heatmap(mat.T, square=True, annot=True, fmt='d', cbar=False,
            xticklabels=dict_genres.keys(),
            yticklabels=dict_genres.keys())
plt.xlabel('true label')
plt.ylabel('predicted label');

In [None]:
from sklearn.metrics import accuracy_score
print(accuracy_score(y_true, y_pred))