# Faster R-CNN final

In [None]:
# Mount Google Drive at /content/drive so later cells can reach files stored there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Switch to the project folder; later cells write their output folders with relative paths.
%cd /content/drive/MyDrive/FasterRcnn/

/content/drive/MyDrive/FasterRcnn


In [None]:
!pip3 install deep-sort-realtime

Collecting deep-sort-realtime
  Downloading deep_sort_realtime-1.3.2-py3-none-any.whl (8.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m18.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: deep-sort-realtime
Successfully installed deep-sort-realtime-1.3.2


# DeepSort

In [None]:
import cv2
import numpy as np
import os
import torch
import time
from deep_sort_realtime.deepsort_tracker import DeepSort
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from google.colab.patches import cv2_imshow

# Load the pretrained Faster R-CNN detector and switch it to inference mode
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Detection filter: label 3 corresponds to the 'car' class; keep confident hits only
CAR_LABEL = 3
SCORE_THRESHOLD = 0.8

# Initialize DeepSort tracker
tracker = DeepSort(max_age=50, max_iou_distance=0.9, n_init=10)

video_path = '/content/drive/MyDrive/FasterRcnn/TrafficVideo3.mp4'
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail loudly: otherwise the loop below is silently skipped and no frames are written
    raise IOError(f"Could not open video file: {video_path}")

# Number of frames processed so far; also used to name the saved images.
# (Frames are written to disk one by one instead of being accumulated in memory.)
frame_index = 0
unique_track_ids = set()

start_time = time.perf_counter()
counter = 0
fps = 0

output_folder = 'frames_FasterRCNN_Deepsort'
os.makedirs(output_folder, exist_ok=True)

try:
    while cap.isOpened():  # Check if video file is open
        ret, frame = cap.read()
        if not ret:
            break

        og_frame = frame.copy()

        # OpenCV decodes frames as BGR, while the torchvision detector expects RGB input
        rgb_frame = cv2.cvtColor(og_frame, cv2.COLOR_BGR2RGB)
        frame_tensor = F.to_tensor(rgb_frame).unsqueeze(0).to(device)

        with torch.no_grad():
            predictions = model(frame_tensor)

        boxes = predictions[0]['boxes'].cpu().numpy()
        labels = predictions[0]['labels'].cpu().numpy()
        scores = predictions[0]['scores'].cpu().numpy()

        # Apply the same mask to boxes, scores and labels so the three stay aligned
        keep = (scores > SCORE_THRESHOLD) & (labels == CAR_LABEL)
        car_boxes = boxes[keep]
        car_scores = scores[keep]
        car_labels = labels[keep]

        # Convert [x1, y1, x2, y2] boxes to ([left, top, w, h], confidence, detection_class)
        detections = [
            ([x1, y1, x2 - x1, y2 - y1], score, str(label))
            for (x1, y1, x2, y2), score, label in zip(car_boxes, car_scores, car_labels)
        ]

        # Update tracker (it receives the original, still un-annotated frame)
        tracks = tracker.update_tracks(detections, frame=og_frame)
        for track in tracks:
            if not track.is_confirmed():
                continue
            track_id = track.track_id
            bbox = track.to_ltrb()
            # Draw bounding box
            cv2.rectangle(og_frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (0, 255, 0), 2)

            # Add the track_id to the set of unique track IDs
            unique_track_ids.add(track_id)

        # Update car count
        car_count = len(unique_track_ids)

        # Update the rolling FPS estimate (computed roughly once per second, not drawn)
        current_time = time.perf_counter()
        elapsed = (current_time - start_time)
        counter += 1
        if elapsed > 1:
            fps = counter / elapsed
            counter = 0
            start_time = current_time

        # Draw car count on frame
        cv2.putText(og_frame, f"Car Count: {car_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # Save frame as an image (numbering starts at 1)
        frame_index += 1
        frame_name = os.path.join(output_folder, f'frame_{frame_index}.jpg')
        cv2.imwrite(frame_name, og_frame)

        if cv2.waitKey(1) & 0xFF == ord('s'):
            break
finally:
    # Release the capture even if processing fails or the cell is interrupted
    cap.release()
    cv2.destroyAllWindows()

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:02<00:00, 76.2MB/s]


**Merge all the frames into a video**

In [None]:
import cv2
import os

# Path to the folder containing the frames
frame_folder = 'frames_FasterRCNN_Deepsort'

# Define the output video file path and playback rate
output_video_path = 'output_video_from_frames_deepsort.mp4'
OUTPUT_FPS = 25

# Frame files are named frame_<number>.jpg; anything else in the folder is ignored
# so that a stray file cannot break the numeric sort below.
FRAME_PREFIX = 'frame_'
FRAME_SUFFIX = '.jpg'


def _frame_number(file_name):
    """Return the integer index embedded in a 'frame_<n>.jpg' name, or None if it does not match."""
    if not (file_name.startswith(FRAME_PREFIX) and file_name.endswith(FRAME_SUFFIX)):
        return None
    digits = file_name[len(FRAME_PREFIX):-len(FRAME_SUFFIX)]
    return int(digits) if digits.isdigit() else None


# Get the frame file names in the folder, sorted by their frame number
frames = [name for name in os.listdir(frame_folder) if _frame_number(name) is not None]
frames.sort(key=_frame_number)
if not frames:
    raise FileNotFoundError(f"No frame images found in '{frame_folder}'")

# Get the first frame to extract its dimensions
first_frame = cv2.imread(os.path.join(frame_folder, frames[0]))
if first_frame is None:
    raise IOError(f"Could not read first frame: {frames[0]}")
height, width, _ = first_frame.shape

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, OUTPUT_FPS, (width, height))

