In [23]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy
import seaborn as sns

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models, Model


@tf.function
def train_step(specter: tf.Tensor, labels: tf.Tensor) -> None:
  """Run one optimisation step on a single batch.

  Uses the module-level `model`, `loss_object` and `optimizer`, and records
  the result in the module-level `train_loss` / `train_accuracy` metrics.

  Args:
    specter: Batch of spectrograms fed to the model.
    labels: Target labels for `specter`.
  """
  with tf.GradientTape() as tape:
    logits = model(specter, training=True)
    batch_loss = loss_object(labels, logits)

  # Read the variables only after the forward pass: a subclassed model has
  # no weights until it has been called once.
  trainable = model.trainable_variables
  grads = tape.gradient(batch_loss, trainable)
  optimizer.apply_gradients(zip(grads, trainable))

  train_loss(batch_loss)
  train_accuracy(labels, logits)


@tf.function
def test_step(specter: tf.Tensor, labels: tf.Tensor) -> None:
  """Evaluate one batch without updating any weights.

  Runs the module-level `model` in inference mode and accumulates the loss
  and accuracy in the module-level `test_loss` / `test_accuracy` metrics.

  Args:
    specter: Batch of spectrograms fed to the model.
    labels: Target labels for `specter`.
  """
  logits = model(specter, training=False)
  test_loss(loss_object(labels, logits))
  test_accuracy(labels, logits)


class Network(Model):
  """Small convolutional classifier operating on spectrograms.

  Pipeline: resize to 32x32 -> normalise with the module-level `norm_layer`
  -> two 3x3 convolutions (32 filters each, ReLU) -> max pooling ->
  dropout(0.25) -> flatten -> dense(128, ReLU) -> dropout(0.5) ->
  dense(len(words)).

  The last layer has no activation, so the model returns raw logits with one
  entry per element of the module-level `words` array.
  """

  def __init__(self):
    super().__init__()
    # Symbolic input built from the module-level `input_shape`. It is not
    # used by `call`; it is kept only so the attribute remains available.
    self.input_layer = layers.Input(shape=input_shape)
    self.resizing = layers.Resizing(32, 32)
    # Shared, already-adapted normalisation layer defined at module level.
    self.normalize = norm_layer
    self.conv1 = layers.Conv2D(32, 3, activation='relu')
    self.conv2 = layers.Conv2D(32, 3, activation='relu')
    self.pool = layers.MaxPooling2D()
    self.dropout1 = layers.Dropout(0.25)
    self.flatten = layers.Flatten()
    self.d1 = layers.Dense(128, activation='relu')
    self.dropout2 = layers.Dropout(0.5)
    self.d2 = layers.Dense(len(words))

  def call(self, x, training=None):
    """Compute class logits for a batch of spectrograms.

    Args:
      x: Batch of spectrograms.
      training: Whether the layers run in training mode. It is forwarded
        to the dropout layers; `None` lets Keras decide from the calling
        context.

    Returns:
      The output of the final dense layer (unnormalised logits).
    """
    # The previous implementation derived a `training` flag from
    # `x is False`, which is never true for a tensor: the flag was always
    # True and the `expand_dims` branch guarded by it could not run. The
    # flag is now a real argument and the dead branch is gone.
    x = self.resizing(x)
    x = self.normalize(x)
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.pool(x)
    x = self.dropout1(x, training=training)
    x = self.flatten(x)
    x = self.d1(x)
    x = self.dropout2(x, training=training)
    return self.d2(x)


def decode_wav_file(audio_binary: tf.Tensor) -> tf.Tensor:
  """Decode the raw contents of a WAV file into a waveform tensor.

  Args:
    audio_binary: Undecoded contents of a WAV file.

  Returns:
    The decoded audio with its last axis squeezed away; the sample rate
    returned by the decoder is discarded.
  """
  decoded = tf.audio.decode_wav(contents=audio_binary)
  # The decoder yields (audio, sample_rate); only the audio is needed.
  waveform = decoded[0]
  return tf.squeeze(waveform, axis=-1)


