# Keyword Detection for Microcontrollers with Keras

All recordings are expected to live under a single dataset folder as `.wav` files clipped to 1 s (1000 ms, the default `clip_duration`). Each file must be placed in the subfolder named after its label, and background noise recordings go in the subfolder `_background_noise_`.

In [None]:
import numpy as np
import sys
import glob
import matplotlib.pyplot as plt
import math
import os
import random
from pprint import pprint

import tensorflow as tf
import tensorflow_io as tfio
from tensorflow.lite.experimental.microfrontend.python.ops import audio_microfrontend_op as frontend_op

import audiomentations

# Training Setup

We use a dictionary as a single place to hold metadata concerning training, audio settings and augmentations.

In [None]:
def get_meta(meta_dict=None, **kwargs):
    """
    Build the nested metadata dict used for training, audio processing and augmentation.

    Every parameter is taken from `meta_dict` / `kwargs` if present, otherwise a default is used.
    Pass a few custom parameters as keyword arguments or many of them as a dict. Keys that are not
    listed below are ignored. Default values are from
    https://colab.research.google.com/github/tinyMLx/colabs/blob/master/4-6-8-CustomDatasetKWSModel.ipynb

    meta_dict:  Optional dict of parameters (`None` is treated as an empty dict)
    kwargs:     Additional parameters; must not repeat a key of `meta_dict`

    Returns a dict with the keys `training`, `audio` and `augmentation`, each holding a dict.
    Raises an AssertionError if the same key is given in both `meta_dict` and `kwargs`, to make
    sure that the intended value is used.
    """
    meta_dict = {} if meta_dict is None else meta_dict
    if set(meta_dict) & set(kwargs):
        ## Raised explicitly instead of via `assert` so the check also runs under `python -O`;
        ## the exception type is the one the original `assert` produced.
        raise AssertionError("It appears that a key was set more than once.")
    merged_meta = {**meta_dict, **kwargs}

    training, audio, augments = {}, {}, {}
    training['wanted_words']            = merged_meta.get('wanted_words', ['on', 'off'])
    training['data_path']               = merged_meta.get('data_path', 'dataset/')
    training['epochs']                  = merged_meta.get('epochs', 5)
    training['learning_rate']           = merged_meta.get('learning_rate', 1e-3)
    training['batch_size']              = merged_meta.get('batch_size', 32)
    training['excluded_words']          = merged_meta.get('excluded_words', [])

    audio['sample_rate']                = merged_meta.get('sample_rate', 16_000)
    audio['clip_duration']              = merged_meta.get('clip_duration', 1000)
    audio['window_size_ms']             = merged_meta.get('window_size_ms', 30)
    audio['window_stride']              = merged_meta.get('window_stride', 20)
    audio['feature_bin_count']          = merged_meta.get('feature_bin_count', 40)
    ## Derived values: durations given in milliseconds are converted to sample counts.
    audio['desired_samples']            = int(audio['sample_rate'] * audio['clip_duration'] / 1000)
    window_size_samples                 = int(audio['sample_rate'] * audio['window_size_ms'] / 1000)
    window_stride_samples               = int(audio['sample_rate'] * audio['window_stride'] / 1000)
    length_minus_window                 = audio['desired_samples'] - window_size_samples
    ## Number of full windows that fit into one clip.
    spectrogram_lenght                  = 1 + int(length_minus_window / window_stride_samples)
    ## The (misspelled) key is read elsewhere in the notebook, so it is kept as is.
    audio['spectrogram_lenght']         = spectrogram_lenght
    audio['fingerprint_size']           = spectrogram_lenght * audio['feature_bin_count']

    augments['background_frequency']    = merged_meta.get('background_frequency', 0.8)
    augments['background_volume_range'] = merged_meta.get('background_volume_range', 0.1)
    augments['time_shift_ms']           = merged_meta.get('time_shift_ms', 100.0)
    augments['silence_percentage']      = merged_meta.get('silence_percentage', 0.2)
    augments['unknown_percentage']      = merged_meta.get('unknown_percentage', 0.2)

    return dict(training=training, audio=audio, augmentation=augments)

If you want to do transfer learning you can use `get_pretrain_words` to pull some random words to pretrain on. Exclude the words you want to fine tune for later and the function can return a random set of words to use for training while leaving a few as `unknown` words.

