# Environment Setup

In [179]:
conda install -c numba numba

In [None]:
conda install -c conda-forge librosa

In [None]:
conda install -c conda-forge python-sounddevice

In [None]:
conda install -c anaconda scipy 

In [None]:
!pip install wavio

In [None]:
!pip install sklearn

In [None]:
!pip install tensorflow

In [None]:
!pip install tqdm

# Creating the Dataset

In [1]:
import librosa
import os
import json
from tqdm import tqdm
from math import *
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from copy import *
import soundfile as sf
import sounddevice as sd
from scipy.io.wavfile import write
import wavio as wv
from sklearn.metrics import *

## Enter Path to save data

In [4]:
# Base directory for everything this notebook writes (recordings, data.json,
# the test clip). Later cells build file names by plain string concatenation
# (e.g. path+"personal_data/"), so make sure the value ends with a separator.
path = os.path.join(input("Enter the path for saving audio files: ").strip(), "")

# Create the recordings folder inside `path`. exist_ok avoids the
# FileExistsError the old check caused on re-runs: it tested the relative
# 'personal_data' but created path+'personal_data'.
os.makedirs(path + 'personal_data', exist_ok=True)

Enter the path for saving audio files: /Users/vikasthapar/Desktop/TCS_KWS/


In [5]:
# Folder that receives one sub-folder of recordings per word.
# Built by string concatenation, so `path` has to end with a path separator.
folder_path=path+"personal_data/"
folder_path

'/Users/vikasthapar/Desktop/TCS_KWS/personal_data/'

## Input Primary and Secondary Words

In [6]:
# Total number of words: one primary keyword plus TOTAL-1 secondary words.
TOTAL = int(input("Enter number of Total words: "))

# The primary keyword gets its own recordings folder.
primary_word = input("Enter Primary word for spotting: ")
primary_dir = folder_path + primary_word
if not os.path.exists(primary_dir):
    os.makedirs(primary_dir)

# Ask for the remaining TOTAL-1 words, creating a folder for each as we go.
secondary_words = []
while len(secondary_words) < TOTAL - 1:
    word = input("Enter a Secondary word for spotting: ")
    secondary_words.append(word)
    word_dir = folder_path + word
    if not os.path.exists(word_dir):
        os.makedirs(word_dir)

Enter number of Total words: 9
Enter Primary word for spotting: motivation
Enter a Secondary word for spotting: dedication
Enter a Secondary word for spotting: motion
Enter a Secondary word for spotting: monument
Enter a Secondary word for spotting: motorbike
Enter a Secondary word for spotting: notation
Enter a Secondary word for spotting: mutation
Enter a Secondary word for spotting: monsoon
Enter a Secondary word for spotting: moderator


## Data for Primary Word

In [5]:
freq = 16000      # sampling rate in Hz
duration = 1.5    # length of every clip in seconds

# Record 10*(TOTAL-1) clips of the primary word, i.e. as many as all secondary
# words together (10 clips for each of the TOTAL-1 secondary words).
i = 0
while i < (10*(TOTAL-1)):
    print("Word: "+primary_word+" File no.: "+str(i))
    a = input("Are you ready to record? (Y/N) ")
    # Accept "y" and "yes" in any case, ignoring surrounding whitespace.
    # Matching only "y" made an answer of "yes" re-prompt forever.
    if a.strip().lower() not in ("y", "yes"):
        continue
    print("Speak now!")

    # sd.rec returns immediately; sd.wait() blocks until the recording is finished.
    recording = sd.rec(int(duration * freq), samplerate=freq, channels=1, dtype='int16')
    sd.wait()

    print("Done!")
    print()
    sf.write(folder_path+primary_word+"/"+str(i)+".wav", recording, freq, 'PCM_16')

    i += 1
    

Word: motivation File no.: 0
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 1
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 2
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 3
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 4
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 5
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 6
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 7
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 8
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 9
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 10
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motivation File no.: 11
Are you ready to record? (Yes/No) 

## Data for Secondary Word

In [11]:
freq = 16000      # sampling rate in Hz
duration = 1.5    # length of every clip in seconds

# Record 10 clips for each secondary word into that word's folder.
for word in secondary_words:
    i = 0
    while i < 10:
        print("Word: "+word+" File no.: "+str(i))
        a = input("Are you ready to record? (Y/N) ")
        # Accept "y" and "yes" in any case, ignoring surrounding whitespace.
        # Matching only "y" made an answer of "yes" re-prompt forever.
        if a.strip().lower() not in ("y", "yes"):
            continue
        print("Speak now!")

        # sd.rec returns immediately; sd.wait() blocks until the recording is finished.
        recording = sd.rec(int(duration * freq), samplerate=freq, channels=1, dtype='int16')
        sd.wait()

        print("Done!")
        print()
        sf.write(folder_path+word+"/"+str(i)+".wav", recording, freq, 'PCM_16')
        i += 1