try:
    # Write frames to the video
    for frame_name in frames:
        frame_path = os.path.join(frame_folder, frame_name)
        frame = cv2.imread(frame_path)
        if frame is None:
            # cv2.imread returns None for unreadable files; skip them instead of writing None
            print(f"Skipping unreadable frame: {frame_path}")
            continue
        out.write(frame)
finally:
    # Release the VideoWriter even if writing fails, then destroy any remaining windows
    out.release()
    cv2.destroyAllWindows()

# SORT

In [None]:
!pip install filterpy

Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-none-any.whl size=110458 sha256=2543b500ac432e3aec26a83cdf9fd5cff64bfa555716141437fc06de69f8163f
  Stored in directory: /root/.cache/pip/wheels/0f/0c/ea/218f266af4ad626897562199fbbcba521b8497303200186102
Successfully built filterpy
Installing collected packages: filterpy
Successfully installed filterpy-1.4.5


In [None]:
# Adapted from the sort-track package (https://pypi.org/project/sort-track/#description);
# parts of the code were modified to remove errors.

from __future__ import print_function

import numpy as np
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment

# Fix NumPy's global RNG seed. Nothing else in this cell draws random numbers, so this
# is presumably carried over from the upstream SORT code -- TODO confirm it is still needed.
np.random.seed(0)


def linear_assignment(cost_matrix: np.ndarray):
    """
    Solves the linear sum assignment problem for the given cost matrix.
    Args:
      cost_matrix (np.ndarray): A 2D matrix of shape [M, N].
    Returns:
      np.ndarray: An integer matrix of shape [min(M, N), 2]; each row is a
        (row_index, column_index) pair of matched indices. The result is always
        2D, so an empty assignment has shape [0, 2].
    """
    rows, cols = linear_sum_assignment(cost_matrix)
    # column_stack keeps the (K, 2) layout even when K == 0, unlike np.array(list(zip(...)))
    return np.column_stack((rows, cols))


def iou_batch(bb_test: np.ndarray, bb_gt: np.ndarray):
    """
    Computes pairwise IOU between two sets of bboxes in the form [x1,y1,x2,y2].
    Only the first four columns of each input are read, so extra columns
    (e.g. a score) are ignored.
    Args:
      bb_test (np.ndarray): A 2D matrix of shape [N, 4].
      bb_gt (np.ndarray): A 2D matrix of shape [M, 4].
    Returns:
      np.ndarray: A 2D float matrix of shape [N, M] in which each element is the IOU value.
        Pairs whose union area is not positive (degenerate boxes) get an IOU of 0
        instead of NaN/inf.
    """
    # Broadcast to [N, 1, C] against [1, M, C] so every test box meets every gt box
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)

    # Corners of the intersection rectangle
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0.0, xx2 - xx1)
    h = np.maximum(0.0, yy2 - yy1)
    wh = w * h

    area_test = (bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
    area_gt = (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1])
    union = area_test + area_gt - wh

    # Divide only where the union is positive; the remaining entries stay at 0
    o = np.zeros(union.shape, dtype=float)
    np.divide(wh, union, out=o, where=union > 0)
    return o


