In [1]:
import argparse
import numpy as np
import os
import pandas as pd
import tensorflow as tf
import tensorflow.lite as tflite
from tensorflow import keras
import zlib
from platform import python_version
import tensorflow_model_optimization as tfmot   
import tempfile
print(f"Python version used to excute the code is {python_version()}")



Python version used to excute the code is 3.7.11


In [2]:
# ######################################################## Input Parameters #########################################################
# parser = argparse.ArgumentParser()
# parser.add_argument('--model', type=str, required=True, help='model name')
# parser.add_argument('--mfcc', action='store_true', help='use MFCCs')
# args = parser.parse_args()

######################################################## Inputs and model selection  #########################################################

In [3]:
# Experiment configuration: every later cell reads these module-level names.
version = "a"
m = "cnn"   # architecture to train; must be one of: mlp, cnn, ds_cnn
mfcc = True    # True --> use the MFCC options, False --> use the STFT options
alpha = 0.4    # width multiplier applied to the layer sizes (structured pruning)

# Suffix appended to the architecture name so that each (version, alpha) run gets its own artefacts.
model_version = f"_V_{version}_alpha={alpha}"

# Key used to pick the model and to name checkpoints, plots and .tflite files.
mymodel = m + model_version
TFLITE =  f'{mymodel}.tflite'                                   # file name of the converted TF Lite model (later written under ./models)
units = 8                                                       # number of output classes [8: without silence, 9: with silence]
################## Fix the random seeds so that runs are reproducible
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

In [4]:
# train_files_test = np.loadtxt("kws_train_split.txt" , dtype = str )
# train_files_test[:10]

In [5]:
# train_files = tf.convert_to_tensor(np.loadtxt("kws_train_split.txt" , dtype = str ))
# train_files

In [6]:
# train_files_test= tf.data.TextLineDataset("kws_train_split.txt")

In [7]:
# for line in train_files_test.take(5):
#     print(line)

In [8]:



# Fetch the mini Speech Commands archive and extract it below ./data
# (cache_dir='.' + cache_subdir='data').
zip_path = tf.keras.utils.get_file(
    origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
    fname='mini_speech_commands.zip',
    extract=True,
    cache_dir='.', cache_subdir='data')

# NOTE(review): data_dir is only referenced by the commented-out glob below.
data_dir = os.path.join('.', 'data', 'mini_speech_commands')

# Earlier random 80/10/10 split, superseded by the fixed split files loaded below:
# filenames = tf.io.gfile.glob(str(data_dir) + '/*/*')
# filenames = tf.random.shuffle(filenames)
# num_samples = len(filenames)

# total = 8000          # NUMBER OF TOTAL Files Including THE Silence records 

# train_files =   filenames[: int(total*0.8)]                                             # filenames[:int(total*0.8)]
# val_files = filenames[int(total*0.8): int(total*0.9)]
# test_files = filenames[int(total*0.9):]

# Fixed train/validation/test splits: each text file is read as an array of path strings.
# The files are looked up relative to the current working directory.
train_files = tf.convert_to_tensor(np.loadtxt("kws_train_split.txt" , dtype = str ))
val_files = tf.convert_to_tensor(np.loadtxt("kws_val_split.txt" , dtype = str ) )
test_files = tf.convert_to_tensor(np.loadtxt("kws_test_split.txt" , dtype = str ))

# Class names; the position of a name in this array is its integer label id.
# with silence ['stop', 'up', 'yes', 'right', 'left', 'no', 'silence', 'down', 'go']
LABELS = np.array(['stop', 'up', 'yes', 'right', 'left', 'no',  'down', 'go'] , dtype = str) 
print (f"The LABELS order as provided to the model are {LABELS}")

The LABELS order as provided to the model are ['stop' 'up' 'yes' 'right' 'left' 'no' 'down' 'go']


In [9]:
# # test_files = tf.convert_to_tensor(test_files)
# # tf.shape(test_files)
# tf.shape(test_files)
# test_files[0]

In [10]:
# parts = tf.strings.split(test_files[0], "/")

In [11]:
# parts[-2]

######################################################## Create the SignalGenerator #########################################################

In [12]:

