In [None]:
import numpy as np
import matplotlib.pyplot as plt
import PIL.Image, PIL.ImageFont, PIL.ImageDraw
import tensorflow as tf
import tensorflow_datasets as tfds
import os

In [None]:
# Image side lengths in pixels (the digit canvases built later are 75x75).
width, height=75, 75
# When True, box coordinates are treated as fractions of the image size.
normalized_coordinates=True

### Draws a single bounding box on an image using the given coordinates (label strings are accepted but not rendered).

In [None]:
def draw_bb_on_img(rgb_img, xmin, ymin, xmax, ymax, colour="red", thickness=1,
               display_str_list=(), normalized_coordinates=True):
  """Draw one closed rectangular outline onto a PIL image, in place.

  Args:
    rgb_img: PIL image to draw on (modified in place).
    xmin, ymin, xmax, ymax: box edges; fractions of the image size when
      `normalized_coordinates` is True, otherwise used as given.
    colour: line colour passed to PIL as `fill`.
    thickness: line width in pixels.
    display_str_list: accepted for interface compatibility; not used here.
    normalized_coordinates: whether to scale the edges by the image size.

  Returns:
    None.
  """
  img_w, img_h=rgb_img.size
  if normalized_coordinates:
    left, right=xmin*img_w, xmax*img_w
    top, bottom=ymin*img_h, ymax*img_h
  else:
    left, right, top, bottom=xmin, xmax, ymin, ymax
  # NOTE(review): each point is built as (top, left), i.e. the y-derived value
  # comes first. draw_bbs_on_img in this file passes column 1 as xmin and
  # column 0 as ymin, so the two swaps cancel for square images; confirm before
  # using this with non-square images or calling it directly.
  outline=[(top, left), (bottom, left), (bottom, right), (top, right), (top, left)]
  PIL.ImageDraw.Draw(rgb_img).line(outline, width=thickness, fill=colour)

### Draws multiple bounding boxes on an image.

In [None]:
def draw_bbs_on_img(rgb_img, b, colour=(), thickness=1, display_str_list=()):
  """Draw every box of an [N, 4] array onto a PIL image, in place.

  Args:
    rgb_img: PIL image to draw on (modified in place).
    b: array-like with a `.shape` of [N, 4]. Column 1 is forwarded as `xmin`,
      column 0 as `ymin`, column 3 as `xmax` and column 2 as `ymax`.
    colour: per-box colours; boxes without an entry are drawn in "red".
    thickness: line width in pixels.
    display_str_list: per-box strings forwarded to draw_bb_on_img; boxes
      without an entry get an empty tuple.

  Returns:
    None.

  Raises:
    ValueError: if `b` is non-empty and not of shape [N, 4].
  """
  boxes_shape=b.shape
  # Nothing to draw: scalar input or an array with no elements
  # (e.g. np.asarray([]) has shape (0,)).
  if not boxes_shape or 0 in boxes_shape:
    return
  if len(boxes_shape)!=2 or boxes_shape[1]!=4:
    raise ValueError("Input must be of size [N, 4]")
  for i in range(boxes_shape[0]):
    # Fall back to defaults instead of raising IndexError on short/empty lists.
    box_colour=colour[i] if i<len(colour) else "red"
    box_strs=display_str_list[i] if i<len(display_str_list) else ()
    draw_bb_on_img(rgb_img, b[i, 1], b[i, 0], b[i, 3], b[i, 2],
                   box_colour, thickness, box_strs)

### Draw multiple bounding boxes on a NumPy image array and return the modified image as a NumPy array.

In [None]:
def draw_bbs_on_img_array(img, b, colour=[], thickness=1, display_str_list=()):
  """Draw boxes on a NumPy image and return the result as a NumPy array.

  The input array is converted to a PIL image, pasted onto a new "RGBA"
  canvas of the same size, annotated via draw_bbs_on_img, and converted back.

  Args:
    img: NumPy image array accepted by `PIL.Image.fromarray`.
    b: box array forwarded to draw_bbs_on_img.
    colour: per-box colours.
    thickness: line width in pixels.
    display_str_list: per-box strings forwarded to draw_bbs_on_img.

  Returns:
    NumPy array of the annotated RGBA canvas.
  """
  source=PIL.Image.fromarray(img)
  canvas=PIL.Image.new("RGBA", source.size)
  canvas.paste(source)
  draw_bbs_on_img(rgb_img=canvas, b=b, colour=colour, thickness=thickness,
                  display_str_list=display_str_list)
  return np.array(canvas)

