<a href="https://colab.research.google.com/github/EmineSener/Computer-Vision-With-Transformer/blob/main/coursera_Assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image Classification and Object Localization

In this lab, you'll build a CNN from scratch to:
- classify the main subject in an image
- localize it by drawing bounding boxes around it.

You'll use the [MNIST](http://yann.lecun.com/exdb/mnist/) dataset to synthesize a custom dataset for the task:
- Place each "digit" image on a black canvas of size 75 x 75 at a random location.
- Calculate the corresponding bounding boxes for those "digits".

The bounding box prediction can be modelled as a "regression" task, which means that the model will predict a numeric value (as opposed to a category).

## Imports

In [None]:
import os, re, time, json  # Importing necessary libraries for file operations, regular expressions, time, and JSON handling
import PIL.Image, PIL.ImageFont, PIL.ImageDraw  # Importing modules from the Python Imaging Library (PIL) for image processing
import numpy as np  # Importing NumPy library for numerical operations

try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x  # Checking if the code is running in Google Colab and setting TensorFlow version to 2.x
except Exception:
    pass

import tensorflow as tf  # Importing TensorFlow library
from matplotlib import pyplot as plt  # Importing pyplot module from matplotlib for plotting
import tensorflow_datasets as tfds  # Importing TensorFlow Datasets for accessing datasets

print("Tensorflow version " + tf.__version__)  # Printing the TensorFlow version

Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.
Tensorflow version 2.15.0


# Visualization Utilities

These functions are used to draw bounding boxes around the digits.

In [None]:
# Width and height of the image (the canvas is square)
im_width = im_height = 75
# Whether bounding-box coordinates are given in normalized form
use_normalized_coordinates = True

In [None]:
def draw_bounding_boxes_on_image_array(image,
                                       boxes,
                                       color=(),
                                       thickness=1,
                                       display_str_list=()):
  """Draws bounding boxes on an image given as a numpy array.

  The array is converted to a PIL image, pasted onto a new RGBA canvas of the
  same size, handed to draw_bounding_boxes_on_image, and converted back to a
  numpy array.

  Args:
    image: a numpy array object (anything PIL.Image.fromarray accepts).
    boxes: a 2 dimensional numpy array of [N, 4]; forwarded unchanged to
           draw_bounding_boxes_on_image.
    color: sequence of colors, one per bounding box; forwarded unchanged.
           Default is an empty tuple (immutable, so it is never shared
           state between calls).
    thickness: line thickness. Default value is 1.
    display_str_list: sequence of strings, one per bounding box; forwarded
           unchanged.
  Returns:
    A numpy array holding the RGBA image with the boxes drawn on it.
  Raises:
    ValueError: if boxes is not a [N, 4] array (raised by
      draw_bounding_boxes_on_image).
  """
  # Convert the numpy array image to a PIL Image object
  image_pil = PIL.Image.fromarray(image)
  # Drawing happens on an RGBA copy so colored lines can be rendered
  rgbimg = PIL.Image.new("RGBA", image_pil.size)
  rgbimg.paste(image_pil)
  # The helper draws in place on rgbimg and returns nothing
  draw_bounding_boxes_on_image(rgbimg, boxes, color, thickness, display_str_list)
  # Convert the RGBA image back to a numpy array
  return np.array(rgbimg)

