# **Import libraries**

In [None]:
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import glob
import math
import cv2

import argparse
import random
import gc
import os
os.environ["KERAS_BACKEND"] = "tensorflow"

from tqdm import tqdm
pd.options.display.max_colwidth = 1000
tqdm.pandas()

import keras
from keras.models import Sequential, Model, load_model
from keras.layers import Dense, Dropout, Activation, Flatten, Add, Concatenate, Input
from keras.layers import Conv2D, MaxPooling2D, ZeroPadding2D, AveragePooling2D, LSTM, Reshape
from keras.layers import BatchNormalization, SeparableConv2D, DepthwiseConv2D, LeakyReLU, GlobalAveragePooling2D
from keras import optimizers
from keras import backend as K
from keras import layers

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.keras.regularizers import L2
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.utils import to_categorical

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

# Make sure we are able to handle large datasets
import resource

# Read the current (soft, hard) limits on the number of open file descriptors.
low, high = resource.getrlimit(resource.RLIMIT_NOFILE)
# Raise the soft limit to the hard limit; the hard limit itself is left unchanged.
resource.setrlimit(resource.RLIMIT_NOFILE, (high, high))

# Initialize constants and lists

In [None]:
# Class code -> human-readable description of the driver activity.
activity_map = dict(
    c0='Safe driving',
    c1='Texting - right',
    c2='Talking on the phone - right',
    c3='Texting - left',
    c4='Talking on the phone - left',
    c5='Operating the radio',
    c6='Drinking',
    c7='Reaching behind',
    c8='Hair and makeup',
    c9='Talking to passenger',
)
# Class code -> integer label index (c0 -> 0, ..., c9 -> 9).
class_mapping = {f'c{index}': index for index in range(10)}

# Algorithm hyperparameters
num_epochs, batch_size = 70, 64
width = 256
temperature = 0.1

# Stronger augmentations for contrastive, weaker ones for supervised training
contrastive_augmentation = dict(min_area=0.75, brightness=0.5, jitter=0.2)
classification_augmentation = dict(min_area=0.8, brightness=0.3, jitter=0.1)

# Input image geometry and batch size
IMG_DIM, CHANNEL_SIZE, BATCH_SIZE = 208, 3, 64

# **Model Architecture**

## **Encoder Architecture**

In [None]:
# Define model
from keras import layers
from keras import models
from keras.models import Sequential
from tensorflow.keras import datasets,models,layers

from keras.regularizers import l2

l2_reg = 0

