# Coffe Classifier: Thesis project for Bythem

## Imports

In [1]:
import os;os.environ["TF_USE_LEGACY_KERAS"]="1"

In [2]:
from tensorflow import keras
import tensorflow as tf
import tensorflow_model_optimization as tfmot

from preprocessing import dataset_preprocessing, mixup, tensorflow_to_numpy_dataset
from custom_mobilenet_v2 import MobileNet_v2
from plotting import plot_training_history, plotting_confusion_matrix
from utils import get_zipped_model_size, print_model_weights_sparsity
from evaluation import evaluate_lite_model
from distiller import Distiller, WarmUpCosine




## Configurations

In [18]:
DATASET_PATH = "../dataset_04"

MODEL_NAME = ""
DISTILLED_MODEL_NAME = ""
PRUNED_MODEL_NAME = ""
QUANTIZED_MODEL_NAME = ""

SAVE = True

## Dataset loading and preprocessing

In [27]:
BATCH_SIZE = 64
IMAGE_SIZE = 224
INPUT_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, 3)
SCALE = 127.5
OFFSET = -1

In [25]:
dataset = keras.utils.image_dataset_from_directory(DATASET_PATH,
                                                   shuffle = True,
                                                   batch_size = BATCH_SIZE,
                                                   image_size = (IMAGE_SIZE, IMAGE_SIZE))

class_names = dataset.class_names
number_classes = len(class_names)

training_dataset, validation_dataset, testing_dataset = dataset_preprocessing(dataset,
                                                                              train_size=0.80,
                                                                              validation_size=0.1, 
                                                                              augmentation_flag = True, 
                                                                              rescaling_flag = True, 
                                                                              prefetch_flag = True, 
                                                                              scale = SCALE, 
                                                                              offset = OFFSET)

Found 1281 files belonging to 9 classes.


In [6]:
class_names, number_classes

(['cioccolata',
  'cioccolata senza paletta',
  'errore',
  'espresso',
  'espresso senza paletta',
  'macchiato',
  'macchiato senza paletta',
  'the',
  'the senza paletta'],
 9)

In [22]:
print('Number of training images:', len(training_dataset)*BATCH_SIZE)
print('Number of validation images:', len(validation_dataset)*BATCH_SIZE)
print('Number of testing images:', len(testing_dataset)*BATCH_SIZE)

Number of training images: 1024
Number of validation images: 128
Number of testing images: 192


In [26]:
example_epochs = 5000
dataset_num_train_examples = len(training_dataset)*BATCH_SIZE
steps_per_epoch = dataset_num_train_examples // BATCH_SIZE
total_steps = steps_per_epoch * example_epochs
warm_up_steps = 0.75*total_steps

print("Warm-up steps: ", total_steps )
print("Warm-up steps: ", warm_up_steps )

Warm-up steps:  320000
Warm-up steps:  240000.0


## **Transfer learning model**

In [None]:
DROPOUT = 0.2
FREEZING_PERCENTAGE = 1
EPOCHS = 100

Load pre-trained model

In [None]:
pre_trained_model = keras.applications.MobileNetV2(input_shape = INPUT_SHAPE, 
                                                   include_top=False, 
                                                   weights='imagenet',
                                                   alpha=0.35,
                                                   classes=number_classes)

Set weights from pre_trained model and freeze a % of the pre-trained model for transfer learning

In [None]:
model = MobileNet_v2(input_shape=INPUT_SHAPE, alpha=0.35, num_classes=number_classes, dropout=DROPOUT)

count = 0
for i, layer in enumerate(pre_trained_model.layers):
        model.layers[i].set_weights(layer.get_weights())
        count = count +1 

for i in range(int(count*FREEZING_PERCENTAGE)):
    model.layers[i].trainable= False

Compile and train model

In [None]:
dataset_num_train_examples = len(training_dataset)*BATCH_SIZE
steps_per_epoch = dataset_num_train_examples // BATCH_SIZE
total_steps = steps_per_epoch * EPOCHS

learning_rate_fn = keras.optimizers.schedules.PolynomialDecay(
    5e-3,
    total_steps,
    1e-3,
    power=3)

import matplotlib.pyplot as plt
import numpy as np

lrs = learning_rate_fn(np.arange(0,total_steps))
plt.plot(lrs)
plt.xlabel("Step", fontsize=14)
plt.ylabel("LR", fontsize=14)
plt.show()

