In [None]:
import logging
logging.getLogger("tensorflow").setLevel(logging.DEBUG)

import tensorflow as tf
import numpy as np
import pandas as pd
import os
import gc
import pickle
print("TensorFlow version: ", tf.__version__)

Import dataset pickle file

In [None]:
os.chdir('../..')

def get_pickle(pickle_dir, file_name):
    pickle_dir = os.path.join(os.getcwd(), 'datasets/fsc22/Pickle Files/' + pickle_dir)
    fold_dir = os.path.join(pickle_dir, file_name)
    infile = open(fold_dir, 'rb')
    fold = pickle.load(infile)
    infile.close()

    return fold

In [None]:
with tf.device('/CPU:0'):
    pickle_dir = 'aug_ts_ps_mixed_features_5_20' # change pickle file directory name accordingly

    spect_folds = []
    train_spects = []

    for fold in range(5):
        spects_fold = get_pickle(pickle_dir, pickle_dir + '_fold' + str(fold+1))
        train_spects.extend(spects_fold)
        spect_folds.append(spects_fold)

    print(f'len train_spects: {len(train_spects)}')

    train_features_df = pd.DataFrame(train_spects, columns=['feature', 'class'])

    del train_spects

    gc.collect()

    print(f'len spect_folds: {len(spect_folds)}') # num folds
    print(f'len spect_folds[0]: {len(spect_folds[0])}') # samples in a fold
    print(f'len spect_folds[0][0]: {len(spect_folds[0][0])}') # elements in a sample - should be 2
    print(f'shape spect_folds[0][0][0]: {np.shape(spect_folds[0][0][0])}') # spectrogram shape

In [None]:
with tf.device('/CPU:0'):
    IMG_SIZE = (spect_folds[0][0][0].shape[0], spect_folds[0][0][0].shape[1])
    IMG_SHAPE = IMG_SIZE + (3,)
    print(IMG_SHAPE)
    input_shape = IMG_SHAPE

    num_labels = 26 # Should be changed

Import base model

In [None]:
with tf.device('/CPU:0'):
    valid_model_type = False
    model_type = None

    while not valid_model_type:
        model_type = input(
            "Choose Model Type:\n"
            " 1. AlexNet\n"
            " 2. DenseNet121\n"
            " 3. EfficientNetV2B0\n"
            " 4. InceptionV3\n"
            " 5. MobileNetV3Small\n"
            " 6. ResNet50V2\n"
            " 7. SqueezeNet\n :")

        if model_type == '1':
            model_type = 'AlexNet'
            valid_model_type = True
        elif model_type =='2':
            model_type = 'DenseNet121'
            valid_model_type = True
        elif model_type =='3':
            model_type = 'EfficientNetV2B0'
            valid_model_type = True
        elif model_type =='4':
            model_type = 'InceptionV3'
            valid_model_type = True
        elif model_type =='5':
            model_type = 'MobileNetV3-Small'
            valid_model_type = True
        elif model_type =='6':
            model_type = 'ResNet50V2'
            valid_model_type = True
        elif model_type =='7':
            model_type = 'SqueezeNet'
            valid_model_type = True

In [None]:
with tf.device('/CPU:0'):
    models_dir = os.path.join(os.getcwd(), "models")
    model_dir = os.path.join(models_dir, model_type)

    # Example - 
    # model_name = 'AlexNet_aug5_mix_5fold'
    # base_model_save_path = 'models/AlexNet/AlexNet_aug5_mix_5fold/AlexNet_aug5_mix_5fold_base_fold1.h5'
    # fold_no = 1
    
    model_name = input('Enter model name: ')
    base_model_save_path = input('Enter base model relative save path: ')
    fold_no = int(input('Enter the fold which the base model was validated: '))

    model_save_dir = os.path.join(model_dir, model_name)

    base_model = tf.keras.models.load_model(base_model_save_path)
    base_model.summary()

Create train and validation data

In [None]:
def get_train_vaild_data(spects, fold):
    valid_spects = None

    for i in range(5):
        if i + 1 == fold:
            valid_spects = spects[i]

    valid_df = pd.DataFrame(valid_spects, columns=['feature', 'class'])

    del valid_spects

    gc.collect()

    X_valid_cv = np.array(valid_df['feature'].tolist())
    y_valid_cv = np.array(valid_df['class'].tolist())

    return X_valid_cv, y_valid_cv

In [None]:
X_valid_cv, y_valid_cv = get_train_vaild_data(spect_folds, fold_no)

print(f'X_valid_cv shape: {np.shape(X_valid_cv)}')
print(f'y_valid_cv shape: {np.shape(y_valid_cv)}')

Create base model

In [None]:
# Change according to the hyperparameters of the base model

lr = 0.07263866958
best_optimizer = 'sgd'
batch_size = 16

optimizer = tf.keras.optimizers.Adam(learning_rate=lr) if best_optimizer=='adam' else tf.keras.optimizers.SGD(learning_rate=lr)
base_model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

base_model_loss, base_model_accuracy = base_model.evaluate(x=X_valid_cv,y=y_valid_cv)

Quantize model

In [None]:
def representative_data_gen():
  for input_value in tf.data.Dataset.from_tensor_slices(X_valid_cv).batch(1).take(500):
    yield [input_value]

converter = tf.lite.TFLiteConverter.from_keras_model(base_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Set the input and output tensors to uint8 (APIs added in r2.3)
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

tflite_model_quant = converter.convert()

In [None]:
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

Save quantized model

In [None]:
import pathlib

quantized_model_name = model_name + "_quantized_fold" + str(fold_no) + ".tflite"
tflite_models_dir = pathlib.Path(model_save_dir)

# Save the quantized model:
tflite_model_quant_file = tflite_models_dir/quantized_model_name
tflite_model_quant_file.write_bytes(tflite_model_quant)

### Test Quantized model

Function to run tflite model

In [None]:
# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_file, test_image_indices):
  global X_valid_cv

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  predictions = np.zeros((len(test_image_indices),), dtype=int)
  for i, test_image_index in enumerate(test_image_indices):
    test_image = X_valid_cv[test_image_index]

    # Check if the input type is quantized, then rescale input data to uint8
    if input_details['dtype'] == np.uint8:
      input_scale, input_zero_point = input_details["quantization"]
      test_image = test_image / input_scale + input_zero_point

    test_image = np.expand_dims(test_image, axis=0).astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], test_image)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]

    predictions[i] = output.argmax()

  return predictions

Function to evaluate tflite model

In [None]:
# Helper function to evaluate a TFLite model on all images
def evaluate_model(tflite_file, model_type):
  global X_valid_cv
  global y_valid_cv

  test_image_indices = range(X_valid_cv.shape[0])
  predictions = run_tflite_model(tflite_file, test_image_indices)

  accuracy = (np.sum(y_valid_cv== predictions) * 100) / len(X_valid_cv)

  print('%s model accuracy is %.4f%% (Number of test samples=%d)' % (
      model_type, accuracy, len(X_valid_cv)))

In [None]:
evaluate_model(tflite_model_quant_file, model_type="Quantized")