In [None]:
import tensorflow as tf
import tensorflow.keras.layers as layers

# Model Architecture

In [None]:
# Layer specification for the convolutional stack, consumed in order by
# YoloV1.create_conv. Each entry is one of:
#   tuple -> a single conv block; create_conv reads element 0 as kernel_size,
#            element 1 as the number of filters and element 2 as the stride
#   str   -> a 2x2 max-pooling layer with stride 2 (written as "M" here)
#   list  -> [conv_tuple_1, conv_tuple_2, num_repeats]; the two conv blocks
#            are added back to back, num_repeats times
# NOTE(review): the fourth tuple element is never read by create_conv, which
# always passes padding='same' -- presumably it is an explicit padding size
# kept from a reference implementation; confirm before relying on it.
architecture_config = [
    (7, 64, 2, 3),
    "M",
    (3, 192, 1, 1),
    "M",
    (1, 128, 1, 0),
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "M",
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],
    (1, 512, 1, 0),
    (3, 1024, 1, 1),
    "M",
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]

In [None]:
class CNNBlock(layers.Layer):
  """Bias-free Conv2D followed by BatchNormalization and LeakyReLU(0.1).

  Parameters:
      in_channels: accepted but not used by this layer.
      out_channels: number of filters of the Conv2D layer.
      **kwargs: forwarded unchanged to layers.Conv2D
          (e.g. kernel_size, strides, padding).
  """

  def __init__(self, in_channels, out_channels, **kwargs):
    super().__init__()

    # Sub-layers are listed in the order call() applies them.
    self.conv = layers.Conv2D(filters = out_channels, use_bias = False, **kwargs)
    self.bn = layers.BatchNormalization()
    self.leakyrelu = layers.LeakyReLU(0.1)

  def call(self, x):
    """Return leakyrelu(bn(conv(x)))."""
    return self.leakyrelu(self.bn(self.conv(x)))

In [None]:
class YoloV1(tf.keras.Model):
  """YOLOv1-style detector: a convolutional stack built from
  architecture_config followed by a fully connected head.

  Parameters:
      in_channels (int): number of channels of the input images.
      **kwargs: forwarded to create_fc; must supply split_size, num_boxes
          and num_classes.

  Calling the model returns a tensor of shape
  (batch_size, split_size, split_size, num_classes + 5 * num_boxes).
  """

  def __init__(self, in_channels = 3, **kwargs):
    super().__init__()

    self.architecture = architecture_config
    self.in_channels = in_channels

    self.conv = self.create_conv(self.architecture)
    self.fc = self.create_fc(**kwargs)  # the head flattens the conv output first

  def create_conv(self, architecture):
    """Build the convolutional stack described by `architecture`.

    Parameters:
        architecture (list): entries are either
            a tuple  -> one CNNBlock (kernel_size = [0], filters = [1], strides = [2]),
            a str    -> one 2x2 MaxPooling2D with stride 2,
            a list   -> [conv_tuple_1, conv_tuple_2, num_repeats].
    Returns:
        tf.keras.Sequential containing the layers in order.
    Raises:
        TypeError: if an entry is not a tuple, str or list.
    """
    conv_layers = tf.keras.Sequential()
    in_channels = self.in_channels

    for spec in architecture:

      if isinstance(spec, tuple):
        conv_layers.add(CNNBlock(in_channels, out_channels = spec[1], kernel_size = spec[0], strides = spec[2], padding = 'same'))
        in_channels = spec[1]

      elif isinstance(spec, str):
        conv_layers.add(layers.MaxPooling2D(pool_size = 2, strides = 2))

      elif isinstance(spec, list):
        conv1, conv2, num_repeats = spec[0], spec[1], spec[2]

        for _ in range(num_repeats):
          conv_layers.add(CNNBlock(in_channels, out_channels = conv1[1], kernel_size = conv1[0], strides = conv1[2], padding = 'same'))
          # The second block consumes the first block's output, so its input
          # channel count is conv1's filter count.
          conv_layers.add(CNNBlock(conv1[1], out_channels = conv2[1], kernel_size = conv2[0], strides = conv2[2], padding = 'same'))
          in_channels = conv2[1]

      else:
        # Previously such entries were skipped silently, hiding config typos.
        raise TypeError(f"Unsupported architecture entry: {spec!r}")

    return conv_layers

  def create_fc(self, split_size, num_boxes, num_classes):
    """Build the fully connected head.

    Parameters:
        split_size (int): S, number of grid cells per image side.
        num_boxes (int): B, number of boxes predicted per cell.
        num_classes (int): C, number of classes.
    Returns:
        tf.keras.Sequential mapping the conv features to a tensor of shape
        (batch_size, S, S, C + 5 * B).
    """
    S, B, C = split_size, num_boxes, num_classes
    cell_depth = C + (B * 5)
    output_dense_units = S * S * cell_depth

    fc_layers = tf.keras.Sequential()

    fc_layers.add(layers.Flatten())
    fc_layers.add(layers.Dense(4096))
    fc_layers.add(layers.Dropout(0.5))
    fc_layers.add(layers.LeakyReLU(0.1))
    fc_layers.add(layers.Dense(output_dense_units))
    fc_layers.add(layers.Reshape((S, S, cell_depth)))  # -> (batch_size, S, S, C + 5 * B)

    return fc_layers

  def call(self, x):
    """Run the conv stack and then the fully connected head on x."""
    x = self.conv(x)
    x = self.fc(x)

    return x