Word: dedication File no.: 0
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 1
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 2
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 3
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 4
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 5
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 6
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 7
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 8
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: dedication File no.: 9
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motion File no.: 0
Are you ready to record? (Yes/No) yes
Speak now!
Done!

Word: motion File no.: 1
Are you ready to record? (Yes/No) yes
Speak 

In [7]:
# Root folder that is walked for the per-word recording folders.
DATASET_PATH = folder_path
# File the extracted MFCCs, labels and file names are written to and read from.
JSON_PATH = path+"data.json"
# 24000 samples = 1.5 s at 16 kHz, the length of each recorded clip.
SAMPLES_TO_CONSIDER = 24000

In [8]:
# Show the resolved JSON location as a sanity check.
JSON_PATH

'/Users/vikasthapar/Desktop/TCS_KWS/data.json'

# Audio Data Augmentation

In [8]:
class AudioAugmentation:
    """Load, perturb and save fixed-length audio clips for data augmentation."""

    def read_audio_file(self, file_path, input_length=24000, sr=16000):
        """Load `file_path` at `sr` Hz and force it to exactly `input_length` samples.

        Longer clips are truncated; shorter clips are zero-padded at the end.
        The defaults (24000 samples at 16 kHz) correspond to 1.5 s of audio.
        """
        data = librosa.load(file_path, sr=sr)[0]
        if len(data) > input_length:
            data = data[:input_length]
        else:
            data = np.pad(data, (0, max(0, input_length - len(data))), "constant")
        return data

    def write_audio_file(self, file, data, sample_rate=16000):
        """Write `data` to `file` as 16-bit PCM at `sample_rate` Hz."""
        sf.write(file, data, sample_rate, 'PCM_16')

    def add_noise(self, data, factor=0.005):
        """Return `data` plus standard-normal noise scaled by `factor`."""
        noise = np.random.randn(len(data))
        data_noise = data + factor * noise
        return data_noise

    def shift(self, data, factor=1600):
        """Return `data` rotated by `factor` samples.

        np.roll is circular: samples pushed past the end re-enter at the start.
        """
        return np.roll(data, factor)

    def pitch_shift(self, data, sr=16000, n_steps=4):
        """Return `data` shifted in pitch by `n_steps` steps (librosa's unit)."""
        # `sr` is passed by keyword: recent librosa releases made it
        # keyword-only, and older releases accept the keyword form too.
        return librosa.effects.pitch_shift(data, sr=sr, n_steps=n_steps)
   

In [9]:
# Every augmented copy is written next to its source as <stem><suffix>.wav.
# The order below is the order in which the copies are produced.
AUGMENTATIONS = (
    ("_noise1", lambda clip: aa.add_noise(clip, 0.005)),
    ("_noise2", lambda clip: aa.add_noise(clip, 0.0035)),
    ("_noise3", lambda clip: aa.add_noise(clip, 0.002)),
    ("_shift1", lambda clip: aa.shift(clip, 400)),
    ("_shift2", lambda clip: aa.shift(clip, 200)),
    ("_pitch2", lambda clip: aa.pitch_shift(clip, sr=16000, n_steps=2)),
    ("_pitch3", lambda clip: aa.pitch_shift(clip, sr=16000, n_steps=3)),
    ("_pitch4", lambda clip: aa.pitch_shift(clip, sr=16000, n_steps=4)),
)
AUGMENTATION_SUFFIXES = tuple(suffix for suffix, _ in AUGMENTATIONS)

aa = AudioAugmentation()

for dirpath, dirnames, filenames in os.walk(DATASET_PATH):
    # Recordings live in per-word sub-folders; skip the dataset root itself.
    # Compare by value: `is not` only tests object identity of the strings.
    if dirpath == DATASET_PATH:
        continue

    # Only augment original .wav recordings. Files whose stem already ends in
    # an augmentation suffix were produced by an earlier run of this cell, and
    # non-audio files (e.g. .DS_Store) would make librosa.load fail.
    originals = [
        f for f in filenames
        if f.lower().endswith(".wav")
        and not os.path.splitext(f)[0].endswith(AUGMENTATION_SUFFIXES)
    ]

    for f in tqdm(originals):
        file_path = os.path.join(dirpath, f)
        data = aa.read_audio_file(file_path)
        stem = os.path.splitext(file_path)[0]

        for suffix, transform in AUGMENTATIONS:
            aa.write_audio_file(stem + suffix + ".wav", transform(data))
                
            

