# <font color = purple> 2023 Synopsys ARC AIoT Design Contest - Knowledge Distillation </font>

In [None]:
import os
import glob
import time
import matplotlib.pyplot as plt
import matplotlib.image as img
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D, DepthwiseConv2D
from tensorflow.keras.layers import MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D
from tensorflow.keras.layers import UpSampling2D
from tensorflow.keras.layers import Activation, Softmax, ReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Layer, Flatten, Dense, Dropout, add
from tensorflow.keras.utils import to_categorical

In [None]:
!nvidia-smi

In [None]:
# List the GPUs TensorFlow can see and switch on memory growth for each one.
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    print(gpu)
    tf.config.experimental.set_memory_growth(gpu, True)

## <font color = navy> Configurations </font>

In [None]:
class Config:
    """Paths, data settings and hyper-parameters shared across the notebook."""

    # Dataset locations: image directory and the CSV file holding the labels
    IMAGE_DIR = "./train_img"
    LABEL_DIR = "./train_label/label.csv"

    # Number of degrees covered by a single class
    DEG_UNIT = 6

    # Data-loader parameter
    SPLIT = 0.2

    # Mini-batch size
    BATCH_SIZE = 16

    # Input image height and width
    IMG_HEIGHT = 224
    IMG_WIDTH = 224

    # Model parameters: one class per DEG_UNIT-degree slice of the full
    # 360 degrees, and a single input channel
    NUM_CLASSES = int(360 / DEG_UNIT)
    IN_CHANNEL = 1
    INPUT_SHAPE = (IMG_HEIGHT, IMG_WIDTH, IN_CHANNEL)

    # Learning parameters
    lr = 1e-3
    num_epochs = 20
    teacher_loss_func = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    student_loss_func = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    distill_loss_func = keras.losses.KLDivergence()
    loss_func = "categorical_crossentropy"

    # Optimizer, given by its Keras string identifier
    opt = 'adam'

    # Distiller parameters
    # alpha: weight of the student loss; (1 - alpha) goes to the distillation loss
    alpha = 0.1
    # temperature: softens the probability distributions; a larger value
    # gives softer distributions
    temperature = 3

    # Model
    model_name = 'MobileNetV2'

## <font color = navy> Load and Preprocess Dataset </font>

In [None]:
def load_data(img_path, lab_path, split=0.2):
    """Load grayscale images with their class labels and split train/test.

    Files in ``img_path`` are ordered by modification time and the i-th file
    is paired with the i-th entry of the ``class`` column of the CSV at
    ``lab_path``. The pairing is purely positional, so the two counts must
    agree.

    Args:
        img_path: Directory containing the image files.
        lab_path: Path of a CSV file that has a ``class`` column.
        split: Fraction of the samples assigned to the test set.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as NumPy arrays. Images are
        float arrays scaled by 1/255; labels are integer arrays of shape
        ``(n_samples, 1)``.

    Raises:
        ValueError: If no image file is found, or if the number of image
            files differs from the number of labels.
    """
    # Sorting by modification time defines the image <-> label pairing;
    # the unsorted glob order did not line up with the CSV rows.
    img_files = sorted(glob.glob(img_path + "/*"), key=os.path.getmtime)
    labels = pd.read_csv(lab_path)['class'].to_numpy()  # Read labels from csv

    if not img_files:
        raise ValueError("No image files found in {!r}".format(img_path))
    if len(img_files) != len(labels):
        # Fail early instead of hitting an IndexError half-way through the
        # loop or silently ignoring surplus labels.
        raise ValueError(
            "Found {} image files in {!r} but {} labels in {!r}".format(
                len(img_files), img_path, len(labels), lab_path
            )
        )

    data_x = []
    for img_file in img_files:
        # The context manager releases the file handle deterministically.
        with Image.open(img_file) as image:
            gray = image.convert('L')  # Convert to gray
        data_x.append(np.array(gray, dtype=int) / 255.0)  # Normalize

    data_x = np.array(data_x)
    # One column per sample, matching the (n_samples, 1) label layout.
    data_y = np.asarray(labels, dtype=int).reshape(-1, 1)

    # Train & test split
    X_train, X_test, y_train, y_test = train_test_split(
        data_x, data_y, test_size=split, random_state=42, shuffle=True
    )

    return X_train, X_test, y_train, y_test

In [None]:
# Load the dataset, taking the test fraction from Config rather than relying
# on load_data's default value
X_train, X_test, y_train, y_test = load_data(
    Config.IMAGE_DIR, Config.LABEL_DIR, split=Config.SPLIT
)