In [None]:
def get_pretrain_words(path, excluded_words, shuffle=False, n = 25):
    """
    Collects the names of all folders (one folder per word) found directly inside `path`, drops those
    listed in `excluded_words` as well as `_background_noise_`, and returns at most `n` of them.

    path:           Dataset folder, with or without a trailing path separator
    excluded_words: Iterable of folder names that must not be returned
    shuffle:        If True the candidates are shuffled (global `random` state) before `n` are taken,
                    otherwise the first `n` in alphabetical order are returned
    n:              Maximum number of words to return
    """
    excluded = set(excluded_words) | {'_background_noise_'}
    ## glob returns entries in arbitrary order; sorting makes the unshuffled selection reproducible.
    candidates = sorted(glob.glob(os.path.join(path, '*')))
    ## Only folders are words; stray files next to them (e.g. a README) are skipped.
    all_included_words = [
        os.path.basename(candidate)
        for candidate in candidates
        if os.path.isdir(candidate) and os.path.basename(candidate) not in excluded
    ]
    if shuffle:
        random.shuffle(all_included_words)
    return all_included_words[:n]

## Build dictionary

In [None]:
## Keywords reserved for the fine-tuning stage.
excluded_words = ['licht', 'aus', 'party']
## Dataset location inside the current user's home directory.
data_path = os.path.expanduser(os.path.join("~", 'coding_data/keyword_detection_nano/dataset/'))

In [None]:
## For pretraining
## `shuffle` is an argument of `get_pretrain_words`; `get_meta` ignores keys it does not know.
meta_dict = get_meta(
    data_path = data_path,
    wanted_words = get_pretrain_words(data_path, excluded_words, shuffle = False),
    excluded_words = ['yes', 'no', 'left'],
)

