Research Paper - https://arxiv.org/pdf/1711.07128.pdf - https://github.com/ARM-software/ML-KWS-for-MCU <br>
Github Repos - <br>
https://github.com/rcmalli/keras-mobilenet <br>
https://github.com/ZainNasrullah/music-artist-classification-crnn

In [0]:
# Mount Google Drive inside the Colab runtime (re-mounting if it is already
# mounted). `path` is the Drive root that the dataset and pickle paths used
# later in this notebook are built from by string concatenation, so it must
# keep its trailing slash.
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)
path = '/content/gdrive/My Drive/'

In [0]:
import librosa
import tensorflow as tf
import keras
from keras.layers import Dense,LSTM,GRU,GlobalMaxPool1D,Bidirectional,MaxPooling2D
from keras.models import Sequential
import numpy as np
import os
from sklearn.utils import shuffle
from sklearn import metrics
import pickle
from scipy.io.wavfile import read,write

from keras_applications.imagenet_utils import _obtain_input_shape
from keras import backend as K
from keras.layers import Input, Convolution2D, GlobalAveragePooling2D, Dense, BatchNormalization, Activation, Dropout, Permute, Reshape
from keras.models import Model
from keras.engine.topology import get_source_inputs

'''
os.chdir raises an error that has not been resolved yet. As a workaround, make the script importable as follows:
find depthwise_conv2d.py in Drive, download it, then re-upload it to the /content folder of the Colab runtime.
This has to be repeated for every new runtime connection.
'''
from depthwise_conv2d import DepthwiseConvolution2D

#LOAD AND PROCESS INPUT

In [0]:
# Dataset directories, all resolved relative to the Drive root `path`.
# The commented-out assignments are earlier dataset locations that are
# currently disabled.

# KEYWORD_FOLDER1= 'Bachao_Data_Old/'
# KEYWORD_FOLDER2= 'Bachao_Data_Babble_10dB'
# KEYWORD_FOLDER3 = 'Bachao_Data_Natural_10dB'

# Positive (keyword) recordings. Only KEYWORD_FOLDER4 is referenced by
# load_train_data() in this notebook.
KEYWORD_FOLDER4 = path + 'Help_Data_Old'
KEYWORD_FOLDER5 = path + 'Help_Data_10dB'
KEYWORD_FOLDER6= path + 'Help_Data_Natural_10dB'


# NEGATIVE_FOLDER1 = 'Negative_Data/'
# NEGATIVE_FOLDER2 = 'Negative_Data_10dB'
# NEGATIVE_FOLDER3 = 'Negative_Data_Natural_10dB'
# Negative (non-keyword) recordings. Only NEGATIVE_FOLDER4 is referenced by
# load_train_data() in this notebook.
NEGATIVE_FOLDER4 = path + 'Negative_Data/'
NEGATIVE_FOLDER5 = path + 'Negative_Data_10dB'
NEGATIVE_FOLDER6 = path + 'Negative_Data_Natural_10dB'

# Held-out folders used by load_test_data().
# NOTE(review): the training keyword folders above are 'Help_*' while the test
# keyword folder is 'Bachao_Data_Test/' - confirm this mismatch is intended.
#OPPPOSITE_KEYWORD_FOLDER = 'Bachao_Data/'
KEYWORD_FOLDER_TEST = path + 'Bachao_Data_Test/'
NEGATIVE_FOLDER_TEST = path + 'Negative_Data_Test_Old/'
#OPPPOSITE_KEYWORD_FOLDER_TEST = 'Bachao_Data_Test/'

In [0]:
def count_files(folder, extension):
    """Count the entries of ``folder`` whose name ends with ``extension``.

    Only the directory itself is inspected (no recursion), and the match is
    a plain, case-sensitive ``str.endswith`` on the entry name, i.e. the same
    filter ``load_data_folder`` applies when it reads the files.

    Args:
        folder: Path of the directory to scan.
        extension: Name suffix to match, e.g. ``'.wav'``.

    Returns:
        The number of matching directory entries as an ``int``.

    Raises:
        OSError: Propagated from ``os.listdir`` when ``folder`` cannot be
            listed (for example because it does not exist).
    """
    return sum(1 for name in os.listdir(folder) if name.endswith(extension))