In [None]:
# Build the detector; split_size, num_boxes and num_classes are forwarded to
# YoloV1.create_fc, which sizes the head output as S * S * (C + 5 * B).
yolo = YoloV1(in_channels = 3, split_size = 7, num_boxes = 2, num_classes = 20)

In [None]:
# Random dummy batch of shape (2, 448, 448, 3) used to smoke-test the model.
x = tf.random.uniform((2, 448, 448, 3))

In [None]:
# Forward pass on the dummy batch; the trailing expression displays the output shape.
out = yolo(x)
out.shape

TensorShape([2, 7, 7, 30])

# Loss Function

Remember that the bounding-box values the model outputs are [x, y, w, h], where x and y are relative to each split cell.

Most datasets have bounding-box coordinates relative to the entire image. We will change that when we generate the data.

In [None]:
import tensorflow.keras.losses as losses
import numpy as np

In [None]:
class YoloLoss(losses.Loss):
  """YOLOv1-style loss: weighted sum of box-coordinate, object-confidence,
  no-object-confidence and class terms.

  Expected layout of the last axis of both tensors (C = number of classes):
      [0 : C]          class scores
      [C]              confidence of box 1 (in the target: 1 if the cell holds an object)
      [C+1 : C+5]      box 1 as (x, y, w, h)
      [C+5]            confidence of box 2 (predictions only)
      [C+6 : C+10]     box 2 as (x, y, w, h) (predictions only)

  The slicing assumes exactly two predicted boxes per cell; B is stored but
  not used to build the indices.

  NOTE(review): every term is computed with losses.MeanSquaredError, i.e. a
  mean rather than a sum of squared errors -- confirm this scaling is intended.

  Parameters:
      S (int): grid size per image side.
      B (int): number of boxes per cell (stored only).
      C (int): number of classes; determines the slice offsets above.
  """

  def __init__(self, S = 7, B = 2, C = 20):
    super().__init__()

    self.mse = losses.MeanSquaredError()
    self.S = S
    self.B = B
    self.C = C
    self.lambda_noobj = 0.5  # weight of the no-object confidence term
    self.lambda_coord = 5    # weight of the box-coordinate term

  #################### IOU LOSS #########################
  def compute_iou(self, boxes_preds, boxes_labels, box_format="midpoint"):
    """
    Calculates intersection over union
    Parameters:
        boxes_preds (tensor): Predictions of Bounding Boxes (..., 4)
        boxes_labels (tensor): Correct labels of Bounding Boxes (..., 4)
        box_format (str): midpoint/corners, if boxes (x,y,w,h) or (x1,y1,x2,y2)
    Returns:
        tensor: Intersection over union for all examples, shape (..., 1)
    Raises:
        ValueError: if box_format is neither "midpoint" nor "corners"
    """

    if box_format == "midpoint":
      box1_x1 = boxes_preds[..., 0:1] - boxes_preds[..., 2:3] / 2
      box1_y1 = boxes_preds[..., 1:2] - boxes_preds[..., 3:4] / 2
      box1_x2 = boxes_preds[..., 0:1] + boxes_preds[..., 2:3] / 2
      box1_y2 = boxes_preds[..., 1:2] + boxes_preds[..., 3:4] / 2
      box2_x1 = boxes_labels[..., 0:1] - boxes_labels[..., 2:3] / 2
      box2_y1 = boxes_labels[..., 1:2] - boxes_labels[..., 3:4] / 2
      box2_x2 = boxes_labels[..., 0:1] + boxes_labels[..., 2:3] / 2
      box2_y2 = boxes_labels[..., 1:2] + boxes_labels[..., 3:4] / 2

    elif box_format == "corners":
      box1_x1 = boxes_preds[..., 0:1]
      box1_y1 = boxes_preds[..., 1:2]
      box1_x2 = boxes_preds[..., 2:3]
      box1_y2 = boxes_preds[..., 3:4]
      box2_x1 = boxes_labels[..., 0:1]
      box2_y1 = boxes_labels[..., 1:2]
      box2_x2 = boxes_labels[..., 2:3]
      box2_y2 = boxes_labels[..., 3:4]

    else:
      raise ValueError(f"Unknown box_format: {box_format!r}")

    # Intersection rectangle: the larger of the top-left corners and the
    # smaller of the bottom-right corners.
    x1 = tf.math.maximum(box1_x1, box2_x1)
    y1 = tf.math.maximum(box1_y1, box2_y1)
    x2 = tf.math.minimum(box1_x2, box2_x2)
    y2 = tf.math.minimum(box1_y2, box2_y2)

    # Negative extents mean the boxes do not overlap -> clamp to 0. No upper
    # clamp: box sizes are in cell units and may exceed 1.
    intersection = tf.math.maximum(x2 - x1, 0.0) * tf.math.maximum(y2 - y1, 0.0)

    box1_area = tf.math.abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
    box2_area = tf.math.abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))

    # The epsilon keeps the division finite when both boxes are degenerate.
    return intersection / (box1_area + box2_area - intersection + 1e-6)


  ################ LOSS FOR BOX COORDINATES ##################
  def compute_coordinate_loss(self, predictions, target, bestbox, exists_box):
    """MSE on (x, y) plus MSE on square-rooted (w, h) of the responsible box.

    Parameters:
        predictions, target (tensor): (N, S, S, C + 10) tensors.
        bestbox (tensor): (N, S, S, 1); 1 where box 2 is responsible, 0 where box 1 is.
        exists_box (tensor): (N, S, S, 1); 1 where the cell contains an object.
    Returns:
        scalar tensor.
    """
    c = self.C

    # Multiplying by exists_box zeroes every cell without an object.
    box_predictions = exists_box * (
        bestbox * predictions[..., c + 6:c + 10]
        + (1 - bestbox) * predictions[..., c + 1:c + 5]
    )
    box_targets = exists_box * target[..., c + 1:c + 5]

    x_y_box_predictions = box_predictions[..., :2]
    x_y_box_targets = box_targets[..., :2]

    # Predicted w/h may be negative, so take sqrt(|.|) and restore the sign.
    # The small scalar epsilon keeps the gradient of sqrt finite at 0; a
    # scalar broadcasts and also works when the batch dimension is unknown.
    w_h_raw = box_predictions[..., 2:4]
    w_h_box_predictions = tf.math.sign(w_h_raw) * tf.math.sqrt(tf.math.abs(w_h_raw + 1e-6))
    w_h_box_targets = tf.sqrt(box_targets[..., 2:4])

    # Reshapes (N, S, S, 2) ----> (N*S*S, 2) and then MSE
    x_y_coordinate_loss = self.mse(
        tf.reshape(x_y_box_predictions, (-1, 2)),
        tf.reshape(x_y_box_targets, (-1, 2))
    )

    # Reshapes (N, S, S, 2) ----> (N*S*S, 2) and then MSE
    w_h_coordinate_loss = self.mse(
        tf.reshape(w_h_box_predictions, (-1, 2)),
        tf.reshape(w_h_box_targets, (-1, 2))
    )

    return x_y_coordinate_loss + w_h_coordinate_loss


  ################### LOSS FOR OBJECT #####################
  def compute_object_loss(self, predictions, target, bestbox, exists_box):
    """MSE between the responsible box's confidence and the target
    confidence, restricted to cells that contain an object."""
    c = self.C

    pred_box = (
        bestbox * predictions[..., c + 5:c + 6]
        + (1 - bestbox) * predictions[..., c:c + 1]
    )
    target_pred_box = target[..., c:c + 1]

    #(N, S, S, 1) ---> (N*S*S) for MSE Loss
    return self.mse(
        tf.reshape(exists_box * pred_box, (-1, )),
        tf.reshape(exists_box * target_pred_box, (-1, ))
    )


  ################## LOSS FOR NO OBJECT #####################
  def compute_no_object_loss(self, predictions, target, exists_box):
    """Sum of the MSEs between each predicted box confidence and the target
    confidence, restricted to cells that contain no object."""
    c = self.C
    no_box = 1 - exists_box

    #(N, S, S, 1) ---> (N*S*S) for MSE Loss
    masked_target = tf.reshape(no_box * target[..., c:c + 1], (-1, ))

    no_object_loss = self.mse(
        tf.reshape(no_box * predictions[..., c:c + 1], (-1, )),
        masked_target
    )
    no_object_loss += self.mse(
        tf.reshape(no_box * predictions[..., c + 5:c + 6], (-1, )),
        masked_target
    )

    return no_object_loss


  ################# LOSS FOR CLASSES ##################
  def compute_class_loss(self, predictions, target, exists_box):
    """MSE between predicted and target class scores, restricted to cells
    that contain an object."""
    c = self.C

    #(N, S, S, C) ---> (N*S*S, C) for MSE Loss
    return self.mse(
        tf.reshape(exists_box * predictions[..., :c], (-1, c)),
        tf.reshape(exists_box * target[..., :c], (-1, c))
    )


  def call(self, target, predictions):
    """Compute the total loss.

    Parameters:
        target (tensor): ground truth, (N, S, S, C + 10).
        predictions (tensor): model output, (N, S, S, C + 10).
    Returns:
        scalar tensor:
        lambda_coord * coordinate + object + lambda_noobj * no_object + class.
    """
    predictions = tf.convert_to_tensor(predictions)
    # Targets may arrive with another float width (e.g. float64 numpy arrays).
    target = tf.cast(target, predictions.dtype)
    c = self.C

    target_box = target[..., c + 1:c + 5]
    iou_b1 = self.compute_iou(predictions[..., c + 1:c + 5], target_box)
    iou_b2 = self.compute_iou(predictions[..., c + 6:c + 10], target_box)
    ious = tf.stack([iou_b1, iou_b2], axis = 0)  # (2, N, S, S, 1)

    # Index (0 or 1) of the predicted box with the higher IoU in every cell.
    bestbox = tf.cast(tf.math.argmax(ious, axis = 0), predictions.dtype)
    exists_box = target[..., c:c + 1]  # (N, S, S, 1)

    coordinate_loss = self.compute_coordinate_loss(predictions, target, bestbox, exists_box)
    object_loss = self.compute_object_loss(predictions, target, bestbox, exists_box)
    no_object_loss = self.compute_no_object_loss(predictions, target, exists_box)
    class_loss = self.compute_class_loss(predictions, target, exists_box)

    ############ TOTAL LOSS ##############
    loss = ( self.lambda_coord * coordinate_loss
            + object_loss
            + self.lambda_noobj * no_object_loss
            + class_loss
           )

    return loss

