In [1]:
import os
import pathlib
import zlib
import numpy as np
import tempfile

import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras import layers
from tensorflow.keras import models
from keras.optimizers.schedules import PolynomialDecay
import keras

import tensorflow_model_optimization as tfmot

In [2]:
from dataset import SignalGenerator

In [3]:
version = 1

In [4]:
def residual_block(x, filters, conv_num=3, activation="relu"):
    # Shortcut
    s = keras.layers.Conv1D(filters, 1, padding="same")(x)
    for i in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding="same")(x)
        x = keras.layers.Activation(activation)(x)
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    x = keras.layers.Add()([x, s]).9062 
    x = keras.layers.Activation(activation)(x)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes, conv1, conv2, conv3, conv4):
    inputs = keras.layers.Input(shape=input_shape, name="input")
    new_inputs = tf.squeeze(inputs,3)
    print(new_inputs.shape)
    x = residual_block(new_inputs, 16, 2)
    x = residual_block(x, conv1, 2)
    x = residual_block(x, conv2, 3)
    x = residual_block(x, conv3, 3)
    x = residual_block(x, conv4, 3)

    # x = keras.layers.MaxPooling1D()(x)#(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)

    outputs = keras.layers.Dense(num_classes, name="output")(x)

    return keras.models.Model(inputs=inputs, outputs=outputs)

SyntaxError: invalid syntax (<ipython-input-4-d9bca32e7fd5>, line 8)

In [4]:
def weights_clustering(
    in_model,
    out_path,
    epochs=4,
    lr=1e-3,
    n_clusters=12,
    only_dense=False):

    if only_dense:
      def apply_clustering_to_dense(layer):
        if isinstance(layer, tf.keras.layers.Dense):
          return tfmot.clustering.keras.cluster_weights(
              layer,
              number_of_clusters=n_clusters,
              cluster_centroids_init=tfmot.clustering.keras.CentroidInitialization.LINEAR
              )
        return layer

      clustered_model = tf.keras.models.clone_model(
          in_model,
          clone_function = apply_clustering_to_dense)

    else:
      # create model for weight clustering
      clustered_model = tfmot.clustering.keras.cluster_weights(
          in_model,
          number_of_clusters=n_clusters,
          cluster_centroids_init=tfmot.clustering.keras.CentroidInitialization.LINEAR
          )

    # compile model
    clustered_model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['sparse_categorical_accuracy']
        )

    # fit model
    clustered_model.fit(
        train_ds,
        batch_size=32,
        epochs=epochs,
        validation_data=val_ds)

    # clustered model evaluation
    _, accuracy = clustered_model.evaluate(test_ds, verbose=0)

    # prepare for export and save
    model_for_export = tfmot.clustering.keras.strip_clustering(clustered_model)
    #zip_size = save_model(model_for_export, out_path)

    return model_for_export, accuracy



# POST TRAINING QUANTIZATION ---------------------------------------------------
def get_accuracy(interpreter, test_dataset):
    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    running_corrects = 0
    total_elements = 0

    for (batch, labels) in test_dataset:
        total_elements += len(batch)
        for test_sample, label in  zip(batch, labels):
            test_sample = np.expand_dims(test_sample, axis=0).astype(np.float32)
            interpreter.set_tensor(input_index, test_sample)
            interpreter.invoke()
            output = interpreter.get_tensor(output_index)
            pred = np.argmax(output)
            if pred == label:
              running_corrects += 1
  
    return running_corrects/total_elements

def representative_dataset():
  train_set = train_ds.unbatch().take(200)
  img_list = []
  for image in train_set:
    img_list.append(image[0].numpy())
  img_arr = np.array(img_list)

  for data in tf.data.Dataset.from_tensor_slices(img_arr).batch(1).take(100):
    yield [data]

def pt_quantization(in_model, out_path, q_type='float16'):
    converter = tf.lite.TFLiteConverter.from_keras_model(in_model)
    if q_type == 'float16':
      converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_LATENCY]
      converter.target_spec.supported_types = [tf.float16]
    elif q_type == 'int8int16':
      converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_LATENCY]
      converter.representative_dataset = representative_dataset
      converter.target_spec.supported_ops = [tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8]
    else:
      converter.optimizations = [tf.lite.Optimize.DEFAULT]
      converter.representative_dataset = representative_dataset
      
    quant_tflite_model = converter.convert()

    # compressed file
    compressed_model = zlib.compress(quant_tflite_model)
    
    compressed_file, c_filename = tempfile.mkstemp()


    # generate compressed version    
    with open(c_filename, 'wb') as f:
        f.write(compressed_model)
    
    
    interpreter = tf.lite.Interpreter(model_content=quant_tflite_model)
    interpreter.allocate_tensors()
    
    accuracy = get_accuracy(interpreter, test_ds)
    zip_size = os.path.getsize(c_filename)/1024
    os.remove(c_filename)
    
    return accuracy, zip_size, quant_tflite_model

In [5]:
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

data_dir = pathlib.Path('data/mini_speech_commands')
if not data_dir.exists():
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.', cache_subdir='data')

#lista di training
training_list=[]
file=open("kws_train_split.txt")
for line in file:
  training_list.append('.'+line[1:-1])