class SignalGenerator:
    """Turn WAV file paths into batched ``tf.data`` datasets of (features, label_id).

    The feature type is chosen once, at construction time, through ``mfcc`` and
    is exposed as ``self.preprocess``: either an STFT magnitude spectrogram
    resized to 32x32, or the first ``num_coefficients`` MFCCs of every frame.

    Args:
        labels: array of class names; a file's label id is the index of its
            parent directory name in this array.
        sampling_rate: number of samples every clip is padded or truncated to
            (16000 in this notebook).
        frame_length: STFT window length in samples (also used as FFT length).
        frame_step: STFT hop in samples.
        num_mel_bins: number of mel bands (MFCC only).
        lower_frequency: lower edge of the mel filterbank (MFCC only).
        upper_frequency: upper edge of the mel filterbank (MFCC only).
        num_coefficients: number of leading MFCCs kept per frame (MFCC only).
        mfcc: truthy selects MFCC features, falsy selects the STFT spectrogram.
    """

    def __init__(self, labels, sampling_rate, frame_length, frame_step,
            num_mel_bins=None, lower_frequency=None, upper_frequency=None,
            num_coefficients=None, mfcc=False):
        self.labels = labels
        self.sampling_rate = sampling_rate
        self.frame_length = frame_length
        self.frame_step = frame_step
        self.num_mel_bins = num_mel_bins
        self.lower_frequency = lower_frequency
        self.upper_frequency = upper_frequency
        self.num_coefficients = num_coefficients
        # A real FFT of length frame_length yields frame_length // 2 + 1 bins.
        num_spectrogram_bins = frame_length // 2 + 1

        # Example option sets:
        #   STFT: {'frame_length': 256, 'frame_step': 128, 'mfcc': False}
        #   MFCC: {'frame_length': 640, 'frame_step': 320, 'mfcc': True,
        #          'lower_frequency': 20, 'upper_frequency': 4000,
        #          'num_mel_bins': 40, 'num_coefficients': 10}

        # Truthiness instead of `is True`, so that e.g. numpy booleans also
        # select the MFCC path.
        if mfcc:
            # The mel weight matrix only depends on the constructor arguments,
            # so it is computed once here and reused for every clip.
            self.linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
                    self.num_mel_bins, num_spectrogram_bins, self.sampling_rate,
                    self.lower_frequency, self.upper_frequency)
            self.preprocess = self.preprocess_with_mfcc
        else:
            self.preprocess = self.preprocess_with_stft

    def read(self, file_path):
        """Load one WAV file and derive its label id from the parent directory name.

        Returns:
            (audio, label_id): the decoded samples with the channel axis
            removed, and the index of the directory name in ``self.labels``.
        """
        parts = tf.strings.split(file_path, "/")
        label = parts[-2]                    # parts[-1] is the file name itself
        # NOTE: argmax over an all-False comparison is 0, so a directory name
        # that is missing from self.labels is silently mapped to class 0.
        label_id = tf.argmax(label == self.labels)
        audio_binary = tf.io.read_file(file_path)
        audio, _ = tf.audio.decode_wav(audio_binary)
        audio = tf.squeeze(audio, axis=1)    # drop the channel axis (mono assumed)

        return audio, label_id

    def pad(self, audio):
        """Return ``audio`` as exactly ``sampling_rate`` samples.

        Shorter clips are zero-padded at the end. Longer clips are truncated;
        previously they produced a negative padding size and failed.
        """
        audio = audio[:self.sampling_rate]
        # After truncation the difference is >= 0; it is 0 for full-length clips.
        zero_padding = tf.zeros([self.sampling_rate] - tf.shape(audio), dtype=tf.float32)
        audio = tf.concat([audio, zero_padding], 0)
        # Make the static shape known so that downstream ops see a fixed length.
        audio.set_shape([self.sampling_rate])

        return audio

    def get_spectrogram(self, audio):
        """Return the magnitude of the STFT of ``audio``."""
        stft = tf.signal.stft(audio, frame_length=self.frame_length,
                frame_step=self.frame_step, fft_length=self.frame_length)
        spectrogram = tf.abs(stft)

        return spectrogram

    def get_mfccs(self, spectrogram):
        """Project a spectrogram onto the mel scale and keep the first MFCCs."""
        mel_spectrogram = tf.tensordot(spectrogram,
                self.linear_to_mel_weight_matrix, 1)
        # The small offset keeps log() finite for empty bands.
        log_mel_spectrogram = tf.math.log(mel_spectrogram + 1.e-6)
        mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrogram)
        mfccs = mfccs[..., :self.num_coefficients]

        return mfccs

    def preprocess_with_stft(self, file_path):
        """Map a file path to (spectrogram resized to 32x32 with a channel axis, label)."""
        audio, label = self.read(file_path)
        audio = self.pad(audio)
        spectrogram = self.get_spectrogram(audio)
        # Add a trailing channel axis; tf.image.resize expects one.
        spectrogram = tf.expand_dims(spectrogram, -1)
        spectrogram = tf.image.resize(spectrogram, [32, 32])

        return spectrogram, label

    def preprocess_with_mfcc(self, file_path):
        """Map a file path to (MFCCs with a trailing channel axis, label)."""
        audio, label = self.read(file_path)
        audio = self.pad(audio)
        spectrogram = self.get_spectrogram(audio)
        mfccs = self.get_mfccs(spectrogram)
        mfccs = tf.expand_dims(mfccs, -1)

        return mfccs, label

    def make_dataset(self, files, train, batch_size=32, shuffle_buffer=100):
        """Build the input pipeline for a list of file paths.

        Args:
            files: tensor/array of WAV file paths.
            train: when truthy, the cached batches are reshuffled every epoch.
            batch_size: number of clips per batch (default 32, as before).
            shuffle_buffer: shuffle buffer size, counted in batches (default 100).

        Returns:
            A batched, cached and prefetched ``tf.data.Dataset``.
        """
        ds = tf.data.Dataset.from_tensor_slices(files)
        ds = ds.map(self.preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
        ds = ds.batch(batch_size)
        # Cache the extracted features so that audio is decoded only once.
        ds = ds.cache()
        if train:
            ds = ds.shuffle(shuffle_buffer, reshuffle_each_iteration=True)
        # Prefetch is the last stage so it overlaps the whole pipeline
        # (including the shuffle) with the training step.
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)

        return ds