### Converts a TensorFlow dataset (`train_ds`, `val_ds`, `test_ds`) into NumPy arrays — extracting one batch from each split.

In [None]:
def dataset_to_numpy(train_ds, val_ds, test_ds, N):
  """Pull the first batch of each split out of its dataset as NumPy arrays.

  The training dataset is unbatched and re-batched to `N` examples first; the
  validation and test datasets are read with whatever batching they have.

  Args:
    train_ds, val_ds, test_ds: datasets yielding `(digits, (labels, boxes))`.
    N: number of training examples to extract.

  Returns:
    Tuple `(val_digits, val_labels, val_boxes, train_digits, train_labels,
    train_boxes, test_digits, test_labels, test_boxes)`. Labels with more
    than one dimension are reduced to class ids with argmax over axis 1.

  Raises:
    RuntimeError: if TensorFlow is not executing eagerly.
    ValueError: if any of the datasets yields no batch.
  """
  if not tf.executing_eagerly():
    # Direct iteration and .numpy() below both need eager tensors.
    raise RuntimeError("dataset_to_numpy requires eager execution")

  batch_train_ds=train_ds.unbatch().batch(N)

  # Same read order as before: validation, training, test.
  val_digits, val_labels, val_boxes=_first_batch_as_numpy(val_ds, "validation")
  train_digits, train_labels, train_boxes=_first_batch_as_numpy(batch_train_ds, "training")
  test_digits, test_labels, test_boxes=_first_batch_as_numpy(test_ds, "test")

  return (val_digits, val_labels, val_boxes,
          train_digits, train_labels, train_boxes,
          test_digits, test_labels, test_boxes)


def _first_batch_as_numpy(ds, split_name):
  """Return `(digits, labels, boxes)` of the first batch of `ds` as NumPy arrays."""
  for digits, (labels, boxes) in ds:
    labels=labels.numpy()
    # Multi-dimensional (e.g. one-hot) labels are collapsed to class ids.
    if len(labels.shape)>1:
      labels=np.argmax(labels, axis=1)
    return digits.numpy(), labels, boxes.numpy()
  raise ValueError(f"{split_name} dataset yielded no batches")

### Generates synthetic digit images using local system fonts (from matplotlib) and returns them as a NumPy array along with the corresponding digit labels.

In [None]:
# Directory of the TrueType fonts located relative to the matplotlib.pyplot module file.
plt_font_dir=os.path.join(os.path.dirname(plt.__file__), "mpl-data/fonts/ttf")

def create_digit_from_local_fonts(n):
  """Render `n` digits with local fonts and return them with their labels.

  Digit `i` shows `i % 10`; the first ten use DejaVuSansMono-Oblique, later
  ones use STIXGeneral. All digits are drawn side by side on one "LA"
  (luminance + alpha) strip that is then cut into 75x75 tiles.

  Args:
    n: number of digit images to generate.

  Returns:
    `(font_digits, font_labels)`: a float32 array of shape `[n, 75*75]` with
    values in [0, 1] (one flattened tile per row), and a list of `n` ints.
  """
  size=75
  font_labels=[]
  img=PIL.Image.new("LA", (size*n, size), color=(0, 255))
  font1=PIL.ImageFont.truetype(os.path.join(plt_font_dir, "DejaVuSansMono-Oblique.ttf"), 25)
  font2=PIL.ImageFont.truetype(os.path.join(plt_font_dir, "STIXGeneral.ttf"), 25)
  d=PIL.ImageDraw.Draw(img)

  for i in range(n):
    font_labels.append(i%10)
    d.text((7+i*size, 0 if i<10 else -4), str(i%10), fill=(255, 255), font=font1 if i<10 else font2)

  # np.array(img) is (75, 75*n, 2); channel 0 is the luminance plane. The
  # previous `[:, 0]` picked a pixel column instead of that channel.
  font_digits=np.array(img, np.float32)[:, :, 0]/255
  # Cut the strip into n tiles and flatten each; the previous np.reshape call
  # had no target shape and raised TypeError.
  font_digits=np.reshape(np.stack(np.split(font_digits, n, axis=1), axis=0), [n, size*size])
  return font_digits, font_labels

### Displays ten randomly selected digit images with their true and predicted bounding boxes, their labels, and (when provided) the IoU of each prediction.

