<a id="2"></a>
## 1. Dependencies

In [1]:
#import tensorflow as tf
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import numpy as np
from matplotlib import cm
import PIL.Image
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display
from seaborn import color_palette
import cv2
import os

Instructions for updating:
non-resource variables are not supported in the long term


In [2]:
# Clone the project repository, copy its contents into /content (later cells
# read files such as /content/coco.names and /content/futur.ttf from there -
# presumably shipped with this repo, TODO confirm), then delete the clone.
! git clone https://github.com/Luca1920342/VISIOPE_PROJECT_caterino_murra_petroni.git
! cp -R VISIOPE_PROJECT_caterino_murra_petroni/* /content
! rm -rf VISIOPE_PROJECT_caterino_murra_petroni

Cloning into 'VISIOPE_PROJECT_caterino_murra_petroni'...
remote: Enumerating objects: 79, done.[K
remote: Counting objects: 100% (16/16), done.[K
remote: Compressing objects: 100% (16/16), done.[K
remote: Total 79 (delta 5), reused 0 (delta 0), pack-reused 63[K
Unpacking objects: 100% (79/79), 47.76 MiB | 5.83 MiB/s, done.


<a id="3"></a>
## 2. Model hyperparameters
Some configurations for Yolo.

In [3]:
# Momentum and epsilon passed to every batch-normalization layer.
_BATCH_NORM_DECAY = 0.9
_BATCH_NORM_EPSILON = 1e-05
# Slope (alpha) used by every leaky ReLU activation in the network.
_LEAKY_RELU = 0.1
# Anchor box sizes. The model slices this list into three groups of three,
# one per detection layer: [6:9] for the first, [3:6] for the second and
# [0:3] for the third.
_ANCHORS = [(10, 13), (16, 30), (33, 23),
            (30, 61), (62, 45), (59, 119),
            (116, 90), (156, 198), (373, 326)]
# Size every frame is resized to before being fed to the network.
_MODEL_SIZE = (416, 416)
# Minimum confidence for a box to be kept (used by non-max suppression,
# when drawing boxes and when collecting boxes for direction tracking).
_CONF_THRESHOLD = 0.5
# Number of frames passed through the network per run.
_BATCH_SIZE = 25

<a id="3"></a>
## 3. Model definition

### Batch norm and fixed padding
It's useful to define a `batch_norm` function, since the model relies heavily on batch normalization with the same parameters everywhere. Also, like ResNet, Yolo uses convolutions with fixed padding, which means that the padding is determined only by the size of the kernel.

In [4]:
def batch_norm(inputs, training, data_format):
    """Applies batch normalization with the notebook-wide decay and epsilon.

    Args:
        inputs: Tensor to normalize.
        training: Forwarded as the `training` flag of the layer.
        data_format: 'channels_first' normalizes over axis 1; any other
            value normalizes over axis 3.

    Returns:
        The result of `tf.layers.batch_normalization`.
    """
    if data_format == 'channels_first':
        channel_axis = 1
    else:
        channel_axis = 3

    return tf.layers.batch_normalization(
        inputs=inputs,
        axis=channel_axis,
        momentum=_BATCH_NORM_DECAY,
        epsilon=_BATCH_NORM_EPSILON,
        scale=True,
        training=training)


def fixed_padding(inputs, kernel_size, data_format):
    """Pads the two spatial dimensions by an amount fixed by the kernel size.

    The total padding per spatial dimension is `kernel_size - 1`; when that
    is odd, the extra unit goes to the end of the dimension. Batch and
    channel dimensions are never padded.

    Args:
        inputs: Tensor input to be padded.
        kernel_size: The kernel to be used in the conv2d or max_pool2d.
        data_format: 'channels_first' pads dimensions 2 and 3; any other
            value pads dimensions 1 and 2.

    Returns:
        The padded tensor, in the same layout as the input.
    """
    total = kernel_size - 1
    before = total // 2
    after = total - before

    if data_format == 'channels_first':
        paddings = [[0, 0], [0, 0], [before, after], [before, after]]
    else:
        paddings = [[0, 0], [before, after], [before, after], [0, 0]]

    return tf.pad(inputs, paddings)


def conv2d_fixed_padding(inputs, filters, kernel_size, data_format, strides=1):
    """Bias-free 2-D convolution whose padding does not depend on input size.

    With `strides == 1` the convolution uses 'SAME' padding. Otherwise it
    uses 'VALID' padding, and for strides greater than one the input is
    first padded explicitly with `fixed_padding`.
    """
    uses_explicit_padding = strides > 1
    if uses_explicit_padding:
        inputs = fixed_padding(inputs, kernel_size, data_format)

    padding_mode = 'VALID' if strides != 1 else 'SAME'

    return tf.layers.conv2d(
        inputs=inputs,
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding_mode,
        use_bias=False,
        data_format=data_format)

### Feature extraction: Darknet-53
For feature extraction Yolo uses Darknet-53 neural net pretrained on ImageNet. Same as ResNet,  Darknet-53 has shortcut (residual) connections, which help information from earlier layers flow further. We omit the last 3 layers (Avgpool, Connected and Softmax) since we only need the features.

In [5]:
def darknet53_residual_block(inputs, filters, training, data_format,
                             strides=1):
    """Builds one Darknet residual block.

    The block is a 1x1 convolution with `filters` channels followed by a
    3x3 convolution with `2 * filters` channels, each followed by batch
    normalization and a leaky ReLU. The block input is then added to the
    result.
    """
    residual = inputs

    # (number of filters, kernel size) of the two stacked convolutions.
    for n_filters, kernel_size in ((filters, 1), (2 * filters, 3)):
        inputs = conv2d_fixed_padding(
            inputs, filters=n_filters, kernel_size=kernel_size,
            strides=strides, data_format=data_format)
        inputs = batch_norm(inputs, training=training,
                            data_format=data_format)
        inputs = tf.nn.leaky_relu(inputs, alpha=_LEAKY_RELU)

    return inputs + residual


def darknet53(inputs, training, data_format):
    """Builds the Darknet-53 feature extractor.

    Layers are created in a fixed order: a stride-1 3x3 convolution, then
    five stride-2 3x3 convolutions, each followed by a stack of 1, 2, 8, 8
    and 4 residual blocks respectively.

    Returns:
        A tuple `(route1, route2, inputs)`: the outputs of the first and
        second 8-block stacks, and the output of the final 4-block stack.
    """
    def conv_bn_leaky(tensor, filters, strides=1):
        # 3x3 convolution -> batch norm -> leaky ReLU.
        tensor = conv2d_fixed_padding(tensor, filters=filters, kernel_size=3,
                                      strides=strides,
                                      data_format=data_format)
        tensor = batch_norm(tensor, training=training,
                            data_format=data_format)
        return tf.nn.leaky_relu(tensor, alpha=_LEAKY_RELU)

    def residual_stack(tensor, filters, repeats):
        # `repeats` residual blocks applied one after another.
        for _ in range(repeats):
            tensor = darknet53_residual_block(tensor, filters=filters,
                                              training=training,
                                              data_format=data_format)
        return tensor

    net = conv_bn_leaky(inputs, 32)

    net = conv_bn_leaky(net, 64, strides=2)
    net = residual_stack(net, 32, 1)

    net = conv_bn_leaky(net, 128, strides=2)
    net = residual_stack(net, 64, 2)

    net = conv_bn_leaky(net, 256, strides=2)
    route1 = residual_stack(net, 128, 8)

    net = conv_bn_leaky(route1, 512, strides=2)
    route2 = residual_stack(net, 256, 8)

    net = conv_bn_leaky(route2, 1024, strides=2)
    net = residual_stack(net, 512, 4)

    return route1, route2, net

### Convolution layers
Yolo has a large number of convolutional layers. It's useful to group them in blocks.

In [6]:
def yolo_convolution_block(inputs, filters, training, data_format):
    """Builds the six-convolution block that follows Darknet.

    Five convolutions alternate between 1x1 with `filters` channels and 3x3
    with `2 * filters` channels; a final 3x3 convolution with `2 * filters`
    channels follows. Every convolution is followed by batch normalization
    and a leaky ReLU.

    Returns:
        A tuple `(route, inputs)`: the output of the fifth convolution and
        the output of the sixth one.
    """
    def conv_bn_leaky(tensor, n_filters, kernel_size):
        tensor = conv2d_fixed_padding(tensor, filters=n_filters,
                                      kernel_size=kernel_size,
                                      data_format=data_format)
        tensor = batch_norm(tensor, training=training,
                            data_format=data_format)
        return tf.nn.leaky_relu(tensor, alpha=_LEAKY_RELU)

    narrow, wide = filters, 2 * filters
    leading_layers = ((narrow, 1), (wide, 3), (narrow, 1), (wide, 3),
                      (narrow, 1))
    for n_filters, kernel_size in leading_layers:
        inputs = conv_bn_leaky(inputs, n_filters, kernel_size)

    route = inputs
    inputs = conv_bn_leaky(route, wide, 3)

    return route, inputs

### Detection layers
Yolo has 3 detection layers that detect on 3 different scales using their respective anchors. For each cell in the feature map, the detection layer predicts `n_anchors * (5 + n_classes)` values using a 1x1 convolution. For each scale we have `n_anchors = 3`. `5 + n_classes` means that for each of the 3 anchors we predict the 4 coordinates of the box, its confidence score (the probability of containing an object) and the class probabilities.

In [7]:
def yolo_layer(inputs, n_classes, anchors, img_size, data_format):
    """Creates Yolo final detection layer.

    Detects boxes with respect to anchors. A 1x1 convolution produces
    `len(anchors) * (5 + n_classes)` values per grid cell; these are
    reshaped to one row per (cell, anchor) pair and decoded into box
    centers, box shapes, a confidence score and class scores.

    Args:
        inputs: Tensor input.
        n_classes: Number of labels.
        anchors: A list of anchor sizes.
        img_size: The input size of the model, treated as (width, height),
            the same convention `load_images` and `draw_boxes` use for
            `model_size`.
        data_format: The input format.

    Returns:
        Tensor of shape [batch, n_anchors * grid_cells, 5 + n_classes]
        holding, per row: box center (x, y), box shape, confidence and
        class scores.
    """
    n_anchors = len(anchors)

    inputs = tf.layers.conv2d(inputs, filters=n_anchors * (5 + n_classes),
                              kernel_size=1, strides=1, use_bias=True,
                              data_format=data_format)

    shape = inputs.get_shape().as_list()
    if data_format == 'channels_first':
        grid_height, grid_width = shape[2:4]
        # Move channels last so the reshape below groups values per cell.
        inputs = tf.transpose(inputs, [0, 2, 3, 1])
    else:
        grid_height, grid_width = shape[1:3]
    n_cells = grid_height * grid_width

    # Rows are ordered by grid row, then grid column, then anchor.
    inputs = tf.reshape(inputs, [-1, n_anchors * n_cells, 5 + n_classes])

    # Pixels per grid cell along x and y.
    strides = (img_size[0] // grid_width, img_size[1] // grid_height)

    box_centers, box_shapes, confidence, classes = \
        tf.split(inputs, [2, 2, 1, n_classes], axis=-1)

    # tf.meshgrid(x, y) returns arrays of shape (len(y), len(x)), so x must
    # span the grid columns and y the grid rows for the flattened offsets to
    # follow the same row-major cell order as the reshape above. Building it
    # the other way round only works for square grids.
    x = tf.range(grid_width, dtype=tf.float32)
    y = tf.range(grid_height, dtype=tf.float32)
    x_offset, y_offset = tf.meshgrid(x, y)
    x_offset = tf.reshape(x_offset, (-1, 1))
    y_offset = tf.reshape(y_offset, (-1, 1))
    x_y_offset = tf.concat([x_offset, y_offset], axis=-1)
    # Repeat each cell offset once per anchor.
    x_y_offset = tf.tile(x_y_offset, [1, n_anchors])
    x_y_offset = tf.reshape(x_y_offset, [1, -1, 2])
    box_centers = tf.nn.sigmoid(box_centers)
    box_centers = (box_centers + x_y_offset) * strides

    anchors = tf.tile(anchors, [n_cells, 1])
    # tf.cast replaces the deprecated tf.to_float.
    box_shapes = tf.exp(box_shapes) * tf.cast(anchors, tf.float32)

    confidence = tf.nn.sigmoid(confidence)

    classes = tf.nn.sigmoid(classes)

    inputs = tf.concat([box_centers, box_shapes,
                        confidence, classes], axis=-1)

    return inputs

### Upsample layer
In order to concatenate with shortcut outputs from Darknet-53 before applying detection on a different scale, we are going to upsample the feature map using nearest neighbor interpolation.

In [8]:
def upsample(inputs, out_shape, data_format):
    """Upsamples to `out_shape` using nearest neighbor interpolation.

    Args:
        inputs: Tensor to resize, laid out according to `data_format`.
        out_shape: Full shape (as a list) of the tensor whose spatial size
            should be matched, in the same layout as `inputs`.
        data_format: 'channels_first' or 'channels_last'.

    Returns:
        The resized tensor, in the same layout as `inputs`.
    """
    channels_first = data_format == 'channels_first'

    if channels_first:
        # The resize op works on channels-last tensors.
        inputs = tf.transpose(inputs, [0, 2, 3, 1])
        # [batch, channels, height, width]
        new_height, new_width = out_shape[2], out_shape[3]
    else:
        # [batch, height, width, channels]
        new_height, new_width = out_shape[1], out_shape[2]

    # The size argument is (new_height, new_width). Reading the two values
    # from swapped positions only works when the target is square.
    inputs = tf.image.resize_nearest_neighbor(inputs, (new_height, new_width))

    if channels_first:
        inputs = tf.transpose(inputs, [0, 3, 1, 2])

    return inputs

### Non-max suppression
The model is going to produce a lot of boxes, so we need a way to discard the boxes with low confidence scores. Also, to avoid having multiple boxes for one object, we will discard the boxes with high overlap as well, using non-max suppression for each class.

In [9]:
def build_boxes(inputs):
    """Converts center/size boxes into corner coordinates.

    The last axis of `inputs` is read as (center_x, center_y, width, height,
    confidence, class scores...). The first four values are replaced by the
    top-left and bottom-right corners; the rest is passed through unchanged.
    """
    center_x, center_y, width, height, confidence, classes = tf.split(
        inputs, [1, 1, 1, 1, 1, -1], axis=-1)

    half_width = width / 2
    half_height = height / 2

    corners = [center_x - half_width, center_y - half_height,
               center_x + half_width, center_y + half_height]

    return tf.concat(corners + [confidence, classes], axis=-1)


def non_max_suppression(inputs, n_classes, max_output_size, iou_threshold,
                        confidence_threshold):
    """Performs non-max suppression separately for each class.

    For every sample in the batch, boxes whose confidence (column 4) is not
    above `confidence_threshold` are dropped, each remaining box is assigned
    the class with the highest score, and `tf.image.non_max_suppression` is
    run once per class.

    Args:
        inputs: Tensor input.
        n_classes: Number of classes.
        max_output_size: Max number of boxes to be selected for each class.
        iou_threshold: Threshold for the IOU.
        confidence_threshold: Threshold for the confidence score.
    Returns:
        A list containing class-to-boxes dictionaries
            for each sample in the batch. Keys are the class indices
            0..n_classes-1; each value holds the four box coordinates and
            the confidence of the boxes kept for that class.
    """
    boxes_dicts = []
    for boxes in tf.unstack(inputs):
        boxes = tf.boolean_mask(boxes, boxes[:, 4] > confidence_threshold)
        classes = tf.argmax(boxes[:, 5:], axis=-1)
        # tf.cast replaces the deprecated tf.to_float.
        classes = tf.expand_dims(tf.cast(classes, tf.float32), axis=-1)
        # Replace the per-class scores with the winning class index.
        boxes = tf.concat([boxes[:, :5], classes], axis=-1)

        boxes_dict = dict()
        for cls in range(n_classes):
            # The mask is always a rank-1 tensor, so no rank check is needed
            # before using it.
            mask = tf.equal(boxes[:, 5], cls)
            class_boxes = tf.boolean_mask(boxes, mask)
            boxes_coords, boxes_conf_scores, _ = tf.split(class_boxes,
                                                          [4, 1, -1],
                                                          axis=-1)
            boxes_conf_scores = tf.reshape(boxes_conf_scores, [-1])
            indices = tf.image.non_max_suppression(boxes_coords,
                                                   boxes_conf_scores,
                                                   max_output_size,
                                                   iou_threshold)
            class_boxes = tf.gather(class_boxes, indices)
            boxes_dict[cls] = class_boxes[:, :5]

        boxes_dicts.append(boxes_dict)

    return boxes_dicts

### Final model class
Finally, let's define the model class using all of the layers described previously.

In [10]:
class Yolo_v3:
    """Yolo v3 detector assembled from the layers defined in this notebook."""

    def __init__(self, n_classes, model_size, max_output_size, iou_threshold,
                 confidence_threshold, data_format=None):
        """Stores the detector configuration.

        Args:
            n_classes: Number of class labels.
            model_size: The input size of the model.
            max_output_size: Max number of boxes to be selected for each class.
            iou_threshold: Threshold for the IOU.
            confidence_threshold: Threshold for the confidence score.
            data_format: The input format. When falsy, 'channels_first' is
                chosen if TensorFlow was built with CUDA and 'channels_last'
                otherwise.

        Returns:
            None.
        """
        if not data_format:
            data_format = ('channels_first' if tf.test.is_built_with_cuda()
                           else 'channels_last')

        self.n_classes = n_classes
        self.model_size = model_size
        self.max_output_size = max_output_size
        self.iou_threshold = iou_threshold
        self.confidence_threshold = confidence_threshold
        self.data_format = data_format

    def __call__(self, inputs, training):
        """Adds the operations that detect boxes for a batch of images.

        Args:
            inputs: A Tensor representing a batch of input images, with
                channels last.
            training: A boolean, whether to use in training or inference mode.

        Returns:
            A list containing class-to-boxes dictionaries
                for each sample in the batch.
        """
        layout = self.data_format

        with tf.variable_scope('yolo_v3_model'):
            if layout == 'channels_first':
                inputs = tf.transpose(inputs, [0, 3, 1, 2])

            inputs = inputs / 255

            route1, route2, inputs = darknet53(inputs, training=training,
                                               data_format=layout)

            # First detection layer, fed directly by the Darknet output.
            route, inputs = yolo_convolution_block(
                inputs, filters=512, training=training, data_format=layout)
            detections = [yolo_layer(inputs, n_classes=self.n_classes,
                                     anchors=_ANCHORS[6:9],
                                     img_size=self.model_size,
                                     data_format=layout)]

            channel_axis = 1 if layout == 'channels_first' else 3

            # The second and third detection layers share one recipe: shrink
            # the previous route with a 1x1 convolution, upsample it to the
            # size of a Darknet route, concatenate both and detect.
            remaining_scales = ((256, route2, _ANCHORS[3:6]),
                                (128, route1, _ANCHORS[0:3]))
            for filters, darknet_route, anchors in remaining_scales:
                inputs = conv2d_fixed_padding(route, filters=filters,
                                              kernel_size=1,
                                              data_format=layout)
                inputs = batch_norm(inputs, training=training,
                                    data_format=layout)
                inputs = tf.nn.leaky_relu(inputs, alpha=_LEAKY_RELU)
                upsample_size = darknet_route.get_shape().as_list()
                inputs = upsample(inputs, out_shape=upsample_size,
                                  data_format=layout)
                inputs = tf.concat([inputs, darknet_route],
                                   axis=channel_axis)
                route, inputs = yolo_convolution_block(
                    inputs, filters=filters, training=training,
                    data_format=layout)
                detections.append(yolo_layer(inputs,
                                             n_classes=self.n_classes,
                                             anchors=anchors,
                                             img_size=self.model_size,
                                             data_format=layout))

            inputs = tf.concat(detections, axis=1)

            inputs = build_boxes(inputs)

            return non_max_suppression(
                inputs, n_classes=self.n_classes,
                max_output_size=self.max_output_size,
                iou_threshold=self.iou_threshold,
                confidence_threshold=self.confidence_threshold)

<a id="4"></a>
## 4. Utility functions
Here are some utility functions that will help us load images as NumPy arrays, load class names from the official file and draw the predicted boxes.

In [11]:
def load_images(imgs, model_size):
    """Resizes a sequence of image arrays and stacks them into one batch.

    Args:
        imgs: An iterable of image arrays accepted by `Image.fromarray`.
        model_size: The size handed to `Image.resize`.

    Returns:
        A float32 NumPy array with one resized image per entry of `imgs`,
        stacked along a new leading axis.
    """
    def to_batch_entry(img_array):
        resized = Image.fromarray(img_array).resize(size=model_size)
        # Add a leading batch axis of length one.
        return np.array(resized, dtype=np.float32)[np.newaxis]

    return np.concatenate([to_batch_entry(img_array) for img_array in imgs])


def load_class_names(file_name):
    """Reads `file_name` and returns its lines as a list of class names."""
    with open(file_name, 'r') as names_file:
        return names_file.read().splitlines()


def draw_boxes(imgs, boxes_dicts, class_names, model_size):
    """Draws the detected boxes on copies of the input images.

    Only box outlines are drawn; no text labels are rendered, so no font
    file is needed.

    Args:
        imgs: A sequence of image arrays accepted by `Image.fromarray`.
        boxes_dicts: One class-to-boxes dictionary per image. Each box holds
            four coordinates in model-input pixels followed by a confidence.
        class_names: Despite the name, an iterable of the class indices to
            draw. Each value is used as a key into the boxes dictionary and
            as an index into an 80-color palette.
        model_size: The input size of the model, as (width, height).

    Returns:
        A list of PIL images with the boxes drawn, one per
        (image, dictionary) pair. Extra images or dictionaries beyond the
        shorter of the two inputs are ignored.
    """
    result = []
    colors = ((np.array(color_palette("hls", 80)) * 255)).astype(np.uint8)

    for img_array, boxes_dict in zip(imgs, boxes_dicts):
        img = Image.fromarray(img_array)
        draw = ImageDraw.Draw(img)
        # Boxes are expressed in model-input pixels; scale them back to the
        # size of the image being drawn on.
        resize_factor = \
            (img.size[0] / model_size[0], img.size[1] / model_size[1])
        # Number of nested outlines per box. Kept at one or more so boxes are
        # still drawn on images whose width + height is below 200 pixels.
        thickness = max(1, (img.size[0] + img.size[1]) // 200)

        for cls in class_names:
            boxes = boxes_dict[cls]
            if np.size(boxes) == 0:
                continue

            color = tuple(colors[cls])

            for box in boxes:
                xy, confidence = box[:4], box[4]

                if confidence < _CONF_THRESHOLD:
                    continue

                xy = [xy[i] * resize_factor[i % 2] for i in range(4)]
                # Draw progressively smaller rectangles to thicken the line.
                for t in np.linspace(0, 1, thickness):
                    xy[0], xy[1] = xy[0] + t, xy[1] + t
                    xy[2], xy[3] = xy[2] - t, xy[3] - t
                    draw.rectangle(xy, outline=color)

        result.append(img)

    return result

<a id="5"></a>
## 5. Converting weights to Tensorflow format
Now it's time to load the official weights. We are going to iterate through the file and gradually create `tf.assign` operations.

In [12]:
def load_weights(variables, file_name):
    """Reshapes and loads official pretrained Yolo weights.

    The file is read as one flat float32 array (after a 5-value int32
    header). A running pointer walks through that array; for each variable,
    as many values as the variable holds are sliced off, reshaped and paired
    with a `tf.assign` operation.

    Args:
        variables: A list of tf.Variable to be assigned. The code relies on
            their order: 52 groups of five variables (convolution kernel
            followed by the four batch-norm variables) come first, then the
            variables of the three detection branches.
        file_name: A name of a file containing weights.

    Returns:
        A list of assign operations.
    """
    with open(file_name, "rb") as f:
        # Skip first 5 values containing irrelevant info
        np.fromfile(f, dtype=np.int32, count=5)
        # Everything after the header is read as one flat float32 array.
        weights = np.fromfile(f, dtype=np.float32)

        assign_ops = []
        # Index of the next unread value in `weights`.
        ptr = 0

        # Load weights for Darknet part.
        # Each convolution layer has batch normalization.
        for i in range(52):
            # Each layer owns five consecutive variables: the convolution
            # kernel, then the four batch-norm variables.
            conv_var = variables[5 * i]
            gamma, beta, mean, variance = variables[5 * i + 1:5 * i + 5]
            # The values are consumed in this order (beta before gamma),
            # which differs from the order of the variables in the list.
            batch_norm_vars = [beta, gamma, mean, variance]

            for var in batch_norm_vars:
                shape = var.shape.as_list()
                num_params = np.prod(shape)
                var_weights = weights[ptr:ptr + num_params].reshape(shape)
                ptr += num_params
                assign_ops.append(tf.assign(var, var_weights))

            # The kernel values are read with the variable's axes 3, 2, 0, 1
            # as the storage order, then transposed back to the variable's
            # own axis order.
            shape = conv_var.shape.as_list()
            num_params = np.prod(shape)
            var_weights = weights[ptr:ptr + num_params].reshape(
                (shape[3], shape[2], shape[0], shape[1]))
            var_weights = np.transpose(var_weights, (2, 3, 1, 0))
            ptr += num_params
            assign_ops.append(tf.assign(conv_var, var_weights))

        # Loading weights for Yolo part.
        # 7th, 15th and 23rd convolution layer has biases and no batch norm.
        # ranges[j] lists the batch-normalized layers of detection branch j;
        # unnormalized[j] is the position of the branch's final convolution,
        # which has a bias instead of batch norm.
        ranges = [range(0, 6), range(6, 13), range(13, 20)]
        unnormalized = [6, 13, 20]
        for j in range(3):
            for i in ranges[j]:
                # Skip the 52 * 5 Darknet variables; `j * 2` accounts for the
                # kernel and bias of each detection convolution already
                # passed, which take two variables instead of five.
                current = 52 * 5 + 5 * i + j * 2
                conv_var = variables[current]
                gamma, beta, mean, variance =  \
                    variables[current + 1:current + 5]
                batch_norm_vars = [beta, gamma, mean, variance]

                for var in batch_norm_vars:
                    shape = var.shape.as_list()
                    num_params = np.prod(shape)
                    var_weights = weights[ptr:ptr + num_params].reshape(shape)
                    ptr += num_params
                    assign_ops.append(tf.assign(var, var_weights))

                shape = conv_var.shape.as_list()
                num_params = np.prod(shape)
                var_weights = weights[ptr:ptr + num_params].reshape(
                    (shape[3], shape[2], shape[0], shape[1]))
                var_weights = np.transpose(var_weights, (2, 3, 1, 0))
                ptr += num_params
                assign_ops.append(tf.assign(conv_var, var_weights))

            # Detection convolution: its bias values are read before its
            # kernel values, although the kernel variable comes first in
            # `variables`.
            bias = variables[52 * 5 + unnormalized[j] * 5 + j * 2 + 1]
            shape = bias.shape.as_list()
            num_params = np.prod(shape)
            var_weights = weights[ptr:ptr + num_params].reshape(shape)
            ptr += num_params
            assign_ops.append(tf.assign(bias, var_weights))

            conv_var = variables[52 * 5 + unnormalized[j] * 5 + j * 2]
            shape = conv_var.shape.as_list()
            num_params = np.prod(shape)
            var_weights = weights[ptr:ptr + num_params].reshape(
                (shape[3], shape[2], shape[0], shape[1]))
            var_weights = np.transpose(var_weights, (2, 3, 1, 0))
            ptr += num_params
            assign_ops.append(tf.assign(conv_var, var_weights))

    return assign_ops

<a id="6"></a>
## 6. Running the model
Now we can run the model on the frames of a sample video.

### Sample images

In [13]:
# Read every frame of the sample video into memory.
video = cv2.VideoCapture('/content/input/Example1.mp4')
frame_rate = video.get(cv2.CAP_PROP_FPS)
frames = []

success, image = video.read()
while success:
    # OpenCV decodes frames in BGR order, while the rest of the notebook
    # treats frames as RGB (PIL's Image.fromarray, and the RGB -> BGR
    # conversion done right before writing the output video). Convert once
    # here so the network sees RGB input and the output colors are correct.
    frames.append(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    success, image = video.read()

# The capture is no longer needed once all frames are in memory.
video.release()
total_frames = len(frames)

print("Total frames: ", total_frames)
print("Frame rate: ", frame_rate)

Total frames:  436
Frame rate:  25.0


### Detections
Testing the model with the IoU threshold (Intersection over Union ratio used in non-max suppression) and the confidence threshold both set to 0.5.

In [14]:
# -nc (no-clobber) skips the download when the file is already present, so
# re-running this cell does not fetch the weights again into a second copy.
!wget -nc -P weights https://pjreddie.com/media/files/yolov3.weights

--2023-07-16 10:20:00--  https://pjreddie.com/media/files/yolov3.weights
Resolving pjreddie.com (pjreddie.com)... 128.208.4.108
Connecting to pjreddie.com (pjreddie.com)|128.208.4.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 248007048 (237M) [application/octet-stream]
Saving to: ‘weights/yolov3.weights’


2023-07-16 10:21:20 (2.99 MB/s) - ‘weights/yolov3.weights’ saved [248007048/248007048]



In [15]:
batch_size = _BATCH_SIZE

class_names = load_class_names('/content/coco.names')
n_classes = len(class_names)
max_output_size = 10
iou_threshold = 0.5
confidence_threshold = _CONF_THRESHOLD

# Start from an empty graph so that re-running this cell does not add a second
# copy of the model variables under the same scope, which would make
# load_weights pair the weights with the variables of the stale copy.
tf.reset_default_graph()

model = Yolo_v3(n_classes=n_classes,
                model_size=_MODEL_SIZE,
                max_output_size=max_output_size,
                iou_threshold=iou_threshold,
                confidence_threshold=confidence_threshold,
                data_format = 'channels_last')

# _MODEL_SIZE is (width, height), as used by PIL's resize in load_images;
# the placeholder is laid out as [batch, height, width, channels].
inputs = tf.placeholder(tf.float32,
                        [batch_size, _MODEL_SIZE[1], _MODEL_SIZE[0], 3])
detections = model(inputs, training=False)

model_vars = tf.global_variables(scope='yolo_v3_model')
assign_ops = load_weights(model_vars, './weights/yolov3.weights')

  return tf.layers.conv2d(
  return tf.layers.batch_normalization(
  inputs = tf.layers.conv2d(inputs, filters=n_anchors * (5 + n_classes),
Instructions for updating:
Use `tf.cast` instead.


Passing the whole video to the Yolo net, `_BATCH_SIZE` frames at a time.

Execution can be interrupted if the whole video is not necessary.

It takes almost 1 minute to process 25 frames.

In [16]:
detection_result = []

# Open one session and load the weights once for the whole video, instead of
# creating a session and re-assigning every weight for each batch.
with tf.Session() as sess:
  sess.run(assign_ops)

  for i in range(0, total_frames, _BATCH_SIZE):
    chosen_frames = frames[i:i+_BATCH_SIZE]
    n_real_frames = len(chosen_frames)

    # The placeholder has a fixed batch size. Pad the final, shorter batch by
    # repeating its last frame and discard the padded results below, so the
    # trailing frames of the video are analyzed too.
    if n_real_frames < _BATCH_SIZE:
      chosen_frames = chosen_frames + \
          [chosen_frames[-1]] * (_BATCH_SIZE - n_real_frames)

    batch = load_images(chosen_frames, model_size = _MODEL_SIZE)

    curr_result = sess.run(detections, feed_dict={inputs: batch})
    detection_result.extend(curr_result[:n_real_frames])

    print("\rAnalyzed ", len(detection_result), "/", total_frames, " frames", end = "")

KeyboardInterrupt: ignored

In [None]:
from IPython.display import Video

# Class indices whose boxes are drawn and tracked. They are used as keys into
# the per-frame detection dictionaries (presumably positions in coco.names -
# TODO confirm which classes they select).
useful_indices = [0, 1, 2, 3, 5, 7, 9, 15, 16]
boxed_frames = draw_boxes(frames, detection_result, useful_indices, _MODEL_SIZE)

video_name = 'boxed_video.mp4'
video_path = '/content/' + video_name

width, height = boxed_frames[0].size

# 'mp4v' is the codec tag that matches the .mp4 container.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# Write to the same absolute path that is displayed below, so the cell does
# not depend on the current working directory being /content.
video = cv2.VideoWriter(video_path, fourcc, frame_rate, (width, height))

for image in boxed_frames:
    video.write(cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR))

video.release()

Video(video_path)

<a id="7"></a>
## 7. Detecting boxes direction

In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

In [None]:
def getCenter(xy):
  """Returns the (x, y) midpoint of a box given as (x0, y0, x1, y1, ...)."""
  x0, y0, x1, y1 = xy[0], xy[1], xy[2], xy[3]
  center_x = (x1 + x0) / 2
  center_y = (y1 + y0) / 2
  return (center_x, center_y)

def distance(a, b):
  """Returns the Euclidean distance between the 2-D points `a` and `b`."""
  delta_y = b[1] - a[1]
  delta_x = b[0] - a[0]
  return np.sqrt(delta_y**2 + delta_x**2)

def direction(a, b):
  """Return the angle of the vector from a to b, in radians, wrapped into [0, 2*pi)."""
  delta_y = b[1] - a[1]
  delta_x = b[0] - a[0]
  full_turn = 2 * np.pi
  angle = np.arctan2(delta_y, delta_x)
  return angle % full_turn

In [None]:
# detection_result is a list of dictionaries, one dictionary per frame.
# Each dictionary is indexed here with the class indices listed in useful_indices.
# Each box has 5 values:
#   the first four are the box coordinates (x0, y0, x1, y1),
#   the fifth is the confidence in [0, 1].
# This cell appends two more values to every kept box: its class and a tracking tag.

# Positions inside a box vector.
CONF = 4
CLASS = 5
TAG = 6

MAX_AVG = 10     # Number of (distance, direction) samples kept per tracked box
MAX_DIST = 3     # Max center distance (in quantized units) to match a box across frames
UPDATE_GAP = 4   # The drawn arrow is refreshed every UPDATE_GAP frames
CFT_FRAME = 1    # Offset of the frame each frame is compared to

QUANTIZER = 5    # Box coordinates are floor-divided by this before matching

LENGTH_SCALE = 20 / CFT_FRAME
LENGTH_CAP = 75 // QUANTIZER

THICKNESS = 2

# Class whose boxes feed the car-speed estimate instead of getting an arrow.
# NOTE(review): 10 is not in the useful_indices list defined in the box-drawing cell,
# so with that list no such box is ever kept and car_speed stays 0 — confirm the intent.
REF_CLASS = 10

param_text = "Averaged frames: {:d}\nUpdate gap: {:d}\nCompared frame: {:d}\nBox max distance: {:d}\nQuantizer: {:d}".format(MAX_AVG, UPDATE_GAP, CFT_FRAME, MAX_DIST, QUANTIZER)
y0, dy = 12, 15

safezone_size = boxed_frames[0].size[0] // 5


def _find_tagged_box(candidates, tag):
  """Return the first box in `candidates` that carries `tag`, or None if there is none."""
  for candidate in candidates:
    if len(candidate) >= 7 and candidate[TAG] == tag:
      return candidate
  return None


def _smooth_motion(history, sample):
  """Push `sample` = (distance, direction) into `history`, keeping at most MAX_AVG
  samples, and return the averaged (distance, direction).

  The direction is averaged as a circular mean (mean of the unit vectors), so
  angles on both sides of the 0 / 2*pi wrap-around average correctly.
  """
  if len(history) >= MAX_AVG:
    history.pop(0)
  history.append(sample)

  dists = [d for d, _ in history]
  angles = [a for _, a in history]

  mean_dist = sum(dists) / len(dists)
  mean_dir = np.arctan2(np.mean(np.sin(angles)), np.mean(np.cos(angles))) % (2 * np.pi)
  return mean_dist, mean_dir


# --- Step 1: keep confident boxes of the useful classes, storing the class in each box ---
saved_boxes = []

for frame_box in detection_result: # Frame dictionary
  kept = []

  for cls in useful_indices:
    for box in frame_box[cls]:
      if box[CONF] > _CONF_THRESHOLD:
        kept.append(np.append(box, cls))

  saved_boxes.append(kept)

# --- Step 2: rescale + quantize the boxes and propagate a tag from frame to frame ---
dict_history = {}
index = 0

# The last frame has no successor to be matched against, so it is left untouched.
for i, saved_in_frame in enumerate(saved_boxes[:-1]): # Frame by frame loop
  cft_frame = saved_boxes[i+1] # Frame to be compared
  resize_factor = (boxed_frames[i].size[0] / _MODEL_SIZE[0], boxed_frames[i].size[1] / _MODEL_SIZE[1])

  for j, box in enumerate(saved_in_frame): # Frame's box loop

    # A box that inherited no tag from the previous frame starts a new track.
    if len(box) < 7:
      box = np.append(box, index)
      dict_history[index] = []
      index += 1

    # Model-size coordinates -> frame coordinates -> quantized grid.
    extra = box[CONF:]
    coords = [box[c] * resize_factor[c % 2] // QUANTIZER for c in range(4)]
    box = np.append(coords, extra)

    saved_boxes[i][j] = box

    centerBox = getCenter(box)

    minDist = MAX_DIST
    nearest_box = None

    for k, cft_box in enumerate(cft_frame): # Loop on compared frame

      if box[CLASS] != cft_box[CLASS]:
        continue

      # The next frame is not quantized yet, so convert its coordinates on the fly.
      cft_coords = [cft_box[c] * resize_factor[c % 2] // QUANTIZER for c in range(4)]

      dist = distance(centerBox, getCenter(cft_coords))

      if dist < minDist:
        minDist = dist
        nearest_box = k

    # Hand the tag over to the nearest same-class box, unless it was already claimed.
    if nearest_box is not None and len(cft_frame[nearest_box]) < 7:
      cft_frame[nearest_box] = np.append(cft_frame[nearest_box], box[TAG])

# Now each box in saved_boxes stores a tag, to track the box between frames

# --- Step 3: estimate the motion of every tracked box and draw it ---
arrowed_frames = []
draw_dist = [0 for _ in range(index)]
draw_dir = [0 for _ in range(index)]

car_speed = 0

for i, saved_in_frame in enumerate(saved_boxes): # Frame by frame loop

  # The compared frame must exist and must already be quantized and tagged,
  # which excludes the last frame of saved_boxes.
  if i >= len(saved_boxes) - CFT_FRAME - 1:
    break

  na = np.array(frames[i])
  frame_w, frame_h = boxed_frames[i].size
  cft_frame = saved_boxes[i+CFT_FRAME] # Frame to be compared

  # Reference-class boxes only update the car speed estimate.
  for box in saved_in_frame:

    if box[CLASS] != REF_CLASS:
      continue

    curr_tag = int(box[TAG])
    cft_box = _find_tagged_box(cft_frame, curr_tag)

    if cft_box is None:
      continue

    centerBox = getCenter(box)
    centerCftBox = getCenter(cft_box)

    sample = (distance(centerBox, centerCftBox) * LENGTH_SCALE,
              direction(centerBox, centerCftBox))
    cmn_dist, cmn_dir = _smooth_motion(dict_history[curr_tag], sample)

    car_speed = cmn_dist * np.sin(cmn_dir)

  # car_speed becomes a float once estimated, so it cannot be formatted with {:d}.
  text = param_text + '\nCar speed est.: {:.1f}'.format(car_speed)
  for s, line in enumerate(text.split('\n')):
    y = y0 + s*dy
    na = cv2.putText(na, text = line, org = (0, y), color=(255, 255, 255), fontFace = cv2.FONT_HERSHEY_SIMPLEX, fontScale = 0.5)

  for box in saved_in_frame: # Frame's box loop

    if box[CLASS] == REF_CLASS:
      continue

    curr_tag = int(box[TAG])
    cft_box = _find_tagged_box(cft_frame, curr_tag)

    if cft_box is None:
      continue

    # Found the same box in the compared frame, go on with direction calculation
    centerBox = getCenter(box)
    cft_x, cft_y = getCenter(cft_box)
    # Shift the compared center vertically by the car speed estimate. Tuple "+"
    # would only concatenate the two tuples and leave the center unchanged.
    # NOTE(review): car_speed is scaled by LENGTH_SCALE while the centers are not — confirm units and sign.
    centerCftBox = (cft_x, cft_y + car_speed)

    sample = (distance(centerBox, centerCftBox) * LENGTH_SCALE,
              direction(centerBox, centerCftBox))
    avg_dist, avg_dir = _smooth_motion(dict_history[curr_tag], sample)

    if avg_dist > LENGTH_CAP:
      avg_dist = LENGTH_CAP

    if i % UPDATE_GAP == 0:
      draw_dist[curr_tag] = avg_dist
      draw_dir[curr_tag] = avg_dir

    # Back from the quantized grid to pixel coordinates.
    start = (int(centerBox[0]) * QUANTIZER, int(centerBox[1]) * QUANTIZER)
    end = (int(centerBox[0] + draw_dist[curr_tag] * np.cos(draw_dir[curr_tag])) * QUANTIZER,
           int(centerBox[1] + draw_dist[curr_tag] * np.sin(draw_dir[curr_tag])) * QUANTIZER)

    if avg_dist < 1:
      red = 0
      green = 255
    elif centerBox[0] * QUANTIZER < safezone_size and start[0] > end[0]:
      # I'm in the left side and the arrow is pointing left
      red = 0
      green = 255
    elif centerBox[0] * QUANTIZER > frame_w - safezone_size and start[0] < end[0]:
      # I'm in the right side and the arrow is pointing right
      red = 0
      green = 255
    else:
      red = int(np.abs(np.cos(draw_dir[curr_tag])*255))
      green = int(np.abs(np.sin(draw_dir[curr_tag])*255))

    na = cv2.arrowedLine(na, start, end, (red, green, 0), THICKNESS)

    xy = box[:4] * QUANTIZER

    na = cv2.rectangle(na, (int(xy[0]), int(xy[1])), (int(xy[2]), int(xy[3])), color=(red, green, 0), thickness = THICKNESS)

    # Vertical lines marking the left and right safe zones.
    na = cv2.rectangle(na, (safezone_size, 0), (safezone_size, frame_h), color=(255, 255, 255), thickness = THICKNESS)
    na = cv2.rectangle(na, (frame_w - safezone_size, 0), (frame_w - safezone_size, frame_h), color=(255, 255, 255), thickness = THICKNESS)

    cv2.putText(na, text = "Id: " + str(curr_tag), org = (int(xy[0]), int(xy[1])), color=(255, 255, 255), fontFace = 1, fontScale = 1)

  # Always emit the annotated array, so frames without tracked boxes
  # still carry the parameter text.
  img = Image.fromarray(na)
  arrowed_frames.append(img)

if arrowed_frames:
  print("Last frame:")
  display(arrowed_frames[-1])

In [None]:
video_name = 'arrowed_video.mp4'
video_path = '/content/' + video_name

width, height = boxed_frames[0].size

# 'mp4v' is the MPEG-4 tag that matches the .mp4 container; 'XVID' is an AVI tag
# that OpenCV's FFmpeg backend rejects for mp4 and silently replaces.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# Write to the same path that is displayed below, so the cell does not depend
# on the current working directory being /content.
video = cv2.VideoWriter(video_path, fourcc, frame_rate, (width, height))

try:
  for image in arrowed_frames:
    # The frames are RGB images; OpenCV writers expect BGR.
    video.write(cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR))
finally:
  # Always finalize the file, even if a frame conversion fails.
  video.release()

from IPython.display import Video

Video(video_path)

# **Optical flow**

In [None]:
def optical_flow_rect(img1, img2, boxes):
  """Overlay the dense optical flow between two frames inside each box.

  For every box the flow is rendered as an HSV-coded patch (hue = direction,
  value = magnitude) and an arrow is drawn from the box center along the
  average flow direction.

  Args:
    img1: first frame, a 3-channel uint8 array. It is modified in place.
    img2: second frame, same shape as img1. It is only read.
    boxes: iterable of boxes whose first four entries are x0, y0, x1, y1
      in pixel coordinates of img1.

  Returns:
    img1 with the flow patches and arrows drawn on it.
  """
  img1_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
  img2_gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

  # Whole optical flow
  # NOTE(review): poly_sigma=0 and flags=10 are unusual values for Farneback — confirm they are intended.
  flow = cv2.calcOpticalFlowFarneback(img1_gray, img2_gray, None,  pyr_scale=0.8, levels=15, winsize=5,
                                      iterations=5, poly_n=5, poly_sigma=0,
                                      flags=10)

  frame_h, frame_w = flow.shape[:2]

  # Filter the optical flow in the box
  for box in boxes:
    x0, y0, x1, y1 = [int(box[j]) for j in range(4)]

    # Clip the box to the frame: negative or overflowing coordinates would
    # otherwise select the wrong region or a patch of unexpected shape.
    x0, x1 = min(max(x0, 0), frame_w), min(max(x1, 0), frame_w)
    y0, y1 = min(max(y0, 0), frame_h), min(max(y1, 0), frame_h)

    # Nothing to measure or draw for an empty box.
    if x1 <= x0 or y1 <= y0:
      continue

    box_flow = flow[y0:y1, x0:x1, :].copy()

    mag, ang = cv2.cartToPolar(box_flow[..., 0], box_flow[..., 1])

    avg_dist = float(np.mean(mag)) * 50
    if not np.isfinite(avg_dist):
      avg_dist = 0

    if avg_dist > LENGTH_CAP:
        avg_dist = LENGTH_CAP

    # Circular mean of the per-pixel directions: average the unit vectors so
    # that angles on both sides of the 0 / 2*pi wrap-around combine correctly.
    avg_dir = float(np.arctan2(np.mean(np.sin(ang)), np.mean(np.cos(ang))) % (2 * np.pi))
    if not np.isfinite(avg_dir):
      avg_dir = 0

    centerX = (x0 + x1) // 2
    centerY = (y0 + y1) // 2

    start = (centerX, centerY)
    end = (int(centerX + avg_dist * np.cos(avg_dir)),
            int(centerY + avg_dist * np.sin(avg_dir)))

    if avg_dist < 1:
      red = 0
      green = 255
    else:
      red = int(np.abs(np.cos(avg_dir)*255))
      green = int(np.abs(np.sin(avg_dir)*255))

    # HSV rendering of the flow: hue from the angle (radians -> degrees / 2),
    # fixed saturation, value from the min-max normalized magnitude.
    hsv = np.zeros((y1 - y0, x1 - x0, 3), dtype=np.uint8)

    hsv[..., 0] = ang * 180 / np.pi / 2
    hsv[..., 1] = 200
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    colored_flow = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    img1[y0:y1, x0:x1] = colored_flow

    img1 = cv2.arrowedLine(img1, start, end, (red, green, 0), 2)

  return img1

This runs at almost 5 frames per second.

In [None]:
# Render the optical flow inside the tracked boxes of every frame.
flowed_frames = []

# The last entry of saved_boxes has no following frame to compute the flow against.
frames_to_process = max(len(saved_boxes) - 1, 0)

for i, saved_in_frame in enumerate(saved_boxes[:-1]): # Frame by frame loop

  curr_frame = frames[i].copy()
  next_frame = frames[i+1]

  # Quantized box coordinates -> pixel coordinates.
  boxes = [[box[c] * QUANTIZER for c in range(4)] for box in saved_in_frame]

  if boxes:
    curr_frame = optical_flow_rect(curr_frame, next_frame, boxes)

  # Frames without boxes are kept unmodified: dropping them would shorten the
  # output video and break its timing.
  flowed_frames.append(Image.fromarray(np.array(curr_frame)))

  print("\rAnalyzed ", len(flowed_frames), "/", frames_to_process, " frames", end = "")

if flowed_frames:
  print("\nLast frame:")
  display(flowed_frames[-1])

In [None]:
video_name = 'flowed_video.mp4'
video_path = '/content/' + video_name

width, height = boxed_frames[0].size

# 'mp4v' is the MPEG-4 tag that matches the .mp4 container; 'XVID' is an AVI tag
# that OpenCV's FFmpeg backend rejects for mp4 and silently replaces.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# Write to the same path that is displayed below, so the cell does not depend
# on the current working directory being /content.
video = cv2.VideoWriter(video_path, fourcc, frame_rate, (width, height))

try:
  for image in flowed_frames:
    # The frames are RGB images; OpenCV writers expect BGR.
    video.write(cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR))
finally:
  # Always finalize the file, even if a frame conversion fails.
  video.release()

from IPython.display import Video

Video(video_path)