# QCustom model Training using Hyderabad Dataset DS3
Date: 2024-07-15

Author: Ziad Tamim

Description:
This script covers the training of the QCustom model, including the model structure, quantisation, and knowledge distillation.

Inputs:
* Dataset

Outputs:
* Custom model (without quantisation)
* QCustom model (quantised Custom model)

## Load the dataset

In [None]:
# Dataset location: one sub-folder per class beneath a shared root directory
_dataset_root = 'C:/Users/ziadt/Desktop/Projects/MSc Project implimintation/Datasets/Parkingdata/Parkingdata'
occupied = _dataset_root + '/Occupied'
empty = _dataset_root + '/Empty'

# load images
import os
from PIL import Image
import numpy as np
import cv2

def load_images(path):
    """Read every decodable image found directly inside ``path``.

    Each directory entry is passed to ``cv2.imread``; entries for which it
    returns ``None`` are skipped.

    Args:
        path: Directory whose entries should be loaded.

    Returns:
        A list of the image arrays returned by ``cv2.imread``, in the order
        reported by ``os.listdir``.
    """
    decoded = (cv2.imread(os.path.join(path, entry)) for entry in os.listdir(path))
    return [picture for picture in decoded if picture is not None]

# Pull every readable image of each class into memory
occupied_images, empty_images = load_images(occupied), load_images(empty)

# Sanity check: report how many images each class contributed
occupied_count = len(occupied_images)
empty_count = len(empty_images)
print('Occupied images: %d' % occupied_count)
print('Empty images: %d' % empty_count)

# calculate the average size of the images
def avg_size(images):
    """Return the mean of the first two shape dimensions across ``images``.

    Args:
        images: Sequence of arrays exposing a ``shape`` attribute.

    Returns:
        A length-2 integer NumPy array holding the per-dimension mean,
        truncated to whole numbers by ``astype(int)``.
    """
    dims = np.array([picture.shape[:2] for picture in images])
    return dims.mean(axis=0).astype(int)

# Mean image dimensions per class, as computed by avg_size
occupied_avg_size, empty_avg_size = avg_size(occupied_images), avg_size(empty_images)

print('Occupied average size: %s' % str(occupied_avg_size))
print('Empty average size: %s' % str(empty_avg_size))

In [None]:
# check image and label
import matplotlib.pyplot as plt
import random

def show_image(images, title):
    """Display a 3x3 grid of randomly chosen images under a common title.

    Each cell is drawn independently with ``random.choice``, so the same
    image may appear more than once. Images are converted from BGR to RGB
    before being handed to matplotlib.

    Args:
        images: Sequence of image arrays to sample from.
        title: Text shown as the figure's super-title.
    """
    plt.figure(figsize=(10, 10))
    for cell in range(1, 10):
        sample = random.choice(images)
        plt.subplot(3, 3, cell)
        plt.imshow(cv2.cvtColor(sample, cv2.COLOR_BGR2RGB))
        plt.axis('off')
    plt.suptitle(title)
    plt.show()

# Preview a random sample of each class
for preview_images, preview_title in (
    (occupied_images, 'Occupied'),
    (empty_images, 'Empty'),
):
    show_image(preview_images, preview_title)

In [None]:
# Preprocess images
def preprocess_images(images):
    """Resize every image to 150x150 and scale its values by 1/255.

    The channel order is left untouched (no BGR-to-RGB conversion is
    applied here).

    Args:
        images: Iterable of image arrays accepted by ``cv2.resize``.

    Returns:
        A list of ``float32`` arrays, one per input image.
    """
    target_size = (150, 150)
    return [
        cv2.resize(picture, target_size).astype(np.float32) / 255.0
        for picture in images
    ]

# Replace the raw images with their resized, normalised versions
occupied_images, empty_images = (
    preprocess_images(occupied_images),
    preprocess_images(empty_images),
)

# Preview the preprocessed samples of each class
for preview_images, preview_title in (
    (occupied_images, 'Occupied'),
    (empty_images, 'Empty'),
):
    show_image(preview_images, preview_title)




In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# Build the labelled dataset from the preprocessed image lists
# (occupied_images and empty_images): 1 = occupied, 0 = empty.
occupied_labels = [1] * len(occupied_images)
empty_labels = [0] * len(empty_images)

