In [None]:
# Install cell: uncomment and run the lines below if the packages are missing

#pip install scikit-learn
#pip install Keras-Preprocessing

In [25]:
#import block

import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds
from typing import Union
import matplotlib.pyplot as plt
from keras_flops import get_flops
import pandas as pd
#ResNet Resource
#https://medium.com/@kenneth.ca95/a-guide-to-transfer-learning-with-keras-using-resnet50-a81a4a28084b
import tensorflow.keras as K

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score

from keras_preprocessing.image import ImageDataGenerator
import numpy as np
import os
from PIL import Image
from skimage import io

from keras.models import Model

# Turn on TensorFlow 2.x behavior; `tf` is the `tensorflow.compat.v2` module
# imported at the top of this cell.
tf.enable_v2_behavior()

# Load Data

In [2]:
# Load the annotation table with pandas.

# Column names applied to annotations.csv (its own first row is skipped).
ANNOTATION_COLUMNS = [
    "Image Name",
    "Majority Vote Label",
    "Number of Annotations who Selected SSA(Out of 7)",
    "Partition",
]

# Raw annotation rows, one per image file.
annotations = pd.read_csv("./annotations.csv", skiprows=1, names=ANNOTATION_COLUMNS)

# Same rows keyed by image file name, so a label or partition can be looked
# up directly from a file name.
df = annotations[ANNOTATION_COLUMNS].set_index("Image Name")

df.head()

Unnamed: 0_level_0,Majority Vote Label,Number of Annotations who Selected SSA(Out of 7),Partition
Image Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
MHIST_aaa.png,SSA,6,train
MHIST_aab.png,HP,0,train
MHIST_aac.png,SSA,5,train
MHIST_aae.png,HP,1,train
MHIST_aaf.png,SSA,5,train


# Data Augmentation

In [26]:
# Resource: https://www.youtube.com/watch?v=ccdssX4rIh8
# Every annotated image is read from the local image directory, resized and
# kept in memory together with randomly augmented copies of itself.
# TODO (carried over from the original notes): this loading strategy may need
# to change later.
# Labels and partitions are looked up in the annotation frame `df` by file name.

SIZE = 224
image_directory = "images/"

datagen = ImageDataGenerator(
    rotation_range = 180,
    width_shift_range = 0.2,
    height_shift_range = 0.2,
    shear_range = 0.2,
    zoom_range = 0.5,
    horizontal_flip = True,
    fill_mode = 'nearest'
)

# Sorted so the sample order does not depend on the file system's listing order.
all_images = sorted(os.listdir(image_directory))


def _load_partition(partition, n_augmented):
    """Load every image of one partition plus augmented copies of each.

    Args:
        partition: Value of the "Partition" column to select ('train' or 'test').
        n_augmented: Number of augmented copies drawn from `datagen` per image.

    Returns:
        A tuple `(images, labels)` of numpy arrays. For each selected file the
        resized original comes first, followed by its `n_augmented` augmented
        copies; `labels` holds the "Majority Vote Label" for each entry.
    """
    images = []
    labels = []
    for image_name in all_images:
        # Skip stray files in the directory that have no annotation row;
        # looking them up would raise KeyError.
        if image_name not in df.index:
            continue
        # Address columns by name rather than by position in the row.
        if df.loc[image_name, "Partition"] != partition:
            continue
        label = df.loc[image_name, "Majority Vote Label"]

        image = io.imread(image_directory + image_name)
        image = Image.fromarray(image, 'RGB')
        image = image.resize((SIZE, SIZE))
        image_array = np.array(image)
        images.append(image_array)
        labels.append(label)

        # datagen.flow works on batches, so add a leading batch axis of size 1.
        batch_of_one = image_array.reshape((1,) + image_array.shape)
        iterator = datagen.flow(batch_of_one, batch_size = 1)
        for _ in range(n_augmented):
            batch = next(iterator)
            images.append(batch[0].astype('uint8'))
            labels.append(label)
    return np.array(images), np.array(labels)


# Test partition: each image plus one augmented copy.
# NOTE(review): augmented copies are added to the test set as well; confirm
# that evaluating on augmented images is intended.
test_images, test_labels = _load_partition('test', 1)
print("Test Image Shape: {}".format(test_images.shape))
print("Test Image label: {}".format(test_labels.shape))

# Train partition: each image plus two augmented copies.
train_images, train_labels = _load_partition('train', 2)
print("Train Image Shape: {}".format(train_images.shape))
print("Train Label Shape: {}".format(train_labels.shape))

Test Image Shape: (1954, 224, 224, 3)
Test Image label: (1954,)
Train Image Shape: (6525, 224, 224, 3)
Train Label Shape: (6525,)


