<a href="https://colab.research.google.com/github/Ibrah-N/Deep-Learning-Projects-Computer-Vision/blob/main/dl_07_custom_training_loops_losses_metrics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Imports

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.keras.metrics import Precision, Recall, TruePositives, TrueNegatives, FalsePositives, FalseNegatives

## Data Loading & Preprocessing

In [None]:
# download dataset


dataset, info = tfds.load('malaria', with_info=True, as_supervised=True, split='train')

In [None]:
# save the dataset


dataset.save('/content/drive/MyDrive/malaria_dataset')

In [4]:
# load dataset if you have saved dataset


dataset = tf.data.Dataset.load('/content/drive/MyDrive/malaria_dataset')

In [5]:
dataset = dataset.take(600)

In [6]:
# resize and rescale function
def resize_rescale(img, label):
  img = tf.image.resize(img, (224, 224))
  img = img/255.0
  return img, label

In [7]:
# Train Validation Test Split ratios
TRAIN_RATIO  =  0.8
VAL_RATIO    =  0.1
TEST_RATIO   =  0.1


# labels
labels = ["P", "U"]
# 1 -- > U --> Uninfacted
# 0 -- > P --> Parsitic

In [8]:
# train split
train_dataset = dataset.take(int(len(dataset)*TRAIN_RATIO))
print(f"train_dataset length : {len(train_dataset)}")

train_dataset length : 480


In [9]:
# validation split
val_dataset = dataset.skip(int(len(dataset)*TRAIN_RATIO))
val_dataset = val_dataset.take(int(len(dataset)*VAL_RATIO))
print(f"val_dataset length : {len(val_dataset)}")

val_dataset length : 60


In [10]:
# test split
test_dataset = dataset.skip(int(len(dataset)*(TEST_RATIO + VAL_RATIO)))
test_dataset = test_dataset.take(int(len(dataset)*TEST_RATIO))
print(f"test_dataset length : {len(test_dataset)}")

test_dataset length : 60


In [11]:
# train dataset preparation
train_dataset = (train_dataset
                 .map(resize_rescale)
                 .shuffle(buffer_size= 16, reshuffle_each_iteration=True)
                 .batch(32)
                 .prefetch(tf.data.AUTOTUNE))


batch_1 = train_dataset.take(1)
for img, label in batch_1.take(3):
  print(f"Image : {img.shape}, label: {label.shape}")

Image : (32, 224, 224, 3), label: (32,)


In [12]:
def batching(batch_img, batch_label):
  tf.print(batch_img.shape, batch_label.shape)
  tf.experimental.numpy.experimental_enable_numpy_behavior()
  return batch_img, batch_label.reshape(len(batch_label), 1)

In [13]:
# validation dataset preparation
validation_dataset = (val_dataset
                      .map(resize_rescale)
                      .shuffle(buffer_size= 16, reshuffle_each_iteration=True)
                      .batch(32)
                      .prefetch(tf.data.AUTOTUNE)
                      )


print(validation_dataset.element_spec)

(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))


In [14]:
x = validation_dataset.map(batching)

In [15]:
# test dataset preparation
test_dataset = (test_dataset
                .map(resize_rescale)
              )

## Custom Losses

In [16]:
# Custom Loss Without Parameters

def custom_bce(y_true, y_pred):
  bce = tf.keras.losses.BinaryCrossentropy()
  return bce(y_true, y_pred)


In [17]:
# Custom Loss With Paramets

def custom_bce_params(factor):
  def loss(y_true, y_pred):
    bce = tf.keras.losses.BinaryCrossentropy()
    return bce(y_true, y_pred) + factor
  return loss

In [18]:
# Custom Loss Class

class CustomBCE(tf.keras.losses.Loss):
  def __init__(self, factor):
    super(CustomBCE, self).__init__()
    self.factor = factor


  def call(self, y_true, y_pred):
    bce = tf.keras.losses.BinaryCrossentropy()
    return bce(y_true, y_pred) + self.factor

## Metric

In [19]:
# Custom Metric Without Parameter


def custom_ba(y_true, y_pred):
  return tf.keras.metrics.binary_accuracy(y_true, y_pred)

In [20]:
# Custom Metric With Parameter


def custom_ba_params(factor):
  def metric(y_true, y_pred):
    return tf.keras.metrics.binary_accuracy(y_true, y_pred) + factor
  return metric

In [37]:
# Custom Metric Class

class CustomMetric(tf.keras.metrics.Metric):
  def __init__(self, name="custom_metric", factor=1):
    super(CustomMetric, self).__init__(name="custom_metric")
    self.factor = factor
    self.count = self.add_weight(name='accuracy', initializer='zeros')
    self.total = self.add_weight(name='accuracy', initializer='zeros')


  def update_state(self, y_true, y_pred, sample_weight=None):
    y_pred = tf.round(y_pred)
    correct = tf.cast(tf.equal(tf.cast(y_true, dtype=tf.float32), y_pred), dtype=tf.float32)
    self.count.assign_add(tf.reduce_sum(correct))
    self.total.assign_add(tf.cast(tf.size(correct), dtype=tf.float32))


  def result(self):
    return tf.math.divide_no_nan(self.count, self.total)


  def reset_state(self):
    self.count.assign(0.)
    self.total.assign(0.)