# Define model
def Resnet_Inception_HRNN():
  """Build the three-branch feature extractor used as the encoder.

  The same input tensor of shape (IMG_DIM, IMG_DIM, CHANNEL_SIZE) feeds three
  parallel branches whose outputs are concatenated:

  * a residual-style branch (conv -> separable conv -> add with the input),
    followed by strided separable convolutions and global average pooling;
  * an inception-style block with three parallel branches that are
    concatenated, followed by strided separable convolutions and global
    average pooling;
  * a recurrent branch that average-pools the image, reshapes it into a
    sequence and runs two stacked LSTM layers.

  The concatenated features go through a small dense head.

  Returns:
    A keras ``Model`` mapping an image batch to a 256-dimensional,
    ReLU-activated feature vector.
  """
  inputs = Input((IMG_DIM, IMG_DIM, CHANNEL_SIZE))

  # Residual-style branch
  r1 = Conv2D(3, kernel_size=(5,5), activation='relu', padding='same', kernel_regularizer = l2(0.001))(inputs)
  r2 = BatchNormalization()(r1)
  r3 = SeparableConv2D(3, kernel_size=(5,5), activation='relu', padding='same')(r2)
  r3 = BatchNormalization()(r3)
  # Skip connection: add the result to the input. r3 has 3 filters, so the
  # input must also have 3 channels (CHANNEL_SIZE == 3) for the shapes to match.
  r5 = Add()([r3, inputs])
  r6 = Activation('relu')(r5)
  r7 = AveragePooling2D(pool_size=(2,2), strides=(2,2), padding='same')(r6)

  # Improvement: three stride-2 separable convolutions shrink the feature map
  r7 = SeparableConv2D(3, kernel_size=(5,5), strides = (2,2) ,activation='relu', padding='same')(r7)
  r7 = BatchNormalization()(r7)
  r7 = SeparableConv2D(16, kernel_size=(5,5), strides = (2,2) ,activation='relu', padding='same')(r7)
  r7 = BatchNormalization()(r7)
  r7 = SeparableConv2D(16, kernel_size=(5,5), strides = (2,2) ,activation='relu', padding='same')(r7)
  r7 = BatchNormalization()(r7)
  # End of improvement

  # Collapse the spatial dimensions into one 16-element vector per image
  r7 = GlobalAveragePooling2D()(r7)


  # Inception module
  # Branch 1: 1x1 Conv + 5x5 Conv
  b1 = Conv2D(7, kernel_size = (1,1), activation='relu', padding='same', kernel_regularizer = l2(0.001))(inputs)
  b1 = BatchNormalization()(b1)
  b1 = SeparableConv2D(7, kernel_size = (5,5), activation='relu', padding='same')(b1)

  # Branch 2: 1x1 Conv + 5x5 Conv (same layout as branch 1, separate weights)
  b2 = Conv2D(7, kernel_size = (1,1), activation='relu', padding='same', kernel_regularizer = l2(0.001))(inputs)
  b2 = BatchNormalization()(b2)
  b2 = SeparableConv2D(7, kernel_size = (5,5), activation='relu', padding='same')(b2)

  # Branch 3 :3x3 MaxPooling + 1x1 Conv
  b3 = MaxPooling2D(pool_size=(3,3), strides = (1,1), padding='same')(inputs)
  b3 = SeparableConv2D(7, kernel_size = (1,1), activation='relu', padding='same')(b3)
  b3 = BatchNormalization()(b3)

  # Concatenate 3 branches along the channel axis (7 + 7 + 7 = 21 channels)
  b4 = Concatenate(axis=-1)([b1, b2, b3])

  # Improvement: three stride-2 separable convolutions shrink the feature map
  b4 = SeparableConv2D(16, kernel_size = (3,3), strides = (2,2), activation='relu', padding='same')(b4)
  b4 = BatchNormalization()(b4)
  b4 = SeparableConv2D(32, kernel_size = (3,3), strides = (2,2), activation='relu', padding='same')(b4)
  b4 = BatchNormalization()(b4)
  b4 = SeparableConv2D(32, kernel_size = (3,3), strides = (2,2), activation='relu', padding='same')(b4)
  b4 = BatchNormalization()(b4)
  # End of improvement
  b4 = GlobalAveragePooling2D()(b4)


  # HRNN block
  # NOTE: this sequence shape is hard-coded. 26 * 1248 = 32448 = 104 * 104 * 3,
  # which is the size of the pooled tensor only when IMG_DIM == 208 and
  # CHANNEL_SIZE == 3; other input sizes make the Reshape below fail.
  input_shape = (26,1248)
  LSTM_UNITS = 80
  # Average Pooling (stride 2 halves each spatial dimension)
  h = AveragePooling2D(pool_size=(3,3), strides=(2,2), padding='same')(inputs)
  h = Reshape(input_shape)(h)
  # First LSTM returns the full sequence so the second one can consume it;
  # the second returns only its final output of LSTM_UNITS values.
  h = LSTM(LSTM_UNITS, return_sequences=True, input_shape=input_shape)(h)
  h = LSTM(LSTM_UNITS)(h)

  # Fuse the three branch outputs into one feature vector
  concat = Concatenate(axis=-1)([b4, r7, h])

  # Dense head producing the final 256-dimensional representation
  x = Dense(80, activation='relu')(concat)
  x = Dropout(0.2)(x)
  x = Dense(80, activation='relu')(x)
  x = BatchNormalization()(x)
  outputs = Dense(256, activation='relu')(x)

  model = Model(inputs = inputs, outputs=outputs)
  return model