## <font color = navy> Show Data </font>

In [None]:
# Preview up to 15 training samples in a 3x5 grid, each titled with the
# degree range of its class.
n_preview = min(15, len(X_train))  # do not index past a small dataset
fig = plt.figure(figsize=(15, 10))
for i in range(n_preview):
    ax = fig.add_subplot(3, 5, i + 1)

    # Named `preview` rather than `img` so the `matplotlib.image as img`
    # module alias is not overwritten.
    preview = Image.fromarray(np.uint8(X_train[i] * 255)).convert('RGBA')
    # No cmap: the data is already RGBA, for which imshow ignores colormaps.
    ax.imshow(preview)

    lab = np.squeeze(y_train[i])
    lower = lab * Config.DEG_UNIT
    upper = (lab + 1) * Config.DEG_UNIT

    ax.set_title('degree: {y1}~{y2}'.format(y1=lower, y2=upper))
    # Hide the ticks on this subplot explicitly instead of via pyplot state.
    ax.axis('off')

## <font color = navy> Reshape Images </font>

In [None]:
# Add the trailing channel axis: (N, H, W) -> (N, H, W, 1)
X_train = np.reshape(X_train, (-1, Config.IMG_HEIGHT, Config.IMG_WIDTH, 1))
X_test = np.reshape(X_test, (-1, Config.IMG_HEIGHT, Config.IMG_WIDTH, 1))

## <font color = navy> Distiller </font>

In [None]:
class Distiller(keras.Model):
    """Keras model that trains a student network under a teacher's guidance.

    Each training step combines the student's loss on the ground-truth
    labels with a distillation loss between temperature-softened teacher
    and student outputs. Only the student's variables are updated.
    """

    def __init__(self, student, teacher):
        super().__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        """Set up the optimizer, metrics and the two loss terms.

        Args:
            optimizer: Keras optimizer applied to the student weights.
            metrics: Keras metrics used for evaluation.
            student_loss_fn: Loss between student predictions and the
                ground-truth labels.
            distillation_loss_fn: Loss between softened student predictions
                and softened teacher predictions.
            alpha: Weight of ``student_loss_fn``; ``1 - alpha`` is given to
                ``distillation_loss_fn``.
            temperature: Softening temperature for the probability
                distributions; larger values give softer distributions.
        """
        super().compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        """Run one optimisation step on the student and report metrics."""
        x, y = data

        # The teacher only supplies targets; it runs in inference mode.
        teacher_out = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            student_out = self.student(x, training=True)

            # Hard-label term.
            student_loss = self.student_loss_fn(y, student_out)

            # Soft-target term, scaled as in https://arxiv.org/abs/1503.02531:
            # soft-target gradients shrink as 1/T^2, so the loss is multiplied
            # by T^2 when hard and soft targets are combined.
            soft_teacher = tf.nn.softmax(teacher_out / self.temperature, axis=1)
            soft_student = tf.nn.softmax(student_out / self.temperature, axis=1)
            distillation_loss = (
                self.distillation_loss_fn(soft_teacher, soft_student)
                * self.temperature**2
            )

            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Differentiate with respect to, and update, the student only.
        student_vars = self.student.trainable_variables
        grads = tape.gradient(loss, student_vars)
        self.optimizer.apply_gradients(zip(grads, student_vars))

        # Refresh the metrics passed to `compile()`.
        self.compiled_metrics.update_state(y, student_out)

        results = {metric.name: metric.result() for metric in self.metrics}
        results["student_loss"] = student_loss
        results["distillation_loss"] = distillation_loss
        return results

    def test_step(self, data):
        """Evaluate the student on one batch and report metrics."""
        x, y = data

        # Only the student takes part in evaluation.
        student_out = self.student(x, training=False)
        student_loss = self.student_loss_fn(y, student_out)

        self.compiled_metrics.update_state(y, student_out)

        results = {metric.name: metric.result() for metric in self.metrics}
        results["student_loss"] = student_loss
        return results

## <font color = navy> Create Teacher Model </font>

In [None]:
# Create the teacher: two conv/max-pool stages followed by a dense head
teacher = Sequential(
    [
        # Stage 1: 5x5 convolution + 2x2 max-pooling
        Conv2D(32, kernel_size=(5, 5), activation='relu', padding='same',
               input_shape=(Config.IMG_HEIGHT, Config.IMG_WIDTH, 1)),
        MaxPooling2D(pool_size=(2, 2)),

        # Stage 2: 5x5 convolution + 2x2 max-pooling
        Conv2D(64, (5, 5), padding='same', activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),

        # Stage 3: flatten + fully connected layer
        Flatten(),
        Dense(512, activation='relu'),

        # Stage 4: output layer, one unit per class (without softmax)
        Dense(Config.NUM_CLASSES),
    ],
    name='teacher',
)