######################################################## Options for MFCC & STFT #########################################################

In [13]:
# Keyword arguments forwarded to SignalGenerator for each feature type.
STFT_OPTIONS = {'frame_length': 256, 'frame_step': 128, 'mfcc': False}
MFCC_OPTIONS = {'frame_length': 640, 'frame_step': 320, 'mfcc': True,                   # alternative tried: 'lower_frequency': 40, 'upper_frequency': 2700, 'num_mel_bins': 32
        'lower_frequency': 32, 'upper_frequency': 2700, 'num_mel_bins': 32,
        'num_coefficients': 10}
# `strides` is used by the first convolution of the cnn / ds_cnn models.
if mfcc is True:
    options = MFCC_OPTIONS
    strides = [2, 1]
else:
    options = STFT_OPTIONS
    strides = [2, 2]



######################################################## Generate Data set splits #########################################################

In [14]:
# One generator (16 kHz clips, selected feature options) feeds all three splits.
generator = SignalGenerator(LABELS, 16000, **options)
# Only the training split is built with train=True.
train_ds = generator.make_dataset(train_files, True)
val_ds = generator.make_dataset(val_files, False)
test_ds = generator.make_dataset(test_files, False)

In [15]:
############## checking shapes and values of data sets 

In [27]:
# Iterator over validation batches, used below to inspect one batch.
it = iter(val_ds)

In [29]:
# Sanity check: pull one batch and show the feature shape and the label ids.
inp , label = next(it)
print(inp.shape)
print(label)

(32, 49, 10, 1)
tf.Tensor([5 7 2 1 7 4 5 0 5 0 2 4 6 7 3 4 5 4 0 6 3 6 1 2 6 5 1 6 4 2 5 0], shape=(32,), dtype=int64)


##### building the models ########################################################

In [None]:


# Three candidate architectures. Hidden widths / filter counts are scaled by `alpha`
# and every model ends in a Dense layer with `units` outputs and no activation
# (raw logits).

# Fully connected baseline: flatten the features, then three hidden Dense layers.
mlp = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(units = int(256 *alpha), activation='relu' , name =  "Dense-1" ),
    tf.keras.layers.Dense(units = int(256 *alpha), activation='relu', name =  "Dense-2"),
    tf.keras.layers.Dense(units = int(256 *alpha), activation='relu', name =   "Dense-3" ),
    tf.keras.layers.Dense(units = units , name =  "Output-Layer")                                   # set `units` to 9 if silence is included
])

