## MFCC feature extraction and Network training

In this notebook you will go through an example flow of processing audio data, complete with feature extraction and training.

Make sure you read the instructions on the exercise sheet and follow the task order.

#### Task 1 

In [1]:
import json
import numpy as np
from scipy.io import wavfile
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tqdm import tqdm
import os

DataSetPath = os.path.join("hey_snips_kws_4.0", "hey_snips_research_6k_en_train_eval_clean_ter/")

with open(DataSetPath+"train.json") as jsonfile:
    traindata = json.load(jsonfile)

with open(DataSetPath+"test.json") as jsonfile:
    testdata = json.load(jsonfile)

#### Task 2

In [2]:
def load_data():
    x_train_list = []
    y_train_list = []

    x_test_list = []
    y_test_list = []

    totalSliceLength = 10 # Length to stuff the signals to, given in seconds

    # trainsize = len(traindata) # Number of loaded training samples
    # testsize = len(testdata) # Number of loaded testing samples

    trainsize = 1000 # Number of loaded training samples
    testsize = 100 # Number of loaded testing samples


    fs = 16000 # Sampling rate of the samples
    segmentLength = 1024 # Number of samples to use per segment

    sliceLength = int(totalSliceLength * fs / segmentLength)*segmentLength

    for i in tqdm(range(trainsize)): 
        fs, train_sound_data = wavfile.read(DataSetPath+traindata[i]['audio_file_path']) # Read wavfile to extract amplitudes

        _x_train = train_sound_data.copy() # Get a mutable copy of the wavfile
        _x_train.resize(sliceLength) # Zero stuff the single to a length of sliceLength
        _x_train = _x_train.reshape(-1,int(segmentLength)) # Split slice into Segments with 0 overlap
        x_train_list.append(_x_train.astype(np.float32)) # Add segmented slice to training sample list, cast to float so librosa doesn't complain
        y_train_list.append(traindata[i]['is_hotword']) # Read label 

    for i in tqdm(range(testsize)):
        fs, test_sound_data = wavfile.read(DataSetPath+testdata[i]['audio_file_path'])
        _x_test = test_sound_data.copy()
        _x_test.resize(sliceLength)
        _x_test = _x_test.reshape((-1,int(segmentLength)))
        x_test_list.append(_x_test.astype(np.float32))
        y_test_list.append(testdata[i]['is_hotword'])

    x_train = tf.convert_to_tensor(np.asarray(x_train_list))
    y_train = tf.convert_to_tensor(np.asarray(y_train_list))

    x_test = tf.convert_to_tensor(np.asarray(x_test_list))
    y_test = tf.convert_to_tensor(np.asarray(y_test_list))

    return x_train, y_train, x_test, y_test

In [3]:
def compute_mfccs(tensor):
    sample_rate = 16000.0
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 80.0, 7600.0, 64
    frame_length = 1024
    num_mfcc = 13

    stfts = tf.signal.stft(tensor, frame_length=frame_length, frame_step=frame_length, fft_length=frame_length) 
    spectrograms = tf.abs(stfts)
    spectrograms = tf.reshape(spectrograms, (spectrograms.shape[0],spectrograms.shape[1],-1))
    num_spectrogram_bins = stfts.shape[-1]
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
      num_mel_bins, num_spectrogram_bins, sample_rate, lower_edge_hertz,
      upper_edge_hertz)
    mel_spectrograms = tf.tensordot(spectrograms, linear_to_mel_weight_matrix, 1)
    log_mel_spectrograms = tf.math.log(mel_spectrograms + 1e-6)
    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrograms)[..., :num_mfcc]
    return tf.reshape(mfccs, (mfccs.shape[0],mfccs.shape[1],mfccs.shape[2],-1))

In [4]:
x_train, y_train, x_test, y_test = load_data()

import pickle
with open('x_train.txt', 'wb') as f:
    pickle.dump(x_train, f)

print(x_train.shape)

100%|██████████| 1000/1000 [00:00<00:00, 1260.83it/s]
100%|██████████| 100/100 [00:00<00:00, 1359.52it/s]
(1000, 156, 1024)


#### Task 3

In [5]:
x_train_mfcc = compute_mfccs(x_train)
x_test_mfcc = compute_mfccs(x_test)

print(x_train_mfcc.shape)
print(x_test_mfcc.shape)


(1000, 156, 13, 1)
(100, 156, 13, 1)


In [26]:
print(tf.reduce_max(x_train_mfcc))
print(tf.reduce_min(x_train_mfcc))


tf.Tensor(167.86346, shape=(), dtype=float32)
tf.Tensor(-174.75427, shape=(), dtype=float32)


#### Task 4

In [28]:
batchSize = 10
epochs = 30

train_set = (x_train_mfcc/512 + 0.5)
train_labels = y_train

test_set = (x_test_mfcc/512 + 0.5)
test_labels = y_test


In [8]:

model = tf.keras.models.Sequential()

