<a href="https://colab.research.google.com/github/Arup3201/AOTS/blob/main/notebooks/Artificial_Traffic_Management_ML_Model_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Artificial Traffic Management - ML Model

## Libraries

In [1]:
!pip install opencv-python



In [2]:
import glob
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import image
from xml.dom import minidom
import cv2
import tensorflow as tf
from tensorflow import keras

**Variables**

In [36]:
TRAINING_DIR = '/content/dataset/train'
VALIDATION_DIR = '/content/dataset/valid'
TEST_DIR = '/content/dataset/test'
CLASS_NAMES = ['car', 'bus', 'truck']
S = 7
B = 2
C = len(CLASS_NAMES)
IMAGE_DIMS = [448, 448]
NUM_CHANNELS = 3
BATCH_SIZE = 16
LAMBDA_COORD = 5
LAMBDA_NOOBJ = 0.5

EPOCHS = 30
TRAIN_STEPS = 160
VAL_STEPS = 60

dataset_params = {'class_names': CLASS_NAMES,
                  'S': S,
                  'B': B,
                  'dims': IMAGE_DIMS,
                  'n_channels': NUM_CHANNELS,
                  'batch_size': BATCH_SIZE,
                  'shuffle': True,
                  'transform': None}

model_params = {'S': S,
                'B': B,
                'C': C,
                'img_shape': IMAGE_DIMS}

loss_params = {'S': S,
               'B': B,
               'C': C,
               'lambda_coord': LAMBDA_COORD,
               'lambda_noobj': LAMBDA_NOOBJ}

## Data

Download Data

In [4]:
!rm -r dataset/
!wget https://universe.roboflow.com/ds/Zq7rUF1khY?key=UXLLVXz75H

!mkdir dataset
!unzip -q Zq7rUF1khY?key=UXLLVXz75H -d dataset/
!rm Zq7rUF1khY?key=UXLLVXz75H

rm: cannot remove 'dataset/': No such file or directory
--2023-12-01 12:32:47--  https://universe.roboflow.com/ds/Zq7rUF1khY?key=UXLLVXz75H
Resolving universe.roboflow.com (universe.roboflow.com)... 151.101.65.195, 151.101.1.195
Connecting to universe.roboflow.com (universe.roboflow.com)|151.101.65.195|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://storage.googleapis.com/roboflow-platform-exports/pwYAXv9BTpqLyFfgQoPZ/peHMC7FkjCfPPfu0wVn3/2/voc.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=481589474394-compute%40developer.gserviceaccount.com%2F20231201%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20231201T123247Z&X-Goog-Expires=901&X-Goog-SignedHeaders=host&X-Goog-Signature=2de2cb741bb179b2c80f64e1ee4cc344f8224183ed5afbe17295a5af5c7d513bdd64ef0e421976cd01ebe0e6f914fa5f81bacb7ea66ba4c8dd2182ca7f94d06bb5fd0e0fd1a1565aae0aca758ba94feff17f5a9fe0d7c67d70d21b0d8b2d33dcbb58f9d357583e56f8fc9c495cc6963157205349393cd2a592e2178804c5d32cfa910d3f01ef

Data count

In [42]:
def count_images(path):
  files = []
  for img_anot in glob.glob(str(path)+"/*.xml"):
    xml_file = minidom.parse(img_anot)
    files.append(xml_file)

  return len(files)

In [43]:
print(f"Training images: {count_images(TRAINING_DIR)}")
print(f"Validation images: {count_images(VALIDATION_DIR)}")
print(f"Test images: {count_images(TEST_DIR)}")

Training images: 2634
Validation images: 966
Test images: 458


Generate data from dataset