# Plain CNN: three Conv2D + BatchNorm + ReLU stages; only the first stage uses `strides`.
cnn = tf.keras.Sequential([
    tf.keras.layers.Conv2D(filters=int(128 *alpha), kernel_size=[3,3], strides=strides, use_bias=False , name = "Conv2D-1"),
    tf.keras.layers.BatchNormalization(momentum=0.1 , name = "Btch_Norm-1"),
    tf.keras.layers.ReLU(),
    tf.keras.layers.Conv2D(filters=int(128 *alpha), kernel_size=[3,3], strides=[1,1], use_bias=False , name = "Conv2D-2"),
    tf.keras.layers.BatchNormalization(momentum=0.1 , name = "Btch_Norm-2"),
    tf.keras.layers.ReLU(),
    tf.keras.layers.Conv2D(filters=int(128 *alpha), kernel_size=[3,3], strides=[1,1], use_bias=False , name = "Conv2D-3"),
    tf.keras.layers.BatchNormalization(momentum=0.1 , name = "Btch_Norm-3"),
    tf.keras.layers.ReLU(),
    tf.keras.layers.GlobalAveragePooling2D( name =  "GlobalAveragePooling-Layer"),
    tf.keras.layers.Dense(units = units, name =  "Output-Layer")
])

# Depthwise-separable CNN: after the first Conv2D, each stage is a 3x3 DepthwiseConv2D
# followed by a 1x1 Conv2D.
ds_cnn = tf.keras.Sequential([
    tf.keras.layers.Conv2D(filters=int(256 *alpha), kernel_size=[3,3], strides=strides, use_bias=False, name = "Conv2D-1"),
    tf.keras.layers.BatchNormalization(momentum=0.1),
    tf.keras.layers.ReLU(),
    tf.keras.layers.DepthwiseConv2D(kernel_size=[3, 3], strides=[1, 1], use_bias=False, name = "DepthwiseConv2D-1"),
    tf.keras.layers.Conv2D(filters=int(256 *alpha), kernel_size=[1,1], strides=[1,1], use_bias=False, name = "Conv2D-2"),
    tf.keras.layers.BatchNormalization(momentum=0.1),
    tf.keras.layers.ReLU(),
    tf.keras.layers.DepthwiseConv2D(kernel_size=[3, 3], strides=[1, 1], use_bias=False, name = "DepthwiseConv2D-2"),
    tf.keras.layers.Conv2D(filters=int(256 *alpha), kernel_size=[1,1], strides=[1,1], use_bias=False, name = "Conv2D-3"),
    tf.keras.layers.BatchNormalization(momentum=0.1),
    tf.keras.layers.ReLU(),
    tf.keras.layers.GlobalAveragePooling2D( name =  "GlobalAveragePooling-Layer"),
    tf.keras.layers.Dense(units = units, name =  "Output-Layer")
])


# Registry keyed by "<architecture><model_version>", i.e. the same format as `mymodel`.
MODELS = {'mlp'+ model_version : mlp, 'cnn'+ model_version: cnn, 'ds_cnn'+ model_version: ds_cnn}
print(MODELS.keys())

######################################################## Define optimizer & Losses & Metrics ########################################################

In [None]:

model = MODELS[mymodel]              # pick the architecture selected in the configuration cell

# from_logits=True because the output layers above have no activation.
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.optimizers.Adam()
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]


################### Compiling the model :

model.compile(loss = loss, optimizer = optimizer, metrics = metrics)

######################################################## checkpoint location depends on the preprocessing (STFT or MFCC)
if mfcc is False:
    checkpoint_filepath = f'./checkpoints/stft/chkp_best_{mymodel}'

else:
    checkpoint_filepath = f'./checkpoints/mfcc/chkp_best_{mymodel}'
    
# Checked at the end of every epoch; only the model with the highest validation accuracy is kept.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,           
    monitor='val_sparse_categorical_accuracy',
    verbose=1,
    mode='max',
    save_best_only=True,
    save_freq='epoch')

In [None]:
######################################################## Model Training ########################################################

# Train for 20 epochs, validating after each one; the callback saves the best checkpoint.
history = model.fit(train_ds, epochs=20,   validation_data=val_ds,callbacks=[model_checkpoint_callback ])

############################## Print Model Summary ####################
# summary() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" line to the output.
model.summary()


