<a href="https://colab.research.google.com/github/Arup3201/Traffic-Monitoring-System/blob/main/notebooks/Object_Detection_YOLO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Object Detection using YOLO

In [66]:
import numpy as np
import os
import cv2 as cv
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Dropout, Flatten
import keras.backend as K
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import ModelCheckpoint
import joblib

In [3]:
# Detection-grid and input configuration shared by the label builder, model and loss.
S = 7  # the label grid is S x S cells
B = 2  # number of boxes predicted per grid cell
C = 5  # number of class channels per grid cell
BATCH_SIZE = 4  # samples per batch handed to the data generators
IMG_SIZE = (448, 448)  # network input size; passed to yolov1 as (height, width)

In [8]:
def read_json(filename):
    """Load and return the object stored at ``filename``.

    Despite the name, the file is deserialized with ``joblib.load`` rather
    than a JSON parser. joblib unpickles the file, so only load files from
    a trusted source.
    """
    return joblib.load(filename)

In [10]:
# Load the training annotations and display the first record as a sanity check.
train_annotations = read_json('../.data/json_annotations/train_annotations.json')
train_annotations[0]

{'image_path': 'bus/train/Elite-7-Large_jpg.rf.740cccbaac6544d3b0dd29e960cfc9ab.jpg',
 'bbox': [[45, 135, 606, 588]],
 'class': 1}

In [63]:
# Load the validation annotations and display the first record as a sanity check.
val_annotations = read_json('../.data/json_annotations/val_annotations.json')
val_annotations[0]

{'image_path': 'bus/valid/Bus-7_jpg.rf.4322d53440e3c377c95a8b0761776609.jpg',
 'bbox': [[103, 54, 547, 589]],
 'class': 1}

In [26]:
def read(image_path, bboxes, label):
    """Load one image and build its S x S YOLO label grid.

    Args:
        image_path: Path of the image file to read with OpenCV.
        bboxes: Iterable of 4-element boxes unpacked as ``x, y, w, h`` in
            pixels of the original image.
            NOTE(review): the (x, y) pair is used directly to pick the grid
            cell, so it is presumably the box centre - confirm against the
            annotation export format.
        label: Integer class index in ``[0, C)``; applied to every box.

    Returns:
        ``(image, label_matrix)`` where ``image`` is the RGB image resized to
        ``IMG_SIZE`` and scaled to [0, 1], and ``label_matrix`` has shape
        ``(S, S, B*5+C)``: channels 0-3 hold ``[x, y, w, h]`` (x, y relative
        to the cell, w, h relative to the image), channel 4 is the object
        flag, and channels ``B*5 .. B*5+C-1`` are the one-hot class.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
        ValueError: If ``label`` is outside ``[0, C)``.
    """
    if not 0 <= label < C:
        # A negative label would otherwise silently index into the box channels.
        raise ValueError(f"label must be in [0, {C}), got {label}")

    image = cv.imread(image_path)
    if image is None:
        # cv.imread reports a missing/unreadable file by returning None.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
    image_h, image_w = image.shape[0:2]
    # cv.resize takes (width, height) while IMG_SIZE is used as (height, width).
    image = cv.resize(image, (IMG_SIZE[1], IMG_SIZE[0]))
    image = image / 255.

    label_matrix = np.zeros([S, S, B*5+C])
    for bbox in bboxes:
        x, y, w, h = bbox

        # Normalise to the original image size.
        x = x / image_w
        y = y / image_h
        w = w / image_w
        h = h / image_h

        loc = [S * x, S * y]
        # Clamp so a coordinate on (or past) the image border maps to the
        # nearest edge cell instead of indexing outside the grid.
        loc_i = min(max(int(loc[1]), 0), S - 1)  # row    <- y
        loc_j = min(max(int(loc[0]), 0), S - 1)  # column <- x
        # Offsets of the point inside its cell.
        y = loc[1] - loc_i
        x = loc[0] - loc_j

        # Each cell stores a single ground-truth box: the first one wins.
        if label_matrix[loc_i, loc_j, 4] == 0:
            label_matrix[loc_i, loc_j, B*5+label] = 1
            label_matrix[loc_i, loc_j, :4] = [x, y, w, h]
            label_matrix[loc_i, loc_j, 4] = 1  # response

    return image, label_matrix

