In [1]:
# global config variables
img_dir_gdrive = "/content/drive/MyDrive/ML_IoT"
img_dir_local = "./data"

words = [ "people", "happy", "unknown" ]

i16min = -2**15
i16max = 2**15-1
fsamp = 16000
use_microfrontend = True

In [2]:
# install nessicary libraries
!pip install --ignore-installed seaborn #tensorflow-io

Defaulting to user installation because normal site-packages is not writeable
Collecting seaborn
  Using cached seaborn-0.11.1-py3-none-any.whl (285 kB)
[31mERROR: Requested seaborn from https://files.pythonhosted.org/packages/68/ad/6c2406ae175f59ec616714e408979b674fe27b9587f79d59a528ddfbcd5b/seaborn-0.11.1-py3-none-any.whl#sha256=4e1cce9489449a1c6ff3c567f2113cdb41122f727e27a984950d004a88ef3c5c has different version in metadata: '0.11.1'[0m


In [3]:
import os
import pathlib
import shutil
import time
import math

import itertools

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
import random
# import tensorflow_io as tfio

from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.lite.experimental.microfrontend.python.ops import audio_microfrontend_op as frontend_op
from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display

try:
    from google.colab import drive
    drive.mount('/content/drive')
    data_dir = img_dir_gdrive
except:
    data_dir = img_dir_local
    
import PIL
import PIL.Image

seed = int(time.time())
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

AUTOTUNE = tf.data.experimental.AUTOTUNE

print(tf.__version__)

2.4.1


In [4]:
# copy over all the custom speach data from drive to local storage
if not os.path.exists("./training"):
  print("Copying over audio data")
  os.mkdir("training")
  shutil.copytree(f"{data_dir}/people_raw/", "./training/people")
  shutil.copytree(f"{data_dir}/happy_raw/", "./training/happy")
  shutil.copytree(f"{data_dir}/unknown_raw/", "./training/unknown")
  print("Copied over audio data")

In [5]:
# Create our training, validation and test sets
train_percent = 0.75
valid_percent = 0.1
test_percent  = 0.15

print("People:")
# get file names
people_filenames = tf.io.gfile.glob('./training/people/*')
people_filenames = tf.random.shuffle(people_filenames)
length = len(people_filenames)
print(f"\t Total: { length }")
# cut out training set
idx = int(math.floor(length * train_percent))
people_train = people_filenames[: idx ]
people_filenames = people_filenames[ idx :]
print(f"\t Train: { len(people_train) }")
# cut out validation set
idx = int(math.floor(length * valid_percent))
people_valid = people_filenames[: idx ]
people_filenames = people_filenames[ idx :]
print(f"\t Valid: { len(people_valid) }")
# the rest is the test set
people_test = people_filenames
print(f"\t Test: { len(people_test) }")


print("Happy:")
happy_filenames = tf.io.gfile.glob('./training/happy/*')
happy_filenames = tf.random.shuffle(happy_filenames)
length = len(happy_filenames)
print(f"\t Total: { length }")
# cut out training set
idx = int(math.floor(length * train_percent))
happy_train = happy_filenames[: idx ]
happy_filenames = happy_filenames[ idx :]
print(f"\t Train: { len(happy_train) }")
# cut out validation set
idx = int(math.floor(length * valid_percent))
happy_valid = happy_filenames[: idx ]
happy_filenames = happy_filenames[ idx :]
print(f"\t Valid: { len(happy_valid) }")
# the rest is the test set
happy_test = happy_filenames
print(f"\t Test: { len(happy_test) }")


print("Unknown:")
unknown_filenames = tf.io.gfile.glob('./training/unknown/*')
unknown_filenames = tf.random.shuffle(unknown_filenames)
length = len(unknown_filenames)
print(f"\t Total: { length }")
# cut out training set
idx = int(math.floor(length * train_percent))
unknown_train = unknown_filenames[: idx ]
unknown_filenames = unknown_filenames[ idx :]
print(f"\t Train: { len(unknown_train) }")
# cut out validation set
idx = int(math.floor(length * valid_percent))
unknown_valid = unknown_filenames[: idx ]
unknown_filenames = unknown_filenames[ idx :]
print(f"\t Valid: { len(unknown_valid) }")
# the rest is the test set
unknown_test = unknown_filenames
print(f"\t Test: { len(unknown_test) }")


train_filenames = people_train + happy_train + unknown_train
valid_filenames = people_valid + happy_valid + unknown_valid 
test_filenames  = people_test + happy_test + unknown_test 
print(f"Total Train: { len(train_filenames) }")
print(f"Total Valid: { len(valid_filenames) }")
print(f"Total Test:  { len(test_filenames) }")

People:
	 Total: 1692
	 Train: 1269
	 Valid: 169
	 Test: 254
Happy:
	 Total: 2054
	 Train: 1540
	 Valid: 205
	 Test: 309
Unknown:
	 Total: 2736
	 Train: 2052
	 Valid: 273
	 Test: 411


InvalidArgumentError: Incompatible shapes: [1269] vs. [1540] [Op:Add]

In [None]:
def decode_audio(audio_binary):
    audio, _ = tf.audio.decode_wav(audio_binary)
    return tf.squeeze(audio, axis=-1)

def get_label(file_path):
  parts = tf.strings.split(file_path, os.path.sep)
  return parts[-2]

def get_waveform_and_label(file_path):
    label = get_label(file_path)
    audio_binary = tf.io.read_file(file_path)
    waveform = decode_audio(audio_binary)
    return waveform, label

def wavds2specds(waveform_ds):
  spec_grams = np.zeros((0, 49, 40, 1))
  labels = []
  for wav, label in waveform_ds:
    spectrogram = get_spectrogram(wav)
    # TF conv layer expect inputs structured as 4D (batch_size, height, width, channels)
    # the microfrontend returns 2D tensors (freq, time), so we need to 
    spectrogram = tf.expand_dims(spectrogram, axis=0)  # add a 'batch' dimension at the front
    spectrogram = tf.expand_dims(spectrogram, axis=-1) # add a singleton 'channel' dimension at the back
    spec_grams = np.concatenate((spec_grams, spectrogram))
    new_label = label.numpy().decode('utf8')
    new_label_id = np.argmax(new_label == words)
    labels.append(new_label_id) # for numeric labels
    # labels.append(new_label) # for string labels
  return tf.data.Dataset.from_tensor_slices((spec_grams, labels))

def get_spectrogram(waveform):
    # Padding for files with less than 16000 samples
    zero_padding = tf.zeros([16000] - tf.shape(waveform), dtype=tf.float32)
    equal_length = tf.concat([waveform, zero_padding], 0)
    spectrogram = tf.signal.stft(equal_length, frame_length=255, frame_step=128)
    spectrogram = tf.abs(spectrogram)
    
#     zero_padding = tf.zeros([16000] - tf.shape(waveform), dtype=tf.int16)
#     equal_length = tf.cast(0.5*waveform*(i16max-i16min), tf.int16)  # scale float [-1,+1]=>INT16
#     equal_length = tf.concat([equal_length, zero_padding], 0)
#     spectrogram = frontend_op.audio_microfrontend(
#         equal_length, sample_rate=fsamp, num_channels=40,
#         window_size=40, window_step=20)
    
    return spectrogram

def plot_spectrogram(spectrogram, ax):
    # Convert to frequencies to log scale and transpose so that the time is
    # represented in the x-axis (columns).
    log_spec = np.log(spectrogram.T)
    height = log_spec.shape[0]
    width = log_spec.shape[1]
    X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
    Y = range(height)
    ax.pcolormesh(X, Y, log_spec, shading='auto')
    
#     freq_bins = spectrogram.shape[1]
#     time_dur = spectrogram.shape[0]
#     X = np.arange(time_dur)
#     Y = range(freq_bins)
#     ax.pcolormesh(X, Y, spectrogram.T)
    
def get_spectrogram_and_label_id(audio, label):
    spectrogram = get_spectrogram(audio)
    spectrogram = tf.expand_dims(spectrogram, -1)
    label_id = tf.argmax(label == words)
    return spectrogram, label_id


In [None]:
# Display 9 waveforms
files_ds = tf.data.Dataset.from_tensor_slices(train_filenames)
files_ds = files_ds.shuffle(len(train_filenames))
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
spectrogram_ds = waveform_ds.map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
#train_ds = wavds2specds(waveform_ds)

rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))
for i, (audio, label) in enumerate(waveform_ds.take(n)):
  r = i // cols
  c = i % cols
  ax = axes[r][c]
  ax.plot(audio.numpy())
  ax.set_yticks(np.arange(-1.2, 1.2, 0.2))
  label = label.numpy().decode('utf-8')
  ax.set_title(label)

