# Image Segmentation (make sure you are using the T4 GPU runtime)

This task focuses on [Image Segmentation](https://www.v7labs.com/blog/image-segmentation-guide) and Object Detection.

## What is image segmentation?

In an image classification task, the network assigns a label (or class) to each input image. However, suppose you want to know the shape of that object, which pixel belongs to which object, etc. In this case, you need to assign a class to each pixel of the image—this task is known as segmentation. A segmentation model returns much more detailed information about the image. Image segmentation has many applications in medical imaging, self-driving cars and satellite imaging, just to name a few.

This task uses the [Oxford-IIIT Pet Dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/) ([Parkhi et al., 2012](https://www.robots.ox.ac.uk/~vgg/publications/2012/parkhi12a/parkhi12a.pdf)). The dataset consists of images of 37 pet breeds, with 200 images per breed (~100 each in the training and test splits). Each image comes with its corresponding label and a pixel-wise mask. The mask holds a class label for each pixel. Each pixel is given one of three categories:

- Class 1: Pixel belonging to the pet.
- Class 2: Pixel bordering the pet.
- Class 3: None of the above/a surrounding pixel.
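
The raw trimap files store these categories as the integers 1, 2, and 3. The following is a minimal sketch, using a made-up 3x3 mask rather than a real annotation, of how subtracting 1 gives zero-based class ids and how multiplying by 127 spreads them over the grayscale range for display:

```python
import numpy as np

# Hypothetical 3x3 trimap: 1 = pet, 2 = border, 3 = surrounding pixel.
trimap = np.array([[3, 2, 1],
                   [2, 1, 1],
                   [3, 3, 2]], dtype="uint8")

labels = trimap - 1      # zero-based class ids: 0, 1, 2
display = labels * 127   # gray levels 0, 127, 254 for plotting

print(labels)
print(display)
```

Zero-based ids are what a sparse cross-entropy loss expects, which is why the shift happens before training rather than only for plotting.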

## Environment Setup

In [None]:
!wget http://www.robots.ox.ac.uk/~vgg/data/pets/data/images.tar.gz
!wget http://www.robots.ox.ac.uk/~vgg/data/pets/data/annotations.tar.gz
!tar -xf images.tar.gz
!tar -xf annotations.tar.gz

In [None]:
!pip install -q git+https://github.com/tensorflow/examples.git

In [None]:
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_datasets as tfds
from tensorflow.keras.utils import array_to_img
from tensorflow.keras.utils import load_img, img_to_array
from tensorflow_examples.models.pix2pix import pix2pix
import numpy as np
import random
from IPython.display import clear_output
import matplotlib.pyplot as plt
import gc

## Data Loading

In [None]:
input_dir = "/kaggle/working/images"
target_dir = "/kaggle/working/annotations/trimaps/"

# Loading Image Paths
input_img_paths = sorted(
    [os.path.join(input_dir, fname)
     for fname in os.listdir(input_dir)
     if fname.endswith(".jpg")])
target_paths = sorted(
    [os.path.join(target_dir, fname)
     for fname in os.listdir(target_dir)
     if fname.endswith(".png") and not fname.startswith(".")])

In [None]:
# Checking if the Paths loaded correctly (if yes, you will see a brown cat)
plt.axis("off")
plt.imshow(load_img(input_img_paths[9]))

In [None]:
# Function for Displaying Masked Image (Segmented Image)
def display_target(target_array):
    normalized_array = (target_array.astype("uint8") - 1) * 127 # Shift the labels 1..3 to 0..2, then scale them to 0..254 for display
    plt.axis("off")
    plt.imshow(normalized_array[:, :, 0])

img = img_to_array(load_img(target_paths[9], color_mode="grayscale")) # Loading a single image
display_target(img)

In [None]:
# Loading entire Dataset
img_size = (200, 200) # Target image size as a (height, width) tuple
num_imgs = len(input_img_paths)

random.Random(1337).shuffle(input_img_paths) # Shuffle the Paths (input_img_paths and target_paths)
random.Random(1337).shuffle(target_paths)

# Create function to load input image from path
def path_to_input_image(path):
    return img_to_array(load_img(path, target_size=img_size))

# Create function to load target image from path
def path_to_target(path):
    img = img_to_array(
        load_img(path, target_size=img_size, color_mode="grayscale"))
    img = img.astype("uint8") - 1
    return img

input_imgs = np.zeros((num_imgs,) + img_size + (3,), dtype="float32")
targets = np.zeros((num_imgs,) + img_size + (1,), dtype="uint8")
for i in range(num_imgs):
    input_imgs[i] = path_to_input_image(input_img_paths[i])
    targets[i] = path_to_target(target_paths[i])

num_val_samples = 1000 # Size of the validation hold-out; feel free to experiment with it
train_input_imgs = input_imgs[:-num_val_samples]
train_targets = targets[:-num_val_samples]
val_input_imgs = input_imgs[-num_val_samples:]
val_targets = targets[-num_val_samples:]
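
Shuffling the two path lists with separate `random.Random` instances that share a seed applies the same permutation to both, provided the lists have the same length, so each image stays paired with its mask. A small sketch with made-up file names:

```python
import random

# Hypothetical, already-sorted path lists of equal length.
image_paths = [f"img_{i}.jpg" for i in range(6)]
mask_paths = [f"img_{i}.png" for i in range(6)]

random.Random(1337).shuffle(image_paths)
random.Random(1337).shuffle(mask_paths)

# Every image should still line up with the mask that has the same stem.
pairs_aligned = all(
    img.rsplit(".", 1)[0] == mask.rsplit(".", 1)[0]
    for img, mask in zip(image_paths, mask_paths)
)
print(pairs_aligned)
```

If the two directories ever held different numbers of files, the pairing would silently break, so a length check before shuffling is a cheap safeguard.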

In [None]:
train_targets[0].shape

## Model

An image segmentation model is, at its core, typically a convolutional neural network (CNN), which can capture hierarchical features in an image. The model consists of an encoder (downsampler) and a decoder (upsampler): the encoder extracts relevant features from the input image, and the decoder reconstructs the segmented output from them.

Each Conv2D layer in the encoder applies filters to the input image, extracting low-level to high-level features. As you move deeper into the encoder, the spatial dimensions of the feature maps typically decrease while the depth (number of channels) increases.

Conv2DTranspose layers are employed in the decoder to upsample the feature maps, gradually increasing the spatial dimensions while reducing the number of channels. Skip connections are often added between corresponding encoder and decoder layers to preserve fine-grained details.
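
To make the shrinking and growing of the feature maps concrete, here is a rough sketch of the size arithmetic along one axis. It assumes a 200-pixel input, `"same"` padding, and three stride-2 steps in each direction, which is the configuration used by the model in this section; the helper names are made up for illustration.

```python
import math

def conv_same_out(size, stride):
    """Output length of a 'same'-padded convolution along one axis."""
    return math.ceil(size / stride)

def conv_transpose_same_out(size, stride):
    """Output length of a 'same'-padded transposed convolution along one axis."""
    return size * stride

size = 200
encoder_sizes = [size]
for _ in range(3):                 # three stride-2 downsampling steps
    size = conv_same_out(size, 2)
    encoder_sizes.append(size)

decoder_sizes = [size]
for _ in range(3):                 # three stride-2 upsampling steps
    size = conv_transpose_same_out(size, 2)
    decoder_sizes.append(size)

print(encoder_sizes)  # [200, 100, 50, 25]
print(decoder_sizes)  # [25, 50, 100, 200]
```

The decoder ends at the input resolution, which is what lets the final layer emit one prediction per input pixel.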

In [None]:
def get_model(img_size, num_classes):
    inputs = keras.Input(shape=img_size + (3,))
    x = layers.Rescaling(1./255)(inputs) # Create a Rescaling layer to convert the range [0,255] to [0,1]

    # Similarly, add 6 Conv2D layers with ReLU activation. Use strides=2 in alternate layers (those at even indexes; try it yourself) and "same" padding, so that only the strides change the spatial size.
    # This block acts as our encoder: it learns the features of the input image.
    x = layers.Conv2D(64, 3, strides=2, activation="relu", padding="same")(x)
    x = layers.Conv2D(64, 3, activation="relu", padding="same")(x)
    x = layers.Conv2D(128, 3, strides=2, activation="relu", padding="same")(x)
    x = layers.Conv2D(128, 3, activation="relu", padding="same")(x)
    x = layers.Conv2D(256, 3, strides=2, padding="same", activation="relu")(x)
    x = layers.Conv2D(256, 3, activation="relu", padding="same")(x)

    # Now add 6 Conv2DTranspose layers with ReLU, strides=2 in alternate layers (those at odd indexes), and "same" padding.
    # This block acts as our decoder: it rebuilds a full-resolution output from the convolved feature map,
    # which maps the segmented features back onto the original image grid.
    x = layers.Conv2DTranspose(256, 3, activation="relu", padding="same")(x)
    x = layers.Conv2DTranspose(256, 3, activation="relu", padding="same", strides=2)(x)
    x = layers.Conv2DTranspose(128, 3, activation="relu", padding="same")(x)
    x = layers.Conv2DTranspose(128, 3, activation="relu", padding="same", strides=2)(x)
    x = layers.Conv2DTranspose(64, 3, activation="relu", padding="same")(x)
    x = layers.Conv2DTranspose(64, 3, activation="relu", padding="same", strides=2)(x)

    # Finally, add a Conv2D layer with softmax activation and "same" padding to get per-pixel class probabilities
    outputs = layers.Conv2D(num_classes, 3, activation="softmax", padding="same")(x)

    # Create the Model using the Model API
    model = keras.Model(inputs, outputs)
    return model

model = get_model(img_size=img_size, num_classes=3) # Why is num_classes = 3 here? Think about it and answer.
model.summary()

In [None]:
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) # Use an appropriate loss function (hint: we are working with images and integer class labels per pixel, so which one fits?)
# Callbacks: https://keras.io/api/callbacks/
# We checkpoint the model during training so that, if the run is interrupted midway, we still have something to start from.
# With save_best_only=True, the file is overwritten only when the monitored validation loss improves.
callbacks = [
    keras.callbacks.ModelCheckpoint("oxford_segmentation.keras",
                                    save_best_only=True, verbose=2)
]
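
Sparse categorical cross-entropy compares, for every pixel, the predicted class probabilities with a single integer label and averages the negative log-probability of the correct class. The following is a hand-rolled sketch on a hypothetical two-pixel example, not the Keras implementation:

```python
import math

def sparse_categorical_crossentropy(probs, labels):
    """Mean of -log(p[correct class]) over all pixels."""
    losses = [-math.log(p[y]) for p, y in zip(probs, labels)]
    return sum(losses) / len(losses)

# Softmax outputs for two pixels over the three classes.
probs = [[0.7, 0.2, 0.1],
         [0.1, 0.1, 0.8]]
labels = [0, 2]   # integer class ids, as in the shifted trimaps

loss = sparse_categorical_crossentropy(probs, labels)
print(round(loss, 4))
```

Because the labels are plain integers, no one-hot encoding of the masks is needed, which keeps the target array small.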

## Training and Testing

In [None]:
# Train the Model for 3 epochs with the batch size of 64, make sure to set the callbacks parameter
history = model.fit(train_input_imgs, train_targets,
                    epochs=3,
                    callbacks=callbacks,
                    batch_size=64,
                    validation_data=(val_input_imgs, val_targets))

In [None]:
# Plotting Loss
epochs = range(1, len(history.history["loss"]) + 1)
loss = history.history["loss"] # Extract the training loss from history
val_loss = history.history["val_loss"] # Extract the validation loss from history
plt.figure()
plt.plot(epochs, loss, "bo", label="Training loss")
plt.plot(epochs, val_loss, "b", label="Validation loss")
plt.title("Training and validation loss")
plt.legend()

In [None]:
tf.keras.backend.clear_session()

In [None]:
# Load the Saved Model using load_model
model = keras.models.load_model("oxford_segmentation.keras")

In [None]:
# Test Model
def predict(i):
  test_image = val_input_imgs[i]
  plt.axis("off")
  plt.imshow(array_to_img(test_image))
  mask = model.predict(np.expand_dims(test_image, 0)) # Run the prediction. PS: look carefully at the dimensions.
  display_mask(mask[0])

def display_mask(pred):
    mask = np.argmax(pred, axis=-1)
    mask *= 127
    plt.axis("off")
    plt.imshow(mask)

# Use or Modify the above Functions to display 20 test images in the SAME PLOT
predict(100)
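
The step from the network output to a displayable mask is an `argmax` over the class axis. A tiny sketch on a hypothetical 2x2 prediction (the probabilities are invented for illustration):

```python
import numpy as np

# Hypothetical softmax output for a 2x2 image with 3 classes: shape (2, 2, 3).
pred = np.array([[[0.8, 0.1, 0.1], [0.2, 0.5, 0.3]],
                 [[0.1, 0.2, 0.7], [0.6, 0.3, 0.1]]])

mask = np.argmax(pred, axis=-1)   # shape (2, 2), one class id per pixel
print(mask)
```

The class axis disappears, leaving a 2D array of ids that can be scaled and passed to `plt.imshow`.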

In [None]:
# Garbage collection, so that the later parts do not run into memory issues

del(model,train_input_imgs, train_targets, val_input_imgs, val_targets, history, input_imgs, targets)
gc.collect()

So far, we have implemented a custom model for image segmentation. There are also several existing architectures that do this efficiently for us, for example U-Net, Mask R-CNN, and Fast FCN.

You can read more about them here:
https://neptune.ai/blog/image-segmentation#:~:text=The%20basic%20architecture%20in%20image,an%20encoder%20and%20a%20decoder.&text=The%20encoder%20extracts%20features%20from,the%20outline%20of%20the%20object.



We will now implement the U-Net architecture. It is similar to the one we created above.
https://www.geeksforgeeks.org/u-net-architecture-explained/

## Data Loading

In [None]:
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info=True)

In [None]:
def normalize(input_image, input_mask):
  input_image = tf.cast(input_image, tf.float32) / 255.0
  input_mask -= 1
  return input_image, input_mask

In [None]:
def load_image(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128)) # Use tf.image to resize to (128,128)
  input_mask = tf.image.resize( # Use Nearest Neighbour Method to resize to (128,128) for the image mask
    datapoint['segmentation_mask'],
    (128, 128),
    method = tf.image.ResizeMethod.NEAREST_NEIGHBOR,
  )

  input_image, input_mask = normalize(input_image, input_mask) # Normalize the Images

  return input_image, input_mask
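
The mask is resized with nearest-neighbour sampling because its values are class ids, not intensities. A rough one-dimensional sketch (plain Python, with hypothetical helper functions rather than `tf.image.resize`) shows what can go wrong with interpolation:

```python
def resize_nearest(row, new_len):
    """Pick the closest source value for every target position."""
    n = len(row)
    return [row[min(n - 1, int(i * n / new_len))] for i in range(new_len)]

def resize_linear(row, new_len):
    """Linearly interpolate between neighbouring source values."""
    n = len(row)
    out = []
    for i in range(new_len):
        pos = i * (n - 1) / (new_len - 1)
        lo = int(pos)
        hi = min(lo + 1, n - 1)
        frac = pos - lo
        out.append(row[lo] * (1 - frac) + row[hi] * frac)
    return out

labels = [0, 0, 2, 2]   # class ids along one row of a mask
nearest = resize_nearest(labels, 7)
linear = resize_linear(labels, 7)
print(nearest)
print(linear)
```

Nearest-neighbour only ever returns ids that were already present, while linear interpolation produces an in-between value of 1.0 that would be read as a different class.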

The dataset already contains the required training and test splits, so continue to use the same splits:

In [None]:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE
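
As a quick sanity check of the arithmetic, the sketch below assumes the `train` split holds 3680 examples; that figure is an assumption, so confirm it with `info.splits['train'].num_examples` before relying on it.

```python
train_length = 3680   # assumed size of the 'train' split
batch_size = 64

steps_per_epoch = train_length // batch_size
leftover = train_length - steps_per_epoch * batch_size
print(steps_per_epoch, leftover)
```

Integer division rounds down, so an "epoch" here covers slightly fewer examples than the full split whenever the length is not a multiple of the batch size.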

In [None]:
train_images = dataset['train'].map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
test_images = dataset['test'].map(load_image, num_parallel_calls=tf.data.AUTOTUNE)

The following class performs a simple augmentation by randomly flipping an image.

[Image augmentation](https://www.tensorflow.org/tutorials/images/data_augmentation)


In [None]:
class Augment(tf.keras.layers.Layer):
  def __init__(self, seed=42):
    super().__init__()
    # both use the same seed, so they'll make the same random changes.
    self.augment_inputs = tf.keras.layers.RandomFlip(mode="horizontal", seed=seed)
    self.augment_labels = tf.keras.layers.RandomFlip(mode="horizontal", seed=seed) # Do the same for this

  def call(self, inputs, labels):
    inputs = self.augment_inputs(inputs)
    labels = self.augment_labels(labels)
    return inputs, labels
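
Whatever random transform is drawn, it has to be applied identically to the image and to its mask, otherwise the labels no longer line up with the pixels. The sketch below illustrates that requirement in plain Python with a single shared decision; it does not use Keras, and the helper names are made up.

```python
import random

def flip_horizontal(grid):
    """Mirror a 2D list left to right."""
    return [row[::-1] for row in grid]

def augment(image, mask, rng):
    """Flip image and mask together with probability 0.5."""
    if rng.random() < 0.5:
        return flip_horizontal(image), flip_horizontal(mask)
    return image, mask

image = [[10, 20, 30],
         [40, 50, 60]]
mask = [[0, 0, 1],
        [0, 1, 1]]

aug_image, aug_mask = augment(image, mask, random.Random(42))
print(aug_image)
print(aug_mask)
```

Giving both `RandomFlip` layers the same seed is intended to have the same effect: one decision, two tensors.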

Build the input pipeline, applying the augmentation after batching the inputs:

In [None]:
train_batches = (
    train_images
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .repeat()
    .map(Augment())
    .prefetch(buffer_size=tf.data.AUTOTUNE))
# Prefetching is an important step towards training efficiency: it lets the pipeline fetch the next batch of data while the model trains on the current one.
# AUTOTUNE lets TensorFlow adjust the prefetch buffer size automatically. Use that.

test_batches = test_images.batch(BATCH_SIZE)

Visualize an image example and its corresponding mask from the dataset:

In [None]:
def display(display_list):
  plt.figure(figsize=(15, 15))

  title = ['Input Image', 'True Mask', 'Predicted Mask']

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i+1)
    plt.title(title[i])
    plt.imshow(tf.keras.utils.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()

In [None]:
# Use the train_batches to obtain 2 images and display them using the function above
for images, masks in train_batches.take(2):
  sample_image, sample_mask = images[0], masks[0]
  display([sample_image, sample_mask])

## Define the model

The model being used here is a modified [U-Net](https://arxiv.org/abs/1505.04597). A U-Net consists of an encoder (downsampler) and decoder (upsampler). To learn robust features and reduce the number of trainable parameters, use a pretrained model—[MobileNetV2](https://arxiv.org/abs/1801.04381)—as the encoder. For the decoder, you will use the upsample block, which is already implemented in the [pix2pix](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/pix2pix/pix2pix.py) example in the TensorFlow Examples repo.

As mentioned, the encoder is a pretrained MobileNetV2 model. You will use the model from `tf.keras.applications`. The encoder consists of specific outputs from intermediate layers in the model. Note that the encoder will not be trained during the training process.

In [None]:
base_model = tf.keras.applications.MobileNetV2(input_shape=[128, 128, 3], include_top=False) # Load the MobileNetV2 Pre-trained model without its final Dense Layers

# Use the activations of these layers
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]

# Create the feature extraction model using the tf.keras.Model
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)

# Since this acts as our pre-trained encoder block, make sure to set the layers to be non-trainable
down_stack.trainable = False

The decoder/upsampler is simply a series of upsample blocks implemented in TensorFlow examples:

In [None]:
# Use pix2pix upsample layers to build our upsampling block
up_stack = [
    pix2pix.upsample(512, 3),  # 4x4 -> 8x8
    pix2pix.upsample(256, 3),  # 8x8 -> 16x16
    pix2pix.upsample(128, 3),  # 16x16 -> 32x32
    pix2pix.upsample(64, 3),   # 32x32 -> 64x64
]

In [None]:
def unet_model(output_channels:int):
  inputs = tf.keras.layers.Input(shape=[128, 128, 3]) # Create an Input Layer of shape (128,128,3)

  # Downsampling through the model
  skips = down_stack(inputs)
  x = skips[-1]
  skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    concat = tf.keras.layers.Concatenate() # Concatenate X and Skip Layers
    x = concat([x, skip])

  # This is the last layer of the model
  last = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same')  #64x64 -> 128x128

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)

Note that the number of filters on the last layer is set to the number of `output_channels`. This will be one output channel per class.
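
The way the skip connections grow the channel count can be traced by hand. In the sketch below, the encoder channel counts (96, 144, 192, 576, 320) are assumed values for MobileNetV2 at its default width and are not taken from this notebook; `model.summary()` gives the authoritative numbers.

```python
# (spatial size, channels) of the five encoder outputs, in layer_names order.
# Channel counts are assumptions; verify them against model.summary().
encoder_outputs = [(64, 96), (32, 144), (16, 192), (8, 576), (4, 320)]
upsample_filters = [512, 256, 128, 64]

size, channels = encoder_outputs[-1]          # bottleneck feature map
skips = reversed(encoder_outputs[:-1])

trace = []
for filters, (skip_size, skip_channels) in zip(upsample_filters, skips):
    size *= 2                                 # each upsample block doubles H and W
    assert size == skip_size                  # the skip must match spatially
    channels = filters + skip_channels        # Concatenate stacks the channels
    trace.append((size, channels))

final_size = size * 2                         # last Conv2DTranspose, strides=2
print(trace, final_size)
```

Each decoder stage therefore sees both the upsampled coarse features and the finer encoder features at the same resolution.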

## Train the model

Now, all that is left to do is to compile and train the model.

Since this is a multiclass classification problem in which the labels are scalar integers (rather than a vector of scores for each class at every pixel), use the `tf.keras.losses.SparseCategoricalCrossentropy` loss function. Set the `from_logits` argument to `True`, because the last layer of the model has no softmax activation and therefore outputs raw logits.

When running inference, the label assigned to the pixel is the channel with the highest value. This is what the `create_mask` function is doing.

In [None]:
OUTPUT_CLASSES = 3

model = unet_model(output_channels=OUTPUT_CLASSES)
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

Plot the resulting model architecture:

In [None]:
tf.keras.utils.plot_model(model, show_shapes=True)

Try out the model to check what it predicts before training:

In [None]:
def create_mask(pred_mask):
  pred_mask = tf.math.argmax(pred_mask, axis=-1)
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]
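
Taking the `argmax` directly on logits is safe because softmax preserves their ordering, and the cross-entropy can be computed from logits without forming probabilities first. A plain-Python sketch for a single hypothetical pixel:

```python
import math

def softmax(logits):
    m = max(logits)                            # subtract the max for stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, -1.0, 0.5]   # hypothetical raw outputs for one pixel
probs = softmax(logits)

# The predicted class is the same whether we look at logits or probabilities.
pred_from_logits = logits.index(max(logits))
pred_from_probs = probs.index(max(probs))

# Cross-entropy for true class 0, from probabilities and directly from logits.
label = 0
loss_from_probs = -math.log(probs[label])
loss_from_logits = -logits[label] + math.log(sum(math.exp(z) for z in logits))
print(pred_from_logits, pred_from_probs)
print(loss_from_probs, loss_from_logits)
```

The two loss values agree up to floating-point error; working from logits is the numerically safer route, which is one reason to prefer `from_logits=True`.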

In [None]:
def show_predictions(dataset=None, num=1):
  if dataset is not None:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    display([sample_image, sample_mask,
             create_mask(model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
show_predictions()

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    clear_output(wait=True)
    show_predictions()
    print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In the interest of saving time, the number of epochs was kept small, but you may set this higher to achieve more accurate results.

In [None]:
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS

model_history = model.fit(train_batches, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_batches,
                          callbacks=[DisplayCallback()])

In [None]:
# Plot Loss and Validation Loss as previously done
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']

plt.figure()
plt.plot(model_history.epoch, loss, 'r', label='Training loss')
plt.plot(model_history.epoch, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

## Make predictions

Now, make some predictions.

In [None]:
show_predictions(test_batches, 3)

# Object Detection

Object detection is a computer vision technique for locating instances of objects in images or videos.

Read more:

https://www.mathworks.com/discovery/object-detection.html

https://www.analyticsvidhya.com/blog/2022/03/a-basic-introduction-to-object-detection/

In this part, we will not use one of the high-performing off-the-shelf object detectors. Instead, we will develop a new one ourselves, from scratch, using plain Python, OpenCV, and TensorFlow.

Object detection is a combination of two tasks:
*   Regression of the bounding-box coordinates
*   Classification of the object label

This means that our model has two outputs: the object label and the object bounding box. Therefore, the model must combine the tasks of classification and regression.

The Dataset we will be using is:

https://www.kaggle.com/datasets/techzizou/labeled-mask-dataset-yolo-darknet

In [None]:
import os, random
import tensorflow as tf
import cv2 as cv
import numpy as np
from matplotlib import pyplot as plt

## Data Loading

This code generates three lists: one for training (holding 70% of the data), one for validation (20% of the data), and one for testing (the last 10%). The code also shuffles the data to avoid any bias from the original file order.

Note that we remove the images with more than one mask/object. This is because we are building a simple object detector that can detect only a single object in an image.

In [None]:
def list_files(full_data_path = "/kaggle/input/labeled-mask-dataset-yolo-darknet/obj/", image_ext = '.jpg', split_percentage = [70, 20]):

    files = []

    discarded = 0
    masked_instance = 0

    for r, d, f in os.walk(full_data_path):
        for file in f:
            if file.endswith(".txt"):

                # first, let's check if there is only one object
                with open(full_data_path + "/" + file, 'r') as fp:
                    lines = fp.readlines()
                    if len(lines) > 1:
                        discarded += 1
                        continue


                strip = file[0:len(file) - len(".txt")]
                # secondly, check if the paired image actually exist
                image_path = full_data_path + "/" + strip + image_ext
                if os.path.isfile(image_path):
                    # checking the class. '0' means masked, '1' for unmasked
                    if lines[0][0] == '0':
                        masked_instance += 1
                    files.append(strip)

    size = len(files)
    print(str(discarded) + " file(s) discarded")
    print(str(size) + " valid case(s)")
    print(str(masked_instance) + " are masked cases")

    random.shuffle(files)

    split_training = int(split_percentage[0] * size / 100)
    split_validation = split_training + int(split_percentage[1] * size / 100)

    return files[0:split_training], files[split_training:split_validation], files[split_validation:]

training_files, validation_files, test_files = list_files()

In [None]:
print(str(len(training_files)) + " training files")
print(str(len(validation_files)) + " validation files")
print(str(len(test_files)) + " test files")
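
The split points are computed with integer truncation, so any rounding remainder ends up in the test list. A small sketch with a hypothetical count of 1003 files (the helper name is made up):

```python
def split_indices(size, split_percentage=(70, 20)):
    """Return the two cut points used to slice a shuffled list."""
    split_training = int(split_percentage[0] * size / 100)
    split_validation = split_training + int(split_percentage[1] * size / 100)
    return split_training, split_validation

files = list(range(1003))   # stand-in for 1003 file stems
a, b = split_indices(len(files))
train, val, test = files[:a], files[a:b], files[b:]
print(len(train), len(val), len(test))
```

Here the test list gets 101 items rather than 100, because both the training and validation sizes were rounded down.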

Since our model will use a fixed 244 x 244 input layer, we need to format every input image before feeding it to the model (for training or for prediction).

In [None]:
input_size = 244

def format_image(img, box):
  '''
  1. Get the height and width of the input image.
  2. Calculate the maximum dimension of the image (either height or width).
  3. Calculate the scaling ratio needed to fit the image within the desired input_size.
  4. Calculate the new width and height of the image based on the scaling ratio.
  5. Create a tuple new_size containing the new width and height.
  6. Resize the original image to the new dimensions using linear interpolation.
  7. Create a new image (new_image) of size input_size x input_size initialized with zeros.
  8. Place the resized image in the upper-left corner of new_image, filling the remaining space with zeros.
  '''
  height, width = img.shape
  max_size = max(height, width)
  r = max_size / input_size
  new_width = int(width / r)
  new_height = int(height / r)
  new_size = (new_width, new_height)
  resized = cv.resize(img, new_size, interpolation= cv.INTER_LINEAR)
  new_image = np.zeros((input_size, input_size), dtype=np.uint8)
  new_image[0:new_height, 0:new_width] = resized

  x, y, w, h = box[0], box[1], box[2], box[3]

  # Calculate the new coordinates and dimensions (new_box) based on the scaling ratio (r)
  new_box = [int((x - 0.5*w)* width / r), int((y - 0.5*h) * height / r), int(w*width / r), int(h*height / r)]

  return new_image, new_box
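
The box arithmetic is easier to follow on concrete numbers. The sketch below assumes the annotations use the YOLO convention (normalized centre x, centre y, width, height) and uses a hypothetical 488x244 image; the function name is made up and only mirrors the box part of `format_image`.

```python
input_size = 244

def scale_yolo_box(box, width, height):
    """Convert a normalized (x_center, y_center, w, h) box to pixel
    [x_min, y_min, w, h] in the resized image."""
    r = max(width, height) / input_size
    x, y, w, h = box
    return [int((x - 0.5 * w) * width / r),
            int((y - 0.5 * h) * height / r),
            int(w * width / r),
            int(h * height / r)]

# Centered box covering half of each side of a 488x244 image.
new_box = scale_yolo_box((0.5, 0.5, 0.5, 0.5), width=488, height=244)
print(new_box)  # [61, 30, 122, 61]
```

The longer side is scaled to exactly `input_size`, and the box is scaled by the same ratio, so it still frames the same region after resizing.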

In [None]:
def data_load(files, full_data_path = "/kaggle/input/labeled-mask-dataset-yolo-darknet/obj/", image_ext = ".jpg"):
    X = []
    Y = []

    for file in files:
        img = cv.imread(os.path.join(full_data_path, file + image_ext), cv.IMREAD_GRAYSCALE)

        k = 1

        with open(full_data_path + "/" + file + ".txt", 'r') as fp:
            line = fp.readlines()[0]
            if line[0] == '0':
                k = 0

            box = np.array(line[1:].split(), dtype=float)

        img, box = format_image(img, box)
        img = img.astype(float) / 255.
        box = np.asarray(box, dtype=float) / input_size
        label = np.append(box, k)

        X.append(img)
        Y.append(label)

    X = np.array(X)

    X = np.expand_dims(X, axis=3)

    X = tf.convert_to_tensor(X, dtype=tf.float32) # Convert X and Y to tensors with dtype as float32

    Y = tf.convert_to_tensor(Y, dtype=tf.float32)

    result = tf.data.Dataset.from_tensor_slices((X, Y))

    return result

In [None]:
raw_train_ds = data_load(training_files)

In [None]:
raw_validation_ds = data_load(validation_files)

In [None]:
raw_test_ds = data_load(test_files)

Since our model must implement two tasks (classification and regression), we need two different loss functions:

* One for the classification task: we may use any loss function usually found in classification-only tasks, such as categorical cross-entropy.
* One for the bounding-box regression: we can use a regression loss function such as mean squared error.
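
With two outputs, the quantity being minimized is the sum of the per-output losses. A hand-computed sketch for one hypothetical sample, assuming equal weights for both terms (these helpers are illustrative, not the Keras implementations):

```python
import math

def categorical_crossentropy(one_hot, probs):
    return -sum(t * math.log(p) for t, p in zip(one_hot, probs))

def mean_squared_error(target, pred):
    return sum((t - p) ** 2 for t, p in zip(target, pred)) / len(target)

# Hypothetical single sample.
true_class = [1.0, 0.0]                 # one-hot: masked
pred_class = [0.9, 0.1]
true_box = [0.25, 0.25, 0.50, 0.50]     # normalized [x, y, w, h]
pred_box = [0.30, 0.20, 0.50, 0.40]

cls_loss = categorical_crossentropy(true_class, pred_class)
box_loss = mean_squared_error(true_box, pred_box)
total_loss = cls_loss + box_loss        # equal weights assumed
print(cls_loss, box_loss, total_loss)
```

The two terms live on different scales, so if one task dominates training, weighting the losses is a knob worth trying.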



In [None]:
CLASSES = 2

def format_instance(image, label):
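    # Cast the class id to an integer explicitly so that tf.one_hot also works on the symbolic tensors seen inside Dataset.map
    return image, (tf.one_hot(tf.cast(label[4], tf.int32), CLASSES), [label[0], label[1], label[2], label[3]])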

In [None]:
BATCH_SIZE = 32

# see https://www.tensorflow.org/guide/data_performance

def tune_training_ds(dataset):
    dataset = dataset.map(format_instance, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.shuffle(1024, reshuffle_each_iteration=True)
    dataset = dataset.repeat() # Repeat the dataset indefinitely.
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

In [None]:
train_ds = tune_training_ds(raw_train_ds)

In [None]:
def tune_validation_ds(dataset):
    dataset = dataset.map(format_instance, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(len(validation_files) // 4)
    dataset = dataset.repeat()
    return dataset

In [None]:
validation_ds = tune_validation_ds(raw_validation_ds)

In [None]:
def tune_test_ds(dataset):
    dataset = dataset.map(format_instance, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(1)
    dataset = dataset.repeat()
    return dataset

test_ds = tune_test_ds(raw_test_ds)

## Model ;)

In [None]:
def build_feature_extractor(inputs):
    # use 3 pairs of Conv2D and AveragePooling2D with relu activation and kernel size = 3. Keep in mind we will be using
    x = tf.keras.layers.Conv2D(16, kernel_size=3, activation='relu', input_shape=(input_size, input_size, 1))(inputs)
    x = tf.keras.layers.AveragePooling2D(2,2)(x)

    x = tf.keras.layers.Conv2D(32, kernel_size=3, activation = 'relu')(x)
    x = tf.keras.layers.AveragePooling2D(2,2)(x)

    x = tf.keras.layers.Conv2D(64, kernel_size=3, activation = 'relu')(x)
    x = tf.keras.layers.AveragePooling2D(2,2)(x)

    return x

def build_model_adaptor(inputs):
    # Use one Flatten and One Dense Relu Layer
    x = tf.keras.layers.Flatten()(inputs)
    x = tf.keras.layers.Dense(64, activation='relu')(x)
    return x

def build_classifier_head(inputs):
    # Use a Dense layer with softmax activation, named classifier_head
    return tf.keras.layers.Dense(CLASSES, activation='softmax', name = 'classifier_head')(inputs)

def build_regressor_head(inputs):
    # use a Dense layer with 4 units named regressor_head
    return tf.keras.layers.Dense(units = 4, name = 'regressor_head')(inputs)

def build_model(inputs):

    feature_extractor = build_feature_extractor(inputs)

    model_adaptor = build_model_adaptor(feature_extractor)

    classification_head = build_classifier_head(model_adaptor)

    regressor_head = build_regressor_head(model_adaptor)

    model = tf.keras.Model(inputs = inputs, outputs = [classification_head, regressor_head])

    return model
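
It helps to know how large the feature map is by the time it reaches `Flatten`. The sketch below traces one spatial axis under the assumptions visible in the code above: a 244-pixel input, unpadded 3x3 convolutions, and 2x2 pooling with stride 2.

```python
def conv_valid(size, kernel=3):
    return size - kernel + 1        # 'valid' padding, stride 1

def pool(size, window=2):
    return size // window           # non-overlapping pooling, rounds down

size = 244
for _ in range(3):                  # three Conv2D + AveragePooling2D pairs
    size = pool(conv_valid(size))

flattened = size * size * 64        # 64 filters in the last Conv2D
dense_params = flattened * 64 + 64  # weights + biases of Dense(64)
print(size, flattened, dense_params)
```

Almost all of the model's parameters sit in that first Dense layer, which `model.summary()` should confirm.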

Now that we have everything, let's start training!

## Training

In [None]:
model = build_model(tf.keras.layers.Input(shape=(input_size, input_size, 1,)))

model.compile(optimizer=tf.keras.optimizers.Adam(), # Use Adam and set loss and metric for classifier_head and regressor_head as stated earlier
    loss = {'classifier_head' : 'categorical_crossentropy', 'regressor_head' : 'mse' },
    metrics = {'classifier_head' : 'accuracy', 'regressor_head' : 'mse' })

In [None]:
tf.keras.utils.plot_model(model, show_shapes=True, show_layer_names=True) # Plot the Model

In [None]:
EPOCHS = 100
BATCH_SIZE = 32

history = model.fit(train_ds,
                    steps_per_epoch=(len(training_files) // BATCH_SIZE),
                    validation_data=validation_ds, validation_steps=1,
                    epochs=EPOCHS)

## IoU Metric

IoU (Intersection over Union) scores how well the predicted bounding box overlaps the actual bounding box (also called the ground truth). The idea is simple: divide the area of the intersection of the two boxes by the area of their union. The score ranges from 0 (no overlap) to 1 (a perfect match), so a higher IoU means that the prediction fits the ground truth better.

In [None]:
def intersection_over_union(boxA, boxB):
    # Boxes are given as [x, y, width, height]
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[0] + boxA[2], boxB[0] + boxB[2])
    yB = min(boxA[1] + boxA[3], boxB[1] + boxB[3])
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1) # Calculate the intersection area
    boxAArea = (boxA[2] + 1) * (boxA[3] + 1) # Calculate the area of boxA
    boxBArea = (boxB[2] + 1) * (boxB[3] + 1) # Calculate the area of boxB
    iou = interArea / float(boxAArea + boxBArea - interArea) # Compute the IoU
    return iou
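
For intuition, here is a variant that treats coordinates as continuous (no `+ 1` pixel correction), evaluated on a few hand-checkable boxes. It is a simplified illustration under that assumption, not a replacement for the function above, and the name `iou_xywh` is made up.

```python
def iou_xywh(box_a, box_b):
    """IoU for boxes given as [x, y, w, h], with continuous coordinates."""
    x_left = max(box_a[0], box_b[0])
    y_top = max(box_a[1], box_b[1])
    x_right = min(box_a[0] + box_a[2], box_b[0] + box_b[2])
    y_bottom = min(box_a[1] + box_a[3], box_b[1] + box_b[3])
    inter = max(0, x_right - x_left) * max(0, y_bottom - y_top)
    union = box_a[2] * box_a[3] + box_b[2] * box_b[3] - inter
    return inter / union if union > 0 else 0.0

same = iou_xywh([0, 0, 10, 10], [0, 0, 10, 10])         # identical boxes
partial = iou_xywh([0, 0, 10, 10], [5, 5, 10, 10])      # 25 / 175
disjoint = iou_xywh([0, 0, 10, 10], [20, 20, 5, 5])     # no overlap
print(same, partial, disjoint)
```

The partially overlapping pair shares a 5x5 region out of a combined area of 175, which gives an IoU of about 0.14 even though a quarter of each box is covered.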

## Prediction

In [None]:
plt.figure(figsize=(12, 10))

test_list = list(test_ds.take(20).as_numpy_iterator())

image, labels = test_list[0]

for i in range(len(test_list)):

    ax = plt.subplot(4, 5, i + 1)
    image, labels = test_list[i]

    predictions = model(image)

    predicted_box = predictions[1][0] * input_size
    predicted_box = tf.cast(predicted_box, tf.int32)

    predicted_label = predictions[0][0]

    image = image[0]

    actual_label = labels[0][0]
    actual_box = labels[1][0] * input_size
    actual_box = tf.cast(actual_box, tf.int32)

    image = image.astype("float") * 255.0
    image = image.astype(np.uint8)
    image_color = cv.cvtColor(image, cv.COLOR_GRAY2RGB)

    color = (255, 0, 0)
    # Draw the predicted box in green when the predicted and actual labels match; otherwise it stays red
    if (predicted_label[0] > 0.5 and actual_label[0] > 0) or (predicted_label[0] < 0.5 and actual_label[0] == 0):
        color = (0, 255, 0)

    img_label = "unmasked"
    if predicted_label[0] > 0.5:
        img_label = "masked"

    predicted_box_n = predicted_box.numpy()
    cv.rectangle(image_color, predicted_box_n, color, 2)
    cv.rectangle(image_color, actual_box.numpy(), (0, 0, 255), 2)
    cv.rectangle(image_color, (predicted_box_n[0], predicted_box_n[1] + predicted_box_n[3] - 20), (predicted_box_n[0] + predicted_box_n[2], predicted_box_n[1] + predicted_box_n[3]), color, -1)
    cv.putText(image_color, img_label, (predicted_box_n[0] + 5, predicted_box_n[1] + predicted_box_n[3] - 5), cv.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0))

    IoU = intersection_over_union(predicted_box.numpy(), actual_box.numpy())

    plt.title("IoU:" + format(IoU, '.4f'))
    plt.imshow(image_color)
    plt.axis("off")

# EfficientNetB3

EfficientNetB3 is a pre-trained convolutional neural network (CNN) architecture designed for efficient image classification. It's known for its high accuracy and low computational cost.

Add this dataset to the notebook inputs:

https://www.kaggle.com/datasets/hereisburak/pins-face-recognition

Then load it below.

In [None]:
data_dir = "/kaggle/input/pins-face-recognition/105_classes_pins_dataset"

## Create the training and validation datasets

In [None]:
img_height, img_width = 180, 180
batch_size = 32

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    label_mode='categorical',
    image_size=(img_height, img_width),
    batch_size=batch_size
)

In [None]:
img_height, img_width = 180,180
batch_size = 32

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    label_mode='categorical',
    image_size=(img_height, img_width),
    batch_size=batch_size
)

## Loading the Model

In [None]:
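base_model = tf.keras.applications.EfficientNetB3(
    include_top=False,
    weights='imagenet',
    input_shape=(180, 180, 3) # Image shape: (height, width, channels)
)
# Freeze the pretrained weights for the initial training phase; they are unfrozen later for fine-tuning
base_model.trainable = False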

## Augment the data for better generalisation

In [None]:
data_augmentation = keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.2),
    layers.RandomZoom(0.2),
    layers.RandomContrast(0.2),
    layers.RandomBrightness(0.2),
    layers.RandomTranslation(0.1, 0.1),
])

## Build the model

In [None]:
inputs = layers.Input(shape=(180, 180, 3))
x = data_augmentation(inputs)
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x) # What is global average pooling (GAP)?
x = layers.Dropout(0.5)(x)
x = layers.Dense(512, activation='relu', kernel_regularizer=keras.regularizers.l2(0.001))(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.5)(x)
x = layers.Dense(256, activation='relu', kernel_regularizer=keras.regularizers.l2(0.001))(x)
x = layers.BatchNormalization()(x)
outputs = layers.Dense(105, activation='softmax')(x)

model = keras.Model(inputs, outputs)
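
Global average pooling (GAP) collapses each channel of a feature map to a single number by averaging over height and width. A NumPy sketch on a tiny made-up feature map:

```python
import numpy as np

# Hypothetical feature map: batch of 1, 2x2 spatial grid, 3 channels.
features = np.arange(12, dtype="float32").reshape(1, 2, 2, 3)

pooled = features.mean(axis=(1, 2))   # average over height and width
print(pooled.shape)  # (1, 3)
print(pooled)        # [[4.5 5.5 6.5]]
```

The output has one value per channel regardless of the spatial size, which keeps the following Dense layers far smaller than flattening would.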

## Use a custom LR

In [None]:
def lr_schedule(epoch):
    # tweak this around if you want to
    
    lr = 1e-3
    if epoch > 10:
        lr *= 0.1
    if epoch > 20:
        lr *= 0.1
    return lr
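
Evaluating the schedule at a few epochs shows where the steps fall. The function is repeated here only so that the snippet runs on its own; note that the epoch index is zero-based and the comparisons are strict.

```python
import math

def lr_schedule(epoch):
    lr = 1e-3
    if epoch > 10:
        lr *= 0.1
    if epoch > 20:
        lr *= 0.1
    return lr

for epoch in (0, 10, 11, 20, 21):
    print(epoch, lr_schedule(epoch))
```

The rate stays at 1e-3 through epoch 10, drops to about 1e-4 for epochs 11 to 20, and to about 1e-5 from epoch 21 onwards.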

## Compile the model

In [None]:
model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

## Using Callbacks

https://medium.com/@ompramod9921/callbacks-your-secret-weapon-in-machine-learning-b08ded5678f0

In [None]:
initial_epochs = 20
callbacks = [
    keras.callbacks.LearningRateScheduler(lr_schedule),
    keras.callbacks.ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_accuracy'),
    keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)
]

## Initial Training

In [None]:
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=initial_epochs,
    callbacks=callbacks
)

In [None]:
# Fine-tuning
base_model.trainable = True

# Freeze batch norm layers
for layer in base_model.layers:
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False
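# Recompile the model so that the change in trainable layers takes effect
# Note: the LearningRateScheduler in `callbacks` sets the learning rate at the start of every epoch, so it overrides the 1e-5 given here
model.compile(
    optimizer=keras.optimizers.Adam(1e-5),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)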

## Training the rest and fine tuning

In [None]:
fine_tune_epochs = 30
total_epochs = initial_epochs + fine_tune_epochs

history_fine = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=total_epochs,
    initial_epoch=initial_epochs,
    callbacks=callbacks
)
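
When `initial_epoch` is set, the `epochs` argument of `model.fit` is the epoch index at which training stops, not the number of additional epochs. A short sketch of the resulting numbering:

```python
initial_epochs = 20
fine_tune_epochs = 30
total_epochs = initial_epochs + fine_tune_epochs

# Epochs run from initial_epoch up to, but not including, epochs.
fine_tune_range = range(initial_epochs, total_epochs)
print(len(fine_tune_range), fine_tune_range[0], fine_tune_range[-1])  # 30 20 49
```

This numbering is also what the learning-rate schedule receives, so the fine-tuning phase starts at epoch index 20 even if early stopping ended the first phase sooner.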

## Saving the model

In [None]:
from sklearn.preprocessing import LabelEncoder
import pickle

# Get class names from the dataset
class_names = train_ds.class_names

# Create and fit LabelEncoder
le = LabelEncoder()
le.fit(class_names)

# Save LabelEncoder
with open('label_encoder.pkl', 'wb') as le_file:
    pickle.dump(le, le_file)

print("LabelEncoder saved successfully.")

model.save('/kaggle/working/efficientnetb3.keras')
print("Final model saved successfully.")