In [None]:
def draw_bounding_boxes_on_image(image,
                                 boxes,
                                 color=(),
                                 thickness=1,
                                 display_str_list=()):
  """Draws bounding boxes on image.

  Args:
    image: a PIL.Image object; it is drawn on in place.
    boxes: a 2 dimensional numpy array of [N, 4]. The columns are read as
           (xmin, ymin, xmax, ymax): column 1 is passed on as ymin, column 0
           as xmin, column 3 as ymax and column 2 as xmax.
    color: sequence of colors, one per bounding box. A box without a matching
           entry is drawn in red.
    thickness: line thickness. Default value is 1.
    display_str_list: sequence of strings, one per bounding box. A box
           without a matching entry gets None.
  Raises:
    ValueError: if boxes is not a [N, 4] array
  """
  boxes_shape = boxes.shape
  # Nothing to draw for a 0-d array or for an empty 1-d array such as
  # np.asarray([]), which is what an empty list of boxes converts to.
  if not boxes_shape or boxes_shape == (0,):
    return
  if len(boxes_shape) != 2 or boxes_shape[1] != 4:
    raise ValueError('Input must be of size [N, 4]')
  for i, box in enumerate(boxes):
    # Fall back to defaults instead of raising IndexError when fewer colors
    # or labels than boxes were supplied (including the declared defaults).
    box_color = color[i] if i < len(color) else 'red'
    box_label = display_str_list[i] if i < len(display_str_list) else None
    draw_bounding_box_on_image(image,
                               ymin=box[1],
                               xmin=box[0],
                               ymax=box[3],
                               xmax=box[2],
                               color=box_color,
                               thickness=thickness,
                               display_str=box_label)

In [None]:
def draw_bounding_box_on_image(image,
                               ymin,
                               xmin,
                               ymax,
                               xmax,
                               color='red',
                               thickness=1,
                               display_str=None,
                               use_normalized_coordinates=True):
  """Draws the outline of a single bounding box on a PIL image, in place.

  Args:
    image: a PIL.Image object.
    ymin: top edge of the box.
    xmin: left edge of the box.
    ymax: bottom edge of the box.
    xmax: right edge of the box.
    color: outline color. Default is red.
    thickness: outline width. Default value is 1.
    display_str: accepted for interface compatibility; this function does
      not draw it.
    use_normalized_coordinates: If True (default), the four coordinates are
      multiplied by the image width/height before drawing. Otherwise they
      are used as given.
  """
  canvas = PIL.ImageDraw.Draw(image)
  width, height = image.size
  left, right, top, bottom = xmin, xmax, ymin, ymax
  if use_normalized_coordinates:
    # Scale the relative coordinates up to the image size
    left, right = xmin * width, xmax * width
    top, bottom = ymin * height, ymax * height
  # Closed outline: visit the four corners and come back to the first one
  corners = [(left, top), (left, bottom), (right, bottom), (right, top)]
  corners.append(corners[0])
  canvas.line(corners, width=thickness, fill=color)

These utilities are used to visualize the data and predictions.

In [None]:
"""
This cell contains helper functions used for visualization
and downloads only.

You can skip reading it, as there is very
little Keras or Tensorflow related code here.
"""

# Matplotlib defaults used by every plot in this notebook, applied group by
# group in this order: grayscale images, invisible grid, no tick marks, light
# axes/figure backgrounds and dark-red text.
for _rc_group, _rc_options in (
    ('image', dict(cmap='gray')),
    ('grid', dict(linewidth=0)),
    ('xtick', dict(top=False, bottom=False, labelsize='large')),
    ('ytick', dict(left=False, right=False, labelsize='large')),
    ('axes', dict(facecolor='F8F8F8', titlesize="large", edgecolor='white')),
    ('text', dict(color='a8151a')),
    ('figure', dict(facecolor='F0F0F0')),
):
    plt.rc(_rc_group, **_rc_options)

# Directory holding the TTF fonts shipped inside the matplotlib package
MATPLOTLIB_FONT_DIR = os.path.join(os.path.dirname(plt.__file__), "mpl-data/fonts/ttf")