def convert_bbox_to_z(bbox: np.ndarray):
    """
    Converts a corner-form box [x1,y1,x2,y2] into the measurement vector
      [x,y,s,r] (as a 4x1 column), where x,y is the box centre, s is the
      area (scale) and r is the width/height aspect ratio. Only the first
      four entries of bbox are read.
    """
    left, top, right, bottom = bbox[0], bbox[1], bbox[2], bbox[3]
    width = right - left
    height = bottom - top
    centre_x = left + width / 2.0
    centre_y = top + height / 2.0
    area = width * height  # scale is just area
    aspect = width / float(height)
    return np.array([centre_x, centre_y, area, aspect]).reshape((4, 1))


def convert_x_to_bbox(x, score=None):
    """
    Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
      [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right.

    Args:
      x: state or measurement vector whose first four entries are [x, y, s, r];
        may be flat or a column vector (e.g. the 7x1 Kalman state).
      score: optional confidence appended as a fifth column. Any value other
        than None (including 0) is appended.
    Returns:
      np.ndarray of shape (1, 4), or (1, 5) when a score is given.
    """
    # Flatten first so column-vector states yield scalars; otherwise mixing
    # 1-element arrays with a scalar score builds a ragged array.
    state = np.asarray(x, dtype=float).reshape(-1)
    centre_x, centre_y, area, ratio = state[0], state[1], state[2], state[3]

    w = np.sqrt(area * ratio)
    h = area / w
    corners = [centre_x - w / 2.0, centre_y - h / 2.0, centre_x + w / 2.0, centre_y + h / 2.0]

    # Identity check: `== None` misbehaves for array-like scores
    if score is None:
        return np.array(corners).reshape((1, 4))
    return np.append(corners, score).reshape((1, 5))


class KalmanBoxTracker(object):
    """
    Kalman-filter state for one tracked object that is observed as a bounding box.
    """

    # Number of trackers created so far; the next tracker takes this value as its id.
    count = 0

    def __init__(self, bbox, conf, cls):
        """
        Creates a tracker seeded with an initial bounding box.

        bbox is converted with convert_bbox_to_z and written to the first four
        state entries; conf and cls are stored on the instance unchanged.
        """
        kalman = KalmanFilter(dim_x=7, dim_z=4)

        # constant velocity model: state entries 0, 1, 2 each gain the value of
        # entries 4, 5, 6 on every step; entry 3 has no velocity term
        transition = np.eye(7, dtype=int)
        for value_idx, velocity_idx in ((0, 4), (1, 5), (2, 6)):
            transition[value_idx, velocity_idx] = 1
        kalman.F = transition

        # only the first four state entries are measured
        kalman.H = np.eye(4, 7, dtype=int)

        kalman.R[2:, 2:] *= 10.0
        kalman.P[4:, 4:] *= 1000.0  # give high uncertainty to the unobservable initial velocities
        kalman.P *= 10.0
        kalman.Q[-1, -1] *= 0.01
        kalman.Q[4:, 4:] *= 0.01

        kalman.x[:4] = convert_bbox_to_z(bbox)
        self.kf = kalman

        # bookkeeping counters
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

        # detection metadata, kept as given
        self.conf = conf
        self.cls = cls

    def update(self, bbox):
        """
        Feeds an observed bbox to the filter and marks the tracker as freshly updated.
        """
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        measurement = convert_bbox_to_z(bbox)
        self.kf.update(measurement)

    def predict(self):
        """
        Steps the filter forward once and returns the predicted bounding box.
        """
        # zero the rate in entry 6 when applying it would make entry 2 non-positive
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        # a prediction that follows a missed update breaks the streak
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        predicted_box = convert_x_to_bbox(self.kf.x)
        self.history.append(predicted_box)
        return predicted_box

    def get_state(self):
        """
        Returns the bounding box for the filter's current state.
        """
        return convert_x_to_bbox(self.kf.x)