In [None]:
import matplotlib.pyplot as plt
def plot_loss(history):
    """Plot training and validation accuracy per epoch and save it as '<mymodel>.png'.

    Despite its name, this function plots the accuracy curves, not the loss.

    Args:
        history: object whose ``history`` dict holds the
            'sparse_categorical_accuracy' and 'val_sparse_categorical_accuracy' series.
    """
    curves = (
        ('sparse_categorical_accuracy', 'Accuracy'),
        ('val_sparse_categorical_accuracy', 'val_Accuracy'),
    )
    for metric_key, legend_label in curves:
        plt.plot(history.history[metric_key], label=legend_label)

    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)

    # The image is written next to the notebook, named after the selected model.
    output_file = mymodel + ".png"
    plt.savefig(output_file)

# Draw and save the accuracy curves of the training run above.
plot_loss(history)

In [None]:
######################################################## Function To Evaluate the best model and convert to TF Lite ########################################################

def S_pruning_Model_evaluate_and_compress_to_TFlite( tflite_model_dir =  TFLITE , checkpoint_filepath = checkpoint_filepath ):
    """Evaluate the best checkpoint, convert it to TF Lite and store it raw and zlib-compressed.

    Args:
        tflite_model_dir: file NAME (not a directory) of the .tflite model;
            it is written under ./models.
        checkpoint_filepath: path of the saved model to load and convert.

    Returns:
        (compressed_path, tflite_path): paths of the zlib-compressed and of
        the plain .tflite file, in that order.
    """
    # exist_ok avoids the separate exists() check and its check-then-create race.
    os.makedirs('./models', exist_ok=True)

    best_model = tf.keras.models.load_model(filepath = checkpoint_filepath )
    _, ACCURACY = best_model.evaluate(test_ds)
    print("*"*50,"\n",f" The accuracy achieved by the best model before convertion = {ACCURACY *100:0.2f}% ")

    # Convert to TF Lite without quantization
    converter = tf.lite.TFLiteConverter.from_saved_model(checkpoint_filepath)
    tflite_model = converter.convert()

    tflite_path = './models/' + tflite_model_dir
    compressed_path = './models/' + "compressed_" + tflite_model_dir

    # Write the model in binary format, then a zlib-compressed copy next to it.
    with open(tflite_path, 'wb') as fp:
        fp.write(tflite_model)
    with open(compressed_path, 'wb') as fp:
        fp.write(zlib.compress(tflite_model))

    print("*"*50,"\n",f"the model is saved successfuly to {tflite_path}")
    return compressed_path , tflite_path

In [None]:
def getsize(file):
    """Return the size of `file` in bytes, as reported by os.stat."""
    return os.stat(file).st_size

######################################################## Function To Load and Evaluate the TF Lite Model ########################################################

def load_and_evaluation(path, dataset , Compressed):
    """Run a TF Lite model over `dataset` one sample at a time and print size and accuracy.

    Args:
        path: path of the .tflite file to load into the interpreter.
        dataset: batched dataset of (features, label) pairs; it is re-batched
            to single samples before inference.
        Compressed: path of the zlib-compressed copy of the model (size report only).

    Prints the raw and compressed file sizes and the top-1 accuracy. Returns None.
    """
    interpreter = tf.lite.Interpreter(model_path = path)
    interpreter.allocate_tensors()
    input_index = interpreter.get_input_details()[0]['index']
    output_index = interpreter.get_output_details()[0]['index']

    # Use the dataset that was passed in; the previous version silently
    # evaluated the global test_ds whatever the caller supplied.
    single_samples = dataset.unbatch().batch(1)

    count = 0                                 # number of correct predictions
    total = 0                                 # number of evaluated samples

    for inp , label in single_samples:
        my_input = np.array(inp, dtype = np.float32)
        expected = int(np.array(label).reshape(-1)[0])

        interpreter.set_tensor(input_index, my_input)
        interpreter.invoke()
        my_output = interpreter.get_tensor(output_index)
        predict = int(np.argmax(my_output))   # index of the highest score

        total += 1
        if predict == expected:
            count += 1

    # An empty dataset has no defined accuracy; report NaN instead of dividing by zero.
    accuracy = count/total if total else float('nan')
    # Size of the TF Lite model
    size = getsize(path)
    # Size of the TF Lite model after compression
    size_compressed = getsize(Compressed)
    print ("*"*50,"\n",f"The Size of TF lite model  Before compression is = {size /1000 } kb" )
    print ("*"*50,"\n",f"The Size of TF lite model  After compression is = {size_compressed /1000 } kb" )
    print ("*"*50,"\n",f"The accuracy of TF lite model is = {accuracy *100 :0.2f}% " )
    