In [None]:
# create digits from local fonts for testing
def create_digits_from_local_fonts(n):
  """Renders n digits (0-9, cycling) with local fonts for testing.

  The digits are drawn side by side on one 'LA' strip of n tiles of 75x75
  pixels; the first ten use DejaVuSansMono-Oblique and the rest use
  STIXGeneral.

  Returns:
    A tuple (font_digits, font_labels): a float32 array of shape [n, 75*75]
    with values scaled by 1/255, and the list of digit labels.
  """
  # format 'LA': black in channel 0, alpha in channel 1
  strip = PIL.Image.new('LA', (75*n, 75), color=(0, 255))
  mono_font = PIL.ImageFont.truetype(os.path.join(MATPLOTLIB_FONT_DIR, 'DejaVuSansMono-Oblique.ttf'), 25)
  stix_font = PIL.ImageFont.truetype(os.path.join(MATPLOTLIB_FONT_DIR, 'STIXGeneral.ttf'), 25)
  pen = PIL.ImageDraw.Draw(strip)

  font_labels = [i % 10 for i in range(n)]
  for i, digit in enumerate(font_labels):
    in_first_ten = i < 10
    position = (7 + i*75, 0 if in_first_ten else -4)
    pen.text(position, str(digit), fill=(255, 255),
             font=mono_font if in_first_ten else stix_font)

  # Keep channel 0 only (the alpha channel is discarded) and scale to [0, 1]
  luminance = np.array(strip.getdata(), np.float32)[:, 0] / 255.0
  # Cut the strip into n tiles along the width, then flatten every tile
  rows = np.reshape(luminance, [75, 75*n])
  tiles = np.split(rows, n, axis=1)
  font_digits = np.reshape(np.stack(tiles, axis=0), [n, 75*75])
  return font_digits, font_labels






    plt.imshow(img_to_draw)

    if len(iou) > i :
      color = "black"
      if (n_iou[i][0] < iou_threshold):
        color = "red"
      ax.text(0.2, -0.3, "iou: %s" %(n_iou[i][0]), color=color, transform=ax.transAxes)


# utility to display training and validation curves
def plot_metrics(metric_name, title, ylim=5):
  """Plots one metric's training and validation curves from the global `history`.

  Args:
    metric_name: key in history.history; the validation curve is read from
      'val_' + metric_name.
    title: plot title.
    ylim: upper limit of the y axis (the lower limit is 0).
  """
  plt.title(title)
  plt.ylim(0, ylim)
  curves = ((metric_name, 'blue'), ('val_' + metric_name, 'green'))
  for series_name, line_color in curves:
    plt.plot(history.history[series_name], color=line_color, label=series_name)

## Selecting Between Strategies

### TPU or GPU detection

Depending on the hardware available, you'll use different distribution strategies.  For a review on distribution strategies, please check out the second course in this specialization ["Custom and Distributed Training with TensorFlow"](https://www.coursera.org/learn/custom-distributed-training-with-tensorflow), week 4, "Distributed Training".

- If the TPU is available, then you'll be using the TPU Strategy.
Otherwise:
- If more than one GPU is available, then you'll use the Mirrored Strategy
- If one GPU is available or if just the CPU is available, you'll use the default strategy.

In [None]:
# Detect hardware and select appropriate distribution strategy

# Probe for a TPU first. The GPU list is only queried when building the TPU
# resolver fails with ValueError.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
except ValueError:
    tpu = None
    gpus = tf.config.experimental.list_logical_devices("GPU")

if not tpu:
    gpu_names = [gpu.name for gpu in gpus]
    if len(gpu_names) > 1:
        # Several GPUs: mirror the model across all of them
        strategy = tf.distribute.MirroredStrategy(gpu_names)
        print('Running on multiple GPUs ', gpu_names)
    elif len(gpu_names) == 1:
        # One GPU: the default strategy is enough
        strategy = tf.distribute.get_strategy()
        print('Running on single GPU ', gpu_names[0])
    else:
        # No accelerator: default strategy on CPU
        strategy = tf.distribute.get_strategy()
        print('Running on CPU')
else:
    # Connect to the TPU cluster and initialize it before creating the strategy
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
    print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])

# Number of replicas that train in sync under the selected strategy
print("Number of accelerators: ", strategy.num_replicas_in_sync)


### Parameters

The global batch size is the batch size per replica (64 in this case) times the number of replicas in the distribution strategy.

In [None]:
BATCH_SIZE = 64 * strategy.num_replicas_in_sync # Global batch size.
# The global batch size will be automatically sharded across all
# replicas by the tf.data.Dataset API. A single TPU has 8 cores.
# The best practice is to scale the batch size by the number of
# replicas (cores). The learning rate should be increased as well.

## Loading and Preprocessing the Dataset

