Install Python Libraries

In [None]:
# Install the Python packages used by this notebook. TensorFlow, tensorflow-io
# and tensorflow-model-optimization are pinned; librosa, matplotlib and pandas
# are not.
# NOTE(review): consider pinning the unpinned packages for reproducibility.
!pip install librosa matplotlib pandas "tensorflow==2.8.*" "tensorflow-io==0.24.*" "tensorflow-model-optimization==0.7.2"

# Install the CMSIS-DSP Python wrapper straight from the CMSIS_5 repository at
# tag 5.8.0 (the `subdirectory` fragment selects CMSIS/DSP/PythonWrapper).
!pip install git+https://github.com/ARM-software/CMSIS_5.git@5.8.0#egg=CMSISDSP\&subdirectory=CMSIS/DSP/PythonWrapper

In [None]:
import tensorflow as tf

# Download CMake 3.21.0 into ./tools and unpack it there.
tf.keras.utils.get_file(
    fname='cmake-3.21.0-linux-x86_64.tar.gz',
    origin='https://github.com/Kitware/CMake/releases/download/v3.21.0/cmake-3.21.0-linux-x86_64.tar.gz',
    cache_dir='./',
    cache_subdir='tools',
    extract=True,
)

# Download the GNU Arm Embedded toolchain (10-2020-q4-major) into ./tools and
# unpack it there.
tf.keras.utils.get_file(
    fname='gcc-arm-none-eabi-10-2020-q4-major-x86_64-linux.tar.bz2',
    origin='https://developer.arm.com/-/media/Files/downloads/gnu-rm/10-2020q4/gcc-arm-none-eabi-10-2020-q4-major-x86_64-linux.tar.bz2',
    cache_dir='./',
    cache_subdir='tools',
    extract=True,
)

In [None]:
# Install the `xxd` command-line tool via apt.
# NOTE(review): presumably used later to turn a binary model file into a C
# array — the usage is outside this section, confirm.
!apt-get install -y xxd

In [None]:
import os

# Prepend the bin directory of each downloaded tool to PATH. The toolchain is
# processed last, so it ends up first on PATH, ahead of CMake.
for tool_dir in ('cmake-3.21.0-linux-x86_64', 'gcc-arm-none-eabi-10-2020-q4-major'):
  os.environ['PATH'] = f"{os.getcwd()}/tools/{tool_dir}/bin:{os.environ['PATH']}"

Raspberry Pi Pico SDK

In [None]:
%%shell
# Check out release 1.2.0 of the Raspberry Pi Pico SDK.
git clone --branch 1.2.0 https://github.com/raspberrypi/pico-sdk.git
cd pico-sdk
# Register and fetch the SDK's submodules in a single step.
git submodule update --init

In [None]:
# Point PICO_SDK_PATH at the pico-sdk checkout in the current working directory.
os.environ['PICO_SDK_PATH'] = '/'.join((os.getcwd(), 'pico-sdk'))

In [None]:
# Select the target board: leave exactly one of the assignments below active.

# for SparkFun MicroMod (uncomment the next line and comment out the Pico line)
# os.environ['PICO_BOARD'] = 'sparkfun_micromod'

# for Raspberry Pi Pico (currently selected)
os.environ['PICO_BOARD'] = 'pico'

# Echo the selection so it is visible in the cell output.
print(f"PICO_BOARD env. var. set to '{os.environ['PICO_BOARD']}'")

In [None]:
%%shell
# Clone the example project together with its submodules.
# NOTE(review): no branch or tag is given, so this follows the repository's
# default branch; consider pinning a revision for reproducibility.
git clone --recurse-submodules https://github.com/ArmDeveloperEcosystem/ml-audio-classifier-example-for-pico.git

In [None]:
%%shell
# Expose the example project's helper package and firmware sources at the top
# level of the working directory.
# -f replaces a link left over from a previous run; -n stops ln from following
# an existing symlink-to-directory, which would otherwise create a stray,
# dangling link inside the cloned repository when the cell is re-run.
ln -sfn ml-audio-classifier-example-for-pico/colab_utils colab_utils
ln -sfn ml-audio-classifier-example-for-pico/inference-app inference-app

In [None]:
import tensorflow as tf