plt.show()    

In [None]:
# Display one of each word clip
shown = [ False, False, False ]
fig, axes = plt.subplots(6, figsize=(12, 36))
current_axis = 0

for waveform, label in waveform_ds:
    if not False in shown:
        break
        
    if shown[words.index(label)] == False:
        print('Label:', label)
        label = label.numpy().decode('utf-8')
        print('Waveform shape:', waveform.shape)
        spectrogram = get_spectrogram(waveform)
        print('Spectrogram shape:', spectrogram.shape)
        print('Audio playback')
        display.display(display.Audio(waveform, rate=16000))
        shown[words.index(label)] = True
        
        timescale = np.arange(waveform.shape[0])
        axes[current_axis].plot(timescale, waveform.numpy())
        axes[current_axis].set_title('Waveform')
        axes[current_axis].set_xlim([0, 16000])
        plot_spectrogram(spectrogram.numpy(), axes[current_axis + 1])
        axes[current_axis + 1].set_title('Spectrogram')
        current_axis += 2
        
plt.show()
        

In [None]:
# Finish seting up the datasets
files_ds = tf.data.Dataset.from_tensor_slices(train_filenames)
waveform_ds = files_ds.map(get_waveform_and_label)
train_ds = waveform_ds.map(get_spectrogram_and_label_id)
# train_ds = wavds2specds(waveform_ds)

