# Train an Audio Recognition Model

This notebook demonstrates how to train an audio recognition model to recognize keywords in speech.



**Training is much faster using GPU acceleration.** Before you proceed, ensure you are using a GPU runtime by going to **Runtime -> Change runtime type** and setting **Hardware accelerator: GPU**. Training for 15,000 iterations takes 1.5 to 2 hours on a GPU runtime.

## Configure Defaults

**MODIFY** the following constants for your specific use case.

In [None]:
# The objective is to control light using the Malagasy language.
# Provide a comma-separated list of the words you want to train.
# All other words in the dataset will be used to train an "unknown" label,
# while silent audio data with no spoken words will be used to train a "silence" label.

# You will upload audio files in the next cell
WANTED_WORDS = "mirehitra,maty"
# Mirehitra means light on and Maty means light off

# The number of training steps and the learning rates can be specified as
# comma-separated lists to define the rate at each stage. For example, if you
# set TRAINING_STEPS=12000,3000 and LEARNING_RATE=0.001,0.0001, the model will
# run a total of 15,000 training loops.
# It will use a learning rate of 0.001 for the first 12,000 steps and a rate of
# 0.0001 for the final 3,000 steps. If you select TRAINING_STEPS=12000,3000,
# training will take approximately 2 hours.
TRAINING_STEPS = "5000,1500"
LEARNING_RATE = "0.001,0.0001"

# Calculate the total number of steps, which is used to identify the checkpoint
# file name.
TOTAL_STEPS = str(sum(int(steps) for steps in TRAINING_STEPS.split(",")))

# Print the configuration to confirm it
print("Training these words: %s" % WANTED_WORDS)
print("Training steps in each stage: %s" % TRAINING_STEPS)
print("Learning rate in each stage: %s" % LEARNING_RATE)
print("Total number of training steps: %s" % TOTAL_STEPS)
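To make the staged schedule described in the comments above concrete, here is a minimal sketch of how the two comma-separated lists pair up. The helpers `parse_schedule` and `learning_rate_for_step` are hypothetical names introduced for illustration only; the training script parses these flags itself.

```python
def parse_schedule(training_steps, learning_rates):
    """Pair each stage's step count with its learning rate."""
    steps = [int(s) for s in training_steps.split(",")]
    rates = [float(r) for r in learning_rates.split(",")]
    if len(steps) != len(rates):
        raise ValueError(
            "TRAINING_STEPS and LEARNING_RATE need the same number of entries")
    return list(zip(steps, rates))


def learning_rate_for_step(step, training_steps, learning_rates):
    """Return the learning rate in effect at a 1-based training step."""
    stage_end = 0
    for stage_steps, rate in parse_schedule(training_steps, learning_rates):
        stage_end += stage_steps
        if step <= stage_end:
            return rate
    raise ValueError("step %d is beyond the %d scheduled steps" % (step, stage_end))


# Example values taken from the comments in the configuration cell.
print(parse_schedule("12000,3000", "0.001,0.0001"))
print(learning_rate_for_step(12000, "12000,3000", "0.001,0.0001"))
print(learning_rate_for_step(12001, "12000,3000", "0.001,0.0001"))
```

Running a check like this before training is a cheap way to catch a mismatch between the number of stages and the number of learning rates.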

Leave the following constants unchanged, as they include file paths used in this notebook and data that is shared during training and inference.

In [None]:
# Calculate the percentage of 'silence' and 'unknown' training samples required
# to ensure that we have equal number of samples for each label.
number_of_labels = WANTED_WORDS.count(',') + 1
number_of_total_labels = number_of_labels + 2 # for 'silence' and 'unknown' label
equal_percentage_of_training_samples = int(100.0/(number_of_total_labels))
SILENT_PERCENTAGE = equal_percentage_of_training_samples
UNKNOWN_PERCENTAGE = equal_percentage_of_training_samples

# Constants which are shared during training and inference
PREPROCESS = 'micro'
WINDOW_STRIDE = 20
MODEL_ARCHITECTURE = 'tiny_conv' # Other options include: single_fc, conv,
                      # low_latency_conv, low_latency_svdf, tiny_embedding_conv

# Constants used during training only
VERBOSITY = 'WARN'
EVAL_STEP_INTERVAL = '500'
SAVE_STEP_INTERVAL = '500'

# Constants for training directories and filepaths
DATASET_DIR =  'dataset/'
LOGS_DIR = 'logs/'
TRAIN_DIR = 'train/' # for training checkpoints and other files.

# Constants for inference directories and filepaths
import os
MODELS_DIR = 'models'
if not os.path.exists(MODELS_DIR):
  os.mkdir(MODELS_DIR)
MODEL_TF = os.path.join(MODELS_DIR, 'model.pb')
MODEL_TFLITE = os.path.join(MODELS_DIR, 'model.tflite')
FLOAT_MODEL_TFLITE = os.path.join(MODELS_DIR, 'float_model.tflite')
MODEL_TFLITE_MICRO = os.path.join(MODELS_DIR, 'model.cc')
SAVED_MODEL = os.path.join(MODELS_DIR, 'saved_model')

QUANT_INPUT_MIN = 0.0
QUANT_INPUT_MAX = 26.0
QUANT_INPUT_RANGE = QUANT_INPUT_MAX - QUANT_INPUT_MIN
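As a worked example of the percentage calculation in the cell above, the sketch below repeats the same arithmetic in a standalone function. The name `equal_label_percentage` is hypothetical and exists only for this illustration.

```python
def equal_label_percentage(wanted_words):
    """Return the integer percentage assigned to each label, counting the
    wanted words plus the 'silence' and 'unknown' labels."""
    number_of_labels = wanted_words.count(",") + 1
    number_of_total_labels = number_of_labels + 2  # 'silence' and 'unknown'
    return int(100.0 / number_of_total_labels)


# Two wanted words give four labels in total, so each gets 25%.
print(equal_label_percentage("mirehitra,maty"))
# Four wanted words give six labels; int() truncates 16.67 down to 16.
print(equal_label_percentage("yes,no,up,down"))
```

Because `int()` truncates, the shares do not always add up to exactly 100.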

## Import your Custom Dataset
To begin, the notebook downloads Gonzalez's dataset, which provides a foundation of "unknown" and "silence" samples on which you can build your own dataset. You can access the dataset on [Google Drive](https://drive.google.com/file/d/1hpYc-YnBCHDhu2M4uvbIFDBI6CCTWjvw/view?pli=1).

Starting from this base can significantly improve your model's performance, especially if you are training with a limited amount of data.


Gonzalez's dataset consists of:

*   Environmental background noise, such as rain and wind. This data is used to train the "silence" label.
*   Recordings of conversation and Malagasy words, which are used to train the "unknown" label.

In [None]:
# This cell downloads the dataset from Google Drive into your
# Google Colab file system.
# gdown is required for the download.
!pip install gdown

import gdown

# File ID from the Google Drive link
file_id = "1hpYc-YnBCHDhu2M4uvbIFDBI6CCTWjvw"

# Construct the download URL
url = f"https://drive.google.com/uc?export=download&id={file_id}"

# Specify the output file name
output = "dataset.rar"

# Download the file
gdown.download(url, output, quiet=False)

print(f"File downloaded and saved as '{output}' in Google Colab's file system.")

In [None]:
# This cell extracts the contents of dataset.rar and
# creates a directory to store the dataset.

# Install required libraries
!pip install patool
!sudo apt-get install -y unrar  # Install unrar for handling .rar files (-y skips the confirmation prompt)

import patoolib

# Path to the .rar file
rar_file = "dataset.rar"

# Directory to extract the contents
extract_dir = "dataset"

# Extract the .rar file
patoolib.extract_archive(rar_file, outdir=extract_dir)

print(f"File '{rar_file}' extracted to '{extract_dir}'.")

# Verify the extracted files
!ls -R {extract_dir}
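After extraction, it can help to confirm how many clips each label folder contains. The sketch below is one way to do that with the standard library. It assumes that the archive unpacks into one subfolder per label directly under `dataset/`, which is not verified here; `count_wavs_per_label` is a hypothetical helper.

```python
import os


def count_wavs_per_label(dataset_dir):
    """Map each immediate subdirectory name to the number of .wav files in it."""
    counts = {}
    for entry in sorted(os.listdir(dataset_dir)):
        label_dir = os.path.join(dataset_dir, entry)
        if os.path.isdir(label_dir):
            counts[entry] = sum(
                1 for name in os.listdir(label_dir)
                if name.lower().endswith(".wav"))
    return counts


# Only runs when the directory exists, so the cell is safe to execute early.
if os.path.isdir("dataset"):
    for label, count in count_wavs_per_label("dataset").items():
        print("%s: %d clips" % (label, count))
```

If the archive turns out to contain an extra top-level folder, point the function at that folder instead.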

Now upload all of the custom *.ogg audio files that you recorded using the [Open Speech Recording tool](https://tinyml.seas.harvard.edu/open_speech_recording/).
Please use the Google Chrome browser on a laptop. The tool only downloads 10 files at a time, so refresh the site and repeat the process until you have enough recordings.

Once you have finished, you can select multiple files and upload them all at once. Ensure you have a fast internet connection.


In [None]:
# Upload the recordings of your wanted words.
from google.colab import files
uploaded = files.upload()

Next, convert them into correctly trimmed WAV files and store them in the appropriate folders in DATASET_DIR. We will use Pete Warden's extract_loudest_section tool, which is documented at https://github.com/petewarden/extract_loudest_section

In [None]:
# convert the ogg files to wavs
!mkdir wavs
!find *.ogg -print0 | xargs -0 basename -s .ogg | xargs -I {} ffmpeg -i {}.ogg -ar 16000 wavs/{}.wav
!rm -r -f *.ogg

# then use pete's tool to only extract 1 second clips from them for use with the KWS pipeline
!mkdir trimmed_wavs
!git clone https://github.com/petewarden/extract_loudest_section.git
!make -C extract_loudest_section/
!/tmp/extract_loudest_section/gen/bin/extract_loudest_section 'wavs/*.wav' trimmed_wavs/
!rm -r -f wavs
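Before moving the trimmed clips into the dataset folders, you may want to confirm that they really are one-second, 16 kHz WAV files. The sketch below uses Python's standard `wave` module for that check. The helper names and the 0.01 s tolerance are assumptions made for this example.

```python
import glob
import wave


def describe_wav(path):
    """Return (sample_rate_hz, duration_seconds) for a PCM WAV file."""
    with wave.open(path, "rb") as wav_file:
        sample_rate = wav_file.getframerate()
        duration = wav_file.getnframes() / float(sample_rate)
    return sample_rate, duration


def is_valid_clip(path, expected_rate=16000, expected_seconds=1.0,
                  tolerance=0.01):
    """Check a clip against the expected sample rate and length.
    The tolerance value is an assumption, not a pipeline requirement."""
    sample_rate, duration = describe_wav(path)
    return (sample_rate == expected_rate
            and abs(duration - expected_seconds) <= tolerance)


# Report any trimmed clips that do not match; prints nothing if all are fine
# or if the trimmed_wavs folder is missing.
for clip_path in sorted(glob.glob("trimmed_wavs/*.wav")):
    if not is_valid_clip(clip_path):
        print("Unexpected format:", clip_path, describe_wav(clip_path))
```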

In [None]:
import glob
import os
import re
import shutil

# Store them in the appropriate folders
data_index = {}
print("Before:", os.getcwd())  # Prints the current working directory
os.chdir('/content/trimmed_wavs')  # Changes the directory
print("After:", os.getcwd())  # Prints the updated working directory

DATASET_DIR =  'dataset/'
search_path = os.path.join('*.wav')
for wav_path in glob.glob(search_path):
    original_wav_path = wav_path
    parts = wav_path.split('_')
    if len(parts) > 2:
        wav_path = parts[0] + '_' + ''.join(parts[1:])
    matches = re.search(r'([^/_]+)_([^/_]+)\.wav', wav_path)
    if not matches:
        raise Exception('File name not in a recognized form:"%s"' % wav_path)
    word = matches.group(1).lower()
    instance = matches.group(2).lower()
    if word not in data_index:
        data_index[word] = {}
    if instance in data_index[word]:
        raise Exception('Audio instance already seen:"%s"' % wav_path)
    data_index[word][instance] = original_wav_path

output_dir = os.path.join('..', 'dataset')
# Create the dataset directory if it does not already exist
os.makedirs(output_dir, exist_ok=True)
for word in data_index:
  word_dir = os.path.join(output_dir, word)
  if os.path.isdir(word_dir):
      print('Storing in existing dir: ' + word_dir)
  else:
      os.mkdir(word_dir)
      print('Created dir: ' + word_dir)
  for instance in data_index[word]:
    wav_path = data_index[word][instance]
    output_path = os.path.join(word_dir, instance + '.wav')
    shutil.copyfile(wav_path, output_path)
os.chdir('..')
!rm -r -f trimmed_wavs
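The file-name handling in the cell above can be hard to follow, so here is the same parsing logic isolated in a small function. `parse_recording_name` is a hypothetical helper, and the example file names are made up for illustration. The text before the first underscore becomes the word (the label folder), and everything after it, with further underscores removed, becomes the instance name.

```python
import re


def parse_recording_name(filename):
    """Split '<word>_<instance>.wav' into a lower-cased (word, instance) pair."""
    parts = filename.split("_")
    if len(parts) > 2:
        # Collapse extra underscores so only the first one separates the word.
        filename = parts[0] + "_" + "".join(parts[1:])
    match = re.search(r"([^/_]+)_([^/_]+)\.wav", filename)
    if not match:
        raise ValueError('File name not in a recognized form: "%s"' % filename)
    return match.group(1).lower(), match.group(2).lower()


# Made-up file names, for illustration only.
print(parse_recording_name("Mirehitra_abc_1.wav"))
print(parse_recording_name("maty_7.wav"))
```

A file without an underscore, such as `recording.wav`, raises an error, which matches the behaviour of the cell above.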

## Setup Environment

All the datasets are ready. Now, let's install dependencies for the training.

In [None]:
import tensorflow as tf

Clone the TensorFlow GitHub repository, which contains the code required to run the training process.

In [None]:
!git clone -q --depth 1 https://github.com/tensorflow/tensorflow

Load TensorBoard to visualize the accuracy and loss as training proceeds.


In [None]:
%load_ext tensorboard
%tensorboard --logdir {LOGS_DIR}

## Training

The following script begins training.

In [None]:
# The background_volume and background_frequency parameters are optional.
# They are used for data augmentation. Specifically, these values indicate that
# the dataset is mixed with 40% background volume and that these changes are
# applied to 45% of the dataset.
# Since they are optional, both flags can be removed if desired.
!python tensorflow/tensorflow/examples/speech_commands/train.py \
--data_dir={DATASET_DIR} \
--wanted_words={WANTED_WORDS} \
--data_url="" \
--background_volume=0.4 \
--background_frequency=0.45 \
--silence_percentage={SILENT_PERCENTAGE} \
--unknown_percentage={UNKNOWN_PERCENTAGE} \
--preprocess={PREPROCESS} \
--window_stride={WINDOW_STRIDE} \
--model_architecture={MODEL_ARCHITECTURE} \
--how_many_training_steps={TRAINING_STEPS} \
--learning_rate={LEARNING_RATE} \
--train_dir={TRAIN_DIR} \
--summaries_dir={LOGS_DIR} \
--verbosity={VERBOSITY} \
--eval_step_interval={EVAL_STEP_INTERVAL} \
--save_step_interval={SAVE_STEP_INTERVAL}
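To give a feel for what the two background flags control, the sketch below mixes noise into a clip in plain Python. It is a simplified illustration of the idea, not the code that `train.py` runs. The function names and the clipping to [-1, 1] are assumptions made for this example.

```python
import random


def mix_background(foreground, background, volume):
    """Add scaled background noise to a clip and clip the result to [-1, 1]."""
    return [max(-1.0, min(1.0, f + b * volume))
            for f, b in zip(foreground, background)]


def augment(foreground, background, background_volume=0.4,
            background_frequency=0.45, rng=random):
    """Mix in background noise for roughly background_frequency of the calls."""
    if rng.random() < background_frequency:
        return mix_background(foreground, background, background_volume)
    return list(foreground)


# Toy samples in the range [-1, 1]; real clips hold 16,000 samples each.
clip = [0.5, -0.9, 0.0]
noise = [1.0, -1.0, 0.25]
print(mix_background(clip, noise, 0.4))
```

Raising the frequency exposes the model to noisy clips more often, while raising the volume makes each noisy clip harder to recognize.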

## Generate a TensorFlow Model for Inference

Combine the relevant training results (graph, weights, etc.) into a single file for inference. This process is known as freezing a model, and the resulting model is known as a frozen model or frozen graph, as it cannot be trained any further after this process.
The saved TensorFlow models are located in the models/ directory of your Google Colab file system.

In [None]:
!rm -rf {SAVED_MODEL}
!python tensorflow/tensorflow/examples/speech_commands/freeze.py \
--wanted_words=$WANTED_WORDS \
--window_stride_ms=$WINDOW_STRIDE \
--preprocess=$PREPROCESS \
--model_architecture=$MODEL_ARCHITECTURE \
--start_checkpoint=$TRAIN_DIR$MODEL_ARCHITECTURE'.ckpt-'{TOTAL_STEPS} \
--save_format=saved_model \
--output_file={SAVED_MODEL}
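The `--start_checkpoint` value above is built from the training directory, the architecture name, and the total step count. The sketch below rebuilds that string so you can check it before freezing. Both helpers are hypothetical, and the existence check assumes that TensorFlow writes a file ending in `.index` for each checkpoint prefix.

```python
import os


def checkpoint_prefix(train_dir, model_architecture, training_steps):
    """Build the checkpoint prefix the same way the freeze cell does."""
    total_steps = sum(int(s) for s in training_steps.split(","))
    return "%s%s.ckpt-%d" % (train_dir, model_architecture, total_steps)


def checkpoint_exists(prefix):
    """Assumption: a checkpoint is stored as several files sharing this
    prefix, one of which ends in '.index'."""
    return os.path.exists(prefix + ".index")


prefix = checkpoint_prefix("train/", "tiny_conv", "5000,1500")
print(prefix)
print("found" if checkpoint_exists(prefix) else "not found (has training finished?)")
```

If training was interrupted, the final checkpoint will be missing and the freeze step will fail, so a check like this can save a confusing error message.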

## Generate a TensorFlow Lite Model

Convert the frozen graph to a fully quantized TensorFlow Lite model for embedded devices. The cells below print the model size, which is expected to be under 20 kilobytes.

In [None]:
import sys
# This path is added to enable the import of the speech processing modules.
sys.path.append("/content/tensorflow/tensorflow/examples/speech_commands/")
import input_data
import models
import numpy as np

In [None]:
SAMPLE_RATE = 16000
CLIP_DURATION_MS = 1000
WINDOW_SIZE_MS = 30.0
FEATURE_BIN_COUNT = 40
BACKGROUND_FREQUENCY = 0.45
BACKGROUND_VOLUME_RANGE = 0.4
TIME_SHIFT_MS = 100.0

DATA_URL = ''
# If you use your own dataset, leave DATA_URL empty.
VALIDATION_PERCENTAGE = 10
TESTING_PERCENTAGE = 10

In [None]:
model_settings = models.prepare_model_settings(
    len(input_data.prepare_words_list(WANTED_WORDS.split(','))),
    SAMPLE_RATE, CLIP_DURATION_MS, WINDOW_SIZE_MS,
    WINDOW_STRIDE, FEATURE_BIN_COUNT, PREPROCESS)
audio_processor = input_data.AudioProcessor(
    DATA_URL, DATASET_DIR,
    SILENT_PERCENTAGE, UNKNOWN_PERCENTAGE,
    WANTED_WORDS.split(','), VALIDATION_PERCENTAGE,
    TESTING_PERCENTAGE, model_settings, LOGS_DIR)

In [None]:
with tf.compat.v1.Session() as sess:
  float_converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL)
  float_tflite_model = float_converter.convert()
  with open(FLOAT_MODEL_TFLITE, "wb") as float_model_file:
    float_tflite_model_size = float_model_file.write(float_tflite_model)
  print("Float model is %d bytes" % float_tflite_model_size)

  converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL)
  converter.optimizations = [tf.lite.Optimize.DEFAULT]
  converter.inference_input_type = tf.int8
  converter.inference_output_type = tf.int8
  def representative_dataset_gen():
    for i in range(75):
      data, _ = audio_processor.get_data(1, i*1, model_settings,
                                         BACKGROUND_FREQUENCY,
                                         BACKGROUND_VOLUME_RANGE,
                                         TIME_SHIFT_MS,
                                         'testing',
                                         sess)
      flattened_data = np.array(data.flatten(), dtype=np.float32).reshape(1, 1960)
      yield [flattened_data]
      print(i) # If this cell fails partway through, set the value in
      # range() above to the last value printed
  converter.representative_dataset = representative_dataset_gen
  tflite_model = converter.convert()
  with open(MODEL_TFLITE, "wb") as quantized_model_file:
    tflite_model_size = quantized_model_file.write(tflite_model)
  print("Quantized model is %d bytes" % tflite_model_size)
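The value 1960 in the `reshape(1, 1960)` call above is not arbitrary. The sketch below shows one way to arrive at it from the constants defined earlier, assuming the usual sliding-window count (one window, plus one more per full stride that still fits in the clip). `fingerprint_size` is a hypothetical helper. The authoritative value is the one computed by `models.prepare_model_settings`.

```python
def fingerprint_size(sample_rate, clip_duration_ms, window_size_ms,
                     window_stride_ms, feature_bin_count):
    """Number of features per clip: time slices multiplied by frequency bins."""
    desired_samples = int(sample_rate * clip_duration_ms / 1000)
    window_size_samples = int(sample_rate * window_size_ms / 1000)
    window_stride_samples = int(sample_rate * window_stride_ms / 1000)
    length_minus_window = desired_samples - window_size_samples
    if length_minus_window < 0:
        spectrogram_length = 0
    else:
        spectrogram_length = 1 + int(length_minus_window / window_stride_samples)
    return spectrogram_length * feature_bin_count


# 16 kHz, 1000 ms clips, 30 ms windows, 20 ms stride, 40 bins:
# 49 time slices x 40 bins = 1960 features.
print(fingerprint_size(16000, 1000, 30.0, 20, 40))
```

If you change `WINDOW_STRIDE`, `WINDOW_SIZE_MS`, or `FEATURE_BIN_COUNT`, the hard-coded 1960 in the cell above has to change accordingly.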


## Testing the TensorFlow Lite model's accuracy

Verify that the model we've exported is still accurate, using the TF Lite Python API and our test set.

In [None]:
# Helper function to run inference
def run_tflite_inference(tflite_model_path, model_type="Float"):
  # Load test data
  np.random.seed(0) # set random seed for reproducible test results.
  with tf.compat.v1.Session() as sess:
    test_data, test_labels = audio_processor.get_data(
        -1, 0, model_settings, BACKGROUND_FREQUENCY, BACKGROUND_VOLUME_RANGE,
        TIME_SHIFT_MS, 'testing', sess)
  test_data = np.expand_dims(test_data, axis=1).astype(np.float32)

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(tflite_model_path,
                                    experimental_op_resolver_type=tf.lite.experimental.OpResolverType.BUILTIN_REF)
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  # For quantized models, manually quantize the input data from float to integer
  if model_type == "Quantized":
    input_scale, input_zero_point = input_details["quantization"]
    test_data = test_data / input_scale + input_zero_point
    test_data = test_data.astype(input_details["dtype"])

  correct_predictions = 0
  for i in range(len(test_data)):
    interpreter.set_tensor(input_details["index"], test_data[i])
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]
    top_prediction = output.argmax()
    correct_predictions += (top_prediction == test_labels[i])

  print('%s model accuracy is %f%% (Number of test samples=%d)' % (
      model_type, (correct_predictions * 100) / len(test_data), len(test_data)))

In [None]:
# Compute float model accuracy
run_tflite_inference(FLOAT_MODEL_TFLITE)

# Compute quantized model accuracy
run_tflite_inference(MODEL_TFLITE, model_type='Quantized')
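The manual quantization step inside `run_tflite_inference` applies the affine mapping `q = x / scale + zero_point`. The sketch below works through that mapping with plain numbers. The scale and zero point used here are illustrative values derived from an assumed input range of 0.0 to 26.0. The real values come from `input_details["quantization"]`. Unlike the helper above, this sketch also rounds and clamps to the int8 range, which is a common convention.

```python
def quantize(value, scale, zero_point, qmin=-128, qmax=127):
    """Map a float to an int8 value using q = round(x / scale) + zero_point."""
    q = int(round(value / scale)) + zero_point
    return max(qmin, min(qmax, q))


def dequantize(q, scale, zero_point):
    """Map an int8 value back to a float using x = (q - zero_point) * scale."""
    return (q - zero_point) * scale


# Illustrative parameters: spread the range [0.0, 26.0] over 256 int8 levels.
scale = 26.0 / 255.0
zero_point = -128

for x in (0.0, 26.0, 100.0):
    q = quantize(x, scale, zero_point)
    print(x, "->", q, "->", round(dequantize(q, scale, zero_point), 3))
```

Values outside the representable range, such as 100.0 here, saturate at the largest int8 value rather than wrapping around.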

## Generate a TensorFlow Lite for Microcontrollers Model
Convert the TensorFlow Lite model into a C source file that can be loaded by TensorFlow Lite for Microcontrollers.

In [None]:
# Install xxd if it is not available
!apt-get update && apt-get -qq install xxd
# Convert to a C source file
!xxd -i {MODEL_TFLITE} > {MODEL_TFLITE_MICRO}
# Update variable names
REPLACE_TEXT = MODEL_TFLITE.replace('/', '_').replace('.', '_')
!sed -i 's/'{REPLACE_TEXT}'/g_model/g' {MODEL_TFLITE_MICRO}
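For readers unfamiliar with `xxd -i`, the sketch below produces a similar C array in pure Python, which shows what ends up inside model.cc. It is an approximation for illustration, not a replacement for the commands above. `to_c_array` is a hypothetical helper and the exact whitespace of real `xxd` output may differ.

```python
def to_c_array(data, variable_name="g_model", bytes_per_line=12):
    """Render bytes as a C unsigned char array plus a length variable."""
    rows = []
    for start in range(0, len(data), bytes_per_line):
        chunk = data[start:start + bytes_per_line]
        rows.append(", ".join("0x%02x" % byte for byte in chunk))
    return "unsigned char %s[] = {\n  %s\n};\nunsigned int %s_len = %d;\n" % (
        variable_name, ",\n  ".join(rows), variable_name, len(data))


# The notebook derives the name to replace from the model path like this:
model_path = "models/model.tflite"
print(model_path.replace("/", "_").replace(".", "_"))

# A three-byte stand-in for the real model file contents.
print(to_c_array(bytes([1, 2, 3])))
```

The `sed` command then renames the path-derived variable to `g_model`, which is why this sketch uses that name as its default.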

## Deploy to a Microcontroller
The model was deployed on an Arduino Nano 33 BLE Sense Rev2 board.

First, download the model.cc file located in the models/ directory. Open the Arduino IDE and make sure you have installed all the libraries required for your board. Install the [TensorFlow Lite Micro Library for Arduino](https://github.com/spaziochirale/Chirale_TensorFlowLite.git), then open the micro_speech example provided by the installed library. Follow these instructions:

1. Open the micro_features_model.cpp file and replace the raw model data with the new data from model.cc. This array is the trained model that runs on the board.
2. Change kCategoryLabels in the micro_features_micro_model_settings.cpp file. Replace "yes" and "no" with your desired words; in this case, "mirehitra" and "maty".
3. Download the float_model.tflite model and visualize the model structure using [Netron](https://netron.app/). Then update the operators registered in the .ino file so that they match the operators the model uses.
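When editing kCategoryLabels in step 2, the order of the labels matters because each position corresponds to one model output. The sketch below builds the list under the assumption that the training pipeline places the silence and unknown labels first, followed by the wanted words in the order given. You can compare it with the output of `input_data.prepare_words_list`, which this notebook calls earlier. Both helper names are hypothetical.

```python
def category_labels(wanted_words):
    """Label order assumed for the model outputs: silence, unknown, then words."""
    return ["silence", "unknown"] + wanted_words.split(",")


def to_cpp_initializer(labels):
    """Format the labels as a brace-enclosed list of C string literals."""
    return "{" + ", ".join('"%s"' % label for label in labels) + "}"


labels = category_labels("mirehitra,maty")
print(labels)
print(to_cpp_initializer(labels))
```

If the labels on the board appear swapped at run time, a mismatch in this order is a likely cause.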

# Data processing

This section does not need to be executed for training.

The code below slices uploaded WAV files into one-second clips and compresses all resulting files into a ZIP archive. It was used to prepare the unknown dataset.

## Slicing the WAV file

In [None]:
import os
import librosa
import soundfile as sf
from google.colab import files

# Create a directory to store sliced audio files
output_dir = "/content/sliced_audio"
os.makedirs(output_dir, exist_ok=True)

# Upload WAV files
uploaded = files.upload()

for filename in uploaded.keys():
    if filename.endswith(".wav"):
        # Load audio file
        filepath = os.path.join("/content", filename)
        audio, sr = librosa.load(filepath, sr=None)  # Keep original sample rate

        # Get duration and number of 1-second slices
        duration = librosa.get_duration(y=audio, sr=sr)
        num_slices = int(duration)

        for i in range(num_slices):
            start_sample = i * sr  # Start sample index
            end_sample = (i + 1) * sr  # End sample index

            # Slice audio
            sliced_audio = audio[start_sample:end_sample]

            # Save the sliced audio
            slice_filename = f"{os.path.splitext(filename)[0]}_{i+1}.wav"
            slice_path = os.path.join(output_dir, slice_filename)
            sf.write(slice_path, sliced_audio, sr)

        print(f"Processed: {filename} -> {num_slices} slices saved in {output_dir}")

print("Slicing complete! You can download the files from the 'Files' tab in Colab.")
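The slicing loop above keeps only whole one-second segments and drops any trailing partial second. The sketch below isolates the index arithmetic so that this behaviour is easy to see. `slice_bounds` is a hypothetical helper that works on sample counts only, so it needs no audio libraries.

```python
def slice_bounds(num_samples, sample_rate):
    """Return (start, end) sample indices for each complete one-second slice."""
    num_slices = num_samples // sample_rate
    return [(i * sample_rate, (i + 1) * sample_rate) for i in range(num_slices)]


# A 2.5-second recording at 16 kHz yields two slices; the last 0.5 s is dropped.
print(slice_bounds(40000, 16000))
```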

## Creating a ZIP file

In [None]:
import os
import zipfile

output_dir = "/content/sliced_audio"
zip_path = "/content/sliced_audio.zip"
os.makedirs(output_dir, exist_ok=True)

# Create a ZIP file containing all sliced audio files
with zipfile.ZipFile(zip_path, 'w') as zipf:
    # Use names that do not shadow the google.colab `files` module
    for root, _, filenames in os.walk(output_dir):
        for filename in filenames:
            zipf.write(os.path.join(root, filename), filename)

print(f"ZIP file created: {zip_path}")
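After creating the archive, you may want to confirm what it contains before downloading it. The sketch below lists the entries with the standard `zipfile` module. `zip_entry_names` is a hypothetical helper, and the path in the usage lines is the one used in the cell above.

```python
import os
import zipfile


def zip_entry_names(zip_path):
    """Return the sorted list of file names stored in a ZIP archive."""
    with zipfile.ZipFile(zip_path) as archive:
        return sorted(archive.namelist())


# Only runs when the archive exists.
archive_path = "/content/sliced_audio.zip"
if os.path.exists(archive_path):
    names = zip_entry_names(archive_path)
    print("%d files in archive" % len(names))
    print(names[:5])
```

Because the archive stores files under their base names only, clips with the same name in different subfolders would end up as duplicate entries.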