# MFCC Creation

In [None]:
!pip install python_speech_features
from os import listdir
from os.path import isdir, join
import librosa
import random
import numpy as np
import matplotlib.pyplot as plt
import python_speech_features


**Get Data From G Drive**

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
# this creates a symbolic link so that now the path /content/gdrive/My\ Drive/ is equal to /mydrive
!ln -s /content/gdrive/My\ Drive/ /mydrive

Mounted at /content/gdrive


In [None]:
# copy the .zip file into the root directory of cloud VM
!cp /content/gdrive/MyDrive/Speech_Recognition/Data/test-1-31.zip ../
# unzip the zip file and its contents should now be in /darknet/data/obj
!unzip ../test-1-31.zip -d data/;

In [None]:
# Dataset path and view possible targets
dataset_path = '/content/data'
for name in listdir(dataset_path):
    if isdir(join(dataset_path, name)):
        print(name)

In [None]:
# Create an all targets list
all_targets = [name for name in listdir(dataset_path) if isdir(join(dataset_path, name))]
print(all_targets)

In [None]:
# See how many files are in each
num_samples = 0
for target in all_targets:
    print(len(listdir(join(dataset_path, target))))
    num_samples += len(listdir(join(dataset_path, target)))
print('Total samples:', num_samples)

In [None]:
# Settings
target_list = all_targets
feature_sets_file = 'all_targets_mfcc_sets.npz'
perc_keep_samples = 1.0 # 1.0 is keep all samples
val_ratio = 0.1
test_ratio = 0.1
sample_rate = 8000
num_mfcc = 16
len_mfcc = 16

In [None]:

# Create list of filenames along with ground truth vector (y)
filenames = []
y = []
for index, target in enumerate(target_list):
    print(join(dataset_path, target))
    filenames.append(listdir(join(dataset_path, target)))
    y.append(np.ones(len(filenames[index])) * index)

In [None]:
# Check ground truth Y vector
print(y)
for item in y:
    print(len(item))

In [None]:
y


In [None]:

# Flatten filename and y vectors
filenames = [item for sublist in filenames for item in sublist]
y = [item for sublist in y for item in sublist]

In [None]:
# Associate filenames with true output and shuffle
filenames_y = list(zip(filenames, y))
random.shuffle(filenames_y)
filenames, y = zip(*filenames_y)

In [None]:
print(filenames)
print(y)

In [None]:
# Only keep the specified number of samples (shorter extraction/training)
print(len(filenames))
filenames = filenames[:int(len(filenames) * perc_keep_samples)]
print(len(filenames))

5729
5729


In [None]:
# Calculate validation and test set sizes
val_set_size = int(len(filenames) * val_ratio)
test_set_size = int(len(filenames) * test_ratio)

In [None]:

# Break dataset apart into train, validation, and test sets
filenames_val = filenames[:val_set_size]
filenames_test = filenames[val_set_size:(val_set_size + test_set_size)]
filenames_train = filenames[(val_set_size + test_set_size):]

In [None]:
# Break y apart into train, validation, and test sets
y_orig_val = y[:val_set_size]
y_orig_test = y[val_set_size:(val_set_size + test_set_size)]
y_orig_train = y[(val_set_size + test_set_size):]

In [None]:
# Function: Create MFCC from given path
def calc_mfcc(path):
    
    # Load wavefile
    signal, fs = librosa.load(path, sr=sample_rate)
    
    # Create MFCCs from sound clip
    mfccs = python_speech_features.base.mfcc(signal, 
                                            samplerate=fs,
                                            winlen=0.256,
                                            winstep=0.050,
                                            numcep=num_mfcc,
                                            nfilt=26,
                                            nfft=2048,
                                            preemph=0.0,
                                            ceplifter=0,
                                            appendEnergy=False,
                                            winfunc=np.hanning)
    return mfccs.transpose()

### Checking Corrupt Data And Fixing

