In [None]:
import os
import numpy as np
import keras
import math
import itertools
import pandas as pd
import matplotlib.pyplot as plt
from librosa import display
from kapre.utils import Normalization2D
from keras.models import Model
from keras.layers import *
from keras import backend as k
from keras import optimizers
from keras.callbacks import ModelCheckpoint, EarlyStopping, LearningRateScheduler
from keras.preprocessing import sequence
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.preprocessing import LabelEncoder

In [None]:
# Expose only the GPU with index 1 to this process.
# NOTE(review): presumably must run before the backend first touches CUDA - confirm.
os.environ.update({"CUDA_VISIBLE_DEVICES": "1"})

In [None]:
# Directory containing the pre-computed feature arrays (*_feat.npy).
# NOTE(review): machine-specific absolute path - adjust when running elsewhere.
FEATURES_PATH = os.path.abspath('/data/aumkar/data_asr/features')

In [None]:
# Feature arrays for the three splits, read from FEATURES_PATH.
train_feat, val_feat, test_feat = (
    np.load(os.path.join(FEATURES_PATH, file_name))
    for file_name in ('train_feat.npy', 'val_feat.npy', 'test_feat.npy')
)

# Matching label arrays, read from the current working directory.
# NOTE(review): labels live in a different location than the features - confirm intended.
ytrain, yval, ytest = (
    np.load(file_name) for file_name in ('ytrain.npy', 'yval.npy', 'ytest.npy')
)

In [None]:
# Display the dimensions of the training feature array as a quick sanity check.
train_feat.shape

In [None]:
# Visual sanity check: render one training example (index 1) with librosa's
# specshow on a fresh figure, with a colour bar; the title labels it as MFCC.
plt.figure()
display.specshow(train_feat[1], x_axis='time')
plt.colorbar()
plt.title('MFCC')
plt.tight_layout()

In [None]:
# Model / training hyper-parameters.
# features_dim x wide is the per-example feature-map size fed to the network
# input as (features_dim, wide, 1).
features_dim = 98
wide = 40
# Number of filters in the stem convolution and in every residual block.
N1 = 128
# Units in the first and second fully connected layers.
Nfc1 = 520
Nfc2 = 120
# Units in the softmax output layer (one per class).
out_dim = 35
# Examples per batch for both the training generator and test evaluation.
BATCH_SIZE = 64

In [None]:
def residual_block(y, i):
    """Apply one dilated-convolution residual unit to the tensor `y`.

    The branch is Conv2D(N1 filters, 1x2 kernel, 'same' padding, ReLU)
    followed by BatchNormalization; its output is added element-wise to the
    unmodified input.

    Args:
        y: Input tensor. Because the branch keeps the spatial size ('same'
            padding) and emits N1 channels, `y` must already have N1 channels
            for the addition to be valid.
        i: Dilation rate passed to the convolution.

    Returns:
        The tensor `y + branch(y)`.
    """
    shortcut = y

    # ReLU is applied once, through the convolution's `activation` argument.
    # The former extra Activation('relu') layer was a no-op (relu(relu(x)) == relu(x)).
    branch = Conv2D(N1, (1, 2), padding='same', activation='relu', dilation_rate=i)(y)

    # `scale` is a boolean switch; False (previously the falsy None) disables gamma.
    branch = BatchNormalization(axis=-1, scale=False)(branch)

    return Add()([shortcut, branch])

In [None]:
# Network input: one feature map per example, shaped (features_dim, wide, 1).
input1 = Input(shape=(features_dim, wide, 1))

# NOTE: `model` below is rebound to intermediate *tensors*; the trainable Keras
# Model object is `model_`, created at the end of this cell.
model = Normalization2D(int_axis=1)(input1)

# Swap the first two non-batch axes: (features_dim, wide, 1) -> (wide, features_dim, 1).
model = Permute((2, 1, 3))(model)

