# Connect to google drive

In [None]:
# Mount Google Drive inside the Colab runtime
from google.colab import drive
drive.mount('/gdrive')
# Make the homework folder the working directory; later cells load and save with relative paths
%cd /gdrive/My Drive/[2023-2024] AN2DL/Homework 1

# Import libraries and set parameters

In [None]:
# Fix randomness and hide warnings
seed = 65  # shared seed: reused below for the Python, NumPy and TensorFlow RNGs and for the data splits

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so setting it here
# probably only affects child processes — confirm if hash determinism matters
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
# Warning is the base class of all warning categories, so this filter also covers the one above
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np
np.random.seed(seed)

import logging

import random
random.seed(seed)

In [None]:
# Import tensorflow
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
# Silence AutoGraph output and restrict TensorFlow logging to errors
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# Seed TensorFlow with the shared seed (both the TF2 and the compat.v1 entry points)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)
print(tf.__version__)

In [None]:
# Import other libraries
import itertools

#library for computer vision
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import seaborn as sns

# Read data and preprocessing

In [None]:
# Conditional check for unzipping
# Switch to True to extract public_data.zip into the current working directory
unzip = False

# Unzip the file if the 'unzip' flag is True
if unzip:
    !unzip public_data.zip


In [None]:
# Load the dataset archive from the working directory.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects — only use on a trusted file
data=np.load('public_data.npz', allow_pickle=True)
# Raw images, not yet normalized
images_not_normalized = data['data']
labels_strings= data['labels']
# Map the labels from strings to integer class ids
label_map = {"healthy": 0, "unhealthy": 1}
labels = np.vectorize(label_map.get)(labels_strings)
print(images_not_normalized.shape)
print(labels.shape)

In [None]:
# Normalize images: divide every pixel by 255 and store the result as float32.
# (assumes the raw pixel values lie in the 0-255 range — TODO confirm against the dataset)
# One vectorized expression replaces the per-image Python loop and yields the same values.
images = (images_not_normalized / 255).astype(np.float32)

# Report the shape of the normalized array instead of dumping the pixels of a single image
print(images.shape)

In [None]:
#DATASET CLEANING!
#Remove from the dataset outliers (manually found previously)
indices= np.array([ 58, 95, 137, 138, 171, 207, 338,  412, 434, 486, 506, 529, 571, 599, 622, 658, 692, 701, 723, 725, 753, 779, 783, 827, 840, 880, 898, 901, 961, 971, 974, 989,
 1028, 1044, 1064, 1065, 1101, 1149, 1172, 1190, 1191, 1265, 1268, 1280, 1333, 1384, 1443, 1466, 1483, 1528, 1541, 1554, 1594, 1609, 1630, 1651, 1690, 1697, 1752, 1757, 1759,
 1806, 1828, 1866, 1903, 1938, 1939, 1977, 1981, 1988, 2022, 2081, 2090, 2150, 2191, 2192, 2198, 2261, 2311, 2328, 2348, 2380, 2426, 2435, 2451, 2453, 2487, 2496, 2515, 2564, 2581,
 2593, 2596, 2663, 2665, 2676, 2727, 2734, 2736, 2755, 2779, 2796, 2800, 2830, 2831, 2839, 2864, 2866, 2889, 2913, 2929, 2937, 3033, 3049, 3055, 3086, 3105, 3108, 3144, 3155, 3286,
 3376, 3410, 3436, 3451, 3488, 3490, 3572, 3583, 3666, 3688, 3700, 3740, 3770, 3800, 3801, 3802, 3806, 3811, 3821, 3835, 3862, 3885, 3896, 3899, 3904, 3927, 3931, 3946, 3950, 3964,
 3988, 3989, 4049, 4055, 4097, 4100, 4118, 4144, 4150, 4282, 4310, 4314, 4316, 4368, 4411, 4475, 4476, 4503, 4507, 4557, 4605, 4618, 4694, 4719, 4735, 4740, 4766, 4779, 4837, 4848,
 4857, 4860, 4883, 4897, 4903, 4907, 4927, 5048, 5080, 5082, 5121, 5143, 5165, 5171])