In [None]:
# TEST: Construct test set by computing MFCC of each WAV file
prob_cnt = 0
x_test = []
y_test = []
for index, filename in enumerate(filenames_train):
    
    # Stop after 500
    if index >= 500:
        break
    
    # Create path from given filename and target item
    path = join(dataset_path, target_list[int(y_orig_train[index])], 
                filename)
    
    # Create MFCCs
    mfccs = calc_mfcc(path)
    
    if mfccs.shape[1] == len_mfcc:
        x_test.append(mfccs)
        y_test.append(y_orig_train[index])
    else:
        print('Dropped:', index, mfccs.shape)
        prob_cnt += 1

In [None]:
print('% of problematic samples:', prob_cnt / 500)

In [None]:
%%capture
! apt-get update && apt-get -y --no-install-recommends install \
    sudo \
    gstreamer-1.0 \
    gstreamer1.0-gtk3\

In [None]:
! apt-get update && apt-get -y --no-install-recommends install \
    sudo \
    vim \
    wget \
    build-essential \
    pkg-config \
    python3.6 \
    python3-pip \
    python3.6-dev \
    python3.6-venv \
    python-dev \
    python3-dev \
    git \
    cmake \
    autoconf \
    automake \
    libtool \
    gstreamer-1.0 \
    gstreamer1.0-dev \
    libgstreamer1.0-0 \
    gstreamer1.0-plugins-base \
    gstreamer1.0-plugins-good \
    gstreamer1.0-plugins-bad \
    gstreamer1.0-plugins-ugly \
    gstreamer1.0-libav \
    gstreamer1.0-doc \
    gstreamer1.0-tools \
    gstreamer1.0-x \
    gstreamer1.0-alsa \
    gstreamer1.0-gl \
    gstreamer1.0-gtk3 \
    gstreamer1.0-qt5 \
    gstreamer1.0-pulseaudio \
    python-gst-1.0 \
    libgirepository1.0-dev \
    libgstreamer-plugins-base1.0-dev \
    libcairo2-dev \
    gir1.2-gstreamer-1.0 \
    python3-gi \
    python-gi-dev


In [None]:
# TEST: Test shorter MFCC
!pip install playsound
from playsound import playsound
#sudo apt-get install gstreamer-1.0

idx = 290

# Create path from given filename and target item
path = join(dataset_path, target_list[int(y_orig_train[idx])], 
            filenames_train[idx])
print(path)
# Create MFCCs
mfccs = calc_mfcc(path)
print("MFCCs:", mfccs)

# Plot MFCC
fig = plt.figure()
plt.imshow(mfccs, cmap='inferno', origin='lower')

# TEST: Play problem sounds
print(target_list[int(y_orig_train[idx])])
#playsound(path)

In [None]:
from PyQt5.QtWidgets import QMainWindow, QLabel, QSizePolicy, QApplication 
from PyQt5.QtGui import QPixmap, QImage                                
from PyQt5.QtCore import Qt                 
qImage = array2qimage(mfccs, normalize = False) # create QImage from ndarray
success = qImage.save('aaa') # use Qt's image IO functions for saving PNG/JPG/..

In [None]:
from PIL import Image
im = Image.fromarray(mfccs)
im = im.convert('RGB')
im.save("your_file.jpeg")

In [None]:

import os
filenames_train[290]
os.path.splitext(filenames_train[290])[0]
%cd '/content/data'

In [None]:
# Importing Image module from PIL package  
from PIL import Image  
import PIL  
  
# creating a image object (main image)  
#im1 = Image.open(r"C:\Users\System-Pc\Desktop\flower1.jpg")  
im1 = Image.open(mfccs)  
  
# save a image using extension 
%cd '/content'
im1 = mfccs.save(os.path.splitext(filenames_train[290])[0]) 

### Creating Mfcc & Output file
 