# Stem: strided 1x2 convolution followed by batch normalisation.
model = Conv2D(N1, kernel_size=(1, 2), strides=(1, 2), padding='same', activation='relu')(model)
model = BatchNormalization(axis=-1, scale=None)(model)

# Three residual blocks with dilation rates 2, 4 and 8.
res1 = residual_block(model, 2)
res2 = residual_block(res1, 4)
res3 = residual_block(res2, 8)

# Collapse to a single channel with a dilation-16 convolution, then normalise.
conv_ = Conv2D(1, kernel_size=(1, 2), activation='relu', dilation_rate=16)(res3)
bn = BatchNormalization(axis=-1, scale=None)(conv_)

# NOTE: despite its name, `avg_pool` holds the output of a *max* pooling layer.
avg_pool = MaxPooling2D(pool_size=2)(bn)
flat = Flatten()(avg_pool)

# Classifier head; Dropout(0.5) after each dense layer is currently disabled.
dense1 = Dense(Nfc1, activation='relu')(flat)
dense2 = Dense(Nfc2, activation='relu')(dense1)
out = Dense(out_dim, activation='softmax')(dense2)

model_ = Model(inputs=[input1], outputs=out)

# NOTE(review): a LearningRateScheduler callback is attached during training;
# presumably it overrides this initial lr - confirm.
adam_ = optimizers.Adam(lr=1e-5, beta_1=0.9, beta_2=0.999, epsilon=1e-9)

In [None]:
# Configure training: categorical cross-entropy loss, the Adam optimizer defined
# above, and categorical accuracy as the reported metric.
model_.compile(
    optimizer=adam_,
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)

In [None]:
# Print the layer-by-layer summary of the assembled Keras model.
model_.summary()

In [None]:
# Number of *full* batches per epoch; a trailing partial batch is not counted.
batches = len(train_feat) // BATCH_SIZE

In [None]:
def batch_generator(features=None, labels=None, batch_size=None,
                    sample_shape=None, shuffle=True, seed=None):
    """Yield `(x, y)` training batches forever.

    Called with no arguments it reads the notebook globals `train_feat`,
    `ytrain`, `BATCH_SIZE`, `features_dim` and `wide`, so existing
    `batch_generator()` calls keep working.

    Each pass over the data yields `len(features) // batch_size` full batches
    (the same count as the notebook-level `batches`). With `shuffle=True` the
    sample order is re-drawn at the start of every pass, so the samples left
    out of the trailing partial batch differ from pass to pass. Shuffling is
    done here because, per the Keras documentation, the `shuffle` argument of
    `fit_generator` only applies to `keras.utils.Sequence` inputs.

    Args:
        features: Array of examples; defaults to `train_feat`.
        labels: Array of targets aligned with `features`; defaults to `ytrain`.
        batch_size: Examples per batch; defaults to `BATCH_SIZE`.
        sample_shape: Shape each example is reshaped to; defaults to
            `(features_dim, wide, 1)`.
        shuffle: Re-shuffle the sample order on every pass when True.
        seed: Optional seed for the shuffling RNG (for reproducible runs).

    Yields:
        Tuple `(x, y)` where `x` has shape `(batch_size,) + sample_shape` and
        `y` holds the matching labels.

    Raises:
        ValueError: If features and labels differ in length, or if there are
            fewer examples than one batch (the original code would spin
            forever without yielding in that case).
    """
    features = np.asarray(train_feat if features is None else features)
    labels = np.asarray(ytrain if labels is None else labels)
    if batch_size is None:
        batch_size = BATCH_SIZE
    if sample_shape is None:
        sample_shape = (features_dim, wide, 1)
    sample_shape = tuple(sample_shape)

    n_samples = len(features)
    if len(labels) != n_samples:
        raise ValueError('features and labels differ in length: {} vs {}'.format(
            n_samples, len(labels)))

    n_batches = n_samples // batch_size
    if n_batches == 0:
        raise ValueError('need at least {} samples for one batch, got {}'.format(
            batch_size, n_samples))

    rng = np.random.RandomState(seed)

    while True:
        order = rng.permutation(n_samples) if shuffle else np.arange(n_samples)

        for batch in range(n_batches):
            idx = order[batch * batch_size: (batch + 1) * batch_size]
            # One vectorised reshape replaces the former per-sample Python loop.
            x = np.reshape(features[idx], (len(idx),) + sample_shape)
            yield (x, labels[idx])