print(indices.shape)
# Define a boolean mask: True for every sample to keep, False at the outlier positions
mask = np.ones(len(images), dtype=bool)
mask[indices]=False
# NOTE: `images` is overwritten here, so re-running this cell without reloading the data
# would apply the indices to the already-filtered array.
# `mask` is reused in the next cell to filter the labels in the same way.
images=images[mask]
print(images.shape)

In [None]:
# Add a trailing axis to the labels, then keep only the rows selected by the outlier mask
labels = labels[:, np.newaxis][mask]
print(labels.shape)

In [None]:
# One-hot encode the integer labels; the class count is the number of distinct label values
labels = tfk.utils.to_categorical(labels,len(np.unique(labels)))

In [None]:
# Split data into training (75%) and validation (25%) sets.
# The labels are one-hot here, so the class index used for stratification is recovered with argmax.
X_train, X_val, y_train, y_val = train_test_split(images, labels, random_state=seed, test_size=.25, stratify=np.argmax(labels,axis=1))

In [None]:
# Print shapes of the training and validation arrays produced by the split
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")

In [None]:
# Define input shape, output shape, batch size, and number of epochs
input_shape = X_train.shape[1:]   # per-sample image shape (sample axis dropped)
output_shape = y_train.shape[1:]  # per-sample label shape (sample axis dropped)
# NOTE(review): the fit() calls visible in this notebook pass their own literal batch sizes
# and epoch counts, so these two values appear to be informational only — confirm
batch_size = 32
epochs = 100

# Print input shape, batch size, and number of epochs
print(f"Input Shape: {input_shape}, Output Shape: {output_shape}, Batch Size: {batch_size}, Epochs: {epochs}")

# <h1>SELF SUPERVISED LEARNING</h1>

# First attempt - Random rotations

# Create a new dataset, assigning new labels based on the rotation applied

In [None]:
# Build the four views used by the rotation pretext task.
# np.rot90 rotates in the plane of axes 1 and 2, i.e. each image is rotated individually.
X_train_0 = X_train.copy()                        # the starting dataset, not rotated
X_train_90 = np.rot90(X_train, 1, axes=(1, 2))    # one quarter turn (90 degrees)
X_train_180 = np.rot90(X_train, 2, axes=(1, 2))   # two quarter turns (180 degrees)
X_train_270 = np.rot90(X_train, 3, axes=(1, 2))   # three quarter turns (270 degrees)

# Pseudo-labels: class k marks the images rotated by k quarter turns.
# Each label vector is sized from its image array instead of a hard-coded sample count,
# so images and labels stay aligned if the dataset or the split changes.
y_train_0 = np.full(len(X_train_0), 0)
y_train_90 = np.full(len(X_train_90), 1)
y_train_180 = np.full(len(X_train_180), 2)
y_train_270 = np.full(len(X_train_270), 3)

In [None]:
# Concatenate all the rotated images and their pseudo-labels, keeping both in the same order
X_train_rot = np.concatenate((X_train_0, X_train_90, X_train_180, X_train_270), axis=0)
y_train_rot = np.concatenate((y_train_0, y_train_90, y_train_180, y_train_270), axis=0)

In [None]:
# Release the per-rotation arrays; the concatenated copies are used from here on
del X_train_0, X_train_90, X_train_180, X_train_270
del y_train_0, y_train_90, y_train_180, y_train_270