def get_labels(file_path: tf.Tensor) -> tf.Tensor:
  """Return the name of the directory that directly contains `file_path`.

  Args:
    file_path: Path of a file, split on the platform path separator.

  Returns:
    The second-to-last path component.
  """
  path_parts = tf.strings.split(input=file_path, sep=os.path.sep)
  # [..., <parent directory>, <file name>] -> pick the parent directory.
  return path_parts[-2]


def get_ds(file_path: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
  """Load one audio file and pair its waveform with its label.

  Args:
    file_path: Path of the WAV file to read.

  Returns:
    A `(waveform, label)` tuple, where the label is taken from the path by
    `get_labels` and the waveform is produced by `decode_wav_file`.
  """
  parent_name = get_labels(file_path)
  waveform = decode_wav_file(tf.io.read_file(file_path))
  return waveform, parent_name


def spectrogram(
    wave: tf.Tensor,
    input_len: int = 16000,
    frame_length: int = 255,
    frame_step: int = 128,
) -> tf.Tensor:
  """Convert a waveform into a magnitude spectrogram.

  The waveform is truncated or zero-padded to exactly `input_len` samples so
  that every spectrogram produced with the same settings has the same shape.

  Args:
    wave: One-dimensional waveform.
    input_len: Number of samples the waveform is cut or padded to.
    frame_length: Window length, in samples, passed to `tf.signal.stft`.
    frame_step: Hop size, in samples, passed to `tf.signal.stft`.

  Returns:
    The absolute value of the short-time Fourier transform with one extra
    trailing axis of size 1.
  """
  wave = wave[:input_len]
  # Zeros that fill the clip up to `input_len` samples (none if already full).
  zero_padding = tf.zeros([input_len] - tf.shape(wave), dtype=tf.float32)

  wave = tf.cast(wave, dtype=tf.float32)
  # Named `padded_wave` rather than `len` so the builtin is not shadowed.
  padded_wave = tf.concat([wave, zero_padding], 0)

  specter = tf.abs(
      tf.signal.stft(padded_wave, frame_length=frame_length, frame_step=frame_step)
  )

  # Trailing axis so the result can be consumed like a single-channel image.
  return specter[..., tf.newaxis]


def get_specter_and_label(audio: tf.Tensor, label: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
  """Turn a `(waveform, label name)` pair into `(spectrogram, label index)`.

  Args:
    audio: Waveform handed to `spectrogram`.
    label: Label name to look up in the module-level `words` array.

  Returns:
    The spectrogram of `audio` and the index of the first entry of `words`
    that equals `label`.
  """
  # Comparing against `words` yields a boolean mask; argmax picks the
  # position of the first match.
  label_id = tf.argmax(label == words)
  return spectrogram(audio), label_id


def preprocessing(files: tf.Tensor) -> tf.data.Dataset:
  """Build a dataset of `(spectrogram, label index)` pairs from file paths.

  Args:
    files: File paths; each one becomes one dataset element.

  Returns:
    An unbatched dataset obtained by mapping `get_ds` and then
    `get_specter_and_label` over the paths.
  """
  pipeline = tf.data.Dataset.from_tensor_slices(files)
  # Stage 1 loads and decodes the audio, stage 2 converts it to model input.
  for stage in (get_ds, get_specter_and_label):
    pipeline = pipeline.map(
        map_func=stage,
        num_parallel_calls=tf.data.AUTOTUNE
    )
  return pipeline


# Directory the script reads the audio files from after the download below.
path = 'data/mini_speech_commands'
tf.keras.utils.get_file(
    'mini_speech_commands.zip',
    origin='http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip',
    extract=True,
    cache_dir='.',
    cache_subdir='data'
)

# Every entry of the data directory except the README is treated as a class.
words = numpy.array(tf.io.gfile.listdir(str(pathlib.Path(path))))
words = words[words != 'README.md']

files = tf.io.gfile.glob(str(path) + '/*/*')
files = tf.random.shuffle(files)

# 60 % / 20 % / 20 % split. The boundaries are computed once with integer
# arithmetic and the test set is simply "everything after validation", so the
# three parts are always disjoint and cover every file. The previous
# `files[-int(...):]` form returned the whole list when the count rounded
# down to zero.
num_files = len(files)
train_end = num_files * 6 // 10
val_end = num_files * 8 // 10

train_ds = files[:train_end]
val_ds = files[train_end:val_end]
test_ds = files[val_end:]

BATCH_SIZE = 32

# Training pipeline: file paths -> (waveform, label name) -> (spectrogram, label index).
ds = tf.data.Dataset.from_tensor_slices(train_ds)

waves_ds = ds.map(
    map_func=get_ds,
    num_parallel_calls=tf.data.AUTOTUNE
)

specter_ds = waves_ds.map(
    map_func=get_specter_and_label,
    num_parallel_calls=tf.data.AUTOTUNE
)

train_ds = specter_ds.batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)
val_ds = preprocessing(val_ds).batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)
test_ds = preprocessing(test_ds).batch(BATCH_SIZE)