In [None]:
model.compile(optimizer = keras.optimizers.Adam(learning_rate=learning_rate_fn),
              loss= keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

early_stopping = keras.callbacks.EarlyStopping(patience=5, monitor='val_accuracy', restore_best_weights=True)

history = model.fit(training_dataset,
                    validation_data=validation_dataset,
                    epochs=EPOCHS,
                    callbacks=[early_stopping])

Plotting training history

In [None]:
plot_training_history(history)

Test model

In [None]:
_, baseline_accuracy = model.evaluate(testing_dataset, verbose = 0)
print('Accuracy: ', round(baseline_accuracy*100, 3), '%')

In [None]:
plotting_confusion_matrix(testing_dataset, model, class_names)

In [None]:
if SAVE:
    model.save('saved_models/' + MODEL_NAME + '.keras')
    print("Salvato")

## **Knowladge distillation**

In [None]:
WARMUP_STEPS = 80000
INIT_LR = 0.003
DISTILLATION_EPOCHS = 5000

In [None]:
teacher_model = keras.models.load_model('saved_models/' + MODEL_NAME + '.keras')

teacher_model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

student_model = MobileNet_v2(input_shape=INPUT_SHAPE, alpha=0.35, num_classes=number_classes, dropout=DROPOUT, minimization=True)

In [None]:
lr_schedule = WarmUpCosine(
    learning_rate_base=INIT_LR,
    total_steps=total_steps,
    warmup_learning_rate=0.0,
    warmup_steps=WARMUP_STEPS,
)

lrs = lr_schedule(np.arange(0,total_steps))
plt.plot(lrs)
plt.xlabel("Step", fontsize=14)
plt.ylabel("LR", fontsize=14)
plt.show()

In [None]:
mixup_training_dataset = training_dataset.map(mixup, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
distiller = Distiller(student=student_model, teacher= teacher_model, alpha=0.1, temperature=10)

distiller.compile(
    optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss = keras.losses.KLDivergence()
)

history = distiller.fit(mixup_training_dataset, epochs= DISTILLATION_EPOCHS, validation_data= validation_dataset)

In [None]:
training_accuracy = history.history['sparse_categorical_accuracy']
validation_accuracy = history.history['val_sparse_categorical_accuracy']
training_loss = history.history['student_loss']
validation_loss = history.history['val_student_loss']

epochs_range = range(len(training_accuracy))

plt.figure(figsize=(20,8))
plt.subplot(1,2,1)
plt.plot(epochs_range, training_accuracy,   label = 'Trainin Accuracy')
plt.plot(epochs_range, validation_accuracy, label = 'Validation Accuracy')
plt.legend()
plt.title('Accuracy for training and validation')

plt.subplot(1,2,2)
plt.plot(epochs_range, training_loss,   label = 'Trainin Loss')
plt.plot(epochs_range, validation_loss, label = 'Validation Loss')
plt.legend()
plt.title('Loss for training and validation')

plt.show()

In [None]:
student_model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

_, teacher_accuracy = teacher_model.evaluate(testing_dataset, verbose = 0)
_, student_accuracy = student_model.evaluate(testing_dataset, verbose = 0)
print('Teacher model accuracy: ', round(teacher_accuracy*100, 3), '%')
print('Student model accuracy: ', round(student_accuracy*100, 3), '%')

In [None]:
if SAVE:
    student_model.save('saved_models/' + DISTILLED_MODEL_NAME + '.keras')

## **Pruned Model**

In [None]:
PRUNING_EPOCHS = 3
INITIAL_SPARSITY = 0.20
FINAL_SPARSITY = 0.60
FREQUENCY = 30

In [None]:
model = keras.models.load_model('saved_models/' + MODEL_NAME + '.keras')

In [None]:
from custom_mobilenet_v2 import MobileNet_v2

custom_model = MobileNet_v2(INPUT_SHAPE, 0.35, number_classes, dropout=DROPOUT)

for i, layer in enumerate(model.layers):
        custom_model.layers[i].set_weights(layer.get_weights())

custom_model.compile(optimizer = keras.optimizers.Adam(1e-3),
              loss= keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
custom_model.trainable= True

prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude

num_images = (len(training_dataset)) *BATCH_SIZE
end_step = np.ceil(num_images / BATCH_SIZE).astype(np.int32) * PRUNING_EPOCHS

pruning_params = {
    'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=INITIAL_SPARSITY, 
                                                             final_sparsity=FINAL_SPARSITY,
                                                             begin_step=0, 
                                                             end_step=end_step,
                                                             frequency = FREQUENCY)}


pruned_model = prune_low_magnitude(custom_model, **pruning_params)

callbacks = [tfmot.sparsity.keras.UpdatePruningStep()]

pruned_model.compile(optimizer= keras.optimizers.Adam(learning_rate=1e-5),
                     loss= keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                     metrics=['accuracy'])


# Fine tune the model
pruned_model.fit(training_dataset,
                 validation_data=validation_dataset,
                 epochs= PRUNING_EPOCHS,
                 verbose=1,
                 callbacks=callbacks)

In [None]:
pruned_model.compile(optimizer = keras.optimizers.Adam(1e-3),
              loss= keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

early_stopping = keras.callbacks.EarlyStopping(patience=5, monitor='val_accuracy', restore_best_weights=True )

pruned_model.fit(training_dataset,
                 validation_data=validation_dataset,
                 epochs=EPOCHS,
                 callbacks=[early_stopping])

In [None]:
_, pruned_accuracy = pruned_model.evaluate(testing_dataset, verbose = 0)
print('Pruned accuracy: '   , round(100* pruned_accuracy,3) ,   '%')

In [None]:
stripped_pruned_model = tfmot.sparsity.keras.strip_pruning(pruned_model)

if SAVE:
    stripped_pruned_model.save('saved_models/' + PRUNED_MODEL_NAME + '.keras')

In [None]:
print("Original model size: ", get_zipped_model_size('saved_models/' + MODEL_NAME + '.keras')/10**6, ' MB')
print("Original model size: ", get_zipped_model_size('saved_models/' + PRUNED_MODEL_NAME + '.keras')/10**6, ' MB')

## **Quantized model**

In [None]:
model = keras.models.load_model('saved_models/' + PRUNED_MODEL_NAME + '.keras')

model.compile(optimizer = keras.optimizers.Adam(1e-3),
              loss= keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
images_batch_np, labels_batch_np = tensorflow_to_numpy_dataset(testing_dataset)

def representative_data_gen():
  for input_value in tf.data.Dataset.from_tensor_slices(images_batch_np).batch(1).take(100):
    yield [input_value]


converter = tf.lite.TFLiteConverter.from_keras_model(model);
converter.optimizations = [tf.lite.Optimize.DEFAULT]

converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

model_integer_quantization = converter.convert()

In [None]:
with open('saved_lite_models/' + QUANTIZED_MODEL_NAME + '.tflite', "wb") as f:
    f.write(model_integer_quantization)   

In [None]:
interpreter = tf.lite.Interpreter(model_path='saved_lite_models/' + MODEL_NAME + '.tflite')
interpreter.allocate_tensors()

quantized_accuracy = evaluate_lite_model(interpreter, testing_dataset, class_names=class_names, show_confusion_matrix=True)
print("Accuracy of compressed model model: %.2f" %(quantized_accuracy*100) , '%')

In [None]:
print("Original model size: ", get_zipped_model_size('saved_models/' + PRUNED_MODEL_NAME + '.keras')/10**6, ' MB')
print("Quantized model size: ", get_zipped_model_size('saved_lite_models/' + QUANTIZED_MODEL_NAME + '.tflite')/10**3, ' kB')

## **Check for size and compression**

In [None]:
print("Original model size: ", get_zipped_model_size('saved_models/' + MODEL_NAME + '.keras')/10**6, ' MB')
print("Original model size: ", get_zipped_model_size('saved_models/' + PRUNED_MODEL_NAME + '.keras')/10**6, ' MB')
print("Quantized model size: ", get_zipped_model_size('saved_lite_models/' + QUANTIZED_MODEL_NAME + '.tflite')/10**3, ' kB')

## **Test compressed model**

In [None]:
print('Accuracy: '          ,round(baseline_accuracy*100, 3), '%')
print('Pruned accuracy: '   ,round(100* pruned_accuracy,3) ,   '%')
print("Accuracy of compressed model model: %.2f" %(quantized_accuracy*100) , '%')