In [None]:
# Smoke test of the loss on random tensors shaped like a single (7, 7, 30) grid.
# YoloLoss.call takes (target, predictions), so `x` plays the target here and
# `y` the predictions. Note that `x` rebinds the dummy image batch from above.
yolo_loss = YoloLoss()

x = tf.random.uniform((1, 7, 7, 30))
y = tf.random.uniform((1, 7, 7, 30))

yolo_loss(x, y)

<tf.Tensor: shape=(), dtype=float32, numpy=0.58694303>

# Data Generation

CSV File Format - ImageName.jpg LabelTextName.txt

Image Format : Standard RGB with all same size

Label Format (each line in the .txt file represents one bounding box):
index_of_target_class x_center y_center width height

Note: These coordinates will be relative to the entire image(in most cases), and will be converted to cell wise relative coordinates in the DataGenerator.

In [None]:
import os
import pandas as pd
from PIL import Image

In [None]:
class CustomDataGenerator(tf.keras.utils.Sequence):
    """Keras Sequence yielding (images, label grids) batches for YOLO training.

    Parameters:
        csv_file: CSV read with pandas; column 0 holds the image file name and
            column 1 the label file name of each sample.
        img_dir: directory containing the images.
        label_dir: directory containing the label text files. Each line is
            "class x_center y_center width height".
            # assumes coordinates are fractions of the whole image in [0, 1] -- TODO confirm
        img_res (int): side length of the (square) images.
        batch_size (int): samples per batch; a trailing partial batch is dropped.
        splitcell_size (int): S, grid cells per image side.
        num_bbox (int): B, boxes per cell (only affects the label depth).
        num_classes (int): C, number of classes.
    """

    def __init__(self, csv_file, img_dir, label_dir, img_res, batch_size = 32, splitcell_size = 7, num_bbox = 2, num_classes = 20):
        super().__init__()

        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.img_res = img_res
        self.batch_size = batch_size
        self.S = splitcell_size
        self.B = num_bbox
        self.C = num_classes


    def __len__(self):
        """Number of complete batches per epoch."""
        return len(self.annotations) // self.batch_size


    def __getitem__(self, index):
        'Generate one batch of data'

        # Dataset-wide row indexes belonging to batch `index`
        indexes = range(index*self.batch_size, (index+1)*self.batch_size)

        # Generate data
        X, y = self.__data_generation(indexes)

        return X, y

    def on_epoch_end(self):
        pass

    def __data_generation(self, indexes):
        """Load the samples at the given dataset row indexes.

        Returns:
            image_data: float array (batch_size, img_res, img_res, 3) holding
                the pixel values as loaded (no rescaling is applied).
            label_matrix: float array (batch_size, S, S, C + 5 * B). For the
                first object found in a cell: a one-hot class in [0:C],
                1.0 at [C], and the cell-relative (x, y, w, h) in [C+1:C+5].
        """
        image_data = np.empty((self.batch_size, self.img_res, self.img_res, 3), dtype = float)
        label_matrix = np.zeros((self.batch_size, self.S, self.S, self.C + 5 * self.B), dtype = float)

        # `slot` is the position inside this batch; `index` is the dataset row.
        for slot, index in enumerate(indexes):
            label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
            boxes = []
            with open(label_path) as f:
                for line in f:
                    fields = line.split()
                    if not fields:
                        continue  # tolerate blank lines
                    class_label, x, y, width, height = (float(value) for value in fields)
                    boxes.append([class_label, x, y, width, height])

            img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
            # The context manager closes the file once the pixels are copied.
            with Image.open(img_path) as image:
                image_data[slot, :, :, :] = np.asarray(image)

            # Convert To Cells
            for class_label, x, y, width, height in boxes:
                class_label = int(class_label)

                # i,j represents the cell row and cell column. A coordinate of
                # exactly 1.0 would land one past the last cell, so clamp.
                i = min(int(self.S * y), self.S - 1)
                j = min(int(self.S * x), self.S - 1)
                x_cell, y_cell = self.S * x - j, self.S * y - i

                """
                Calculating the width and height of the bounding box
                relative to the cell is done by the following, with
                width as the example:

                width_pixels = (width * image_width)
                cell_pixels = (image_width / S)

                Then the width relative to the cell is simply
                width_pixels / cell_pixels = width * S.
                """
                width_cell, height_cell = (width * self.S, height * self.S)

                # If no object already found for specific cell i,j
                # Note: This means we restrict to ONE object
                # per cell!
                if label_matrix[slot, i, j, self.C] == 0.0:

                    # Set that there exists an object
                    label_matrix[slot, i, j, self.C] = 1.0

                    # Box coordinates
                    label_matrix[slot, i, j, self.C + 1:self.C + 5] = np.array(
                        [x_cell, y_cell, width_cell, height_cell], dtype = float
                    )

                    # Set one hot encoding for class_label
                    label_matrix[slot, i, j, class_label] = 1.0

        return image_data, label_matrix #(N, H, W, C) and (N, S, S, C + 5 * B)