In [None]:
# Give every validation example an explicit trailing channel axis, then stack
# the results into the single array passed to Keras as validation input.
x_val = [np.reshape(sample, (features_dim, wide, 1)) for sample in val_feat]

valx = np.asarray(x_val)

In [None]:
def step_decay(epoch):
    """Return the learning rate for `epoch` under a stepwise decay schedule.

    The rate starts at 0.001 and is multiplied by 0.4 once for every completed
    group of 10 epochs (counted as `floor((1 + epoch) / 10)`), but is never
    allowed to fall below 4e-5. The chosen rate is also printed.

    Args:
        epoch: Zero-based epoch index, as supplied by LearningRateScheduler.

    Returns:
        The learning rate to use, as a float.
    """
    base_rate, decay_factor, epochs_per_step, floor_rate = 0.001, 0.4, 10.0, 4e-5

    completed_steps = math.floor((1 + epoch) / epochs_per_step)
    lrate = max(base_rate * math.pow(decay_factor, completed_steps), floor_rate)

    print('Changing learning rate to {}'.format(lrate))
    return lrate

In [None]:
# Keras callback that asks step_decay for the learning rate of each epoch.
lrate = LearningRateScheduler(step_decay)

In [None]:
# Save the model to 'checkpoint_asr.h5', keeping only the best one as judged by
# validation categorical accuracy.
checkpointer = ModelCheckpoint(
    filepath='checkpoint_asr.h5',
    monitor='val_categorical_accuracy',
    save_best_only=True,
)

In [None]:
# Stop training once validation categorical accuracy has gone 10 epochs
# without improving; verbose=1 reports when that happens.
earlystopper = EarlyStopping(
    monitor='val_categorical_accuracy',
    patience=10,
    verbose=1,
)

In [None]:
# Train from the endless batch generator for up to 500 epochs, validating on
# the pre-shaped validation arrays after every epoch.
# NOTE(review): per the Keras docs `shuffle` only applies to keras.utils.Sequence
# inputs - confirm whether it has any effect with this plain generator.
history = model_.fit_generator(
    batch_generator(),
    steps_per_epoch=batches,
    epochs=500,
    validation_data=(valx, yval),
    callbacks=[checkpointer, earlystopper, lrate],
    shuffle=True,
)

In [None]:
# Training vs. validation categorical accuracy, one point per epoch.
for metric_key in ('categorical_accuracy', 'val_categorical_accuracy'):
    plt.plot(history.history[metric_key])