In [None]:
def unison_shuffled_copies(a, b):
    """Shuffle two equally long arrays with one shared random permutation.

    Args:
        a: first array, indexable with an integer index array.
        b: second array, with the same length as ``a``.

    Returns:
        Tuple ``(a_shuffled, b_shuffled)`` in which the pairing between the
        elements of ``a`` and ``b`` is preserved.

    Raises:
        AssertionError: if ``a`` and ``b`` have different lengths.
    """
    n_samples = len(a)
    assert n_samples == len(b)
    # A single permutation, drawn from NumPy's global RNG, is applied to both arrays
    order = np.random.permutation(n_samples)
    shuffled_a = a[order]
    shuffled_b = b[order]
    return shuffled_a, shuffled_b

In [None]:
# Randomly shuffle the concatenated images and pseudo-labels with one shared permutation
X_train_rot_shuffled, y_train_rot_shuffled = unison_shuffled_copies(X_train_rot, y_train_rot)

In [None]:
# Release the unshuffled arrays; only the shuffled copies are used below
del X_train_rot, y_train_rot

# Load EffNetB0

In [None]:
# Load EfficientNetB0 as the backbone for the rotation pretext task
effnet_b0 = tf.keras.applications.efficientnet.EfficientNetB0(
    include_top= False, # discard the dense part
    weights= "imagenet", # start from the ImageNet-pretrained weights
    input_shape=(96, 96, 3) ,
    pooling= 'max', # global max pooling on the last feature map
    )
tfk.utils.plot_model(effnet_b0, show_shapes=True)

In [None]:
tf.random.set_seed(seed)

# Pretext-task network: EfficientNetB0 backbone followed by a custom dense head
inputs = tfk.Input(shape=(96, 96, 3))
x = effnet_b0(inputs)

# First hidden block of the head
x = tfkl.Dense(
    units=128,
    kernel_initializer=tfk.initializers.HeUniform(seed=seed),
)(x)
x = tfkl.Activation('relu', name='HiddenActivation1')(x)
x = tfkl.Dropout(rate=0.45, seed=seed)(x)

# Second hidden block of the head
x = tfkl.Dense(
    units=64,
    kernel_initializer=tfk.initializers.HeUniform(seed=seed),
)(x)
x = tfkl.Activation('relu', name='HiddenActivation2')(x)
x = tfkl.Dropout(rate=0.45, seed=seed)(x)

# 4 output neurons, one per rotation class
outputs = tfkl.Dense(units=4, activation='softmax')(x)

effnet_b0_model = tfk.Model(inputs=inputs, outputs=outputs, name='effnet_b0_tl')

# The pseudo-labels are integer class ids, hence the sparse categorical loss
effnet_b0_model.compile(
    loss=tfk.losses.SparseCategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(1e-4, weight_decay=5e-4),
    metrics=['accuracy'],
)
effnet_b0_model.summary()

In [None]:
# Create the validation and training sets for the pretext task:
# the first 1000 shuffled samples are held out for validation, the rest are used for training.
# NOTE(review): the shuffled set holds all four rotations of every training image, so rotated
# copies of one image can end up on both sides of this split — confirm this is acceptable
X_rot_val, X_rot_train = X_train_rot_shuffled[:1000], X_train_rot_shuffled[1000:]
y_rot_val, y_rot_train = y_train_rot_shuffled[:1000], y_train_rot_shuffled[1000:]

In [None]:
# Release the full shuffled arrays; the train/validation slices are used from here on
del X_train_rot_shuffled, y_train_rot_shuffled

# Training, composed of 3 steps:
#- Train on the pretext task, with all layers unfrozen
#- Train on the downstream task, with only the classifier unfrozen
#- Finetune on the downstream task

In [None]:
# Train the model on the rotation pretext task; only the History.history dict is kept
eff_history = effnet_b0_model.fit(
    x = X_rot_train*255, # EfficientNet expects inputs in the 0-255 range; images were scaled to [0, 1] earlier
    y = y_rot_train,
    batch_size = 16,
    epochs = 600,
    validation_data = (X_rot_val*255, y_rot_val),
    # Stop after 30 epochs without a better val_accuracy (best weights restored);
    # multiply the learning rate by 0.1 after 20 such epochs, never going below 1e-5
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=30, restore_best_weights=True),
                 tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.1, patience=20, min_lr=1e-5, mode='max')]
).history