# Stack both classes into one feature array and one label vector
X = np.array(occupied_images + empty_images)
y = np.array(occupied_labels + empty_labels)

# Hold out 20% of the samples for testing. stratify=y keeps the
# occupied/empty ratio the same in both partitions, so neither split ends
# up skewed towards one class when the class counts differ.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Now X_train, X_test, y_train, and y_test are available for training and testing
print("Training data shape:", X_train.shape)
print("Test data shape:", X_test.shape)


## The Custom model structure (Student model)
### Description
This code defines the Custom model (Student) used in a knowledge distillation process. The model is intended to be trained by learning from a pre-trained "teacher" model. The student model takes 150x150x3 input images and passes them through a series of layers, including initial convolutional layers, multiple MobileNet blocks, and a global average pooling layer. The final output layer uses a sigmoid activation function to produce a single probability score for binary classification (Occupied or Free). The model's compact architecture makes it suitable for distillation, where it will be trained to mimic the behavior of a more complex teacher model.

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, DepthwiseConv2D, Conv2D, BatchNormalization
from tensorflow.keras.layers import ReLU, GlobalAveragePooling2D, Dense
from tensorflow.keras import Model


# MobileNet block
def mobilnet_block(x, filters, strides):
    """Apply one depthwise-separable convolution block to ``x``.

    The block is a 3x3 depthwise convolution followed by a 1x1 (pointwise)
    convolution; each convolution is followed by batch normalisation and
    a ReLU.

    Args:
        x: Input tensor.
        filters: Number of output channels of the pointwise convolution.
        strides: Stride of the depthwise convolution.

    Returns:
        The output tensor of the block.
    """
    # Depthwise stage: per-channel 3x3 filtering
    spatial = DepthwiseConv2D(kernel_size=3, strides=strides, padding='same')(x)
    spatial = BatchNormalization()(spatial)
    spatial = ReLU()(spatial)

    # Pointwise stage: 1x1 convolution producing `filters` channels
    mixed = Conv2D(filters=filters, kernel_size=1, strides=1)(spatial)
    mixed = BatchNormalization()(mixed)
    return ReLU()(mixed)

# Stem: a strided 3x3 convolution applied to the 150x150x3 input image.
# NOTE: `input` shadows the Python builtin; the name is kept because other
# cells may refer to it.
input = Input(shape=(150, 150, 3))
x = Conv2D(filters=16, kernel_size=3, strides=2, padding='same')(input)
x = BatchNormalization()(x)
x = ReLU()(x)

# Body: seven MobileNet blocks, listed as (filters, strides) pairs
for block_filters, block_strides in (
    (5, 1),
    (5, 2),
    (12, 1),
    (12, 2),
    (24, 1),
    (24, 2),
    (24, 2),
):
    x = mobilnet_block(x, filters=block_filters, strides=block_strides)

# Head: global average pooling, then a single sigmoid unit for binary output
x = GlobalAveragePooling2D()(x)
output = Dense(units=1, activation='sigmoid')(x)

# Assemble the student network and print its layout
student = Model(inputs=input, outputs=output)
student.summary()
student.input_shape


## Resnet50 model (Teacher model)
This code sets up the teacher model for knowledge distillation using a pre-trained ResNet50 architecture. The ResNet50 model's layers are frozen to retain the pre-trained ImageNet weights. A global average pooling layer and a sigmoid-activated dense layer are added on top to adapt the model for binary classification. This model will serve as the teacher in the knowledge distillation process.

In [None]:
# train on resnet
from tensorflow.keras.applications import ResNet50

# Load the ImageNet-pretrained ResNet50 backbone without its classifier head
resnet = ResNet50(include_top=False, weights='imagenet', input_shape=(150, 150, 3))
resnet.summary()

# Freeze the backbone so that only the new classification head is trained
for layer in resnet.layers:
    layer.trainable = False

# Add the top layers: global average pooling and a single sigmoid unit
x = resnet.output
x = GlobalAveragePooling2D()(x)
x = Dense(1, activation='sigmoid')(x)
teacher = Model(resnet.input, x)