In [71]:
class YoloDataset(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of images and YOLO label grids.

    Args:
        annotations: Sliceable sequence of dicts with the keys
            ``'image_path'``, ``'bbox'`` and ``'class'``.
        batch_size: Number of annotations per batch.
        data_dir: Directory that each ``'image_path'`` is joined onto.
        **kwargs: Forwarded to the ``Sequence`` base constructor.
    """

    def __init__(self, annotations, batch_size, data_dir, **kwargs):
        # Initialise the base class so Keras can set up its own bookkeeping.
        super().__init__(**kwargs)
        self.annotations = annotations
        self.batch_size = batch_size
        self.data_dir = data_dir

    def __len__(self):
        """Number of batches per epoch; the last batch may be smaller."""
        # Return a plain Python int rather than a NumPy scalar.
        return int(np.ceil(len(self.annotations) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Return ``(images, labels)`` arrays for batch number ``idx``."""
        start = idx * self.batch_size
        batch = self.annotations[start : start + self.batch_size]

        images = []
        labels = []
        for annotation in batch:
            image, label_matrix = read(
                os.path.join(self.data_dir, annotation['image_path']),
                annotation['bbox'],
                annotation['class'],
            )
            images.append(image)
            labels.append(label_matrix)

        return np.array(images), np.array(labels)

In [72]:
# Build the training generator and pull the first batch to inspect its shapes.
train_gen = YoloDataset(train_annotations, BATCH_SIZE, "../.data")
sample_image, sample_label = train_gen[0]

In [73]:
# Validation generator; reads images from the same data directory as training.
val_gen = YoloDataset(val_annotations, BATCH_SIZE, '../.data')

In [74]:
# Shapes of the first training batch: images, then label grids.
print(sample_image.shape, sample_label.shape)

(4, 448, 448, 3) (4, 7, 7, 15)


In [48]:
class Yolo_Reshape(tf.keras.layers.Layer):
  """Split a flat prediction vector into per-cell boxes, confidences and classes.

  The flat input of length ``S*S*(B*5+C)`` is cut into three consecutive
  slices, each reshaped onto the ``S x S`` grid and activated:

  * first ``S*S*B`` values  -> box confidences (sigmoid)
  * next ``S*S*C`` values   -> class probabilities (softmax over classes)
  * remaining ``S*S*B*4``   -> box coordinates (sigmoid)

  The output concatenates them on the last axis as
  ``[boxes (B*4), confidences (B), class probabilities (C)]``.

  Note: ``call`` sizes everything from the module-level ``S``, ``B`` and
  ``C``; ``target_shape`` is only stored and returned by ``get_config``.
  """

  def __init__(self, target_shape, **kwargs):
    # Accept the standard Layer kwargs (name, trainable, dtype, ...):
    # get_config() emits them, so from_config() passes them back in when a
    # saved model is reloaded.
    super(Yolo_Reshape, self).__init__(**kwargs)
    self.target_shape = tuple(target_shape)

  def get_config(self):
    """Return the base layer config extended with ``target_shape``."""
    config = super().get_config().copy()
    config.update({
        'target_shape': self.target_shape
    })
    return config

  def call(self, input):
    """Map ``(batch, S*S*(B*5+C))`` to ``(batch, S, S, B*5+C)``."""
    idx1 = S * S * B
    idx2 = idx1 + S * S * C
    batch = K.shape(input)[0]

    # confidence
    confs = K.reshape(input[:, :idx1], (batch,) + tuple([S, S, B]))
    confs = K.sigmoid(confs)

    # class probabilities
    class_probs = K.reshape(input[:, idx1:idx2], (batch,) + tuple([S, S, C]))
    class_probs = K.softmax(class_probs)

    # boxes
    boxes = K.reshape(input[:, idx2:], (batch,) + tuple([S, S, B * 4]))
    boxes = K.sigmoid(boxes)

    outputs = K.concatenate([boxes, confs, class_probs])
    return outputs

In [49]:
def yolov1(img_h, img_w, s, b, c):
    """Build the YOLOv1-style detection network as a Keras ``Sequential`` model.

    Args:
        img_h: Input image height in pixels.
        img_w: Input image width in pixels.
        s: Grid size; the head predicts an ``s x s`` grid.
        b: Number of boxes predicted per grid cell.
        c: Number of classes.

    Returns:
        An uncompiled ``Sequential`` model that ends in ``Yolo_Reshape``.

    Note:
        ``Yolo_Reshape.call`` sizes its output from the module-level
        ``S``/``B``/``C`` constants, so ``s``, ``b`` and ``c`` must match them.
    """
    lrelu = tf.keras.layers.LeakyReLU(alpha=0.1)

    def conv(filters, kernel, **extra):
        """'same'-padded conv with the shared LeakyReLU and L2 weight decay."""
        return Conv2D(filters=filters, kernel_size=(kernel, kernel), padding='same',
                      activation=lrelu, kernel_regularizer=l2(5e-4), **extra)

    def pool():
        """2x2 max-pool that halves the spatial resolution."""
        return MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')

    model = Sequential()

    # Stem
    model.add(conv(64, 7, strides=(1, 1), input_shape=(img_h, img_w, 3)))
    model.add(pool())

    model.add(conv(192, 3))
    model.add(pool())

    model.add(conv(128, 1))
    model.add(conv(256, 3))
    model.add(conv(256, 1))
    model.add(conv(512, 3))
    model.add(pool())

    # Four 1x1 (256) -> 3x3 (512) bottleneck pairs
    for _ in range(4):
        model.add(conv(256, 1))
        model.add(conv(512, 3))
    model.add(conv(512, 1))
    model.add(conv(1024, 3))
    model.add(pool())

    # Two 1x1 (512) -> 3x3 (1024) bottleneck pairs
    for _ in range(2):
        model.add(conv(512, 1))
        model.add(conv(1024, 3))
    model.add(conv(1024, 3))
    # NOTE(review): this strided conv has no activation or regularizer,
    # unlike its neighbours - kept as in the original; confirm it is intended.
    model.add(Conv2D(filters=1024, kernel_size=(3, 3), strides=(2, 2), padding='same'))

    # These two convs use the default 'valid' padding.
    model.add(Conv2D(filters=1024, kernel_size=(3, 3), activation=lrelu, kernel_regularizer=l2(5e-4)))
    model.add(Conv2D(filters=1024, kernel_size=(3, 3), activation=lrelu, kernel_regularizer=l2(5e-4)))

    # Detection head
    model.add(Flatten())
    model.add(Dense(512))
    model.add(Dense(1024))
    model.add(Dropout(0.5))
    # Emit raw logits: Yolo_Reshape applies the sigmoid/softmax itself.
    # A sigmoid here as well would squash confidences and box values into
    # (0.5, 0.73) and cap every class probability well below 1.
    model.add(Dense(s*s*(b*5+c)))
    model.add(Yolo_Reshape(target_shape=(s, s, b*5+c)))

    return model

In [50]:
# Instantiate the network for the configured input size and grid, then list its layers.
model = yolov1(IMG_SIZE[0], IMG_SIZE[1], S, B, C)
model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_144 (Conv2D)         (None, 448, 448, 64)      9472      
                                                                 
 max_pooling2d_24 (MaxPooli  (None, 224, 224, 64)      0         
 ng2D)                                                           
                                                                 
 conv2d_145 (Conv2D)         (None, 224, 224, 192)     110784    
                                                                 
 max_pooling2d_25 (MaxPooli  (None, 112, 112, 192)     0         
 ng2D)                                                           
                                                                 
 conv2d_146 (Conv2D)         (None, 112, 112, 128)     24704     
                                                                 
 conv2d_147 (Conv2D)         (None, 112, 112, 256)    

In [51]:
class CustomLearningRateScheduler(tf.keras.callbacks.Callback):
    """Callback that sets the optimizer learning rate at the start of each epoch.

    Args:
        schedule: Callable ``schedule(epoch, lr)`` that receives the 0-based
            epoch index and the current learning rate (float) and returns
            the learning rate to use for that epoch.
    """

    def __init__(self, schedule):
        super(CustomLearningRateScheduler, self).__init__()
        self.schedule = schedule

    def on_epoch_begin(self, epoch, logs=None):
        """Query the schedule and write the result back to the optimizer.

        Raises:
            ValueError: If the optimizer has no ``learning_rate`` attribute.
        """
        optimizer = self.model.optimizer
        # Use `learning_rate` for the check, the read and the write: `lr` is
        # only a legacy alias and is not present on every optimizer version.
        if not hasattr(optimizer, "learning_rate"):
            raise ValueError('Optimizer must have a "learning_rate" attribute.')
        # Get the current learning rate from model's optimizer.
        lr = float(tf.keras.backend.get_value(optimizer.learning_rate))
        # Call schedule function to get the scheduled learning rate.
        scheduled_lr = self.schedule(epoch, lr)
        # Set the value back to the optimizer before this epoch starts
        tf.keras.backend.set_value(optimizer.learning_rate, scheduled_lr)
        print("\nEpoch %05d: Learning rate is %6.4f." % (epoch, scheduled_lr))


# Learning-rate milestones as (first epoch, learning rate) pairs,
# listed in ascending epoch order.
LR_SCHEDULE = [
    (0, 0.01),
    (75, 0.001),
    (105, 0.0001),
]


def lr_schedule(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    If ``epoch`` is exactly one of the milestone epochs in ``LR_SCHEDULE``,
    that milestone's rate is returned; for every other epoch the current
    rate ``lr`` is passed through unchanged.
    """
    first_epoch = LR_SCHEDULE[0][0]
    last_epoch = LR_SCHEDULE[-1][0]
    if not (first_epoch <= epoch <= last_epoch):
        return lr
    return next((rate for start, rate in LR_SCHEDULE if start == epoch), lr)

In [62]:
def xywh2minmax(xy, wh):
    """Convert centre/size boxes to corner form.

    Args:
        xy: Box centres.
        wh: Box sizes, broadcastable against ``xy``.

    Returns:
        ``(xy_min, xy_max)``: the centres shifted by minus and plus half
        the size.
    """
    half_wh = wh / 2
    return xy - half_wh, xy + half_wh


def iou(pred_mins, pred_maxes, true_mins, true_maxes):
    """Intersection-over-union of corner-form boxes.

    Args:
        pred_mins, pred_maxes: Min and max corners of the predicted boxes;
            the last axis holds the two coordinates.
        true_mins, true_maxes: Min and max corners of the ground-truth
            boxes, broadcastable against the predictions.

    Returns:
        Tensor of IoU scores with the coordinate axis removed. Pairs whose
        union area is zero score 0 instead of NaN.
    """
    intersect_mins = K.maximum(pred_mins, true_mins)
    intersect_maxes = K.minimum(pred_maxes, true_maxes)
    # Non-overlapping boxes give a negative extent; clip it to zero.
    intersect_wh = K.maximum(intersect_maxes - intersect_mins, 0.)
    intersect_areas = intersect_wh[..., 0] * intersect_wh[..., 1]

    pred_wh = pred_maxes - pred_mins
    true_wh = true_maxes - true_mins
    pred_areas = pred_wh[..., 0] * pred_wh[..., 1]
    true_areas = true_wh[..., 0] * true_wh[..., 1]

    union_areas = pred_areas + true_areas - intersect_areas
    # Two zero-area boxes have a zero union; divide_no_nan yields 0 there
    # and the ordinary quotient everywhere else.
    iou_scores = tf.math.divide_no_nan(intersect_areas, union_areas)

    return iou_scores


def yolo_head(feats):
    """Convert cell-relative box encodings to image-pixel centres and sizes.

    Args:
        feats: Tensor whose axes 1 and 2 are the grid rows and columns and
            whose last axis starts with ``(x, y, w, h)``; ``x, y`` are
            offsets inside the grid cell and ``w, h`` are fractions of the
            image. The index grid is built with 5 axes, so ``feats`` is
            expected to be 5-D, e.g. ``(batch, S, S, boxes, 4)``.

    Returns:
        ``(box_xy, box_wh)``: box centres and sizes in pixels.

    Note:
        Both outputs are scaled by ``IMG_SIZE[0]``, so this assumes a
        square network input.
    """
    # Dynamic implementation of conv dims for fully convolutional model.
    conv_dims = K.shape(feats)[1:3]  # assuming channels last
    # In YOLO the height index is the inner most iteration.
    conv_height_index = K.arange(0, stop=conv_dims[0])
    conv_width_index = K.arange(0, stop=conv_dims[1])
    conv_height_index = K.tile(conv_height_index, [conv_dims[1]])

    conv_width_index = K.tile(K.expand_dims(conv_width_index, 0), [conv_dims[0], 1])
    conv_width_index = K.flatten(K.transpose(conv_width_index))
    conv_index = K.transpose(K.stack([conv_height_index, conv_width_index]))
    # For a square grid, the entry at (row i, column j) is (j, i): the column
    # index is added to x and the row index to y.
    conv_index = K.reshape(conv_index, [1, conv_dims[0], conv_dims[1], 1, 2])
    conv_index = K.cast(conv_index, K.dtype(feats))

    conv_dims = K.cast(K.reshape(conv_dims, [1, 1, 1, 1, 2]), K.dtype(feats))

    # Cell offset + cell index -> fraction of the image -> pixels.
    box_xy = (feats[..., :2] + conv_index) / conv_dims * IMG_SIZE[0]
    # Use the same configured size as box_xy instead of a literal 448.
    box_wh = feats[..., 2:4] * IMG_SIZE[0]

    return box_xy, box_wh


def yolo_loss(y_true, y_pred):
    """YOLOv1-style loss: confidence + classification + box regression.

    Args:
        y_true: Label grids of shape ``(?, S, S, B*5+C)``: channels 0-3 are
            ``[x, y, w, h]``, channel 4 is the object flag and channels
            ``B*5 .. B*5+C-1`` are the one-hot class.
        y_pred: Predictions of shape ``(?, S, S, B*5+C)`` laid out as
            ``[boxes (B*4), confidences (B), class probabilities (C)]``.

    Returns:
        Scalar tensor: the three terms summed over the whole batch.
    """
    label_box = y_true[..., :4]  # ? * S * S * 4
    response_mask = K.expand_dims(y_true[..., 4])  # ? * S * S * 1
    label_class = y_true[..., B*5:B*5+C]  # ? * S * S * C

    predict_box = y_pred[..., :B*4]  # ? * S * S * (B*4)
    predict_trust = y_pred[..., B*4:B*5]  # ? * S * S * B
    predict_class = y_pred[..., B*5:B*5+C]  # ? * S * S * C

    _label_box = K.reshape(label_box, [-1, S, S, 1, 4])
    _predict_box = K.reshape(predict_box, [-1, S, S, B, 4])

    # Decode once; the IoU and the box loss both use the decoded boxes.
    label_xy, label_wh = yolo_head(_label_box)  # ? * S * S * 1 * 2 each
    predict_xy, predict_wh = yolo_head(_predict_box)  # ? * S * S * B * 2 each

    # Add a broadcast axis so each predicted box is compared with the label box.
    label_xy_min, label_xy_max = xywh2minmax(
        K.expand_dims(label_xy, 3), K.expand_dims(label_wh, 3))  # ? * S * S * 1 * 1 * 2
    predict_xy_min, predict_xy_max = xywh2minmax(
        K.expand_dims(predict_xy, 4), K.expand_dims(predict_wh, 4))  # ? * S * S * B * 1 * 2

    iou_scores = iou(predict_xy_min, predict_xy_max, label_xy_min, label_xy_max)  # ? * S * S * B * 1
    best_ious = K.max(iou_scores, axis=4)  # ? * S * S * B
    best_box = K.max(best_ious, axis=3, keepdims=True)  # ? * S * S * 1

    # 1 for the box(es) with the highest IoU in each cell, 0 otherwise.
    box_mask = K.cast(best_ious >= best_box, K.dtype(best_ious))  # ? * S * S * B

    # Confidence: push the best box in an object cell towards 1 and every
    # other box towards 0 (down-weighted by 0.5).
    no_object_loss = 0.5 * (1 - box_mask * response_mask) * K.square(0 - predict_trust)
    object_loss = box_mask * response_mask * K.square(1 - predict_trust)
    confidence_loss = K.sum(no_object_loss + object_loss)

    # Classification: only cells that contain an object contribute.
    class_loss = K.sum(response_mask * K.square(label_class - predict_class))

    box_mask = K.expand_dims(box_mask)  # ? * S * S * B * 1
    response_mask = K.expand_dims(response_mask)  # ? * S * S * 1 * 1

    # Box regression on the best box of object cells, weighted by 5;
    # differences are divided by the image size used in yolo_head.
    img_size = IMG_SIZE[0]
    box_loss = 5 * box_mask * response_mask * K.square((label_xy - predict_xy) / img_size)
    box_loss += 5 * box_mask * response_mask * K.square((K.sqrt(label_wh) - K.sqrt(predict_wh)) / img_size)
    box_loss = K.sum(box_loss)

    loss = confidence_loss + class_loss + box_loss

    return loss

In [67]:
# Save the model to disk only when the validation loss reaches a new minimum.
mcp_save = ModelCheckpoint('../model/yolov1.hdf5', save_best_only=True, monitor='val_loss', mode='min')

In [69]:
# Train with the custom YOLO loss. When CustomLearningRateScheduler(lr_schedule)
# is used as a callback, it sets the optimizer's learning rate at epoch 0.
model.compile(loss=yolo_loss, optimizer='adam')




In [None]:
# Train with the learning-rate schedule and best-model checkpointing.
# NOTE(review): the step counts use floor division while YoloDataset.__len__
# rounds up, so one batch per pass is left out whenever the annotation count
# is not a multiple of BATCH_SIZE - confirm this is intended.
# NOTE(review): with epochs=1 only the epoch-0 entry of LR_SCHEDULE is applied.
model.fit(
    x=train_gen, 
    steps_per_epoch=len(train_annotations)//BATCH_SIZE,
    epochs=1, 
    validation_data=val_gen, 
    validation_steps=len(val_annotations)//BATCH_SIZE, 
    callbacks=[
        CustomLearningRateScheduler(lr_schedule), 
        mcp_save
    ]
)