## Model

In [22]:
# rate parameters for model
DROPOUT_RATE = 0.01
L2_REG_RATE = 0.01

In [23]:
# model architecture

temp_model = tf.keras.Sequential([

      tf.keras.layers.InputLayer(input_shape=(224, 224, 3)),

      tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.Dropout(DROPOUT_RATE),
      tf.keras.layers.BatchNormalization(),

      tf.keras.layers.Conv2D(filters=32, kernel_size=3, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.Conv2D(filters=32, kernel_size=3, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.Dropout(DROPOUT_RATE),
      tf.keras.layers.BatchNormalization(),

      tf.keras.layers.MaxPool2D(pool_size=2),

      tf.keras.layers.Conv2D(filters=16, kernel_size=3, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.Conv2D(filters=16, kernel_size=3, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.BatchNormalization(),


      tf.keras.layers.Flatten(),
      # tf.keras.layers.Dense(units=1000, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      # tf.keras.layers.Dropout(DROPOUT_RATE+0.2),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Dense(units=100, activation='relu', kernel_regularizer=tf.keras.regularizers.L2(L2_REG_RATE)),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Dense(units=10, activation='relu'),
      tf.keras.layers.Dense(units=1, activation='sigmoid')
])

In [24]:
# model summary

temp_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 222, 222, 64)      1792      
                                                                 
 conv2d_1 (Conv2D)           (None, 220, 220, 64)      36928     
                                                                 
 dropout (Dropout)           (None, 220, 220, 64)      0         
                                                                 
 batch_normalization (Batch  (None, 220, 220, 64)      256       
 Normalization)                                                  
                                                                 
 conv2d_2 (Conv2D)           (None, 218, 218, 32)      18464     
                                                                 
 conv2d_3 (Conv2D)           (None, 216, 216, 32)      9248      
                                                        

In [37]:
# compile model

temp_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                   loss = custom_bce,
                   metrics = CustomMetric())

In [None]:
temp_model.fit(train_dataset, validation_data=validation_dataset,
               epochs=2, verbose=1)

Epoch 1/2

## Custom Training Loop

In [38]:
# Training Loop Function
# Validation Loop Function


# Training Loop Function
@tf.function  # to be executed in Graph Mode
def train_loop(img_batch, label_batch):
    with tf.GradientTape() as tape:
      train_pred = temp_model(img_batch, training=True)
      loss = custom_bce(label_batch, train_pred)
    partial_derivatives = tape.gradient(loss, temp_model.trainable_weights)
    OPTIMIZER.apply_gradients(zip(partial_derivatives, temp_model.trainable_weights))

    ### Metric -- BinaryAccuracy
    TRAIN_ACCURACY.update_state(label_batch, train_pred)

    return loss



# Validation Loop Function
@tf.function  # to be executed in Graph Mode
def val_loop(img_batch, label_batch):
    val_pred = temp_model(img_batch, training=False)
    val_loss = custom_bce(label_batch, val_pred)

    ### Metric -- BinaryAccuracy
    VAL_ACCURACY.update_state(label_batch, val_pred)

    return val_loss

In [45]:
# Complete Training Loop Function


def train(train_dataset, validation_dataset, EPOCHS=4, OPTIMIZER=tf.keras.optimizers.Adam(learning_rate=0.001)):
  TRAIN_ACCURACY = CustomMetric()
  VAL_ACCURACY = CustomMetric()

  for epoch in range(EPOCHS):
    ## Training
    for idx, (img_batch, label_batch) in enumerate(train_dataset):
      loss = train_loop(img_batch, label_batch)
      ### Prompt Results After 100 steps
      if idx % 10 == 0:
        print("Epoch:{}/{} : T-Loss: {} -- T-Accuracy: {}".format(epoch+1, EPOCHS, loss, TRAIN_ACCURACY.result()))


    ## Validation
    for idx, (img_batch, label_batch) in enumerate(x):
      val_loss = val_loop(img_batch, label_batch)
      ### Prompt Results After 100 steps
      if idx % 10 == 0:
        print("Epoch:{}/{} : V-Loss: {} -- V-Accuracy: {}".format(epoch+1, EPOCHS, val_loss, VAL_ACCURACY.result()))



    print(f"\n##############################\nEpoch:{epoch+1}/{EPOCHS} : T-Loss: {loss} -- V-Loss: {val_loss} -- T-Accuracy: {TRAIN_ACCURACY.result()} -- V-Accuracy: {VAL_ACCURACY.result()}\n##############################\n")
    VAL_ACCURACY.reset_state()
    TRAIN_ACCURACY.reset_state()


  print("\n\n TRAINING COMPLETED!!!!!")



In [None]:
train(train_dataset=train_dataset, validation_dataset=validation_dataset, EPOCHS=2, OPTIMIZER=tf.keras.optimizers.Adam(learning_rate=0.001))