#lista di validation
validation_list=[]
file=open("kws_val_split.txt")
for line in file:
  validation_list.append('.'+line[1:-1])

# lista di test
test_list=[]
file=open("kws_test_split.txt")
for line in file:
  test_list.append('.'+line[1:-1])

# lista di labels 
labels = open('labels.txt').readlines()[0].split() 
print(labels)


MFCC_OPTIONS = {
    'frame_length': 640, 
    'frame_step': 320, 
    'mfcc': True,
    'lower_frequency': 20, 
    'upper_frequency': 4000, 
    'num_mel_bins': 40,
    'num_coefficients': 10
}

# make test dataset
generator = SignalGenerator(labels, 16000, **MFCC_OPTIONS) 
train_ds = generator.make_dataset(training_list, True)
val_ds = generator.make_dataset(validation_list, False)
test_ds = generator.make_dataset(test_list, False)

['down', 'stop', 'right', 'left', 'up', 'yes', 'no', 'go']


In [6]:
shape = [49, 10, 1]
if version == 1:
    model = models.Sequential([
      layers.Input(shape=shape),
      layers.Conv2D(filters=256, kernel_size=[3,3], strides=[2,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=256, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=128, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=64, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.GlobalAveragePooling2D(),
      layers.Dense(8)
    ])
elif version == 2:
    model = models.Sequential([
      layers.Input(shape=shape),
      layers.Conv2D(filters=512, kernel_size=[3,3], strides=[2,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=256, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=256, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=128, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.GlobalAveragePooling2D(),
      layers.Dense(8)
    ])
elif version == 3:
    model = models.Sequential([
      layers.Input(shape=shape),
      layers.Conv2D(filters=512, kernel_size=[3,3], strides=[2,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=512, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.DepthwiseConv2D(kernel_size=[3,3], strides=[1,1], use_bias=False),
      layers.Conv2D(filters=256, kernel_size=[1,1], strides=[1,1], use_bias=False),
      layers.BatchNormalization(momentum=0.1),
      layers.ReLU(),
      layers.GlobalAveragePooling2D(),
      layers.Dense(8)
    ])
elif version == 4:
    conv_filters = {
        "conv1": 32,
        "conv2": 64,
        "conv3": 128,
        "conv4": 128
    }
    model = build_model(shape, 8, **conv_filters)
elif version == 5:
    conv_filters = {
        "conv1": 32,
        "conv2": 64,
        "conv3": 128,
        "conv4": 256
    }
    model = build_model(shape, 8, **conv_filters)
else :
    print("invalid input value!")
    



    
    

In [7]:
model.summary()

# learning rate scheduler
learning_rate_fn = PolynomialDecay(
    initial_learning_rate=1e-3,
    decay_steps=3000,
    end_learning_rate=1e-5
    )

# compile model
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate_fn),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['sparse_categorical_accuracy'],
)

# callbacks
ckp_dir = "./checkpoint/"
try:
  os.mkdir(ckp_dir)
except FileExistsError:
  pass

checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    ckp_dir, 
    monitor='val_sparse_categorical_accuracy', 
    verbose=0, 
    save_best_only=True,
    save_weights_only=False, 
    mode='max', 
    save_freq='epoch')

# fit model
EPOCHS = 30
history = model.fit(
    train_ds, 
    validation_data=val_ds,  
    epochs=EPOCHS,
    callbacks=[checkpoint_cb],)

# load and evaluate the best model
base_model = tf.keras.models.load_model(ckp_dir)
acc = base_model.evaluate(test_ds, batch_size=32, return_dict=True)['sparse_categorical_accuracy']
print()
print("sparse categorical accuracy on test set : "+str(acc))

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 24, 8, 256)        2304      
_________________________________________________________________
batch_normalization (BatchNo (None, 24, 8, 256)        1024      
_________________________________________________________________
re_lu (ReLU)                 (None, 24, 8, 256)        0         
_________________________________________________________________
depthwise_conv2d (DepthwiseC (None, 22, 6, 256)        2304      
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 22, 6, 256)        65536     
_________________________________________________________________
batch_normalization_1 (Batch (None, 22, 6, 256)        1024      
_________________________________________________________________
re_lu_1 (ReLU)               (None, 22, 6, 256)        0

Epoch 30/30

sparse categorical accuracy on test set : 0.9424999952316284


In [8]:
### OPTIMIZATIONS
  
out_path = "model"+str(version)

quant_acc, quant_size, quant_model = pt_quantization(in_model=clustered_model, out_path=out_path, q_type='else')
print('accuracy of quantized model: %.4f %%'%quant_acc)
print('size of quantized model: %.4f kB'%quant_size)
with open('model_'+str(version)+'.tflite', 'wb') as f:
    f.write(quant_model)

Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4
INFO:tensorflow:Assets written to: /tmp/tmpyap7untj/assets
accuracy of quantized model: 0.9012 %
size of quantized model: 91.9941 kB


In [18]:
interpreter = tf.lite.Interpreter("model_4.tflite")
interpreter.allocate_tensors()
accuracy = get_accuracy(interpreter, test_ds)

In [19]:
print(accuracy)

0.89