# Data loading
Most of the functionality in this class comes from the [speech_commands](https://github.com/tensorflow/tensorflow/tree/master/tensorflow/examples/speech_commands) example in the tensorflow library with a few simplifications and additions for convenience. In particular, the variable names are kept the same where possible so you can quickly find the corresponding one in the original files. This class only implements the case for `preprocessing=='micro'` since it's aimed at doing inference on microcontrollers.

In [None]:
class KeywordDataset(tf.keras.utils.Sequence):
    """
    Keras `Sequence` that turns a list of .wav filenames into batches of flattened micro-frontend
    spectrograms (x) and integer class ids (y).

    Class ids follow `self.vocab`: 'silence' is 0, 'unknown' is 1, followed by the wanted words in the
    order given in `meta_dict['training']['wanted_words']`. The special filename 'silence_placeholder'
    produces a background-noise-only sample. Unless `is_validation` is set, samples are time shifted,
    mixed with background noise and passed through the `audiomentations` pipeline of `build_augments`.
    """
    def __init__(self,
                 fns,
                 background_fns,
                 meta_dict,
                 batch_size,
                 is_validation = False
                ):
        """
        fns:            Filenames of the samples (may contain 'silence_placeholder')
        background_fns: Filenames of the background noise recordings
        meta_dict:      Metadata as returned by `get_meta`
        batch_size:     Number of samples per batch
        is_validation:  If True no augmentation is applied and the items are never shuffled
        """
        super().__init__()
        ## Own copy: `on_epoch_end` shuffles in place and must not reorder the caller's list.
        self.items = list(fns)
        self.words = meta_dict['training']['wanted_words']
        self.vocab = {word: i for i,word in enumerate(['silence', 'unknown'] + self.words)}
        self.audio_meta = meta_dict['audio']
        self.augmentation_meta = meta_dict['augmentation']
        self.is_validation = is_validation
        self.augment = self.build_augments()
        self.background_data = self.prepare_background_data(background_fns)
        self.batch_size = batch_size

    def __len__(self):
        ## Number of batches per epoch; the last batch may be smaller than `batch_size`.
        return math.ceil(len(self.items) / self.batch_size)

    def __getitem__(self, idx):
        """
        Pulls a subset of filenames of size `batch_size`. Loads the audio file according to its label and adds 
        augmentations if in 'training mode'. Finally creates a spectrogram and combines the batch to single 
        numpy x,y vectors.
        """
        items = self.items[idx * self.batch_size: (idx + 1) * self.batch_size]
        xs, ys = [], []
        for fn in items:
            label = self.get_label(fn)
            ## The augmentation pipeline is fed a flat numpy array.
            audio = self.get_audio(fn, label).numpy().flatten()
            if not self.is_validation:
                audio = self.augment(audio, sample_rate = self.audio_meta['sample_rate'])
            spectro = self.get_spectrogram(audio)
            xs.append(spectro)
            ys.append(self.vocab[label])
        return np.stack(xs), np.stack(ys)

    def on_epoch_end(self):
        ## Reshuffle the training items after every epoch; validation order stays fixed.
        if not self.is_validation:
            random.shuffle(self.items)

    def prepare_background_data(self,fns):
        """
        Decodes all background recordings and keeps those with at least `desired_samples` samples.
        """
        ## See `prepare_background_data` in tensorflow/examples/speech_commands/input_data.py
        background_data = []
        for fn in fns:
            file = tf.io.read_file(fn)
            audio, _ = tf.audio.decode_wav(file, desired_channels=1)
            if len(audio) < self.audio_meta['desired_samples']:
                continue
            background_data.append(audio)
        return background_data

    def get_label(self, fn):
        """
        Returns 'silence' for the silence placeholder, the name of the folder containing `fn` if it is
        a wanted word and 'unknown' otherwise.
        """
        if fn == 'silence_placeholder':
            return 'silence'
        folder = os.path.basename(os.path.dirname(fn))
        if folder in self.words:
            return folder
        return 'unknown'

    def load_audio(self, fn):
        """
        Decodes `fn` as single channel audio with `desired_samples` samples.
        """
        file = tf.io.read_file(fn)
        audio, _ = tf.audio.decode_wav(contents = file,
                                       desired_channels = 1,
                                       desired_samples = self.audio_meta['desired_samples']
                                      )
        return audio

    def get_timeshift_params(self):
        """
        Draws a random time shift of up to `time_shift_ms` in either direction and returns the
        `(padding, offset)` pair that `get_audio` passes to `tf.pad` / `tf.slice` to apply it.
        """
        ## See `get_data` in tensorflow/examples/speech_commands/input_data.py
        ## The setting is given in milliseconds, padding and offset are counted in samples.
        time_shift = int(self.augmentation_meta['time_shift_ms'] * self.audio_meta['sample_rate'] / 1000)

        time_shift_amount = np.random.randint(-time_shift, time_shift) if time_shift > 0 else 0
        if time_shift_amount > 0:
            ## Pad at the front and keep the start of the padded clip.
            time_shift_padding = [[time_shift_amount, 0], [0,0]]
            time_shift_offset = [0,0]
        else:
            ## Pad at the end and skip the first samples.
            time_shift_padding = [[0,-time_shift_amount], [0,0]]
            time_shift_offset = [-time_shift_amount, 0]

        return time_shift_padding, time_shift_offset

    def get_random_background(self, label):
        """
        Returns a randomly chosen, randomly scaled background noise clip of `desired_samples` samples.
        For the label 'silence' the volume is drawn from [0, 1); for every other label noise is mixed
        in with probability `background_frequency` at a volume of up to `background_volume_range`,
        otherwise the clip is all zeros. Without any background recordings zeros are returned.
        """
        ## See `get_data` in tensorflow/examples/speech_commands/input_data.py
        desired_samples = self.audio_meta['desired_samples']
        if not self.background_data:
            return tf.zeros([desired_samples, 1], dtype = tf.float32)
        background_sample = random.choice(self.background_data)

        ## `+ 1` because the upper bound of randint is exclusive; this also covers recordings that
        ## are exactly `desired_samples` long.
        max_offset = len(background_sample) - desired_samples
        background_offset = np.random.randint(0, max_offset + 1)
        background_clipped = background_sample[background_offset:(background_offset + desired_samples)]
        background_reshaped = tf.reshape(background_clipped, [desired_samples,1])

        if label == 'silence':
            background_volume = np.random.uniform(0,1)
        elif np.random.uniform(0,1) < self.augmentation_meta['background_frequency']:
            background_volume = np.random.uniform(0, self.augmentation_meta['background_volume_range'])
        else:
            background_volume = 0

        background_mul = tf.multiply(background_reshaped, background_volume)
        return background_mul

    def get_audio(self, fn, label):
        """
        Adds random background to audio and shifts it a bit back or forth if in 'training mode', 
        returns background-only if label is `silence`.
        """
        if self.is_validation and label != 'silence':
            return self.load_audio(fn)
        background_mul = self.get_random_background(label)
        if label == 'silence':
            return background_mul

        ## See `prepare_processing_graph` in tensorflow/examples/speech_commands/input_data.py
        foreground = self.load_audio(fn)
        time_shift_padding, time_shift_offset = self.get_timeshift_params()

        padded_foreground = tf.pad(tensor = foreground, paddings = time_shift_padding, mode = 'CONSTANT')
        sliced_foreground = tf.slice(padded_foreground, time_shift_offset, [self.audio_meta['desired_samples'], -1])
        background_add = tf.add(background_mul, sliced_foreground)
        background_clamp = tf.clip_by_value(background_add, -1., 1.)

        return background_clamp

    def get_spectrogram(self, audio):
        """
        Scales `audio` to the int16 range, runs it through the micro frontend and returns the
        rescaled features as a flat numpy array.
        """
        ## See `prepare_processing_graph` in tensorflow/examples/speech_commands/input_data.py
        scaled = tf.multiply(audio, 32768)
        ## Augmented audio can leave [-1, 1]; saturate instead of overflowing in the int16 cast.
        scaled = tf.clip_by_value(scaled, -32768., 32767.)
        int_16_input = tf.cast(scaled, tf.int16)
        micro_frontend = frontend_op.audio_microfrontend(
            int_16_input,
            sample_rate = self.audio_meta['sample_rate'],
            window_size = self.audio_meta['window_size_ms'],
            window_step = self.audio_meta['window_stride'],
            num_channels = self.audio_meta['feature_bin_count'],
            out_scale = 1,
            out_type = tf.float32
        )
        flat_spectro = tf.multiply(micro_frontend, (10. / 256.)).numpy().flatten()
        return flat_spectro

    def build_augments(self):
        """
        Builds the augmentation pipeline that `__getitem__` applies to training samples.
        """
        ## Uses the audiomentations library. Check https://github.com/iver56/audiomentations for further details.
        augs = audiomentations.Compose([
            audiomentations.ClippingDistortion(max_percentile_threshold=20, p=.5),
            audiomentations.HighPassFilter(min_cutoff_freq=1000, p=.3),
            audiomentations.LowPassFilter(min_cutoff_freq=1000, p=.3),
            audiomentations.GainTransition(min_gain_in_db=-12,max_gain_in_db=12,min_duration=0.1,max_duration=0.9,duration_unit='fraction',p=.5),
            audiomentations.PitchShift(min_semitones=-1, max_semitones=1, p=.3),
            audiomentations.SevenBandParametricEQ(p=.5),
            audiomentations.PolarityInversion(p=0.3),
            audiomentations.TimeMask(p=.3),
            audiomentations.AddGaussianNoise(max_amplitude=0.005, p=0.3),
        ])
        return augs

### Retrieve files

To hit a certain percentage of `unknown` / `silent` labels we use the following calculation:
If $p_s$ is the percentage of `silent` and $p_u$ is the percentage of `unknown` labels in the __finished__ dataset, then this means
$$
p_s = \frac{n_s}{n + n_s + n_u}, \quad
p_u = \frac{n_u}{n + n_s + n_u}
$$ where $n$ is the number of instances of the wanted words, $n_s$ the number of instances labeled `silence` and $n_u$ the number labeled `unknown`. Solving the equations for $n_s$ and $n_u$ gives:
$$
n_s = \frac{p_s \cdot (n + \frac{p_u \cdot n}{1-p_u})}{1 - p_s - \frac{p_s \cdot p_u}{1-p_u}}, \quad
n_u = \frac{p_u \cdot (n + n_s)}{1 - p_u}.
$$
The function below does this calculation and returns the next bigger integers. The rounding is for computational stability (e.g. 20.0000000001 should be 20 not 21).

In [None]:
def calc_unknown_silent_n(n, p_s, p_u):
    """
    Calculates how many `silence` and `unknown` instances have to be added to `n` wanted-word
    instances so that they make up the fractions `p_s` and `p_u` of the finished dataset.

    n:    Number of wanted-word instances
    p_s:  Desired fraction of `silence` instances, 0 <= p_s
    p_u:  Desired fraction of `unknown` instances, 0 <= p_u and p_s + p_u < 1

    Returns the tuple `(n_s, n_u)`, both rounded up to the next integer.
    Raises a ValueError if the fractions are negative or leave no room for the wanted words.
    """
    ## Fraction of the finished dataset that is left for the wanted words.
    wanted_fraction = 1 - p_s - p_u
    if p_s < 0 or p_u < 0 or wanted_fraction <= 0:
        raise ValueError(f"p_s and p_u must be non-negative and sum to less than 1, got p_s={p_s}, p_u={p_u}.")
    ## The finished dataset has n / wanted_fraction instances in total; this is the closed form of
    ## n_s = p_s * (n + n_s + n_u) and n_u = p_u * (n + n_s + n_u).
    n_s = p_s * n / wanted_fraction
    n_u = p_u * n / wanted_fraction
    ## Rounding first absorbs floating point noise (e.g. 20.0000000001 should be 20 not 21).
    return math.ceil(round(n_s, 3)), math.ceil(round(n_u, 3))

In [None]:
def get_fns(path, wanted_words, excluded_words = None, desired_samples = 16_000, val_pct = 0.2, silent_pct = 0.2, unknown_pct = 0.2, seed = None):
    """
    Collects the .wav files below `path` and splits them into training and validation filenames.

    path:            Where to search for *.wav files (one subfolder per label)
    wanted_words:    The keywords that you want to detect with your model
    excluded_words:  Words that should not be used in training. 
                     Words that are neither wanted nor excluded will be used as unknown_words
    desired_samples: Number of samples passed to `tf.audio.decode_wav` when checking a file
    val_pct:         Percentage of files that should be used for validation
    silent_pct:      Percentage of the train/val split that are labeled silent
    unknown_pct:     Percentage of the train/val split that are unknown
    seed:            If given, all shuffling uses a private random generator seeded with this value,
                     otherwise the global `random` state is used

    Returns `(training_fns, validation_fns, background_fns)`. Silent samples appear in the first two
    lists as the string 'silence_placeholder'.
    """
    rng = random.Random(seed) if seed is not None else random
    excluded = set(excluded_words or [])
    wanted_fns_dict = {}
    unknown_fns = []
    background_fns = []
    ## Get all but excluded .wav files contained at the provided path and add them to the appropriate list.
    ## glob returns files in arbitrary order; sorting keeps the selection below reproducible.
    wavs = sorted(glob.glob(os.path.join(path,'*','*.wav')))
    for fn in wavs:
        folder = os.path.basename(os.path.dirname(fn))
        if folder in excluded:
            continue
        try: ## files that cannot be read or decoded are skipped
            tf.audio.decode_wav(contents = tf.io.read_file(fn), desired_channels = 1, desired_samples = desired_samples)
        except Exception:
            continue
        if folder == '_background_noise_':
            background_fns.append(fn)
        elif folder in wanted_words:
            wanted_fns_dict.setdefault(folder, []).append(fn)
        else:
            unknown_fns.append(fn)

    ## Split wanted/unknown in training and validation, for each wanted words: val_pct of the total number of 
    ## per word fns are in the validation set (1-val_pct) in the training set.
    training_words = []
    validation_words = []
    for word_fns in wanted_fns_dict.values():
        rng.shuffle(word_fns)
        n_val_word = int(len(word_fns) * val_pct)
        validation_words.extend(word_fns[:n_val_word])
        training_words.extend(word_fns[n_val_word:])

    ## Calcs number of silent/unknown for train/val split to hit a certain percentage
    n_silent_train, n_unknown_train = calc_unknown_silent_n(len(training_words), silent_pct, unknown_pct)
    n_silent_val, n_unknown_val = calc_unknown_silent_n(len(validation_words), silent_pct, unknown_pct)

    ## Keep validation deterministic: it always gets the first `n_unknown_val` unknown files.
    validation_unknown = unknown_fns[:n_unknown_val]
    non_validation_unknown = unknown_fns[n_unknown_val:]
    ## Pick training unknowns at random from the files that are not used for validation
    rng.shuffle(non_validation_unknown)
    training_unknown = non_validation_unknown[:n_unknown_train]

    training_fns = training_words + training_unknown + ['silence_placeholder'] * n_silent_train
    validation_fns = validation_words + validation_unknown + ['silence_placeholder'] * n_silent_val

    rng.shuffle(training_fns)
    rng.shuffle(validation_fns)
    rng.shuffle(background_fns)

    return training_fns, validation_fns, background_fns

In [None]:
## The silence/unknown shares are read from the metadata so that changing them in `get_meta`
## actually changes the split.
training_fns, validation_fns, background_fns = get_fns(meta_dict['training']['data_path'],
                                                       meta_dict['training']['wanted_words'],
                                                       desired_samples = meta_dict['audio']['desired_samples'],
                                                       silent_pct = meta_dict['augmentation']['silence_percentage'],
                                                       unknown_pct = meta_dict['augmentation']['unknown_percentage']
                                                      )
training_ds = KeywordDataset(
    training_fns,
    background_fns,
    meta_dict,
    meta_dict['training']['batch_size'],
    is_validation = False
)
validation_ds = KeywordDataset(
    validation_fns,
    background_fns,
    meta_dict,
    meta_dict['training']['batch_size'] * 2, ## a bigger batch size is possible since no gradients are used
    is_validation = True
)

2022-08-16 10:52:50.684358: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-08-16 10:52:50.711767: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-08-16 10:52:50.711974: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-08-16 10:52:50.730458: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

# Build model (from pretrained)

In [None]:
## # Sequential API: doesn't work
## model = tf.keras.Sequential([
##     tf.keras.layers.Reshape((spectrogram_lenght,feature_bin_count,1), input_shape = (fingerprint_size, )),
##     tf.keras.layers.Conv2D(filters = 8, kernel_size = (8,10), strides = (2,2), padding = 'same', activation="relu"),
##     ## tf.keras.layers.Dropout(0.5),
##     ## tf.keras.layers.Flatten(),
##     tf.keras.layers.Reshape((4000,)),
##     tf.keras.layers.Dense(5, activation = "softmax"),
## ])

In [None]:
## Functional API: does work!
def create_model(arch, n_labels, meta_dict, dropout = 0.5):
    """
    Builds an untrained keyword model: reshape of the flat fingerprint into a spectrogram image, one or
    two Conv2D + Dropout stages depending on `arch`, then Flatten and a softmax Dense layer.

    arch:      One of `tiny_conv`, `tiny_embedding_conv`, `small_conv`
    n_labels:  Number of output nodes, corresponding to the number of labels
    meta_dict: Metadata as returned by `get_meta`; the audio entries define the input size
    dropout:   Dropout rate used after every convolution

    Implementation of `tiny_conv` and `tiny_embedding_conv` copies tensorflow/speech_commands/models.py,
    `small_conv` puts a same-size convolution in front of the `tiny_conv` convolution.
    Raises a ValueError for an unknown `arch`.
    """
    ## (filters, kernel_size, strides) of every convolution, in order
    conv_specs = {
        'tiny_conv':           [(8, (8, 10), (2, 2))],
        'tiny_embedding_conv': [(8, (8, 10), (2, 2)), (8, (8, 10), (8, 8))],
        'small_conv':          [(16, (3, 5), (1, 1)), (8, (8, 10), (2, 2))],
    }
    if arch not in conv_specs:
        raise ValueError(f"Unknown arch '{arch}', expected one of {sorted(conv_specs)}.")

    fingerprint_size = meta_dict['audio']['fingerprint_size']
    spectrogram_length = meta_dict['audio']['spectrogram_lenght']
    feature_bin_count = meta_dict['audio']['feature_bin_count']

    inputs = tf.keras.Input(shape = (fingerprint_size,))
    ## `target_shape` does not include the batch dimension; a leading -1 would add an extra axis.
    x = tf.keras.layers.Reshape(target_shape = (spectrogram_length, feature_bin_count, 1))(inputs)
    for filters, kernel_size, strides in conv_specs[arch]:
        x = tf.keras.layers.Conv2D(filters = filters,
                                   kernel_size = kernel_size,
                                   strides = strides,
                                   padding = 'same',
                                   activation = 'relu')(x)
        x = tf.keras.layers.Dropout(dropout)(x)

    x = tf.keras.layers.Flatten()(x)
    out = tf.keras.layers.Dense(n_labels, activation = 'softmax')(x)
    return tf.keras.Model(inputs = inputs, outputs = out)

def get_model(n_labels, meta_dict, arch = 'tiny_conv', dropout = 0.5, pretrain_path = False):
    """
    Returns a model with `n_labels` output nodes.

    Without `pretrain_path` a fresh model is built by `create_model` from `arch`, `meta_dict` and
    `dropout`. With `pretrain_path` the saved model at that path is loaded and frozen, and its last
    layer is replaced by a new, trainable softmax Dense layer; `arch` and `dropout` are not used then.
    """
    if not pretrain_path:
        return create_model(arch, n_labels, meta_dict, dropout = dropout)

    ## When loading from pretrained model, remove the last, dense layer and replace by a Dense layer with `n_labels` output nodes
    model = tf.keras.models.load_model(pretrain_path)
    model.trainable = False ## Freezes all but the classification layers
    ## `layers[-2]` is the layer feeding the old classification layer.
    output = tf.keras.layers.Dense(n_labels, activation = 'softmax')(model.layers[-2].output)
    return tf.keras.Model(inputs = model.input, outputs = output)

In [None]:
## One output node per vocabulary entry of the training dataset.
n_labels = len(training_ds.vocab)
model = get_model(n_labels = n_labels, meta_dict = meta_dict, arch = 'small_conv')
model.summary()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, 1960)]            0         
                                                                 
 reshape_2 (Reshape)         (None, 1, 49, 40, 1)      0         
                                                                 
 conv2d_4 (Conv2D)           (None, 1, 49, 40, 16)     256       
                                                                 
 dropout_4 (Dropout)         (None, 1, 49, 40, 16)     0         
                                                                 
 conv2d_5 (Conv2D)           (None, 1, 25, 20, 8)      10248     
                                                                 
 dropout_5 (Dropout)         (None, 1, 25, 20, 8)      0         
                                                                 
 flatten_2 (Flatten)         (None, 4000)              0   