In [None]:
# Function: Create MFCCs, keeping only ones of desired length
def extract_features(in_files, in_y):
    prob_cnt = 0
    out_x = []
    out_y = []
        
    for index, filename in enumerate(in_files):
    
        # Create path from given filename and target item
        path = join(dataset_path, target_list[int(in_y[index])], 
                    filename)
        
        # Check to make sure we're reading a .wav file
        if not path.endswith('.wav'):
            continue

        # Create MFCCs
        mfccs = calc_mfcc(path)

        # Only keep MFCCs with given length
        if mfccs.shape[1] == len_mfcc:
            out_x.append(mfccs)
            out_y.append(in_y[index])
        else:
            print('Dropped:', index, mfccs.shape)
            prob_cnt += 1

        
            
    return out_x, out_y, prob_cnt

In [None]:
print(x_val[2])
imshow(x_val[2])

In [None]:
# Create train, validation, and test sets
x_train, y_train, prob = extract_features(filenames_train, 
                                          y_orig_train)
print('Removed percentage:', prob / len(y_orig_train))
x_val, y_val, prob = extract_features(filenames_val, y_orig_val)
print('Removed percentage:', prob / len(y_orig_val))
x_test, y_test, prob = extract_features(filenames_test, y_orig_test)
print('Removed percentage:', prob / len(y_orig_test))

In [None]:
# Save features and truth vector (y) sets to disk
np.savez(feature_sets_file, 
         x_train=x_train, 
         y_train=y_train, 
         x_val=x_val, 
         y_val=y_val, 
         x_test=x_test, 
         y_test=y_test)

In [None]:
# TEST: Load features
feature_sets = np.load(feature_sets_file)
feature_sets.files

In [None]:
len(feature_sets['x_train'])

In [None]:
print(feature_sets['y_val'])


# Model Train

In [None]:
#############Save As Image################
from PIL import Image
im = Image.fromarray(A)
im.save("your_file.jpeg")

In [None]:
# multi-class classification with Keras
import pandas
from keras.models import Sequential
from keras.layers import Dense
from keras.wrappers.scikit_learn import KerasClassifier
from keras.utils import np_utils
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
# load dataset
dataframe = pandas.read_csv("iris.data", header=None)
dataset = dataframe.values
X = dataset[:,0:4].astype(float)
Y = dataset[:,4]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)
# convert integers to dummy variables (i.e. one hot encoded)
dummy_y = np_utils.to_categorical(encoded_Y)

# define baseline model
def baseline_model():
	# create model
	model = Sequential()
	model.add(Dense(8, input_dim=4, activation='relu'))
	model.add(Dense(3, activation='softmax'))
	# Compile model
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	return model



In [None]:
import pandas
from keras.models import Sequential
from keras.layers import Dense
from keras.wrappers.scikit_learn import KerasClassifier
from keras.utils import np_utils
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
from os import listdir
from os.path import isdir, join
from tensorflow.keras import layers, models
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import datasets

In [None]:
print(dataset_path)
print(all_targets)

In [None]:
# Settings
feature_sets_path = '/content'
feature_sets_filename = 'all_targets_mfcc_sets.npz'
model_filename = 'wake_word_stop_model.h5'
#wake_word = 'stop'

In [None]:
# Load feature sets
feature_sets = np.load(join(feature_sets_path, feature_sets_filename))
print(feature_sets.files)

In [None]:
# Assign feature sets
x_train = feature_sets['x_train']
y_train = feature_sets['y_train']
x_val = feature_sets['x_val']
y_val = feature_sets['y_val']
x_test = feature_sets['x_test']
y_test = feature_sets['y_test']

In [None]:
# Look at tensor dimensions
print(x_train.shape)
print(x_val.shape)
print(x_test.shape)

In [None]:

# Peek at labels
print(y_val)

In [None]:

# #############Encode Y##########################################
# encoder = LabelEncoder()
# encoder.fit(y_train)
# encoded_y_train = encoder.transform(y_train)
# encoder.fit(y_val)
# encoded_y_val = encoder.transform(y_val)
# encoder.fit(y_test)
# encoded_y_test = encoder.transform(y_test)