files_ds = tf.data.Dataset.from_tensor_slices(valid_filenames)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
valid_ds = waveform_ds.map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
# valid_ds = wavds2specds(waveform_ds)

files_ds = tf.data.Dataset.from_tensor_slices(test_filenames)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
test_ds = waveform_ds.map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
# test_ds = wavds2specds(waveform_ds)

batch_size = 64
train_ds = train_ds.batch(batch_size)
valid_ds = valid_ds.batch(batch_size)

train_ds = train_ds.cache().prefetch(AUTOTUNE)
valid_ds = valid_ds.cache().prefetch(AUTOTUNE)

for spectrogram, _ in train_ds.take(1):
  spec1 = spectrogram
# take(1) takes 1 *batch*, so we have to select the first 
# spectrogram from it, hence the [0]
print(f"Spectrogram shape {spec1[0].shape}")
print(f"ranges from {np.min(spec1)} to {np.max(spec1)}")   # min/max across the whole batch

In [None]:
for spectrogram, _ in train_ds.take(1):
  # take(1) takes 1 *batch*, so we have to select the first 
  # spectrogram from it, hence the [0]
  input_shape = spectrogram[0].shape  
print('Input shape:', input_shape)
num_labels = len(words)

# norm_layer = preprocessing.Normalization()
# norm_layer.adapt(train_ds.map(lambda x, _: x))

model = models.Sequential([
    layers.Input(shape=input_shape),
    layers.Conv2D(32, 3, activation='relu'),
    layers.MaxPooling2D(name='pool2'),
    layers.BatchNormalization(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.BatchNormalization(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(pool_size=(4,4)),
    layers.BatchNormalization(),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels),
])

model.summary()

model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [None]:
EPOCHS = 25
history = model.fit(
    train_ds,
    validation_data=valid_ds,  
    epochs=EPOCHS,
    #callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
)

In [None]:
metrics = history.history
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.show()

plt.figure()
plt.plot(history.epoch, metrics['accuracy'], metrics['val_accuracy'])
plt.legend(['accuracy', 'val_accuracy'])
plt.show()

In [None]:
test_audio = []
test_labels = []

for audio, label in test_ds:
  test_audio.append(audio.numpy())
  test_labels.append(label.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)
y_pred = np.argmax(model.predict(test_audio), axis=1)
y_true = test_labels

test_acc = sum(y_pred == y_true) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

confusion_mtx = tf.math.confusion_matrix(y_true, y_pred) 
plt.figure(figsize=(10, 8))
sns.heatmap(confusion_mtx, xticklabels=words, yticklabels=words, 
            annot=True, fmt='g')
plt.xlabel('Prediction')
plt.ylabel('Label')
plt.show()

In [None]:
sample_file = f"./training/people/people-01.wav"


files_ds = tf.data.Dataset.from_tensor_slices([ sample_file ])
files_ds = files_ds.shuffle(len(train_filenames))
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
final_ds = waveform_ds.map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)

for spectrogram, label in final_ds.batch(1):
  prediction = model(spectrogram)
  plt.bar(words, tf.nn.softmax(prediction[0]))
  plt.title(f'Predictions for "{words[label[0]]}"')
  plt.show()

In [None]:
# # Convert to TFLite
# converter = tf.lite.TFLiteConverter.from_keras_model(model)
# converter.optimizations = [tf.lite.Optimize.DEFAULT]

# num_calibration_steps = 10
# ds_iter = valid_ds.unbatch().batch(1).as_numpy_iterator()
# def representative_dataset_gen():
#   for _ in range(num_calibration_steps):
#     next_input = next(ds_iter)[0]
#     next_input = next_input.astype(np.float32)  # (DIFF_FROM_LECTURE)
#     yield [next_input]
    
# converter.optimizations = [tf.lite.Optimize.DEFAULT]
# converter.representative_dataset = representative_dataset_gen
# converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# converter.inference_input_type = tf.int8  # or tf.uint8; should match dat_q in eval_quantized_model.py
# converter.inference_output_type = tf.int8  # or tf.uint8

# tflite_quant_model = converter.convert()

# fname = 'kws_model.tflite'
# with open(fname, "wb") as fpo:
#   num_bytes_written = fpo.write(tflite_quant_model)
# print(f"Wrote {num_bytes_written} / {len(tflite_quant_model)} bytes to tflite file")