In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

In [None]:
import zipfile
import shutil

shutil.copy("/content/drive/MyDrive/DL/Dataset_train.zip","/content/Dataset_train.zip")
shutil.copy("/content/drive/MyDrive/DL/Dataset_test.zip","/content/Dataset_test.zip")




In [None]:
zip_path = "/content/Dataset_train.zip"
with zipfile.ZipFile(zip_path,"r") as zip_ref:
  zip_ref.extractall("./Dataset/")

import zipfile
zip_path = "/content/Dataset_test.zip"
with zipfile.ZipFile(zip_path,"r") as zip_ref:
  zip_ref.extractall("./Dataset_test/")

In [None]:
image_size = (1024, 1024)
batch_size = 64
IMG_SIZE =256


resize_and_rescale = tf.keras.Sequential([
  layers.Resizing(IMG_SIZE, IMG_SIZE),
  layers.Rescaling(1./255)
])



normalization_layer = tf.keras.layers.Rescaling(1./255)
resizing_layer = tf.keras.layers.Resizing(IMG_SIZE, IMG_SIZE)
flip_layer = tf.keras.layers.RandomRotation(0.1)
AUTOTUNE = tf.data.AUTOTUNE

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "/content/Dataset/",
    validation_split=0.2,
    subset="training",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)
train_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
train_ds = train_ds.map(lambda x, y: (resizing_layer(x), y))
train_ds = train_ds.map(lambda x, y: (flip_layer(x), y))


val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "/content/Dataset/",
    validation_split=0.2,
    subset="validation",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)

val_ds = val_ds.map(lambda x, y: (normalization_layer(x), y))
val_ds = val_ds.map(lambda x, y: (resizing_layer(x), y))


test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "/content/Dataset_test/",
    image_size=image_size,
    batch_size=batch_size,
)


test_ds = test_ds.map(lambda x, y: (normalization_layer(x), y))
test_ds = test_ds.map(lambda x, y: (resizing_layer(x), y))


In [None]:
# import keras.backend as K
# def get_f1(y_true, y_pred): #taken from old keras source code
#     true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
#     possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
#     predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
#     precision = true_positives / (predicted_positives + K.epsilon())
#     recall = true_positives / (possible_positives + K.epsilon())
#     f1_val = 2*(precision*recall)/(precision+recall+K.epsilon())
#     return f1_val


teacher_model = keras.models.load_model(
    "/content/drive/MyDrive/DL/xception_final/save_at_20.h5"
)
import numpy as np


In [None]:
class Distiller(keras.Model):
    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student


    def compile(self,optimizer,metrics,student_loss_fn,distillation_loss_fn,alpha=0.1,temperature=3,):

        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        x, y = data

        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            student_predictions = self.student(x, training=True)
            student_loss = self.student_loss_fn(y, student_predictions)
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1),
            )
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        self.compiled_metrics.update_state(y, student_predictions)

        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        x, y = data
        y_prediction = self.student(x, training=False)
        student_loss = self.student_loss_fn(y, y_prediction)
        self.compiled_metrics.update_state(y, y_prediction)
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results

In [None]:






def make_model(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)
    x = layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    # x = layers.Conv2D(64, 3, padding="same")(x)
    # x = layers.BatchNormalization()(x)
    # x = layers.Activation("relu")(x)

    # x = layers.Conv2D(128, 3, padding="same")(x)
    # x = layers.BatchNormalization()(x)
    # x = layers.Activation("relu")(x)

    # x = layers.Conv2D(256, 3, padding="same")(x)
    # x = layers.BatchNormalization()(x)
    # x = layers.Activation("relu")(x)


    previous_block_activation = x  # Set aside residual
    #for size in [128]:
    for size in [128,256]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

       
        residual = layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])  
        previous_block_activation = x  

    x = layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling2D()(x)

    activation = "softmax"
    units = num_classes

    x = layers.Dropout(0.3)(x)
    outputs = layers.Dense(units, activation=activation)(x)
    return keras.Model(inputs, outputs)


student_model = make_model(input_shape=(256,256) + (3,), num_classes=4)

In [None]:
distiller = Distiller(student=student_model, teacher=teacher_model)
distiller.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=5,
)

distiller.fit(train_ds, epochs=7)

In [None]:
import numpy as np
predictions = np.array([])
labels =  np.array([])
for x, y in test_ds:
  temp = student_model.predict(x)
  predictions = np.concatenate([predictions, student_model.predict(x).argmax(axis =1)+1])
  labels = np.concatenate([labels, y.numpy()])

In [None]:
from sklearn.metrics import classification_report
print(classification_report(labels,predictions))

In [None]:
keras_file = "/content/drive/MyDrive/DL/KD/128_256__5_7_cnn_student.h5"
student_model.save(keras_file)




In [None]:
! pip install -q tensorflow-model-optimization

In [None]:
student_model.save("/content/drive/MyDrive/DL/KD/128_256__5_7_cnn_student.h5")
pruned_keras_file = "/content/drive/MyDrive/DL/KD/128_256__5_7_cnn_student_strip.h5"
pruned_tflite_file = "/content/drive/MyDrive/DL/KD/128_256__5_7_cnn_student_strip_tflite.h5"
import tempfile
import tensorflow_model_optimization as tfmot

prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude

batch_size = 32
epochs = 2

end_step = 300*epochs
pruning_params = {
      'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50,
                                                               final_sparsity=0.80,
                                                               begin_step=0,
                                                               end_step=end_step)
}

model_for_pruning = prune_low_magnitude(student_model, **pruning_params)

model_for_pruning.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])


logdir = tempfile.mkdtemp()

callbacks = [
  tfmot.sparsity.keras.UpdatePruningStep(),
  tfmot.sparsity.keras.PruningSummaries(log_dir=logdir),
]

model_for_pruning.fit(train_ds, epochs=epochs, callbacks=callbacks, validation_data=val_ds,
)

predictions = np.array([])
labels =  np.array([])

for x, y in test_ds:
  predictions = np.concatenate([predictions, model_for_pruning.predict(x).argmax(axis =1)+1])
  labels = np.concatenate([labels, y.numpy()])


from sklearn.metrics import classification_report
print(classification_report(labels,predictions))


model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)

tf.keras.models.save_model(model_for_export, pruned_keras_file, include_optimizer=False)
print('Saved pruned Keras model to:', pruned_keras_file)


converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
pruned_tflite_model = converter.convert()


with open(pruned_tflite_file, 'wb') as f:
  f.write(pruned_tflite_model)

import os
import zipfile
def get_gzipped_model_size(file):


  _, zipped_file = tempfile.mkstemp('.zip')
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned Keras model: %.2f bytes" % (get_gzipped_model_size(pruned_keras_file)))
print("Size of gzipped pruned TFlite model: %.2f bytes" % (get_gzipped_model_size(pruned_tflite_file)))

In [None]:
del student_model, teacher_model, model_for_pruning, model_for_export

# References
https://keras.io/examples/vision/image_classification_from_scratch/
<br>
https://keras.io/examples/vision/knowledge_distillation/
