This notebook loads the dataset from Google Drive and runs inference.

In [None]:
# Detect the runtime (pattern borrowed from openWakeWord's automatic training notebook).
# On Colab, Google Drive is mounted and the restore flags are switched on;
# anywhere else nothing is mounted and the flags stay off.
running_on_colab = False
if 'google.colab' not in str(get_ipython()):
    print('Not running on CoLab')
    restore_dataset = restore_features = False
else:
    print('Running on CoLab')
    from google.colab import drive
    drive.mount('/content/drive')
    running_on_colab = True
    restore_dataset = restore_features = True


In [None]:
import os

if running_on_colab:
    # restoring previous trained model for finetuning from previously generated features
    trained_model_filename = "/content/drive/MyDrive/ColabNotebooks/VoiceAssistant/microWakeWord/trained_models_20240421_164022.tar"
    !cp {trained_model_filename}  .

    model_filename = os.path.basename(trained_model_filename)
    !tar -xvf {model_filename}

if os.path.exists('audio_preprocessor_int8.tflite') == False:
    !wget https://github.com/tensorflow/tflite-micro/raw/main/tensorflow/lite/micro/examples/micro_speech/models/audio_preprocessor_int8.tflite


In [None]:
# download this just as a validation model to test the actual mic/tflite process
if os.path.exists('./okay_nabu.tflite') == False:
    !wget https://github.com/esphome/micro-wake-word-models/raw/main/models/okay_nabu.tflite

In [None]:
%pip install -q tflite_micro
%pip install -q asciichartpy
%pip install -q sounddevice

In [None]:
#import gradio as gr
import tensorflow as tf
from tflite_micro.python.tflite_micro import runtime
import numpy as np
import scipy.signal

In [None]:
# Feature-extraction backend switch:
#   True  -> use the TFLite preprocessor model (the same one used on the embedded device)
#   False -> use the software-based microfrontend op instead
tflite_prep = True

import tensorflow as tf
from tensorflow.lite.experimental.microfrontend.python.ops import audio_microfrontend_op as frontend_op
from tflite_micro.python.tflite_micro import runtime

# Load the audio preprocessor with the TFLite-Micro runtime, record the details of
# its first input/output, and print its memory allocations.
preprocessor_model = runtime.Interpreter.from_file("./audio_preprocessor_int8.tflite")
input_details, output_details = (
    preprocessor_model.get_input_details(0),
    preprocessor_model.get_output_details(0),
)
preprocessor_model.print_allocations()

class VectorSplitter:
    """Cut a continuous sample stream into overlapping fixed-size windows.

    Samples that do not yet fill a complete window are kept in ``self.remainder``
    and prepended to the next vector, so consecutive calls behave as if one long
    signal had been windowed in a single pass.

    With the defaults (480-sample window, 320-sample hop) and the 16 kHz sample
    rate used elsewhere in this notebook, this is a 30 ms window advanced by 20 ms.

    Args:
        chunk_size: Number of samples in every emitted window.
        step_size: Number of samples the window start advances between windows.
        initial_padding: Number of zero samples placed in front of the very
            first vector.

    Raises:
        ValueError: If ``step_size`` is not positive (the split loop could not
            make progress).
    """

    def __init__(self, chunk_size=480, step_size=320, initial_padding=160):
        if step_size <= 0:
            raise ValueError("step_size must be a positive number of samples")
        self.chunk_size = chunk_size
        self.step_size = step_size
        # np.zeros yields float64, so emitted windows are float64 unless the
        # caller casts them afterwards.
        self.remainder = np.zeros(initial_padding)

    def split_into_chunks(self, vector):
        """Return every complete window available after appending ``vector``.

        Args:
            vector: 1-D sequence of new samples.

        Returns:
            A list of arrays, each ``chunk_size`` long; the list is empty when
            the buffered samples do not fill a whole window yet.
        """
        # Continue from whatever the previous call could not consume.
        vector = np.concatenate((self.remainder, vector))
        chunks = []
        start = 0
        while start + self.chunk_size <= len(vector):
            chunks.append(vector[start:start + self.chunk_size])
            start += self.step_size
        # Keep everything from the next window start onwards; because windows
        # overlap, this includes samples that were already emitted once.
        self.remainder = vector[start:]
        return chunks