In [None]:
# Persist the trained pretext model (backbone + rotation head); it is reloaded from this path below
effnet_b0_model.save("EffnetB0_self_supervised_rotations_step1")

In [None]:
# Drop the in-memory pretext model; the saved copy is loaded again in a later cell
del effnet_b0_model

In [None]:
# Release the pretext-task training and validation arrays
del X_rot_train, y_rot_train
del X_rot_val, y_rot_val

In [None]:
# Reload the model trained on the rotation pretext task
effnet_b0_ft = tfk.models.load_model("EffnetB0_self_supervised_rotations_step1")

In [None]:
# Show the layer list; the next cell slices it by position to drop the pretext head
effnet_b0_ft.summary()

In [None]:
# Keep only the input layer and the EfficientNetB0 backbone by dropping the last 7 layers,
# i.e. the dense head of the pretext model (Dense/Activation/Dropout x2 + output Dense).
# The name is passed to the constructor rather than assigned to the private `_name`
# attribute; later cells look this model up with get_layer('efficientnetb0').
effnet_b0_ft = tf.keras.models.Sequential(effnet_b0_ft.layers[:-7], name="efficientnetb0")
effnet_b0_ft.summary()

In [None]:
effnet_b0_ft.trainable = False  # Freeze all layers of effnetb0
tf.random.set_seed(seed)

# Augmentation block: random vertical/horizontal flips and random rotations
preprocessing = tf.keras.Sequential([
    tfkl.RandomFlip("vertical"),
    tfkl.RandomFlip("horizontal"),
    tfkl.RandomRotation(0.5),
], name='preprocessing')

inputs = tfk.Input(shape=(96, 96, 3))
# The augmented tensor gets its own name, so `preprocessing` keeps referring to the
# augmentation layer instead of being rebound to its output
augmented = preprocessing(inputs)
x = effnet_b0_ft(augmented)

# New dense head for the downstream task, placed on top of the backbone
# that was trained on the pretext task
x = tfkl.Dense(units=128, kernel_initializer=tfk.initializers.HeUniform(seed=seed), name='HiddenDense1')(x)
x = tfkl.Activation('relu', name='HiddenActivation1')(x)
x = tfkl.BatchNormalization()(x)
x = tfkl.Dropout(rate=0.3, seed=seed)(x)
x = tfkl.Dense(units=64, kernel_initializer=tfk.initializers.HeUniform(seed=seed), name='HiddenDense2')(x)
x = tfkl.Activation('relu', name='HiddenActivation2')(x)
x = tfkl.BatchNormalization()(x)
x = tfkl.Dropout(rate=0.3, seed=seed)(x)

# Add a Dense layer with 2 units and softmax activation as the classifier
outputs = tfkl.Dense(2, activation='softmax')(x)

eff_model = tfk.Model(inputs=inputs, outputs=outputs, name='model')

# The downstream labels are one-hot encoded, hence the (non-sparse) categorical loss
eff_model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(1e-4, weight_decay=5e-4),
    metrics=['accuracy'],
)
eff_model.summary()

In [None]:
# Train the model on the downstream task; the backbone was frozen above, so only the new head learns
ft_history = eff_model.fit(
    x = X_train*255, # back to the 0-255 range, as in the pretext training
    y = y_train,
    batch_size = 32,
    epochs = 600,
    validation_data = (X_val*255, y_val),
    # Stop after 25 epochs without a better val_accuracy (best weights restored);
    # multiply the learning rate by 0.1 after 20 such epochs, never going below 1e-5
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=25, restore_best_weights=True),
                 tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.1, patience=20, min_lr=1e-5, mode='max')]
).history