# The Dense head above starts from random weights, so the teacher must be
# fitted before it is used as a distillation target; otherwise the student
# would be asked to imitate arbitrary predictions.
# NOTE(review): the ImageNet weights were trained on inputs prepared with
# tf.keras.applications.resnet50.preprocess_input, whereas this pipeline
# feeds images scaled to [0, 1] - confirm that this is intended.
TEACHER_EPOCHS = 5  # small budget for the head only; tune as needed
teacher.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=[tf.keras.metrics.BinaryAccuracy()],
)
teacher.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=TEACHER_EPOCHS,
)

## Distillation Model Setup
This code defines the Distiller class, a custom TensorFlow model designed to facilitate knowledge distillation. The class combines a teacher and student model, allowing the student to learn from both the true labels and the teacher's predictions. The distillation process is controlled by parameters like alpha (balancing student and distillation loss) and temperature (smoothing the teacher's predictions). The Distiller is then instantiated, compiled with binary cross-entropy for the student loss, and Kullback-Leibler divergence for the distillation loss, and is ready for training.

In [None]:

class Distiller(tf.keras.Model):
    """Train a student network from true labels and a teacher's soft predictions.

    Both networks are expected to emit one sigmoid probability per sample
    (shape ``(batch, 1)``). Only the student's weights are updated; the
    teacher is always run in inference mode and is expected to have been
    trained before distillation starts.
    """

    def __init__(self, student, teacher):
        """Store the student (trained) and teacher (fixed) models."""
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        """Configure the distiller for training.

        Args:
            optimizer: Optimizer applied to the student's weights.
            metrics: Metrics evaluated on the student's predictions.
            student_loss_fn: Loss between the true labels and the student's
                predictions.
            distillation_loss_fn: Loss between the softened teacher and
                softened student distributions.
            alpha: Weight of the student loss; the distillation loss is
                weighted by ``1 - alpha``.
            temperature: Softening factor applied to both models' logits.
        """
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def _soften(self, probabilities):
        """Turn sigmoid probabilities into softened two-class distributions.

        A softmax over a single output unit is always 1.0, which would make
        any divergence between teacher and student identically zero. The
        probability is therefore mapped back to its logit, divided by the
        temperature, squashed again, and expanded to ``[p, 1 - p]`` so the
        result is a proper distribution over the two classes.

        Args:
            probabilities: Tensor of sigmoid outputs, shape ``(batch, 1)``.

        Returns:
            Tensor of shape ``(batch, 2)`` whose rows sum to 1.
        """
        eps = tf.keras.backend.epsilon()
        # Clip so that the logit stays finite for saturated predictions
        probabilities = tf.clip_by_value(probabilities, eps, 1.0 - eps)
        logits = tf.math.log(probabilities) - tf.math.log1p(-probabilities)
        softened = tf.sigmoid(logits / self.temperature)
        return tf.concat([softened, 1.0 - softened], axis=-1)

    def train_step(self, data):
        """Run one optimisation step on the student.

        The loss is ``alpha * student_loss + (1 - alpha) * distillation_loss``.

        Args:
            data: Tuple ``(x, y)`` of inputs and true labels.

        Returns:
            Dict of metric results plus ``student_loss`` and
            ``distillation_loss``.
        """
        x, y = data
        # The teacher only provides targets, so it runs outside the tape
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            student_predictions = self.student(x, training=True)
            student_loss = self.student_loss_fn(y, student_predictions)
            # Scaling by temperature**2 keeps the soft-target gradients at a
            # magnitude comparable to the hard-label term, because softening
            # shrinks them by roughly 1 / temperature**2.
            distillation_loss = self.distillation_loss_fn(
                self._soften(teacher_predictions),
                self._soften(student_predictions),
            ) * self.temperature ** 2
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        gradients = tape.gradient(loss, self.student.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.student.trainable_variables))
        self.compiled_metrics.update_state(y, student_predictions)
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss, "distillation_loss": distillation_loss})
        return results

    def test_step(self, data):
        """Evaluate the student on one batch using the student loss only.

        Args:
            data: Tuple ``(x, y)`` of inputs and true labels.

        Returns:
            Dict of metric results plus ``loss`` (the student loss).
        """
        x, y = data
        student_predictions = self(x, training=False)
        student_loss = self.student_loss_fn(y, student_predictions)

        # Update metrics
        self.compiled_metrics.update_state(y, student_predictions)

        # Collect metrics results
        results = {m.name: m.result() for m in self.metrics}
        results['loss'] = student_loss
        return results

    def call(self, inputs, training=False):
        """Forward ``inputs`` through the student network."""
        return self.student(inputs, training=training)