In [None]:
dummy_y_train = np_utils.to_categorical(encoded_y_train)
dummy_y_test = np_utils.to_categorical(encoded_y_test)
dummy_y_val = np_utils.to_categorical(encoded_y_val)

In [None]:
# Convert ground truth arrays to one wake word (1) and 'other' (0)
#wake_word_index = all_targets.index(wake_word)
#y_train = np.equal(y_train, wake_word_index).astype('float64')
#y_val = np.equal(y_val, wake_word_index).astype('float64')
#y_test = np.equal(y_test, wake_word_index).astype('float64')

In [None]:
# Peek at labels after conversion
print(encoded_y_val)
print(dummy_y_val)
#print(y_val)

In [None]:
# What percentage of 'stop' appear in validation labels
print(sum(y_val) / len(y_val))
print(1 - sum(y_val) / len(y_val))

In [None]:
# View the dimensions of our input data
print(x_train.shape)

In [None]:
# CNN for TF expects (batch, height, width, channels)
# So we reshape the input tensors with a "color" channel of 1
x_train = x_train.reshape(x_train.shape[0], 
                          x_train.shape[1], 
                          x_train.shape[2], 
                          1)
x_val = x_val.reshape(x_val.shape[0], 
                      x_val.shape[1], 
                      x_val.shape[2], 
                      1)
x_test = x_test.reshape(x_test.shape[0], 
                        x_test.shape[1], 
                        x_test.shape[2], 
                        1)
print(x_train.shape)
print(x_val.shape)
print(x_test.shape)

In [None]:

# Input shape for CNN is size of MFCC of 1 sample
sample_shape = x_test.shape[1:]
print(sample_shape)

In [None]:
# Build model
# Based on: https://www.geeksforgeeks.org/python-image-classification-using-keras/
model = models.Sequential()
model.add(layers.Conv2D(32, 
                        (2, 2), 
                        activation='relu',
                        input_shape=sample_shape))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))

model.add(layers.Conv2D(32, (2, 2), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))

model.add(layers.Conv2D(64, (2, 2), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))

# Classifier
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(1, activation='sigmoid'))


In [None]:

# Display model
model.summary()


In [None]:
# Add training parameters to model
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), 
              optimizer='adam', 
              metrics=['accuracy'])
#model.compile(optimizer='adam',                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),               metrics=['accuracy'])

In [None]:
num_classes = 3 #number of classes, here is 10 (0,1,...,9)
train_y = keras.utils.to_categorical(encoded_y_train, num_classes)
test_y = keras.utils.to_categorical(encoded_y_test, num_classes)
val_y = keras.utils.to_categorical(encoded_y_val, num_classes)

In [None]:
# Train
history = model.fit(x_train, 
                    dummy_y_train, 
                    epochs=30, 
                    batch_size=300, 
                    validation_data=(x_val, dummy_y_val)
                    )

In [None]:
# Plot results
import matplotlib.pyplot as plt

acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(acc) + 1)

plt.plot(epochs, acc, 'bo', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()


In [None]:
# Save the model as a file
models.save_model(model, model_filename)

In [None]:

# See which are 'stop'
for idx, y in enumerate(encoded_y_test):
    if y == 1:
        print(idx)

In [None]:
# TEST: Load model and run it against test set
model = models.load_model(model_filename)
for i in range(100, 110):
    print('Answer:', encoded_y_test[i], ' Prediction:', model.predict(np.expand_dims(x_test[i], 0)))

In [None]:
# Evaluate model with test set
model.evaluate(x=x_test, y=encoded_y_test)

# Cross validation for Unsplitted DAta set

In [None]:
estimator = KerasClassifier(build_fn=baseline_model, epochs=200, batch_size=5, verbose=0)
kfold = KFold(n_splits=10, shuffle=True)
results = cross_val_score(estimator, X, dummy_y, cv=kfold)
print("Baseline: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))