In [None]:
# Save the model after the head-only training stage, before fine-tuning
eff_model.save("EffnetB0_self_supervised_rotations_step2")

In [None]:
# Unfreeze the whole backbone, then list its inner layers with index and trainable flag
# (layers[0] of the wrapper is the EfficientNetB0 model itself)
eff_model.get_layer('efficientnetb0').trainable = True
for i, layer in enumerate(eff_model.get_layer('efficientnetb0').layers[0].layers):
   print(i, layer.name, layer.trainable)

In [None]:
# Freeze the first N backbone layers (indices 0..N-1); the remaining ones stay trainable
N = 218
backbone_layers = eff_model.get_layer('efficientnetb0').layers[0].layers
for layer in backbone_layers[:N]:
    layer.trainable = False

# Print the resulting trainable flag of every backbone layer
for i, layer in enumerate(backbone_layers):
    print(i, layer.name, layer.trainable)
eff_model.summary()

In [None]:
# Recompile after changing the trainable flags, with a 10x smaller learning rate
# and weight decay than in the head-only stage
eff_model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(1e-5, weight_decay=5e-5),
    metrics=['accuracy'],
)

In [None]:
# Fine-tune the model with the partially unfrozen backbone.
# NOTE: this overwrites `ft_history` from the previous training stage.
ft_history = eff_model.fit(
    x = X_train*255, # back to the 0-255 range, as in the earlier stages
    y = y_train,
    batch_size = 32,
    epochs = 600,
    validation_data = (X_val*255, y_val),
    # Same schedule as the head-only stage, but the learning-rate floor is lowered to 1e-6
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=25, restore_best_weights=True),
                 tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.1, patience=20, min_lr=1e-6, mode='max')]
).history

In [None]:
# Save the fine-tuned model of the rotation experiment
eff_model.save("effnetB0_Self_Supervised_rotations_FineTuned")

# Second attempt - Jigsaw puzzles

# Create a new dataset, assigning new labels based on the permutation applied to the puzzle pieces

In [None]:
def generate_patches(image):
    """Split an image into its four quadrants.

    The cut positions are the integer halves of the first two dimensions, so
    for odd sizes the bottom/right patches are one row/column larger.

    Args:
        image: array whose first two axes are height and width.

    Returns:
        List ``[top_left, top_right, bottom_left, bottom_right]``.
    """
    rows, cols = image.shape[:2]
    row_cut = rows // 2
    col_cut = cols // 2

    # Slice by rows first, then cut each half along the columns
    top_half = image[:row_cut]
    bottom_half = image[row_cut:]

    return [
        top_half[:, :col_cut],
        top_half[:, col_cut:],
        bottom_half[:, :col_cut],
        bottom_half[:, col_cut:],
    ]
def generate_all_combinations(patches):
    """Build one image for every ordering of the four patches.

    For an ordering ``(i, j, k, z)`` the new image places ``patches[i]`` top-left,
    ``patches[j]`` bottom-left, ``patches[k]`` top-right and ``patches[z]``
    bottom-right.

    Args:
        patches: sequence of four arrays, as returned by ``generate_patches``.

    Returns:
        List of 24 images. The orderings are produced in lexicographic order,
        so the position in the list identifies the permutation; the caller
        uses that position as the class label.
    """
    combinations = []
    # itertools.permutations yields the 24 orderings of (0, 1, 2, 3) in lexicographic
    # order, i.e. the same sequence as four nested loops filtered for distinct indices
    for i, j, k, z in itertools.permutations(range(4)):
        left_column = np.vstack((patches[i], patches[j]))
        right_column = np.vstack((patches[k], patches[z]))
        combinations.append(np.hstack((left_column, right_column)))
    return combinations

