In [None]:
import matplotlib.pyplot as plt
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds
from typing import Union
import cv2 as cv2
from google.colab.patches import cv2_imshow
import numpy as np
import matplotlib.pyplot as plt
from scipy import io 
import pandas as pd
import sys
# Switch the compat module to TF2 behaviour before any model code runs.
tf.enable_v2_behavior()
# NOTE(review): tensorflow_addons is installed here but never imported in the
# visible cells — confirm it is still needed.
!pip install tensorflow_addons
# NOTE(review): these are data-file paths, not package directories. The later
# pd.read_csv / ZipFile calls open the same paths directly, so these sys.path
# entries look unnecessary — confirm before removing.
sys.path.append('mhist_dataset/annotations.csv')
sys.path.append('mhist_dataset/images.zip')

BATCH_SIZE = 32  # samples per mini-batch
NUM_EPOCHS = 35  # number of epochs run by the training loops
NUM_CLASSES = 2  # two classes: SSA and HP

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Data Preparation

In [None]:
import zipfile as zp

# Path of the archive that holds the image files.
file_name = "mhist_dataset/images.zip"

# Open the archive read-only; the context manager closes it on exit.
with zp.ZipFile(file_name, mode='r') as z:
    z.printdir()  # list every member of the archive
    print('Extracting all the files now...')
    z.extractall()  # unpack into the current working directory
    print('Done!')

In [None]:
import os
from os import listdir

# Directory the archive was extracted into.
folder_dir = "images/"
# Keep only the PNG entries, in whatever order the OS lists them.
image_list = [entry for entry in os.listdir(folder_dir) if entry.endswith(".png")]

In [None]:
# One 224x224x3 slot per listed file.
image_data = np.zeros((len(image_list), 224, 224, 3))
for i, image_name in enumerate(image_list):
  image_path = 'images/' + image_name
  # NOTE(review): OpenCV returns colour images in BGR channel order; confirm
  # that this is what the pretrained backbones used later should receive.
  img = cv2.imread(image_path)
  # cv2.imread reports an unreadable or corrupt file by returning None instead
  # of raising, so check explicitly rather than storing a bogus entry.
  if img is None:
    raise IOError('Could not read image file: ' + image_path)
  image_data[i, ...] = img
# cv2_imshow( img)

In [None]:
## Normalization
# Rescale every image independently to the [0, 1] range (per-image min-max).
for i in range(image_data.shape[0]):
  lowest = np.min(image_data[i])
  value_range = np.max(image_data[i]) - lowest
  if value_range > 0:
    image_data[i] = (image_data[i] - lowest) / value_range
  else:
    # A constant image has zero range; dividing would turn it into NaNs (0/0),
    # which would then poison the training loss. Map it to all zeros instead.
    image_data[i] = 0.0

In [None]:
# Load the annotation table and turn it into one plain Python list per CSV row.
df = pd.read_csv('mhist_dataset/annotations.csv')
ann_list = df.to_numpy().tolist()

In [None]:
Train_set = [[],[]]  # [images, one-hot labels]
Test_set = [[],[]]   # [images, one-hot labels]
# These counters are never updated below; they are kept only so that any cell
# reading them still finds the names.
cnt_train = 0
cnt_test = 0

# One-hot encodings: position 0 = SSA, position 1 = HP.
label_to_one_hot = {'SSA': [1, 0], 'HP': [0, 1]}
# Annotation column 3 decides which set a row belongs to.
partition_sets = {'train': Train_set, 'test': Test_set}
# File name -> row of image_data, built once so each annotation is a dict
# lookup instead of a linear image_list.index() scan.
image_row = {name: row for row, name in enumerate(image_list)}

# Walk the annotation rows themselves rather than range(len(image_list)), so
# the loop stays correct when the CSV and the image folder differ in length.
for ann in ann_list:
  image_name, label, partition = ann[0], ann[1], ann[3]
  target_set = partition_sets.get(partition)
  if target_set is None or label not in label_to_one_hot:
    # Unknown partition or label: skip the whole row so the image list and
    # the label list can never get out of step.
    continue
  # Raises KeyError if the annotated file is missing from the image folder.
  target_set[0].append(image_data[image_row[image_name]])
  target_set[1].append(tf.constant(label_to_one_hot[label], dtype=tf.float32))


# Data Augmentation

In [None]:
# Stack the collected per-sample entries into dense arrays:
# element 0 of each set holds the images, element 1 the labels.
train_imgs, train_labels = (np.asarray(part) for part in Train_set)
test_imgs, test_labels = (np.asarray(part) for part in Test_set)