In [None]:
# teacher = keras.Sequential(
#     [
#         keras.Input(shape=(28, 28, 1)),
#         layers.Conv2D(256, (3, 3), strides=(2, 2), padding="same"),
#         layers.LeakyReLU(alpha=0.2),
#         layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
#         layers.Conv2D(512, (3, 3), strides=(2, 2), padding="same"),
#         layers.Flatten(),
#         layers.Dense(10),
#     ],
#     name="teacher",
# )

## <font color = navy> Create Student Model </font>

In [None]:
# MobileNetV2
def expansion_block(x, t, filters, block_id):
    """Apply the expansion stage: 1x1 convolution, batch norm, ReLU6.

    Args:
        x: Input tensor.
        t: Expansion factor; the convolution has ``t * filters`` filters.
        filters: Base filter count that ``t`` multiplies.
        block_id: Index used to build the ``block_<id>_`` layer-name prefix.

    Returns:
        The output tensor of the ReLU6 layer.
    """
    prefix = 'block_{}_'.format(block_id)

    expand_conv = Conv2D(t * filters, 1, padding='same', use_bias=False,
                         name=prefix + 'expand')
    expand_bn = BatchNormalization(name=prefix + 'expand_bn')
    expand_relu = ReLU(6, name=prefix + 'expand_relu')  # ReLU capped at 6

    return expand_relu(expand_bn(expand_conv(x)))

def depthwise_block(x, stride, block_id):
    """Apply the depthwise stage: 3x3 depthwise conv, batch norm, ReLU6.

    Args:
        x: Input tensor.
        stride: Stride used along both spatial axes of the convolution.
        block_id: Index used to build the ``block_<id>_`` layer-name prefix.

    Returns:
        The output tensor of the ReLU6 layer.
    """
    prefix = 'block_{}_'.format(block_id)

    dw_conv = DepthwiseConv2D(3, strides=(stride, stride), padding='same',
                              use_bias=False, name=prefix + 'depthwise_conv')
    dw_bn = BatchNormalization(name=prefix + 'dw_bn')
    dw_relu = ReLU(6, name=prefix + 'dw_relu')

    return dw_relu(dw_bn(dw_conv(x)))

def projection_block(x, out_channels, block_id):
    """Apply the projection stage: 1x1 convolution and batch norm, no activation.

    Args:
        x: Input tensor.
        out_channels: Number of filters of the 1x1 convolution.
        block_id: Index used to build the ``block_<id>_`` layer-name prefix.

    Returns:
        The output tensor of the batch-normalization layer.
    """
    prefix = 'block_{}_'.format(block_id)

    compress_conv = Conv2D(filters=out_channels, kernel_size=1, padding='same',
                           use_bias=False, name=prefix + 'compress')
    compress_bn = BatchNormalization(name=prefix + 'compress_bn')

    return compress_bn(compress_conv(x))

In [None]:
def Bottleneck(x, t, filters, out_channels, stride, block_id):
    """Build one bottleneck: expansion -> depthwise -> projection.

    A residual connection from the input is added only when the block keeps
    both the spatial size (``stride == 1``) and the channel count unchanged;
    otherwise the two tensors cannot be summed.

    Args:
        x: Input tensor.
        t: Expansion factor passed to ``expansion_block``.
        filters: Base filter count passed to ``expansion_block``.
        out_channels: Number of channels produced by the projection.
        stride: Stride of the depthwise convolution.
        block_id: Index used for the layer names of this block.

    Returns:
        The block's output tensor.
    """
    y = expansion_block(x, t, filters, block_id)
    y = depthwise_block(y, stride, block_id)
    y = projection_block(y, out_channels, block_id)

    # Checking channels alone is not enough: with stride != 1 the spatial
    # dimensions of x and y differ and the addition would fail.
    if stride == 1 and y.shape[-1] == x.shape[-1]:
        y = add([x, y])
    return y

