## In this notebook, we will convert the model artifact, created in previous notebooks and uploaded to S3 as a model artifact, to a TF Lite model
### This is a necessary step to deploy the model on the PSoC 6 board

In [24]:
import os
import pathlib

import boto3
import numpy as np
import sagemaker
import tensorflow as tf
from sagemaker import get_execution_role, Session

# Region name reported by a default boto3 session.
_boto_session = boto3.session.Session()
region = _boto_session.region_name

# Execution role as returned by the SageMaker SDK.
role = get_execution_role()

# SageMaker session constructed with default arguments.
sess = Session()

def decode_audio(audio_binary):
  """Decode WAV-encoded bytes and drop the trailing `channels` axis.

  `tf.audio.decode_wav` produces an (audio, sample_rate) pair; only the audio
  is kept, the sample rate is discarded.

  Args:
    audio_binary: WAV-encoded contents handed to `tf.audio.decode_wav`.

  Returns:
    The decoded audio tensor with its last axis squeezed out.
  """
  decoded = tf.audio.decode_wav(contents=audio_binary)
  waveform = decoded[0]
  # The clips are expected to be single channel (mono), so the last axis has
  # size 1 and can be removed.
  return tf.squeeze(waveform, axis=-1)

def get_label(file_path):
  """Return the name of the directory that directly contains `file_path`.

  Args:
    file_path: Path string (or string tensor) whose components are separated
      by `os.path.sep`.

  Returns:
    The second-to-last path component.
  """
  path_components = tf.strings.split(input=file_path, sep=os.path.sep)
  # Index instead of tuple-unpacking so this also works inside a TensorFlow
  # graph.
  return path_components[-2]

def get_waveform_and_label(file_path):
  """Load one audio file and pair its waveform with its label.

  Args:
    file_path: Path of the audio file to read.

  Returns:
    A `(waveform, label)` tuple, where `waveform` comes from `decode_audio`
    and `label` from `get_label`.
  """
  raw_bytes = tf.io.read_file(file_path)
  return decode_audio(raw_bytes), get_label(file_path)

def get_spectrogram(waveform, input_len=16000):
  """Convert a waveform into a fixed-size magnitude spectrogram.

  The waveform is truncated to at most `input_len` samples and zero-padded
  up to exactly `input_len`, so every clip yields a spectrogram of the same
  shape.

  Args:
    waveform: 1-D audio tensor.
    input_len: Number of samples every clip is truncated/padded to. The same
      value drives both the truncation and the padding length.

  Returns:
    The magnitude of the STFT with a trailing `channels` axis added.
  """
  # Truncate clips that are longer than `input_len` samples.
  waveform = waveform[:input_len]
  # Zero-padding for clips with fewer than `input_len` samples.
  zero_padding = tf.zeros(
      [input_len] - tf.shape(waveform),
      dtype=tf.float32)
  # Cast the waveform to float32 so it can be concatenated with the padding.
  waveform = tf.cast(waveform, dtype=tf.float32)
  # Concatenate the waveform with `zero_padding`, which ensures all audio
  # clips are of the same length.
  equal_length = tf.concat([waveform, zero_padding], 0)
  # Convert the waveform to a spectrogram via a STFT.
  spectrogram = tf.signal.stft(
      equal_length, frame_length=255, frame_step=128)
  # Obtain the magnitude of the STFT.
  spectrogram = tf.abs(spectrogram)
  # Add a `channels` dimension, so that the spectrogram can be used
  # as image-like input data with convolution layers (which expect
  # shape (`batch_size`, `height`, `width`, `channels`).
  spectrogram = spectrogram[..., tf.newaxis]
  return spectrogram

def get_spectrogram_and_label_id(audio, label):
  """Map a `(waveform, label)` pair to `(spectrogram, label_id)`.

  Args:
    audio: Waveform passed on to `get_spectrogram`.
    label: Label string compared element-wise against the module-level
      `commands` array.

  Returns:
    A `(spectrogram, label_id)` tuple, where `label_id` is the position at
    which `label` equals an entry of `commands`.
  """
  matches = label == commands
  return get_spectrogram(audio), tf.argmax(matches)

def preprocess_dataset(files):
  """Build a dataset of `(spectrogram, label_id)` pairs from file paths.

  Args:
    files: File paths accepted by `tf.data.Dataset.from_tensor_slices`.

  Returns:
    An unbatched `tf.data.Dataset`: each path is first mapped through
    `get_waveform_and_label`, then through `get_spectrogram_and_label_id`.
  """
  return (
      tf.data.Dataset.from_tensor_slices(files)
      .map(map_func=get_waveform_and_label,
           num_parallel_calls=AUTOTUNE)
      .map(map_func=get_spectrogram_and_label_id,
           num_parallel_calls=AUTOTUNE))