#model.add(layers.InputLayer(input_shape=(train_set.shape[1],train_set.shape[2],train_set.shape[3]), batch_size=(batchSize)))
model.add(layers.Conv2D(filters=3,kernel_size=(3,3),padding="same",input_shape=(train_set[0].shape)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.Conv2D(filters=16,kernel_size=(3,3),strides=(2,2),padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.MaxPool2D((2,2)))

model.add(layers.Conv2D(filters=32,kernel_size=(3,3),strides=(2,2),padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.MaxPool2D((2,2)))

model.add(layers.Conv2D(filters=48,kernel_size=(3,3),padding='same',strides=(2,2)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.GlobalAveragePooling2D())

model.add(layers.Flatten())

model.add(layers.Dense(8, kernel_regularizer=(regularizers.l1(0))))
model.add(layers.Activation('relu'))

model.add(layers.Dense(2))
model.add(layers.Activation('softmax'))


model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy'])
history = model.fit(train_set, y_train, batchSize, epochs, validation_split=0.1, workers=3)


Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
14/90 [===>..........................] - ETA: 1:07 - loss: 0.0521 - accuracy: 0.9760

In [30]:
model.summary()
score = model.evaluate(test_set, y_test)

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 156, 13, 3)        30        
_________________________________________________________________
batch_normalization (BatchNo (None, 156, 13, 3)        12        
_________________________________________________________________
activation (Activation)      (None, 156, 13, 3)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 78, 7, 16)         448       
_________________________________________________________________
batch_normalization_1 (Batch (None, 78, 7, 16)         64        
_________________________________________________________________
activation_1 (Activation)    (None, 78, 7, 16)         0         
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 39, 3, 16)        

In [11]:
model.save("MFCCmodel.h5")

#### TFLite conversion

In [12]:
train_set = train_set.numpy()
test_set = test_set.numpy()
train_labels = train_labels.numpy()
test_labels = test_labels.numpy()
tflite_model_name = 'MFCC'
# Convert Keras model to a tflite model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Convert the model to the TensorFlow Lite format with quantization
quantize = True
if (quantize):
    def representative_dataset():
        for i in range(500):
            yield([train_set[i].reshape(1,156,13,1)])
    # Set the optimization flag.
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Enforce full-int8 quantization
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8  # or tf.uint8
    converter.inference_output_type = tf.int8  # or tf.uint8
    # Provide a representative dataset to ensure we quantize correctly.
converter.representative_dataset = representative_dataset
tflite_model = converter.convert()

open(tflite_model_name + '.tflite', 'wb').write(tflite_model)

INFO:tensorflow:Assets written to: /tmp/tmpb0sp_d5q/assets


29192

In [13]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):

    c_str = ''

    # Create header guard
    c_str += '#ifndef ' + var_name.upper() + '_H\n'
    c_str += '#define ' + var_name.upper() + '_H\n\n'

    # Add array length at top of file
    c_str += '\nunsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

    # Declare C variable
    c_str += 'unsigned char ' + var_name + '[] = {'
    hex_array = []
    for i, val in enumerate(hex_data) :

        # Construct string from hex
        hex_str = format(val, '#04x')

        # Add formatting so each line stays within 80 characters
        if (i + 1) < len(hex_data):
            hex_str += ','
        if (i + 1) % 12 == 0:
            hex_str += '\n '
        hex_array.append(hex_str)

    # Add closing brace
    c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

    # Close out header guard
    c_str += '#endif //' + var_name.upper() + '_H'

    return c_str

In [14]:
c_model_name = 'MFCC'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model, c_model_name))

In [15]:
tflite_interpreter = tf.lite.Interpreter(model_path=tflite_model_name + '.tflite')
tflite_interpreter.allocate_tensors()
input_details = tflite_interpreter.get_input_details()
output_details = tflite_interpreter.get_output_details()

print("== Input details ==")
print("name:", input_details[0]['name'])
print("shape:", input_details[0]['shape'])
print("type:", input_details[0]['dtype'])

print("\n== Output details ==")
print("name:", output_details[0]['name'])
print("shape:", output_details[0]['shape'])
print("type:", output_details[0]['dtype'])

== Input details ==
name: conv2d_input_int8
shape: [  1 156  13   1]
type: <class 'numpy.int8'>

== Output details ==
name: Identity_int8
shape: [1 2]
type: <class 'numpy.int8'>


In [18]:
input_details[0]["quantization"]

(0.003246503183618188, -128)

In [16]:
predictions = np.zeros((len(test_set),), dtype=int)
input_scale, input_zero_point = input_details[0]["quantization"]
for i in range(len(test_set)):
    val_batch = test_set[i]
    val_batch = val_batch / input_scale + input_zero_point
    val_batch = np.expand_dims(val_batch, axis=0).astype(input_details[0]["dtype"])
    tflite_interpreter.set_tensor(input_details[0]['index'], val_batch)
    tflite_interpreter.allocate_tensors()
    tflite_interpreter.invoke()

    tflite_model_predictions = tflite_interpreter.get_tensor(output_details[0]['index'])
    #print("Prediction results shape:", tflite_model_predictions.shape)
    output = tflite_interpreter.get_tensor(output_details[0]['index'])
    predictions[i] = output.argmax()

In [17]:
sum = 0
for i in range(len(predictions)):
    if (predictions[i] == test_labels[i]):
        sum = sum + 1
accuracy_score = sum / 100
print("Accuracy of quantized to int8 model is {}%".format(accuracy_score*100))
print("Compared to float32 accuracy of {}%".format(score[1]*100))
print("We have a change of {}%".format((accuracy_score-score[1])*100))

Accuracy of quantized to int8 model is 100.0%
Compared to float32 accuracy of 98.00000190734863%
We have a change of 1.9999980926513672%