In [18]:
# Returns a batch of image and labels
def load_data(filepath, class_names, S=7, B=2,
                  dims=[448, 448], n_channels=3,
                  shuffle=True, transform=None):

  files = []
  class_labels = {}

  for img_anot in glob.glob(str(filepath)+"/*.xml"):
    xml_file = minidom.parse(img_anot)
    files.append(xml_file)

  for i, class_name in enumerate(class_names):
    class_labels[class_name] = i

  img_files = [] # (Batch, )
  labels = [] # (Batch, S, S, C+5*B)

  for i, fxml in enumerate(files):
    # Get the image dimensions
    img_height = dims[0]
    img_width = dims[1]

    # Get all the objects that are present in the image like car, truck, bus  etc
    objects = fxml.getElementsByTagName('object')

    boxes = []

    # Process all objects by taking their name and bounding box coordinates
    for object_ in objects:

      class_name = object_.getElementsByTagName('name')[0].firstChild.data
      # Modify class names
      if 'truck' in class_name:
        class_name = 'truck'
      elif 'bus' in class_name:
        class_name = 'bus'
      else:
        class_name = 'car'

      class_label = class_labels[class_name]

      bbox = object_.getElementsByTagName('bndbox')[0]
      xmin = int(bbox.getElementsByTagName('xmin')[0].firstChild.data)
      xmax = int(bbox.getElementsByTagName('xmax')[0].firstChild.data)
      ymin = int(bbox.getElementsByTagName('ymin')[0].firstChild.data)
      ymax = int(bbox.getElementsByTagName('ymax')[0].firstChild.data)

      centerx = (xmax - (xmax-xmin)/2) / img_width
      centery = ((ymax - (ymax-ymin)/2)) / img_height
      boxwidth = (xmax - xmin) / img_width
      boxheight = (ymax - ymin) / img_height

      boxes.append([class_label, centerx, centery, boxwidth, boxheight])

    # label matrix with (S, S, C+5*B) shape
    label_matrix = tf.zeros([S, S, C+5*B])

    for box in boxes:
      class_label, x, y, width, height = box

      class_label = int(class_label)

      # grid row and column
      row, column = int(S * y) , int(S * x)

      # x and y wrt cell
      cell_x, cell_y = (S * x - column), (S * y - row)

      # width and height wrt cell
      width_cell, height_cell = (
          width * S,
          height * S
      )

      if row >= S or column >= S:
        continue # skip this object

      # Check whether this cell already have a bounding box entry or not
      # By using the confidence score value which is initially zero.
      if label_matrix[row, column, C] == 0:
        # Set the confidence score to 1
        indices = [[row, column, C]]
        updates = [1]
        label_matrix = tf.tensor_scatter_nd_update(label_matrix, indices, updates)

        # Set the class label to 1
        indices = [[row, column, class_label]]
        updates = [1]
        label_matrix = tf.tensor_scatter_nd_update(label_matrix, indices, updates)

        # Set the bounding box values
        indices = [[row, column, C+1], [row, column, C+2], [row, column, C+3],
                    [row, column, C+4]]
        updates = [cell_x, cell_y, width_cell, height_cell]
        label_matrix = tf.tensor_scatter_nd_update(label_matrix, indices, updates)

    # Store them into images and labels array
    img_files.append(filepath+'/'+fxml.getElementsByTagName('filename')[0].firstChild.data)
    labels.append(label_matrix)

  img_files = tf.stack(img_files, axis=0)
  labels = tf.stack(labels, axis=0)

  dataset = tf.data.Dataset.from_tensor_slices((img_files, labels))

  return dataset

In [14]:
def process_image(img_file, labels):
  img = tf.io.read_file(img_file)
  img = tf.io.decode_jpeg(img)
  img = tf.image.convert_image_dtype(img, dtype=tf.float32)
  img = tf.image.resize(img, [*IMAGE_DIMS])

  return img, labels

def tune_training_ds(dataset):
  dataset = dataset.map(process_image, num_parallel_calls=tf.data.AUTOTUNE)
  dataset = dataset.shuffle(1024, reshuffle_each_iteration=True)
  dataset = dataset.repeat() # The dataset be repeated indefinitely.
  dataset = dataset.batch(BATCH_SIZE)
  dataset = dataset.prefetch(tf.data.AUTOTUNE)
  return dataset

def tune_validation_ds(dataset):
  dataset = dataset.map(process_image, num_parallel_calls=tf.data.AUTOTUNE)
  dataset = dataset.batch(BATCH_SIZE)
  dataset = dataset.repeat()
  return dataset

In [19]:
raw_train_ds = load_data(TRAINING_DIR, CLASS_NAMES)

In [22]:
train_ds = tune_training_ds(raw_train_ds)

In [23]:
raw_val_ds = load_data(TRAINING_DIR, CLASS_NAMES)

In [24]:
val_ds = tune_validation_ds(raw_val_ds)

## Model

### CNNBlock Class
This class implements the `convolutional neural network` along with `BatchNormalization` layer and also applies `LeakyReLU`.

In [32]:
@keras.saving.register_keras_serializable()
class ImageProcessor(tf.keras.layers.Layer):
  def __init__(self, img_shape, **kwargs):
    super().__init__(**kwargs)
    self.img_shape = img_shape

  def build(self, input_shape):
    self.model = keras.Sequential([
        # First layer
        tf.keras.layers.Conv2D(filters=64, kernel_size=(7, 7),
                               padding='same', strides=(2, 2), input_shape=self.img_shape),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)),

        # Second layer
        tf.keras.layers.Conv2D(filters=192, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)),

        # Third layer
        tf.keras.layers.Conv2D(filters=128, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=256, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=256, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)),

        # Fourth layer
        tf.keras.layers.Conv2D(filters=256, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=256, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=256, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=256, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)),

        # Fifth layer
        tf.keras.layers.Conv2D(filters=512, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=512, kernel_size=(1, 1),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same', strides=(2, 2)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),

        # Sixth layer
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Conv2D(filters=1024, kernel_size=(3, 3),
                               padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.LeakyReLU()
    ])

  def call(self, x):
    return self.model(x)

