# Deep Residual Learning for Audio Recognition <br>

Author: Jamie McQuire <br>

* This notebook is for the full competition dataset.
* This notebook implements a ResNet network for the classification of the spectrogram images.
* This [tutorial](https://www.youtube.com/watch?v=wqkc-sj5H94) helped build the model.
* This model will be trained using the image data from the pre-processing notebook.
* Upload this notebook to Google Colab for faster computing with a GPU.
* If you wish to repeat this analysis, please upload the notebook and the data to Google Drive (the directory paths in the cells below must be set to match your folder layout).

In [None]:
#import libraries
import os
import numpy as np
import tensorflow as tf
import keras
from keras import layers
from keras.initializers import glorot_uniform
from keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, Dropout
from keras.models import Model, load_model
import matplotlib.pyplot as plt
import pandas as pd
from keras.preprocessing.image import ImageDataGenerator
from sklearn.utils.class_weight import compute_class_weight

* Mount the Google Drive where the data is stored.
* Alternatively load the data into your working directory.

In [None]:
# Colab-only step: expose the Google Drive contents under /content/drive so the
# data files referenced by the later cells can be read.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


* Load in the training and validation data from the pre-processing notebook.
* Set your directories for reproducible analysis.

In [None]:
#load in the training data and labels (NumPy .npy files read from the mounted Drive)
# NOTE(review): these absolute Colab paths point at "competition_data", while the test-set
# cells further down read from "Tensor_Flow_Speech_Recognition_Challenge/Data_Files" -
# confirm both folders exist in your Drive or adjust the paths.
X_train = np.load("/content/drive/My Drive/competition_data/X_train.npy")
Y_train = np.load("/content/drive/My Drive/competition_data/Y_train.npy")

#load in the validation data and labels
X_val = np.load("/content/drive/My Drive/competition_data/X_val.npy")
Y_val = np.load("/content/drive/My Drive/competition_data/Y_val.npy")

* These are the default settings.
* Check that the image_size = (161,99,1)

In [None]:
# Default hyper-parameters and data dimensions
output_size = 12                  # number of classes used for the one-hot encoding
image_size = X_train.shape[1:]    # shape of a single sample (first axis dropped)
batch_size, epochs = 128, 10

# sanity check: the notebook expects (161, 99, 1)
print(image_size)

(161, 99, 1)


* Scale the image pixels for the deep learning models.

In [None]:
# Multiply every pixel by 1/255; the results go into new arrays so the raw
# X_train / X_val stay untouched and this cell can be re-run safely.
X_train_scaled = np.multiply(X_train, 1 / 255)
X_val_scaled = np.multiply(X_val, 1 / 255)

* Convert the labels to one-hot-encoded vectors.

In [None]:
#one-hot encode the Y data
# `np.int` was only an alias of the builtin `int`; it was deprecated in NumPy 1.20 and
# removed in NumPy 1.24, so the builtin is used directly.
# NOTE: this cell overwrites Y_train / Y_val with their one-hot versions, so it must be
# run exactly once after the labels are loaded (re-load the labels before re-running).
Y_train = keras.utils.to_categorical(Y_train.astype(int), output_size)
Y_val = keras.utils.to_categorical(Y_val.astype(int), output_size)

* Function to define the architecture of the identity block.

In [None]:
#building the identity block of the ResNet

def identity_block(X,f,filters,stage,block):
    """Bottleneck residual block whose shortcut is the unmodified input tensor.

    Main path: 1x1 conv -> BN -> ReLU -> fxf conv ("same" padding) -> BN -> ReLU
    -> 1x1 conv -> BN. The block input is then added to the main-path output and a
    final ReLU is applied. All convolutions use stride 1.

    Args:
        X: input tensor (batch normalisation is applied over axis 3).
        f: side length of the square kernel of the middle convolution.
        filters: sequence of three ints, the filter counts of the three convolutions.
        stage: value embedded in the layer names to identify the network stage.
        block: string embedded in the layer names to identify the block in the stage.

    Returns:
        The output tensor of the block.
    """
    # prefixes used to build the layer names
    conv_prefix = "res" + str(stage) + block + "_branch"
    bn_prefix = "bn" + str(stage) + block + "_branch"

    n_first, n_middle, n_last = filters

    # keep a handle on the input for the skip connection
    skip = X

    # (layer-name suffix, filter count, kernel size, padding) of each component
    components = (
        ("2a", n_first, (1, 1), "valid"),
        ("2b", n_middle, (f, f), "same"),
        ("2c", n_last, (1, 1), "valid"),
    )
    last_suffix = components[-1][0]

    for suffix, n_filters, kernel, pad in components:
        X = Conv2D(
            filters=n_filters,
            kernel_size=kernel,
            strides=(1, 1),
            padding=pad,
            name=conv_prefix + suffix,
            kernel_initializer=glorot_uniform(seed=0),
        )(X)
        X = BatchNormalization(axis=3, name=bn_prefix + suffix)(X)
        # the final component is activated only after the shortcut has been added
        if suffix != last_suffix:
            X = Activation("relu")(X)

    # merge the skip connection with the main path, then apply the closing ReLU
    X = Add()([X, skip])
    return Activation("relu")(X)

* Function to define the architecture of the convolutional block.

In [None]:
def convolution_block(X,f,filters,stage,block,s=2):
    """Bottleneck residual block whose shortcut is a strided 1x1 convolution.

    Shortcut branch: 1x1 conv (stride s, filters[2] filters) -> BN.
    Main path: 1x1 conv (stride s) -> BN -> ReLU -> fxf conv ("same" padding) -> BN
    -> ReLU -> 1x1 conv -> BN. Both branches are added and passed through a ReLU.

    Args:
        X: input tensor (batch normalisation is applied over axis 3).
        f: side length of the square kernel of the middle convolution.
        filters: sequence of three ints, the filter counts of the main-path
            convolutions; the last one is also used for the shortcut convolution.
        stage: value embedded in the layer names to identify the network stage.
        block: string embedded in the layer names to identify the block in the stage.
        s: stride shared by the shortcut convolution and the first main-path convolution.

    Returns:
        The output tensor of the block.
    """
    # prefixes used to build the layer names
    conv_prefix = "res" + str(stage) + block + "_branch"
    bn_prefix = "bn" + str(stage) + block + "_branch"

    n_first, n_middle, n_last = filters
    stride = (s, s)

    # shortcut branch: projects the input so it can be added to the main path
    shortcut = Conv2D(
        n_last, (1, 1), strides=stride,
        name=conv_prefix + "1", kernel_initializer=glorot_uniform(seed=0),
    )(X)
    shortcut = BatchNormalization(axis=3, name=bn_prefix + "1")(shortcut)

    # main path, component 1: strided 1x1 convolution
    main = Conv2D(
        n_first, kernel_size=(1, 1), strides=stride,
        name=conv_prefix + "2a", kernel_initializer=glorot_uniform(seed=0),
    )(X)
    main = BatchNormalization(axis=3, name=bn_prefix + "2a")(main)
    main = Activation("relu")(main)

    # main path, component 2: fxf convolution that keeps the spatial size
    main = Conv2D(
        n_middle, kernel_size=(f, f), strides=(1, 1), padding="same",
        name=conv_prefix + "2b", kernel_initializer=glorot_uniform(seed=0),
    )(main)
    main = BatchNormalization(axis=3, name=bn_prefix + "2b")(main)
    main = Activation("relu")(main)

    # main path, component 3: 1x1 convolution, activated only after the merge
    main = Conv2D(
        n_last, kernel_size=(1, 1), strides=(1, 1), padding="valid",
        name=conv_prefix + "2c", kernel_initializer=glorot_uniform(seed=0),
    )(main)
    main = BatchNormalization(axis=3, name=bn_prefix + "2c")(main)

    # merge both branches and apply the closing ReLU
    merged = Add()([main, shortcut])
    return Activation("relu")(merged)

* Function to define the architecture of the ResNet model.

In [None]:
def ResNet50(input_shape=(161,99,1), num_classes=6):
    """Assemble the ResNet-style classifier used for the spectrogram images.

    Layout: zero-padding and a 7x7 / stride-2 stem convolution (128 filters) with
    batch-norm, ReLU and 3x3 / stride-2 max-pooling, followed by four residual
    stages (one convolution_block followed by 2, 3, 5 and 2 identity_blocks
    respectively), 2x2 average pooling, flatten and a softmax Dense layer.
    Dropout(0.25) is applied after the stem and after every residual stage.

    Args:
        input_shape: shape of a single input image, passed to Input.
        num_classes: number of units of the softmax output layer; it is also part
            of that layer's name ("fc" + str(num_classes)).

    Returns:
        An uncompiled keras Model named "ResNet50".
    """
    # (stage number, bottleneck filter counts, stride of the convolution block,
    #  block letters of the identity blocks that follow the convolution block)
    stage_layout = (
        (2, [64, 64, 256], 1, "bc"),
        (3, [128, 128, 512], 2, "bcd"),
        (4, [256, 256, 1024], 2, "bcdef"),
        (5, [512, 512, 2048], 2, "bc"),
    )
    dropout_rate = 0.25

    X_input = Input(input_shape)

    #stage1: padded stem convolution and max-pooling
    X = ZeroPadding2D((3, 3))(X_input)
    X = Conv2D(128, (7, 7), strides=(2, 2), name="conv1",
               kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name="bn_conv1")(X)
    X = Activation("relu")(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)
    X = Dropout(dropout_rate)(X)

    #stage2 - stage5: a convolution block ("a") followed by the identity blocks
    for stage, filters, stride, identity_letters in stage_layout:
        X = convolution_block(X, f=3, filters=filters, stage=stage, block="a", s=stride)
        for letter in identity_letters:
            X = identity_block(X, f=3, filters=filters, stage=stage, block=letter)
        X = Dropout(dropout_rate)(X)

    #stage6: classification head
    X = AveragePooling2D(pool_size=(2, 2), name="avg_pool")(X)
    X = Flatten()(X)
    X = Dense(num_classes, activation="softmax", name="fc" + str(num_classes))(X)

    return Model(inputs=X_input, outputs=X, name="ResNet50")

* Generate the model and compile it.
* Summary should provide information about the layers.

In [None]:
# Build the network for 161x99 single-channel inputs and 12 output classes
model = ResNet50(num_classes=12, input_shape=(161, 99, 1))

# Adam optimiser with categorical cross-entropy (labels are one-hot encoded);
# the metric is registered as "acc", which is the key the plotting cells read.
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["acc"],
)