######################################################## Generate Representative data for Weight + Activation Quantization ########################################################

In [None]:
# Function for weight and activations quantization 
def representative_dataset_gen():
    """Yield calibration inputs for the TF Lite converter.

    Each yielded item is a one-element list holding the features of one
    element of ``train_ds`` (labels are discarded); at most 1000 elements
    are drawn.
    """
    calibration_elements = train_ds.take(1000)
    for features, _unused_label in calibration_elements:
        yield [features]

In [None]:
######################################################## Apply quantization Function ########################################################

def apply_Quantization(tflite_model_dir =  TFLITE ,  PQT = False , WAPQT = False ,  checkpoint_filepath = checkpoint_filepath ): 
    """Convert the saved model to a post-training-quantized TF Lite model and store it.

    Args:
        tflite_model_dir: base file NAME of the .tflite model; a "PQT_" and/or
            "WAPQT_" prefix is added and the file is written under ./models.
        PQT: apply weight-only post-training quantization.
        WAPQT: apply weight + activation post-training quantization, calibrated
            with ``representative_dataset_gen``.
        checkpoint_filepath: path of the saved model to convert.

    Returns:
        (compressed_path, tflite_path): paths of the zlib-compressed and of
        the plain .tflite file, in that order.

    Raises:
        ValueError: if neither PQT nor WAPQT is requested. Previously this case
            failed later with an unbound ``tflite_model``.
    """
    if not (PQT or WAPQT):
        raise ValueError("apply_Quantization needs PQT=True and/or WAPQT=True")

    converter = tf.lite.TFLiteConverter.from_saved_model(checkpoint_filepath)
    # Both modes use the default optimization set.
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    if PQT:
        tflite_model_dir = f"PQT_{tflite_model_dir}"
    if WAPQT:
        # Activation quantization needs sample inputs to calibrate the ranges.
        converter.representative_dataset = representative_dataset_gen
        tflite_model_dir = f"WAPQT_{tflite_model_dir}"

    # Convert once; with both flags set the old code converted twice and
    # kept only the second result.
    tflite_model = converter.convert()

    # This function may be called before anything else created ./models.
    os.makedirs('./models', exist_ok=True)
    Compressed = f"./models/compressed_{tflite_model_dir}"
    tflite_model_dir = f"./models/{tflite_model_dir}"

    # Write the model in binary format, then a zlib-compressed copy next to it.
    with open(tflite_model_dir, 'wb') as fp:
        fp.write(tflite_model)
    with open(Compressed, 'wb') as fp:
        fp.write(zlib.compress(tflite_model))

    print(f"the model is saved successfuly to {tflite_model_dir}")
    return Compressed , tflite_model_dir 

### Without Quantization

In [None]:
# Evaluate the best checkpoint and export it as a float TF Lite model (plain + zlib-compressed).
Compressed , tflite_model_dir = S_pruning_Model_evaluate_and_compress_to_TFlite( tflite_model_dir =  TFLITE , checkpoint_filepath = checkpoint_filepath )

In [None]:
# Report size and accuracy of the float TF Lite model.
load_and_evaluation(tflite_model_dir, test_ds , Compressed)

### Weights only Quantization 

In [None]:
# Weight-only post-training quantization. Note: this rebinds `Compressed` from the float export above.
Compressed , Quantized   = apply_Quantization(PQT=True )

In [None]:
# Report size and accuracy of the weight-only quantized model.
load_and_evaluation(Quantized , test_ds , Compressed)

### Weights + Activation Quantization

In [None]:
# Weight + activation post-training quantization (calibrated with representative_dataset_gen).
WA_Compressed , WA_Quantized  = apply_Quantization(WAPQT=True)

In [None]:
# Report size and accuracy of the weight + activation quantized model.
load_and_evaluation(WA_Quantized , test_ds , WA_Compressed)

## Quantization aware Training :

In [None]:
import tensorflow_model_optimization as tfmot

# Checkpoint path for the quantization-aware model (relative to the working directory).
Q_aware_checkpoint_filepath = F'Q_aware_chkp_best_{mymodel}'
    