# Module-level splitter used by generate_features_for_clip(). It carries the
# unconsumed tail of each clip over to the next call, so its state persists
# between audio blocks (and between cells that reuse it).
splitter = VectorSplitter()
def get_features(input):
    """Run one 480-sample window through the TFLite audio preprocessor.

    Args:
        input: Sequence or array of exactly 480 samples. It is cast to int16
            and reshaped to (1, 480) before being handed to the model.

    Returns:
        Whatever ``preprocessor_model.get_output(0)`` yields for this window
        (presumably the feature vector for the window -- confirm against the model).

    Raises:
        ValueError: If ``input`` does not contain 480 entries.
    """
    # Accept plain sequences as well as arrays.
    samples = np.asarray(input)
    if len(samples) != 480:
        raise ValueError("Input must be of size 480")
    preprocessor_model.set_input(samples.reshape([1, 480]).astype(np.int16), 0)
    preprocessor_model.invoke()
    return preprocessor_model.get_output(0)

# Generates the mel-spectrogram features for a given clip.
def generate_features_for_clip(clip):
    """Convert an audio clip into the feature rows consumed by the wake-word model.

    The backend is chosen by the module-level ``tflite_prep`` switch:

    * truthy -- the clip is framed by the shared ``splitter`` and every window is
      passed through ``get_features``; the per-window results are stacked into
      one array (empty when the clip does not complete a window yet).
    * falsy -- the TensorFlow microfrontend op processes the whole clip at once.

    Args:
        clip: 1-D array of audio samples (assumed 16 kHz, int16 range -- the
            microfrontend branch is configured for ``sample_rate=16000``).

    Returns:
        A numpy array of features.
    """
    if tflite_prep:
        chunks = splitter.split_into_chunks(clip)
        return np.array([get_features(chunk) for chunk in chunks])

    micro_frontend = frontend_op.audio_microfrontend(
        tf.convert_to_tensor(clip),
        sample_rate=16000,
        window_size=30,
        window_step=20,
        num_channels=40,
        upper_band_limit=7500,
        lower_band_limit=125,
        enable_pcan=True,
        min_signal_remaining=0.05,
        out_scale=1,
        out_type=tf.float32)
    # 0.0390625 == 10 / 256; rescales the raw frontend output.
    output = tf.multiply(micro_frontend, 0.0390625)
    return output.numpy()

def features_generator(generator):
    """Lazily yield the features of every clip found in ``generator``.

    ``generator`` is an iterable of batches and each batch is an iterable of
    clips; features are produced one clip at a time, in iteration order.
    """
    for batch in generator:
        yield from (generate_features_for_clip(item) for item in batch)

In [None]:

# Load the wake-word inference model. To validate the pipeline against the
# reference model instead, point model_path at "./okay_nabu.tflite".
infer_model = tf.lite.Interpreter(
    model_path="./stream_state_internal_quant.tflite",
    num_threads=1,
)
# Pin the input to a fixed [1, 1, 40] shape before allocating tensors.
infer_model.resize_tensor_input(0, [1,1,40], strict=True)
infer_model.allocate_tensors()
infer_model_input_details = infer_model.get_input_details()
infer_model_output_details = infer_model.get_output_details()

# Show both detail lists, each preceded and followed by a blank line.
print(
    "",
    "Input details:",
    infer_model_input_details,
    "",
    "Output details:",
    infer_model_output_details,
    "",
    sep="\n",
)

# Live mic wake-word (WW) testing

In [None]:
import sounddevice as sd
import numpy as np
import asciichartpy
from IPython.display import clear_output
# Rolling window of the 100 most recent model outputs; the display loop plots it.
last_100_data_points = [0] * 100

# Input-stream callback: scores every captured audio block with the wake-word model.
def process_audio_callback(indata, frames, time, status):
    """Feed one captured audio block through feature extraction and inference.

    ``indata`` is scaled by 32767 and cast to int16 (presumably float samples in
    [-1, 1], sounddevice's default -- confirm), converted to feature rows, and
    each row is run through ``infer_model``. Every prediction is appended to the
    global ``last_100_data_points`` window. ``frames``, ``time`` and ``status``
    are accepted for the callback signature but not used.
    """
    global last_100_data_points
    pcm_block = (indata.flatten()*32767).astype(np.int16)
    feature_rows = generate_features_for_clip(pcm_block)

    for feature_row in feature_rows:
        model_input = feature_row.astype(np.int8).reshape([1,1,40])
        infer_model.set_tensor(infer_model_input_details[0]['index'], model_input)
        infer_model.invoke()
        prediction = infer_model.get_tensor(infer_model_output_details[0]['index'])

        # Slide the window: drop the oldest point, append the newest ...
        window = last_100_data_points[1:]
        window.append(prediction[0,0])
        # ... and pin the first point to 255 so the plotted range stays anchored.
        window[0] = 255
        last_100_data_points = window