def generate_new_images(images):
    """Create the jigsaw dataset: every patch permutation of every input image.

    Args:
        images: iterable of images; each one is split with ``generate_patches``
            and rearranged with ``generate_all_combinations``.

    Returns:
        Tuple ``(new_images, labels)`` of NumPy arrays. ``labels[n]`` is the
        position of ``new_images[n]`` within the permutations of its source image.
    """
    new_images = []
    labels = []

    for image in images:
        combinations = generate_all_combinations(generate_patches(image))
        # The label of each rearranged image is its index in the permutation list
        new_images.extend(combinations)
        labels.extend(range(len(combinations)))

    return np.array(new_images), np.array(labels)

# Build the jigsaw dataset from the training images (one new image per patch permutation).
# NOTE(review): the result is many times larger than X_train — check memory before running
X_train_puzzle, y_train_puzzle = generate_new_images(X_train)

In [None]:
# Check the shapes of the generated jigsaw images and labels
print(X_train_puzzle.shape)
print(y_train_puzzle.shape)

In [None]:
# Insert a new axis at position 1 of the jigsaw labels
y_train_puzzle = y_train_puzzle[:, np.newaxis]
print(y_train_puzzle.shape)

In [None]:
imgs_to_show=100
startToShowFrom=0

fig, axes = plt.subplots(10, 10, figsize=(30, 20))

# Reshape the axes to a 1D array for easier indexing
axes = axes.ravel()

# Display imgs_to_show consecutive samples, starting at startToShowFrom
for i in range(imgs_to_show):
    # Use the same offset index for image and label so the title always matches
    # the picture, also when startToShowFrom is not 0
    sample_idx = i + startToShowFrom
    axes[i].imshow(X_train_puzzle[sample_idx])
    axes[i].set_title(f'class: {y_train_puzzle[sample_idx]}')
    axes[i].axis('off')

# Ensure tight layout
plt.tight_layout()

# Show the grid of images with labels
plt.show()


In [None]:
def unison_shuffled_copies(a, b):
    """Shuffle two equally long arrays with one shared random permutation.

    Args:
        a: first array, indexable with an integer index array.
        b: second array, with the same length as ``a``.

    Returns:
        Tuple ``(a_shuffled, b_shuffled)`` in which the pairing between the
        elements of ``a`` and ``b`` is preserved.

    Raises:
        AssertionError: if ``a`` and ``b`` have different lengths.
    """
    n_samples = len(a)
    assert n_samples == len(b)
    # A single permutation, drawn from NumPy's global RNG, is applied to both arrays
    order = np.random.permutation(n_samples)
    shuffled_a = a[order]
    shuffled_b = b[order]
    return shuffled_a, shuffled_b

In [None]:
# Randomly shuffle the jigsaw images and labels with one shared permutation
X_train_puzzle_shuffled, y_train_puzzle_shuffled = unison_shuffled_copies(X_train_puzzle, y_train_puzzle)

In [None]:
# Release the unshuffled jigsaw arrays; only the shuffled copies are used below
del X_train_puzzle, y_train_puzzle

# Load EffNetB0

In [None]:
# Load a fresh EfficientNetB0 as the backbone for the jigsaw pretext task
effnet_b0 = tf.keras.applications.efficientnet.EfficientNetB0(
    include_top= False, # discard the dense part
    weights= "imagenet", # start from the ImageNet-pretrained weights
    input_shape=(96, 96, 3) ,
    pooling= 'max', # global max pooling on the last feature map
    )
tfk.utils.plot_model(effnet_b0, show_shapes=True)

In [None]:
tf.random.set_seed(seed)

# Pretext-task network: EfficientNetB0 backbone followed by a custom dense head
inputs = tfk.Input(shape=(96, 96, 3))
x = effnet_b0(inputs)

# First hidden block of the head
x = tfkl.Dense(
    units=128,
    kernel_initializer=tfk.initializers.HeUniform(seed=seed),
)(x)
x = tfkl.Activation('relu', name='HiddenActivation1')(x)
x = tfkl.BatchNormalization()(x)