# Fetch the mini speech-commands dataset, but only when it is not already
# present on disk.
DATASET_PATH = 'data/mini_speech_commands'
data_dir = pathlib.Path(DATASET_PATH)

dataset_missing = not data_dir.exists()
if dataset_missing:
  # Download the archive below ./data and extract it there.
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.',
      cache_subdir='data')

# Entries of the dataset directory are used as the command labels.
# NOTE(review): label ids are positions in this array, so its order has to
# match the order used when the model was trained — confirm.
commands = np.array(tf.io.gfile.listdir(str(data_dir)))
# Drop entries that are not command folders.
for ignored_entry in ('README.md', '.ipynb_checkpoints'):
  commands = commands[commands != ignored_entry]
print('Commands:', commands)

# Seed for the file shuffle, so a fresh "Restart & Run All" produces the same
# train/validation/test split every time.
SHUFFLE_SEED = 42

# Sort first: the order returned by glob is not guaranteed, and the seeded
# shuffle is only reproducible when its input order is.
filenames = sorted(tf.io.gfile.glob(str(data_dir) + '/*/*'))
filenames = tf.random.shuffle(filenames, seed=SHUFFLE_SEED)
num_samples = len(filenames)
print('Number of total examples:', num_samples)
print('Number of examples per label:',
      len(tf.io.gfile.listdir(str(data_dir/commands[1]))))
print('Example file tensor:', filenames[0])

# 80/10/10 split. The boundaries are computed once and the test set takes
# everything after the validation set, so the three subsets never overlap and
# always add up to the full list. (A negative-index slice such as
# `filenames[-k:]` would return the WHOLE list when k == 0.)
train_end = int(0.8 * num_samples)
val_end = train_end + int(0.1 * num_samples)

train_files = filenames[:train_end]
val_files = filenames[train_end:val_end]
test_files = filenames[val_end:]

assert len(train_files) + len(val_files) + len(test_files) == num_samples

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))

# Let tf.data choose the parallelism used inside `preprocess_dataset`.
AUTOTUNE = tf.data.AUTOTUNE

# Number of examples per batch.
batch_size = 1

# Training and validation sets are only needed in batched form.
train_ds = preprocess_dataset(train_files).batch(batch_size)
val_ds = preprocess_dataset(val_files).batch(batch_size)

# The test set is kept both unbatched (`test_ds`) and batched
# (`test_ds_batch`).
test_ds = preprocess_dataset(test_files)
test_ds_batch = test_ds.batch(batch_size)

Commands: ['yes' 'no']
Number of total examples: 2000
Number of examples per label: 1000
Example file tensor: tf.Tensor(b'data/mini_speech_commands/yes/9b5815cd_nohash_0.wav', shape=(), dtype=string)
Training set size 1600
Validation set size 200
Test set size 200


In [7]:
import argparse
import os
import warnings

import pandas as pd
import pathlib
import numpy as np
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display


# Download the trained model artifact from S3 into the local `model/` folder.
# NOTE(review): bucket and key are hard-coded to one specific training job;
# point them at your own artifact before running.
s3 = boto3.client("s3")
bucket = "sagemaker-studio-062044820001-7qbctb3w94p"
key = "Training/models/1/tensorflow-training-2022-04-27-21-00-26-837/output/model.tar.gz"
# The destination folder has to exist before the download writes into it;
# on a fresh checkout nothing has created it yet.
os.makedirs("model", exist_ok=True)
s3.download_file(bucket, key, "model/model.tar.gz")

# Untar the model
import tarfile

fname = "model/model.tar.gz"
# The archive is opened in a `with` block so the handle is closed even when
# extraction fails part-way.
# NOTE(review): `extractall` trusts the member paths stored in the archive;
# only use it on artifacts you produced yourself.
if fname.endswith("tar.gz"):
    with tarfile.open(fname, "r:gz") as tar:
        tar.extractall("model")
elif fname.endswith("tar"):
    with tarfile.open(fname, "r:") as tar:
        tar.extractall("model")







In [8]:
# Locations of the extracted SavedModel and of the TF Lite files to be written.
MODELS_DIR = 'model'
TFLITE_DIR = MODELS_DIR + '/tflite'

# Create the output folder (and `MODELS_DIR` with it) up front: the conversion
# cells open files inside it for writing, which fails if the folder is missing.
if not os.path.exists(TFLITE_DIR):
  os.makedirs(TFLITE_DIR, exist_ok=True)