def load_data_folder(folder, is_keyword):
    """Extract MFCC-based features for every ``.wav`` file in ``folder``.

    For each clip the feature matrix is made of 20 MFCCs followed by the
    first 10 delta and the first 10 delta-delta coefficients, written
    transposed so that frames run along axis 1 of the returned array.

    Args:
        folder: Directory containing the ``.wav`` clips.
        is_keyword: Truthy when the clips are positive examples; stored as
            ``int(is_keyword)`` in the label vector.

    Returns:
        ``(data_X, data_Y)`` where ``data_X`` has shape
        ``(n_clips, INPUT_SHAPE[0], INPUT_SHAPE[1])`` and ``data_Y`` has
        shape ``(n_clips,)``, both ``float64``.

    Note:
        The per-clip features are assigned straight into the pre-allocated
        array, so every clip must yield a matrix that fits
        ``INPUT_SHAPE`` (presumably 40 feature columns - confirm against
        the ``INPUT_SHAPE`` definition).
    """
    n_clips = count_files(folder, '.wav')
    features = np.zeros((n_clips, INPUT_SHAPE[0], INPUT_SHAPE[1]), dtype=np.float64)
    labels = np.zeros((n_clips), dtype=np.float64)

    wav_names = (name for name in os.listdir(folder) if name.endswith('.wav'))
    for done, name in enumerate(wav_names, start=1):
        row = done - 1
        # sr=None keeps the file's native sampling rate.
        signal, rate = librosa.load(os.path.join(folder, name), sr=None)

        base = librosa.feature.mfcc(y=signal, sr=rate, hop_length=128, n_fft=256, n_mfcc=20)
        first_order = librosa.feature.delta(base)[:10, :]
        second_order = librosa.feature.delta(base, order=2)[:10, :]

        # Column layout: [0, 20) MFCC, [20, 30) delta, [30, end) delta-delta.
        features[row, :, :20] = base.T
        features[row, :, 20:30] = first_order.T
        features[row, :, 30:] = second_order.T
        labels[row] = int(is_keyword)

        # Progress indicator every 50 clips.
        if done % 50 == 0:
            print(done)
    return features, labels

def load_data(folders):
    """Load several folders into one shuffled feature/label pair.

    Args:
        folders: Sequence of ``(folder, is_keyword)`` pairs. It is iterated
            twice (once to size the output, once to fill it).

    Returns:
        The result of ``sklearn.utils.shuffle(data_X, data_Y, random_state=0)``,
        i.e. the features and labels of all folders permuted together with a
        fixed seed.
    """
    total = sum(count_files(folder, '.wav') for folder, _ in folders)
    all_X = np.zeros((total, INPUT_SHAPE[0], INPUT_SHAPE[1]), dtype=np.float64)
    all_Y = np.zeros((total), dtype=np.float64)

    start = 0
    for folder, is_keyword in folders:
        stop = start + count_files(folder, '.wav')
        folder_X, folder_Y = load_data_folder(folder, is_keyword)
        # Each folder fills its own contiguous slice of the output arrays.
        all_X[start:stop, :, :] = folder_X
        all_Y[start:stop] = folder_Y
        start = stop
    return shuffle(all_X, all_Y, random_state=0)

def load_train_data():
    """Return the shuffled training features and labels.

    Uses one positive folder (``KEYWORD_FOLDER4``) and one negative folder
    (``NEGATIVE_FOLDER4``); extend ``training_folders`` to train on more of
    the dataset directories defined in this notebook.
    """
    training_folders = [
        (KEYWORD_FOLDER4, True),
        (NEGATIVE_FOLDER4, False),
    ]
    return load_data(training_folders)

def load_test_data():
    """Return the shuffled held-out features and labels.

    Positive clips come from ``KEYWORD_FOLDER_TEST`` and negative clips from
    ``NEGATIVE_FOLDER_TEST``.
    """
    held_out_folders = [
        (KEYWORD_FOLDER_TEST, True),
        (NEGATIVE_FOLDER_TEST, False),
    ]
    return load_data(held_out_folders)