def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """
    Matches detections to tracked objects (both given as bounding boxes) by IOU.

    Returns three arrays: matches (rows of [detection_index, tracker_index]),
    unmatched_detections and unmatched_trackers.
    """
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)

    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) > 0:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        one_to_one = above.sum(1).max() == 1 and above.sum(0).max() == 1
        if one_to_one:
            # every row/column has at most one candidate, so the thresholded matrix is the assignment
            matched_indices = np.stack(np.where(above), axis=1)
        else:
            # negate so that minimising cost maximises IOU
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0, 2))

    assigned_dets = matched_indices[:, 0]
    assigned_trks = matched_indices[:, 1]
    unmatched_detections = [d for d in range(len(detections)) if d not in assigned_dets]
    unmatched_trackers = [t for t in range(len(trackers)) if t not in assigned_trks]

    # filter out matched with low IOU
    kept_pairs = []
    for pair in matched_indices:
        det_idx, trk_idx = pair[0], pair[1]
        if iou_matrix[det_idx, trk_idx] < iou_threshold:
            unmatched_detections.append(det_idx)
            unmatched_trackers.append(trk_idx)
        else:
            kept_pairs.append(pair)

    if kept_pairs:
        matches = np.vstack(kept_pairs)
    else:
        matches = np.empty((0, 2), dtype=int)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


class SortTracker(object):
    """
    SORT tracker: keeps one KalmanBoxTracker per object and associates new
    detections to them by IOU on every call to update().
    """

    def __init__(self, max_age=50, min_hits=10, iou_threshold=0.9):
        """
        Sets key parameters for SORT

        Args:
          max_age: a tracker is dropped once its time_since_update exceeds this value.
            For backward compatibility a dict may be passed here instead (e.g.
            ``SortTracker({})``); its optional keys 'max_age', 'min_hits' and
            'iou_threshold' override the defaults.
          min_hits: hit streak a tracker needs before it is reported (every updated
            tracker is reported while frame_count <= min_hits).
          iou_threshold: IOU threshold passed to associate_detections_to_trackers.
        """
        if isinstance(max_age, dict):
            # Legacy call style: an options mapping passed as the first positional argument
            options = max_age
            max_age = options.get("max_age", 50)
            min_hits = options.get("min_hits", min_hits)
            iou_threshold = options.get("iou_threshold", iou_threshold)

        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0

    def update(self, dets, _=None):
        """
        Params:
          dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
          _    - unused; kept (now optional) so existing two-argument calls keep working.
        Requires: this method must be called once for each frame even with empty detections.
          Any empty input (e.g. np.empty((0, 5)) or np.array([])) is accepted.
        Returns an array with one row per reported track in the format
          [x1, y1, x2, y2, track_id, cls, conf], where track_id is the tracker's id + 1
          and cls/conf are the values stored when the tracker was created
          (cls is int(score) of the creating detection). Shape (0, 7) when nothing is reported.

        NOTE: The number of objects returned may differ from the number of detections provided.
        """
        dets = np.asarray(dets, dtype=float)
        if dets.size == 0:
            # Normalise every empty input to 2D so column indexing downstream cannot fail
            dets = np.empty((0, 5))

        self.frame_count += 1
        # get predicted locations from existing trackers.
        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            pos = self.trackers[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        # drop rows with invalid predictions, and the trackers that produced them
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks, self.iou_threshold)

        # update matched trackers with assigned detections
        for m in matched:
            self.trackers[m[1]].update(dets[m[0], :])

        # create and initialise new trackers for unmatched detections
        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :][:4], dets[i, :][4], int(dets[i, :][4]))
            self.trackers.append(trk)

        # Walk backwards by index so that popping a tracker does not shift
        # the positions still to be visited.
        for idx in reversed(range(len(self.trackers))):
            trk = self.trackers[idx]
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1], [trk.cls], [trk.conf])).reshape(1, -1))
            # remove dead tracklet (honours the configured max_age instead of a hard-coded 1)
            if trk.time_since_update > self.max_age:
                self.trackers.pop(idx)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 7))

**Faster RCNN with Sort**

In [None]:
import cv2
import numpy as np
import os
import torch
import time
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from google.colab.patches import cv2_imshow

# Load the pretrained Faster R-CNN detector and switch it to inference mode
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Detection filter: label 3 corresponds to the 'car' class; keep confident hits only
CAR_LABEL = 3
SCORE_THRESHOLD = 0.9

# Initialize SORT tracker with its default parameters.
# (A dict passed positionally would be bound to the first parameter, max_age.)
tracker = SortTracker()

video_path = '/content/drive/MyDrive/YOLOv2/TrafficVideo3.mp4'
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail loudly: otherwise the loop below is silently skipped and no frames are written
    raise IOError(f"Could not open video file: {video_path}")

# Number of frames processed so far; also used to name the saved images.
# (Frames are written to disk one by one instead of being accumulated in memory.)
frame_index = 0
unique_track_ids = set()

start_time = time.perf_counter()
counter = 0
fps = 0

output_folder = 'frames_FasterRCNN_Sort'
os.makedirs(output_folder, exist_ok=True)