100%|██████████| 10/10 [00:03<00:00,  2.88it/s]
100%|██████████| 10/10 [00:01<00:00,  5.58it/s]
100%|██████████| 10/10 [00:01<00:00,  5.68it/s]
100%|██████████| 10/10 [00:01<00:00,  5.74it/s]
100%|██████████| 10/10 [00:02<00:00,  4.84it/s]
100%|██████████| 10/10 [00:01<00:00,  5.59it/s]
100%|██████████| 10/10 [00:01<00:00,  5.24it/s]
100%|██████████| 10/10 [00:01<00:00,  5.65it/s]
100%|██████████| 80/80 [00:15<00:00,  5.06it/s]


# Generating MFCCs

In [9]:
# Container that is later dumped to JSON:
#   mapping - class names, indexed by label value
#   labels  - 0 for the primary word, 1 for any secondary word
#   MFCCs   - one (frames x 13) nested list per clip
#   files   - path of the clip each entry was computed from
data = {
        "mapping": ["primary", "secondary"],
        "labels": [],
        "MFCCs": [],
        "files": []
    }

for dirpath, dirnames, filenames in os.walk(DATASET_PATH):

    # Recordings live in per-word sub-folders; skip the dataset root itself.
    # Compare by value: `is not` only tests object identity of the strings.
    if dirpath == DATASET_PATH:
        continue

    # The folder name is the spoken word (separator-independent).
    label = os.path.basename(os.path.normpath(dirpath))

    print("\nProcessing: '{}'".format(label))

    # Ignore anything that is not a .wav file (e.g. .DS_Store on macOS).
    wav_files = [f for f in filenames if f.lower().endswith(".wav")]

    for f in tqdm(wav_files):
        file_path = os.path.join(dirpath, f)

        signal, sample_rate = librosa.load(file_path, sr=16000)

        # Force every clip to SAMPLES_TO_CONSIDER samples so all MFCC matrices
        # have the same number of frames and stack into one array later.
        if len(signal) >= SAMPLES_TO_CONSIDER:
            signal = signal[:SAMPLES_TO_CONSIDER]
        else:
            signal = np.pad(signal, (0, SAMPLES_TO_CONSIDER - len(signal)), "constant")

        # Keyword arguments: recent librosa releases no longer accept the
        # signal and sample rate positionally.
        MFCCs = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=13, n_fft=2048, hop_length=512)
        data["MFCCs"].append(MFCCs.T.tolist())
        data["labels"].append(0 if label == primary_word else 1)
        data["files"].append(file_path)

  3%|▎         | 3/90 [00:00<00:02, 29.29it/s]


Processing: 'mutation'


100%|██████████| 90/90 [00:01<00:00, 80.37it/s]
  8%|▊         | 7/90 [00:00<00:01, 65.33it/s]


Processing: 'moderator'


100%|██████████| 90/90 [00:01<00:00, 77.46it/s]
 10%|█         | 9/90 [00:00<00:00, 87.46it/s]


Processing: 'motion'


100%|██████████| 90/90 [00:00<00:00, 92.40it/s]
 11%|█         | 10/90 [00:00<00:00, 94.61it/s]


Processing: 'motorbike'


100%|██████████| 90/90 [00:00<00:00, 95.11it/s]
  7%|▋         | 6/90 [00:00<00:01, 58.55it/s]


Processing: 'dedication'


100%|██████████| 90/90 [00:01<00:00, 64.06it/s]
  9%|▉         | 8/90 [00:00<00:01, 79.69it/s]


Processing: 'monsoon'


100%|██████████| 90/90 [00:00<00:00, 91.18it/s]
 11%|█         | 10/90 [00:00<00:00, 94.50it/s]


Processing: 'monument'


100%|██████████| 90/90 [00:00<00:00, 96.76it/s]
 10%|█         | 9/90 [00:00<00:00, 83.53it/s]


Processing: 'notation'


100%|██████████| 90/90 [00:00<00:00, 96.65it/s]
  1%|▏         | 10/720 [00:00<00:07, 96.04it/s]


Processing: 'motivation'


100%|██████████| 720/720 [00:08<00:00, 84.75it/s] 


## Saving Data in JSON

In [10]:
# Serialise the feature dictionary first, then write it out in one go.
serialised = json.dumps(data, indent=4)
with open(JSON_PATH, "w") as fp:
    fp.write(serialised)


# Model 