# Print the layer-by-layer overview
model.summary()

* Generate the class weights.
* This is needed as the unknown category contains significantly more files than the other categories.

In [None]:
# Recover the integer class index of every training sample from the one-hot labels
Y_integers = np.argmax(Y_train, axis=1)
present_classes = np.unique(Y_integers)

# `classes` and `y` are keyword-only in current scikit-learn releases; the keyword
# form is also accepted by the older releases that allowed positional arguments.
class_weights = compute_class_weight(class_weight='balanced', classes=present_classes, y=Y_integers)

# Key each weight by the class label it belongs to rather than by its position, so
# the mapping stays correct even if some class index is absent from Y_train.
d_class_weights = {
    int(label): float(weight)
    for label, weight in zip(present_classes, class_weights)
}

* Train the deep learning model (note that the call below uses 20 epochs and a batch size of 128, with the class weights computed above).

In [None]:
# Train on the scaled training images, validating on the scaled validation images
# after each epoch; the class weights are passed to fit via `class_weight`.
# NOTE(review): epochs=20 is hard-coded here and differs from `epochs = 10` in the
# settings cell (batch_size matches the configured 128) - confirm which value is intended.
history = model.fit(X_train_scaled,Y_train,epochs=20,validation_data=(X_val_scaled,Y_val),
                    batch_size=128,shuffle=True,class_weight=d_class_weights)