In [0]:
# Inspect the white-noise recording and work out how many whole
# `segment_time`-second segments it can be cut into.
activ_dir = 'white_noise'
fs, x = read(path + 'white_noise.wav')

file_size = x.shape[0]                      # length of the first axis of the signal
segment_time = 3.0                          # seconds per segment
segment_samples = int(segment_time * fs)    # samples per segment
no_of_segments = int(file_size/segment_samples)

# Disabled: export of the individual segments as separate wav files.
# for i in range(no_of_segments - 1):
#   file_name = '{}_{}.wav'.format(activ_dir, i)
#   x_temp = x[i*segment_samples:(i+1)*segment_samples]
#   write(path + 'white_noice/' + file_name, fs, x_temp)

# One value per line: sample rate, array shape, sample count,
# samples per segment, number of segments.
print(fs, x.shape, file_size, segment_samples, no_of_segments, sep='\n')

In [0]:
# Restore the pre-computed training features and labels from Drive.
# NOTE: pickle executes arbitrary code on load - only open files you created.
with open(path + 'help_data_total_hari_train_x.pickle', 'rb') as f:
    train_X = pickle.load(f)

with open(path + 'help_data_total_hari_train_y.pickle', 'rb') as f:
    train_Y = pickle.load(f)

print("Train data extracted")

# Sum of the labels followed by the total number of samples, one per line.
print(train_Y.sum(), train_Y.shape[0], sep='\n')

In [0]:
# Seed NumPy's global RNG so later NumPy-based randomness in this session is
# reproducible.
np.random.seed(0)

# Shuffle features and labels TOGETHER. Calling np.random.shuffle() on
# train_X and then on train_Y draws two different permutations (the RNG state
# advances between the calls), which detaches every sample from its label.
# sklearn.utils.shuffle applies one permutation to both arrays and returns
# shuffled copies, so re-running this cell gives the same result.
train_X_shuffle, train_Y_shuffle = shuffle(train_X, train_Y, random_state=0)

# Append a trailing channel axis: (samples, d1, d2) -> (samples, d1, d2, 1),
# matching the 3-D per-sample input shape the models below are built with.
train_X_shuffle = train_X_shuffle[:, :, :, np.newaxis]

print(train_X_shuffle.shape)
print(train_Y_shuffle.shape)

In [0]:
# import sklearn
# train_X_cv, test_X_cv, train_Y_cv, test_Y_cv = sklearn.model_selection.train_test_split(train_X_shuffle, train_Y_shuffle, test_size=0.2)

#MODEL

In [0]:
def MobileNet(input_shape=(376,40,1), alpha=1, classes=2):
    """Build a MobileNet-style classifier from depthwise-separable blocks.

    The graph is: one strided 3x3 convolution, then five blocks of
    (3x3 depthwise convolution -> 1x1 convolution), then global average
    pooling and a softmax ``Dense`` layer. Every convolution is created
    without bias and is followed by batch normalisation and an ELU
    activation. The first four depthwise convolutions use stride 2, the
    last one stride 1.

    # Arguments
        input_shape: shape tuple handed to `Input` (without the batch axis).
        alpha: width multiplier; every convolution layer receives
            `int(512 * alpha)` as its first argument.
        classes: number of units of the final softmax layer.

    # Returns
        A Keras `Model` instance named `'mobilenet'`.
    """
    width = int(512 * alpha)

    def bn_elu(tensor):
        # Batch normalisation followed by ELU, applied after every convolution.
        tensor = BatchNormalization()(tensor)
        return Activation('elu')(tensor)

    img_input = Input(shape=input_shape)

    # Stem: regular strided convolution.
    x = Convolution2D(width, (3, 3), strides=(2, 2), padding='same', use_bias=False)(img_input)
    x = bn_elu(x)

    # Depthwise-separable blocks; only the depthwise stride differs between them.
    for depthwise_strides in [(2, 2)] * 4 + [(1, 1)]:
        x = DepthwiseConvolution2D(width, (3, 3), strides=depthwise_strides, padding='same', use_bias=False)(x)
        x = bn_elu(x)
        x = Convolution2D(width, (1, 1), strides=(1, 1), padding='same', use_bias=False)(x)
        x = bn_elu(x)

    # Classification head.
    x = GlobalAveragePooling2D()(x)
    out = Dense(classes, activation='softmax')(x)

    return Model(img_input, out, name='mobilenet')