# Train model and save result

In [None]:
## The dataset yields integer class ids, hence the sparse variant of the cross-entropy loss.
pretrain_learning_rate = meta_dict['training']['learning_rate']
model.compile(
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer = tf.keras.optimizers.Adam(learning_rate = pretrain_learning_rate),
    metrics = ['accuracy'],
)

In [None]:
## Pretraining run; shuffling is left to `KeywordDataset.on_epoch_end`.
history = model.fit(
    x = training_ds,
    epochs = 25,
    validation_data = validation_ds,
    shuffle = False,
    verbose = 1,
)

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


In [None]:
## Store the pretrained model; the fine-tuning section loads it from this path.
model.save(filepath = 'saved_model/pretrain_small_augs_25epo')



INFO:tensorflow:Assets written to: saved_model/pretrain_small_augs_25epo/assets


INFO:tensorflow:Assets written to: saved_model/pretrain_small_augs_25epo/assets


# Load pretrained model and fine tune
Repeat the process above with the desired words.

In [None]:
## Metadata for fine tuning: same dataset folder, but now the target keywords are the wanted words.
meta_dict = get_meta(
    wanted_words = ['licht', 'aus', 'party'],
    data_path = os.path.expanduser(os.path.join("~", 'coding_data/keyword_detection_nano/dataset/')),
)