In [None]:
def MobileNetV2(input_image=Config.INPUT_SHAPE, n_classes=Config.NUM_CLASSES):
    """Build a MobileNetV2-style classifier whose output has no softmax.

    Args:
        input_image: Shape of one input sample, passed to ``Input``.
        n_classes: Number of units of the final dense layer.

    Returns:
        A Keras ``Model`` mapping the input to ``n_classes`` outputs.
    """
    # (out_channels, stride) of bottleneck blocks 2..17, in order
    bottleneck_specs = [
        (24, 2), (24, 1),
        (32, 2), (32, 1), (32, 1),
        (64, 2), (64, 1), (64, 1), (64, 1),
        (96, 1), (96, 1), (96, 1),
        (160, 2), (160, 1), (160, 1),
        (320, 1),
    ]

    inputs = Input(input_image)

    # Stem: strided 3x3 convolution
    x = Conv2D(32, kernel_size=3, strides=(2, 2), padding='same', use_bias=False)(inputs)
    x = BatchNormalization(name='conv1_bn')(x)
    x = ReLU(6, name='conv1_relu')(x)

    # 17 bottlenecks; block 1 has no expansion stage
    x = depthwise_block(x, stride=1, block_id=1)
    x = projection_block(x, out_channels=16, block_id=1)

    for block_id, (channels, stride) in enumerate(bottleneck_specs, start=2):
        x = Bottleneck(x, t=6, filters=x.shape[-1], out_channels=channels,
                       stride=stride, block_id=block_id)

    # Final 1x1 convolution
    x = Conv2D(filters=1280, kernel_size=1, padding='same', use_bias=False, name='last_conv')(x)
    x = BatchNormalization(name='last_bn')(x)
    x = ReLU(6, name='last_relu')(x)

    # Global average pooling over the spatial axes
    x = GlobalAveragePooling2D(name='global_average_pool')(x)

    # Classification head without softmax
    outputs = Dense(n_classes)(x)

    return Model(inputs, outputs)

In [None]:
# Build the student network from the MobileNetV2 definition
student = MobileNetV2(input_image=Config.INPUT_SHAPE, n_classes=Config.NUM_CLASSES)

In [None]:
# student = keras.Sequential(
#     [
#         keras.Input(shape=(28, 28, 1)),
#         layers.Conv2D(16, (3, 3), strides=(2, 2), padding="same"),
#         layers.LeakyReLU(alpha=0.2),
#         layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
#         layers.Conv2D(32, (3, 3), strides=(2, 2), padding="same"),
#         layers.Flatten(),
#         layers.Dense(10),
#     ],
#     name="student",
# )

## <font color = navy> Clone Student Model for Comparison </font>

In [None]:
# Keep a separate copy of the student to train without distillation, so the
# two can be compared later
student_scratch = keras.models.clone_model(model=student)

## <font color = navy> Train Teacher Model </font>

In [None]:
# Configure the teacher: loss computed on logits, accuracy as the metric
teacher_loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
teacher_metrics = [keras.metrics.SparseCategoricalAccuracy()]
teacher.compile(optimizer=Config.opt, loss=teacher_loss, metrics=teacher_metrics)


In [None]:
# Fit the teacher, validating on 20% of the training data
teacher_history = teacher.fit(
    x=X_train,
    y=y_train,
    batch_size=Config.BATCH_SIZE,
    epochs=5,
    validation_split=0.2,
)

## <font color = navy> Distill Teacher to Student </font>

In [None]:
# Wrap student and teacher in a Distiller and configure it
distiller = Distiller(student=student, teacher=teacher)

# NOTE(review): temperature is 10 here while Config.temperature is 3;
# confirm which value is intended.
distiller_settings = dict(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)
distiller.compile(**distiller_settings)

In [None]:

# Train the student through the distiller
distiller.fit(x=X_train, y=y_train, epochs=3)

## <font color = navy> Train Student from Scratch </font>

In [None]:
# Configure the cloned student for ordinary training, without a teacher
scratch_loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
scratch_metrics = [keras.metrics.SparseCategoricalAccuracy()]
student_scratch.compile(
    optimizer=keras.optimizers.Adam(),
    loss=scratch_loss,
    metrics=scratch_metrics,
)

In [None]:
# Fit the student that is trained from scratch
student_history = student_scratch.fit(x=X_train, y=y_train, epochs=3)

## <font color = navy> Evaluation </font>

### <font color = red> Teacher Model </font>

In [None]:
# Score the teacher on the held-out test set
teacher_score = teacher.evaluate(x=X_test, y=y_test, verbose=0)
print(teacher_score)

### <font color = red> Distiller Student Model </font>

In [None]:
# Score the distilled student on the held-out test set
distiller_score = distiller.evaluate(x=X_test, y=y_test, verbose=0)
print(distiller_score)

### <font color = red> Original Student Model </font>

In [None]:
# Score the student trained from scratch on the held-out test set
student_score = student_scratch.evaluate(x=X_test, y=y_test, verbose=0)
print(student_score)