Speech to Intent Model Training and Conversion to TensorFlow for Microcontrollers
This model was developed to run on a Seeedstudio WIO Terminal with an ATSAMD51P19 MCU and an ARM Cortex-M4F core.
The specifications can be found on Seeedstudio's website; the current link is posted below:
https://www.seeedstudio.com/wio-terminal

Before using this notebook, install the following packages with pip or conda.
It is recommended to install them in a virtual environment (venv or conda).
- pip install pandas
- pip install numpy
- pip install librosa (for audio and music processing in Python)
- pip install audiomentations
- pip install tensorflow (various versions might be needed depending on your Python/Anaconda/Jupyter versions)
- pip install matplotlib scikit-learn soundfile (also imported by the first code cell)

Goals of this project:
- As of January 2023, there is still little support for FOSS speech recognition that can be integrated with MCUs
- Microcontrollers are cheap and consume little energy
- The WIO Terminal can be integrated with SBCs and other IoT devices for a plethora of use cases
- The other side of it is that I just like playing with fancy software and electronics :)

Use Cases Beyond This One:
- Audio detection for home security: high decibel levels can be detected and noises can be classified into types (gunshots, glass breaking, dogs barking, etc.)
- Home automation: smart light bulbs, thermostats, washers and dryers, door locks, etc. can be controlled with additional hardware (the WIO Terminal comes with built-in Wi-Fi connectivity and Raspberry Pi 40-pin compatibility)


Constraints
- Large vocabularies cannot be implemented due to the RAM and flash constraints of MCUs

Background Context for Sound Processing 
- Sound is nothing more than a vibration that propagates through a transmission medium (solid, liquid, gas)
- One molecule "pushes" another, that molecule pushes another, and so on until the vibration reaches another object
- The same principle applies to microphones
    -- In a dynamic microphone, for example, a diaphragm is pushed by sound waves and then returns to its original position; a coil attached to the diaphragm moves in the field of a magnet
    -- This movement is converted to an electrical signal (alternating current) that is proportional to the sound amplitude (the louder the sound, the more the diaphragm is pushed and the more current is produced)
    -- We feed this signal to an analog-to-digital converter and record its value at regular intervals
    -- Sampling rate = the number of readings taken in one second, measured in hertz (1 Hz is one cycle per second)
- We can visualize an audio signal as a graph of amplitude (y-axis) vs. time (number of samples)
    -- This is not very helpful for analyzing sound
    -- Fourier transforms can be applied to decompose a signal into its individual frequencies and the amplitude of each frequency
    -- Fourier transforms of short consecutive windows can be stacked together to create a spectrogram (Hz vs. time) with color-coded decibel levels
- Humans do not perceive frequencies on a linear scale
- We are better at detecting differences at lower frequencies (telling 500 Hz from 1,000 Hz is easier than telling 10,000 Hz from 10,500 Hz)
- The range of human hearing is about 20 Hz to 20,000 Hz
- The Mel scale was developed so that equal steps on the scale match pitch differences that listeners perceive as equal, which gives more resolution to the lower frequencies
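
The perceptual compression described above can be made concrete with a commonly used Hz-to-mel formula. This is only a sketch: the constants 2595 and 700 are one widespread convention, assumed here for illustration, and `hz_to_mel` is a hypothetical helper, not a function from this notebook or its libraries.

```python
import math

def hz_to_mel(freq_hz):
    """Convert a frequency in Hz to mels (assumed 2595 / 700 convention)."""
    return 2595.0 * math.log10(1.0 + freq_hz / 700.0)

# The same 500 Hz step covers far more mels at low frequencies than at high ones.
low_gap = hz_to_mel(1000) - hz_to_mel(500)
high_gap = hz_to_mel(10500) - hz_to_mel(10000)
print(f"500 -> 1000 Hz: {low_gap:.1f} mel")
print(f"10000 -> 10500 Hz: {high_gap:.1f} mel")
```

The low-frequency pair ends up several times further apart on the mel axis than the high-frequency pair, which mirrors the listening example in the list above.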

Overview of this Project
- Speech is converted directly to a parsed intent (speech to intent), with no intermediate text transcript; the intent is drawn from a predefined, domain-specific vocabulary
- e.g. "Alexa, turn on the lights in the bedroom" would be parsed into an output intent made up of an action, an object, and a location
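
As a rough picture of what a parsed intent could look like, the sketch below represents one as a small dictionary. The label values ("activate", "lights", "bedroom") and the `command` string format are hypothetical; the real labels come from the dataset's CSV columns.

```python
# Hypothetical label values; the real ones come from the dataset's CSV columns.
utterance = "Alexa, turn on the lights in the bedroom"

parsed_intent = {
    "action": "activate",
    "object": "lights",
    "location": "bedroom",
}

# Downstream code can act on the structured fields without any text parsing.
command = "{action}:{object}@{location}".format(**parsed_intent)
print(command)
```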


In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm

import librosa
import librosa.display

from sklearn.utils import shuffle
import string

import io, base64
import os, sys
from datetime import datetime

import IPython
import pickle

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Reshape, Flatten, Activation, Add
from tensorflow.keras.layers import Dense, Dropout, Softmax, TimeDistributed, LSTM
from tensorflow.keras.layers import Conv2D, DepthwiseConv2D
from tensorflow.keras.activations import relu
from tensorflow.keras.layers import GlobalMaxPooling2D, GlobalAveragePooling2D, ZeroPadding2D
from tensorflow.keras.layers import MaxPooling2D, AveragePooling2D, Flatten
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.models import load_model

from tensorflow.keras import backend as K

import random 
import glob

from tensorflow.keras.utils import Sequence  # base class for the DataGenerator defined below

import scipy.io.wavfile as wav

import soundfile as sf
from audiomentations import Compose, AddGaussianNoise, AddBackgroundNoise, PitchShift, Shift, ClippingDistortion, Gain, LoudnessNormalization, TimeStretch 
from tensorflow.python.ops import gen_audio_ops as contrib_audio

DEBUG = False

project_path = "../checkpoints"  # creates directory to save the model
data_path = "../resources/"  # where I store my dataset

train_dataset_path = os.path.join(data_path, 'data/csv/train_data.csv')
valid_dataset_path = os.path.join(data_path, 'data/csv/valid_data.csv')
#test_dataset_path = os.path.join(data_path, 'data/csv/test_data.csv')
test_dataset_path = os.path.join(data_path, 'data/csv/wt_data.csv')

SAMPLING_RATE = 16000
MIN_FREQ = 100
MAX_FREQ = SAMPLING_RATE//2
WIN_SIZE_MS = 0.02      # window length in seconds (20 ms), despite the _MS suffix
WIN_INCREASE_MS = 0.02  # window stride in seconds (20 ms)
NUM_CEPSTRAL = 10

The cell below opens the dataset so we can look at its content

In [None]:
test_data = pd.read_csv(test_dataset_path)
test_data.head()

The cell below plays the first audio file listed in the dataset so we can ensure it matches the transcription.

In [None]:
prefix = [data_path, "data"]
wav_file = os.path.join(*prefix, test_data['path'][0])
print(wav_file)
IPython.display.Audio(wav_file)

The MFCC processing parameters must match those used on the device, so we use the audio_spectrogram and mfcc functions from TensorFlow's gen_audio_ops. The generate_features function below creates a spectrogram, converts it to mel-frequency cepstral coefficients, and visualizes the result with matplotlib.
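
To see how the framing parameters translate into feature dimensions, here is a small arithmetic sketch. It reuses the notebook's values (16 kHz sampling rate, 0.02 for both window size and stride, clips fixed to 3 seconds) and assumes that only complete windows are kept; the constant names are chosen for this example.

```python
SAMPLING_RATE = 16000      # Hz, as in the notebook
WINDOW_SECONDS = 0.02      # same value as WIN_SIZE_MS
STRIDE_SECONDS = 0.02      # same value as WIN_INCREASE_MS
CLIP_SECONDS = 3           # clips are padded or trimmed to 3 s in the data generator

window_size = int(SAMPLING_RATE * WINDOW_SECONDS)   # samples per frame
stride = int(SAMPLING_RATE * STRIDE_SECONDS)        # samples between frame starts
num_samples = SAMPLING_RATE * CLIP_SECONDS

# Assumed framing rule: only complete windows are kept (no padding at the end).
num_frames = 1 + (num_samples - window_size) // stride
print(window_size, stride, num_frames)
```

Under these assumptions a 3 second clip yields 150 frames, which is consistent with the (150, NUM_CEPSTRAL, 1) input shape used by the models later in the notebook.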

In [None]:
audio, sample_rate = librosa.load(wav_file, sr=16000, res_type='kaiser_best')

if DEBUG:
    print(wav_file)

def generate_features(draw_graphs, raw_data, sampling_freq,
                      frame_length, frame_stride, num_filters, 
                      num_cepstral, low_frequency, high_frequency):
    graphs = []
    
    raw_data = np.expand_dims(raw_data, axis = -1)
    window_size = int(sampling_freq * frame_length)
    stride = int(sampling_freq * frame_stride)
    
    spectrogram = contrib_audio.audio_spectrogram(
        raw_data,
        window_size=window_size,
        stride=stride,
        magnitude_squared=True)
    
    mfcc = contrib_audio.mfcc(
        spectrogram,
        sampling_freq,
        dct_coefficient_count=num_cepstral,
        upper_frequency_limit=high_frequency, 
        lower_frequency_limit=low_frequency)
    
    mfcc = np.squeeze(mfcc)

    if draw_graphs:
        mfcc_graph = np.swapaxes(mfcc, 0, 1)
        fig, ax = plt.subplots()
        img = librosa.display.specshow(mfcc_graph, x_axis='time', ax=ax)
        fig.colorbar(img, ax=ax)
        ax.set(title='MFCC')
        buf = io.BytesIO()

        plt.savefig(buf, format='svg', bbox_inches='tight', pad_inches=0)

        buf.seek(0)
        image = (base64.b64encode(buf.getvalue()).decode('ascii'))

        buf.close()

        graphs.append({
            'name': 'Cepstral Coefficients',
            'image': image,
            'imageMimeType': 'image/svg+xml',
            'type': 'image'
        })

    return {
        'features': mfcc,
        'graphs': graphs,
        'output_config': {
            'type': 'spectrogram',
            'shape': {
                'width': mfcc.shape[1],
                'height': mfcc.shape[0]
            }
        }
    }

processed = generate_features(True, audio, SAMPLING_RATE, 
                              WIN_SIZE_MS, WIN_INCREASE_MS, 32, 
                              NUM_CEPSTRAL, MIN_FREQ, MAX_FREQ)

if DEBUG:
    print(processed['features'])
    
print(processed['output_config'])

In the following cell we process the .csv file data into labels for the model. Slots are included for objects and locations.
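
The slot encoding can be illustrated with a simplified, standalone sketch. `build_slot_ids` is a hypothetical helper that mirrors the idea of giving every slot label an integer id and keeping 'none' as the last one; the labels are made up, and the use of `sorted()` is an assumption added here so that the ids are reproducible between runs.

```python
def build_slot_ids(slot_values):
    """Map each slot label to an integer id, keeping 'none' as the last id."""
    slot_ids = {}
    # sorted() is an assumption made here for reproducible ids across runs
    for label in sorted(v for v in slot_values if v != "none"):
        slot_ids[label] = len(slot_ids)
    slot_ids["none"] = len(slot_ids)
    return slot_ids

# Hypothetical object and location labels
slot_ids = build_slot_ids({"lights", "music", "kitchen", "bedroom", "none"})
# Each sample carries two slots: [object, location]
encoded = [slot_ids["lights"], slot_ids["none"]]
print(slot_ids, encoded)
```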

In [None]:
class DatasetFactory:
    
    def __init__(self):
        self.actions = set()
        self.objects = set()
        self.locations = set()
        self.vocab = set()
    
    def get_query_slots(self, sentence):

        slots = [sentence[0], sentence[1]]
        return slots      
    
    def get_properties(self, data):

        data["action"] = data['action'].str.lower()
        data["object"] = data['object'].str.lower()
        data["location"] = data['location'].str.lower()

        actions = set(data.action.unique())
        objects = set(data.object.unique())
        locations = set(data.location.unique())

        return actions, objects, locations        

    def get_vocab(self, actions, objects, locations, data):

        vocab = objects | locations

        if DEBUG:
            print(vocab)

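        # regex=True is required for the pattern to be treated as a regular expression in pandas >= 2.0
        data["transcription"] = data['transcription'].str.replace(r'[^\w\s]', '', regex=True)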
        data["transcription"] = data['transcription'].str.lower()

        for item in data.transcription:
            for word in item.split(" "):
                vocab.add(word)

        vocab = [s.strip() for s in vocab]
        
        return set(vocab)   
    
    def add_corpora(self, data):
        
        actions, objects, locations = self.get_properties(data)
        vocab = self.get_vocab(actions, objects, locations, data)

        self.actions = set(self.actions | actions)
        self.objects = set(self.objects | objects)
        self.locations = set(self.locations | locations)        
        self.vocab = set(self.vocab | vocab)  
        self.query_slots = set(self.objects | self.locations)
        
    def process_data(self, data):
        
        self.actions = list(self.actions)
        self.objects = list(self.objects)
        self.locations = list(self.locations)       
        self.vocab = list(self.vocab)
        self.query_slots = list(self.query_slots)
        
        word_ids, slot_ids, intent_ids = {' ': 0}, {}, {self.actions[i]: i for i in range(0, len(self.actions))}

        slots = []
        for sentence in zip(data.object, data.location):
            slots.append(self.get_query_slots(sentence))
        
        i = 0
        for slot in self.query_slots:
            if slot == 'none':
                continue
            slot_ids[slot] = i
            i += 1
            
        slot_ids['none'] = i

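        # Convert vocab to a dictionary; id 0 is reserved for the blank token ' '
        start = 1
        for i in range(len(self.vocab)):
            word_ids[self.vocab[i]] = start + i
        # Give 'unknown' the next free id so it does not collide with the last vocab word
        word_ids['unknown'] = start + len(self.vocab)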

        #create reverse dicts
        ids2words = dict((v, k) for k, v in word_ids.items())
        ids2slots = dict((v, k) for k, v in slot_ids.items())
        ids2intents = dict((v, k) for k, v in intent_ids.items())

        n_vocab = len(ids2words)

        n_classes = len(ids2intents)
        n_slots = len(ids2slots)

        vectorized_slots = list(map(lambda slots: np.array(list(map(lambda slot: slot_ids[slot], slots))), slots))
        vectorized_intents = list(map(lambda l: np.array([intent_ids[l]]), data.action))

        filepaths = data['path'].to_numpy()

        return ids2intents, ids2slots, vectorized_slots, vectorized_intents, filepaths        
    
def save_obj(obj, name):
    with open(os.path.join(data_path, 'data/pkl/'+ name + '.pkl'), 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)

def load_obj(name):
    with open(os.path.join(data_path, 'data/pkl/'+ name + '.pkl'), 'rb') as f:
        return pickle.load(f)

generate_data = True  # keep True on the first run to build and cache the labels; set to False afterwards to load the cached pickles

train_data = pd.read_csv(train_dataset_path)
valid_data = pd.read_csv(valid_dataset_path)
test_data = pd.read_csv(test_dataset_path)

if generate_data:
    
    dataset_processor = DatasetFactory()
    
    train_data = pd.read_csv(train_dataset_path)
    valid_data = pd.read_csv(valid_dataset_path)
    test_data = pd.read_csv(test_dataset_path)
    
    dataset_processor.add_corpora(train_data)
    dataset_processor.add_corpora(valid_data)
    dataset_processor.add_corpora(test_data)
    
    ids2intents, ids2slots, vectorized_slots_train, vectorized_intents_train, filepaths_train = dataset_processor.process_data(train_data)
    _ids2intents, _ids2slots, vectorized_slots_valid, vectorized_intents_valid, filepaths_valid = dataset_processor.process_data(valid_data)
    __ids2intents, __ids2slots, vectorized_slots_test, vectorized_intents_test, filepaths_test = dataset_processor.process_data(test_data)

    assert ids2intents == _ids2intents == __ids2intents
    assert ids2slots == _ids2slots == __ids2slots
    
    save_obj(ids2intents, 'ids2intents')
    save_obj(ids2slots, 'ids2slots')
    
    save_obj(vectorized_slots_train, 'vectorized_slots_train')
    save_obj(vectorized_intents_train, 'vectorized_intents_train')
    
    save_obj(vectorized_slots_valid, 'vectorized_slots_valid')
    save_obj(vectorized_intents_valid, 'vectorized_intents_valid')
    
    save_obj(vectorized_slots_test, 'vectorized_slots_test')
    save_obj(vectorized_intents_test, 'vectorized_intents_test') 
    
else:

    filepaths_train = train_data['path'].to_numpy()
    filepaths_valid = valid_data['path'].to_numpy()
    filepaths_test = test_data['path'].to_numpy()
    
    ids2intents = load_obj('ids2intents')
    ids2slots = load_obj('ids2slots')
    
    vectorized_slots_train = load_obj('vectorized_slots_train')
    vectorized_intents_train = load_obj('vectorized_intents_train')
    
    vectorized_slots_valid = load_obj('vectorized_slots_valid')
    vectorized_intents_valid = load_obj('vectorized_intents_valid')
    
    vectorized_slots_test = load_obj('vectorized_slots_test')
    vectorized_intents_test = load_obj('vectorized_intents_test')
    
if DEBUG:
    print(vectorized_slots_test)
    print(vectorized_intents_test)
    print(ids2intents) 
    print(ids2slots) 

print(str(ids2intents.values()).replace("'", "\"")) 
print(str(ids2slots.values()).replace("'", "\"")) 

n_classes = len(ids2intents)
n_slots = len(ids2slots)

The next cell defines a data generator class and instantiates it for the training, validation, and test sets. The training data comes from TinyML's Fluent Speech Commands Dataset and is altered on the fly: random samples are augmented with AddGaussianNoise, AddBackgroundNoise, ClippingDistortion, PitchShift, Shift, Gain, and TimeStretch. TinyML's dataset contains 97 speakers saying 248 different phrases. The utterances are mapped to 31 unique intents, each of which is divided into three slots: action, object, and location. The dataset also includes non-native English speakers.
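
To show what a probabilistic augmentation step does to a clip, here is a NumPy sketch of additive Gaussian noise. It is not the audiomentations implementation; `add_gaussian_noise` is a hypothetical stand-in whose parameter names and defaults simply echo the values passed to AddGaussianNoise in the notebook.

```python
import numpy as np

def add_gaussian_noise(samples, rng, min_amplitude=0.001, max_amplitude=0.015, p=0.1):
    """With probability p, add white noise with a randomly chosen amplitude."""
    if rng.random() >= p:
        return samples  # left untouched most of the time
    amplitude = float(rng.uniform(min_amplitude, max_amplitude))
    noise = rng.standard_normal(samples.shape).astype(samples.dtype)
    return samples + amplitude * noise

rng = np.random.default_rng(0)
clean = np.zeros(16000, dtype=np.float32)        # one second of silence at 16 kHz
noisy = add_gaussian_noise(clean, rng, p=1.0)    # p=1.0 forces the augmentation
print(noisy.shape, bool(np.any(noisy != 0)))
```

Because each transform fires only with its own probability, most clips in a batch pass through with few or no changes, while a minority are perturbed.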

In [None]:
def create_aug_pipeline():
    
    aug_pipeline = Compose([
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.1),
    AddBackgroundNoise(sounds_path=os.path.join(data_path, "data/wavs/background_noise"), p=0.3),
    ClippingDistortion(p=0.3),
    PitchShift(min_semitones=-4, max_semitones=4, p=0.2),
    Shift(min_fraction=-0.5, max_fraction=0.5, p=0.1),
    Gain(p=0.2),
    TimeStretch(p=0.05)
    ])
    
    return aug_pipeline

class DataGenerator(Sequence):
    """Generates data for Keras
    Sequence based data generator. Suitable for building data generator for training and prediction.
    """
    def __init__(self, entries, num_list, batch_size, shuffle=True, to_fit=True, augment = True, vis = False):

        self.entries = entries
        self.batch_size = batch_size
        
        self.n_intents, self.n_slots = num_list
        
        self.len = 2
        self.aug_pipeline = None
        if augment:
            self.aug_pipeline = create_aug_pipeline()
        self.vis = vis
        self.shuffle = shuffle
        self.to_fit = to_fit
        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch
        :return: number of batches per epoch
        """
        return int(np.floor(len(self.entries[0]) / self.batch_size))

    def __getitem__(self, index):
        """Generate one batch of data
        :param index: index of the batch
        :return: X and y when fitting. X only when predicting
        """
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        
        X_batch = [self.entries[0][k] for k in indexes]
        
        Y_intent = [self.entries[1][k] for k in indexes]
        Y_slot = [self.entries[2][k] for k in indexes]
        
        # Generate data
        X = self._generate_X(X_batch)

        if self.to_fit:
            y = self._generate_y(Y_intent, Y_slot)
            return X, y
        else:
            return X

    def on_epoch_end(self):
        """Updates indexes after each epoch
        """
        self.indexes = np.arange(len(self.entries[0]))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _generate_X(self, batch_items):

        X = np.zeros(shape = (self.batch_size, 150, NUM_CEPSTRAL, 1))
         
        for i, batch_item in enumerate(batch_items):
            wav_file = os.path.join(*prefix, batch_item)
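            audio, sample_rate = librosa.load(wav_file, sr=SAMPLING_RATE, res_type='kaiser_best')
            # Pad or trim every clip to exactly 3 seconds; `size` is keyword-only in librosa >= 0.10
            audio = librosa.util.fix_length(audio, size=SAMPLING_RATE * 3)
            
            if self.aug_pipeline:
                audio = self.aug_pipeline(samples=audio, sample_rate=sample_rate)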
                
                if DEBUG:
                    new_filename = os.path.join('samples', os.path.basename(batch_item.split('.')[0]+'aug.wav'))
                    print("Augmented: ", new_filename)
                    print("--------------")
                    sf.write(new_filename, audio, sample_rate,  subtype='PCM_16')
                
            output = generate_features(self.vis, audio, SAMPLING_RATE, 
                                          WIN_SIZE_MS, WIN_INCREASE_MS, 32, 
                                          NUM_CEPSTRAL, MIN_FREQ, MAX_FREQ)

            features = output['features']
            X[i, ] = np.expand_dims(features, axis = -1)
        return X
    
    def _generate_y(self, intents, slots):
        intent_y = np.empty((self.batch_size, self.n_intents), dtype=int)
        slot_y = np.empty((self.batch_size, self.len, self.n_slots), dtype=int)      

        # Generate data
        for i, batch_item in enumerate(intents):
            intent = intents[i]
            slot = slots[i]
            intent_y[i,] = np.eye(self.n_intents)[intent]
            slot_y[i,] = np.eye(self.n_slots)[slot][np.newaxis, :]
        
        return [intent_y, slot_y]

batch_size = 32   
    
training_generator = DataGenerator([filepaths_train, vectorized_intents_train, vectorized_slots_train], 
                                   [n_classes,n_slots], batch_size = batch_size, 
                                   shuffle=True, to_fit=True, augment = True)

data = training_generator.__getitem__(0)
print(data[0].shape)
print(data[1][0].shape)
print(data[1][1].shape)
print(training_generator.__len__())

validation_generator = DataGenerator([filepaths_valid, vectorized_intents_valid, vectorized_slots_valid], 
                                     [n_classes,n_slots], batch_size = batch_size, 
                                     shuffle=False, to_fit=True, augment = False)


data = validation_generator.__getitem__(0)
print(data[0].shape)
print(data[1][0].shape)
print(data[1][1].shape)
print(validation_generator.__len__())

test_generator = DataGenerator([filepaths_test, vectorized_intents_test, vectorized_slots_test], 
                                     [n_classes, n_slots], batch_size = batch_size, vis = False,
                                     shuffle=False, to_fit=True, augment = False)

data = test_generator.__getitem__(0)
print(data[0].shape)
print(data[1][0].shape)
print(data[1][1].shape)
print(test_generator.__len__())

The next cell builds the baseline model architecture: a stack of 2D convolution layers, each followed by batch normalization, with 2D max pooling after three of them. Finally, global max pooling is applied and the resulting features are fed into a dense layer, which is then mapped to the slot and intent outputs.
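
The effect of the pooling layers on the feature map can be traced with simple arithmetic. The sketch below assumes a (150, 10) input, 'same' padding in the convolutions (so they keep the spatial size), and three 2x2 max pooling layers with default settings; `pool_output` is a hypothetical helper.

```python
def pool_output(size, pool=2):
    """Spatial size after max pooling with stride == pool and 'valid' padding."""
    return size // pool

shape = (150, 10)                 # (time frames, cepstral coefficients)
trace = [shape]
for _ in range(3):                # the baseline model has three 2x2 pooling layers
    shape = (pool_output(shape[0]), pool_output(shape[1]))
    trace.append(shape)
print(trace)
```

By the last convolution the coefficient axis has shrunk to a single column, so global max pooling is mostly collapsing the remaining time steps.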

In [None]:
K.clear_session()

main_input = Input(shape=(150, NUM_CEPSTRAL, 1), name='main_input')

x = Conv2D(16, 3, padding='same', activation='relu', use_bias = False)(main_input)
x = BatchNormalization()(x)

x = Conv2D(16, 2, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size = (2,2))(x)

x = Conv2D(16, 2, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size = (2,2))(x)

x = Conv2D(32, 3, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size = (2,2))(x)

x = Conv2D(128, 2, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)

x = GlobalMaxPooling2D()(x)

x = Dropout(0.1)(x)
x = Dense(32, activation='relu')(x)

slot_dense = Dense(n_slots*2)(x)
slot_reshape = Reshape(target_shape = (2, n_slots))(slot_dense)
slot_output = Softmax(name='slot_output')(slot_reshape)

intent_output = Dense(n_classes, activation='softmax', name='intent_output', use_bias = False)(x)

model = Model(inputs=main_input, outputs=[intent_output, slot_output])

optim = Adam(learning_rate=1e-3)

model.compile(optimizer = optim, loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

tf.keras.utils.plot_model(model, to_file='img.png', show_shapes=True)

It is very much possible to achieve a slot output accuracy above 90% and an intent accuracy of 80% or higher. The number of epochs is up to the user, but I would suggest at least 30 epochs and, at the very least, an accuracy above 50%. Additionally, the epochs will run much faster if TensorFlow can use your computer's GPU. It is also possible to run this notebook in Google Colab if hardware resources are limited.

In [None]:
output_path = os.path.join(project_path, datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
os.makedirs(output_path)
print("Project folder: {}".format(output_path))

model_name = os.path.join(output_path, "slu_model.h5")
log_dir =  os.path.join(output_path, "logs")

my_callbacks = [
    EarlyStopping(patience=10, restore_best_weights=True, verbose = 1),
    ModelCheckpoint(filepath=model_name, save_best_only=True, verbose = 1),
    TensorBoard(log_dir=log_dir),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6, verbose = 1)]
try:
    model.fit(training_generator, validation_data = test_generator,
              callbacks = my_callbacks, epochs = 70,
              workers = 4, max_queue_size = 10,
              use_multiprocessing = False)
except KeyboardInterrupt:
    raise

LSTM Layer
The baseline model falls short in that it does not preserve the temporal dependencies in the output of the feature extractor, because global max pooling collapses the time axis. To preserve the temporal component of the signal, we can flatten each time step of the Conv2D output with a TimeDistributed layer and feed the resulting sequence to a single LSTM layer.
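
Since flash and RAM are tight on an MCU, it helps to estimate what the recurrent layer costs. The sketch below uses the standard LSTM parameter formula and assumes the feature map entering the layer has shape (18, 1, 128), which is what three 2x2 poolings of a 150x10 input followed by a 128-filter convolution would give; `lstm_param_count` is a hypothetical helper.

```python
def lstm_param_count(input_dim, units):
    """Parameters of a standard LSTM layer: 4 gates, each with input weights,
    recurrent weights, and a bias vector."""
    return 4 * (units * (input_dim + units) + units)

time_steps, features = 18, 1 * 128   # assumed feature map: 18 steps x (1 x 128) values
params = lstm_param_count(features, units=32)
print(time_steps, features, params)
```

The value printed for `params` can be compared against the LSTM row of model.summary() to confirm the assumed shapes.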

In [None]:
K.clear_session()

main_input = Input(shape=(150, NUM_CEPSTRAL, 1), name='main_input')

x = Conv2D(16, 3, padding='same', activation='relu', use_bias = False)(main_input)
x = BatchNormalization()(x)

x = Conv2D(16, 2, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size = (2,2))(x)

x = Conv2D(16, 2, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size = (2,2))(x)

x = Conv2D(32, 3, padding='same', activation='relu', use_bias = False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size = (2,2))(x)

x = Conv2D(128, 2, padding='same', activation='relu', use_bias = False)(x)
x = TimeDistributed(Flatten())(x)

x = LSTM(32, activation='relu')(x)

slot_dense = Dense(n_slots*2)(x)
slot_reshape = Reshape(target_shape = (2, n_slots))(slot_dense)
slot_output = Softmax(name='slot_output')(slot_reshape)

intent_output = Dense(n_classes, activation='softmax', name='intent_output', use_bias = False)(x)

model = Model(inputs=main_input, outputs=[intent_output, slot_output])

optim = Adam(learning_rate=1e-3)

model.compile(optimizer = optim, loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

tf.keras.utils.plot_model(model, to_file='img.png', show_shapes=True)

In [None]:
output_path = os.path.join(project_path, datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
os.makedirs(output_path)
print("Project folder: {}".format(output_path))

model_name = os.path.join(output_path, "slu_model.h5")
log_dir =  os.path.join(output_path, "logs")

my_callbacks = [
    EarlyStopping(patience=10, restore_best_weights=True, verbose = 1),
    ModelCheckpoint(filepath=model_name, save_best_only=True, verbose = 1),
    TensorBoard(log_dir=log_dir),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6, verbose = 1)]
try:
    model.fit(training_generator, validation_data = test_generator,
              callbacks = my_callbacks, epochs = 30,
              workers = 4, max_queue_size = 10,
              use_multiprocessing = False)
except KeyboardInterrupt:
    raise

Residual Connections
Another way to improve the quality of the network's predictions is to add skip connections by implementing ResNet blocks. The intent is that low-level feature information is passed on to the top levels of the feature extractor and used in making the prediction.
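
The core idea of a skip connection fits in a few lines of NumPy. This is a conceptual sketch rather than the Keras block defined in the next cell: `residual_block` is a hypothetical function and `transform` stands in for the convolution and batch normalization layers.

```python
import numpy as np

def residual_block(x, transform):
    """Return relu(transform(x) + x): the input skips around the transform."""
    return np.maximum(transform(x) + x, 0.0)

x = np.array([1.0, -2.0, 3.0])
# Even if the learned transform contributes nothing, the input still passes through.
out = residual_block(x, lambda v: np.zeros_like(v))
print(out)
```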

In [None]:
def make_residual_block(X, num_channels, use_1x1conv=False, strides=1):
        conv1 = Conv2D(num_channels, padding='same',
                                            kernel_size=3, strides=strides)
        conv2 = Conv2D(num_channels, kernel_size=3,
                                            padding='same')
        conv3 = None
        if use_1x1conv:
            conv3 = Conv2D(num_channels, kernel_size=1,
                                                strides=strides)
        bn1 = BatchNormalization()
        bn2 = BatchNormalization()
        relu1 = Activation(relu)
        relu2 = Activation(relu)
        Y = relu1((bn1(conv1(X))))
        Y = bn2(conv2(Y))
        if conv3 is not None:
            X = conv3(X)
        Y = Add()([Y, X])
        return relu2(Y) 
def make_resnet_block(x, num_channels, num_residuals, first_block=False):
    for i in range(num_residuals):
        if i == 0 and not first_block:
            x = make_residual_block(x, num_channels, use_1x1conv=True, strides=2)
        else:
            x = make_residual_block(x, num_channels)
    return x

K.clear_session()

main_input = Input(shape=(150, NUM_CEPSTRAL, 1), name='main_input')

x = make_resnet_block(main_input, 8, 1, first_block=True)
x = make_resnet_block(x, 16, 2, first_block=False)
x = make_resnet_block(x, 32, 3, first_block=False)
x = GlobalMaxPooling2D()(x)

x = Dropout(0.1)(x)
x = Dense(32, activation='relu')(x)

slot_dense = Dense(n_slots*2)(x)
slot_reshape = Reshape(target_shape = (2, n_slots))(slot_dense)
slot_output = Softmax(name='slot_output')(slot_reshape)

intent_output = Dense(n_classes, activation='softmax', name='intent_output', use_bias = False)(x)

model = Model(inputs=main_input, outputs=[intent_output, slot_output])

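# `decay` is rejected by the Keras optimizers shipped with TensorFlow 2.11+; ReduceLROnPlateau handles the schedule
optim = Adam(learning_rate=1e-3)

model.compile(optimizer = optim, loss='categorical_crossentropy', metrics=['accuracy'])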
model.summary()

tf.keras.utils.plot_model(model, to_file='img.png', show_shapes=True)

In [None]:
output_path = os.path.join(project_path, datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
os.makedirs(output_path)
print("Project folder: {}".format(output_path))

model_name = os.path.join(output_path, "slu_model.h5")
log_dir =  os.path.join(output_path, "logs")

my_callbacks = [
    EarlyStopping(patience=10, restore_best_weights=True, verbose = 1),
    ModelCheckpoint(filepath=model_name, save_best_only=True, verbose = 1),
    TensorBoard(log_dir=log_dir),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6, verbose = 1)]
try:
    model.fit(training_generator, validation_data = test_generator,
              callbacks = my_callbacks, epochs = 30,
              workers = 4, max_queue_size = 10,
              use_multiprocessing = False)
except KeyboardInterrupt:
    raise

Depthwise Convolutions
Depthwise separable convolution blocks require fewer multiplication operations than regular convolutions. A depthwise convolution applies a single convolution filter to each input channel, and the pointwise (1x1) convolution that follows it combines the channels.
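
The saving in multiplications can be estimated per output position. The sketch below compares a regular convolution with a depthwise convolution followed by a pointwise one; the kernel size and channel counts are example values chosen for illustration, and both helper functions are hypothetical.

```python
def standard_conv_mults(kernel, c_in, c_out):
    """Multiplications per output position for a regular convolution."""
    return kernel * kernel * c_in * c_out

def separable_conv_mults(kernel, c_in, c_out):
    """Depthwise (one filter per channel) followed by 1x1 pointwise mixing."""
    return kernel * kernel * c_in + c_in * c_out

k, c_in, c_out = 2, 32, 64   # example sizes, chosen for illustration
standard = standard_conv_mults(k, c_in, c_out)
separable = separable_conv_mults(k, c_in, c_out)
print(standard, separable, round(standard / separable, 2))
```

With a 2x2 kernel the reduction is modest compared with larger kernels, because the pointwise term c_in * c_out dominates the separable cost.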

In [None]:
def plain_conv_block(inputs, num_filters = 16, alpha = 1, kernel_size = 2, pooling = None, block_id=1, activation = 'relu'):

    x = Conv2D(int(num_filters*alpha), kernel_size, padding='same', use_bias = False, name='conv_%d' % block_id)(inputs)
    x = BatchNormalization(name='conv_%d_bn' % block_id)(x)
    x = Activation(activation, name='conv_%d_act' % block_id)(x)

    if pooling:
        x = MaxPooling2D(pool_size = pooling, name='conv_%d_pool' % block_id)(x)
    return x
    
def dw_conv_block(inputs, num_filters, alpha, depth_multiplier=1, strides=1, block_id=1, activation = 'relu'):
    # `strides` is an int: 1 keeps the resolution, 2 halves it with max pooling at the end
    # of the block (the depthwise convolution itself always runs with stride 1).

    pointwise_conv_filters = int(num_filters * alpha)

    if strides == 1:
        x = inputs
    else:
        x = ZeroPadding2D(((0, 1), (0, 1)),
                                 name='conv_pad_%d' % block_id)(inputs)
    x = DepthwiseConv2D((2, 2),
                               padding='same' if strides == 1 else 'valid',
                               depth_multiplier=depth_multiplier,
                               #strides=strides,
                               use_bias=False,
                               name='conv_dw_%d' % block_id)(x)
    x = BatchNormalization(name='conv_dw_%d_bn' % block_id)(x)
    x = Activation(activation, name='conv_dw_%d_act' % block_id)(x)

    x = Conv2D(pointwise_conv_filters, (1, 1),
                      padding='same',
                      use_bias=False,
                      strides=(1, 1),
                      name='conv_pw_%d' % block_id)(x)
    x = BatchNormalization(name='conv_pw_%d_bn' % block_id)(x)
    x = Activation(activation, name='conv_pw_%d_act' % block_id)(x)

    if strides > 1:
        x = MaxPooling2D(pool_size = 2, name='conv_%d_pool' % block_id)(x)

    return x


def _depth(v, divisor=8, min_value=None):
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

K.clear_session()

main_input = Input(shape=(150, NUM_CEPSTRAL, 1), name='main_input')

x = plain_conv_block(main_input, num_filters = 16, alpha = 1, kernel_size = 2, pooling = None, block_id=0, activation = 'relu')

x = dw_conv_block(x, 16, 2, depth_multiplier=1, strides=1, block_id=1, activation = 'relu')
x = dw_conv_block(x, 16, 2, depth_multiplier=1, strides=2, block_id=2, activation = 'relu')
x = dw_conv_block(x, 16, 2, depth_multiplier=1, strides=1, block_id=3, activation = 'relu')
x = dw_conv_block(x, 32, 2, depth_multiplier=1, strides=2, block_id=4, activation = 'relu')
x = dw_conv_block(x, 128, 2, depth_multiplier=1, strides=2, block_id=5, activation = 'relu')

x = GlobalMaxPooling2D()(x)

x = Dropout(0.1)(x)
x = Dense(32, activation='relu')(x)

slot_dense = Dense(n_slots*2)(x)
slot_reshape = Reshape(target_shape = (2, n_slots))(slot_dense)
slot_output = Softmax(name='slot_output')(slot_reshape)

intent_output = Dense(n_classes, activation='softmax', name='intent_output', use_bias = False)(x)

model = Model(inputs=main_input, outputs=[intent_output, slot_output])

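# `decay` is rejected by the Keras optimizers shipped with TensorFlow 2.11+; ReduceLROnPlateau handles the schedule
optim = Adam(learning_rate=1e-3)

model.compile(optimizer = optim, loss='categorical_crossentropy', metrics=['accuracy'])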
model.summary()

tf.keras.utils.plot_model(model, to_file='img.png', show_shapes=True)

In [None]:
output_path = os.path.join(project_path, datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
os.makedirs(output_path)
print("Project folder: {}".format(output_path))

model_name = os.path.join(output_path, "slu_model.h5")
log_dir =  os.path.join(output_path, "logs")

my_callbacks = [
    EarlyStopping(patience=10, restore_best_weights=True, verbose = 1),
    ModelCheckpoint(filepath=model_name, save_best_only=True, verbose = 1),
    TensorBoard(log_dir=log_dir),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6, verbose = 1)]
try:
    model.fit(training_generator, validation_data = test_generator,
              callbacks = my_callbacks, epochs = 30,
              workers = 4, max_queue_size = 10,
              use_multiprocessing = False)
except KeyboardInterrupt:
    raise
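
To get a feel for the ReduceLROnPlateau settings used above (factor=0.1, patience=5, min_lr=1e-6), the rule can be replayed on a made-up validation-loss curve. This is a simplified re-implementation for illustration, not Keras code: simulate_reduce_lr and the loss values are hypothetical, cooldown is ignored, and min_delta=1e-4 is assumed to match the Keras default.

```python
def simulate_reduce_lr(val_losses, lr=1e-3, factor=0.1, patience=5,
                       min_lr=1e-6, min_delta=1e-4):
    """Return the learning rate in effect after each epoch."""
    best = float("inf")
    wait = 0
    history = []
    for loss in val_losses:
        if loss < best - min_delta:
            # Validation loss improved: remember it and reset the counter.
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                # No improvement for `patience` epochs: shrink the learning rate,
                # but never below min_lr.
                lr = max(lr * factor, min_lr)
                wait = 0
        history.append(lr)
    return history


# Hypothetical curve: three improving epochs followed by a long plateau.
losses = [1.0, 0.8, 0.7] + [0.7] * 12
for epoch, lr in enumerate(simulate_reduce_lr(losses)):
    print(epoch, lr)
```

With EarlyStopping(patience=10) also active, a plateau like this one would end training after roughly two learning-rate reductions, which is worth keeping in mind when choosing the two patience values together.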

Training Complete
Now we can check the model's accuracy and display the inference result for a random test sample. Calling test_models(model_directory="checkpoints") tests every .h5 model found under that folder and prints each model's name and results (uncomment model.summary() to print the summaries as well). Models whose intent or slot accuracy falls below the preset threshold (0.8 for me) are skipped, and the model with the highest average of intent and slot accuracy is returned.
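
The selection rule described above can be sketched in isolation, without loading any models. The function name select_best, the file names, and the accuracy values are all hypothetical and only illustrate the thresholding and averaging logic.

```python
def select_best(results, threshold=0.8, apply_threshold=True):
    """Pick the entry with the highest mean of intent and slot accuracy.

    results: iterable of (name, intent_accuracy, slot_accuracy) tuples,
    with accuracies given as fractions between 0 and 1.
    """
    best_name, best_score = None, 0.0
    for name, intent_acc, slot_acc in results:
        below = intent_acc < threshold or slot_acc < threshold
        # The parentheses matter in the one-line form:
        # `a or b and c` parses as `a or (b and c)`.
        if apply_threshold and below:
            continue
        score = (intent_acc + slot_acc) / 2
        if score > best_score:
            best_name, best_score = name, score
    return best_name, best_score


# Hypothetical accuracies, for illustration only.
candidates = [
    ("run_a/slu_model.h5", 0.90, 0.75),  # slot accuracy below threshold
    ("run_b/slu_model.h5", 0.85, 0.90),
    ("run_c/slu_model.h5", 0.95, 0.82),
]
print(select_best(candidates))
```

Returning (None, 0.0) when nothing passes the threshold makes the "no usable model" case explicit for the caller.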

In [None]:
accuracy_threshold = 0.8

def test_models(model_name = None, model_directory = None):
    
    print("Testing")
    if model_directory:
        model_files_list = []
        file_search = lambda ext : glob.glob(model_directory + ext, recursive=True)
        for ext in ['/**/*.h5']: model_files_list.extend(file_search(ext))
    else:
        model_files_list = [model_name]
        
    batch_size = 1

    test_generator = DataGenerator([filepaths_test, vectorized_intents_test, vectorized_slots_test], 
                                         [n_classes, n_slots], batch_size = batch_size, vis = False,
                                         shuffle=False, to_fit=True, augment = False)
    best_model = None
    best_model_file = None  # stays None if no model passes the threshold
    best_accuracy = 0.0

    for model_file in model_files_list:

        model = load_model(model_file)
      
        intent_correct = 0
        slot_correct = 0

        for num in range(32):

            X, y = test_generator.__getitem__(num)

            try:
                results = model(X, training=False)
            except Exception as e:
                print(f'Error running {model_file} on sample {num}: {e}')
                break

            if ids2intents[np.argmax(y[0])] == ids2intents[np.argmax(results[0])]:
                intent_correct += 1

            if ids2slots[np.argmax(y[1][0][0])] == ids2slots[np.argmax(results[1][0][0])]:
                slot_correct += 1 

            if ids2slots[np.argmax(y[1][0][1])] == ids2slots[np.argmax(results[1][0][1])]:
                slot_correct += 1     

        accuracy_intent = intent_correct/32
        accuracy_slot = slot_correct/64

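        # Only skip below-threshold models when scanning a directory.
        # Parentheses are required because `and` binds tighter than `or`.
        if model_directory and (accuracy_intent < accuracy_threshold or accuracy_slot < accuracy_threshold):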
            continue
            
        #model.summary()    
        num = random.randint(0, len(test_generator)-1)

        X, y = test_generator.__getitem__(num)
        
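        try:
            results = model(X, training=False)
        except Exception as e:
            # Skip this model rather than printing stale results from the loop above.
            print(f'Error running {model_file} on sample {num}: {e}')
            continue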
        
        print(f"""Model {model_file}
        
        Accuracy Intent {accuracy_intent:.1%}
        Accuracy Slot {accuracy_slot:.1%}

        Random sample num:{num} 

        Ground truth 
        Intent:{ids2intents[np.argmax(y[0])]} 
        Slot1: {ids2slots[np.argmax(y[1][0][0])]}  Slot2: {ids2slots[np.argmax(y[1][0][1])]}\n

        Prediction
        Intent:{ids2intents[np.argmax(results[0])]} 
        Slot1: {ids2slots[np.argmax(results[1][0][0])]}  Slot2: {ids2slots[np.argmax(results[1][0][1])]}\n
        """)

        if (accuracy_intent + accuracy_slot) / 2 > best_accuracy:
            best_model = model
            best_model_file = model_file
            best_accuracy = (accuracy_intent + accuracy_slot) / 2
    return best_model, best_model_file, best_accuracy

#model = test_models(model_name = model_name) 
model, model_name, accuracy = test_models(model_directory = project_path)
print(f"""----------------------------
Best model is {model_name} Accuracy {accuracy:.1%}
----------------------------""")
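
The argmax indexing used in the cell above follows from the output shapes: the intent head yields one probability vector per sample, and the slot head yields two (one per slot). A minimal sketch of that decoding step on plain Python lists is shown below. The label maps and probabilities are hypothetical stand-ins for ids2intents, ids2slots, and real model outputs.

```python
def argmax(values):
    """Index of the largest value in a flat sequence."""
    return max(range(len(values)), key=lambda i: values[i])


def decode(intent_probs, slot_probs, ids2intents, ids2slots):
    """Map one sample's model outputs to label strings."""
    intent = ids2intents[argmax(intent_probs)]
    slots = [ids2slots[argmax(row)] for row in slot_probs]
    return intent, slots


# Hypothetical label maps and probabilities, for illustration only.
ids2intents = {0: "change_language", 1: "decrease_volume", 2: "turn_on_lights"}
ids2slots = {0: "none", 1: "chinese", 2: "kitchen"}

intent_probs = [0.05, 0.05, 0.90]   # one sample, shape (n_classes,)
slot_probs = [[0.1, 0.1, 0.8],      # slot 1, shape (n_slots,)
              [0.7, 0.2, 0.1]]      # slot 2

print(decode(intent_probs, slot_probs, ids2intents, ids2slots))
```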

Data Visualization

In [None]:
sanity_check_data_prefix = [data_path, "data", "wavs", "wt_test"]

wav_file = os.path.join(*sanity_check_data_prefix, "change_language_to_chinese_wt.wav")
#wav_file = os.path.join(*sanity_check_data_prefix, "decrease_volume_wt.wav")
#wav_file = os.path.join(*sanity_check_data_prefix, "turn_on_the_lights_in_the_kitchen_wt.wav")

np.set_printoptions(threshold=sys.maxsize)

sample_rate, audio = wav.read(wav_file)
if DEBUG:
    print(','.join(str(e) for e in audio.tolist()[:4095]))

audio, sample_rate = librosa.load(wav_file, sr=16000, res_type='kaiser_best')
audio = librosa.util.fix_length(audio, size=16000*3)  # size is keyword-only in newer librosa releases
features = generate_features(True, audio, SAMPLING_RATE, 
                  WIN_SIZE_MS, WIN_INCREASE_MS, 32, 
                  NUM_CEPSTRAL, MIN_FREQ, MAX_FREQ)

features = features['features']
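X = np.expand_dims(features, axis = -1)  # add the channel axis expected by main_input
X = np.expand_dims(X, axis = 0)          # add the batch axis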

results = model(X, training=False)
print(np.argmax(results[0]), np.argmax(results[1][0][0]), np.argmax(results[1][0][1]))
print(f"""
Prediction
Intent:{ids2intents[np.argmax(results[0])]} 
Slot1: {ids2slots[np.argmax(results[1][0][0])]}  Slot2: {ids2slots[np.argmax(results[1][0][1])]}\n
""")

IPython.display.Audio(wav_file)

Conversion to .tflite format

In [None]:
def representative_dataset():
    for i in range(len(test_data)):
        wav_file = os.path.join(*prefix, test_data['path'][i])
        audio, sample_rate = librosa.load(wav_file, sr=16000, res_type='kaiser_best')
        audio = librosa.util.fix_length(audio, size=16000*3)  # size is keyword-only in newer librosa releases
        features = generate_features(False, audio, SAMPLING_RATE, 
                          WIN_SIZE_MS, WIN_INCREASE_MS, 32, 
                          NUM_CEPSTRAL, MIN_FREQ, MAX_FREQ)
        
        features = features['features']
        X = np.expand_dims(features, axis = -1)
        X = np.expand_dims(X, axis = 0)
        yield [X.astype(np.float32)]

#model = tf.keras.models.load_model(model_name)
model.input.set_shape(1 + model.input.shape[1:])
            
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.experimental_new_converter = True
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.target_spec.supported_types = [tf.int8]
converter.inference_type = tf.int8
converter.inference_input_type = tf.int8 
converter.inference_output_type = tf.int8
tflite_quant_model = converter.convert()

# Save the model.
# splitext only strips the final extension, so dots elsewhere in the path are safe.
tflite_filename = os.path.splitext(os.path.abspath(model_name))[0] + '.tflite'
with open(tflite_filename, 'wb') as f:
  f.write(tflite_quant_model)

Compare the INT8 model's prediction with the FLOAT32 model's prediction and note any differences

In [None]:
interpreter = tf.lite.Interpreter(model_path = tflite_filename)
interpreter.allocate_tensors()

sanity_check_data_prefix = [*prefix, "wavs", "wt_test"]

wav_file = os.path.join(*sanity_check_data_prefix, "change_language_to_chinese_wt.wav")
#wav_file = os.path.join(*sanity_check_data_prefix, "decrease_volume_wt.wav")
#wav_file = os.path.join(*sanity_check_data_prefix, "turn_on_the_lights_in_the_kitchen_wt.wav")

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_scale, input_zero_point = input_details[0]["quantization"]
output_scale, output_zero_point = output_details[0]["quantization"]
    
audio, sample_rate = librosa.load(wav_file, sr=16000, res_type='kaiser_best')
audio = librosa.util.fix_length(audio, size=16000*3)  # size is keyword-only in newer librosa releases
features = generate_features(True, audio, SAMPLING_RATE, 
                  WIN_SIZE_MS, WIN_INCREASE_MS, 32, 
                  NUM_CEPSTRAL, MIN_FREQ, MAX_FREQ)

features = features['features']

X = np.expand_dims(features, axis = -1)
X = np.expand_dims(X, axis = 0)

input_data = np.asarray(X, dtype=np.float32)

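# Round to the nearest integer and clip to the int8 range so out-of-range values do not wrap around.
input_data_int8 = np.clip(np.round(input_data / input_scale + input_zero_point), -128, 127).astype(np.int8)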

interpreter.set_tensor(input_details[0]['index'], input_data_int8)
interpreter.invoke()

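# Each output tensor has its own quantization parameters, so dequantize them separately.
# This keeps the original assumption that output 0 is the slot head and output 1 the intent head;
# check output_details[i]['name'] if the predictions look swapped.
slot_scale, slot_zero_point = output_details[0]["quantization"]
intent_scale, intent_zero_point = output_details[1]["quantization"]

output_data_slot = np.asarray(interpreter.get_tensor(output_details[0]['index']), dtype=np.float32)
output_data_intent = np.asarray(interpreter.get_tensor(output_details[1]['index']), dtype=np.float32)

intent = (output_data_intent - intent_zero_point) * intent_scale
slot = (output_data_slot - slot_zero_point) * slot_scale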

if DEBUG:
    print(features)
    print(np.argmax(intent[0]), np.argmax(slot[0][0]), np.argmax(slot[0][1]))
    
print(f"""
Prediction
Intent:{ids2intents[np.argmax(intent[0])]} 
Slot1: {ids2slots[np.argmax(slot[0][0])]}  Slot2: {ids2slots[np.argmax(slot[0][1])]}\n
""")

plt.show()
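
The scale and zero-point arithmetic used in the cell above is an affine mapping between float values and int8 codes. The sketch below shows the round trip on single numbers. The scale and zero_point values are hypothetical; the real ones come from the "quantization" entry of each tensor's details.

```python
def quantize(x, scale, zero_point):
    """Map a float to an int8 code: round, shift by the zero point, clip."""
    q = round(x / scale) + zero_point
    return max(-128, min(127, q))


def dequantize(q, scale, zero_point):
    """Map an int8 code back to an approximate float value."""
    return (q - zero_point) * scale


# Hypothetical quantization parameters, for illustration only.
scale, zero_point = 0.5, -10

for x in (1.0, 0.7, 100.0):
    q = quantize(x, scale, zero_point)
    print(x, q, dequantize(q, scale, zero_point))
```

Values inside the representable range come back within half a scale step of the original; values outside it saturate at -128 or 127, which is why clipping before the int8 cast matters.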

Create a C header file containing a hex dump of the .tflite model

In [None]:
tfmicro_filename = os.path.splitext(tflite_filename)[0] + '.h'
!xxd -i $tflite_filename > $tfmicro_filename
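
If xxd is not available (for example on a stock Windows install), a similar header can be produced from Python. This is a rough sketch that imitates the general layout of xxd -i output, not an exact replacement. The function name to_c_array, the variable name slu_model_tflite, and the sample bytes are hypothetical.

```python
def to_c_array(data: bytes, var_name: str, per_line: int = 12) -> str:
    """Render bytes as a C unsigned char array plus a length variable."""
    lines = []
    for i in range(0, len(data), per_line):
        chunk = data[i:i + per_line]
        lines.append("  " + ", ".join(f"0x{b:02x}" for b in chunk))
    body = ",\n".join(lines)
    return (f"unsigned char {var_name}[] = {{\n{body}\n}};\n"
            f"unsigned int {var_name}_len = {len(data)};\n")


# A few placeholder bytes stand in for the real model file contents.
print(to_c_array(b"\x1c\x00\x00\x00\x54\x46\x4c\x33", "slu_model_tflite"))
```

To use it on the real model, read the file with open(tflite_filename, 'rb') and write the returned string to tfmicro_filename. On the microcontroller side it is common to declare the array const so it stays in flash, but that is an edit to make according to the target project's conventions.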