def CRNN1(X_shape = (376,40,1), nb_classes = 2):
    '''
    Convolutional-recurrent classifier: four conv stages followed by two
    bidirectional GRUs and a softmax layer.

    Model used for evaluation in paper. Inspired by K. Choi model in:
    https://github.com/keunwoochoi/music-auto_tagging-keras/blob/master/music_tagger_crnn.py

    X_shape    -- per-sample input shape; its first three entries are used.
    nb_classes -- number of units of the output layer.

    Returns an uncompiled Keras Sequential model.

    NOTE(review): the axis names below come from the original
    (frequency, time, channels) convention. load_data_folder() in this
    notebook stores features as (frames, coefficients), so with that data
    the axis called "frequency" here is the frame axis - confirm which axis
    the recurrent layers are meant to scan.
    '''
    conv_filters = [256, 512, 512, 512]  # filters per convolution stage
    conv_kernel = (3, 3)
    act_name = 'elu'
    # Pooling window (also used as stride) per stage; zip() below stops at
    # the four stages defined by conv_filters, so the fifth entry is unused.
    pools = [(2, 2), (4, 2), (4, 2), (4, 2), (4, 2)]

    # Axis indices counted with the batch axis at position 0.
    frequency_axis, time_axis, channel_axis = 1, 2, 3
    input_shape = (X_shape[0], X_shape[1], X_shape[2])

    model = Sequential()

    for stage, (filters, pool) in enumerate(zip(conv_filters, pools)):
        if stage == 0:
            # The first layer of a Sequential model declares the input shape.
            conv = Convolution2D(filters, kernel_size=conv_kernel, padding='same',
                                 data_format="channels_last",
                                 input_shape=input_shape)
        else:
            conv = Convolution2D(filters, kernel_size=conv_kernel, padding='same')
        model.add(conv)
        model.add(Activation(act_name))
        model.add(BatchNormalization(axis=channel_axis))
        model.add(MaxPooling2D(pool_size=pool, strides=pool))
        model.add(Dropout(0.1))

    # Swap the two spatial axes, then merge the last two axes so the
    # recurrent layers receive a (steps, features) sequence.
    model.add(Permute((time_axis, frequency_axis, channel_axis)))
    _, steps, height, channels = model.output_shape
    model.add(Reshape((steps, height * channels)))

    # Recurrent part: the second GRU returns only its final state.
    model.add(Bidirectional(GRU(256, return_sequences=True)))
    model.add(Bidirectional(GRU(256, return_sequences=False)))
    model.add(Dropout(0.3))

    # Output layer
    model.add(Dense(nb_classes))
    model.add(Activation("softmax"))
    return model



In [0]:
# Build the CRNN with its default input shape and class count, then print the
# layer-by-layer summary.
model = CRNN1()
model.summary()

In [0]:
from keras.optimizers import Adam

# The model and optimizer come from the standalone `keras` package, so the
# loss is given by its Keras string identifier instead of a `tf.keras` loss
# object. Mixing objects from the two packages is fragile, and the string
# form lets Keras resolve the 'acc' metric from the loss it recognises.
# The sparse variant matches labels stored as class indices (0/1) rather than
# one-hot vectors, paired with the softmax output of the model.
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['acc'],
)

In [0]:
# Train for 10 epochs with mini-batches of 64; Keras holds out 20% of the
# supplied samples for validation.
model.fit(
    x=train_X_shuffle,
    y=train_Y_shuffle,
    batch_size=64,
    epochs=10,
    validation_split=0.2,
)