Define some helper functions that will pre-process your data:
- `read_image_tfds`: randomly overlays the "digit" image on top of a larger canvas.
- `get_training_dataset`: loads data and splits it to get the training set.
- `get_validation_dataset`: loads and splits the data to get the validation set.

In [None]:
def read_image_tfds(image, label):
    """Pastes a digit onto a 75x75 canvas at a random location.

    Args:
        image: digit image; it is reshaped to (28, 28, 1).
        label: integer class label; it is one-hot encoded with depth 10.

    Returns:
        A tuple (canvas, (one_hot_label, bbox)) where canvas is the padded
        float32 image divided by 255 and bbox is [xmin, ymin, xmax, ymax],
        each coordinate divided by 75.
    """
    canvas_size = 75
    digit_size = 28

    # Offsets of the digit's top-left corner (x is sampled before y)
    x_offset = tf.random.uniform((), 0, 48, dtype=tf.int32)
    y_offset = tf.random.uniform((), 0, 48, dtype=tf.int32)

    # Place the digit on the canvas by zero-padding around it
    digit = tf.reshape(image, (digit_size, digit_size, 1,))
    canvas = tf.image.pad_to_bounding_box(digit, y_offset, x_offset, canvas_size, canvas_size)
    canvas = tf.cast(canvas, tf.float32)/255.0

    # Bounding box corners as fractions of the canvas size
    left = tf.cast(x_offset, tf.float32)
    top = tf.cast(y_offset, tf.float32)
    xmax = (left + digit_size) / canvas_size
    ymax = (top + digit_size) / canvas_size
    xmin = left / canvas_size
    ymin = top / canvas_size

    return canvas, (tf.one_hot(label, 10), [xmin, ymin, xmax, ymax])

In [None]:
def get_training_dataset():
    """Builds the training input pipeline from the MNIST "train" split.

    Note that we try to load the gcs version since TPU can only work with
    datasets on Google Cloud Storage.

    Returns:
        The dataset after map -> shuffle -> repeat -> batch -> prefetch.
    """
    with strategy.scope():
        pipeline = (
            tfds.load("mnist", split="train", as_supervised=True, try_gcs=True)
            # paste every digit on a canvas and attach its bounding box
            .map(read_image_tfds, num_parallel_calls=16)
            .shuffle(5000, reshuffle_each_iteration=True)
            # mandatory for Keras: repeat indefinitely
            .repeat()
            # drop_remainder is important on TPU, batch size must be fixed
            .batch(BATCH_SIZE, drop_remainder=True)
            # fetch next batches while training on the current one
            # (-1: autotune prefetch buffer size)
            .prefetch(-1)
        )

    return pipeline


In [None]:
def get_validation_dataset():
    """Builds the validation input pipeline from the MNIST "test" split.

    Note that we try to load the gcs version since TPU can only work with
    datasets on Google Cloud Storage.

    Returns:
        The dataset after map -> batch(10000) -> repeat.
    """
    return (
        tfds.load("mnist", split="test", as_supervised=True, try_gcs=True)
        # paste every digit on a canvas and attach its bounding box
        .map(read_image_tfds, num_parallel_calls=16)
        # a single batch of 10000 items
        .batch(10000, drop_remainder=True)
        # mandatory for Keras for now: repeat indefinitely
        .repeat()
    )


In [None]:
# Instantiate both input pipelines within the distribution strategy's scope
with strategy.scope():
    # Training pipeline: shuffled, repeated, batched with BATCH_SIZE and prefetched
    training_dataset = get_training_dataset()

    # Validation pipeline: one batch of 10000 items, repeated
    validation_dataset = get_validation_dataset()

### Visualize Data

In [None]:
# pull a batch from the datasets. This code is not very nice, it gets much better in eager mode (TODO)
def _first_batch_as_numpy(dataset, dataset_name):
    """Returns the first (digits, labels, bboxes) batch of `dataset` as NumPy arrays."""
    for digits, (labels, bboxes) in dataset:
        return digits.numpy(), labels.numpy(), bboxes.numpy()
    # Only reached when the dataset produced no element at all
    raise ValueError("%s dataset yielded no batches" % dataset_name)