In [None]:
def display_digits_with_boxes(digits, preds, labels, pred_boxes, boxes, iou, title, seed=16, iou_threshold=0.5):
  """Plot ten randomly chosen digits in one row with their boxes and labels.

  Args:
    digits: array of images; the selected ones are reshaped to (10, 75, 75).
    preds: predicted class ids, indexable by an index array.
    labels: true class ids, indexable by an index array.
    pred_boxes: predicted boxes (drawn in blue); pass an empty array to skip.
    boxes: true boxes (drawn in red); pass an empty array to skip.
    iou: per-example IoU values, indexed as `iou[k][0]`; pass an empty array
      to skip the IoU caption.
    title: figure title.
    seed: value passed to `np.random.seed` before sampling (this reseeds
      NumPy's global generator).
    iou_threshold: IoU values below this are captioned in orange.

  Returns:
    None; the figure is drawn through pyplot.
  """

  n=10
  row=1
  column=10
  np.random.seed(seed)
  # Sampling without replacement: needs at least n entries in `digits`.
  indexes=np.random.choice(len(digits), size=n, replace=False)
  n_digits=digits[indexes]
  n_preds=preds[indexes]
  n_labels=labels[indexes]
  n_iou=[]

  # Optional inputs are only subset when they are non-empty.
  if len(iou)>0:
    n_iou=iou[indexes]
  if len(pred_boxes)>0:
    n_pred_boxes=pred_boxes[indexes]
  if len(boxes)>0:
    n_boxes=boxes[indexes]

  # Scale pixel values by 255 and drop any trailing channel axis.
  n_digits=n_digits*255.0
  n_digits=n_digits.reshape(n, 75, 75)
  fig=plt.figure(figsize=(20, 5))
  plt.title(title)
  plt.xticks([])
  plt.yticks([])

  for i in range(n):
    ax=fig.add_subplot(row, column, i+1)
    # Parallel lists: one box, colour and caption per entry.
    boxes_to_plot=[]
    c=[]
    dsl=[]
    if len(boxes)>i:
      boxes_to_plot.append(n_boxes[i])
      c.append("red")
      dsl.append("True")
    if len(pred_boxes)>i:
      boxes_to_plot.append(n_pred_boxes[i])
      c.append("blue")
      dsl.append("Prediction")

    img_to_draw=draw_bbs_on_img_array(img=n_digits[i],
                                      b=np.asarray(boxes_to_plot),
                                      colour=c,
                                      display_str_list=dsl)
    ax.set_xlabel(f"True : {n_labels[i]}\nPredicted : {n_preds[i]}")
    plt.xticks([])
    plt.yticks([])

    # Highlight misclassified digits with a red axis label.
    if n_preds[i]!=n_labels[i]:
      ax.xaxis.label.set_color('red')

    plt.imshow(img_to_draw)

    if len(iou)>i:
      colour="black"
      if n_iou[i][0]<iou_threshold:
        colour="orange"
      ax.text(0.2, -0.5, "iou : %s"%(n_iou[i][0]), color=colour, transform=ax.transAxes)

### To plot training and validation metrics from a Keras `history` object.

In [None]:
def plot_metrics(history, metric_name, title):
  """Plot a training metric and its validation counterpart against epochs.

  Args:
    history: object whose `.history` dict holds per-epoch lists, keyed by
      metric name (e.g. the value returned by `Model.fit`).
    metric_name: key of the training metric; the validation series is read
      from `"val_" + metric_name`.
    title: plot title.

  Returns:
    None; the curves are drawn on the current pyplot axes.
  """
  val_name="val_"+metric_name
  plt.title(title)
  plt.plot(history.history[metric_name], "b.-", label=metric_name)
  plt.plot(history.history[val_name], "r.-", label=val_name)
  plt.xlabel("epoch")
  # Without a legend the labels above are never shown and the curves are ambiguous.
  plt.legend()

### Preprocesses an image-label pair from a TensorFlow Datasets (TFDS) digit dataset (like MNIST) by:

- Placing the `28x28` digit randomly inside a `75x75` canvas.

- Normalizing pixel values and computing a bounding box.

- Returning the padded image, one-hot label, and bounding box.