In [None]:
# Build the training and validation batch generators.
# NOTE(review): csv_file, img_dir, label_dir and img_res are not defined in any
# visible cell; they must be set before this cell runs.
# NOTE(review): valgen receives exactly the same arguments as traingen, so as
# written validation runs on the training data -- confirm whether a separate
# validation CSV was intended.
traingen = CustomDataGenerator(csv_file, 
                         img_dir, 
                         label_dir, 
                         img_res, 
                         batch_size = 32, 
                         splitcell_size = 7, 
                         num_bbox = 2, 
                         num_classes = 20)

valgen = CustomDataGenerator(csv_file, 
                         img_dir, 
                         label_dir, 
                         img_res, 
                         batch_size = 32, 
                         splitcell_size = 7, 
                         num_bbox = 2, 
                         num_classes = 20)

# Training the Model

In [None]:
# Configure training: Adam optimizer (selected by name) and the YoloLoss
# instance created in the loss-function section.
yolo.compile(
    optimizer="adam", 
    loss=yolo_loss)

In [None]:
# Keep only the best checkpoint seen so far. YoloV1 is a subclassed model, and
# whole-model HDF5 saving is not supported for subclassed models, so only the
# weights are stored. The ".weights.h5" suffix is accepted by both older and
# newer Keras versions for weights-only checkpoints.
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        "MODEL_CKPT.weights.h5",
        save_best_only=True,
        save_weights_only=True,
    )
]