In [None]:
# Drop the intermediate containers now that the train/test arrays exist.
del Train_set, Test_set
del df, image_data, image_list

In [None]:
from tensorflow.keras import layers

# Augmentation pipeline: a random flip followed by a random rotation.
data_augmentation = tf.keras.Sequential()
data_augmentation.add(layers.RandomFlip("horizontal_and_vertical"))
data_augmentation.add(layers.RandomRotation(0.2))

In [None]:
BUFFER_SIZE = 1000  # shuffle buffer size (in samples)
BATCH_SIZE = 32

train_dataset = tf.data.Dataset.from_tensor_slices((train_imgs,train_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_imgs,test_labels))

test_dataset = test_dataset.shuffle(BUFFER_SIZE)
test_dataset = test_dataset.batch(BATCH_SIZE)
## if augmentation is required, uncomment the followings ##################
# aug_ds1 = train_dataset.map(
#   lambda x, y: (data_augmentation(x, training=True), y))
# aug_ds2= train_dataset.map(
#   lambda x, y: (data_augmentation(x, training=True), y))
# train_dataset = aug_ds2.concatenate(aug_ds1)
###########################################################################
# Dataset transformations return a new dataset instead of modifying the
# receiver, so the shuffled dataset has to be assigned back; a bare
# `train_dataset.shuffle(...)` call leaves the training data unshuffled.
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)
# Number of test samples, used as the accuracy denominator.
test_size = test_imgs.shape[0]

# Model creation

In [None]:
from tensorflow_datasets.core.splits import units
from tensorflow.keras.applications.resnet_v2 import ResNet50V2
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout,Input