In [None]:
# Build the network once and print its layer-by-layer summary.
model = Resnet_Inception_HRNN()
model.summary()

In [None]:
def get_encoder():
    """Return a freshly initialised encoder wrapped in a Sequential named "encoder".

    A new ``Resnet_Inception_HRNN`` network is built on every call. Wrapping
    one shared, module-level model instead would make every caller (the
    supervised baseline, the contrastive model and the finetuning model)
    train the same weights, so the baseline would no longer start from a
    random initialisation and would leak its training into the pretraining
    run.

    Returns:
        keras.Sequential: a single-layer container holding the new network.
    """
    return keras.Sequential(
        [
            Resnet_Inception_HRNN()
        ],
        name="encoder",
    )

## **Supervised baseline model**

In [None]:
# Baseline supervised training with random initialization
# Pipeline: augmentation -> encoder -> 10-way dense layer -> batch norm -> softmax.
# get_augmenter is defined outside this section; presumably it returns a Keras
# preprocessing model configured by the given augmentation settings - confirm.
baseline_model = keras.Sequential(
    [
        get_augmenter(**classification_augmentation),
        get_encoder(),
        layers.Dense(10),
        layers.BatchNormalization(),
        layers.Activation('softmax')
    ],
    name="baseline_model",
)
# categorical_crossentropy expects one-hot targets; the metric is registered
# under the name 'acc', which is why the history keys below are acc / val_acc.
baseline_model.compile(loss='categorical_crossentropy',
                optimizer=keras.optimizers.Adam(),
                metrics=['acc'])

# Train on the labelled finetuning split and validate after every epoch.
baseline_history = baseline_model.fit(
    finetune_train_dataset, epochs=num_epochs, validation_data=finetune_val_dataset
)

# Report the best validation accuracy reached in any epoch, as a percentage.
print(
    "Maximal validation accuracy: {:.2f}%".format(
        max(baseline_history.history["val_acc"]) * 100
    )
)

## **Contrastive model**