### YoloV1 class
This class extends `tf.keras.Model` class which is the class for building models and training them.

By inheriting the `Model` class, we are making sure that our `YoloV1` class have every tensorflow details implemented in it like parallel processing, usage of GPU, and many other utilities. But, by extending we also make sure that we customize the process of training, testing and model creation.

In [37]:
@keras.saving.register_keras_serializable()
class YoloV1(tf.keras.Model):
  def __init__(self, S, B, C, img_shape=[448, 448], n_channels=3,
               name="YoloV1", **kwargs):
    super().__init__(name=name, **kwargs)

    self.image_processor = ImageProcessor([*img_shape, n_channels])

    self.dense_1 = tf.keras.layers.Dense(4096)
    self.dropout = tf.keras.layers.Dropout(0.2)
    self.leaky_relu = tf.keras.layers.LeakyReLU()
    self.final = tf.keras.layers.Dense(B*5+C, activation='linear')

  def call(self, x):
    x = self.image_processor(x)

    x = self.dense_1(x)
    x = self.dropout(x)
    x = self.leaky_relu(x)
    x = self.final(x)

    return x

  def train_step(self, data):
    if len(data) == 3:
      x, y, sample_weight = data
    else:
      sample_weight = None
      x, y = data

    with tf.GradientTape() as tape:
      y_pred = self(x, training=True)
      loss = self.compute_loss(y=y, y_pred=y_pred)

    # Calculate gradients
    trainable_vars = self.trainable_variables
    gradients = tape.gradient(loss, trainable_vars)

    # Update weights
    self.optimizer.apply_gradients(zip(gradients, trainable_vars))

    for metric in self.metrics:
      if metric.name == "loss":
          metric.update_state(loss)
      else:
          metric.update_state(y, y_pred, sample_weight=sample_weight)

    return {m.name: m.result() for m in self.metrics}

In [27]:
def intersection_over_union(boxes_pred, boxes_true):

  # Box 1 = [xmin, ymin, xmax, ymax]
  box1_xmin = boxes_pred[..., 0:1] - boxes_pred[..., 2:3] / 2
  box1_ymin = boxes_pred[..., 1:2] - boxes_pred[..., 3:4] / 2
  box1_xmax = boxes_pred[..., 0:1] + boxes_pred[..., 2:3] / 2
  box1_ymax = boxes_pred[..., 1:2] + boxes_pred[..., 3:4] / 2

  # Box 2 = [xmin, ymin, xmax, ymax]
  box2_xmin = boxes_true[..., 0:1] - boxes_true[..., 2:3] / 2
  box2_ymin = boxes_true[..., 1:2] - boxes_true[..., 3:4] / 2
  box2_xmax = boxes_true[..., 0:1] + boxes_true[..., 2:3] / 2
  box2_ymax = boxes_true[..., 1:2] + boxes_true[..., 3:4] / 2

  # [xmin, ymin, xmax, ymax] of the common area
  common_xmin = tf.reduce_max([box1_xmin, box2_xmin], axis=-1, keepdims=True)
  common_ymin = tf.reduce_max([box1_ymin, box2_ymin], axis=-1, keepdims=True)
  common_xmax = tf.reduce_min([box1_xmax, box2_xmax], axis=-1, keepdims=True)
  common_ymax = tf.reduce_min([box1_ymax, box2_ymax], axis=-1, keepdims=True)
  # Calculate the area of the common part
  common_area = tf.clip_by_value((common_xmax-common_xmin)*(common_ymax-common_ymin),
                                 clip_value_min=0, clip_value_max=tf.float32.max)

  # Find the union
  union_area = ((boxes_pred[..., 2:3] * boxes_true[..., 3:4]) - common_area) + 1e-6

  return common_area / union_area