* Save the model to your working directory.
* This will prevent you from having to restart training in case of an error.

In [None]:
# Write the trained model to an HDF5 file in the current working directory
model.save("ResNet_Trained.h5")

* If you do not need to reload the model you can skip this step.

In [None]:
# Replace `model` with the network stored on Google Drive.
# NOTE(review): the save cell writes "ResNet_Trained.h5" to the working directory, whereas
# this reads from a Drive folder - copy the file there first or adjust the path.
# NOTE: loading a model does not define `history`; the plotting cells need the training
# cell to have been run in the same session.
model = load_model("/content/drive/My Drive/Tensor_Flow_Speech_Recognition_Challenge/Trained_Models/ResNet_Trained.h5")

* Code to plot the accuracy of the training and validation sets during model training.
* Will save to a PDF figure in the working environment.

In [None]:
# Accuracy per epoch for the training and validation sets.
# Requires `history` from the training cell of the current session.
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
# 'bottom right' is not a valid legend location (matplotlib rejects it);
# the supported spelling is 'lower right'.
plt.legend(['Train', 'Validation'], loc='lower right')
# grab the active figure so it can be written to disk before it is shown
fig = plt.gcf()
fig.savefig("CNN_fvocab_acc.pdf",bbox_inches = "tight")
plt.show()

* Code to plot the loss of the training and validation sets during model training.
* Will save to a PDF figure in the working environment.

In [None]:
# Loss per epoch for the training and validation sets.
# Requires `history` from the training cell of the current session.
for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve])

plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper right')

# grab the active figure so it can be written to disk before it is shown
fig = plt.gcf()
fig.savefig("CNN_fvocab_loss.pdf", bbox_inches="tight")
plt.show()