# Same policy as the float-model callback: checked every epoch, keeps only the
# model with the highest validation accuracy.
Q_aware_model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=Q_aware_checkpoint_filepath,           
    monitor='val_sparse_categorical_accuracy',
    verbose=1,
    mode='max',
    save_best_only=True,
    save_freq='epoch')

def Quantization_aware_traning(filepath = checkpoint_filepath , checkpoint_callback = Q_aware_model_checkpoint_callback ):
    """Fine-tune the best float model with quantization-aware training and evaluate it.

    Args:
        filepath: path of the pre-trained float32 model to start from.
        checkpoint_callback: callback that saves the best quantization-aware
            model during fine-tuning.

    The best model is reloaded from the module-level
    ``Q_aware_checkpoint_filepath`` (so ``checkpoint_callback`` is expected to
    write there) and its test accuracy is printed. Returns None.
    """
    # Retrieve the best pre-trained float32 model
    float_model = tf.keras.models.load_model(filepath = filepath )

    # Wrap it with fake-quantization so training sees the quantization error
    q_aware_model = tfmot.quantization.keras.quantize_model(float_model)

    # Use a fresh optimizer with the same configuration: the global instance
    # already holds state tied to the variables of the float model.
    qat_optimizer = type(optimizer).from_config(optimizer.get_config())
    q_aware_model.compile(loss = loss, optimizer = qat_optimizer, metrics = metrics)

    # Train the model for a few epochs
    q_aware_model.fit(train_ds, epochs=10,   validation_data=val_ds,callbacks=[checkpoint_callback ])

    ############################## Print Model Summary ####################
    # Show the quantization-aware model that was just trained; summary()
    # prints by itself and returns None, so it is not wrapped in print().
    q_aware_model.summary()

    # Evaluate the best model; quantize_scope registers the tfmot wrapper
    # objects needed to deserialize a quantization-aware model.
    with tfmot.quantization.keras.quantize_scope():
        best_model = tf.keras.models.load_model(filepath = Q_aware_checkpoint_filepath )
    _, ACCURACY = best_model.evaluate(test_ds)
    print("*"*50,"\n",f" The accuracy achieved by the best model before convertion = {ACCURACY *100:0.2f}% ")
    
    

In [None]:
######################################################## Apply quantization Aware Training on the Pre Trained Model ########################################################

# Fine-tune the best float checkpoint with quantization-aware training.
Quantization_aware_traning(filepath = checkpoint_filepath , checkpoint_callback = Q_aware_model_checkpoint_callback )

### Quantization Aware model saving

In [None]:
# NOTE(review): Q_Aware_T_Tflite_save was called here but is not defined anywhere
# in this notebook, so the cell failed with NameError on a fresh kernel. The
# definition below is a reconstruction modelled on the other export helpers;
# confirm it matches the originally intended behaviour.
def Q_Aware_T_Tflite_save(filepath, tflite_model_dir = None):
    """Convert the quantization-aware saved model to TF Lite and store it raw and compressed.

    Args:
        filepath: path of the saved quantization-aware model.
        tflite_model_dir: file NAME of the .tflite model (written under
            ./models); defaults to "QAT_" + TFLITE.

    Returns:
        (tflite_path, compressed_path) -- note the order, which matches the
        call site below and is the reverse of the other export helpers.
    """
    if tflite_model_dir is None:
        tflite_model_dir = f"QAT_{TFLITE}"

    converter = tf.lite.TFLiteConverter.from_saved_model(filepath)
    # Default optimizations let the converter use the ranges learnt during training.
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    tflite_model = converter.convert()

    os.makedirs('./models', exist_ok=True)
    tflite_path = f"./models/{tflite_model_dir}"
    compressed_path = f"./models/compressed_{tflite_model_dir}"
    with open(tflite_path, 'wb') as fp:
        fp.write(tflite_model)
    with open(compressed_path, 'wb') as fp:
        fp.write(zlib.compress(tflite_model))
    print(f"the model is saved successfuly to {tflite_path}")
    return tflite_path , compressed_path


QAT_tflite_model_dir , Q_Aware_T_Compressed = Q_Aware_T_Tflite_save(filepath = Q_aware_checkpoint_filepath)

load_and_evaluation(QAT_tflite_model_dir, test_ds , Q_Aware_T_Compressed)