# AIoT doorbell notifier example for Ameba

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ArmDeveloperEcosystem/aiot-doorbell-notifier-example-for-ameba/blob/main/model_training.ipynb)

```python
# SPDX-FileCopyrightText: Copyright 2023 Arm Limited and/or its affiliates <open-source-office@arm.com>
# SPDX-License-Identifier: MIT
```

## Introduction

This notebook trains an audio classification model to detect a doorbell sound.

A [`tf.data.Dataset`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset) based pipeline is created to transform audio data from public datasets into Mel power spectrogram images. A model with the `"tiny_conv"` architecture is then used as the ML classifier.

The ML classifier is created in two phases. First, a baseline model is trained on the entire [ESC-50 dataset](https://github.com/karolpiczak/ESC-50). Then, a model that re-uses the CNN layer of the baseline model is trained on a subset of the [FSD50K dataset](https://zenodo.org/record/4060432) with the following classes:

 * Doorbell - 🚪🔔
 * Music - 🎶
 * Domestic and home sounds - 🏠
 * Human voice - 🗣
 * Hands (clapping, finger snapping) - 👏 🫰

**Note:** *The trained model has relatively poor metrics, but is still able to detect a doorbell sound in our testing. Further data cleaning and hyperparameter tuning are needed to get a model with better metrics.*


## Install dependencies

TensorFlow 2.11.* is compatible with `tensorflow_io` 0.28.*, as per the ["TensorFlow Version Compatibility" section of the TensorFlow I/O README](https://github.com/tensorflow/io#tensorflow-version-compatibility).

In [None]:
!pip install -q --upgrade "matplotlib==3.6.*" "pandas==1.5.*" "tensorflow==2.11.*" "tensorflow_io==0.28.*"

## Import modules

In [None]:
import os
import shutil

import IPython
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import tensorflow as tf
import tensorflow_io as tfio

## ESC-50 dataset and model

Download and extract the ESC-50 data and place it in the `datasets/ESC-50` directory:

In [None]:
_ = tf.keras.utils.get_file(
    "ESC-50.zip",
    "https://github.com/karoldvl/ESC-50/archive/master.zip",
    cache_subdir="datasets",
    extract=True,
    cache_dir="./",
)

os.rename(
    os.path.join("datasets", "ESC-50-master"),
    os.path.join("datasets", "ESC-50")
)

Read the ESC-50 metadata CSV file using Pandas:

In [None]:
esc50_csv_path = os.path.join('datasets', 'ESC-50', "meta", "esc50.csv")

esc50_df = pd.read_csv(esc50_csv_path)

esc50_df.head(-1)

Add a new column with the file path, based on the `filename` column and the path of the ESC-50 dataset audio folder.


In [None]:
esc50_audio_path = os.path.join("datasets", "ESC-50", "audio")

esc50_filepaths = esc50_df['filename'].apply(lambda filename: os.path.join(esc50_audio_path, filename))

esc50_df = esc50_df.assign(filepath=esc50_filepaths)

esc50_df.head(-1)

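Use the `fold` column value to determine the training, validation and testing splits. ESC-50 has 5 equally sized folds, so folds 1 to 3 give a 60% training split, fold 4 a 20% validation split, and fold 5 a 20% testing split.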


In [None]:
def esc_50_fold_to_split(fold):
  if fold < 4:
    return "train"
  elif fold == 4:
    return "val"
  else:
    return "test"

esc50_splits = esc50_df["fold"].apply(esc_50_fold_to_split)

esc50_df = esc50_df.assign(split=esc50_splits)

esc50_df.head(-1)

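As a quick sanity check of the split sizes, the sketch below applies the same fold rule to a synthetic list of fold numbers. It assumes ESC-50's layout of 5 folds with 400 clips each; the `folds` list is illustrative and is not read from the CSV file.

```python
from collections import Counter

def esc_50_fold_to_split(fold):
  # same rule as the notebook cell above
  if fold < 4:
    return "train"
  elif fold == 4:
    return "val"
  else:
    return "test"

# assumption: 5 folds of 400 clips each, numbered 1 to 5
folds = [fold for fold in range(1, 6) for _ in range(400)]

counts = Counter(esc_50_fold_to_split(fold) for fold in folds)
fractions = {split: count / len(folds) for split, count in counts.items()}

print(fractions)
```
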
Create a `tf.data.Dataset` from the dataframe:

In [None]:
esc50_ds = tf.data.Dataset.from_tensor_slices((
    esc50_df.filepath,
    esc50_df.target,
    esc50_df.split
))

Create a function to load the wave file data.

In [None]:
def load_wav(filename, channels=1, sample_rate=16000):
  # read the contents of the wave file
  contents = tf.io.read_file(filename)

  # decode the wave file
  audio, audio_sample_rate = tf.audio.decode_wav(contents, desired_channels=channels)

  # resample the audio to the desired sample rate
  resampled_audio = tfio.audio.resample(
      tf.squeeze(audio, axis=-1),
      rate_in=tf.cast(audio_sample_rate, dtype=tf.int64),
      rate_out=sample_rate
  )

  return resampled_audio

Load the audio data for each `filepath` in the dataset:

In [None]:
esc50_ds = esc50_ds.map(lambda filepath, label, split: (load_wav(filepath), label, split))

Create a function to trim silence from the start and end of audio data using [`tfio.audio.trim`](https://www.tensorflow.org/io/api_docs/python/tfio/audio/trim) and then apply it on the dataset.

In [None]:
def trim(samples, label, split):
  position = tfio.audio.trim(samples, axis=0, epsilon=0.1)

  start = tf.math.maximum(position[0] - 320, 0)
  stop = tf.math.minimum(
      position[1] + 320,
      tf.cast(tf.shape(samples)[0], tf.int64)
  )

  trimmed = samples[start:stop]

  return trimmed, label, split

esc50_ds = esc50_ds.map(trim)

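The 320-sample margin kept on each side of the trimmed audio corresponds to 20 ms at the 16 kHz sample rate used by `load_wav`. The clamping logic can be sketched in plain Python as follows; `padded_bounds` is a hypothetical helper name, and `start` and `stop` stand in for the two positions returned by `tfio.audio.trim`.

```python
def padded_bounds(start, stop, num_samples, margin=320):
  # widen the non-silent region by `margin` samples on each side,
  # without going past the beginning or end of the clip
  return max(start - margin, 0), min(stop + margin, num_samples)

margin_seconds = 320 / 16000

print(margin_seconds)
print(padded_bounds(100, 15900, 16000))
print(padded_bounds(4000, 8000, 16000))
```
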
Create a function to split the audio data into frames of 16000 samples (1 second) with a stride of 1600 samples (100 ms) using [`tf.signal.frame`](https://www.tensorflow.org/api_docs/python/tf/signal/frame) and then apply it on the dataset.

In [None]:
def frame(samples, label, split):
  frames = tf.signal.frame(samples, 16000, 1600)

  num_frames = tf.shape(frames)[0]

  return frames, tf.repeat(label, num_frames), tf.repeat(split, num_frames)

esc50_ds = esc50_ds.map(frame).unbatch()

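With the default `pad_end=False` of `tf.signal.frame`, a clip only produces frames that fit completely inside it, so the number of frames follows directly from the clip length. A minimal plain Python sketch of that arithmetic is shown below; `num_frames` is a hypothetical helper and not part of the notebook. Note that a clip shorter than one frame after trimming yields zero frames, so it contributes nothing to the dataset after `unbatch()`.

```python
def num_frames(num_samples, frame_length=16000, frame_step=1600):
  # number of complete frames that fit when the end is not padded
  return max(0, 1 + (num_samples - frame_length) // frame_step)

print(num_frames(5 * 16000))  # a 5 second clip
print(num_frames(16000))      # exactly 1 second
print(num_frames(15999))      # just under 1 second
```
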
Create functions to:
 * Convert each frame into a spectrogram using [`tf.signal.stft`](https://www.tensorflow.org/api_docs/python/tf/signal/stft)
 * Convert each spectrogram to Mel scale using [`tfio.audio.melscale`](https://www.tensorflow.org/io/api_docs/python/tfio/audio/melscale)
 * Convert each Mel spectrogram to dB using [`tfio.audio.dbscale`](https://www.tensorflow.org/io/api_docs/python/tfio/audio/dbscale)
 * Expand the dimensions of each Mel power spectrogram

Then apply the functions to the dataset.

In [None]:
def spectrogram_for_map(samples, label, split):
  spectrogram = tf.math.abs(
      tf.signal.stft(
          samples,
          frame_length=480,
          frame_step=320,
          fft_length=256
      )
  )

  return spectrogram, label, split

def mel_spectrogram_for_map(spectrogram, label, split):
  mel_spectrogram = tfio.audio.melscale(
      spectrogram,
      rate=16000,
      mels=40,
      fmin=0,
      fmax=8000
  )
  
  return mel_spectrogram, label, split

def db_scale_for_map(mel_spectrogram, label, split):
  mel_spectrogram = tf.maximum(1e-6, mel_spectrogram)
  
  dbscale_mel_spectrogram = tfio.audio.dbscale(
      mel_spectrogram,
      top_db=80
  )
  
  return dbscale_mel_spectrogram, label, split

def expand_dims_for_map(mel_spectrogram, label, split):
  return tf.expand_dims(mel_spectrogram, axis=-1), label, split

esc50_ds = esc50_ds.map(spectrogram_for_map)
esc50_ds = esc50_ds.map(mel_spectrogram_for_map)
esc50_ds = esc50_ds.map(db_scale_for_map)
esc50_ds = esc50_ds.map(expand_dims_for_map)

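The shape of each model input follows from the parameters above. The sketch below works through the arithmetic in plain Python, assuming the STFT is computed without end padding (as in `spectrogram_for_map`) and keeps `fft_length // 2 + 1` frequency bins; `spectrogram_shape` is a hypothetical helper used only for illustration.

```python
def spectrogram_shape(num_samples=16000, frame_length=480, frame_step=320,
                      fft_length=256, mels=40):
  # number of STFT windows that fit completely inside the 1 second frame
  time_steps = 1 + (num_samples - frame_length) // frame_step
  # a real-valued FFT keeps fft_length // 2 + 1 frequency bins
  stft_bins = fft_length // 2 + 1
  # the Mel step maps the frequency bins to `mels` bands,
  # and expand_dims appends a single channel
  return (time_steps, stft_bins), (time_steps, mels, 1)

stft_shape, model_input_shape = spectrogram_shape()
print(stft_shape, model_input_shape)
```
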
Set the random seed for reproducibility.

In [None]:
tf.keras.utils.set_random_seed(42)

Split the dataset into training, validation and testing datasets, and remove the split column.

In [None]:
esc50_cached_ds = esc50_ds.cache()
esc50_train_ds = esc50_cached_ds.filter(lambda mel_spectrogram, label, split: split == "train")
esc50_val_ds = esc50_cached_ds.filter(lambda mel_spectrogram, label, split: split == "val")
esc50_test_ds = esc50_cached_ds.filter(lambda mel_spectrogram, label, split: split == "test")

# remove the split column now that it's not needed anymore
remove_split_column = lambda embedding, label, split: (embedding, tf.cast(label, dtype=tf.float32))

esc50_train_ds = esc50_train_ds.map(remove_split_column)
esc50_val_ds = esc50_val_ds.map(remove_split_column)
esc50_test_ds = esc50_test_ds.map(remove_split_column)

esc50_train_ds = esc50_train_ds.cache().shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
esc50_val_ds = esc50_val_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)
esc50_test_ds = esc50_test_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)

Create a `Normalization` layer and adapt it to the dataset.

In [None]:
for spectrogram, _, _ in esc50_cached_ds.take(1):
    input_shape = spectrogram.shape
    print('Input shape:', input_shape)
  
norm_layer = tf.keras.layers.Normalization(axis=None)
norm_layer.adapt(esc50_cached_ds.map(lambda x, y, z: tf.reshape(x, input_shape)))

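With `axis=None`, the `Normalization` layer learns a single scalar mean and variance over every element it sees during `adapt()`, then applies `(x - mean) / sqrt(variance)`. The plain Python sketch below shows that computation on a few made-up dB values. The numbers are illustrative and not taken from the dataset, and the small epsilon Keras uses to guard against division by zero is omitted.

```python
import math
import statistics

# hypothetical dB-scaled spectrogram values
values = [-80.0, -40.0, 0.0]

mean = statistics.fmean(values)
variance = statistics.pvariance(values)  # population variance

normalized = [(v - mean) / math.sqrt(variance) for v in values]
print(mean, variance, normalized)
```
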
Create the `tiny_conv` model using Keras.

In [None]:
esc50_model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=input_shape),
  norm_layer,
  tf.keras.layers.DepthwiseConv2D(
      kernel_size=(10, 8),
      strides=(2, 2),
      activation="relu",
      padding="same",
      depth_multiplier=8
  ),
  tf.keras.layers.Dropout(0.001),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(50),
  tf.keras.layers.Activation('softmax')
], name='esc50_model')

esc50_model.summary()

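The sizes reported by `summary()` can be cross-checked by hand. The sketch below assumes a `(49, 40, 1)` input shape, `"same"` padding (each spatial dimension is divided by the stride and rounded up) and a bias term in both the convolutional and dense layers.

```python
import math

input_height, input_width, input_channels = 49, 40, 1
kernel_height, kernel_width = 10, 8
stride, depth_multiplier, num_classes = 2, 8, 50

# "same" padding: each spatial dimension is divided by the stride, rounded up
output_height = math.ceil(input_height / stride)
output_width = math.ceil(input_width / stride)
output_channels = input_channels * depth_multiplier

# one kernel per output channel, plus one bias per output channel
conv_params = kernel_height * kernel_width * output_channels + output_channels

flattened_size = output_height * output_width * output_channels
dense_params = flattened_size * num_classes + num_classes

print(conv_params, flattened_size, dense_params)
```
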
Compile the model and define an early stopping callback for training.

In [None]:
esc50_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer="adam",
    metrics=[
        "accuracy"
    ]
)

early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor="loss",
    patience=3,
    restore_best_weights=True
)

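The effect of `patience=3` together with `restore_best_weights=True` can be illustrated with a simplified plain Python model of the callback. This is a sketch of the idea rather than the Keras implementation, and the loss values are made up.

```python
def early_stop(losses, patience=3):
  best_loss = float("inf")
  best_epoch = 0
  wait = 0

  for epoch, loss in enumerate(losses):
    if loss < best_loss:
      # improvement: remember this epoch and reset the counter
      best_loss, best_epoch, wait = loss, epoch, 0
    else:
      wait += 1
      if wait >= patience:
        # stop here; the weights from best_epoch would be restored
        return epoch, best_epoch

  # ran out of epochs without triggering early stopping
  return len(losses) - 1, best_epoch

stopped_epoch, best_epoch = early_stop([1.0, 0.8, 0.7, 0.75, 0.72, 0.71, 0.6])
print(stopped_epoch, best_epoch)
```
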
Train the model for up to 100 epochs.

In [None]:
history = esc50_model.fit(
    esc50_train_ds,
    epochs=100,
    validation_data=esc50_val_ds,
    callbacks=[
        early_stopping_callback
    ]
)

Evaluate the model on the test dataset.

In [None]:
esc50_model.evaluate(esc50_test_ds)

Save the model.

In [None]:
esc50_model.save("esc50_model")

Create a zip file of the saved model.

In [None]:
shutil.make_archive("esc50_model", "zip", "esc50_model")

## FSD50K dataset and model

Use the [Hugging Face dataset version of the FSD50K dataset](https://huggingface.co/datasets/Fhrozen/FSD50k). This avoids downloading 25+ GB of data: `git` and `git-lfs` can be used to pull only the subset of the data needed for the model.

Clone the dataset from Hugging Face:

In [None]:
! GIT_LFS_SKIP_SMUDGE=1 git clone https://huggingface.co/datasets/Fhrozen/FSD50k datasets/FSD50k

Load the `dev` and `eval` dataset metadata into Pandas dataframes, assign a `split` value of "test" to the `eval` dataframe, and add a `fullpath` column to both dataframes with the full path of each `.wav` file.

In [None]:
fsd50k_dev_csv_path = os.path.join("datasets", "FSD50k", "labels", "dev.csv")
fsd50k_eval_csv_path = os.path.join("datasets", "FSD50k", "labels", "eval.csv")

fsd50k_dev_df = pd.read_csv(fsd50k_dev_csv_path)
fsd50k_eval_df = pd.read_csv(fsd50k_eval_csv_path)

fsd50k_eval_df = fsd50k_eval_df.assign(split="test")

fsd50k_dev_df["fullpath"] = fsd50k_dev_df["fname"].map(lambda x: os.path.join("datasets", "FSD50k", "clips", "dev", f"{x}.wav"))
fsd50k_eval_df["fullpath"] = fsd50k_eval_df["fname"].map(lambda x: os.path.join("datasets", "FSD50k", "clips", "eval", f"{x}.wav"))

len(fsd50k_dev_df), len(fsd50k_eval_df)

In [None]:
fsd50k_dev_df.head(-1)

In [None]:
fsd50k_eval_df.head(-1)

Get all the dataset entries with label values that start with "Doorbell". Split the `dev` entries into training and validation dataframes, and use the `eval` entries for testing.

In [None]:
fsd50k_doorbell_train_df = fsd50k_dev_df[
    fsd50k_dev_df.labels.str.startswith("Doorbell") &
    (fsd50k_dev_df.split == "train")
]

fsd50k_doorbell_val_df = fsd50k_dev_df[
    fsd50k_dev_df.labels.str.startswith("Doorbell") &
    (fsd50k_dev_df.split == "val")
]

fsd50k_doorbell_test_df = fsd50k_eval_df[
    fsd50k_eval_df.labels.str.startswith("Doorbell")
]

len(fsd50k_doorbell_train_df), len(fsd50k_doorbell_val_df), len(fsd50k_doorbell_test_df)

In [None]:
fsd50k_doorbell_train_df.head(-1)

For each of the non-doorbell labels, randomly select up to 80 training items, 10 validation items, and 10 testing items that are not also labeled as a doorbell.

Then concatenate the selected dataframes into a single dataframe.

In [None]:
LABELS = [
    "Doorbell",
    "Music",
    "Domestic_sounds_and_home_sounds",
    "Human_voice",
    "Hands"
]

fsd50k_dfs = [
    fsd50k_doorbell_train_df.assign(label=0),
    fsd50k_doorbell_val_df.assign(label=0),
    fsd50k_doorbell_test_df.assign(label=0)
]

for i, label in enumerate(LABELS):
  if i == 0:
    continue

  fsd50k_label_train_df = fsd50k_dev_df[
      ~fsd50k_dev_df.labels.str.contains(LABELS[0]) & 
      fsd50k_dev_df.labels.str.contains(label) & 
      (fsd50k_dev_df.split == "train")
  ]
  
  fsd50k_label_train_df = fsd50k_label_train_df.sample(
      n=min(80, len(fsd50k_label_train_df)),
      random_state=42
  )

  fsd50k_label_eval_df = fsd50k_dev_df[
      ~fsd50k_dev_df.labels.str.contains(LABELS[0]) & 
      fsd50k_dev_df.labels.str.contains(label) & 
      (fsd50k_dev_df.split == "val")
  ]
  
  fsd50k_label_eval_df = fsd50k_label_eval_df.sample(
      n=min(10, len(fsd50k_label_eval_df)),
      random_state=42
  )

  fsd50k_label_test_df = fsd50k_eval_df[
      ~fsd50k_eval_df.labels.str.contains(LABELS[0]) & 
      fsd50k_eval_df.labels.str.contains(label)
  ]
  
  fsd50k_label_test_df = fsd50k_label_test_df.sample(
      n=min(10, len(fsd50k_label_test_df)),
      random_state=42
  )

  fsd50k_dfs.append(fsd50k_label_train_df.assign(label=i))
  fsd50k_dfs.append(fsd50k_label_eval_df.assign(label=i))
  fsd50k_dfs.append(fsd50k_label_test_df.assign(label=i))

fsd50k_df = pd.concat(fsd50k_dfs)

In [None]:
len(fsd50k_df)

In [None]:
fsd50k_df.head(-1)

Use `git lfs` to pull the selected `.wav` files.

In [None]:
FSD50k_DIR = os.path.join("datasets", "FSD50k")

for df in fsd50k_dfs:
  paths = df.apply(lambda row: os.path.join("clips", "eval" if row["split"] == "test" else "dev", str(row['fname']) + ".wav"), axis=1)

  os.system(f"git --git-dir={FSD50k_DIR}/.git --work-tree={FSD50k_DIR}/ lfs pull --include {','.join(paths)}")

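The same `git lfs pull` invocation can also be built as an argument list, which avoids passing file names through a shell. The following is a minimal sketch, assuming the clone location used above; `build_lfs_pull_command` is a hypothetical helper, the file names are placeholders, and the resulting list could be passed to `subprocess.run(..., check=True)`.

```python
import os

def build_lfs_pull_command(repo_dir, paths):
  # paths are relative to the repository root, e.g. "clips/dev/1.wav"
  return [
      "git",
      f"--git-dir={os.path.join(repo_dir, '.git')}",
      f"--work-tree={repo_dir}",
      "lfs", "pull",
      "--include", ",".join(paths),
  ]

command = build_lfs_pull_command(
    os.path.join("datasets", "FSD50k"),
    ["clips/dev/1.wav", "clips/eval/2.wav"]  # placeholder file names
)

print(command)
```
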
Convert the dataframe to a dataset.

In [None]:
fsd50k_fullpaths = fsd50k_df["fullpath"]
fsd50k_labels = fsd50k_df["label"]
fsd50k_splits = fsd50k_df["split"]

fsd50k_ds = tf.data.Dataset.from_tensor_slices((
    fsd50k_fullpaths,
    fsd50k_labels,
    fsd50k_splits
))

Load the audio data into Mel power spectrogram format using the same pipeline functions used for the ESC-50 dataset, but truncate the wave file data to 5 seconds of audio after the trim step.

In [None]:
fsd50k_ds = fsd50k_ds.map(lambda filepath, label, split: (load_wav(filepath), label, split))
fsd50k_ds = fsd50k_ds.map(trim)
fsd50k_ds = fsd50k_ds.map(lambda samples, label, split: (samples[:5 * 16000], label, split))
fsd50k_ds = fsd50k_ds.map(frame).unbatch()
fsd50k_ds = fsd50k_ds.map(spectrogram_for_map)
fsd50k_ds = fsd50k_ds.map(mel_spectrogram_for_map)
fsd50k_ds = fsd50k_ds.map(db_scale_for_map)
fsd50k_ds = fsd50k_ds.map(expand_dims_for_map)

In [None]:
fsd50k_ds = fsd50k_ds.cache()

Reset the random seed for reproducibility.

In [None]:
tf.keras.utils.set_random_seed(42)

Split the dataset into training, validation, and testing sets, then remove the split column.

In [None]:
fsd50k_train_ds = fsd50k_ds.filter(lambda mel_spectrogram, label, split: split == "train")
fsd50k_val_ds = fsd50k_ds.filter(lambda mel_spectrogram, label, split: split == "val")
fsd50k_test_ds = fsd50k_ds.filter(lambda mel_spectrogram, label, split: split == "test")

# remove the split column now that it's not needed anymore
remove_split_column = lambda mel_spectrogram, label, split: (mel_spectrogram, tf.cast(label, dtype=tf.float32))

fsd50k_train_ds = fsd50k_train_ds.map(remove_split_column)
fsd50k_val_ds = fsd50k_val_ds.map(remove_split_column)
fsd50k_test_ds = fsd50k_test_ds.map(remove_split_column)

fsd50k_train_ds = fsd50k_train_ds.cache().shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
fsd50k_val_ds = fsd50k_val_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)
fsd50k_test_ds = fsd50k_test_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)

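Re-use the first 5 layers of the trained ESC-50 model (from the input up to and including the `Flatten` layer), and set the convolutional layer as non-trainable.

In [None]:
# Build a sub-model that shares its layers, and therefore its trained
# weights, with the ESC-50 model. tf.keras.models.clone_model() is not used
# here because it creates new layers with freshly initialized weights.
esc50_conv_model = tf.keras.Model(
    inputs=esc50_model.inputs,
    outputs=[
        esc50_model.layers[-3].output
    ]
)

esc50_conv_model.summary()

# freeze the convolutional layer
esc50_conv_model.layers[-3].trainable = False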

Create a new model with the convolutional layers and a new dense layer.

In [None]:
fsd50k_model = tf.keras.Sequential([
  esc50_conv_model,
  tf.keras.layers.Dense(len(LABELS), activation="softmax")
], name='fsd50k_model')

fsd50k_model.summary()

Compile the model and create an early stopping callback.

In [None]:
fsd50k_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    optimizer="adam",
    metrics=["accuracy"]
)

early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor="loss",
    patience=3,
    restore_best_weights=True
)

Train the model for up to 20 epochs.

In [None]:
history = fsd50k_model.fit(
    fsd50k_train_ds,
    epochs=20,
    validation_data=fsd50k_val_ds,
    callbacks=[
        early_stopping_callback
    ]
)

Evaluate the model on the test dataset.

In [None]:
fsd50k_model.evaluate(fsd50k_test_ds)

Load one of the testing files and inspect the model's prediction.

In [None]:
testing_filepath = "datasets/FSD50k/clips/eval/131642.wav"
testing_samples = load_wav(testing_filepath)[4000:]

testing_spectrogram, _, _ = spectrogram_for_map(testing_samples[:16000], None, None)
testing_mel_spectrogram, _, _ = mel_spectrogram_for_map(testing_spectrogram, None, None)
testing_db_scale_mel_spectrogram, _, _ = db_scale_for_map(testing_mel_spectrogram, None, None)

IPython.display.display(IPython.display.Audio(testing_samples[:16000], rate=16000))

print(testing_db_scale_mel_spectrogram.shape)

fsd50k_model.predict(
    tf.expand_dims(
        tf.expand_dims(testing_db_scale_mel_spectrogram, axis=-1)
    , axis=0)
)

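`predict()` returns one row of class probabilities per input, in the order of the `LABELS` list. Mapping a row to a label name can be sketched as below; the probability values are made up for illustration and are not actual model output.

```python
LABELS = [
    "Doorbell",
    "Music",
    "Domestic_sounds_and_home_sounds",
    "Human_voice",
    "Hands"
]

# hypothetical softmax output for a single spectrogram
probabilities = [0.71, 0.05, 0.12, 0.08, 0.04]

# index of the largest probability
best_index = max(range(len(probabilities)), key=probabilities.__getitem__)

print(LABELS[best_index], probabilities[best_index])
```
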
Save the model.

In [None]:
fsd50k_model.save('fsd50k_model')

Create a zip file for the saved model.

In [None]:
shutil.make_archive("fsd50k_model", "zip", "fsd50k_model")

Convert the model to TensorFlow Lite format with quantization and 8-bit inputs and outputs.

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(fsd50k_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

def representative_data_gen():
  for input_value, _, _ in fsd50k_ds:
    # add a batch dimension to match the model's input shape
    yield [tf.expand_dims(input_value, axis=0)]
    
converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [ tf.lite.OpsSet.TFLITE_BUILTINS_INT8 ]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

tflite_model_quant = converter.convert()

with open("model.tflite", "wb") as f:
  f.write(tflite_model_quant)

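Because the converted model takes `int8` inputs and produces `int8` outputs, the application has to quantize each spectrogram value before inference and dequantize the result afterwards. The plain Python sketch below shows the affine mapping involved. The `scale` and `zero_point` values are placeholders: the real ones are stored in the `.tflite` file and can be read with the `get_input_details()` and `get_output_details()` methods of `tf.lite.Interpreter`.

```python
def quantize(value, scale, zero_point):
  # real value -> int8, clamped to the representable range
  quantized = round(value / scale) + zero_point
  return max(-128, min(127, quantized))

def dequantize(quantized, scale, zero_point):
  # int8 -> approximate real value
  return (quantized - zero_point) * scale

scale, zero_point = 0.5, -10  # placeholder quantization parameters

print(quantize(20.0, scale, zero_point))
print(quantize(1000.0, scale, zero_point))
print(dequantize(30, scale, zero_point))
```
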
Convert the `.tflite` model to a C array.

In [None]:
%%shell
echo "alignas(16) const unsigned char tflite_model[] = {" > tflite_model.h
cat model.tflite | xxd -i                                >> tflite_model.h
echo "};"                                                >> tflite_model.h

Create another C header file with the weight matrix for converting FFT bins to Mel scale.

In [None]:
mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
    num_mel_bins=40,
    num_spectrogram_bins=129,
    sample_rate=16000,
    lower_edge_hertz=0,
    upper_edge_hertz=8000,
).numpy().transpose()

with open('mel_weight_matrix.h', 'w') as out:
  out.write(f'const float mel_weight_matrix[{mel_weight_matrix.shape[0]}][{mel_weight_matrix.shape[1]}] = ' + '{\n')
  for i in range(mel_weight_matrix.shape[0]):
    out.write('  { ' + ", ".join(mel_weight_matrix[i].astype(str)) + ' },\n')

  out.write('};\n')

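`tf.signal.linear_to_mel_weight_matrix` places its triangular bands at equal spacing on the Mel scale, using the HTK formula `mel = 1127 * ln(1 + hertz / 700)`. The sketch below reproduces only the band edge frequencies for the parameters used above, not the weight matrix itself; the helper names are hypothetical.

```python
import math

def hertz_to_mel(hertz):
  return 1127.0 * math.log(1.0 + hertz / 700.0)

def mel_to_hertz(mel):
  return 700.0 * (math.exp(mel / 1127.0) - 1.0)

num_mel_bins = 40
lower_mel = hertz_to_mel(0.0)
upper_mel = hertz_to_mel(8000.0)

# num_mel_bins triangular bands need num_mel_bins + 2 edges,
# equally spaced on the Mel scale
step = (upper_mel - lower_mel) / (num_mel_bins + 1)
band_edges_hertz = [
    mel_to_hertz(lower_mel + i * step) for i in range(num_mel_bins + 2)
]

print(len(band_edges_hertz), band_edges_hertz[0], band_edges_hertz[-1])
```
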