MODEL_TFLITE = TFLITE_DIR + '/model.tflite'
FLOAT_MODEL_TFLITE = TFLITE_DIR + '/float_model.tflite'
MODEL_TFLITE_MICRO = TFLITE_DIR + '/model.cc'
SAVED_MODEL = MODELS_DIR + '/1'

MODEL_TFLITE

'model/tflite/model.tflite'

### We will use `TFLiteConverter` to convert the TF model to a TF Lite model

In [9]:
## Convert the model to the TensorFlow Lite format without quantization

float_converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir=SAVED_MODEL)
float_tflite_model = float_converter.convert()
# Write through a context manager so the file is flushed and closed before
# anything else reads it back; `write` returns the number of bytes written.
with open(FLOAT_MODEL_TFLITE, "wb") as float_model_file:
  float_tflite_model_size = float_model_file.write(float_tflite_model)
print("Float model is %d bytes" % float_tflite_model_size)

Float model is 6502428 bytes


In [25]:
# Convert the model to the TensorFlow Lite format with quantization

# Number of training batches fed to the converter for calibration.
NUM_CALIBRATION_BATCHES = 500

converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir=SAVED_MODEL)

def representative_dataset():
  """Yield calibration inputs, one `[spectrogram_batch]` list per step."""
  for data, _ in train_ds.take(NUM_CALIBRATION_BATCHES):
    yield [data]

# Set the optimization flag.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Enforce integer only quantization
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Provide a representative dataset to ensure we quantize correctly.
converter.representative_dataset = representative_dataset

tflite_model = converter.convert()
# Write through a context manager so the file is flushed and closed before
# the interpreter loads it; `write` returns the number of bytes written.
with open(MODEL_TFLITE, "wb") as quantized_model_file:
  tflite_model_size = quantized_model_file.write(tflite_model)
print("Quantized model is %d bytes" % tflite_model_size)

Quantized model is 1631416 bytes


In [None]:
# Helper function to run inference
def run_tflite_inference(tflite_model_path, test_ds_batch, model_type="Float"):
    """Evaluate a TF Lite model on a batched test set and print its accuracy.

    Args:
        tflite_model_path: Path of the `.tflite` file to load.
        test_ds_batch: Iterable of `(spectrogram_batch, label_batch)` pairs
            whose elements expose `.numpy()`. Samples are fed to the
            interpreter one at a time, so any batch size is accepted.
        model_type: "Float" or "Quantized". For "Quantized" the float inputs
            are converted to the model's integer input type using the scale
            and zero point stored in the model.

    Raises:
        ValueError: If `test_ds_batch` yields no samples, or if
            `model_type` is "Quantized" but the model's input tensor carries
            no quantization parameters.
    """
    # Initialize the interpreter
    interpreter = tf.lite.Interpreter(tflite_model_path)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]
    input_dtype = input_details["dtype"]

    # Flatten the batches into per-sample inputs (each with a leading batch
    # axis of 1) and plain-int labels.
    test_data = []
    test_labels = []
    for audio_batch, label_batch in test_ds_batch:
        for sample, label in zip(audio_batch.numpy(), label_batch.numpy()):
            test_data.append(sample[np.newaxis, ...])
            test_labels.append(int(label))

    if not test_data:
        raise ValueError("test_ds_batch yielded no samples")

    # For quantized models, manually quantize the input data from float to integer
    if model_type == "Quantized":
        input_scale, input_zero_point = input_details["quantization"]
        if input_scale == 0:
            raise ValueError(
                "model_type is 'Quantized' but the model input has no "
                "quantization parameters")
        # Round to the nearest integer and clip to the representable range so
        # out-of-range values saturate instead of wrapping around on the cast.
        dtype_limits = np.iinfo(input_dtype)
        test_data = [
            np.clip(np.round(sample / input_scale + input_zero_point),
                    dtype_limits.min, dtype_limits.max).astype(input_dtype)
            for sample in test_data
        ]
    else:
        test_data = [sample.astype(input_dtype) for sample in test_data]

    correct_predictions = 0
    for sample, expected_label in zip(test_data, test_labels):
        interpreter.set_tensor(input_details["index"], sample)
        interpreter.invoke()
        output = interpreter.get_tensor(output_details["index"])[0]
        top_prediction = output.argmax()
        correct_predictions += int(top_prediction == expected_label)

    print('%s model accuracy is %f%% (Number of test samples=%d)' % (
      model_type, (correct_predictions * 100) / len(test_data), len(test_data)))

# Compute float model accuracy
# run_tflite_inference(FLOAT_MODEL_TFLITE, test_ds_batch, model_type='Float')

# Accuracy of the int8-quantized model on the batched test set.
run_tflite_inference(
    tflite_model_path=MODEL_TFLITE,
    test_ds_batch=test_ds_batch,
    model_type='Quantized')