In [None]:
# Define the contrastive model with model-subclassing
class ContrastiveModel(keras.Model):
    """Contrastive pretraining model with an on-the-fly linear probe.

    Each training step performs two independent updates:

    1. the encoder and projection head are trained with a symmetrised,
       temperature-scaled cross-entropy over cosine similarities between two
       augmented views of every image (labels are not used);
    2. a single dense layer (the linear probe) is trained on labelled images
       using encoder features computed in inference mode, so the probe's
       gradients never reach the encoder.
    """

    def __init__(self):
        """Create the augmenters, encoder, projection head and linear probe.

        Reads the module-level ``temperature``, ``width`` and the two
        augmentation setting dicts, and prints summaries of the projection
        head and the linear probe.
        """
        super().__init__()

        self.temperature = temperature
        self.contrastive_augmenter = get_augmenter(**contrastive_augmentation)
        self.classification_augmenter = get_augmenter(**classification_augmentation)
        self.encoder = get_encoder()
        # Non-linear MLP as projection head
        self.projection_head = keras.Sequential(
            [
                keras.Input(shape=(width,)),
                layers.Dense(width, activation="relu"),
                layers.Dense(width),
            ],
            name="Project_Head",
        )
        # Single dense layer for linear probing
        self.linear_probe = keras.Sequential(
            [layers.Input(shape=(width,)), layers.Dense(10)], name="linear_probe"
        )

        # self.encoder.summary()
        self.projection_head.summary()
        self.linear_probe.summary()

    def compile(self, contrastive_optimizer, probe_optimizer, **kwargs):
        """Store the two optimizers and create the probe loss and metric trackers.

        Args:
            contrastive_optimizer: optimizer applied to the encoder and
                projection head weights.
            probe_optimizer: optimizer applied to the linear probe weights.
            **kwargs: forwarded unchanged to ``keras.Model.compile``.
        """
        super().compile(**kwargs)

        self.contrastive_optimizer = contrastive_optimizer
        self.probe_optimizer = probe_optimizer

        # self.contrastive_loss will be defined as a method
        # The probe outputs raw logits and is compared with integer class labels
        self.probe_loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

        self.contrastive_loss_tracker = keras.metrics.Mean(name="c_loss")
        self.contrastive_accuracy = keras.metrics.SparseCategoricalAccuracy(
            name="c_acc"
        )
        self.probe_loss_tracker = keras.metrics.Mean(name="p_loss")
        self.probe_accuracy = keras.metrics.SparseCategoricalAccuracy(name="p_acc")

    @property
    def metrics(self):
        """Tracked metrics in a fixed order: c_loss, c_acc, p_loss, p_acc.

        ``test_step`` slices this list with ``[2:]``, so the two contrastive
        metrics must stay first.
        """
        return [
            self.contrastive_loss_tracker,
            self.contrastive_accuracy,
            self.probe_loss_tracker,
            self.probe_accuracy,
        ]

    def contrastive_loss(self, projections_1, projections_2):
        """Compute the symmetrised contrastive loss for two batches of projections.

        Row ``i`` of ``projections_1`` and row ``i`` of ``projections_2`` are
        treated as the matching pair. Also updates the contrastive accuracy
        metric in both directions.

        Args:
            projections_1: projections of the first augmented view,
                indexed as (batch, features).
            projections_2: projections of the second augmented view, same shape.

        Returns:
            The per-sample loss, averaged over the two matching directions.
        """
        # InfoNCE loss (information noise-contrastive estimation)
        # NT-Xent loss (normalized temperature-scaled cross entropy)

        # Cosine similarity: the dot product of the l2-normalized feature vectors
        projections_1 = tf.math.l2_normalize(projections_1, axis=1)
        projections_2 = tf.math.l2_normalize(projections_2, axis=1)
        similarities = (
            tf.matmul(projections_1, projections_2, transpose_b=True) / self.temperature
        )

        # The similarity between the representations of two augmented views of the
        # same image should be higher than their similarity with other views
        # The "correct class" of row i is therefore column i
        batch_size = tf.shape(projections_1)[0]
        contrastive_labels = tf.range(batch_size)
        self.contrastive_accuracy.update_state(contrastive_labels, similarities)
        self.contrastive_accuracy.update_state(
            contrastive_labels, tf.transpose(similarities)
        )

        # The temperature-scaled similarities are used as logits for cross-entropy
        # a symmetrized version of the loss is used here
        loss_1_2 = keras.losses.sparse_categorical_crossentropy(
            contrastive_labels, similarities, from_logits=True
        )
        loss_2_1 = keras.losses.sparse_categorical_crossentropy(
            contrastive_labels, tf.transpose(similarities), from_logits=True
        )
        return (loss_1_2 + loss_2_1) / 2

    def train_step(self, data):
        """Run one contrastive update followed by one linear-probe update.

        Args:
            data: a pair ``((unlabeled_images, _), (labeled_images, labels))``;
                the second element of the unlabeled pair is ignored.

        Returns:
            dict mapping every metric name to its current value.
        """
        (unlabeled_images, _), (labeled_images, labels) = data

        # Both labeled and unlabeled images are used, without labels
        images = tf.concat((unlabeled_images, labeled_images), axis=0)
        # Each image is augmented twice, differently
        augmented_images_1 = self.contrastive_augmenter(images, training=True)
        augmented_images_2 = self.contrastive_augmenter(images, training=True)
        with tf.GradientTape() as tape:
            features_1 = self.encoder(augmented_images_1, training=True)
            features_2 = self.encoder(augmented_images_2, training=True)
            # The representations are passed through a projection mlp
            projections_1 = self.projection_head(features_1, training=True)
            projections_2 = self.projection_head(features_2, training=True)
            contrastive_loss = self.contrastive_loss(projections_1, projections_2)
        # Only the encoder and projection head receive the contrastive gradients
        gradients = tape.gradient(
            contrastive_loss,
            self.encoder.trainable_weights + self.projection_head.trainable_weights,
        )
        self.contrastive_optimizer.apply_gradients(
            zip(
                gradients,
                self.encoder.trainable_weights + self.projection_head.trainable_weights,
            )
        )
        self.contrastive_loss_tracker.update_state(contrastive_loss)

        # Labels are only used in evaluation for an on-the-fly logistic regression
        preprocessed_images = self.classification_augmenter(
            labeled_images, training=True
        )
        with tf.GradientTape() as tape:
            # the encoder is used in inference mode here to avoid regularization
            # and updating the batch normalization parameters if they are used
            features = self.encoder(preprocessed_images, training=False)
            class_logits = self.linear_probe(features, training=True)
            probe_loss = self.probe_loss(labels, class_logits)
        # Gradients are taken w.r.t. the probe weights only, so the encoder is untouched
        gradients = tape.gradient(probe_loss, self.linear_probe.trainable_weights)
        self.probe_optimizer.apply_gradients(
            zip(gradients, self.linear_probe.trainable_weights)
        )
        self.probe_loss_tracker.update_state(probe_loss)
        self.probe_accuracy.update_state(labels, class_logits)

        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Evaluate the linear probe on one labelled batch.

        Args:
            data: a pair ``(labeled_images, labels)``.

        Returns:
            dict with the probe metrics only (p_loss and p_acc).
        """
        labeled_images, labels = data

        # For testing, the components are used with a training=False flag
        preprocessed_images = self.classification_augmenter(
            labeled_images, training=False
        )
        features = self.encoder(preprocessed_images, training=False)
        class_logits = self.linear_probe(features, training=False)
        probe_loss = self.probe_loss(labels, class_logits)
        self.probe_loss_tracker.update_state(probe_loss)
        self.probe_accuracy.update_state(labels, class_logits)

        # Only the probe metrics are logged at test time
        return {m.name: m.result() for m in self.metrics[2:]}


# Contrastive pretraining
# Separate Adam optimizers: one for encoder + projection head, one for the
# linear probe, so the two updates keep independent optimizer state.
pretraining_model = ContrastiveModel()
pretraining_model.compile(
    contrastive_optimizer=keras.optimizers.Adam(),
    probe_optimizer=keras.optimizers.Adam(),
)
pretraining_model.summary()


In [None]:
# Run contrastive pretraining; validation evaluates the linear probe only,
# which is why the history key read below is "val_p_acc".
pretraining_history = pretraining_model.fit(
    train_dataset, epochs=num_epochs, validation_data=pretrain_val_dataset
)
# Report the best validation probe accuracy reached in any epoch, as a percentage.
print(
    "Maximal validation accuracy: {:.2f}%".format(
        max(pretraining_history.history["val_p_acc"]) * 100
    )
)

# **Supervised finetuning of the pretrained model**

In [None]:
# Classifier for supervised finetuning. It reuses the encoder object held by
# pretraining_model (not a copy), so finetuning updates those weights in place.
# The head mirrors the baseline: 10-way dense layer -> batch norm -> softmax.
finetuning_model = keras.Sequential(
    [
        pretraining_model.encoder,
        layers.Dense(10),
        layers.BatchNormalization(),
        layers.Activation('softmax')
    ],
    name="finetuning_model",
)

In [None]:
# Print the layer-by-layer summary of the finetuning classifier.
finetuning_model.summary()

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
from IPython.display import clear_output

import keras

In [None]:
class TrainingPlot(keras.callbacks.Callback):
  """Keras callback that redraws loss and accuracy curves after every epoch.

  The values are read from the epoch logs under the keys 'loss', 'acc',
  'val_loss' and 'val_acc'; a key that is missing is recorded as None.
  Plotting starts once at least two epochs have been recorded.
  """

  # Matplotlib 3.6 renamed the bundled "seaborn" style to "seaborn-v0_8" and
  # later releases dropped the old name, so the first available one is used.
  _STYLE_CANDIDATES = ("seaborn-v0_8", "seaborn")

  # This function is called when the training begins
  def on_train_begin(self, logs=None):
    """Reset the recorded history at the start of every training run."""
    # Initialize the lists for holding the logs, losses and accuracies
    self.losses = []
    self.acc = []
    self.logs = []
    self.val_losses = []
    self.val_acc = []

  # This function is called at the end of each epoch
  def on_epoch_end(self, epoch, logs=None):
    """Record the epoch's metrics and redraw both figures.

    Args:
      epoch: index of the epoch that just finished (unused).
      logs: dict of metric values for the epoch; None is treated as empty.
    """
    # A fresh dict per call: a mutable default argument would be shared
    # between calls and stored by reference in self.logs.
    logs = logs if logs is not None else {}

    # Append the logs, losses, and accuracies to the lists
    self.logs.append(logs)
    self.losses.append(logs.get('loss'))
    self.acc.append(logs.get('acc'))
    self.val_losses.append(logs.get('val_loss'))
    self.val_acc.append(logs.get('val_acc'))

    # Before plotting ensure at least 2 epochs have passed
    if len(self.losses) > 1:

      # Clear the previous plot
      clear_output(wait=True)
      N = np.arange(0, len(self.losses))

      # Pick the seaborn look under whichever name this Matplotlib ships,
      # falling back to the default style rather than raising.
      style = next(
          (name for name in self._STYLE_CANDIDATES if name in plt.style.available),
          "default",
      )
      plt.style.use(style)

      # Plot train loss and val loss against epochs passed
      plt.figure()
      plt.plot(N, self.losses, label = "Training Loss")
      plt.plot(N, self.val_losses, label = "Val loss")
      plt.title("Train and Val Loss")
      plt.xlabel("Epoch #")
      plt.ylabel("Loss")
      plt.legend()
      plt.show()

      # Plot train acc and val acc on their own figure
      plt.figure()
      plt.plot(N, self.acc, label = "Training Acc")
      plt.plot(N, self.val_acc, label = "Val Acc")
      plt.title("Train and Val Accuracy ")
      plt.xlabel("Epoch #")
      plt.ylabel("Accuracy")
      plt.legend()
      plt.show()

In [None]:
# Callback instance that live-plots the training curves; passed to fit() below.
plot_losses = TrainingPlot()

In [None]:
# callbacks: ensure efficiency of the training process
# The file name must not start with "/": os.path.join discards every earlier
# component when a later one is absolute, which would silently drop dir_path
# and place the checkpoint at the filesystem root.
save_path = os.path.join(dir_path, 'ResNet_InceptionModule_HRNN.h5')

callbacks = [
        # Stop when val_loss has not improved for 10 consecutive epochs
        tf.keras.callbacks.EarlyStopping(patience=10, monitor="val_loss"),
        # Keep only the best model seen so far on disk
        tf.keras.callbacks.ModelCheckpoint(save_path, verbose=2, save_best_only=True),
        # Divide the learning rate by 10 when training accuracy plateaus
        tf.keras.callbacks.ReduceLROnPlateau(monitor='acc', factor=0.1, patience=10, min_lr=0.0000005),

        # Live loss / accuracy plots
        plot_losses
            ]

In [None]:
# RMSprop with a small learning rate for finetuning the pretrained encoder.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.0005)
# categorical_crossentropy expects one-hot targets; the metric is registered
# as 'acc', the key the callbacks and the TrainingPlot callback read.
finetuning_model.compile(loss='categorical_crossentropy',
                optimizer = optimizer,
                metrics=['acc'])

In [None]:
# Supervised finetuning with early stopping, checkpointing, LR scheduling and
# live plotting supplied through `callbacks`.
# NOTE(review): Keras documents batch_size as not to be specified when the
# input is a dataset / generator that already yields batches - confirm whether
# finetune_train_dataset is such an object and whether this argument is needed.
finetuning_history = finetuning_model.fit(
    finetune_train_dataset,
    batch_size = 64,
    validation_data = finetune_val_dataset,
    epochs = num_epochs,
    callbacks = callbacks,
    verbose = 1,
    shuffle = True
)

## **Save model**

In [None]:
import os

# Destination inside dir_path. The file name must not start with "/":
# os.path.join discards every earlier component when a later one is absolute,
# which would silently drop dir_path and target the filesystem root.
model_path = os.path.join(dir_path, 'Resnet_HRNN_Inc_SF_selfsupervised.h5')
# Remove a stale file from a previous run so the save starts from a clean slate
if os.path.exists(model_path):
    os.remove(model_path)

finetuning_model.save(model_path)

In [None]:
from IPython.display import FileLink
# Render a download link for the saved model.
# NOTE(review): the save cell writes 'Resnet_HRNN_Inc_SF_selfsupervised.h5'
# (no '_new' suffix), so this link may point at a file that does not exist -
# confirm which file name is intended.
FileLink(r'Resnet_HRNN_Inc_SF_selfsupervised_new.h5')

## **Creating a Model Prediction Function**

In [None]:
from PIL import Image
from IPython.display import display
from IPython.display import clear_output

import matplotlib.image as mpimg

In [None]:
# Show the class-name -> index mapping exposed by the validation data iterator.
print("Class Indeces Are :-->", finetune_val_dataset.class_indices)

In [None]:
def model_prediction(img_path, model, target_size=(IMG_DIM,IMG_DIM)):
    """Predict the class label of a single image file.

    Args:
        img_path: path of the image to classify.
        model: model exposing ``predict``; it receives a batch of one image.
        target_size: (width, height) the image is resized to before prediction.

    Returns:
        The key of ``finetune_val_dataset.class_indices`` whose index equals
        the arg-max of the model output, or None when no key matches.
    """
    # The context manager closes the file handle once the pixels are read.
    # convert("RGB") guarantees three channels, so grayscale, palette and
    # RGBA files no longer produce an array the model cannot accept.
    # NOTE(review): pixel values are passed through unscaled (0-255) - confirm
    # this matches the preprocessing used for the training data.
    with Image.open(img_path) as img:
        img_array = np.asarray(img.convert("RGB").resize(target_size))

    # Expand the dimension of img array to match input shape (batch of one)
    expand_dim = np.expand_dims(img_array, axis=0)

    # Predict and take the index of the highest score
    predictions = model.predict(expand_dim, verbose=False)
    predicted_class_index = np.argmax(predictions)

    # Map the index back to its label via a reverse lookup in class_indices
    predicted_label = next((k for k, v in finetune_val_dataset.class_indices.items() if v == predicted_class_index), None)

    return predicted_label

In [None]:
# Keep only the image path and ground-truth class columns, on a fresh 0..n-1 index.
finetune_val = (
    finetune_val.loc[:, 'ImgPath class'.split()]
    .reset_index(drop=True)
    .copy()
)
print("shape_of_the_finetune_val", finetune_val.shape)
# Peek at one random row
finetune_val.sample(1)

**Prediction on the Validation Dataset**

In [None]:
# Classify every validation image one at a time (with a tqdm progress bar) and
# store the predicted label next to the ground truth.
finetune_val['prediction'] = finetune_val['ImgPath'].progress_apply(lambda x:model_prediction(x, finetuning_model))

In [None]:
# Inspect 100 random rows; raises if the frame holds fewer than 100 rows.
finetune_val.sample(100)

**Confusion matrix**

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2_as_graph

In [None]:
# Ground-truth and predicted labels as plain lists, aligned row by row.
y_true = list(finetune_val['class'])
y_pred = list(finetune_val['prediction'])

In [None]:
# First five ground-truth labels
y_true[:5]

In [None]:
# First five predicted labels
y_pred[:5]

In [None]:
# Order the display names exactly as confusion_matrix orders its rows and
# columns: by the sorted unique labels found in y_true / y_pred. Sorting the
# human-readable names alphabetically instead ("Drinking" first) would attach
# the wrong name to every row whenever the labels are class codes like 'c0'.
# NOTE(review): presumably the 'class' column holds codes 'c0'..'c9'; labels
# that are not keys of activity_map are shown unchanged.
class_codes = sorted(set(y_true) | set(y_pred))
sorted_labels = [activity_map.get(code, code) for code in class_codes]

# Create a dictionary to map label to index
label_map = {label: i for i, label in enumerate(sorted_labels)}

In [None]:
# Rows are true labels, columns are predictions; without an explicit `labels`
# argument both axes follow the sorted unique labels in y_true / y_pred.
c_m = confusion_matrix(y_true, y_pred)

In [None]:
# Figure-wide defaults: canvas size and base font size
plt.rcParams.update({'figure.figsize': (10.0, 9.0), 'font.size': 20})

# Wrap the matrix together with its axis labels and draw it.
# Other possible colour maps:
# 'autumn_r', 'Blues', 'cool', 'Greens', 'Greys', 'PuRd', 'copper_r'
display_c_m = ConfusionMatrixDisplay(c_m, display_labels=sorted_labels)
display_c_m.plot(cmap='OrRd', xticks_rotation=25)

# Re-apply the tick labels at a smaller font size on both axes
tick_positions = np.arange(len(label_map))
plt.xticks(tick_positions, label_map.keys(), fontsize=10, rotation=25)
plt.yticks(tick_positions, label_map.keys(), fontsize=10)

# Title, then render
plt.title('Confusion Matrix on Validation data', fontsize=24)
plt.show()

**Precision, recall, F1-score for validation data**

In [None]:
# !pip install pandas tabulate

import pandas as pd
from tabulate import tabulate
from sklearn.metrics import classification_report

# Per-class and averaged precision / recall / F1 / support as a nested dict
report = classification_report(y_true, y_pred, output_dict=True)

# One row per class or average; the "accuracy" row is dropped from the table
df = pd.DataFrame(report).T
df_filtered = df.drop(index='accuracy')

# Print the result as a table using tabulate
table = tabulate(df_filtered, headers='keys', tablefmt='fancy_grid')
print(table)

**Comparison against the baseline**

In [None]:
# The classification accuracies of the baseline and the pretraining + finetuning process:
def plot_training_curves(pretraining_history, finetuning_history, baseline_history, use_validation=False):
    """Plot accuracy and loss of the three training runs on shared axes.

    One figure is drawn for accuracy and one for loss. Each figure shows the
    supervised baseline, the linear probe of the self-supervised pretraining
    run, and the supervised finetuning run.

    Args:
        pretraining_history: History of the contrastive run; its probe
            metrics are read from the ``p_acc`` / ``p_loss`` keys.
        finetuning_history: History of the finetuning run (``acc`` / ``loss``).
        baseline_history: History of the baseline run (``acc`` / ``loss``).
        use_validation: when True, read the ``val_``-prefixed keys instead of
            the training ones. The default keeps the training curves.
    """
    prefix = "val_" if use_validation else ""
    # The axis label names the split that is actually plotted; labelling
    # training curves as "validation" would be misleading.
    split_name = "validation" if use_validation else "training"

    for metric_key, metric_name in zip(["acc", "loss"], ["accuracy", "loss"]):
        plt.figure(figsize=(8, 5), dpi=100)
        plt.plot(
            baseline_history.history[f"{prefix}{metric_key}"],
            label="supervised baseline",
        )
        plt.plot(
            pretraining_history.history[f"{prefix}p_{metric_key}"],
            label="self-supervised pretraining",
        )
        plt.plot(
            finetuning_history.history[f"{prefix}{metric_key}"],
            label="supervised finetuning",
        )
        plt.legend()
        plt.title(f"Classification {metric_name} during training")
        plt.xlabel("epochs")
        plt.ylabel(f"{split_name} {metric_name}")


# Draw the accuracy and loss comparison for the three runs.
plot_training_curves(pretraining_history, finetuning_history, baseline_history)