In [None]:
def read_image_tfds(img, label):
  """Place a 28x28 digit at a random position on a 75x75 canvas.

  Args:
    img: image tensor reshaped here to (28, 28, 1).
    label: integer class id, one-hot encoded here with depth 10.

  Returns:
    `(image, (one_hot_label, box))` where `image` is float32 scaled by 1/255
    and `box` is `[xmin, ymin, xmax, ymax]`, each divided by 75.
  """
  # Integer offsets of the digit's top-left corner (x drawn first, then y).
  x0=tf.random.uniform((), 0, 48, dtype=tf.int32)
  y0=tf.random.uniform((), 0, 48, dtype=tf.int32)

  digit=tf.reshape(img, (28, 28, 1))
  canvas=tf.image.pad_to_bounding_box(digit, y0, x0, 75, 75)
  canvas=tf.cast(canvas, tf.float32)/255.0

  x0=tf.cast(x0, tf.float32)
  y0=tf.cast(y0, tf.float32)

  # Box corners as fractions of the 75-pixel canvas side.
  box=tf.stack([x0/75, y0/75, (x0+28)/75, (y0+28)/75])
  return canvas, (tf.one_hot(label, 10), box)

- `tf.distribute.get_strategy()` retrieves the default distribution strategy.
- `strategy.num_replicas_in_sync` tells you how many devices (replicas) are working in parallel under that strategy.

In [None]:
# Distribution strategy reused by the data pipelines and the model-building cell below.
strategy=tf.distribute.get_strategy()
# Last expression of the cell: shows the number of replicas as cell output.
strategy.num_replicas_in_sync

In [None]:
# Global batch size: 64 examples for each replica of the strategy.
BATCH_SIZE=64*strategy.num_replicas_in_sync
# Last expression of the cell: shows the resulting batch size as cell output.
BATCH_SIZE

In [None]:
# Name of the TFDS dataset passed to tfds.load; uncomment exactly one line to switch.
# base_ds="mnist"
# base_ds="fashion_mnist"
base_ds="kmnist"

### These functions create TensorFlow input pipelines for training, validation, and testing datasets using `tf.data` and `tf.distribute.Strategy` for distributed training.

In [None]:
def get_train_ds():
  """Build the training input pipeline from the first 80% of the train split.

  The pipeline maps every example through read_image_tfds, shuffles with a
  5000-element buffer (reshuffled each iteration), repeats indefinitely,
  batches to BATCH_SIZE (dropping any remainder) and prefetches.

  Returns:
    An infinitely repeating dataset of `(images, (one_hot_labels, boxes))`.
  """

  with strategy.scope():
    ds=tfds.load(base_ds, split="train[:80%]", as_supervised=True, try_gcs=True)
    ds=ds.map(read_image_tfds, num_parallel_calls=16)
    # Dataset transformations return a new dataset; the result must be
    # re-assigned or the shuffle is silently dropped.
    ds=ds.shuffle(5000, reshuffle_each_iteration=True)
    ds=ds.repeat()
    ds=ds.batch(BATCH_SIZE, drop_remainder=True)
    ds=ds.prefetch(tf.data.AUTOTUNE)
  return ds

In [None]:
def get_val_ds():
  """Build the validation pipeline from the last 20% of the train split.

  Returns:
    A dataset of read_image_tfds outputs in batches of 10000 (remainder dropped).
  """
  with strategy.scope():
    raw=tfds.load(base_ds, split="train[80%:]", as_supervised=True, try_gcs=True)
    ds=raw.map(read_image_tfds, num_parallel_calls=16).batch(10000, drop_remainder=True)
  return ds

In [None]:
def get_test_ds():
  """Build the test pipeline from the dataset's "test" split.

  Returns:
    A dataset of read_image_tfds outputs in batches of 10000 (remainder dropped).
  """
  with strategy.scope():
    raw=tfds.load(base_ds, split="test", as_supervised=True, try_gcs=True)
    ds=raw.map(read_image_tfds, num_parallel_calls=16).batch(10000, drop_remainder=True)
  return ds

### This ensures that all variables and datasets created within the block are compatible with distributed training.

In [None]:
# Instantiate the three input pipelines under the distribution strategy's scope.
with strategy.scope():
  train_ds=get_train_ds()
  val_ds=get_val_ds()
  test_ds=get_test_ds()

### Extracting one batch from each split as NumPy arrays

In [None]:
# One batch per split as NumPy arrays; the last argument (10) is the number of
# training examples extracted.
(val_digits, val_labels, val_boxes,
 train_digits, train_labels, train_boxes,
 test_digits, test_labels, test_boxes)=dataset_to_numpy(train_ds, val_ds, test_ds, 10)

### Let us visualize a batch of the training data

In [None]:
# No model exists yet: the true labels stand in for predictions, and the empty
# arrays disable the predicted-box and IoU overlays.
display_digits_with_boxes(train_digits,
                          train_labels,
                          train_labels,  # predicted labels (same here)
                          np.array([]),
                          train_boxes,
                          np.array([]),
                          "Training digits with labels")

