<a href="https://colab.research.google.com/github/akankshakusf/Project-Deep-Learning-CNN-Malaria-Detection/blob/master/Malaria-Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#import ML packages
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
from datetime import datetime
import cv2  #computer vision
from sklearn.metrics import confusion_matrix,roc_curve

#import DL tensorflow packages
import tensorflow as tf
import tensorflow_probability as tfp
import tensorflow_datasets as tfds
import albumentations as A
from tensorflow.keras.models import Model
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Layer,BatchNormalization, Input,InputLayer,Conv2D, MaxPool2D, Flatten,Dense,Dropout
from tensorflow.keras.layers import RandomFlip,RandomRotation,Resizing,Rescaling,Reshape  #for data augmentation
from tensorflow.keras.regularizers import L2, L1
from tensorflow.keras.callbacks import Callback,CSVLogger,EarlyStopping,LearningRateScheduler
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy,FalseNegatives,FalsePositives,TruePositives,\
TrueNegatives,Precision,Recall,AUC
from tensorboard.plugins.hparams import api as hp

#import wandb packages
import wandb
from wandb.integration.keras import WandbCallback


### What will we be carrying out?
- By the end of this section we will be able to conclude that the different ways of building Keras models can be mixed:
    *  First, we build a Sequential model
    *  Then we build a Functional API model, broken down into feature_extractor_model, which extracts the features, and lenet_model_func, where we flatten those features for final compiling and training
    * Then I build a complete model, LenetModel, mixing in the Functional API model's feature extractor
    * After that, I go on to create a custom dense layer, NeuralearnDense

# Wandb Install, Login and Initialization

In [2]:
!pip install wandb



In [5]:
!wandb login