# Capture mono audio at 16 kHz. With blocksize=320 the callback receives one
# 320-sample (20 ms) block per call; the 500 ms below is only the chart refresh period.
stream = sd.InputStream(callback=process_audio_callback, channels=1, blocksize=320, samplerate=16000)
with stream:
    try:
        while True:
            sd.sleep(500)
            # Clear the previous chart before drawing the new one
            clear_output(wait=True)
            print(asciichartpy.plot(last_100_data_points, {"height": 10}))
    except KeyboardInterrupt:
        # Interrupting the kernel is the only way out of this loop; end the cell
        # cleanly instead of leaving a traceback. The `with` block closes the stream.
        print("Live mic test stopped")

# WAV-file wake-word (WW) testing

In [None]:
import sounddevice as sd
import soundfile as sf
import numpy as np
import asciichartpy
import itertools
from IPython.display import clear_output
# Rolling window of the 100 most recent model outputs; the display loop plots it.
last_100_data_points = [0] * 100

# Read the wav file
data, samplerate = sf.read('./ww2.wav')

# The playback callback writes one column per frame, so multi-channel files
# (soundfile returns them as a 2-D array) are downmixed to mono first.
if data.ndim > 1:
    data = data.mean(axis=1)

# Feature extraction in this notebook is configured for 16 kHz audio, so any
# other rate is resampled; `samplerate` is updated so playback matches the new data.
if samplerate != 16000:
    data = scipy.signal.resample_poly(data, 16000, samplerate)
    # Resampling can overshoot slightly; keep samples inside [-1, 1] so the
    # later int16 conversion cannot wrap around.
    data = np.clip(data, -1.0, 1.0)
    samplerate = 16000

# Lazily walk the decoded WAV data, one entry at a time.
def gen_samples():
    """Yield the entries of the module-level ``data`` array in order."""
    yield from data

# Single shared iterator consumed by the playback callback.
samples = gen_samples()

# Output-stream callback: plays the next block of the WAV file and scores it with the model.
def process_audio_callback(outdata, frames, time, status):
    """Fill ``outdata`` with the next ``frames`` samples and run inference on them.

    Samples are pulled from the module-level ``samples`` iterator (mono data is
    assumed: one value per frame). When fewer than ``frames`` samples are left,
    the rest of ``outdata`` is zero-filled, the samples that were read are still
    scored, and ``sd.CallbackStop`` is raised so the stream finishes cleanly.
    ``time`` and ``status`` are accepted for the callback signature but not used.

    Raises:
        sd.CallbackStop: Once the sample iterator is exhausted.
    """
    global last_100_data_points

    # islice() never raises StopIteration; it just returns fewer items at the
    # end of the data, so exhaustion is detected by counting what came back.
    block = np.array(list(itertools.islice(samples, frames))).reshape(-1, 1)
    n_valid = len(block)
    outdata[:n_valid] = block
    outdata[n_valid:] = 0  # pad the final partial block with silence

    if n_valid:
        indata = (outdata[:n_valid].flatten()*32767).astype(np.int16)
        res = generate_features_for_clip(indata)

        # Get predictions
        for row in res:
            row1 = row.astype(np.int8)
            row3 = row1.reshape([1,1,40])
            infer_model.set_tensor(infer_model_input_details[0]['index'], row3)
            infer_model.invoke()
            pred = infer_model.get_tensor(infer_model_output_details[0]['index'])
            # Update the list of last 100 data points
            last_100_data_points = last_100_data_points[1:] + [pred[0,0]]
            # this will make sure the graph is scaled properly
            last_100_data_points[0] = 255

    if n_valid < frames:
        raise sd.CallbackStop()



# Play the file through an output stream; the callback both feeds the audio and
# runs inference. The chart is redrawn every 500 ms until the stream goes inactive.
with sd.OutputStream(
    callback=process_audio_callback,
    blocksize=320,
    channels=1,
    samplerate=samplerate,
) as stream:
    while True:
        if not stream.active:
            break
        sd.sleep(500)
        # Replace the previous chart with the refreshed one
        clear_output(wait=True)
        print(asciichartpy.plot(last_100_data_points, {"height": 10}))