# **Transfer Learning using EfficientNetV2B0**

In [None]:
#@title **Loading data from gdrive to memory**
# from google.colab import drive
# drive.mount('/content/drive')

# %cd /content/drive/MyDrive/ANNDL_Project

%cd ../../datasets

In [None]:
!yes A | unzip data_splitted.zip -d data_splitted
!yes A | unzip data_cleaned_for_training.zip

# !yes A | unzip training_data_final.zip

In [None]:
!pip install tensorflow==2.10.1

In [None]:
#@title **Imports**
import warnings
import logging
import tensorflow as tf
import matplotlib.pyplot as plt
from keras import Sequential
from keras.preprocessing.image import ImageDataGenerator
from keras.initializers import glorot_uniform
from keras.models import Model
from keras.layers import *
from keras.optimizers import Adam
import numpy as np
import os
import random
import pandas as pd
import scipy
from PIL import Image
from datetime import datetime
from google.colab import files

In [None]:
#@title **Metadata and variables**

tfk = tf.keras
tfkl = tf.keras.layers

dir = "data_splitted"

input_shape = (96, 96, 3)
nclasses = 8
seed = 42
AUTO = tf.data.AUTOTUNE
BATCH_SIZE = 64
IMG_SIZE = 96

In [None]:
#@title **Setting seed and/or suppressing warnings**
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)
tf.get_logger().setLevel('INFO')
tf.autograph.set_verbosity(0)

tf.get_logger().setLevel(logging.ERROR)
tf.get_logger().setLevel('ERROR')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Setting random seed for reproducibility
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

In [None]:
#@title **Image Dataset from directory (already resized)**
def get_image_dataset(dir, seed, width=96, height=96):
    
    train_dir = dir+"/train"
    test_dir = dir+"/test"
    val_dir = dir+"/val"
    train_ds = tfk.utils.image_dataset_from_directory(
        train_dir,
        label_mode='categorical',
        seed=seed,
        image_size=(224,224),
        batch_size = 32
    )
    val_ds = tfk.utils.image_dataset_from_directory(
        val_dir,
        label_mode='categorical',
        seed=seed,
        image_size=(224,224),
        batch_size = 32
    )
    test_ds = tfk.utils.image_dataset_from_directory(
        test_dir,
        label_mode='categorical',
        seed=seed,
        image_size=(224,224),
        batch_size = 32
    )
    
    return train_ds, val_ds, test_ds

In [None]:
#@title **Utility function to create folders and callbacks for training**
def create_folders_and_callbacks(model_name):

  exps_dir = os.path.join('trained_models')
  if not os.path.exists(exps_dir):
      os.makedirs(exps_dir)

  now = datetime.now().strftime('%m-%d_%H-%M-%S')

  exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
  if not os.path.exists(exp_dir):
      os.makedirs(exp_dir)
      
  callbacks = []

  # Model checkpoint
  # ----------------
  ckpt_dir = os.path.join(exp_dir, 'ckpts')
  if not os.path.exists(ckpt_dir):
      os.makedirs(ckpt_dir)

  ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
                                                     filepath=ckpt_dir + '/cp-{val_accuracy:.2f}-{epoch:02d}.ckpt', # Checkpoint is saved with validation accuracy in the filename
                                                     monitor='val_accuracy', 
                                                     save_weights_only=True, # True to save only weights
                                                     save_best_only=True, # True to save only the best epoch 
                                                     initial_value_threshold=0.7 # Model is saved only if val_accuracy > initial_value_threshold
                                                     ) 

  callbacks.append(ckpt_callback)


  # Visualize Learning on Tensorboard
  # ---------------------------------
  tb_dir = os.path.join(exp_dir, 'tb_logs')
  if not os.path.exists(tb_dir):
      os.makedirs(tb_dir)
      
  # By default shows losses and metrics for both training and validation
  tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir, 
                                               profile_batch=0,
                                               histogram_freq=1)  # if > 0 (epochs) shows weights histograms
  callbacks.append(tb_callback)

  # Early Stopping
  # --------------
  es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=15, restore_best_weights=True)
  callbacks.append(es_callback)

  return callbacks, exp_dir

---

## **Transfer Learning**

In [None]:
#@title **External Augmentation block**