# Second hidden block of the head
x = tfkl.Dense(
    units=64,
    kernel_initializer=tfk.initializers.HeUniform(seed=seed),
)(x)
x = tfkl.Activation('relu', name='HiddenActivation2')(x)
x = tfkl.BatchNormalization()(x)

# Output has 24 neurons, one for every possible permutation of the patches
outputs = tfkl.Dense(units=24, activation='softmax')(x)

effnet_b0_model = tfk.Model(inputs=inputs, outputs=outputs, name='effnet_b0')

# The jigsaw labels are integer class ids, hence the sparse categorical loss
effnet_b0_model.compile(
    loss=tfk.losses.SparseCategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(1e-4, weight_decay=5e-4),
    metrics=['accuracy'],
)
effnet_b0_model.summary()

In [None]:
# Split the jigsaw dataset into training (75%) and validation (25%) sets.
# The jigsaw labels are integer class ids with a trailing axis of size 1, not one-hot
# vectors: argmax over that axis would be 0 for every sample and disable stratification.
# Stratify on the flattened class ids instead, so every permutation class keeps the same
# proportion in both sets.
X_train_puzzle, X_val_puzzle, y_train_puzzle, y_val_puzzle = train_test_split(
    X_train_puzzle_shuffled,
    y_train_puzzle_shuffled,
    random_state=seed,
    test_size=.25,
    stratify=y_train_puzzle_shuffled.ravel(),
)


In [None]:
# Release the full shuffled arrays; the train/validation splits are used from here on
del X_train_puzzle_shuffled, y_train_puzzle_shuffled

# Training, composed of 3 steps:
#- Train on the pretext task, with all layers unfrozen
#- Train on the downstream task, with only the classifier unfrozen
#- Finetune on the downstream task

In [None]:
# Train the model on the jigsaw pretext task; only the History.history dict is kept
eff_history = effnet_b0_model.fit(
    x = X_train_puzzle*255, # back to the 0-255 range used for the EfficientNet input
    y = y_train_puzzle,
    batch_size = 80,
    epochs = 600,
    validation_data = (X_val_puzzle*255, y_val_puzzle),
    # NOTE(review): both callbacks use patience=8 on the same metric, so the learning-rate
    # reduction can only fire on the epoch where early stopping also triggers — confirm
    # whether ReduceLROnPlateau was meant to use a shorter patience
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=8, restore_best_weights=True),
                 tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.1, patience=8, min_lr=1e-5, mode='max')]
).history

In [None]:
# Persist the trained pretext model (backbone + jigsaw head); it is reloaded from this path below
effnet_b0_model.save("EffnetB0_self_supervised_jigsaw_step1")

In [None]:
# Drop the in-memory pretext model; the saved copy is loaded again in a later cell
del effnet_b0_model

In [None]:
# Release the jigsaw training and validation arrays
del X_train_puzzle, y_train_puzzle
del X_val_puzzle, y_val_puzzle

In [None]:
# Reload the model trained on the jigsaw pretext task
effnet_b0_ft = tfk.models.load_model("EffnetB0_self_supervised_jigsaw_step1")

In [None]:
# Show the layer list; the next cell slices it by position to drop the pretext head
effnet_b0_ft.summary()

In [None]:
# Keep only the input layer and the EfficientNetB0 backbone by dropping the last 7 layers,
# i.e. the dense head of the pretext model (Dense/Activation/BatchNormalization x2 + output Dense).
# The name is passed to the constructor rather than assigned to the private `_name`
# attribute; later cells look this model up with get_layer('efficientnetb0').
effnet_b0_ft = tf.keras.models.Sequential(effnet_b0_ft.layers[:-7], name="efficientnetb0")
effnet_b0_ft.summary()

In [None]:
effnet_b0_ft.trainable = False  # Freeze all layers
tf.random.set_seed(seed)