In [None]:
## Audio length and silence/unknown shares are read from the metadata so that changing them in
## `get_meta` actually changes the split.
training_fns, validation_fns, background_fns = get_fns(meta_dict['training']['data_path'],
                                                       meta_dict['training']['wanted_words'],
                                                       desired_samples = meta_dict['audio']['desired_samples'],
                                                       silent_pct = meta_dict['augmentation']['silence_percentage'],
                                                       unknown_pct = meta_dict['augmentation']['unknown_percentage']
                                                      )

training_ds = KeywordDataset(
    training_fns,
    background_fns,
    meta_dict,
    meta_dict['training']['batch_size'],
    is_validation = False
)
validation_ds = KeywordDataset(
    validation_fns,
    background_fns,
    meta_dict,
    meta_dict['training']['batch_size'] * 2, ## a bigger batch size is possible since no gradients are used
    is_validation = True
)

2022-08-16 18:34:32.542471: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-08-16 18:34:32.579357: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-08-16 18:34:32.579565: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-08-16 18:34:32.580144: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

`get_model` loads the pretrained model, removes the last layer and adds in a new Dense layer.

In [None]:
## The new classification layer gets one output node per entry of the fine-tuning vocabulary.
n_labels = len(training_ds.vocab)
fine_tune_model = get_model(
    n_labels,
    meta_dict,
    pretrain_path = 'saved_model/pretrain_small_augs_25epo',
    arch = 'small_conv',
    dropout = 0.5,
)
fine_tune_model.summary()

Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, 1960)]            0         
                                                                 
 reshape_2 (Reshape)         (None, 1, 49, 40, 1)      0         
                                                                 
 conv2d_4 (Conv2D)           (None, 1, 49, 40, 16)     256       
                                                                 
 dropout_4 (Dropout)         (None, 1, 49, 40, 16)     0         
                                                                 
 conv2d_5 (Conv2D)           (None, 1, 25, 20, 8)      10248     
                                                                 
 dropout_5 (Dropout)         (None, 1, 25, 20, 8)      0         
                                                                 
 flatten_2 (Flatten)         (None, 4000)              0   