def dataset_to_numpy_util(training_dataset, validation_dataset, N):
    """
    Converts one batch of each TensorFlow dataset to NumPy arrays.

    Args:
        training_dataset: TensorFlow dataset for training; it is unbatched
            and re-batched so that exactly N items are extracted.
        validation_dataset: TensorFlow dataset for validation; its first
            batch is extracted as is.
        N: Number of training digits to extract.

    Returns:
        training_digits: NumPy array of training digits.
        training_labels: NumPy array of training labels (class indices).
        training_bboxes: NumPy array of bounding boxes for training digits.
        validation_digits: NumPy array of validation digits.
        validation_labels: NumPy array of validation labels (class indices).
        validation_bboxes: NumPy array of bounding boxes for validation digits.

    Raises:
        RuntimeError: if TensorFlow is not executing eagerly.
        ValueError: if either dataset yields no batch.
    """
    # The datasets are iterated directly, which needs eager execution. Fail
    # with a clear message instead of an UnboundLocalError further down.
    if not tf.executing_eagerly():
        raise RuntimeError("dataset_to_numpy_util requires eager execution")

    # Re-batch the training data so that one batch holds exactly N digits
    batch_train_ds = training_dataset.unbatch().batch(N)

    (validation_digits, validation_labels,
     validation_bboxes) = _first_batch_as_numpy(validation_dataset, "validation")
    (training_digits, training_labels,
     training_bboxes) = _first_batch_as_numpy(batch_train_ds, "training")

    # Decode one-hot encoded labels
    validation_labels = np.argmax(validation_labels, axis=1)
    training_labels = np.argmax(training_labels, axis=1)

    return (training_digits, training_labels, training_bboxes,
            validation_digits, validation_labels, validation_bboxes)


In [None]:
# Utility function to display a row of digits with their predicted bounding boxes
def display_digits_with_boxes(digits, predictions, labels, pred_bboxes, bboxes, iou, title):
    """
    Displays a row of randomly chosen digits with their bounding boxes.

    Predicted boxes are drawn in red and true boxes in green. The predicted
    label is written under each digit and turns red when it differs from the
    true label.

    Args:
        digits: Array of digit images, each reshapeable to 75x75.
        predictions: Predicted labels for the digits.
        labels: True labels for the digits.
        pred_bboxes: Predicted bounding boxes for the digits (may be empty).
        bboxes: True bounding boxes for the digits (may be empty).
        iou: Intersection over Union values between predicted and true
            bounding boxes (may be empty); read as iou[k][0].
        title: Title for the plot.
    """

    n = 10  # Number of digits to display

    # Randomly select n digits (with replacement)
    indexes = np.random.choice(len(predictions), size=n)
    n_digits = digits[indexes]
    n_predictions = predictions[indexes]
    n_labels = labels[indexes]

    # Select the matching IoU values and boxes; empty inputs mean "not given"
    n_iou = iou[indexes] if len(iou) > 0 else []
    n_pred_bboxes = pred_bboxes[indexes, :] if len(pred_bboxes) > 0 else None
    n_bboxes = bboxes[indexes, :] if len(bboxes) > 0 else None

    # Rescale digit images to [0, 255] range
    n_digits = n_digits * 255.0
    n_digits = n_digits.reshape(n, 75, 75)

    # Create the plot
    fig = plt.figure(figsize=(20, 4))
    plt.title(title)
    plt.yticks([])
    plt.xticks([])

    # NOTE(review): iou_threshold is expected to be a notebook-level global
    # defined outside this section. Without it, IoU text stays black.
    threshold = globals().get('iou_threshold')

    for i in range(n):
        ax = fig.add_subplot(1, n, i + 1)

        # Keep color and label paired with each box, so a true box is green
        # even when no predicted box is drawn.
        boxes_to_plot, box_colors, box_names = [], [], []
        if n_pred_bboxes is not None:
            boxes_to_plot.append(n_pred_bboxes[i])
            box_colors.append('red')
            box_names.append("pred")
        if n_bboxes is not None:
            boxes_to_plot.append(n_bboxes[i])
            box_colors.append('green')
            box_names.append("true")

        # An empty list would convert to shape (0,); pass an explicit [0, 4]
        boxes = np.asarray(boxes_to_plot) if boxes_to_plot else np.zeros((0, 4))
        img_to_draw = draw_bounding_boxes_on_image_array(image=n_digits[i], boxes=boxes,
                                                         color=box_colors, display_str_list=box_names)

        plt.xlabel(n_predictions[i])
        plt.xticks([])
        plt.yticks([])

        # Highlight incorrect predictions in red
        if n_predictions[i] != n_labels[i]:
            ax.xaxis.label.set_color('red')

        # Show the digit with its boxes
        plt.imshow(img_to_draw)

        # Print the IoU under the digit, in red when below the threshold
        if len(n_iou) > i:
            color = "black"
            if threshold is not None and n_iou[i][0] < threshold:
                color = "red"
            ax.text(0.2, -0.3, "iou: %s" % (n_iou[i][0]), color=color, transform=ax.transAxes)