# Augmentation block: random vertical/horizontal flips and random rotations
preprocessing = tf.keras.Sequential([
    tfkl.RandomFlip("vertical"),
    tfkl.RandomFlip("horizontal"),
    tfkl.RandomRotation(0.5),
], name='preprocessing')

inputs = tfk.Input(shape=(96, 96, 3))
# The augmented tensor gets its own name, so `preprocessing` keeps referring to the
# augmentation layer instead of being rebound to its output
augmented = preprocessing(inputs)
x = effnet_b0_ft(augmented)

# New classifier head, custom for our downstream task
x = tfkl.Dense(units=128, kernel_initializer=tfk.initializers.HeUniform(seed=seed), name='HiddenDense1')(x)
x = tfkl.Activation('relu', name='HiddenActivation1')(x)
x = tfkl.BatchNormalization()(x)
x = tfkl.Dense(units=64, kernel_initializer=tfk.initializers.HeUniform(seed=seed), name='HiddenDense2')(x)
x = tfkl.Activation('relu', name='HiddenActivation2')(x)
x = tfkl.BatchNormalization()(x)

# Add a Dense layer with 2 units and softmax activation as the classifier
outputs = tfkl.Dense(2, activation='softmax')(x)

eff_model = tfk.Model(inputs=inputs, outputs=outputs, name='model')

# The downstream labels are one-hot encoded, hence the (non-sparse) categorical loss
eff_model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(1e-4, weight_decay=5e-4),
    metrics=['accuracy'],
)
eff_model.summary()

In [None]:
# Train the model on the downstream task; the backbone was frozen above, so only the new head learns
ft_history = eff_model.fit(
    x = X_train*255, # back to the 0-255 range, as in the pretext training
    y = y_train,
    batch_size = 32,
    epochs = 600,
    validation_data = (X_val*255, y_val),
    # Stop after 25 epochs without a better val_accuracy (best weights restored);
    # multiply the learning rate by 0.1 after 20 such epochs, never going below 1e-5
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=25, restore_best_weights=True),
                 tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.1, patience=20, min_lr=1e-5, mode='max')]
).history

In [None]:
# Save the model after the head-only training stage, before fine-tuning
eff_model.save("EffnetB0_self_supervised_jigsaw_step2")

In [None]:
# Unfreeze the whole backbone, then list its inner layers with index and trainable flag
# (layers[0] of the wrapper is the EfficientNetB0 model itself)
eff_model.get_layer('efficientnetb0').trainable = True
for i, layer in enumerate(eff_model.get_layer('efficientnetb0').layers[0].layers):
   print(i, layer.name, layer.trainable)

In [None]:
# Freeze the first N backbone layers (indices 0..N-1); the remaining ones stay trainable
N = 218
backbone_layers = eff_model.get_layer('efficientnetb0').layers[0].layers
for layer in backbone_layers[:N]:
    layer.trainable = False

# Print the resulting trainable flag of every backbone layer
for i, layer in enumerate(backbone_layers):
    print(i, layer.name, layer.trainable)
eff_model.summary()

In [None]:
# Recompile after changing the trainable flags, with a 10x smaller learning rate
# and weight decay than in the head-only stage
eff_model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(1e-5, weight_decay=5e-5),
    metrics=['accuracy'],
)

In [None]:
# Fine-tune the model with the partially unfrozen backbone.
# NOTE: this overwrites `ft_history` from the previous training stage.
ft_history = eff_model.fit(
    x = X_train*255, # back to the 0-255 range, as in the earlier stages
    y = y_train,
    batch_size = 32,
    epochs = 600,
    validation_data = (X_val*255, y_val),
    # Same schedule as the head-only stage, but the learning-rate floor is lowered to 1e-6
    callbacks = [tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=25, restore_best_weights=True),
                 tfk.callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.1, patience=20, min_lr=1e-6, mode='max')]
).history

In [None]:
# Save the fine-tuned model of the jigsaw experiment
eff_model.save("effnetB0_Self_Supervised_jigsaw_FineTuned")