In [None]:
# Number of epochs passed to yolo.fit below.
EPOCHS = 100

In [None]:
# Train on traingen, validate on valgen after each epoch and run the
# checkpoint callback; the returned object is kept in `history`.
history = yolo.fit(traingen, 
                    epochs=EPOCHS, 
                    validation_data=valgen, 
                    callbacks=callbacks)

# Evaluating the Model

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

In [None]:
class YoloEval():
  """Runs a YOLO model on samples from a batch generator and plots the
  predicted bounding boxes.

  Parameters:
      model: callable; model(images) must return a tensor of shape
          (N, S, S, 30) laid out as 20 class scores, confidence 1, box 1
          (x, y, w, h), confidence 2, box 2 (x, y, w, h).
      generator: batch source supporting len(generator),
          generator[batch] -> (X, y) and a `batch_size` attribute.
      classes: sequence mapping a class index to its display name.
  """

  def __init__(self, model, generator, classes):
    # Store the objects themselves; batches and predictions are fetched
    # on demand in plot_image.
    self.model = model
    self.gen = generator
    self.batch_size = self.gen.batch_size
    self.classes = classes


  def convert_cellboxes(self, predictions, S=7):
    """
    Converts bounding boxes output from Yolo with
    an image split size of S into entire image ratios
    rather than relative to cell ratios.

    Parameters:
        predictions (tensor): (N, S, S, 30) model output.
        S (int): grid size per image side.
    Returns:
        tensor (N, S, S, 6): [predicted_class, confidence, x, y, w, h] per
        cell, using the box with the higher confidence.
    """
    predictions = tf.convert_to_tensor(predictions)
    dtype = predictions.dtype

    bboxes1 = predictions[..., 21:25]
    bboxes2 = predictions[..., 26:30]

    scores = tf.stack([predictions[..., 20], predictions[..., 25]], axis = 0)

    # 0 where box 1 has the higher confidence, 1 where box 2 has.
    bestbox = tf.expand_dims(tf.math.argmax(scores, axis = 0), -1)
    bestbox = tf.cast(bestbox, dtype)

    best_boxes = bboxes1 * (1 - bestbox) + bestbox * bboxes2

    # Column / row index of every cell, shaped to broadcast against
    # (N, S, S, 1) for any batch size and any S.
    grid = tf.cast(tf.range(S), dtype)
    col_indices = tf.reshape(grid, (1, 1, S, 1))
    row_indices = tf.reshape(grid, (1, S, 1, 1))

    x = (1 / S) * (best_boxes[..., :1] + col_indices)
    y = (1 / S) * (best_boxes[..., 1:2] + row_indices)
    w_h = (1 / S) * (best_boxes[..., 2:4])

    converted_bboxes = tf.concat([x, y, w_h], axis = -1)

    predicted_class = tf.expand_dims(tf.math.argmax(predictions[..., :20], axis = -1), -1)
    predicted_class = tf.cast(predicted_class, dtype)

    best_confidence = tf.expand_dims(tf.math.maximum(predictions[..., 20], predictions[..., 25]), -1)

    converted_preds = tf.concat([predicted_class, best_confidence, converted_bboxes], axis = -1)

    return converted_preds

  def cellboxes_to_boxes(self, predictions, S=7):
    """
    Converts cell relative bounding boxes output from Yolo with
    an image split size of S into bounding boxes with entire image ratio for plotting.

    Returns:
        nested list indexed as [image_example_index][bbox_index], where each
        bbox is [class_prediction, confidence, x_mid, y_mid, width, height]
        and every image has S * S boxes.
    """
    num_examples = predictions.shape[0]
    converted_pred = tf.reshape(self.convert_cellboxes(predictions, S), (num_examples, S * S, -1))

    return converted_pred.numpy().tolist()

  def plot_image(self, batch, idx, conf_threshold = None):
    """Plots predicted bounding boxes on the image

      Image : Standard RGB

      Boxes = [box_index, [class_prediction, confidence, x_mid, y_mid, width, height]]

      Classes = ["class_1", "class_2".......num_classes]

      Parameters:
          batch (int): index of the batch inside the generator.
          idx (int): index of the sample inside that batch.
          conf_threshold (float or None): when given, boxes whose confidence
              is below this value are not drawn; None draws every box.
      Raises:
          IndexError: if batch or idx is out of bounds.
    """
    if not batch < len(self.gen):
      raise IndexError("Given batch index is out of bounds of Generator.")

    if not idx < self.batch_size:
      raise IndexError("Given sample index is out of bounds of Batch Size.")

    X, _ = self.gen[batch]

    im = X[idx]
    im_as_single_batch = np.expand_dims(im, 0)

    predictions = self.model(im_as_single_batch)
    # Only one image was passed in, so take the box list of example 0.
    boxes = self.cellboxes_to_boxes(predictions)[0]

    height, width, _ = im.shape

    # Create figure and axes
    fig, ax = plt.subplots(1)
    # Display the image. Float images are interpreted by imshow on a 0..1
    # scale, so rescale arrays that hold 0..255 pixel values.
    ax.imshow(im / 255.0 if np.max(im) > 1.0 else im)

    # Create a Rectangle patch
    for box in boxes:
        # box[0] is class, box[1] is confidence
        # box[2] is x midpoint, box[3] is y midpoint
        # box[4] is width, box[5] is height
        assert len(box) == 6, "Got more values than in class, prob, x, y, w, h, in a box!"

        if conf_threshold is not None and box[1] < conf_threshold:
            continue

        pred_class = self.classes[int(box[0])]
        confidence = box[1]*100
        upper_left_x = (box[2] - box[4] / 2) * width
        upper_left_y = (box[3] - box[5] / 2) * height
        rect = patches.Rectangle(
            (upper_left_x, upper_left_y),
            box[4] * width,
            box[5] * height,
            linewidth=1,
            edgecolor="r",
            facecolor="none",
        )
        # Add the patch to the Axes
        ax.add_patch(rect)
        # Data coordinates (the default) put the label at the box corner;
        # axes coordinates would flip it vertically relative to the image.
        ax.text(upper_left_x, upper_left_y, f'Predicted Class : {pred_class}\n Confidence: {confidence} %', 
                fontsize=14, color='green')

    plt.show()

In [None]:
# Placeholder display names "class_0" ... "class_19", one per class index.
classes = [f"class_{x}" for x in range(20)]

In [None]:
# Evaluation helper bound to the trained model and the validation generator.
yolo_eval = YoloEval(model = yolo, generator = valgen, classes = classes)

In [None]:
# Plot the predictions for one sample of one validation batch.
# NOTE(review): batch_idx and sample_idx are not defined in any visible cell;
# they must be set before this cell runs.
yolo_eval.plot_image(batch_idx, sample_idx)