### This function defines a simple CNN-based feature extractor using TensorFlow Keras layers.

In [None]:
def feature_extraction(inputs):
  """Apply three Conv2D (3x3, ReLU) + 2x2 average-pooling stages.

  The convolutions use 16, 32 and 64 filters in that order.

  Args:
    inputs: image tensor fed to the first convolution.

  Returns:
    The output tensor of the last pooling layer.
  """
  conv=tf.keras.layers.Conv2D
  pool=tf.keras.layers.AveragePooling2D

  # The first stage keeps its explicit input_shape argument.
  x=conv(16, activation='relu', kernel_size=3, input_shape=(75, 75, 1))(inputs)
  x=pool((2, 2))(x)

  for filters in (32, 64):
    x=conv(filters, activation='relu', kernel_size=3)(x)
    x=pool((2, 2))(x)

  return x

### This function defines a simple dense block (fully connected) for a neural network

In [None]:
def dense_layers(inputs):
  """Flatten `inputs` and pass them through a 128-unit ReLU dense layer.

  Returns:
    The output tensor of the dense layer.
  """
  flattened=tf.keras.layers.Flatten()(inputs)
  return tf.keras.layers.Dense(128, activation='relu')(flattened)

### This function defines the classification head of the model.

In [None]:
def classifier(inputs):
  """Classification head: a 10-unit softmax dense layer named 'classification'.

  Returns:
    The output tensor of the layer.
  """
  head=tf.keras.layers.Dense(10, activation='softmax', name='classification')
  return head(inputs)

### This defines the bounding box regression head of the model.

In [None]:
def bounding_box_regression(inputs):
  """Box head: a 4-unit dense layer with no activation, named 'bounding_box'.

  Returns:
    The output tensor of the layer.
  """
  head=tf.keras.layers.Dense(4, name='bounding_box')
  return head(inputs)

### Sequential image augmentation pipeline using Keras preprocessing layers.

In [None]:
img_aug=tf.keras.Sequential([
    tf.keras.layers.RandomFlip('horizontal'),
    tf.keras.layers.RandomRotation(0.2),
    tf.keras.layers.RandomZoom(0.2),
    tf.keras.layers.RandomContrast(0.2),
    tf.keras.layers.RandomTranslation(0.1, 0.1)
])

### Now let us build a multi-output neural network model.

In [None]:
def model(inputs):

  inputs=img_aug(inputs)
  feature_cnn=feature_extraction(inputs)
  dense_o_p=dense_layers(feature_cnn)
  classification_o_p=classifier(dense_o_p)
  bounding_box_o_p=bounding_box_regression(dense_o_p)

  Model=tf.keras.Model(inputs=inputs, outputs=[classification_o_p, bounding_box_o_p])

  return Model

In [None]:
def define_and_compile_model(inputs):
  """Build the network with model() and compile it with Adam.

  Losses and metrics are keyed by output-layer name: categorical
  cross-entropy / accuracy for "classification", MSE / MSE for "bounding_box".

  Args:
    inputs: Keras input tensor forwarded to model().

  Returns:
    The compiled model.
  """
  losses={"classification":"categorical_crossentropy", "bounding_box":"mse"}
  tracked_metrics={"classification":"accuracy", "bounding_box":"mse"}

  Model=model(inputs)
  Model.compile(optimizer="adam", loss=losses, metrics=tracked_metrics)
  return Model

In [None]:
# Learning-rate schedule callback: watches val_loss with patience 3, scaling
# the rate by 0.5 and never going below 1e-6.
lr_sched=tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss",
    patience=3,
    factor=0.5,
    min_lr=1e-6,
    verbose=1
)

In [None]:
# Early-stopping callback: watches val_loss with patience 5 and restores the
# best weights when training stops.
early_stop=tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=5,
    restore_best_weights=True,
    verbose=1
)

In [None]:
# Build and compile the model under the distribution strategy's scope.
# NOTE(review): `input` shadows Python's builtin of the same name for the rest
# of the notebook; consider renaming once all uses are known.
with strategy.scope():
  input=tf.keras.layers.Input(shape=(75, 75, 1))
  Model=define_and_compile_model(input)

Model.summary()

### Training phase