In [10]:
# Read the feature dictionary back from disk.
with open(JSON_PATH, "r") as fp:
    data = json.loads(fp.read())

# Features and integer labels as arrays; show both shapes as a sanity check.
X, y = (np.array(data[key]) for key in ("MFCCs", "labels"))
(X.shape, y.shape)

((1440, 47, 13), (1440,))

In [11]:
# Class names; a label value (0 or 1) in `y` indexes into this list.
MAPPINGS=data["mapping"]
MAPPINGS

['primary', 'secondary']

## Training, Testing and Validation sets

In [13]:
# Fixed seed so that Restart & Run All reproduces the same split.
SPLIT_SEED = 42

# 80/20 train/test, then 80/20 train/validation, both stratified by label.
# NOTE(review): augmented copies of a recording sit in the same folders as the
# original and are split independently of it, so variants of one recording can
# land in both train and test. Test scores are therefore likely optimistic;
# consider splitting by original recording before augmenting.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, shuffle=True, random_state=SPLIT_SEED)
X_train, X_validation, y_train, y_validation = train_test_split(
    X_train, y_train, test_size=0.2, stratify=y_train, shuffle=True, random_state=SPLIT_SEED)

# Conv2D expects a trailing channel axis: (samples, frames, coefficients, 1).
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]
X_validation = X_validation[..., np.newaxis]

In [14]:
# Duplicate every training example (and its label) 10 times along the sample axis.
# Note: this overwrites X_train/y_train, so re-running the cell repeats them again.
X_train, y_train = np.repeat(X_train, 10, axis=0), np.repeat(y_train, 10)

In [15]:
# Shapes of the training features and labels after repetition.
X_train.shape,y_train.shape

((9210, 47, 13, 1), (9210,))

## CNN Architecture

In [16]:
# One MFCC "image" per clip: (time frames, MFCC coefficients, 1 channel).
input_shape = (X_train.shape[1], X_train.shape[2], 1)

model = tf.keras.models.Sequential()

# Three conv -> batch-norm -> max-pool stages, each conv with L2 weight decay.
model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=input_shape, kernel_regularizer=tf.keras.regularizers.l2(0.001)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.MaxPooling2D((3, 3), strides=(2,2), padding='same'))

model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.MaxPooling2D((3, 3), strides=(2,2), padding='same'))

model.add(tf.keras.layers.Conv2D(32, (2, 2), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.MaxPooling2D((2, 2), strides=(2,2), padding='same'))

# Classifier head.
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(64, activation='relu'))
# The Dropout layer must be added to the model; merely constructing it
# (as the previous version did) has no effect on the network.
model.add(tf.keras.layers.Dropout(0.3))

# Two outputs: index 0 = primary keyword, index 1 = secondary words.
model.add(tf.keras.layers.Dense(2, activation='softmax'))

model.summary()


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 45, 11, 64)        640       
_________________________________________________________________
batch_normalization (BatchNo (None, 45, 11, 64)        256       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 23, 6, 64)         0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 21, 4, 32)         18464     
_________________________________________________________________
batch_normalization_1 (Batch (None, 21, 4, 32)         128       
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 11, 2, 32)         0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 10, 1, 32)         4

## Model Training

In [17]:
# Adam with a small learning rate; labels are integer class ids, hence the
# sparse variant of categorical cross-entropy.
optimiser = tf.keras.optimizers.Adam(learning_rate=1e-4)

model.compile(
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
    optimizer=optimiser,
)

In [18]:
# Stop once the validation loss has failed to improve by at least 0.001 for
# 5 epochs in a row, and roll back to the best weights seen. Monitoring the
# training "accuracy" (as before) ignores the validation data passed to fit()
# and cannot detect overfitting.
earlystop_callback = tf.keras.callbacks.EarlyStopping(monitor="val_loss",
                                                      min_delta=0.001,
                                                      patience=5,
                                                      restore_best_weights=True)

history = model.fit(X_train,
                    y_train,
                    epochs=10,
                    batch_size=16,
                    validation_data=(X_validation, y_validation),
                    callbacks=[earlystop_callback])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10


## Model Testing

In [19]:
# Loss and accuracy on the held-out test set (accuracy reported in percent).
evaluation = model.evaluate(X_test, y_test)
test_loss, test_acc = evaluation
report = "\nTest loss: {}, test accuracy: {}"
print(report.format(test_loss, 100*test_acc))

# Persist the trained network for the keyword spotting service.
model.save("cnn_model.h5")



Test loss: 0.050191666930913925, test accuracy: 100.0