# Wire the student and teacher together, then configure distillation training
distiller = Distiller(student=student, teacher=teacher)

distill_optimizer = tf.keras.optimizers.Adam()
distill_metrics = [tf.keras.metrics.BinaryAccuracy()]
hard_label_loss = tf.keras.losses.BinaryCrossentropy()
soft_label_loss = tf.keras.losses.KLDivergence()

distiller.compile(
    optimizer=distill_optimizer,
    metrics=distill_metrics,
    student_loss_fn=hard_label_loss,
    distillation_loss_fn=soft_label_loss,
    alpha=0.1,
    temperature=10,
)

## Training

In [None]:
# tensorboard initialization
from datetime import datetime
from tensorflow.keras.callbacks import TensorBoard

# Root folder under which every TensorBoard run is stored
path_to_logs = "C:/Users/ziadt/Desktop/Projects/MSc Project implimintation/Model Training/CustomeNEt/training supporting material/logs/fit/"

# Each run logs into its own sub-folder named after the current timestamp
run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = path_to_logs + run_stamp
tensorboard_callback = TensorBoard(histogram_freq=1, log_dir=log_dir)

In [None]:
# Run distillation training for one epoch; the held-out split is used as
# validation data and progress is logged through the TensorBoard callback
distiller.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=1,
    callbacks=[tensorboard_callback],
)


## Evaluation

In [None]:
# Accuracy performance of the distilled student on the test set
distiller.evaluate(x=X_test, y=y_test)

## Save the Custom model without Quantization

In [None]:
# Persist the trained, unquantized student network as an HDF5 file
student_model_path = 'C:/Users/ziadt/Desktop/Projects/MSc Project implimintation/Model Training/CustomeNEt/training supporting material/student_model_DS3.h5'
student.save(student_model_path)


In [None]:
# load model
from tensorflow.keras.models import load_model
# Read the saved student network back from its HDF5 file
saved_student_path = 'C:/Users/ziadt/Desktop/Projects/MSc Project implimintation/Model Training/CustomeNEt/training supporting material/student_model_DS3.h5'
model = load_model(saved_student_path)

## Post Training Integer Quantization


In [None]:
# Post-training integer quantization of the distilled student.
# Optimize.DEFAULT on its own only quantizes the weights; the converter needs
# a representative dataset to calibrate activation ranges before it can
# quantize activations as well. The model's input and output tensors stay
# float32, so callers keep feeding normalised float images.
def _representative_dataset():
    """Yield single-image float32 batches from the training set for calibration."""
    for sample in X_train[:100]:
        yield [np.expand_dims(sample, axis=0).astype(np.float32)]


converter = tf.lite.TFLiteConverter.from_keras_model(distiller.student)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = _representative_dataset
tflite_model = converter.convert()

# Save the model to disk; the context manager guarantees the file is closed
with open('QCustom_D3_V1.tflite', 'wb') as model_file:
    model_file.write(tflite_model)


## Inference speed of the QCustom model on a single slot

In [None]:
import numpy as np
import time

# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()

# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Test data: X_test was already resized to 150x150 and scaled to [0, 1]
# during preprocessing, and it keeps the BGR channel order the model was
# trained on. Converting, resizing or dividing by 255 again would feed the
# model inputs unlike anything it saw in training, so only the batch
# dimension and the dtype are adjusted here.
image = X_test[0]
image = np.expand_dims(image, axis=0).astype(np.float32)

# Time 100 consecutive invocations on the same image. perf_counter is a
# monotonic, high-resolution clock intended for measuring short durations.
times = []
for i in range(100):
    start_time = time.perf_counter()
    interpreter.set_tensor(input_details[0]['index'], image)
    interpreter.invoke()
    end_time = time.perf_counter()
    times.append(end_time - start_time)

print('Average inference time for 100 images with quantized model:', np.mean(times))