plt.title('Categorical accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'validation'], loc='lower right')
plt.show()

In [None]:
# Training vs. validation loss, one point per epoch.
for metric_key in ('loss', 'val_loss'):
    plt.plot(history.history[metric_key])

plt.title('model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['train', 'validation'], loc='upper right')
plt.show()

In [None]:
# Score the trained network on the test split.
# Use `model_` (the compiled Keras Model); `model` is only an intermediate
# tensor from graph construction and has no evaluate() method.
testEval = model_.evaluate(
    np.reshape(test_feat, (len(test_feat), features_dim, wide, 1)),
    ytest,
    batch_size=BATCH_SIZE,
)

In [None]:
# Report the test scores next to their metric names. The names come from
# `model_` (the Keras Model); `model` is a tensor and has no metrics_names.
print('Evaluation scores: \nMetrics: {} \nTest: {}'.format(model_.metrics_names, testEval))

In [None]:
# '__file__' is deliberately a string literal here: realpath() resolves it
# against the current working directory, so ROOT_DIR ends up being the
# directory the notebook is run from.
ROOT_DIR = os.path.normpath(os.path.dirname(os.path.realpath('__file__')))

# Folder expected to hold the dataset's split listings.
DATA_INFO = os.path.join(ROOT_DIR, 'data_asr', 'data_info')

In [None]:
# Read the test listing (space-separated, no header row) and keep its first
# column as a plain Python list.
testing_list_path = os.path.join(DATA_INFO, 'testing_list.txt')
test_files = pd.read_csv(testing_list_path, sep=' ', header=None)[0].tolist()

In [None]:
# The label of each test entry is the directory part of its listed path.
test_lab = list(map(os.path.dirname, test_files))

In [None]:
# Encode the string test labels as integer class ids.
# NOTE(review): the encoder is fit on the test labels themselves; the ids are
# only comparable with the model's argmax outputs if this encoding matches the
# one used to build the training targets - confirm.
lab = LabelEncoder()

test_encode = lab.fit_transform(test_lab)

In [None]:
# Class-probability predictions for every test example.
# Use `model_` (the Keras Model); `model` is only an intermediate tensor from
# graph construction and has no predict() method.
y_pred = model_.predict(
    np.reshape(test_feat, (len(test_feat), features_dim, wide, 1)),
    verbose=1,
)

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on a new 25x25 figure.

    A one-line message stating whether normalization was applied is printed;
    the matrix itself is not printed. The figure is left open for the caller
    (or the notebook) to display.

    Args:
        cm: Square 2-D array of counts; rows are labelled as true classes and
            columns as predicted classes.
        classes: Sequence of tick labels, one per row/column of `cm`.
        normalize: When True, every row is divided by its sum so cells show
            per-true-class fractions. Rows that sum to zero (a class with no
            true samples) are left as zeros instead of producing NaN.
        title: Figure title.
        cmap: Matplotlib colormap used for the heat map.
    """
    if normalize:
        counts = np.asarray(cm, dtype='float')
        row_totals = counts.sum(axis=1, keepdims=True)
        # Divide only where the row total is non-zero; NaN cells would also
        # poison `thresh` below and break the text-colour choice.
        cm = np.divide(counts, row_totals,
                       out=np.zeros_like(counts), where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.figure(figsize=(25,25))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title, fontsize=30)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45, fontsize=15)
    plt.yticks(tick_marks, classes, fontsize=15)

    # Fractions get three decimals, raw counts are shown as integers.
    fmt = '.3f' if normalize else 'd'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt), size=11,
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label', fontsize=30)
    plt.xlabel('Predicted label', fontsize=30)
    plt.tight_layout()

In [None]:
# Display names of the 35 classes, listed alphabetically.
# NOTE(review): the position of each name must equal the integer id used for
# that class in test_encode / the model output - confirm the encodings agree.
classes = ['backward', 'bed', 'bird', 'cat', 'dog', 'down', 'eight', 'five', 'follow', 
          'forward', 'four', 'go', 'happy', 'house', 'learn', 'left', 'marvin', 'nine', 
          'no', 'off', 'on', 'one', 'right', 'seven', 'sheila', 'six', 'stop', 'three', 'tree',
          'two', 'up', 'visual', 'wow', 'yes', 'zero']

In [None]:
# Confusion matrix of true test ids against the most probable predicted class.
cm = confusion_matrix(test_encode, np.argmax(y_pred, axis=1))

In [None]:
# Plot the row-normalized confusion matrix for the test split.
plot_confusion_matrix(cm, classes, normalize=True)

In [None]:
# Per-class precision / recall / F1 summary for the test split.
print(classification_report(
    test_encode,
    np.argmax(y_pred, axis=1),
    target_names=classes,
))