In [None]:
## Same optimizer, loss and metric as for pretraining.
fine_tune_learning_rate = meta_dict['training']['learning_rate']
fine_tune_model.compile(
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer = tf.keras.optimizers.Adam(learning_rate = fine_tune_learning_rate),
    metrics = ['accuracy'],
)

## Fine tune classification layer only

In [None]:
## First stage: the layers loaded from the pretrained model are still frozen here.
fine_tune_history_frozen = fine_tune_model.fit(
    x = training_ds,
    epochs = 5,
    validation_data = validation_ds,
    shuffle = False,
    verbose = 1,
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


## Fine tune all layers

In [None]:
## Unfreeze all layers (they were frozen in `get_model` when the pretrained model was loaded).
fine_tune_model.trainable = True
## A change of `trainable` only takes effect once the model is compiled again.
fine_tune_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate = meta_dict['training']['learning_rate']),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(), 
    metrics = ['accuracy']
)
fine_tune_history_thawn = fine_tune_model.fit(
    training_ds,
    validation_data = validation_ds,
    epochs = 20,
    verbose = 1,
    shuffle = False, ## is handled by dataloader
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


# Export as Tensorflow Lite model

In [None]:
## Base name shared by the SavedModel folder and the exported model files.
export_name = 'functional_fromPre_wAugs'
export_dir = '/'.join(['saved_model', export_name])
#tf.saved_model.save(fine_tune_model, export_dir)

In [None]:
## Number of samples fed to the converter for calibration.
REP_DATA_SIZE = 100
def representative_dataset_gen():
    """
    Yields `REP_DATA_SIZE` randomly chosen validation spectrograms, each as a one-element list
    holding an array of shape (1, number of features).
    """
    for _ in range(REP_DATA_SIZE):
        fn = random.choice(validation_ds.items)
        label = validation_ds.get_label(fn)
        audio = validation_ds.get_audio(fn, label)
        ## -1 instead of a fixed 1960 so other audio settings work as well.
        spectro = validation_ds.get_spectrogram(audio).reshape(1, -1)

        yield [spectro]

In [None]:
## Convert the SavedModel at `export_dir` to a quantized TensorFlow Lite model with int8 input/output.
converter = tf.lite.TFLiteConverter.from_saved_model(export_dir)

converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.inference_input_type = tf.compat.v1.lite.constants.INT8  #tf.int8#
converter.inference_output_type = tf.compat.v1.lite.constants.INT8 #tf.int8#

converter.representative_dataset = representative_dataset_gen
tflite_model = converter.convert()
## `with` closes the file, so the model is completely written before it is used further.
with open(f"models/{export_name}_v1int8.tflite", "wb") as tflite_file:
    tflite_model_size = tflite_file.write(tflite_model)
print(f"Quantized modelsize: {tflite_model_size}")

2022-08-16 18:35:12.873350: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:362] Ignored output_format.
2022-08-16 18:35:12.873376: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:365] Ignored drop_control_dependency.
2022-08-16 18:35:12.873955: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: saved_model/functional_fromPre_wAugs
2022-08-16 18:35:12.875582: I tensorflow/cc/saved_model/reader.cc:81] Reading meta graph with tags { serve }
2022-08-16 18:35:12.875595: I tensorflow/cc/saved_model/reader.cc:122] Reading SavedModel debug info (if present) from: saved_model/functional_fromPre_wAugs
2022-08-16 18:35:12.879805: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:354] MLIR V1 optimization pass is not enabled
2022-08-16 18:35:12.880740: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle.
2022-08-16 18:35:12.927856: I tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel 

Quantized modelsize: 35136


In [None]:
!xxd -i models/{export_name}.tflite > models/{export_name}.cc
#REPLACE_TEXT = MODEL_TFLITE.replace('/', '_').replace('.', '_')
#!sed -i 's/'{REPLACE_TEXT}'/g_model/g' {MODEL_TFLITE_MICRO}

In [None]:
## Runs the `code` command on the generated C source file.
!code models/{export_name}.cc