# Download the ESC-50 archive into ./datasets and unpack it there.
tf.keras.utils.get_file(
    fname='esc-50.zip',
    origin='https://github.com/karoldvl/ESC-50/archive/master.zip',
    cache_dir='./',
    cache_subdir='datasets',
    extract=True,
)

Load Dataset

In [None]:
import pandas as pd

# Locations inside the extracted ESC-50 archive: the metadata CSV and the
# directory holding the audio clips.
esc50_csv = './datasets/ESC-50-master/meta/esc50.csv'
base_data_path = './datasets/ESC-50-master/audio/'

# Load the metadata table and preview the first rows.
df = pd.read_csv(esc50_csv)
df.head()

In [None]:
from os import path

# Directory that holds the ESC-50 audio clips.
base_data_path = './datasets/ESC-50-master/audio/'

# Build each clip's path by joining the audio directory with its file name.
df['fullpath'] = [path.join(base_data_path, name) for name in df['filename']]

df.head()

In [None]:
import tensorflow_io as tfio
import librosa

def load_wav(filename, desired_sample_rate, desired_channels):
  """Read an audio file and resample it to `desired_sample_rate`.

  The file is decoded with TensorFlow first; if that fails for any reason the
  file is decoded with librosa instead.

  Args:
    filename: path of the audio file (Python string or string tensor).
    desired_sample_rate: sample rate of the returned waveform.
    desired_channels: number of channels to decode. The TensorFlow path
      squeezes the last (channel) axis, so it only succeeds for one channel.

  Returns:
    The waveform resampled to `desired_sample_rate`.
  """
  try:
    file_contents = tf.io.read_file(filename)
    wav, sample_rate = tf.audio.decode_wav(file_contents, desired_channels=desired_channels)
    wav = tf.squeeze(wav, axis=-1)
  except Exception:
    # Fall back to librosa if the wav file can't be read with TF.
    # `except Exception` (rather than a bare `except`) lets KeyboardInterrupt
    # and SystemExit propagate instead of silently triggering the fallback.
    filename = tf.cast(filename, tf.string)
    wav, sample_rate = librosa.load(filename.numpy().decode('utf-8'), sr=None, mono=(desired_channels == 1))

  wav = tfio.audio.resample(wav, rate_in=tf.cast(sample_rate, dtype=tf.int64), rate_out=tf.cast(desired_sample_rate, dtype=tf.int64))

  return wav

In [None]:
import matplotlib.pyplot as plt
from IPython import display

# Audio format requested from load_wav here and in the dataset pipeline:
# 16 kHz, one channel.
sample_rate = 16000
channels = 1

# Load the first clip listed in the metadata table.
test_wav_file_path = df['fullpath'][0]
test_wav_data = load_wav(test_wav_file_path, sample_rate, channels)

# Plot the waveform.
plt.plot(test_wav_data)
plt.show()

# Last expression of the cell: renders an inline audio player for the clip.
display.Audio(test_wav_data, rate=sample_rate)

In [None]:
# Plot samples 32000-47999 of the test clip (one second at sample_rate = 16000).
# Assigning to `_` keeps the returned line objects out of the cell output.
_ = plt.plot(test_wav_data[32000:48000])

In [None]:
# Columns that form one dataset element: file path, class target, fold number.
fullpaths = df['fullpath']
targets = df['target']
folds = df['fold']

# One element per metadata row: (fullpath, target, fold).
fullpaths_ds = tf.data.Dataset.from_tensor_slices((fullpaths, targets, folds))
# Show the element structure as the cell output.
fullpaths_ds.element_spec

In [None]:
def load_wav_for_map(fullpath, label, fold):
  """Dataset-map adapter: swap the file path for its decoded waveform.

  load_wav is wrapped in tf.py_function and called with the notebook-level
  `sample_rate` and `channels`; `label` and `fold` pass through unchanged.
  """
  return (
      tf.py_function(func=load_wav, inp=[fullpath, sample_rate, channels], Tout=tf.float32),
      label,
      fold,
  )

# Elements become (waveform, label, fold).
wav_ds = fullpaths_ds.map(load_wav_for_map)
wav_ds.element_spec