class YoloLoss(tf.keras.losses.Loss):
  def __init__(self, S, B, C, lambda_coord, lambda_noobj):
    super(YoloLoss, self).__init__()
    self.mse = tf.keras.losses.MeanSquaredError()

    self.S = S
    self.B = B
    self.C = C
    self.lambda_coord = lambda_coord
    self.lambda_noobj = lambda_noobj

  def call(self, y_true, y_pred, sample_weights=None):
    y_true = tf.reshape(y_true, shape=[-1, self.S, self.S, self.C+5*self.B]) # (Batch, S, S, C+5*B)
    y_pred = tf.reshape(y_pred, shape=[-1, self.S, self.S, self.C+5*self.B]) # (Batch, S, S, C+5*B)

    ## Find IOU for all bounding boxes
    # ious = []
    # for i in range(self.B):
      # iou = intersection_over_union(y_pred[..., self.C+(i*5+1):self.C+(i*5+5)], y_true[..., self.C+1:self.C+5])
      # ious.append(iou)

    # Without loop, shape=>(Batch, S, S, 1)
    iou_box1 = intersection_over_union(y_pred[..., self.C+1:self.C+5], y_true[..., self.C+1:self.C+5])
    iou_box2 = intersection_over_union(y_pred[..., self.C+6:self.C+10], y_true[..., self.C+1:self.C+5])

    ious = tf.concat([iou_box1, iou_box2], axis=0) # (2, Batch, S, S, 1)

    best_boxes = tf.cast(tf.argmax(ious, axis=0), dtype=tf.float32) # (Batch, S, S, 1)

    exists_box = y_true[..., self.C:self.C+1] # I_obj, shape=>(Batch, S, S, 1)

    # Box loss, shape=>(Batch, S, S, 4)
    box_predictions = best_boxes * y_pred[...,
                                          self.C+1:self.C+5] + (1 - best_boxes) * y_pred[...,
                                                                                          self.C+1:self.C+5]
    box_targets = y_true[..., self.C+1:self.C+5]

    # ## Find the sqrt of the width and height (4 lists of shape (Batch, S, S))
    box_predictions = tf.unstack(box_predictions, axis=-1)
    box_targets = tf.unstack(box_targets, axis=-1)

    box_predictions[2] = tf.sign(box_predictions[2]) * tf.sqrt(tf.abs(box_predictions[2])+1e-6)
    box_predictions[3] = tf.sign(box_predictions[3]) * tf.sqrt(tf.abs(box_predictions[3])+1e-6)
    box_targets[2] = tf.sqrt(box_targets[2])
    box_targets[3] = tf.sqrt(box_targets[3])

    box_predictions = tf.stack(box_predictions, axis=-1)
    box_targets = tf.stack(box_targets, axis=-1)

    box_loss = self.mse(
        tf.reshape(box_predictions, shape=[-1]),
        tf.reshape(box_targets, shape=[-1])
    )

    # Object loss
    pred_box = (best_boxes) * y_pred[..., self.C:self.C+1] + (1 - best_boxes) * y_pred[..., self.C+5:self.C+6]

    object_loss = self.mse(tf.reshape(exists_box * pred_box, shape=[-1]),
                           tf.reshape(exists_box * y_true[..., self.C:self.C+1], shape=[-1]))

    # No object loss
    noobj_loss = self.mse(
        tf.reshape((1 - exists_box) * y_pred[..., self.C:self.C+1], shape=[-1]),
        tf.reshape((1 - exists_box) * y_true[..., self.C:self.C+1], shape=[-1])
    )

    noobj_loss += self.mse(
        tf.reshape((1 - exists_box) * y_pred[..., self.C+5:self.C+6], shape=[-1]),
        tf.reshape((1 - exists_box) * y_true[..., self.C:self.C+1], shape=[-1])
    )

    # Class loss
    class_loss = self.mse(
        tf.reshape(exists_box * y_pred[..., :self.C], shape=[-1]),
        tf.reshape(exists_box * y_true[..., :self.C], shape=[-1])
    )

    # Total loss
    loss = (
        self.lambda_coord * box_loss +
        self.lambda_coord * object_loss +
        object_loss +
        self.lambda_noobj * noobj_loss +
        class_loss
    )

    return loss

In [None]:
class MeanAveragePrecision(tf.keras.metrics.Metric):
  def __init__(self, name='mean_average_precision', **kwargs):
    super(MeanAveragePrecision, self).__init__(name=name, **kwargs)
    self.mean_avg_precision = self.add_weight(name='map', initializer='zeros')

  def update_state(self, y_pred, y_true, sample_weight=None):
    y_true = tf.reshape(y_true, shape=[-1, self.S, self.S, self.C+5*self.B]) # (Batch, S, S, C+5*B)
    y_pred = tf.reshape(y_pred, shape=[-1, self.S, self.S, self.C+5*self.B]) # (Batch, S, S, C+5*B)



  def result(self):
    return self.mean_avg_precision

In [None]:
# Create the model
yolo = YoloV1(**model_params)

# Compile the model
yolo.compile(optimizer='rmsprop',
             loss=YoloLoss(**loss_params),
             metrics=[MeanAveragePrecision()])

# Fit on the dataset
yolo.fit(train_ds,
         epochs=EPOCHS,
         steps_per_epoch=TRAIN_STEPS,
         validation_data=val_ds,
         validation_steps=VAL_STEPS)