In [20]:
# Softmax outputs for the test set: one row per clip, one column per class.
pred=model.predict(X_test)

In [21]:
# Predicted class per clip = index of the largest softmax output in its row.
y_pred = [np.argmax(scores) for scores in pred]


In [22]:
print("Precision scores= ",sep="  ")

# (micro-averaged, macro-averaged) precision on the test set.
tuple(precision_score(y_test, y_pred, average=mode) for mode in ('micro', 'macro'))

Precision scores= 


(1.0, 1.0)

In [23]:
print("Recall scores= ",sep="  ")

# (micro-averaged, macro-averaged) recall on the test set.
tuple(recall_score(y_test, y_pred, average=mode) for mode in ('micro', 'macro'))

Recall scores= 


(1.0, 1.0)

In [24]:
print("F1 scores= ",sep="  ")

# (micro-averaged, macro-averaged) F1 on the test set.
tuple(f1_score(y_test, y_pred, average=mode) for mode in ('micro', 'macro'))

F1 scores= 


(1.0, 1.0)

# User Model Testing

In [12]:
# Model file loaded by Keyword_Spotting_Service().
SAVED_MODEL_PATH = "cnn_model.h5"
# Number of leading samples of a clip used for MFCC extraction (1.5 s at 16 kHz).
SAMPLES_TO_CONSIDER=24000


class _Keyword_Spotting_Service:

    model = None
    _instance = None


    def predict(self, file_path):
        
        MFCCs = self.preprocess(file_path)
        MFCCs = MFCCs[np.newaxis, ..., np.newaxis]

        predictions = self.model.predict(MFCCs)
        return predictions


    def preprocess(self, file_path, num_mfcc=13, n_fft=2048, hop_length=512):

        signal, sample_rate = librosa.load(file_path,sr=16000)

        if len(signal) >= SAMPLES_TO_CONSIDER:
            signal = signal[:SAMPLES_TO_CONSIDER]

            MFCCs = librosa.feature.mfcc(signal, sample_rate, n_mfcc=num_mfcc, n_fft=n_fft,
                                         hop_length=hop_length)
        return MFCCs.T


def Keyword_Spotting_Service():
    """Return the shared _Keyword_Spotting_Service, creating it on first use.

    On the first call the model at SAVED_MODEL_PATH is loaded and stored on
    the class; later calls return the cached instance without reloading.
    """
    if _Keyword_Spotting_Service._instance is None:
        # Load the model before caching the instance: if loading raises, no
        # model-less instance is left behind to be returned by later calls.
        _Keyword_Spotting_Service.model = tf.keras.models.load_model(SAVED_MODEL_PATH)
        _Keyword_Spotting_Service._instance = _Keyword_Spotting_Service()
    return _Keyword_Spotting_Service._instance


## Speak Any Keyword

In [13]:
# Class names in label order, shown for reference before testing.
MAPPINGS

['primary', 'secondary']

In [16]:
freq = 16000      # sampling rate in Hz
duration = 1.5    # length of the test clip in seconds
# Minimum primary-class probability required to report the primary keyword.
PRIMARY_THRESHOLD = 0.7
# Column of the model output that belongs to each class.
IND = MAPPINGS.index("primary")
SECONDARY_IND = MAPPINGS.index("secondary")

# Wait until the user confirms; accept "y" and "yes" in any case.
while True:
    a = input("Are you ready to record? (Y/N) ")

    if a.strip().lower() in ("y", "yes"):
        print("Speak now!")
        break

# sd.rec returns immediately; sd.wait() blocks until the recording is finished.
recording = sd.rec(int(duration * freq), samplerate=freq, channels=1, dtype='int16')
sd.wait()

print("Done!")

test_file = path+"testfile.wav"
sf.write(test_file, recording, freq, 'PCM_16')

kss = Keyword_Spotting_Service()
prob = kss.predict(test_file)

print(prob)
# Look the columns up through MAPPINGS instead of hard-coding 0 and 1.
primary_prob = prob[0][IND]
secondary_prob = prob[0][SECONDARY_IND]

if primary_prob >= PRIMARY_THRESHOLD:
    print("Predicted Keyword= Primary: ", primary_word)
else:
    print("Predicted Keyword= Secondary")
print("Primary Keyword Probability=",round(primary_prob*100,4)," %")
print("Secondary Keyword Probability=",round(secondary_prob*100,4)," %")

Are you ready to record? (Y/N) y
Speak now!
Done!
[[0.9383226  0.06167738]]
Predicted Keyword= Primary:  motivation
Primary Keyword Probability= 93.8323  %
Secondary Keyword Probability= 6.1677  %