# Teacher: ImageNet-pretrained ResNet50V2 feature extractor, frozen, topped
# with a small classification head.
resnet50_imagenet_model = ResNet50V2(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
resnet50_imagenet_model.trainable = False

flattened = tf.keras.layers.Flatten()(resnet50_imagenet_model.output)

# Freshly created Dense layers are trainable by default, so no explicit flag
# is needed. Setting `.trainable` on the layers' *output tensors* (as was done
# here before) does not affect the layers and has been dropped.
fc1_teacher = tf.keras.layers.Dense(256, activation='relu')(flattened)
fc1_teacher = tf.keras.layers.Dense(256, activation='relu')(fc1_teacher)
# NOTE(review): the head ends in a softmax, so the model outputs probabilities,
# while the loss helpers in this notebook pass its output as `logits` — confirm
# that this is intended.
fc2_teacher = tf.keras.layers.Dense(2, activation='softmax')(fc1_teacher)
teacher_model = tf.keras.models.Model(inputs=resnet50_imagenet_model.input, outputs=fc2_teacher)
# Snapshot the untrained teacher so it can be reset later with
# load_weights('teacher_model_untrained.h5'), mirroring what is done for the
# student model.
teacher_model.save('teacher_model_untrained.h5')



In [None]:
## student model
# ImageNet-pretrained MobileNetV2 backbone without its classifier; the
# backbone is left trainable (the freezing line below stays commented out).
mobilenet_imagenet_model = tf.keras.applications.MobileNetV2(
    weights="imagenet", include_top=False, input_shape=(224, 224, 3))
# mobilenet_imagenet_model.trainable = False
flattened = tf.keras.layers.Flatten()(mobilenet_imagenet_model.output)
fc1_student = tf.keras.layers.Dense(256, activation='relu')(flattened)
fc1_student = tf.keras.layers.Dense(128, activation='relu')(fc1_student)
fc2_student = tf.keras.layers.Dense(2, activation='softmax')(fc1_student)
student_model = tf.keras.models.Model(
    inputs=mobilenet_imagenet_model.input, outputs=fc2_student)
# Keep a copy of the freshly built model so it can be restored later with
# load_weights().
student_model.save('student_model_untrained.h5')



# Teacher loss function

In [None]:
# @tf.function
def compute_teacher_loss(images, labels):
  """Compute the teacher's cross-entropy training loss for one batch.

  Runs `teacher_model` in training mode on `images` and averages the
  per-example softmax cross-entropy against `labels`.

  Args:
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.

  Returns:
    Scalar loss Tensor.
  """
  subclass_logits = teacher_model(images, training=True)
  # Pass the arguments by keyword: the op's positional order is
  # (labels, logits), so a positional (subclass_logits, labels) call swaps
  # them and treats the model output as the target distribution.
  # NOTE(review): teacher_model ends in a softmax layer, so what is passed as
  # `logits` here is already a probability vector — confirm this is intended.
  cross_entropy_loss_value = tf.reduce_mean(
      tf.compat.v1.nn.softmax_cross_entropy_with_logits_v2(
          labels=labels, logits=subclass_logits))
  return cross_entropy_loss_value

# Student loss function

In [None]:
# Distillation hyperparameters (to be tuned).
ALPHA = 0.5  # weight applied to the distillation term of the student loss
DISTILLATION_TEMPERATURE = 4  # softmax temperature used for distillation

def distillation_loss(teacher_logits: tf.Tensor, student_logits: tf.Tensor,
                      temperature: Union[float, tf.Tensor]):
  """Compute distillation loss.

  This function computes cross entropy between softened logits and softened
  targets. The resulting loss is scaled by the squared temperature so that
  the gradient magnitude remains approximately constant as the temperature is
  changed. For reference, see Hinton et al., 2014, "Distilling the knowledge in
  a neural network."

  Args:
    teacher_logits: A Tensor of logits provided by the teacher.
    student_logits: A Tensor of logits provided by the student, of the same
      shape as `teacher_logits`.
    temperature: Temperature to use for distillation.

  Returns:
    A scalar Tensor containing the distillation loss.
  """
  # Soften the teacher outputs per example, normalising over the class axis
  # only. Dividing by a sum taken over the whole tensor instead makes the
  # targets of an entire batch add up to 1, rather than those of each row.
  soft_targets = tf.nn.softmax(teacher_logits / temperature, axis=-1)

  return tf.reduce_mean(
      tf.nn.softmax_cross_entropy_with_logits(
          labels=soft_targets,
          logits=student_logits / temperature)) * temperature ** 2

def compute_student_loss_with_distil(images, labels):
  """Student training loss: hard-label cross-entropy plus a distillation term.

  Args:
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.

  Returns:
    Scalar loss Tensor.
  """
  # Forward passes: student in training mode, teacher in inference mode.
  student_subclass_logits = student_model(images, training=True)
  teacher_subclass_logits = teacher_model(images, training=False)

  # Soft-target term between the teacher's and the student's outputs.
  soft_term = distillation_loss(
      teacher_subclass_logits, student_subclass_logits,
      DISTILLATION_TEMPERATURE)

  # Hard-target term against the ground-truth labels.
  per_example_ce = tf.compat.v1.nn.softmax_cross_entropy_with_logits_v2(
      labels=labels, logits=student_subclass_logits)
  hard_term = tf.reduce_mean(per_example_ce)

  # The distillation term is weighted by ALPHA and an extra fixed factor of 10
  # (a scaling factor chosen by the author).
  return hard_term + ALPHA*10*soft_term

# Train and evaluation

In [None]:
# @tf.function
def compute_num_correct(model, images, labels):
  """Count the correctly classified images in a batch.

  Args:
    model: Instance of tf.keras.Model.
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.

  Returns:
    A 3-tuple: the number of correctly classified images (float32 scalar),
    the predicted class indices, and the class indices taken from `labels`.
  """
  class_logits = model(images, training=False)
  predicted = tf.argmax(class_logits, -1)
  expected = tf.argmax(labels, -1)
  hits = tf.cast(tf.math.equal(predicted, expected), tf.float32)
  return tf.reduce_sum(hits), predicted, expected


def train_and_evaluate(model, compute_loss_fn):
  """Perform training and evaluation for a given model.

  Trains for NUM_EPOCHS epochs over `train_dataset` and, after every epoch,
  measures accuracy over `test_dataset`.

  Args:
    model: Instance of tf.keras.Model. Its trainable variables are the ones
      being updated, so `compute_loss_fn` must run this same model.
    compute_loss_fn: A function that computes the training loss given the
      images, and labels.

  Returns:
    Tuple `(acc, KD, CE)` of NumPy arrays with one entry per epoch: test
    accuracy in percent, distillation loss against `teacher_model`, and
    cross-entropy loss. The two losses are measured on the last test batch
    of the epoch only.
  """
  KD = np.zeros(NUM_EPOCHS)
  CE = np.zeros(NUM_EPOCHS)
  acc = np.zeros(NUM_EPOCHS)
  # Build the optimizer once. Creating a new Adam instance every epoch throws
  # away its moment estimates and step counter at each epoch boundary.
  optimizer = tf.keras.optimizers.Adam(learning_rate = 0.0001)
  for epoch in range(1, NUM_EPOCHS + 1):

    if epoch == 11:
      # From epoch 11 onwards the learning rate is lowered by 10%.
      optimizer.learning_rate = 0.0001 - 0.0001*0.1
    # Run training.
    print('Epoch {}: '.format(epoch), end='')
    for images,labels in train_dataset:

      with tf.GradientTape() as tape:
        loss_value = compute_loss_fn(images,labels)
      grads = tape.gradient(loss_value, model.trainable_variables)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))
    #Run evaluation.
    num_correct = 0
    # Use the measured test-set size instead of a hard-coded sample count.
    num_total = test_size
    for images, labels in test_dataset:
      num_correct += compute_num_correct(model,images,labels)[0]
    print("Class_accuracy: " + '{:.2f}%'.format(
        num_correct / num_total * 100))
    acc[epoch-1] = num_correct / num_total * 100
    # loss_value is the training loss of the last batch of this epoch.
    print('Loss',loss_value)
    ##########################################################################################
    # `images` / `labels` are whatever the evaluation loop left behind, i.e.
    # the final test batch, so CE and KD are single-batch diagnostics.
    teacher_subclass_logits = teacher_model(images, training=False)
    class_logits = model(images, training=False)
    # Keyword arguments: the op's positional order is (labels, logits).
    CE[epoch-1] = tf.reduce_mean(
        tf.compat.v1.nn.softmax_cross_entropy_with_logits_v2(
            labels=labels, logits=class_logits))
    KD[epoch-1] = distillation_loss(teacher_subclass_logits,class_logits,DISTILLATION_TEMPERATURE)
    print("KD loss ",KD[epoch-1])
    ###########################################################################################
  return acc, KD, CE