In [None]:
EPOCHS=20
# get_train_ds reads only train[:80%] of the 60000-example train split, so one
# pass over the training data is 48000 examples, not 60000. Counting steps
# from 60000 made every "epoch" 1.25 passes, which also skews the
# epoch-based patience of the callbacks.
TRAIN_EXAMPLES=(60000*80)//100
steps_per_epoch=TRAIN_EXAMPLES//BATCH_SIZE

# train_ds repeats indefinitely, so steps_per_epoch defines the epoch length;
# validation uses a single batch of val_ds per epoch.
history=Model.fit(train_ds,
                  steps_per_epoch=steps_per_epoch,
                  validation_data=val_ds,
                  validation_steps=1,
                  epochs=EPOCHS,
                  callbacks=[lr_sched, early_stop])

# NOTE(review): positional unpacking assumes evaluate() returns the total loss,
# the two per-output losses, then the two metrics, in this order; confirm for
# the installed Keras version.
loss, classification_loss, bounding_box_loss, classification_acc, bounding_box_mse=Model.evaluate(val_ds, steps=1)

### Let us visualize the metrics

In [None]:
# Training vs. validation MSE of the "bounding_box" output per epoch.
plot_metrics(history, "bounding_box_mse", "Bounding Box MSE")

In [None]:
# Training vs. validation accuracy of the "classification" output per epoch.
plot_metrics(history, "classification_accuracy", "Classification Accuracy")

In [None]:
# Training vs. validation loss of the "classification" output per epoch.
plot_metrics(history, "classification_loss", "Classification Loss")

### This function computes the Intersection over Union (IOU) between predicted and ground truth bounding boxes.

In [None]:
def intersection_over_union(pred_box, true_box):
  """Compute the row-wise IoU of two sets of boxes.

  Args:
    pred_box: array of shape [N, 4] with columns xmin, ymin, xmax, ymax.
    true_box: array of the same layout as `pred_box`.

  Returns:
    Array of shape [N, 1] with the IoU of each pair, rounded to 4 decimals.
    A smoothing term of 1e-10 is added to numerator and denominator.
  """
  eps=1e-10

  px0, py0, px1, py1=np.split(pred_box, 4, axis=1)
  tx0, ty0, tx1, ty1=np.split(true_box, 4, axis=1)

  # Extent of the overlap along each axis, clamped at zero for disjoint boxes.
  inter_w=np.maximum(np.minimum(px1, tx1)-np.maximum(px0, tx0), 0)
  inter_h=np.maximum(np.minimum(py1, ty1)-np.maximum(py0, ty0), 0)
  inter_area=inter_w*inter_h

  area_pred=(px1-px0)*(py1-py0)
  area_true=(tx1-tx0)*(ty1-ty0)
  union_area=area_pred+area_true-inter_area

  return np.round((inter_area+eps)/(union_area+eps), 4)

### Let us make the predictions

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
# Predict on the extracted validation batch: output 0 holds the class scores
# (reduced to class ids with argmax), output 1 the predicted boxes.
prediction=Model.predict(val_digits, batch_size=BATCH_SIZE)
predicted_labels=np.argmax(prediction[0], axis=1)
predicted_boxes=prediction[1]

In [None]:
# Classification accuracy on the validation batch, printed as a percentage.
acc_v=accuracy_score(val_labels, predicted_labels)
print(f"Accuracy : {acc_v*100}%")

In [None]:
# Per-example IoU of predicted vs. true validation boxes, then a visual check
# of ten random validation digits.
iou=intersection_over_union(predicted_boxes, val_boxes)
display_digits_with_boxes(val_digits,
                          predicted_labels,
                          val_labels,
                          predicted_boxes,
                          val_boxes,
                          iou,
                          "Actual and Predicted values for validation dataset")

In [None]:
# Same prediction step for the test batch; this overwrites the validation
# results held in prediction, predicted_labels and predicted_boxes.
prediction=Model.predict(test_digits, batch_size=BATCH_SIZE)
predicted_labels=np.argmax(prediction[0], axis=1)
predicted_boxes=prediction[1]

In [None]:
# Classification accuracy on the test batch, printed as a percentage.
acc_t=accuracy_score(test_labels, predicted_labels)
print(f"Accuracy : {acc_t*100}%")

In [None]:
# Per-example IoU of predicted vs. true test boxes, then a visual check of ten
# random test digits.
iou=intersection_over_union(predicted_boxes, test_boxes)
display_digits_with_boxes(test_digits,
                          predicted_labels,
                          test_labels,
                          predicted_boxes,
                          test_boxes,
                          iou,
                          "Actual and Predicted values for test dataset")