In [28]:
# Rescale both image arrays to float32 by dividing the pixel values by 255.
train_images_normalized, test_images_normalized = (
    images.astype('float32') / 255.0 for images in (train_images, test_images)
)

# Learn the label -> integer mapping from the training labels, then apply the
# same mapping to the test labels.
encoder = LabelEncoder()
train_labels_encoded = encoder.fit_transform(train_labels)
test_label_encoded = encoder.transform(test_labels)

# Create Model

In [20]:
#teacher model

def teacher_model(num_classes=10):
    """Build the teacher network: a frozen ResNet50V2 backbone with a new dense head.

    Args:
        num_classes: Number of units in the final softmax layer. Defaults to
            10, the value that used to be hard-coded here.
            NOTE(review): the annotation preview in this notebook shows only
            two labels (HP, SSA); confirm whether 10 output units is intended.

    Returns:
        A Keras functional model taking (224, 224, 3) inputs and producing
        `num_classes` softmax probabilities.
        NOTE(review): Distiller.train_step applies a temperature softmax to
        the model outputs again; confirm whether logits should be returned.
    """
    res_model = K.applications.resnet_v2.ResNet50V2(
        include_top=False, weights="imagenet", input_shape=(224, 224, 3)
    )
    # Freeze the pretrained backbone so only the new head is trainable.
    for layer in res_model.layers:
        layer.trainable = False
    x = K.layers.Flatten()(res_model.output)
    outputs = K.layers.Dense(num_classes, activation="softmax")(x)
    # The functional constructor takes `inputs=` / `outputs=`.
    teacher = Model(inputs=res_model.input, outputs=outputs)
    return teacher

#student model
def student_model(num_classes=10):
    """Build the student network: a frozen MobileNetV2 backbone with a new dense head.

    Args:
        num_classes: Number of units in the final softmax layer. Defaults to
            10, the value that used to be hard-coded here.
            NOTE(review): the annotation preview in this notebook shows only
            two labels (HP, SSA); confirm whether 10 output units is intended.

    Returns:
        A Keras functional model taking (224, 224, 3) inputs and producing
        `num_classes` softmax probabilities.
    """
    mob_model = K.applications.mobilenet_v2.MobileNetV2(
        include_top=False, weights="imagenet", input_shape=(224, 224, 3)
    )
    # Freeze the pretrained backbone so only the new head is trainable.
    for layer in mob_model.layers:
        layer.trainable = False
    x = K.layers.Flatten()(mob_model.output)
    outputs = K.layers.Dense(num_classes, activation="softmax")(x)
    # The functional constructor takes `inputs=` / `outputs=`.
    student = Model(inputs=mob_model.input, outputs=outputs)
    return student

In [23]:
class Distiller(Model):
    """Keras model that trains a student network against a fixed teacher.

    Only the student's trainable variables are updated; the teacher is run
    in inference mode to provide soft targets.
    """

    def __init__(self, student, teacher):
        """Store the two wrapped models.

        Args:
            student: Model whose weights are optimized.
            teacher: Model that supplies the soft targets.
        """
        super().__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.5,
        temperature=4,
    ):
        """Configure the distiller.

        Args:
            optimizer: Optimizer used to update the student's weights.
            metrics: Metrics evaluated on the student's predictions.
            student_loss_fn: Loss between the true labels and the student's
                predictions.
            distillation_loss_fn: Loss between the softened teacher and
                student predictions.
            alpha: Weight of `student_loss_fn`; `1 - alpha` weights
                `distillation_loss_fn`.
            temperature: Divisor applied to both models' predictions before
                the softmax used for the distillation loss.
        """
        super().compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        """Run one optimization step on the student.

        Args:
            data: An `(x, y)` pair of inputs and true labels.

        Returns:
            Dict of the compiled metrics plus "student_loss" and
            "distillation_loss".
        """
        # Unpack data
        x, y = data

        # Teacher forward pass happens outside the tape: it is not differentiated.
        teacher_predictions = self.teacher(x, training=False)
        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictions)
            # NOTE(review): both prediction tensors go through a
            # temperature-scaled softmax here, which presumes the wrapped
            # models emit raw logits -- confirm against the model builders.
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1),
            )
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients with respect to the student's variables only
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)
        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        """Evaluate the student on one batch.

        Args:
            data: An `(x, y)` pair of inputs and true labels.

        Returns:
            Dict of the compiled metrics plus "student_loss".
        """
        # Unpack the data
        x, y = data
        # Compute predictions
        y_prediction = self.student(x, training=False)
        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)
        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        # Keras consumes the return value of test_step as a dict of logs, so
        # hand back the metrics dict rather than the raw predictions.
        return results