In [None]:
# Extract NumPy arrays for visualization: 10 training digits and the first validation batch
(training_digits, training_labels, training_bboxes,
 validation_digits, validation_labels, validation_bboxes) = dataset_to_numpy_util(training_dataset, validation_dataset, 10)


NameError: name 'dataset_to_numpy_util' is not defined

In [None]:
# Preview the data before training: the true labels are passed as both predictions and labels,
# and empty arrays are passed for the predicted boxes and the IoU values.
display_digits_with_boxes(training_digits, training_labels, training_labels, np.array([]), training_bboxes, np.array([]), "training digits and their labels")
display_digits_with_boxes(validation_digits, validation_labels, validation_labels, np.array([]), validation_bboxes, np.array([]), "validation digits and their labels")

## Define the Network

Here, you'll define your custom CNN.
- `feature_extractor`: these convolutional layers extract the features of the image.
- `classifier`: This defines the output layer that predicts among 10 categories (digits 0 through 9)
- `bounding_box_regression`: This defines the output layer that predicts 4 numeric values, which define the coordinates of the bounding box (xmin, ymin, xmax, ymax)
- `final_model`: This combines the layers for feature extraction, classification and bounding box prediction.  
  - Notice that this is another example of a branching model, because the model splits to produce two kinds of output (a category and set of numbers).  
  - Since you've learned to use the Functional API earlier in the specialization (course 1), you have the flexibility to define this kind of branching model!
- `define_and_compile_model`: choose the optimizer and metrics, then compile the model.

In [None]:
def feature_extractor(inputs):
    """
    Feature extractor: the CNN composed of convolution and pooling layers.

    Three stages are stacked, each a 3x3 ReLU convolution (16, 32, then 64
    filters) followed by 2x2 average pooling.

    Args:
        inputs: Input tensor.

    Returns:
        Output tensor after passing through the feature extractor CNN.
    """
    # (filters, extra Conv2D kwargs); only the first convolution is given input_shape
    stages = (
        (16, {'input_shape': (75, 75, 1)}),
        (32, {}),
        (64, {}),
    )

    x = inputs
    for filters, extra_kwargs in stages:
        x = tf.keras.layers.Conv2D(filters, kernel_size=3, activation='relu', **extra_kwargs)(x)
        x = tf.keras.layers.AveragePooling2D((2, 2))(x)

    return x


In [None]:
def dense_layers(inputs):
    """
    Adds a flatten layer and a dense layer; this follows the feature extraction layers.

    Args:
        inputs: Input tensor.

    Returns:
        Output tensor of the 128-unit ReLU dense layer applied to the
        flattened input.
    """
    flattened = tf.keras.layers.Flatten()(inputs)
    return tf.keras.layers.Dense(128, activation='relu')(flattened)

In [None]:
def classifier(inputs):
    """
    Defines the classification output of the model.

    Args:
        inputs: Input tensor.

    Returns:
        Output tensor of a 10-unit softmax dense layer named 'classification'.
    """
    # One unit per class (10 classes)
    softmax_head = tf.keras.layers.Dense(10, activation='softmax', name='classification')
    return softmax_head(inputs)