In [None]:
@tf.function
def split_wav(wav, width, stride):
  """Cut a waveform into overlapping windows of `width` samples.

  Args:
    wav: waveform tensor, sliced along its first axis (assumed 1-D float32 —
      TODO confirm against callers).
    width: number of samples per window.
    stride: number of samples between consecutive window starts.

  Returns:
    A float32 tensor holding one row per window. There are
    1 + (len(wav) - width) // stride windows, or none when `wav` is shorter
    than `width`.
  """
  # The last valid window starts at len(wav) - width, so the window count is
  # one more than the number of whole strides that fit before that start.
  # (Without the +1 the final full window was dropped, and a waveform of
  # exactly `width` samples produced no windows at all.)
  num_windows = tf.maximum((tf.shape(wav)[0] - width) // stride + 1, 0)

  return tf.map_fn(
      fn=lambda t: wav[t * stride:t * stride + width],
      elems=tf.range(num_windows),
      fn_output_signature=tf.float32)

@tf.function
def wav_not_empty(wav):
  """Return a boolean scalar that is True when any sample in `wav` is non-zero."""
  has_signal = tf.cast(wav, tf.bool)
  return tf.reduce_any(has_signal)

def split_wav_for_flat_map(wav, label, fold):
  """Dataset-flat_map adapter: expand one clip into a dataset of windows.

  The waveform is split into 16000-sample windows taken every 4000 samples;
  `label` and `fold` are repeated once per window.
  """
  segments = split_wav(wav, width=16000, stride=4000)
  num_segments = tf.shape(segments)[0]

  return tf.data.Dataset.from_tensor_slices((
      segments,
      tf.repeat(label, num_segments),
      tf.repeat(fold, num_segments),
  ))

# Expand every clip into its windows, then drop windows with no non-zero sample.
split_wav_ds = (
    wav_ds
    .flat_map(split_wav_for_flat_map)
    .filter(lambda wav, label, fold: wav_not_empty(wav))
)

In [None]:
# Plot the first five windows, one figure each.
for wav, _, _ in split_wav_ds.take(5):
  _ = plt.plot(wav)
  plt.show()

Create Spectrograms

In [None]:
@tf.function
def create_spectrogram(samples):
  """Return the magnitude of the short-time Fourier transform of `samples`.

  Frames are 256 samples long and start every 128 samples.
  """
  stft = tf.signal.stft(samples, frame_length=256, frame_step=128)
  return tf.abs(stft)

In [None]:
# Spectrogram of samples 32000-47999 of the test clip.
spectrogram = create_spectrogram(test_wav_data[32000:48000])

# Show its shape as the cell output.
spectrogram.shape

In [None]:
import numpy as np

def plot_spectrogram(spectrogram, vmax=None, fft_length=256):
  """Draw a spectrogram as a colour mesh with a colour bar.

  Args:
    spectrogram: 2-D array/tensor; it is transposed before plotting, so its
      first axis runs along x and its second axis along y (assumed to be
      (frames, frequency bins) as produced by create_spectrogram — TODO
      confirm for other callers).
    vmax: optional upper limit of the colour scale, passed to pcolormesh.
    fft_length: FFT length used to label the y axis; bin spacing is the
      notebook-level `sample_rate` divided by this value.
  """
  transposed_spectrogram = tf.transpose(spectrogram)

  fig = plt.figure(figsize=(8,6))
  height = transposed_spectrogram.shape[0]
  X = np.arange(transposed_spectrogram.shape[1])
  # Keep the bin spacing as a float: truncating 16000 / 256 = 62.5 to 62
  # made the top of the frequency axis read 7936 instead of 8000.
  Y = np.arange(height) * (sample_rate / fft_length)

  # Reuse the tensor transposed above instead of transposing a second time.
  im = plt.pcolormesh(X, Y, transposed_spectrogram, vmax=vmax)

  fig.colorbar(im)
  plt.show()


plot_spectrogram(spectrogram)

In [None]:
def create_spectrogram_for_map(samples, label, fold):
  """Dataset-map adapter: swap the waveform for its spectrogram."""
  spectrogram = create_spectrogram(samples)
  return spectrogram, label, fold

# Elements become (spectrogram, label, fold).
spectrograms_ds = split_wav_ds.map(create_spectrogram_for_map)
spectrograms_ds.element_spec

In [None]:
# Plot the spectrograms of the first five windows.
for s, _, _ in spectrograms_ds.take(5):
  plot_spectrogram(s)

Split Dataset

In [None]:
import numpy as np
import tensorflow as tf

# Set seed for experiment reproducibility.
# The same value seeds TensorFlow and NumPy here and is reused as the shuffle
# seed when the training dataset is built.
random_seed = 42
tf.random.set_seed(random_seed)
np.random.seed(random_seed)

In [None]:
# Cache the spectrograms so the three splits below share one computation.
cached_ds = spectrograms_ds.cache()


def remove_fold_column(spectrogram, label, fold):
  """Drop the fold (no longer needed) and add a trailing axis to the spectrogram."""
  return tf.expand_dims(spectrogram, axis=-1), label


# Folds below 4 are used for training; the result is shuffled with the
# notebook seed.
train_ds = (
    cached_ds
    .filter(lambda spectrogram, label, fold: fold < 4)
    .map(remove_fold_column)
    .cache()
    .shuffle(1000, seed=random_seed)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Fold 4 is used for validation.
val_ds = (
    cached_ds
    .filter(lambda spectrogram, label, fold: fold == 4)
    .map(remove_fold_column)
    .cache()
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Folds above 4 are used for testing.
test_ds = (
    cached_ds
    .filter(lambda spectrogram, label, fold: fold > 4)
    .map(remove_fold_column)
    .cache()
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

Train Model

In [None]:
# Derive the model input shape from one spectrogram, with a trailing axis added.
for spectrogram, _, _ in cached_ds.take(1):
    input_shape = tf.expand_dims(spectrogram, axis=-1).shape
    print('Input shape:', input_shape)

norm_layer = tf.keras.layers.experimental.preprocessing.Normalization()
# Fit the normalization statistics on the training folds only (fold < 4, the
# same condition used to build train_ds) so that validation and test data do
# not leak into the preprocessing. The reshape gives every element the static
# `input_shape`.
norm_layer.adapt(
    cached_ds
    .filter(lambda x, y, z: z < 4)
    .map(lambda x, y, z: tf.reshape(x, input_shape))
)

In [None]:
# Baseline classifier: resize -> normalize -> one conv layer -> max-pool ->
# dropout -> softmax output.
baseline_model = tf.keras.models.Sequential([
  tf.keras.layers.Input(shape=input_shape),
  # Shrink every spectrogram to 32x32 using nearest-neighbour interpolation.
  tf.keras.layers.experimental.preprocessing.Resizing(32, 32, interpolation="nearest"),
  # Normalization layer adapted in the previous cell.
  norm_layer,
  tf.keras.layers.Conv2D(8, kernel_size=(8,8), strides=(2, 2), activation="relu"),
  tf.keras.layers.MaxPool2D(pool_size=(2,2)),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dropout(0.25),
  # 50 output units with softmax (presumably one per ESC-50 class — confirm).
  tf.keras.layers.Dense(50, activation='softmax')
])

baseline_model.summary()

In [None]:
# Metrics reported during training and evaluation.
METRICS = [
      "accuracy",
]

# Labels are integer class ids and the model already outputs softmax
# probabilities, hence sparse categorical cross-entropy with from_logits=False.
baseline_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=METRICS,
)

def scheduler(epoch, lr):
  """Learning-rate schedule: hold `lr` for epochs 0-99, then decay it.

  From epoch 100 onwards the incoming rate is multiplied by exp(-0.1) on
  every call.
  """
  if epoch >= 100:
    return lr * tf.math.exp(-0.1)
  return lr

# Training callbacks: early stopping with a patience of 25 epochs, plus the
# learning-rate schedule defined above.
callbacks = [
    tf.keras.callbacks.EarlyStopping(verbose=1, patience=25),
    tf.keras.callbacks.LearningRateScheduler(scheduler)
]

In [None]:
# Upper bound on training epochs; the early-stopping callback may end
# training sooner.
EPOCHS = 250
# Train on the training split, validating on the validation split each epoch.
history = baseline_model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks,
)

In [None]:
# Report loss and the compiled metrics on the held-out test split.
baseline_model.evaluate(test_ds)

In [None]:
# Save the trained model under ./baseline_model.
baseline_model.save("baseline_model")

In [None]:
# Bundle the saved model directory into a single zip archive.
!zip -r baseline_model.zip baseline_model

Download datasets

In [None]:
# Download the dog-bark recordings into ./datasets and unpack them there.
tf.keras.utils.get_file(
    fname='dog_barks.tar.gz',
    origin='https://github.com/seaweed2018/180DA-WarmUp/raw/main/ml-audio-classifier-example-for-pico-dog_barks.tar.gz',
    cache_dir='./',
    cache_subdir='datasets',
    extract=True,
)

In [None]:
# Only the files in the _background_noise_ folder of the Speech Commands
# dataset are needed, so download the archive with curl and extract just that
# folder with tar, instead of using tf.keras.utils.get_file(...) in Python.

# Target directory for the extracted files.
!mkdir -p datasets/speech_commands
# -L follows redirects; the archive is stored next to the target directory.
!curl -L -o datasets/speech_commands_v0.02.tar.gz http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz
# --wildcards limits extraction to members matching './_background_noise_/*'.
!tar --wildcards --directory datasets/speech_commands -xzvf datasets/speech_commands_v0.02.tar.gz './_background_noise_/*'

In [None]:
# List the datasets directory to check what was downloaded and extracted.
!ls datasets

In [None]:
# List the dog-bark recordings in a fixed order (shuffle=False).
dog_bark_files_ds = tf.data.Dataset.list_files("datasets/ml-audio-classifier-example-for-pico-dog_barks/*.wav", shuffle=False)
# Attach label 1 and a placeholder fold of -1 so each element has the
# (path, label, fold) layout that load_wav_for_map expects.
dog_bark_files_ds = dog_bark_files_ds.map(lambda x: (x, 1, -1))

In [None]:
# List the background-noise recordings in a fixed order (shuffle=False).
background_noise_files_ds = tf.data.Dataset.list_files("datasets/speech_commands/_background_noise_/*.wav", shuffle=False)
# Attach label 0 and a placeholder fold of -1 so each element has the
# (path, label, fold) layout that load_wav_for_map expects.
background_noise_files_ds = background_noise_files_ds.map(lambda x: (x, 0, -1))

In [None]:
# Decode both sets of recordings and cache the waveforms.
dog_bark_wav_ds = dog_bark_files_ds.map(load_wav_for_map).cache()

background_noise_wav_ds = background_noise_files_ds.map(load_wav_for_map).cache()

In [None]:
# Plot and play back the first dog-bark recording.
for wav_data, _, _ in dog_bark_wav_ds.take(1):
  plt.plot(wav_data)
  # Fix the y axis to [-1, 1].
  plt.ylim([-1, 1])
  plt.show()

  # An explicit display() call is needed because this is not the cell's
  # last expression.
  display.display(display.Audio(wav_data, rate=sample_rate))

In [None]:
# Plot and play back the first background-noise recording.
for wav_data, _, _ in background_noise_wav_ds.take(1):
  plt.plot(wav_data)
  # Fix the y axis to [-1, 1].
  plt.ylim([-1, 1])
  plt.show()

  # An explicit display() call is needed because this is not the cell's
  # last expression.
  display.display(display.Audio(wav_data, rate=sample_rate))

In [None]:
# Window both sets of recordings the same way as the ESC-50 clips and drop
# windows with no non-zero sample.
split_dog_bark_wav_ds = (
    dog_bark_wav_ds
    .flat_map(split_wav_for_flat_map)
    .filter(lambda wav, label, fold: wav_not_empty(wav))
)

split_background_noise_wav_ds = (
    background_noise_wav_ds
    .flat_map(split_wav_for_flat_map)
    .filter(lambda wav, label, fold: wav_not_empty(wav))
)

TensorFlow

In [None]:
import cmsisdsp
from numpy import pi as PI

# Framing parameters: 256-sample windows that advance by 128 samples (the same
# frame_length / frame_step values create_spectrogram passes to tf.signal.stft).
window_size = 256
step_size = 128

# Build a Hann window sample by sample with the CMSIS-DSP cosine function.
hanning_window_f32 = np.zeros(window_size)
for i in range(window_size):
  hanning_window_f32[i] = 0.5 * (1 - cmsisdsp.arm_cos_f32(2 * PI * i / window_size ))

# Convert the window to the q15 fixed-point format.
hanning_window_q15 = cmsisdsp.arm_float_to_q15(hanning_window_f32)

# Real-FFT instance initialised for window_size-point transforms.
# NOTE(review): the trailing arguments (0, 1) are presumably the inverse-FFT
# flag (0 = forward) and the bit-reversal flag — confirm against the CMSIS-DSP
# documentation.
rfftq15 = cmsisdsp.arm_rfft_instance_q15()
# NOTE(review): `status` is never checked; consider verifying it indicates
# success.
status = cmsisdsp.arm_rfft_init_q15(rfftq15, window_size, 0, 1)

def get_arm_spectrogram(waveform):
  """Compute a magnitude spectrogram with the CMSIS-DSP q15 routines.

  Uses the notebook-level `window_size`, `step_size`, `hanning_window_q15`
  and `rfftq15`.

  Args:
    waveform: sequence of float samples; it is converted to q15 first.

  Returns:
    Float array of shape (num_frames, window_size // 2 + 1), scaled by 512.
  """
  num_frames = int(1 + (len(waveform) - window_size) // step_size)
  fft_size = int(window_size // 2 + 1)

  # Fixed-point copy of the audio and the output buffer (one row per frame).
  waveform_q15 = cmsisdsp.arm_float_to_q15(waveform)
  spectrogram_q15 = np.empty((num_frames, fft_size), dtype=np.int16)

  for frame_index in range(num_frames):
    # Consecutive frames start step_size samples apart.
    frame_start = frame_index * step_size
    frame = waveform_q15[frame_start:frame_start + window_size]

    # Hann-window the frame, then take its real FFT.
    windowed = cmsisdsp.arm_mult_q15(frame, hanning_window_q15)
    spectrum = cmsisdsp.arm_rfft_q15(rfftq15, windowed)

    # Keep the magnitudes of the first fft_size bins.
    spectrogram_q15[frame_index] = cmsisdsp.arm_cmplx_mag_q15(spectrum)[:fft_size]

  # Convert back to float for Keras.
  # NOTE(review): an earlier comment mentioned a "shift by 7 according to
  # docs", but no shift is applied in this function; the * 512 factor
  # presumably compensates for the fixed-point FFT/magnitude scaling — confirm
  # against the CMSIS-DSP documentation.
  return cmsisdsp.arm_q15_to_float(spectrogram_q15).reshape(num_frames, fft_size) * 512

In [None]:
@tf.function
def create_arm_spectrogram_for_map(wav, label, fold):
  """Dataset-map adapter: swap the waveform for its CMSIS-DSP spectrogram.

  get_arm_spectrogram runs as Python code through tf.py_function; `label` and
  `fold` pass through unchanged.
  """
  return (
      tf.py_function(func=get_arm_spectrogram, inp=[wav], Tout=tf.float32),
      label,
      fold,
  )

# Compute and cache the CMSIS-DSP spectrogram of every dog-bark window.
dog_bark_spectrograms_ds = split_dog_bark_wav_ds.map(create_arm_spectrogram_for_map).cache()

# Plot the first one.
for spectrogram, _, _ in dog_bark_spectrograms_ds.take(1):
  plot_spectrogram(spectrogram)

In [None]:
# Compute and cache the CMSIS-DSP spectrogram of every background-noise window.
background_noise_spectrograms_ds = split_background_noise_wav_ds.map(create_arm_spectrogram_for_map).cache()

# Plot the first one.
for spectrogram, _, _ in background_noise_spectrograms_ds.take(1):
  plot_spectrogram(spectrogram)

In [None]:
def calculate_ds_len(ds):
  """Count the elements of `ds` by iterating over it once.

  Every element must unpack into exactly three items.
  """
  return sum(1 for _, _, _ in ds)

# Count the spectrograms in each dataset by iterating over it in full.
num_dog_bark_spectrograms = calculate_ds_len(dog_bark_spectrograms_ds)
num_background_noise_spectrograms = calculate_ds_len(background_noise_spectrograms_ds)

# Print both counts.
print(f"num_dog_bark_spectrograms = {num_dog_bark_spectrograms}")
print(f"num_background_noise_spectrograms = {num_background_noise_spectrograms}")

Data Augmentation