# Read the shape of a single (unbatched) spectrogram for the model input.
# The loop variable must not be called `spectrogram`: that would rebind the
# module-level `spectrogram()` function to a tensor and break every later
# call to `get_specter_and_label` / `preprocessing`.
for sample_specter, _ in specter_ds.take(1):
  input_shape = sample_specter.shape

# Fit the normalisation statistics on the training spectrograms only.
norm_layer = layers.Normalization()
norm_layer.adapt(data=specter_ds.map(map_func=lambda spec, label: spec))

model = Network()
optimizer = tf.keras.optimizers.Adam()
# The network outputs raw logits and the labels are integer class indices.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='_accuracy')
# Keyword arguments: the third positional parameter of `compile` is not
# `metrics` in every Keras version, so passing the metric positionally is
# fragile.
model.compile(optimizer=optimizer, loss=loss_object, metrics=[accuracy])

# Running metrics updated by `train_step` and `test_step`.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

# Validation metrics, kept apart from the train/test ones.
val_loss = tf.keras.metrics.Mean(name='val_loss')
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')

EPOCHS = 10
for epoch in range(EPOCHS):
  # Start every epoch with empty accumulators.
  for metric in (train_loss, train_accuracy, val_loss, val_accuracy,
                 test_loss, test_accuracy):
    metric.reset_state()

  for specter, labels in train_ds:
    train_step(specter, labels)

  # Validation data is only evaluated. Feeding it to `train_step`, as the
  # loop did before, trains the model on the validation set.
  for specter, labels in val_ds:
    val_predictions = model(specter, training=False)
    val_loss(loss_object(labels, val_predictions))
    val_accuracy(labels, val_predictions)

  for specter, labels in test_ds:
    test_step(specter, labels)

# Position of the batch to inspect; `train_ds` is batched, so this selects
# a whole batch rather than a single example.
index = 91

specter_subset = train_ds.skip(index).take(1)

selected_specter = None
for specter, labels in specter_subset:
  selected_specter = specter.numpy()
  # Index with a NumPy array so the lookup does not rely on implicit
  # tensor-to-array conversion.
  print(f'Label: {words[labels.numpy()]}')

# Skipping past the end yields an empty dataset; fail with a clear message
# instead of an undefined (or stale) `selected_specter`.
if selected_specter is None:
  raise IndexError(f'train_ds has no batch at index {index}')

predictions = model.predict(selected_specter)
print(words[numpy.argmax(predictions, axis=1)])


Label: ['down' 'no' 'left' 'right' 'up' 'right' 'down' 'go' 'left' 'left' 'no'
 'right' 'yes' 'go' 'go' 'no' 'right' 'go' 'right' 'stop' 'right' 'up'
 'up' 'no' 'no' 'yes' 'down' 'go' 'stop' 'left' 'up' 'no']
['down' 'up' 'left' 'right' 'up' 'right' 'down' 'go' 'left' 'left' 'no'
 'right' 'yes' 'go' 'go' 'no' 'up' 'go' 'right' 'stop' 'right' 'up' 'up'
 'stop' 'no' 'yes' 'down' 'no' 'stop' 'left' 'up' 'no']