try:
    while cap.isOpened():  # Check if video file is open
        ret, frame = cap.read()
        if not ret:
            break

        og_frame = frame.copy()

        # OpenCV decodes frames as BGR, while the torchvision detector expects RGB input
        rgb_frame = cv2.cvtColor(og_frame, cv2.COLOR_BGR2RGB)
        frame_tensor = F.to_tensor(rgb_frame).unsqueeze(0).to(device)

        with torch.no_grad():
            predictions = model(frame_tensor)

        boxes = predictions[0]['boxes'].cpu().numpy()
        labels = predictions[0]['labels'].cpu().numpy()
        scores = predictions[0]['scores'].cpu().numpy()

        # Apply the same mask to boxes and scores so each box keeps its own score
        keep = (scores > SCORE_THRESHOLD) & (labels == CAR_LABEL)
        car_boxes = boxes[keep]
        car_scores = scores[keep]

        # Build the [x1, y1, x2, y2, score] rows SORT expects; with no detections
        # this is a (0, 5) array rather than a 1-D empty array.
        detections = np.column_stack((car_boxes, car_scores))

        # Update tracker (the second argument is unused by the tracker)
        tracks = tracker.update(detections, None)

        for track in tracks:
            track_id = int(track[4])  # Track ID is in the 5th element of the track data
            bbox = track[:4]  # Bounding box coordinates are the first four elements of the track data

            # Draw bounding box
            bbox = [int(coord) for coord in bbox]
            cv2.rectangle(og_frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)

            # Add the track_id to the set of unique track IDs
            unique_track_ids.add(track_id)

        # Update car count
        car_count = len(unique_track_ids)

        # Update the rolling FPS estimate (computed roughly once per second, not drawn)
        current_time = time.perf_counter()
        elapsed = (current_time - start_time)
        counter += 1
        if elapsed > 1:
            fps = counter / elapsed
            counter = 0
            start_time = current_time

        # Draw car count on frame
        cv2.putText(og_frame, f"Car Count: {car_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # Save frame as an image (numbering starts at 1)
        frame_index += 1
        frame_name = os.path.join(output_folder, f'frame_{frame_index}.jpg')
        cv2.imwrite(frame_name, og_frame)

        if cv2.waitKey(1) & 0xFF == ord('s'):
            break
finally:
    # Release the capture even if processing fails or the cell is interrupted
    cap.release()
    cv2.destroyAllWindows()

KeyboardInterrupt: 

**Merge all the frames to form a video.**

In [None]:
import cv2
import os

# Path to the folder containing the frames
frame_folder = 'frames_FasterRCNN_Sort'

# Define the output video file path and playback rate
output_video_path = 'output_video_from_frames_sort.mp4'
OUTPUT_FPS = 25

# Frame files are named frame_<number>.jpg; anything else in the folder is ignored
# so that a stray file cannot break the numeric sort below.
FRAME_PREFIX = 'frame_'
FRAME_SUFFIX = '.jpg'


def _frame_number(file_name):
    """Return the integer index embedded in a 'frame_<n>.jpg' name, or None if it does not match."""
    if not (file_name.startswith(FRAME_PREFIX) and file_name.endswith(FRAME_SUFFIX)):
        return None
    digits = file_name[len(FRAME_PREFIX):-len(FRAME_SUFFIX)]
    return int(digits) if digits.isdigit() else None


# Get the frame file names in the folder, sorted by their frame number
frames = [name for name in os.listdir(frame_folder) if _frame_number(name) is not None]
frames.sort(key=_frame_number)
if not frames:
    raise FileNotFoundError(f"No frame images found in '{frame_folder}'")

# Get the first frame to extract its dimensions
first_frame = cv2.imread(os.path.join(frame_folder, frames[0]))
if first_frame is None:
    raise IOError(f"Could not read first frame: {frames[0]}")
height, width, _ = first_frame.shape

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, OUTPUT_FPS, (width, height))

try:
    # Write frames to the video
    for frame_name in frames:
        frame_path = os.path.join(frame_folder, frame_name)
        frame = cv2.imread(frame_path)
        if frame is None:
            # cv2.imread returns None for unreadable files; skip them instead of writing None
            print(f"Skipping unreadable frame: {frame_path}")
            continue
        out.write(frame)
finally:
    # Release the VideoWriter even if writing fails, then destroy any remaining windows
    out.release()
    cv2.destroyAllWindows()