img_augmentation = Sequential(
    [
        tfk.layers.RandomRotation(factor=1),
        tfk.layers.RandomZoom(width_factor=0.2, height_factor=0.2),
        tfk.layers.RandomTranslation(height_factor=0.2, width_factor=0.2),
        tfk.layers.RandomFlip(),
        tfk.layers.RandomContrast(factor=0.3),
    ],
    name="img_augmentation",
)

In [None]:
#@title **Build EfficientNet v2 B0**
def build_model_efficientnet_v2_b0_tl():
  inputs = tfkl.Input(shape=(224, 224, 3))
  x = img_augmentation(inputs)
  model = tfk.applications.EfficientNetV2B0(include_top=False, input_tensor=x, weights="imagenet", input_shape=(224, 224, 3))
  # Freeze the pretrained weights
  model.trainable = False
  # Rebuild top
  x = tfkl.GlobalAveragePooling2D(name="avg_pool")(model.output)
  top_dropout_rate = 0.3
  x = tfkl.Dense(256, activation="relu")(x)
  x = tfkl.Dropout(top_dropout_rate, name="top_dropout")(x)
  outputs = tfkl.Dense(8, activation="softmax", name="pred")(x)

  # Compile
  model = tf.keras.Model(inputs, outputs, name="EfficientNet")
  optimizer = tf.keras.optimizers.Adam(learning_rate=1e-2)
  model.compile(
      optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"]
  )
  return model


In [None]:
tl_model = build_model_efficientnet_v2_b0_tl()

In [None]:
tl_model.summary()

In [None]:
callbacks, model_folder_dir = create_folders_and_callbacks(model_name='EfficientNetV2B0_TL')
train_ds, val_ds, test_ds = get_image_dataset(dir, seed=seed)

In [None]:
# Train the model
tl_history = tl_model.fit(
    x = train_ds,
    batch_size = 64,
    epochs = 100,
    validation_data = val_ds,
    callbacks = callbacks
).history

In [None]:
tl_model.save('saved_models/EfficientNetv2B0')

In [None]:
del tl_model

### Fine Tuning

In [None]:
#@title Reloading the model after transfer learning (through checkpoints or the entire model)

ft_model = tfk.models.load_model('saved_models/EfficientNetv2B0')

#ft_model = build_model_efficientnet_v2_b0_tl()
#ft_model.load_weights(weight_path)

In [None]:
#@title **Unfreezing completely (or part of) the pretrained network and re-compiling**

# Otherwise, we unfreeze as much as we can
layers_to_unfreeze = 100
for i, layer in enumerate(ft_model.layers[-layers_to_unfreeze:]):
    # BatchNormalization layers are not unfreezed because, if trainable, they lead to a performance decrease
    if not isinstance(layer, tfk.layers.BatchNormalization):
        layer.trainable = True


In [None]:
# Re-compile the model
ft_model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(1e-4), metrics='accuracy') # Lowered learning rate

In [None]:
callbacks, model_folder_dir = create_folders_and_callbacks(model_name='EfficientNetV2B0_FT')
train_ds, val_ds, test_ds = get_image_dataset(dir, seed=seed)

In [None]:
#@title **Fine-Tuning the partially unfreezed model**

ft_history = ft_model.fit(
    x = train_ds,
    batch_size = 64,
    epochs = 50,
    callbacks = callbacks,
    validation_data = val_ds
).history


---

**Save of models**


In [None]:
ft_model.save("saved_models/...")



---

**Loading of models**

In [None]:
#@title **Loading of an existing model (having only the weights)**
# after having already built the model...
tl_model.load_weights('weight path')

In [None]:
#@title **Loading of an existing model (Complete)**
model = tf.keras.models.load_model("saved_models/...")

In [None]:
#@title **Testing a model and re-saving**

train_ds, val_ds, test_ds = get_image_dataset(dir=dir, seed=seed)

# getting model performance on the test set
model_test_metrics = ft_model.evaluate(test_ds, return_dict=True)

# saving model in saved_models/modelname_accuracy
# saved_model_name = 'EfficientNetV2B0' + str(model_test_metrics["accuracy"])
saved_model_name = 'EfficientNetv2B0'
saved_model_dir = 'saved_models/' + saved_model_name
ft_model.save(saved_model_dir)



---
**Download of the model in .zip format**


In [None]:
zipped_model = saved_model_dir + '/' + saved_model_name + '.zip'

In [None]:
!zip -r {zipped_model} {saved_model_dir}

In [None]:
files.download(zipped_model)



---