In [None]:
# Train the teacher with its cross-entropy loss. The returned (acc, KD, CE)
# arrays are not stored; they are only shown as the cell's output.
train_and_evaluate(teacher_model, compute_teacher_loss)

In [None]:
# Train the student with the combined cross-entropy + distillation loss and
# keep the per-epoch accuracy, KD-loss and CE-loss histories.
ALPHA = 0.5
(acc_student_distil,
 KD_student_distil,
 CE_student_distil) = train_and_evaluate(student_model, compute_student_loss_with_distil)

# Test accuracy vs. temperature curve

In [None]:
# your code start from here for step 6
# Sweep the distillation temperature: retrain the student from its untrained
# snapshot for every value and record the resulting test accuracy.
Temp_list = [1, 2, 4, 8, 16, 32, 64]
ALPHA = 0.5
cnt = 0
T = np.zeros(len(Temp_list))  # test accuracy (%) per temperature
for temp in Temp_list:
  student_model.load_weights('student_model_untrained.h5')
  DISTILLATION_TEMPERATURE = temp
  print('############# Training student model with Tempreture = {} ###########'.format(temp))
  train_and_evaluate(student_model,compute_student_loss_with_distil)
  #######################
  # Measure accuracy of the freshly trained student over the test set.
  num_correct = 0
  num_total = test_size
  for images, labels in test_dataset:
    batch_correct = compute_num_correct(student_model,images,labels)[0]
    num_correct += batch_correct
  print("Class_accuracy: " + '{:.2f}%'.format(
      num_correct / num_total * 100))
  T[cnt] = num_correct / num_total * 100
  cnt = cnt + 1
  #######################

# Train student from scratch

In [None]:
# A second student with the same architecture as `student_model`, trained with
# hard labels only (no distillation).
# NOTE(review): the backbone is still initialised with ImageNet weights, so
# "from scratch" here presumably means "without a teacher" — confirm.
mobilenet_imagenet_model = tf.keras.applications.MobileNetV2(include_top = False, input_shape=(224,224,3), weights="imagenet")
# mobilenet_imagenet_model.trainable = False
fc1_student = flattened = tf.keras.layers.Flatten()(mobilenet_imagenet_model.output)
fc1_student = tf.keras.layers.Dense(256, activation='relu')(flattened)
fc1_student = tf.keras.layers.Dense(128, activation='relu')(fc1_student)
fc2_student = tf.keras.layers.Dense(2, activation='softmax')(fc1_student)
student_model_scrach = tf.keras.models.Model(inputs=mobilenet_imagenet_model.input, outputs=fc2_student)

def compute_student_loss(images, labels, model=None):
  """Computes the cross entropy loss for a student model without distillation.

  Args:
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.
    model: Optional tf.keras.Model used for the forward pass. When omitted,
      the global `student_model` is used, so existing two-argument callers
      behave exactly as before.

  Returns:
    Scalar loss Tensor.
  """
  if model is None:
    model = student_model
  student_subclass_logits = model(images, training=True)
  # Compute cross-entropy loss with hard targets.
  cross_entropy_loss_value = tf.reduce_mean(
      tf.compat.v1.nn.softmax_cross_entropy_with_logits_v2(
          labels=labels, logits=student_subclass_logits))
  student_loss = cross_entropy_loss_value
  return student_loss