* Load in part of the testing data that is generated in the pre-processing notebook.

In [None]:
# First of the three test-set parts (the others are loaded in later cells)
X1 = np.load("/content/drive/My Drive/Tensor_Flow_Speech_Recognition_Challenge/Data_Files/test/X_test_p1.npy")

* Scale the pixels of the image data.

In [None]:
# Same 1/255 scaling as the training data.
# NOTE: X1 is overwritten, so running this cell twice scales the data twice.
X1 = X1 * (1/255)

* Make predictions using the testing data.

In [None]:
# Model outputs for part 1; reduced to class indices with argmax(axis=1) in a later cell
predict_1 = model.predict(X1)

* Repeat for the other parts of the testing data.

In [None]:
# Rebind X1 to 0 so the part-1 array is no longer referenced and its memory can be freed
X1 = 0
# Part 2 of the test set: load, apply the same 1/255 scaling, predict
X2 = np.load("/content/drive/My Drive/Tensor_Flow_Speech_Recognition_Challenge/Data_Files/test/X_test_p2.npy")
X2 = X2 * (1/255)
predict_2 = model.predict(X2)

In [None]:
# Rebind X2 to 0 so the part-2 array is no longer referenced and its memory can be freed
X2 = 0
# Part 3 of the test set: load, apply the same 1/255 scaling, predict
X3 = np.load("/content/drive/My Drive/Tensor_Flow_Speech_Recognition_Challenge/Data_Files/test/X_test_p3.npy")
X3 = X3 * (1/255)
predict_3 = model.predict(X3)

* Load in the .csv file for the filenames as a pandas dataframe.
* This should contain the filenames for the testing data.
* This is created in the testing pre-processing notebook.

In [None]:
# Test-set file names; this frame becomes the submission once the "label" column is filled.
# The CSV also carries an "Unnamed: 0" column, which a later cell drops.
output_data = pd.read_csv("/content/drive/My Drive/Tensor_Flow_Speech_Recognition_Challenge/Data_Files/test/filenames.csv")

* Convert the predictions from the model to labels.

In [None]:
# Index of the highest-scoring class for every test sample, one array per test part
predicted_class1 = predict_1.argmax(axis=1)
predicted_class2 = predict_2.argmax(axis=1)
predicted_class3 = predict_3.argmax(axis=1)

* Create a dictionary to map the labels to the correct voice command.

In [None]:
# Class index -> voice command; a word's position in the list is its class index.
label_dict = dict(enumerate([
    "yes",
    "no",
    "up",
    "down",
    "left",
    "right",
    "on",
    "off",
    "stop",
    "go",
    "silence",
    "unknown",
]))

* Map the labels to the correct voice command.
* Join the lists together to get the full list of predicted words.

In [None]:
# Translate every predicted class index into its word (a missing index raises KeyError)
predicted_class_labels1 = list(map(label_dict.__getitem__, predicted_class1))
predicted_class_labels2 = list(map(label_dict.__getitem__, predicted_class2))
predicted_class_labels3 = list(map(label_dict.__getitem__, predicted_class3))

# Concatenate the three parts in order (part 1, part 2, part 3)
predicted_class_label = [
    *predicted_class_labels1,
    *predicted_class_labels2,
    *predicted_class_labels3,
]

* Create a new column in the dataframe called "label" which we will fill with the predicted words.

In [None]:
# Add a "label" column filled with NaN as a placeholder; a later cell writes the predictions into it
output_data["label"] = np.nan

# preview the first rows
output_data.head()

* Fill the "label" column with the predicted words.

In [None]:
# Convert the list of predicted words to an array and print it for a quick visual check
predicted_class_array = np.asarray(predicted_class_label)
print(predicted_class_array)

# Overwrite the placeholder column with the predicted words
output_data["label"] = predicted_class_array

# preview the first rows
output_data.head()

* Remove the unnamed index column ("Unnamed: 0") from the dataframe loaded from filenames.csv.

In [None]:
# Drop the "Unnamed: 0" column that came in with the CSV.
# errors="ignore" keeps the cell idempotent: re-running it after the column has
# already been removed no longer raises a KeyError.
output_data = output_data.drop(columns="Unnamed: 0", errors="ignore")

* Check that the output is in the form of the Kaggle submission.

In [None]:
# Final look at the first rows before exporting
output_data.head()

* Export the output to a .csv file ensuring that index=False.
* The file is now ready for submission to Kaggle.


In [None]:
# Write the submission file to the working directory; index=False keeps the
# dataframe index out of the CSV.
output_data.to_csv("submission_resnet.csv",index=False)