[34m[1mwandb[0m: Currently logged in as: [33makushwaha2[0m ([33makankshakusf2[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [4]:
import wandb
wandb.init(
    project="Malaria-Detection",
    entity="akankshakusf2",
    #name = "confusionmatrix"
)

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33makushwaha2[0m ([33makankshakusf2[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [6]:
from wandb.integration.keras import WandbCallback

In [None]:
wandb.tensorboard.unpatch()

In [None]:
# I used this just to start tensor borad on Wandb
import wandb
wandb.tensorboard.patch(root_logdir="./logs")
wandb.init(project="Malaria-D", entity="akushwaha2-university-of-south-florida")

In [None]:
wandb.run

## Params Dictionary

In [7]:
# Hyperparameters for this experiment. They live in a plain dict and are then
# pushed into the active W&B run: assigning a dict to `wandb.config` only
# rebinds the module attribute, so the values would not be stored with the run.
CONFIGURATION = {
        "LEARNING_RATE":0.001,
        "N_EPOCHS":2,
        "BATCH_SIZE":128,
        "DROPOUT_RATE":0.0,
        "IM_SIZE":224,
        "REGULARIZATION_RATE":0.0,
        "N_FILTERS":6,
        "KERNEL_SIZE":3,
        "N_STRIDES":1,
        "POOL_SIZE":2,
        "N_DENSE_1":128,
        "N_DENSE_2":32,
      }

if wandb.run is not None:
    # allow_val_change lets this cell be re-run after editing a value.
    wandb.config.update(CONFIGURATION, allow_val_change=True)
else:
    # No active run (wandb.init was not executed): keep the previous behaviour
    # so that `wandb.config[...]` look-ups still resolve.
    wandb.config = CONFIGURATION

## Data Loading

In [8]:
# Import the Malaria dataset from TensorFlow Datasets (TFDS)
# and shuffle files
# Setting as_supervised=True returns the data as (image, label) pairs and not dict

dataset, dataset_info = tfds.load("malaria", with_info=True,as_supervised=True, shuffle_files=True)

In [None]:
#function to split train data

def split(dataset, TRAIN_RATIO, VAL_RATIO, TEST_RATIO):
    """Split a dataset into consecutive train / validation / test parts.

    The first ``TRAIN_RATIO`` share of the elements becomes the training set,
    the next ``VAL_RATIO`` share the validation set, and everything that is
    left becomes the test set.

    Args:
        dataset: a ``tf.data.Dataset`` with a known, finite cardinality.
        TRAIN_RATIO: fraction of elements used for training.
        VAL_RATIO: fraction of elements used for validation.
        TEST_RATIO: accepted for symmetry but not used; the test split is
            whatever remains after the train and validation elements.

    Returns:
        Tuple ``(train_dataset, val_dataset, test_dataset)``.

    Raises:
        ValueError: if the dataset cardinality is infinite or unknown.
    """
    # Get dataset size
    DATASET_SIZE = int(tf.data.experimental.cardinality(dataset).numpy())

    # cardinality() reports infinite/unknown sizes as negative sentinels; using
    # them below would produce negative take/skip counts and a silently wrong split.
    if DATASET_SIZE < 0:
        raise ValueError(
            "split() needs a dataset with a known, finite cardinality "
            f"(got {DATASET_SIZE})."
        )

    train_size = int(TRAIN_RATIO * DATASET_SIZE)
    val_size = int(VAL_RATIO * DATASET_SIZE)

    # Make train, val, test split
    train_dataset = dataset.take(train_size)
    val_test_dataset = dataset.skip(train_size)      # everything after the train part
    val_dataset = val_test_dataset.take(val_size)
    test_dataset = val_test_dataset.skip(val_size)   # remaining elements form the test set

    return train_dataset, val_dataset, test_dataset

In [None]:
###### split function logic explaination builder ######

# TRAIN_RATIO=0.6
# VAL_RATIO=0.2
# TEST_RATIO=0.2
# TOTAL=10
# #set range on data for testing logic
# ds=tf.data.Dataset.range(TOTAL)
# #train_sd,val_ds,test_ds=split(ds,TRAIN_RATIO,VAL_RATIO,TEST_RATIO)
# train_ds=ds.take(int(TRAIN_RATIO*TOTAL))
# val_test_ds=ds.skip(int(TRAIN_RATIO*TOTAL)) #not important
# val_ds=val_test_ds.take(int(VAL_RATIO*TOTAL))
# test_ds=val_test_ds.skip(int(VAL_RATIO*TOTAL))

# #print main dataset for review
# print([int(x) for x in ds.as_numpy_iterator()])
# print([int(x) for x in train_ds.as_numpy_iterator()])
# print([int(x) for x in val_test_ds.as_numpy_iterator()])
# print([int(x) for x in val_ds.as_numpy_iterator()])
# print([int(x) for x in test_ds.as_numpy_iterator()])


In [None]:
# Shuffle dataset before splitting
# reshuffle_each_iteration=False keeps one fixed order, so the take/skip based
# split selects the same elements on every pass over the data.
# NOTE(review): this rebinds `dataset` from the dict returned by tfds.load to its
# "train" entry, so the cell cannot be re-run without re-running the load cell.
dataset = dataset["train"].shuffle(buffer_size=10000, reshuffle_each_iteration=False)

In [None]:
# Define split ratios
# split it into three parts:
# - The first 80% of the data will be used for training.
# - The next 10% (80%-90%) will be used for validation
# - The last 10% (90%-100%) will be used for testing.
TRAIN_RATIO = 0.8
VAL_RATIO = 0.1
TEST_RATIO = 0.1

# Call split function
train_dataset, val_dataset, test_dataset = split(dataset, TRAIN_RATIO, VAL_RATIO, TEST_RATIO)

In [None]:
# Check dataset sizes
print(f"Training size: {tf.data.experimental.cardinality(train_dataset).numpy()}")
print(f"Validation size: {tf.data.experimental.cardinality(val_dataset).numpy()}")
print(f"Test size: {tf.data.experimental.cardinality(test_dataset).numpy()}")

In [None]:
#check data info
dataset_info

- So, in this dataset, label 0 represents parasitized cells
and label 1 represents uninfected cells

In [None]:
#check to values in dataset_info
print(dataset_info.features['label'].int2str(0))
print(dataset_info.features['label'].int2str(1))

In [None]:
# Print a few values from the validation dataset
for i in val_dataset.take(1):
    print(i)

## Dataset Visualization

In [None]:
for i, (image, label) in enumerate(train_dataset.take(16)):  #Unpack tuple
    ax = plt.subplot(4, 4, i + 1)
    plt.imshow(image.numpy())  # Convert Tensor to NumPy array
    plt.title(dataset_info.features['label'].int2str(label.numpy()))  # Convert label to class name
    plt.axis("off")

plt.show()

## Data Augmentation (Only Resizing technique)

- These images come in bigger sizes ((255,255,3) etc.), so we have to resize them and normalize their pixel values into the range 0-1 so that the deep learning model converges and runs inference faster

In [None]:
#define the image size we want to reduce to
IM_SIZE=224
BATCH_SIZE=32

def resize_rescale(image, label):
    """Resize `image` to IM_SIZE x IM_SIZE and divide its values by 255.

    The label is passed through unchanged.
    """
    resized = tf.image.resize(image, (IM_SIZE, IM_SIZE))
    return resized / 255.0, label

# Apply the function to the dataset
# Each split is resized/rescaled, batched and prefetched; only the training
# split is additionally shuffled (and reshuffled on every iteration).
# NOTE(review): these assignments rebind train_dataset / val_dataset /
# test_dataset in place, so re-running the cell would map and batch them again.
train_dataset = train_dataset.map(resize_rescale).shuffle(buffer_size=1000, reshuffle_each_iteration=True).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.map(resize_rescale).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
test_dataset = test_dataset.map(resize_rescale).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
train_dataset

In [None]:
###----- This is for testing batches as they give problems later in tensor to array conversion-----###

# Count the total number of images in the train, val, and test datasets
# Count the images by iterating over the un-batched view of every split.
train_images = sum(1 for _ in train_dataset.unbatch())
val_images = sum(1 for _ in val_dataset.unbatch())
test_images = sum(1 for _ in test_dataset.unbatch())

print(f"Total images in train dataset: {train_images}")
print(f"Total images in validation dataset: {val_images}")
print(f"Total images in test dataset: {test_images}")

print("----------------------------------------------------------")
# Calculate the number of batches.
# batch() without drop_remainder also emits a final, smaller batch, so the
# batch count is the ceiling of images / BATCH_SIZE, not the floor.
train_batches = (train_images + BATCH_SIZE - 1) // BATCH_SIZE
val_batches = (val_images + BATCH_SIZE - 1) // BATCH_SIZE
test_batches = (test_images + BATCH_SIZE - 1) // BATCH_SIZE

print(f"Number of batches in train dataset: {train_batches}")
print(f"Number of batches in validation dataset: {val_batches}")
print(f"Number of batches in test dataset: {test_batches}")  # the last batch may hold fewer than BATCH_SIZE images

* Uncomment and run this to check for batches that fail to meet the shape criteria

In [None]:
# def get_last_batch_info(total_images, batch_size):
#     # Calculate number of full batches
#     full_batches = total_images // batch_size

#     # Calculate the number of images in the last batch
#     last_batch_size = total_images % batch_size

#     # If there are no remainder images, last batch size will be the same as batch size
#     if last_batch_size == 0:
#         last_batch_size = batch_size  # The last batch will be of full size

#     # Calculate the total number of batches
#     total_batches = full_batches if last_batch_size == batch_size else full_batches + 1

#     # Print the results
#     print(f"Total images: {total_images}")
#     print(f"Batch size: {batch_size}")
#     print(f"Number of full batches: {full_batches}")
#     print(f"Total number of batches: {total_batches}")
#     print(f"Size of the last batch: {last_batch_size}")

#     return last_batch_size

# get_last_batch_info(2757,32)

In [None]:
#view for verification
for image, label in train_dataset.take(1):
    print("Image shape:", image.shape)
    print("Label:", label)

In [None]:
#view for verification
for image, label in test_dataset.take(1):
    print("Image shape:", image.shape)
    print("Label:", label)

* See that the image size is now 224: shape=(224, 224, 3), dtype=float32, with label tf.Tensor(1, shape=(), dtype=int64)
- tf.Tensor(1, ...) means it was an uninfected cell

In [None]:
#check for unique values in transformed train_dataset
np.set_printoptions(suppress=True, precision=6)
np.unique(image)

* Notice - here the goal of having all values between 0 and 1 is achieved

# **Data Preprocessing** (Advanced)

### Data Augmentation (Advanced)

In [None]:
## Augment images with Custom resize and rescale function
#define the image size we want to reduce to

IM_SIZE = 224  # target height and width in pixels


## resize_rescale function (redefines the helper of the same name from the basic resizing section)
@tf.function  # traces the function into a TensorFlow graph so it runs faster
def resize_rescale(image, label):
    """Resize `image` to IM_SIZE x IM_SIZE and divide its values by 255.

    The label is returned untouched.
    """
    resized = tf.image.resize(image, (IM_SIZE, IM_SIZE))
    return resized / 255.0, label

* Rather than writing a custom function like the one above for resizing and rescaling, we can also make use of the tf.keras.layers Resizing and Rescaling layers
- Rescaling-https://www.tensorflow.org/api_docs/python/tf/keras/layers/Rescaling?hl=en
- Resizing https://www.tensorflow.org/api_docs/python/tf/keras/layers/Resizing?hl=en

In [None]:
IM_SIZE = 224  #*RunThis
#BATCH_SIZE = 32

## 1.Augmented layer with "layers method" for resizing and rescaling
# Layer-based counterpart of resize_rescale(): resize to IM_SIZE x IM_SIZE,
# then multiply the values by 1/255.
resize_rescale_layers = tf.keras.Sequential([
    Resizing(IM_SIZE, IM_SIZE),
    Rescaling(1.0 / 255),
])

* Augmentation using tf.image method : https://www.notion.so/Tensorflow-1a54ba18200f81f58982c39b61ce9ec8?pvs=4#1bf4ba18200f80e19a19dfc16dd092f7

In [None]:
## 1.create augmenting function with custom "resize_rescale function"

@tf.function ### Makes the function run faster by turning it into a TensorFlow graph
def augment(image,label):
  """Resize/rescale one sample, then randomly rotate and mirror it.

  Args:
    image: image tensor accepted by resize_rescale().
    label: label belonging to `image`; returned unchanged.

  Returns:
    Tuple (augmented_image, label).
  """

  #call resize_rescale function above to get image and label
  image,label=resize_rescale(image,label)

  # rot90 with k drawn uniformly from {0, 1} (maxval is exclusive):
  # either no rotation or a single 90-degree turn.
  # using from  https://www.tensorflow.org/api_docs/python/tf/keras/random/uniform
  image =tf.image.rot90(image,k=tf.random.uniform(shape=[],minval=0,maxval=2,dtype=tf.int32))

  #using from https://www.tensorflow.org/api_docs/python/tf/image/adjust_saturation
  #image=tf.image.adjust_saturation(image, saturation_factor=0.3)

  #using from https://www.tensorflow.org/api_docs/python/tf/image/stateless_random_saturation
  #(the stateless_* ops need an explicit `seed` argument)
  #image=tf.image.stateless_random_saturation(image, 0.3, 0.5, seed=(1, 2))

  # The stateless variant requires an explicit `seed`, and calling it without
  # one raises a TypeError. The stateful op performs the same random
  # horizontal flip without needing a seed.
  #using from https://www.tensorflow.org/api_docs/python/tf/image/random_flip_left_right
  image=tf.image.random_flip_left_right(image)

  return image,label

In [None]:
## 2. Making a custom Augment layer that inherit from "Layer" class
class RotNinety(Layer):
  """Custom layer that rotates its input by 0 or 90 degrees, chosen at random."""

  def __init__(self):
    super().__init__()

  @tf.function  # trace the call into a TensorFlow graph
  def call(self, image):
    # Scalar int32 drawn from {0, 1}: the number of 90-degree turns to apply.
    quarter_turns = tf.random.uniform(shape=[], minval=0, maxval=2, dtype=tf.int32)
    return tf.image.rot90(image, k=quarter_turns)


* Augmentation using tf.keras.layers and tf.image methods to create a custom augmentation layer in the Sequential API

In [None]:
### tf.keras.layer augment                        #*RunThis
# Random augmentation stack: a rotation whose factor range is pinned to
# (0.25, 0.2501), followed by a random horizontal flip.
augment_layers = tf.keras.Sequential()
augment_layers.add(RandomRotation(factor=(0.25, 0.2501)))
augment_layers.add(RandomFlip(mode='horizontal'))


@tf.function
def augment_layer(image, label):
    """Resize/rescale a single image and run it through `augment_layers`.

    The label is passed through unchanged.
    """
    # Assert a rank-3 (H, W, 3) image while leaving height/width dynamic.
    image = tf.ensure_shape(image, (None, None, 3))
    return augment_layers(resize_rescale_layers(image)), label

In [None]:
# ## 3. Augmentation: Flip, Rotate, Contrast
# augment_layer = tf.keras.Sequential([
#     RotNinety(),  #using the "layer" , "image" mthod together. Best of both worlds
#     ##RandomRoatation(factor=(0.25,0.2501))  #no longer needed as we are able to use "image" mthd now which is better
#     RandomFlip(mode='horizontal'),
# ])

# # For training: resize, rescale, then augment

# @tf.function ### Makes the function run faster by turning it into a TensorFlow graph
# def augment_layers(image, label):
#     image = resize_rescale_layer(image)
#     image = augment_layer(image,),# training=True)
#     return image, label


* Why was this union of tf.keras.layers and tf.image important?
- Because the "image" methods are very good at handling images, while the "layers" methods provide commendably fast processing

In [None]:

# # For validation/test: only resize and rescale
# def resize_rescale_layers(image, label):
#     image = resize_rescale_layer(image)
#     return image, label

In [None]:
for image, label in train_dataset.take(1):
    print("Image shape:", image.shape)
    print("Label:", label)

In [None]:
BATCH_SIZE = 32   #*RunThis

# Training pipeline with augmentation
# Order matters: shuffle individual samples, augment each one, then batch.
# NOTE(review): augment_layer asserts rank-3 (H, W, 3) inputs, so this cell
# presumably expects the un-batched train_dataset from split(); the basic
# resize cell rebinds the same name to a batched dataset — confirm which cells
# are meant to run together.
train_dataset = (
    train_dataset
    .shuffle(buffer_size=1024, reshuffle_each_iteration=True)
    .map(augment_layer, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Validation pipeline (only resize & rescale)  #*RunThis
# Validation data gets no random augmentation: only the resize/rescale layers
# are applied (explicitly in inference mode), then the samples are batched.
val_dataset = (
    val_dataset
    .map(lambda image, label: (resize_rescale_layers(image, training=False), label))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)


In [None]:
# Test pipeline (only resize & rescale) #*RunThis
# Test data is only resized/rescaled. Batching and prefetching are commented
# out here, so this dataset yields individual (image, label) samples.
test_dataset = (
    test_dataset
    .map(lambda image, label: (resize_rescale_layers(image, training=False), label))
    # .batch(BATCH_SIZE)
    # .prefetch(tf.data.AUTOTUNE)
)

In [None]:
train_dataset

In [None]:
val_dataset

In [None]:
test_dataset

* Batching with different Augmentation techniques

In [None]:
def visualize(original,augmented):
  """Plot the first image of `original` (left) next to the first image of `augmented` (right).

  Both arguments are indexed with [0], i.e. they are expected to be batches.
  """
  first_pair = (original[0], augmented[0])

  for panel, picture in enumerate(first_pair, start=1):
    plt.subplot(1, 2, panel)
    plt.imshow(picture)


In [None]:
original_image, label= next(iter(train_dataset))

* There are several tf.image methods that can be used to augment the data; here we will be using adjust_saturation:
- adjust_brightness(...): Adjust the brightness of RGB or Grayscale images.
- adjust_saturation(...): Adjust saturation of RGB images.
- central_crop(...): Crop the central region of the image(s).
- crop_and_resize(...): Extracts crops from the input image tensor and resizes them.
- flip_left_right(...): Flip an image horizontally (left to right).
- etc.


In [None]:
#I will be using adjust_saturation
augmented_image=tf.image.adjust_saturation(original_image, saturation_factor=0.3)

In [None]:
#call the visualize fucntion
visualize(original_image,augmented_image)

The sequential model performed very poorly with this augmentation. The reason is simple: if I go back and look at my augmentation, I used "tf.image.adjust_saturation(image, saturation_factor=0.3)", and the poor results came with the implementation of this.

In [None]:
# ## This is a testing cell
# def visualize(original, augmented, original_labels):

#   original_image = original[0]
#   augmented_image = augmented[0]
#   original_label = original_labels[0].numpy()  # Get the label as a NumPy value

#   plt.subplot(1, 2, 1)
#   plt.imshow(original_image)
#   plt.title(f"Original - Label: {original_label}")  # Display label in title

#   plt.subplot(1, 2, 2)
#   plt.imshow(augmented_image)
#   plt.title("Augmented Image")  # Title for augmented image


# for original_images, original_labels in train_dataset.take(1):  # Take 1 batch from the dataset
#     augmented_images, augmented_labels = augment(original_images, original_labels)  # Apply augmentations

#     # Now you can call the visualize function with the images and labels
#     visualize(original_images, augmented_images, original_labels)  # Include original_labels


### Mixup Data Augmentation
- Note : This technique does not always work for all datasets

In [None]:
# Two independently shuffled views of the training data, zipped so that every
# element is a pair ((image_1, label_1), (image_2, label_2)) for mixup/cutmix.
# NOTE(review): resize_rescale divides by 255 again — presumably this cell is
# meant to run on the raw train_dataset from split(), not on an already
# rescaled/batched one; confirm execution order.
train_dataset_1= train_dataset.shuffle(buffer_size=4096,).map(resize_rescale)
train_dataset_2= train_dataset.shuffle(buffer_size=4096,).map(resize_rescale)

mixed_dataset=tf.data.Dataset.zip(train_dataset_1, train_dataset_2) # comment when using Albumentation
# mixed_dataset = train_dataset_1.concatenate(train_dataset_2) # use only when using Albumentations

In [None]:
# create a method for create mixup of images
def mixup(train_dataset_1, train_dataset_2):
  """Blend two samples with MixUp.

  A mixing weight `lamda` is drawn from Beta(0.2, 0.2); the images and the
  labels are combined as lamda * first + (1 - lamda) * second.

  Args:
    train_dataset_1: (image_1, label_1) pair.
    train_dataset_2: (image_2, label_2) pair.

  Returns:
    Tuple (mixed_image, mixed_label); the label is a float32 soft label.
  """
  (image_1,label_1),(image_2,label_2) = train_dataset_1, train_dataset_2

  # The dataset images are mixed directly. The previous version overwrote
  # them with cv2.imread('cat.jpg') / cv2.imread('dog.jpg'), left over from
  # the stand-alone demo, so the malaria images were never used.
  lamda=tfp.distributions.Beta(0.2,0.2)
  lamda=lamda.sample(1)[0]

  image= lamda*image_1 + (1-lamda)*image_2
  label= lamda*tf.cast(label_1,dtype=tf.float32) + (1-lamda)*tf.cast(label_2,dtype=tf.float32)

  return image,label

In [None]:
# #### This is an example show how mix up works uncomment when needed

# IM_SIZE=224
# image_1=cv2.resize(cv2.imread('cat.jpg'), (IM_SIZE,IM_SIZE)) # manually upload a cat image in colab instance to see in action
# image_2=cv2.resize(cv2.imread('dog.jpg'), (IM_SIZE,IM_SIZE))  # manually upload a cat image in colab instance to see in action

# label_1= 0
# label_2= 1

# lamda=tfp.distributions.Beta(0.4,0.4)
# lamda=lamda.sample(1)[0]

# print(image_1.shape,image_2.shape)

# image= lamda*image_1 + (1-lamda)*image_2
# label= lamda*label_1 + (1-lamda)*label_2
# print(image.shape, label)

# plt.imshow(image/255)

In [None]:
BATCH_SIZE = 32

# Training pipeline with augmentation
# Each element of mixed_dataset is a pair of samples; mixup() blends the pair
# into one (image, soft label) sample before batching.
train_dataset = (
    mixed_dataset
    .shuffle(buffer_size=8, reshuffle_each_iteration=True)
    .map(mixup)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Validation pipeline (only resize & rescale)
# NOTE(review): val_dataset is rebound in place; if an earlier cell already
# rescaled/batched it, this applies resize_rescale and batch a second time —
# confirm which cells are meant to run together.
val_dataset = (
    val_dataset
    .map(resize_rescale)
    .batch(BATCH_SIZE)
)

# No need to do anything to test_dataset

In [None]:
train_dataset

In [None]:
val_dataset

### CutMix Data Augmentation

In [None]:

def box(lamda):
  """Pick a random CutMix patch inside an IM_SIZE x IM_SIZE image.

  The patch is centred on a uniformly drawn point and has the nominal side
  length IM_SIZE * sqrt(1 - lamda); it is then clipped to the image borders.

  Args:
    lamda: scalar mixing weight; 1 - lamda is the nominal share of the image
      area that the patch covers.

  Returns:
    Tuple (r_y, r_x, r_h, r_w): top-left row, top-left column, height and
    width of the patch (height and width are at least 1).
  """

  # Randomly select a center point (x, y) for the patch within the image
  c_x = tf.cast(tfp.distributions.Uniform(0, IM_SIZE).sample(1)[0], dtype=tf.int32)
  c_y = tf.cast(tfp.distributions.Uniform(0, IM_SIZE).sample(1)[0], dtype=tf.int32)

  # Calculate the width and height of the patch based on how much of the image we want to replace
  r_w = tf.cast(IM_SIZE * tf.math.sqrt(1 - lamda), dtype=tf.int32)
  r_h = tf.cast(IM_SIZE * tf.math.sqrt(1 - lamda), dtype=tf.int32)

  # Top-left corner: half a patch up/left of the centre, clipped to the image
  r_x = tf.clip_by_value(c_x - r_w // 2, 0, IM_SIZE)
  r_y = tf.clip_by_value(c_y - r_h // 2, 0, IM_SIZE)

  # Bottom-right corner: half a patch down/right of the CENTRE. The previous
  # version measured it from the already shifted top-left corner, which capped
  # the patch at half of the intended width and height.
  x_b_r = tf.clip_by_value(c_x + r_w // 2, 0, IM_SIZE)
  y_b_r = tf.clip_by_value(c_y + r_h // 2, 0, IM_SIZE)

  # Final patch size after clipping; never smaller than one pixel so the
  # crop/pad calls in cutmix() always receive a valid box.
  r_w = tf.maximum(x_b_r - r_x, 1)
  r_h = tf.maximum(y_b_r - r_y, 1)

  # Top-left corner (r_y, r_x) and size (r_h, r_w) of the patch to be cut and mixed
  return r_y, r_x, r_h, r_w


In [None]:
# Create a method to perform MixUp by combining two training images

def cutmix(train_dataset_1, train_dataset_2):
  """Combine two samples with CutMix.

  A rectangular patch of the second image is pasted over the same region of
  the first image. The label is mixed in proportion to the area that each
  image contributes to the result.

  Args:
    train_dataset_1: (image_1, label_1) pair; provides the background.
    train_dataset_2: (image_2, label_2) pair; provides the patch.

  Returns:
    Tuple (mixed_image, mixed_label); the label is a float32 soft label.
  """
  # Unpack images and labels from both samples
  (image_1, label_1), (image_2, label_2) = train_dataset_1, train_dataset_2

  # Beta-distributed draw that decides the nominal patch size
  mix_draw = tfp.distributions.Beta(0.2, 0.2).sample(1)[0]
  r_y, r_x, r_h, r_w = box(mix_draw)

  def _patch_on_canvas(picture):
    # Cut the patch region out of `picture` and place it, at the same
    # position, on an otherwise zero-filled IM_SIZE x IM_SIZE canvas.
    cut = tf.image.crop_to_bounding_box(picture, r_y, r_x, r_h, r_w)
    return tf.image.pad_to_bounding_box(cut, r_y, r_x, IM_SIZE, IM_SIZE)

  pad_2 = _patch_on_canvas(image_2)
  pad_1 = _patch_on_canvas(image_1)

  # Remove the patch region from image_1 and fill it with image_2's patch
  image = image_1 - pad_1 + pad_2

  # Effective mixing weight: the share of the area that still comes from image_1
  kept_share = tf.cast(1-(r_w*r_h)/(IM_SIZE*IM_SIZE),dtype=tf.float32)
  label = kept_share*tf.cast(label_1,dtype=tf.float32) + (1-kept_share)*tf.cast(label_2,dtype=tf.float32)

  # Return the mixed image and the area-weighted soft label
  return image, label


In [None]:
 #### This is an example show how cutmix up works uncomment when needed


# plt.figure(figsize=(16,6))
# plt.subplot(1,7,1)
# image_1=cv2.resize(cv2.imread('cat.jpg'),(IM_SIZE,IM_SIZE))
# plt.imshow(image_1)
# #
# plt.subplot(1,7,2)
# image_2=cv2.resize(cv2.imread('dog.jpg'),(IM_SIZE,IM_SIZE))
# plt.imshow(image_2)

# plt.subplot(1,7,3)
# # image_3=cv2.resize(cv2.imread('dog.jpg'),(IM_SIZE,IM_SIZE))
# crop=tf.image.crop_to_bounding_box(image_2, 70, 50, 100, 98)
# plt.imshow(crop)

# plt.subplot(1,7,4)
# image_4=tf.image.pad_to_bounding_box(crop, 20, 100, IM_SIZE, IM_SIZE)
# plt.imshow(image_4)

# plt.subplot(1,7,6)
# cat_crop=tf.image.crop_to_bounding_box(image_1, 70, 50, 100, 98)
# plt.imshow(cat_crop)


# plt.subplot(1,7,7)
# image_5=tf.image.pad_to_bounding_box(cat_crop, 20, 100, IM_SIZE, IM_SIZE)
# plt.imshow(image_1-image_5+ image_4)

In [None]:
BATCH_SIZE = 32

# Training pipeline with augmentation
# Same structure as the mixup pipeline, but every pair of samples from
# mixed_dataset is combined with cutmix() instead.
train_dataset = (
    mixed_dataset
    .shuffle(buffer_size=8, reshuffle_each_iteration=True)
    .map(cutmix)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Validation pipeline (only resize & rescale)
# NOTE(review): val_dataset is rebound in place; running this after another
# cell that already mapped/batched it applies both steps again — confirm.
val_dataset = (
    val_dataset
    .map(resize_rescale)
    .batch(BATCH_SIZE)
)

# No need to do anything to test_dataset

In [None]:
train_dataset

In [None]:
val_dataset

In [None]:
# testing: lets visualize to see what is going on
original_image, label = next(iter(train_dataset))
print(label)
plt.imshow((original_image[0].numpy() * 255).astype("uint8"))
plt.axis('off')
plt.show()


### Albumentations

In [None]:
# !pip install -U git+https://github.com/albu/albumentations --no-cache-dir

In [None]:
IM_SIZE=224

In [None]:
# Instantiate augments
# we can apply as many augments we want and adjust the values accordingly
# here I have chosen the augments and their arguments at random


# Albumentations pipeline applied to every training image:
# resize to IM_SIZE x IM_SIZE, flip horizontally OR vertically (the OneOf
# group runs with p=0.3), rotate by a random multiple of 90 degrees, and jitter
# brightness/contrast with p=0.5.
transforms = A.Compose([
    A.Resize(IM_SIZE, IM_SIZE),
    A.OneOf([A.HorizontalFlip(),
              A.VerticalFlip(),], p=0.3),
    A.RandomRotate90(),
    A.RandomBrightnessContrast(brightness_limit=0.2,
                               contrast_limit=0.2,  # or set this to a float, not False
                                p=0.5),
   #A.CoarseDropout(num_holes_range=(1, 8), hole_height_range=(0.1, 0.2), hole_width_range=(0.1, 0.2), fill=0, fill_mask=False, p=0.5)
])


In [None]:
# used from https://albumentations.ai/docs/examples/tensorflow-example/
def aug_albument(image):
  """Run `image` through the Albumentations `transforms` pipeline.

  The augmented image is divided by 255 and returned as a float32 tensor.
  """
  result = transforms(image=image)  # Albumentations returns a dict keyed by target name
  return tf.cast(result["image"] / 255.0, tf.float32)  # Normalize


In [None]:
# # here the above python function is converted back to tensorflow
def process_data(image, label):
    """Wrap aug_albument() so it can be used inside a tf.data pipeline.

    The Python-level augmentation is executed through tf.numpy_function; the
    label is returned unchanged.
    """
    augmented = tf.numpy_function(func=aug_albument, inp=[image], Tout=tf.float32)
    # numpy_function drops static shape information, so restore it explicitly
    # to avoid shape problems further down the pipeline.
    augmented.set_shape([IM_SIZE, IM_SIZE, 3])
    return augmented, label


In [None]:
BATCH_SIZE=32

# Training pipeline with Albumentations: shuffle the samples, augment each one
# through process_data(), then batch and prefetch.
# NOTE(review): process_data sets a per-image (IM_SIZE, IM_SIZE, 3) shape, so
# train_dataset presumably has to be the un-batched split here — confirm.
train_dataset = (
    train_dataset
    .shuffle(buffer_size = 1024, reshuffle_each_iteration = True)
    .map(process_data)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
train_dataset# Now, try to display the image again
im, _ = next(iter(train_dataset))
plt.imshow(im[0])
plt.show()

In [None]:
plt.figure(figsize=(10,10))

# Show the images of the batch `im` on an 8x4 grid.
# Subplot positions are 1-based while batch indices are 0-based; iterating the
# indices from 0 includes the first image, which the old range(1, 32) skipped.
# The bound also guards against a final batch with fewer than 32 images.
for i in range(min(32, im.shape[0])):
  plt.subplot(8,4,i+1)
  plt.imshow(im[i])
### Repeating the dataset (x5)

# **Building advanced Models with Functional API, Subclassing and Custom Layers  using Keras API**

## Basic: Sequential API:LeNet Model

- This exact model summary is explained with full walk through in notion https://www.notion.so/CNN-Layering-Neuron-Count-1bc4ba18200f80ffa385ea743e4a30cd

In [None]:
# clear up session cache
from tensorflow.keras import backend as K
# Clear the previous session to reset layer count
K.clear_session()

In [None]:
                                        #*RunThis
'''
1. Instantiate the CNN model "Simple Sequential Model"
'''

# Pull the architecture hyperparameters out of the shared configuration.
IM_SIZE=CONFIGURATION['IM_SIZE']
DROPOUT_RATE=CONFIGURATION['DROPOUT_RATE']
REGULARIZATION_RATE=CONFIGURATION['REGULARIZATION_RATE']
N_FILTERS=CONFIGURATION['N_FILTERS']
KERNEL_SIZE=CONFIGURATION['KERNEL_SIZE']
N_STRIDES=CONFIGURATION['N_STRIDES']
POOL_SIZE=CONFIGURATION['POOL_SIZE']

# LeNet-style binary classifier: two conv/batch-norm/pool stages followed by
# a dense head that ends in a single sigmoid unit.
# TODO(review): the dense widths below are hard-coded to 100 and 10 although
# the configuration also defines N_DENSE_1 / N_DENSE_2 — confirm which is intended.
lenet_model = tf.keras.Sequential([

    # Input: IM_SIZE x IM_SIZE RGB image. `Input(shape=...)` is used, as in the
    # functional models below, instead of InputLayer(input_shape=...).
    Input(shape = (IM_SIZE, IM_SIZE, 3)),
    #InputLayer(input_shape=(None, None, 3)),  # Input: IM_SIZE x IM_SIZE RGB image

    #resize_rescale_layers,# embedding resize and rescale into SequentialAPI
    #augment_layer,    # embedding augment into SequentialAPI

    # 1st conv layer (extracts basic patterns)
    Conv2D(filters=N_FILTERS, kernel_size=KERNEL_SIZE, strides=N_STRIDES, padding="valid", activation="relu",
           kernel_regularizer=L2(REGULARIZATION_RATE)),
    BatchNormalization(),
    # The pooling stride is twice the conv stride so the feature maps are
    # actually downsampled, matching the functional and subclassed variants
    # (a stride of N_STRIDES=1 left the spatial size almost unchanged).
    MaxPool2D(pool_size=POOL_SIZE, strides=N_STRIDES*2),  # Downsamples feature maps
    Dropout(rate=DROPOUT_RATE),   #add a dropout layer

    # 2nd conv layer (extracts deeper features)
    Conv2D(filters=N_FILTERS*2, kernel_size=KERNEL_SIZE, strides=N_STRIDES, padding="valid", activation="relu",
           kernel_regularizer=L2(REGULARIZATION_RATE)),
    BatchNormalization(),
    MaxPool2D(pool_size=POOL_SIZE, strides=N_STRIDES*2),  # Downsampling again


    Flatten(),  # Converts 2D feature maps into 1D array

    Dense(100, activation="relu",kernel_regularizer=L2(REGULARIZATION_RATE)),  # Fully connected layer
    BatchNormalization(),
    Dropout(rate=DROPOUT_RATE),   #add a dropout layer

    Dense(10, activation="relu",kernel_regularizer=L2(REGULARIZATION_RATE)),   # Further processing
    BatchNormalization(),

    Dense(1, activation="sigmoid"),     # Output layer (binary classification)

])

# Print model summary
lenet_model.summary()


## Functional API :LeNet Model

In [None]:
# clear up session cache
from tensorflow.keras import backend as K
# Clear the previous session to reset layer count
K.clear_session()

In [None]:
'''
1.a.Feature Extractor Model "NON Sequential"
'''
#create input function to send image
# Symbolic input for the functional graph: one IM_SIZE x IM_SIZE RGB image.
func_input=Input(shape=(IM_SIZE, IM_SIZE, 3),name="Input_Image")

# 1st conv layer (extracts basic patterns)
x= Conv2D(filters=6, kernel_size=3, strides=1, padding="valid", activation="relu")(func_input)
x= BatchNormalization()(x)
x= MaxPool2D(pool_size=2, strides=2)(x)  # Downsamples feature maps

# 2nd conv layer (extracts deeper features)
x= Conv2D(filters=16, kernel_size=3, strides=1, padding="valid", activation="relu")(x)
x= BatchNormalization()(x)
output= MaxPool2D(pool_size=2, strides=2)(x)  # Downsampling again

# Wrap the graph from func_input to the last pooling output as a reusable
# feature-extractor model (no flatten / dense head yet).
feature_extractor_model = Model(func_input,output, name="Feature_Extractor")
feature_extractor_model.summary()

In [None]:
'''
1.b.Feature Extractor with "Sequential Model"
'''
# Sequential counterpart of the functional feature extractor: the same two
# conv/batch-norm/pool stages, without a classification head.
feature_extractor_seq_model = tf.keras.Sequential([

    # `Input(shape=...)` replaces InputLayer(shape=...), matching how the
    # functional models in this notebook declare their input.
    Input(shape=(IM_SIZE, IM_SIZE, 3)),  # Input: IM_SIZE x IM_SIZE RGB image

    # 1st conv layer (extracts basic patterns)
    Conv2D(filters=6, kernel_size=3, strides=1, padding="valid", activation="relu"),
    BatchNormalization(),
    MaxPool2D(pool_size=2, strides=2),  # Downsamples feature maps

    # 2nd conv layer (extracts deeper features)
    Conv2D(filters=16, kernel_size=3, strides=1, padding="valid", activation="relu"),
    BatchNormalization(),
    MaxPool2D(pool_size=2, strides=2),  # Downsampling again


])

# Print model summary
feature_extractor_seq_model.summary()


## Callable Model

In [None]:
'''
2. Flatten and instantiate the CNN model with above Sequential or Non Sequential model
'''
#create input function to send image
# Symbolic input for the functional graph: one IM_SIZE x IM_SIZE RGB image.
func_input=Input(shape=(IM_SIZE, IM_SIZE, 3),name="Input_Image")

# Feature extraction: the whole Sequential extractor is called like a layer.
x= feature_extractor_seq_model(func_input)

x= Flatten()(x) # Converts 2D feature maps into 1D array

x= Dense(100, activation="relu")(x)  # Fully connected layer
x= BatchNormalization()(x)

x= Dense(10, activation="relu")(x)   # Further processing
x= BatchNormalization()(x)

func_output= Dense(1, activation="sigmoid")(x)     # Output layer (binary classification)

# Assemble the full classifier from the input to the sigmoid output.
lenet_model_func = Model(func_input,func_output, name="Lenet_Model")
lenet_model_func.summary()

## Model Subclassing:LeNet Model

In [None]:
'''
1. making a class for feature extraction which will have subclass that inherits from "Layer" class
'''
class FeatureExtractor(Layer):
  """Two conv / batch-norm / max-pool stages packaged as one Keras layer.

  Args:
    filters: number of filters of the first convolution; the second
      convolution uses twice as many.
    kernel_size: kernel size of both convolutions.
    strides: stride of both convolutions; the pooling layers use twice this stride.
    padding: padding mode of both convolutions.
    activation: activation of both convolutions.
    pool_size: window size of both pooling layers.
    **kwargs: standard Layer arguments (e.g. name, dtype), forwarded to Layer.
  """
  #create init Method
  def __init__(self, filters, kernel_size, strides, padding, activation, pool_size, **kwargs):
    super(FeatureExtractor,self).__init__(**kwargs)
    self.conv_1  = Conv2D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding, activation=activation) # 1st conv layer (extracts basic patterns)
    self.batch_1 = BatchNormalization()
    self.pool_1  = MaxPool2D(pool_size=pool_size, strides=2*strides) # Downsamples feature maps


    self.conv_2  = Conv2D(filters=filters*2, kernel_size=kernel_size, strides=strides, padding=padding, activation=activation) # 2nd conv layer (extracts deeper features)
    self.batch_2 = BatchNormalization()
    self.pool_2  = MaxPool2D(pool_size=pool_size, strides=2*strides) # Downsampling again

  #create Call Method
  def call(self,x,training=None):
    """Run both stages on `x`.

    `training` is forwarded to the batch-normalization layers. It defaults to
    None so the layer can also be called as `layer(x)`; without a default the
    argument was mandatory for every caller.
    """

    x=self.conv_1(x)
    x = self.batch_1(x, training=training)
    x=self.pool_1(x)

    x=self.conv_2(x)
    x = self.batch_2(x, training=training)
    x=self.pool_2(x)

    return x

feature_sub_classed=FeatureExtractor(8, 3, 1, "valid", "relu", 2)

In [None]:
'''
2.Flatten and instantiate the CNN model with Subclassing above
'''
# Symbolic input tensor for one IM_SIZE x IM_SIZE RGB image
func_input=Input(shape=(IM_SIZE, IM_SIZE, 3),name="Input_Image")

# Subclassed feature extractor (both conv / batch-norm / pooling stages)
# NOTE(review): training=True is passed explicitly here; confirm that the
# BatchNormalization layers inside the extractor still switch to inference
# statistics during predict/evaluate.
x= feature_sub_classed(func_input,training=True)

x= Flatten()(x) # Converts 2D feature maps into 1D array

x= Dense(100, activation="relu")(x)  # Fully connected layer
x= BatchNormalization()(x)

x= Dense(10, activation="relu")(x)   # Further processing
x= BatchNormalization()(x)

func_output= Dense(1, activation="sigmoid")(x)     # Output layer (binary classification)

# Wrap input -> output into a functional model and print its layer summary
lenet_model_func = Model(func_input,func_output, name="Lenet_Model")
lenet_model_func.summary()

In [None]:
'''
1.making a new model with above FeatureExtractor method for the extracting patterns
'''
class LenetModel(Model):
  """LeNet-style binary classifier built by subclassing ``Model``.

  A ``FeatureExtractor`` is followed by Flatten, two Dense + BatchNormalization
  stages (100 and 10 units) and a single sigmoid output unit.
  """

  #create init Method
  def __init__(self):
    super(LenetModel,self).__init__()

    self.feature_extractor  = FeatureExtractor(8, 3, 1, "valid", "relu", 2)

    self.flatten = Flatten() # Converts 2D feature maps into 1D array

    self.dense_1=Dense(100, activation="relu") # Fully connected layer
    self.batch_1=BatchNormalization()

    self.dense_2=Dense(10, activation="relu") # Further processing
    self.batch_2=BatchNormalization()

    self.dense_3=Dense(1, activation="sigmoid") # Output layer (binary classification)


  #create Call Method
  def call(self,x,training=False):
    """Forward pass; ``training`` is forwarded to the extractor and both batch-norm layers."""
    x=self.feature_extractor(x, training=training)
    x=self.flatten(x)
    x=self.dense_1(x)
    x=self.batch_1(x,training=training)
    x=self.dense_2(x)
    x=self.batch_2(x, training=training)
    x=self.dense_3(x)
    return x

#instantiate the model:
lenet_sub_classed = LenetModel()
# Call once on a dummy batch so the weights exist before summary();
# the dummy uses IM_SIZE so it follows the image size used for the other models.
lenet_sub_classed(tf.zeros([1,IM_SIZE,IM_SIZE,3]))
lenet_sub_classed.summary()

## Custom Dense Layers

*  I built this layer following this guide: https://www.tensorflow.org/guide/keras/making_new_layers_and_models_via_subclassing
* I also used - https://www.tensorflow.org/api_docs/python/tf/keras/layers/Dense
* refer random_initializer = https://www.tensorflow.org/api_docs/python/tf/keras/initializers/RandomNormal

**Explanation of Dense Layer**

Dense Layer in Simple Terms: A dense layer in a neural network works by multiplying the inputs (features) by weights
and then adding a bias. This can be written as:
---> y = mx + c
Where:
    m = weights
    x = input features
    c = bias

Shapes of Matrices:
- Input matrix has a shape of (b, f):
    - b is the batch size (number of examples),
    - f is the number of features (input size).
  
- Weights matrix has a shape of (f, o):
    - f matches the input features,
    - o is the output size (number of neurons in the layer).
    
So the shapes must be compatible: the number of columns of the input matrix (b, f)
   must equal the number of rows of the weights matrix (f, o), which gives an output of shape (b, o).

TensorFlow's Role:
A layer's `build` method receives the input shape, so the weights can be created with exactly
`f` rows to match the number of input features. If the shapes do not match, the matrix
multiplication raises an error; TensorFlow does not reshape the operands automatically.

Finally, the bias is added to the result, and that's how the dense layer produces an output.


In [None]:
# class Linear(keras.layers.Layer):
#     def __init__(self, units=32, input_dim=32):
#         super().__init__()
#         self.w = self.add_weight(
#             shape=(input_dim, units), initializer="random_normal", trainable=True
#         )
#         self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

#     def call(self, inputs):
#         return tf.matmul(inputs, self.w) + self.b

In [None]:
'''
1. Making Custom Dense Layer
'''

class NeuralearnDense(Layer):
  """Hand-written dense layer: ``activation(inputs @ w + b)``.

  Args:
    output_units: number of output features (columns of ``w``).
    activation: activation name (e.g. "relu", "sigmoid"), a callable, or None
      for no activation. Resolved with ``tf.keras.activations.get``, so an
      unknown name raises instead of silently behaving as linear.
    **kwargs: forwarded to ``Layer`` (for example ``name``).
  """

  def __init__(self, output_units, activation=None, **kwargs):
    super().__init__(**kwargs)
    self.output_units = output_units
    self.activation = activation
    # Resolve once here so call() does not compare strings on every forward pass.
    self._activation_fn = tf.keras.activations.get(activation)


  def build(self,input_features_shape):
    """Create the weights once the size of the last input axis is known."""
    # w stands for weights: one row per input feature, one column per output unit
    self.w = self.add_weight(shape=(input_features_shape[-1], self.output_units),initializer="random_normal",trainable=True)
    # b stands for biases: one per output unit
    self.b = self.add_weight(shape=(self.output_units,),initializer="random_normal",trainable=True)


  def call(self, input_features):
    """Apply the affine transform followed by the configured activation."""
    pre_output = tf.matmul(input_features,self.w) + self.b # w stands for weights , b for biases
    return self._activation_fn(pre_output)

In [None]:
'''
2.using my NeuralearnDense layer for the extracting patterns
'''
IM_SIZE = 224
# Instantiate the CNN model
lenet_custom_model = tf.keras.Sequential([

    InputLayer(shape=(IM_SIZE, IM_SIZE, 3)),  # Input: IM_SIZE x IM_SIZE RGB image

    # 1st conv layer (extracts basic patterns)
    Conv2D(filters=6, kernel_size=3, strides=1, padding="valid", activation="relu"),
    BatchNormalization(),
    MaxPool2D(pool_size=2, strides=2),  # Downsamples feature maps

    # 2nd conv layer (extracts deeper features)
    Conv2D(filters=16, kernel_size=3, strides=1, padding="valid", activation="relu"),
    BatchNormalization(),
    MaxPool2D(pool_size=2, strides=2),  # Downsampling again

    Flatten(),  # Converts 2D feature maps into 1D array

    NeuralearnDense(100, activation="relu"),  # Fully connected layer
    BatchNormalization(),

    NeuralearnDense(10, activation="relu"),   # Further processing
    BatchNormalization(),

    NeuralearnDense(1, activation="sigmoid"),     # Output layer (binary classification)

])

# Print model summary
lenet_custom_model.summary()

# Callbacks Types

- i have referred this : https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/Callback
- This too:https://github.com/keras-team/keras/blob/v3.3.3/keras/src/callbacks/callback.py#L106-L119


## Custom Callback

In [None]:
# Define a custom callback class to monitor the model's loss during training
class LossCallback(Callback):
  """Prints the reported loss after every batch and at the end of every epoch."""

  @staticmethod
  def _loss_from(logs):
    # logs may be None or lack "loss"; fall back to NaN instead of raising.
    return (logs or {}).get("loss", float("nan"))

  #call this method at the end of each training epoch
  def on_epoch_end(self, epoch, logs=None):
    # Print the loss value
    print("\nFor the Epoch Number {} the model has a loss of {:.5f}".format(epoch+1, self._loss_from(logs)))

  #call this method at the end of each batch
  def on_batch_end(self, batch, logs=None):
    print("\nFor the Batch Number {} the model has a loss of {:.5f}".format(batch+1, self._loss_from(logs)))


## CSVLogger

In [None]:
csv_callback=CSVLogger(
    filename='logs.csv', separator=',', append=False
)

## EarlyStopping

In [None]:
# Create an instance of EarlyStoppping Callback
earlystopping_callback=tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=2,
    verbose=1,
    mode='auto',
    baseline=None,
    restore_best_weights=True
)
earlystopping_callback

## TensorBoard

In [None]:
# Build a per-run log directory named by a day-month-year / hour-minute-second timestamp.
# %H, %M and %S are hour, minute and second; the lowercase %h, %m and %s used before are
# abbreviated month name, month number and (on some platforms) seconds since the epoch.
CURRENT_TIME = datetime.now().strftime('%d%m%y-%H%M%S')
LOG_DIR = './logs/' + CURRENT_TIME
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=LOG_DIR)

In [None]:
METRIC_DIR = './logs/' + CURRENT_TIME + '/metrics'
train_writer=tf.summary.create_file_writer(METRIC_DIR)

In [None]:
print(LOG_DIR)
print(METRIC_DIR)

## LearningRateScheduler
* i referred this - https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/LearningRateScheduler

In [None]:
# This function keeps the incoming learning rate for the first epoch (epoch index 0)
# and multiplies it by exp(-0.1) on every later epoch.
def scheduler(epoch, lr):
    """Return the learning rate for ``epoch`` and log it to ``train_writer``.

    Args:
        epoch: zero-based epoch index.
        lr: learning rate passed in by the caller.

    Returns:
        ``lr`` unchanged when ``epoch < 1``, otherwise ``lr * exp(-0.1)``.
    """
    if epoch < 1:
        learning_rate= lr
    else:
        # float(...) turns the TensorFlow scalar into a plain Python float
        learning_rate= lr *float(tf.math.exp(-0.1))

    # Record the value under the 'Learning Rate' tag, using the module-level train_writer
    with train_writer.as_default():
      tf.summary.scalar('Learning Rate', data=learning_rate, step=epoch)
    return learning_rate

#initialize the scheduler and call the function
scheduler_callback=LearningRateScheduler(scheduler,verbose=1)

## ModelCheckpoint

In [None]:
# Keep only the best full model (by val_loss), checked once per epoch.
# An integer save_freq counts batches, not epochs, and val_loss is not available at
# batch boundaries, so save_best_only on val_loss needs save_freq='epoch'.
checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath='checkpoints.keras',
    monitor='val_loss',
    verbose=0,
    save_best_only=True,
    save_weights_only=False,
    mode='auto',
    save_freq='epoch'
)

## ReduceLROnPlateau

In [None]:
plateau_callback=tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.1,
    patience=2,
    verbose=0
)

# Custom Metrics Building (w/ and w/o params)

## Custom Metric Class

In [None]:
from tensorflow.keras.metrics import binary_accuracy    #*RunThis

class CustomAccuracy(tf.keras.metrics.Metric):
    """Binary accuracy scaled by ``FACTOR``, averaged over all values seen since the last reset.

    The state is a running sum and a running count, so ``result()`` covers every
    batch passed to ``update_state`` rather than only the most recent one.

    Args:
        name: metric name reported in logs.
        FACTOR: constant multiplier applied to the accuracy values.
    """

    def __init__(self, name='Custom_Accuracy', FACTOR=1):
        super(CustomAccuracy, self).__init__(name=name)
        self.FACTOR = FACTOR
        self.total = self.add_weight(name='total', initializer='zeros')
        self.count = self.add_weight(name='count', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the scaled accuracy of one batch (``sample_weight`` is accepted but not used)."""
        # Ensure y_true is float32 for binary_accuracy
        y_true = tf.cast(y_true, dtype=tf.float32)
        # Compute binary accuracy values for this batch
        output = binary_accuracy(y_true, y_pred) * self.FACTOR
        self.total.assign_add(tf.reduce_sum(output))
        self.count.assign_add(tf.cast(tf.size(output), dtype=tf.float32))

    def result(self):
        """Mean scaled accuracy so far; 0 when nothing has been accumulated."""
        return tf.math.divide_no_nan(self.total, self.count)

    def reset_state(self):
        """Clear the running sum and count (called between epochs)."""
        self.total.assign(0.0)
        self.count.assign(0.0)

## Custom Metric Method (with parametres)

In [1]:
def custom_accuracy(FACTOR):
  """Return a metric function computing ``binary_accuracy(y_true, y_pred) * FACTOR``.

  This is a factory: call it with FACTOR and use the returned ``metric``
  function (which takes ``y_true`` and ``y_pred``) as the metric.
  """
  def metric(y_true, y_pred):
    return binary_accuracy(y_true, y_pred)* FACTOR
  return metric

In [None]:
# Custom Binary Cross-Entropy Loss
def custom_bce(y_true, y_pred):
    bce = BinaryCrossentropy()
    return bce(y_true, y_pred)

In [None]:
# Compile the model
# custom_accuracy is a factory taking FACTOR; the metric is the function it
# returns, so it has to be called here rather than passed as-is.
lenet_model.compile(
    optimizer=Adam(learning_rate=0.01),
    loss=custom_bce,
    metrics=[custom_accuracy(FACTOR=1)]
)

# Custom Losses Building (w/ and w/o params)

- Custom Loss Class : This class inherits for Keras loss class: https://www.tensorflow.org/api_docs/python/tf/keras/losses

In [None]:
class CustomBCE(tf.keras.losses.Loss):
  """Binary cross-entropy loss multiplied by a constant ``FACTOR``."""

  def __init__(self,FACTOR):
    super(CustomBCE,self).__init__()  # initialise the tf.keras.losses.Loss base class
    self.FACTOR=FACTOR

  def call(self,y_true,y_pred):
    # A new BinaryCrossentropy object is created on every call and its result scaled by FACTOR
    bce = BinaryCrossentropy()
    return bce(y_true, y_pred)* self.FACTOR


In [None]:
metrics=[TruePositives(name='tp'),FalsePositives(name='fp'),TrueNegatives(name='tn'),FalseNegatives(name='fn'),
         BinaryAccuracy(name='accuracy'),Precision(name='precision'),Recall(name='recall'),AUC(name='auc')]

In [None]:
lenet_model.compile(optimizer=Adam(learning_rate=0.01),
                    loss=CustomBCE(FACTOR),
                    metrics=metrics)

## Custom Loss Method (with parametres)

In [None]:
FACTOR=1
def custom_bce(FACTOR):
  def loss(y_true, y_pred):
    bce = BinaryCrossentropy()
    return bce(y_true, y_pred)*FACTOR
  return loss

In [None]:
metrics=[TruePositives(name='tp'),FalsePositives(name='fp'),TrueNegatives(name='tn'),FalseNegatives(name='fn'),
         BinaryAccuracy(name='accuracy'),Precision(name='precision'),Recall(name='recall'),AUC(name='auc')]

In [None]:
lenet_model.compile(optimizer=Adam(learning_rate=0.01),
                    loss=custom_bce(FACTOR),
                    metrics=metrics)

## Custom Loss Method (without parametres): Albumentation --- Training

In [None]:
# custom function to calculate loss           #*RunThis
def custom_bce(y_true, y_pred):
  bce = BinaryCrossentropy()
  return bce(y_true, y_pred)

In [None]:
metrics=[TruePositives(name='tp'),FalsePositives(name='fp'),TrueNegatives(name='tn'),FalseNegatives(name='fn'),  #*RunThis
         BinaryAccuracy(name='accuracy'),Precision(name='precision'),Recall(name='recall'),AUC(name='auc')]

In [None]:
lenet_model.compile(optimizer=Adam(learning_rate=0.01),   #*RunThis
                    loss=custom_bce,
                    metrics=metrics)

In [None]:
#fit the model
history =lenet_model.fit(
    train_dataset,
    epochs=5,
    verbose=True,
)

# Tensorboard: Custom CallBack Training WRITER /LOGS

In [1]:
# if you want to pull previous logs out
#!rm -rf ./logs/

In [None]:
# Build per-run log directories named by a timestamp             #*RunThis
# %H%M%S is hour-minute-second; the lowercase %h%m%s used before are
# abbreviated month name, month number and (on some platforms) epoch seconds.
CURRENT_TIME = datetime.now().strftime('%d%m%y-%H%M%S')

CUSTOM_TRAIN_DIR = './logs/' + CURRENT_TIME + '/custom/train'
CUSTOM_VAL_DIR = './logs/' + CURRENT_TIME + '/custom/val'

custom_train_writer=tf.summary.create_file_writer(CUSTOM_TRAIN_DIR)
custom_val_writer=tf.summary.create_file_writer(CUSTOM_VAL_DIR)

In [None]:
test_dataset=test_dataset.batch(1)

In [None]:
class LogImagesCallbackTensorBoard(Callback):
    """Logs a confusion-matrix heatmap of the test set to TensorBoard after every epoch.

    Reads the module-level ``test_dataset``. The indexing below takes element 0 of
    every yielded image and label batch, i.e. it expects batches of one sample.
    Predictions come from ``self.model`` (the model this callback is attached to).

    Args:
        log_dir: root directory; images go to ``<log_dir>/<timestamp>/images``.
    """

    def __init__(self, log_dir="./logs"):
        super().__init__()
        CURRENT_TIME = datetime.now().strftime('%d%m%y-%H%M%S')
        IMAGE_DIR = f"{log_dir}/{CURRENT_TIME}/images"
        self.image_writer = tf.summary.create_file_writer(IMAGE_DIR)

    def on_epoch_end(self, epoch, logs=None):
        labels = []
        inp = []

        for x, y in test_dataset.as_numpy_iterator():
            labels.append(y)
            inp.append(x)

        # Drop the batch axis of size one from labels and images
        labels = np.array([i[0] for i in labels])
        predicted = self.model.predict(np.array(inp)[:, 0, ...])
        threshold = 0.5

        cm = confusion_matrix(labels, predicted > threshold)

        plt.figure(figsize=(8, 8))
        sns.heatmap(cm, annot=True)
        plt.title('Confusion matrix - {}'.format(threshold))
        plt.ylabel('Actual')
        plt.xlabel('Predicted')
        # NOTE(review): axis('off') hides the axis decorations, including the two labels set above
        plt.axis('off')

        # Render the figure into an in-memory PNG instead of a file
        import io
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png')
        plt.close()  # important to free memory

        image = tf.image.decode_png(buffer.getvalue(), channels=3)
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)  # normalize
        image = tf.expand_dims(image, axis=0)  # add the leading batch axis

        with self.image_writer.as_default():
            tf.summary.image("Confusion Matrix", image, step=epoch)


In [None]:
# Assuming LogImagesCallback is a class             #*RunThis
log_images_callback = LogImagesCallbackTensorBoard()  # Create an instance

# Wandb : Custom CallBack Training Metrics Results

* Wandb ConfusionMatrix

In [None]:
class LogImagesCallbackWandB(Callback):
    """Logs a W&B confusion-matrix plot of the test set after every epoch.

    Reads the module-level ``test_dataset`` and predicts with ``self.model``
    (the model this callback is attached to).
    """

    def on_epoch_end(self, epoch, logs=None):
        labels = []
        inp = []

        # Collect all test data
        for x, y in test_dataset.as_numpy_iterator():
            labels.append(y)
            inp.append(x)

        #Convert to numpy arrays (no indexing needed)
        labels = np.array(labels)
        inp = np.array(inp)

        #make predictions
        predicted = self.model.predict(inp)

        #checking for my confirmation
        print("labels", labels, labels.dtype)
        print("predicted", predicted, predicted.dtype)

        # Vectorized thresholding
        pred = (predicted[:, 0] >= 0.5).astype(int) # we only want 0 or 1

        # Log confusion matrix to WandB
        wandb.log({
            "Confusion Matrix": wandb.plot.confusion_matrix(
                probs=None,
                y_true=labels,
                preds=pred,
                class_names=["Parasitized", "Uninfected"]
            )
        })


In [None]:
# ###### this is a testing code to see "preds" in action

# pred = []
# # Assuming 'lenet_model' and 'test_dataset' are defined
# # and 'lenet_model' is a trained model


# # Batch the test dataset
# BATCH_SIZE = 32  # Set an appropriate batch size
# test_dataset = test_dataset.batch(BATCH_SIZE)

# # Assuming 'test_dataset' yields (image, label) pairs
# for image, label in test_dataset:
#     predicted = lenet_model.predict(image)  # Get predictions for the batch

#   # Process individual predictions within the batch
#     for i in range(len(predicted)):

#       if(predicted[i][0]< 0.5):
#         pred.append([1,0])

#       else:
#         pred.append([0,1])

# print(pred)

* Wandb ROC curve

In [None]:
class LogImagesCallbackWandBPlot(Callback):
    """Logs a W&B ROC-curve plot of the test set after every epoch.

    Reads the module-level ``test_dataset`` and predicts with ``self.model``
    (the model this callback is attached to).
    """

    def on_epoch_end(self, epoch, logs=None):
        labels = []
        inp = []

        for x, y in test_dataset.as_numpy_iterator():
            labels.append(y)
            inp.append(x)

        labels = np.array(labels)
        inp = np.array(inp)

        #make predictions
        predicted = self.model.predict(inp)

        #checking for my confirmation
        print("labels", labels.shape, labels.dtype)
        print("predicted", predicted.shape, predicted.dtype)

        # For sigmoid output: convert to shape (n_samples, 2)
        # column 0 = 1 - p, column 1 = p, matching the two class names below
        pred = np.stack([1 - predicted[:, 0], predicted[:, 0]], axis=1) #this stack was a requirement of wandb
        print("pred shape:", pred.shape)

        wandb.log({ "ROC Curve": wandb.plot.roc_curve(labels,pred,["Parasitized", "Uninfected"])})


* wandb Log Images : Looks like Confusion Matrix


In [None]:
class LogImagesCallbackWandB(Callback):
    """Logs a confusion-matrix heatmap of the test set to W&B as an image after every epoch.

    Reads the module-level ``test_dataset`` and predicts with ``self.model``
    (the model this callback is attached to).

    NOTE(review): a class with this same name is defined earlier in the notebook
    (the W&B confusion-matrix plot version); running this cell replaces it.
    """

    def on_epoch_end(self, epoch, logs=None):
        labels = []
        inp = []

        for x, y in test_dataset.as_numpy_iterator():
            labels.append(y)         # y is already a scalar (int), no need to index
            inp.append(x)

        labels = np.array(labels)
        predicted = self.model.predict(np.array(inp))
        threshold = 0.5

        cm = confusion_matrix(labels, predicted > threshold)

        plt.figure(figsize=(8, 8))
        sns.heatmap(cm, annot=True)
        plt.title('Confusion matrix - {}'.format(threshold))
        plt.ylabel('Actual')
        plt.xlabel('Predicted')
        # NOTE(review): axis('off') hides the axis decorations, including the two labels set above
        plt.axis('off')

        # Render the figure into an in-memory PNG instead of a file
        import io
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png')
        plt.close()  # important to free memory

        image_array = tf.image.decode_png(buffer.getvalue(), channels=3)

        images = wandb.Image(image_array, caption="Confusion Matrix for epoch:{}".format(epoch))

        wandb.log({"Confusion Matrix": images})

# Hyperparameter Tuning

In [None]:
IM_SIZE=224


def model_tune(hparams):
    """Build, briefly train and score one LeNet variant for a hyperparameter set.

    Args:
        hparams: mapping indexed by the module-level HP_NUM_UNIT_1, HP_NUM_UNIT_2,
            HP_DROPOUT, HP_REGULARIZATION_RATE and HP_LEARNING_RATE objects.

    Returns:
        The accuracy value returned by ``evaluate`` on ``val_dataset``.

    NOTE(review): the model is both fitted and evaluated on ``val_dataset``, so the
    returned accuracy is measured on data the model was trained on. Confirm whether
    this is a deliberate shortcut to keep the grid search fast.
    """
    lenet_model = tf.keras.Sequential([
        # `shape=` matches the InputLayer usage elsewhere in this notebook
        InputLayer(shape=(IM_SIZE, IM_SIZE, 3)),

        # 1st conv layer (extracts basic patterns)
        Conv2D(filters=6, kernel_size=3, strides=1, padding="valid",
               activation="relu", kernel_regularizer=L2(hparams[HP_REGULARIZATION_RATE])),
        BatchNormalization(),
        MaxPool2D(pool_size=2, strides=2),  # Downsamples feature maps
        Dropout(rate=hparams[HP_DROPOUT]),   #add a dropout layer

        # 2nd conv layer (extracts deeper features)
        Conv2D(filters=16, kernel_size=3, strides=1, padding="valid",
               activation="relu", kernel_regularizer=L2(hparams[HP_REGULARIZATION_RATE])),
        BatchNormalization(),
        MaxPool2D(pool_size=2, strides=2),  # Downsampling again

        Flatten(),  # Converts 2D feature maps into 1D array

        Dense(hparams[HP_NUM_UNIT_1], activation="relu",
              kernel_regularizer=L2(hparams[HP_REGULARIZATION_RATE])),  # Fully connected layer
        BatchNormalization(),
        Dropout(rate=hparams[HP_DROPOUT]),   #add a dropout layer

        Dense(hparams[HP_NUM_UNIT_2], activation="relu",
              kernel_regularizer=L2(hparams[HP_REGULARIZATION_RATE])),   # Further processing
        BatchNormalization(),

        Dense(1, activation="sigmoid"),     # Output layer (binary classification)
    ])

    lenet_model.compile(
        optimizer=Adam(learning_rate=hparams[HP_LEARNING_RATE]),
        loss=BinaryCrossentropy(),
        metrics=['accuracy']
    )

    #fit the model
    lenet_model.fit(val_dataset, epochs=1)
    # evaluate returns [loss, accuracy] for the single compiled metric
    _, accuracy = lenet_model.evaluate(val_dataset)
    return accuracy

In [None]:
HP_NUM_UNIT_1= hp.HParam('num_units_1', hp.Discrete([16,32,64,128]))
HP_NUM_UNIT_2= hp.HParam('num_units_2', hp.Discrete([16,32,64,128]))
HP_DROPOUT= hp.HParam('dropout', hp.Discrete([0.1,0.2,0.3]))
HP_REGULARIZATION_RATE= hp.HParam('regularization_rate', hp.Discrete([0.001,0.01,0.1]))
HP_LEARNING_RATE = hp.HParam('learning_rate', hp.Discrete([1e-4, 1e-3]))


In [None]:
# to perform grid search
run_number= 0
for num_units_1 in HP_NUM_UNIT_1.domain.values:
  for num_units_2 in HP_NUM_UNIT_2.domain.values:
    for dropout_rate in HP_DROPOUT.domain.values:
      for regularization_rate in HP_REGULARIZATION_RATE.domain.values:
        for learning_rate in HP_LEARNING_RATE.domain.values:

          hparams={
              HP_NUM_UNIT_1:num_units_1,
              HP_NUM_UNIT_2:num_units_2,
              HP_DROPOUT: dropout_rate,
              HP_REGULARIZATION_RATE:regularization_rate,
              HP_LEARNING_RATE: learning_rate,
          }

          file_writer =tf.summary.create_file_writer('logs/'+str(run_number))

          with file_writer.as_default():
              hp.hparams(hparams)
              accuracy = model_tune(hparams)
              tf.summary.scalar('accuracy', accuracy, step = 0)
          # Corrected variable names in the print statement
          print("For the run {}, hparams num_units_1:{}, num_units_2:{}, dropout:{}, regularization_rate:{}, learning_rate:{}".format(run_number, hparams[HP_NUM_UNIT_1], hparams[HP_NUM_UNIT_2],
                                                             hparams[HP_DROPOUT], hparams[HP_REGULARIZATION_RATE],
                                                             hparams[HP_LEARNING_RATE]))


          run_number+=1

# Custom Training Loops

## 1.Simple Custom Training Loop

In [None]:
OPTIMIZER=Adam(learning_rate=0.01)                          #*RunThis
METRIC= BinaryAccuracy()
METRIC_VAL=BinaryAccuracy()
EPOCHS=3

In [None]:
# Eager-mode custom training loop: one pass over train_dataset, then one over val_dataset, per epoch
for epoch in range(EPOCHS):                                 #*RunThis
    print(f"Train starts for epoch number {epoch + 1}")

    # Training loop
    for step, (x_batch, y_batch) in enumerate(train_dataset):
        # Record the forward pass so gradients of the loss can be taken afterwards
        with tf.GradientTape() as recorder:
            y_pred = lenet_model(x_batch, training=True)
            loss = custom_bce(y_batch, y_pred)

        gradients = recorder.gradient(loss, lenet_model.trainable_weights)
        OPTIMIZER.apply_gradients(zip(gradients, lenet_model.trainable_weights))
        METRIC.update_state(y_batch, y_pred)

    # `loss` holds the value from the last batch only; the accuracy covers the whole epoch
    print("Training Loss:", loss.numpy())
    print("Training Accuracy:", METRIC.result().numpy())
    METRIC.reset_state()

    # Validation loop
    for x_batch_val, y_batch_val in val_dataset:
        y_pred_val = lenet_model(x_batch_val, training=False)
        loss_val = custom_bce(y_batch_val, y_pred_val)
        METRIC_VAL.update_state(y_batch_val, y_pred_val)

    # `loss_val` likewise holds the value from the last validation batch only
    print("Validation Loss:", loss_val.numpy())
    print("Validation Accuracy:", METRIC_VAL.result().numpy())
    METRIC_VAL.reset_state()


## 2.Run same in Graph Mode  : You will notice its super fast

In [None]:
OPTIMIZER=Adam(learning_rate=0.01)      #*RunThis
METRIC= BinaryAccuracy()
METRIC_VAL=BinaryAccuracy()
EPOCHS=CONFIGURATION['N_EPOCHS']

In [None]:
# Build per-run log directories named by a timestamp             #*RunThis
# %H%M%S is hour-minute-second; the lowercase %h%m%s used before are
# abbreviated month name, month number and (on some platforms) epoch seconds.
CURRENT_TIME = datetime.now().strftime('%d%m%y-%H%M%S')

CUSTOM_TRAIN_DIR = './logs/' + CURRENT_TIME + '/custom/train'
CUSTOM_VAL_DIR = './logs/' + CURRENT_TIME + '/custom/val'

custom_train_writer=tf.summary.create_file_writer(CUSTOM_TRAIN_DIR)
custom_val_writer=tf.summary.create_file_writer(CUSTOM_VAL_DIR)

In [None]:
              #*RunThis
@tf.function
def training_block(x_batch, y_batch):
    """Run one optimisation step on a batch and return its loss.

    Uses the module-level ``lenet_model``, ``custom_bce``, ``OPTIMIZER`` and
    ``METRIC``; the metric is updated with this batch's predictions.
    """
    with tf.GradientTape() as recorder:
        y_pred = lenet_model(x_batch, training=True)
        loss = custom_bce(y_batch, y_pred)

    # Gradients of the loss with respect to every trainable weight
    partial_derivatives = recorder.gradient(loss, lenet_model.trainable_weights)
    OPTIMIZER.apply_gradients(zip(partial_derivatives, lenet_model.trainable_weights))
    METRIC.update_state(y_batch, y_pred)
    return loss

@tf.function
def val_block(x_batch_val,y_batch_val):
  """Evaluate one validation batch (no weight update) and return its loss.

  Uses the module-level ``lenet_model``, ``custom_bce`` and ``METRIC_VAL``.
  """
  y_pred_val= lenet_model(x_batch_val,training=False)
  loss_val=custom_bce(y_batch_val,y_pred_val)
  METRIC_VAL.update_state(y_batch_val,y_pred_val)
  return loss_val


In [None]:
for epoch in range(EPOCHS):         #*RunThis
  print("Train starts for epoch number {}".format(epoch+1))
  for step, (x_batch,y_batch) in enumerate(train_dataset):
    loss= training_block(x_batch,y_batch)

  print("Training Loss is ",loss.numpy())
  print("The Accuracy is ",METRIC.result().numpy())
  METRIC.reset_state()

  #Including Validation
  for (x_batch_val,y_batch_val) in val_dataset:
    loss_val=val_block(x_batch_val,y_batch_val)

  print("Validation Loss",loss_val.numpy())
  print("The Accuracy is ",METRIC_VAL.result().numpy())
  METRIC_VAL.reset_state()

## 3.Run same as a Custom Train Model: neuralearn

In [None]:
OPTIMIZER=Adam(learning_rate=0.01)      #*RunThis
METRIC= BinaryAccuracy()
VAL_METRIC=BinaryAccuracy()
EPOCHS= 3

In [None]:
#*RunThis

def neuralearn(model,loss_function,METRIC, VAL_METRIC, OPTIMIZER, train_dataset,val_dataset,EPOCHS):
  """Custom training loop that trains ``model`` and logs scalars to TensorBoard.

  Every argument is used as given: the train and validation steps are defined
  here from the arguments, instead of going through the module-level
  ``training_block`` / ``val_block`` (which use the module-level model, loss,
  optimizer and metrics).

  Args:
    model: model to train.
    loss_function: callable ``(y_true, y_pred) -> loss``.
    METRIC: metric object updated on training batches.
    VAL_METRIC: metric object updated on validation batches.
    OPTIMIZER: optimizer applying the gradients.
    train_dataset: iterable of ``(x_batch, y_batch)`` training batches.
    val_dataset: iterable of ``(x_batch, y_batch)`` validation batches.
    EPOCHS: number of passes over ``train_dataset``.

  Scalars are written with the module-level ``custom_train_writer`` and
  ``custom_val_writer``. The logged loss values are those of the last batch of
  each pass; the logged accuracies cover the whole pass.
  """

  @tf.function
  def train_step(x_batch, y_batch):
    # Forward pass under the tape, then one optimizer update
    with tf.GradientTape() as recorder:
      y_pred = model(x_batch, training=True)
      step_loss = loss_function(y_batch, y_pred)
    gradients = recorder.gradient(step_loss, model.trainable_weights)
    OPTIMIZER.apply_gradients(zip(gradients, model.trainable_weights))
    METRIC.update_state(y_batch, y_pred)
    return step_loss

  @tf.function
  def val_step(x_batch_val, y_batch_val):
    # Forward pass only; no weights are changed
    y_pred_val = model(x_batch_val, training=False)
    step_loss_val = loss_function(y_batch_val, y_pred_val)
    VAL_METRIC.update_state(y_batch_val, y_pred_val)
    return step_loss_val

  for epoch in range(EPOCHS):
    print("Train starts for epoch number {}".format(epoch+1))
    for x_batch, y_batch in train_dataset:
      loss = train_step(x_batch, y_batch)

    print("Training Loss is ",loss.numpy())
    print("The Accuracy is ",METRIC.result().numpy())

    # training loss and accuracy for this epoch
    with custom_train_writer.as_default():
      tf.summary.scalar('Training Loss', data=loss, step=epoch)
      tf.summary.scalar('Training Accuracy', data=METRIC.result(), step=epoch)

    METRIC.reset_state()

    #Including Validation
    for x_batch_val, y_batch_val in val_dataset:
      loss_val = val_step(x_batch_val, y_batch_val)

    print("Validation Loss",loss_val.numpy())
    print("The Accuracy is ",VAL_METRIC.result().numpy())

    # validation loss and accuracy for this epoch
    with custom_val_writer.as_default():
      tf.summary.scalar('Validation Loss', data=loss_val, step=epoch)
      tf.summary.scalar('Validation Accuracy', data=VAL_METRIC.result(), step=epoch)

    VAL_METRIC.reset_state()
  print("Training Complete!!!")

In [None]:
neuralearn(lenet_model, custom_bce, METRIC, VAL_METRIC, OPTIMIZER, train_dataset,val_dataset,EPOCHS)  #*RunThis

# Model Compiling and Training

In [None]:
                                     #*RunThis
metrics=[TruePositives(name='tp'),FalsePositives(name='fp'),TrueNegatives(name='tn'),FalseNegatives(name='fn'),
         BinaryAccuracy(name='accuracy'),Precision(name='precision'),Recall(name='recall'),AUC(name='auc')]

FACTOR=1
LABELS=['Parasitized','Uninfected']

In [None]:
# Compile the model                   #*RunThis
lenet_model.compile(
    optimizer=Adam(learning_rate=CONFIGURATION['LEARNING_RATE']),
    loss=BinaryCrossentropy(),  #Binary Crossentropy for binary classification
    metrics=metrics,
    #run_eagerly=False # use only for debugging
)

In [None]:
#fit the model  for normal model                       #*RunThis custome func in wandb confusion matrix
history = lenet_model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=2,
    verbose=True,
    callbacks=[LogImagesCallbackWandB()])

In [None]:
# Unbatch and re-batch into a single batch to extract all data                 #*RunThis
# len(val_dataset) is the number of *batches*, so using it as the batch size would keep
# only that many samples. Count the individual samples and batch by that count instead.
num_val_samples = sum(1 for _ in val_dataset.unbatch())
val_images, val_labels = next(iter(val_dataset.unbatch().batch(num_val_samples)))

In [None]:
### use this when want to file model in Wandb      #*RunThisfor
# Patch the model to prevent W&B crash
lenet_model.stateful = False

history = lenet_model.fit(train_dataset,
                          validation_data=(val_images, val_labels),
                          epochs=CONFIGURATION['N_EPOCHS'],
                          verbose=True,
                          callbacks=[WandbCallback(validation_data=(val_images, val_labels),
                                    labels=LABELS,
                                    input_type='image',
                                    save_graph=False,
                                    save_model=False,
                              )])

In [None]:
#stop wandb
wandb.finish()

# Visualizations

* test how the model is inferencing so far

In [None]:
### CHECK MODEL INFERENCING ####

image =cv2.imread("cell.jpg")
print(image.shape)
image=tf.expand_dims(image,axis=0)
print(image.shape)


#now pass this into the mdoel so the model can do inferencing
lenet_model.predict(image)

In [None]:
%load_ext tensorboard

In [None]:
tensorboard --logdir='./logs'

In [None]:
#plot the losses
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train_loss', 'val_loss'])
plt.show()

In [None]:
#plot the accuracies
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train_accuracy', 'val_accuracy'])
plt.show()

# Visualizing Confusion Matrix

* ATTENTION : As we know Number of batches in test dataset: 86, And the last batch has

In [None]:
inputs = []
labels = []

# Iterate through the batched dataset
for x, y in test_dataset.as_numpy_iterator():
    inputs.append(x)  # Collect images
    labels.append(y)  # Collect labels

# Concatenate and Flatten the list of batches into a single array
inputs = np.concatenate(inputs, axis=0)
labels = np.concatenate(labels, axis=0)

In [None]:
inputs.shape

In [None]:
labels.shape

In [None]:
#make the predictions
predicted=lenet_model.predict(inputs)
print(predicted[:,0])

In [None]:
#plot confusion matrix
threshold=0.6265

plt.figure(figsize=(4,4))
cm=confusion_matrix(labels,predicted>threshold)
print(cm)

sns.heatmap(cm,annot=True, fmt='g',cmap="crest")
plt.title("Confusion Matrix -{}".format(threshold))
plt.ylabel("Actual")
plt.xlabel("Predicted")


* making Confusion Matrix work with Tensor Boards

In [None]:
# Calculate FPR, TPR, and threshold values for ROC curve
fp, tp, threshold = roc_curve(labels, predicted)

# Plot ROC curve with FPR on x-axis and TPR on y-axis
plt.plot(fp, tp)

# Label axes
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")

# Show grid
plt.grid()

# Skip some thresholds for cleaner labels
skip =30

# Annotate ROC curve with threshold values
for i in range(0, len(threshold), skip):
    plt.text(fp[i], tp[i], threshold[i])

# Display the plot
plt.show()


# **Model Evaluation and Testing**

In [None]:
#evaluate model
lenet_model.evaluate(test_dataset)

- The losses are less and accuracy is good

In [None]:
#testing model
def parasite_or_not(x):
  """Map a label or sigmoid score to a short class tag.

  Class index 0 is 'Parasitized' and index 1 is 'Uninfected' (the order used
  by LABELS and by the W&B class_names in this notebook), so values below 0.5
  map to "P" and everything else to "U".
  """
  if x < 0.5:
    return "P"
  return "U"

- model predictions
- test_dataset.take(1) retrieves one batch from the dataset, but it remains a Dataset object.
- model.predict() also accepts a Dataset, so test_dataset.take(1) can be passed to it directly; it returns an array with one row of predictions per image.

- The first [0] selects the row of predictions for the first image.
- The second [0] extracts that image's prediction score (since it's a binary classification problem, the output is a single probability value).

In [None]:
#make predictions
print(parasite_or_not(lenet_model.predict(test_dataset.take(1))[0][0]))

In [None]:
#Visualize the predictions
for image, label in test_dataset.take(1):
  # One forward pass for the whole batch instead of one per subplot
  predictions = lenet_model.predict(image)
  # Show at most 9 samples; a smaller batch simply fills fewer cells of the grid
  for i in range(min(9, len(predictions))):
    ax=plt.subplot(3,3,i+1)
    plt.imshow(image[i])
    # title is "<actual label>-<predicted label>" for the image shown
    plt.title(parasite_or_not(label.numpy()[i])+ "-"+ parasite_or_not(predictions[i][0]))
    plt.axis("off")
plt.show()


# Loading and Saving model

In [None]:
#using .keras to save
lenet_model.save("/content/lenet_malaria_detection.keras")

In [None]:
from tensorflow.keras.models import load_model
# Load the saved model
lenet_loaded_model = load_model("lenet_malaria_detection.keras")
# Check the model architecture
lenet_loaded_model.summary()

# Check if weights are loaded
print("Loaded Model Weights:", len(lenet_loaded_model.weights))

- IT WAS THIS BEFORE SAVING :
87/87 ━━━━━━━━━━━━━━━━━━━━ 16s 41ms/step - accuracy: 0.9634 - loss: 0.1453
[0.41822549700737, 0.9651795625686646

In [None]:
# evaluate the performance of loaded model
# it should be similar as before

#evaluate model
lenet_loaded_model.evaluate(test_dataset)

# Saving to Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!cp -r /content/lenet/ /content/drive/MyDrive/lenet_colab/

In [None]:
!cp -r /content/drive/MyDrive/lenet_colab/ /content/lenet_colab/