# The loss has to be computed with the very model whose variables are being
# updated. Running `student_model` in the forward pass while asking for
# gradients of `student_model_scrach` leaves the tape with no gradient for any
# trained variable.
train_and_evaluate(
    student_model_scrach,
    lambda images, labels: compute_student_loss(
        images, labels, model=student_model_scrach))
## in case we want to save the model


# Comparing the teacher and student models (number of parameters and FLOPs)

In [None]:

# your code start from here for step 8
!pip install model_profiler
from model_profiler import model_profiler
# keep order 
units = ['GPU IDs', 'MFLOPs', 'GB', 'Million', 'MB']

Batch_size = 32
profile1 = model_profiler(teacher_model, Batch_size,use_units=units,)
profile2= model_profiler(student_model, Batch_size,use_units=units,)

print("The teacher ")
print(profile1)
print("The student ")
print(profile2)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting model_profiler
  Downloading model_profiler-1.1.8-py3-none-any.whl (6.4 kB)
Installing collected packages: model-profiler
Successfully installed model-profiler-1.1.8
| Model Profile                    | Value   | Unit    |
|----------------------------------|---------|---------|
| Selected GPUs                    | ['0']   | GPU IDs |
| No. of FLOPs                     | 69.8616 | MFLOPs  |
| GPU Memory Requirement           | 3.7865  | GB      |
| Model Parameters                 | 36.4268 | Million |
| Memory Required by Model Weights | 138.957 | MB      |
| Model Profile                    | Value   | Unit    |
|----------------------------------|---------|---------|
| Selected GPUs                    | ['0']   | GPU IDs |
| No. of FLOPs                     | 5.1966  | MFLOPs  |
| GPU Memory Requirement           | 2.6175  | GB      |
| Model Parameters                 | 10.

  flops = count_flops(use_units[1], model, Batch_size)
  np.asarray(values).reshape(-1,1),


# Implementing the state-of-the-art KD algorithm

In [None]:
# your code start from here for step 5
print('########### Training teacher model with early stop##############')
# "Early stopping" is done by shortening the global epoch count; the new value
# also applies to every later training call that reads NUM_EPOCHS.
NUM_EPOCHS = 6 ## early stopping the teacher's training
# NOTE(review): load_weights needs 'teacher_model_untrained.h5' to exist on
# disk — make sure the teacher was saved under that name before it was first
# trained.
teacher_model.load_weights('teacher_model_untrained.h5')
# Keep the accuracy and cross-entropy histories; the KD history is discarded.
acc_teacher_2, _, CE_teacher_2 = train_and_evaluate(teacher_model,compute_teacher_loss)


def train_and_evaluate_SOTA_Alg(model, compute_loss_fn):
  """Perform training and evaluation for a given model.

  Trains for NUM_EPOCHS epochs over `train_dataset` with a single Adam
  optimizer and measures accuracy over `test_dataset` after every epoch.

  Args:
    model: Instance of tf.keras.Model. Its trainable variables are the ones
      being updated, so `compute_loss_fn` must run this same model.
    compute_loss_fn: A function that computes the training loss given the
      images, and labels.

  Returns:
    NumPy array holding the test accuracy (in percent) after each epoch.
  """
  optimizer = tf.keras.optimizers.Adam(learning_rate = 0.001)
  acc = np.zeros((NUM_EPOCHS))
  for epoch in range(1, NUM_EPOCHS + 1):

    ## Check if the epoch No. is reached to the half of the total iteration No.
    # if epoch >=5:
    #   ALPHA = 0 ## Hard-tuning; releasing the student to learn by its own after a while
    # Run training.
    print('Epoch {}: '.format(epoch), end='')
    # Iterate over this notebook's own datasets; `mnist_train`, `mnist_test`
    # and `builder` are not defined anywhere in the notebook.
    for images,labels in train_dataset:

      # Only the forward pass needs to be recorded; the gradient computation
      # and the parameter update happen after leaving the tape context.
      with tf.GradientTape() as tape:
        loss_value = compute_loss_fn(images,labels)
      grads = tape.gradient(loss_value, model.trainable_variables)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))

    #Run evaluation.
    num_correct = 0
    num_total = test_size
    for images, labels in test_dataset:
      num_correct += compute_num_correct(model,images,labels)[0]
    print("Class_accuracy: " + '{:.2f}%'.format(
        num_correct / num_total * 100))
    acc[epoch-1] = num_correct / num_total * 100
  return acc

In [None]:
# Restore the untrained student snapshot, then train it with the hard-label
# cross-entropy loss (no distillation term).
student_model.load_weights('student_model_untrained.h5')
train_and_evaluate_SOTA_Alg(student_model,compute_student_loss)