In [None]:
import numpy as np

class KalmanFilter2D:
  def __init__(self, init_state, init_uncertainty, process_noise, measurement_noise, dt):

    # x: initial position (x, y) and velocities (x_dot, y_dot)
    # [x, y, x_dot, y_dot, w, h, w_dot, h_dot]
    self.x = np.array(init_state, dtype=float).reshape(8, 1)

    # P: state covariance matrix - uncertainty in state estimate
    self.P = np.array(init_uncertainty)

    # Q: process (covariance/) noise matrix - how much system model can be trusted
    self.Q = np.array(process_noise)

    # R: measurement noise matrix - how much measurement can be trusted
    self.R = np.array(measurement_noise)

    # dt: time step between detections
    self.dt = dt

    # A: State transition matrix - describes system dynamics
    self.A = np.array([
        [1, 0, dt, 0, 0, 0 ,0 ,0],
        [0, 1, 0, dt, 0, 0 ,0 ,0],
        [0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0 ,0 ,0],
        [0, 0, 0, 0, 1, 0, dt, 0],
        [0, 0 ,0 ,0 ,0, 1 ,0, dt],
        [0, 0, 0, 0, 0, 0, 1, 0],
        [0, 0, 0, 0, 0, 0, 0, 1]
    ])

    # Measurement Matrix
    self.H = np.array([
        [1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0.]])

   # Identity matrix
    self.I = np.eye(8)


  def predict(self):
    # predicting based on system dynamics (A) -> x_pred = A * x
    self.x = np.dot(self.A, self.x)

    # predicting state covariance -> how much can we trust state estimate. NOTE: A instead of measurement function (H) because state variables (x, y) are directly measured
    self.P = np.dot(np.dot(self.A, self.P), self.A.T) + self.Q

  def update(self, measurement):
    # Innovation or measurement residual
    y = measurement.reshape(-1, 1) - self.H.dot(self.x)  # Reshape measurement to column vector
    # S:  combining prediction uncertainty with measurement noise
    S = self.H.dot(self.P).dot(self.H.T) + self.R
    # K: kalman gain -> compute
    K = self.P.dot(self.H.T).dot(np.linalg.inv(S))
    # update state estimate with measurement
    self.x = self.x + K.dot(y)
    # update state covariance (P)
    self.P = (self.I - K.dot(self.H)).dot(self.P)

  def get_state(self):
    return self.x.flatten()

import torchvision.models as models

# Make sure you are running this in a Python environment where PyTorch and torchvision are properly installed.
# You can choose a different model like 'maskrcnn_resnet50_fpn' for instance:

model = models.detection.maskrcnn_resnet50_fpn(pretrained=True)
model.eval()  # Set the model to inference mode


def iou(boxA, boxB):
    # Convert (x, y, w, h) to (x_min, y_min, x_max, y_max)
    boxA_x_max = boxA[0] + boxA[2]
    boxA_y_max = boxA[1] + boxA[3]
    boxB_x_max = boxB[0] + boxB[2]
    boxB_y_max = boxB[1] + boxB[3]

    # Determine the coordinates of the intersection rectangle
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA_x_max, boxB_x_max)
    yB = min(boxA_y_max, boxB_y_max)

    # Compute the area of intersection
    interArea = max(0, xB - xA) * max(0, yB - yA)

    # Compute the area of both the prediction and ground-truth rectangles
    boxAArea = (boxA[2]) * (boxA[3])  # Use w and h directly for area calculation
    boxBArea = (boxB[2]) * (boxB[3])  # Use w and h directly for area calculation

    # Compute the area of union
    unionArea = boxAArea + boxBArea - interArea

    # Compute the IoU by dividing the intersection area by the union area
    iou = interArea / float(unionArea)

    # Return the IoU value
    return iou


Downloading: "https://download.pytorch.org/models/maskrcnn_resnet50_fpn_coco-bf2d0c1e.pth" to /root/.cache/torch/hub/checkpoints/maskrcnn_resnet50_fpn_coco-bf2d0c1e.pth
100%|██████████| 170M/170M [00:01<00:00, 91.9MB/s]


In [None]:
import cv2
import numpy as np
import torchvision.transforms as transforms
from PIL import Image
import torch
from google.colab.patches import cv2_imshow


cap = cv2.VideoCapture(r'/content/pexels_videos_2103099 (2160p).mp4')
#cap = cv2.VideoCapture(r'/content/traffic_clipped2.mp4')
if not cap.isOpened():
    cap = cv2.VideoCapture(0)
if not cap.isOpened():
    raise IOError('Cant open the video')

font_scale = 3
font = cv2.FONT_HERSHEY_PLAIN

# Load the pre-trained Mask R-CNN model
model = models.detection.maskrcnn_resnet50_fpn(pretrained=True)
model.eval()

# Define the transformation
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Function to transform frames
def transform_image(frame):
    image = Image.fromarray(frame)
    image = transform(image).unsqueeze(0)  # Add a batch dimension
    return image

color_map = {}
unique_id = 0
tracked_objects = []

MAX_AGE = 10  # Maximum age before a track is removed
HIT_THRESHOLD = 3  # Number of hits before a track is confirmed

frame_count = 0  # Initialize a frame counter

while True:
    ret, frame = cap.read()
    frame_count += 1  # Increment frame counter
    dt = 1.0 / cap.get(cv2.CAP_PROP_FPS)


    # Predict the next state for each tracked object
    for obj in tracked_objects:
        obj['kalman_filter'].predict()

    # Process every 3 frames, since real life detection won't happen at as high a framerate as the video
    if frame_count % 3 == 0:

        # Preprocess the frame for the object detection model
        transformed_frame = transform_image(frame)

        # Perform inference
        with torch.no_grad():
            prediction = model(transformed_frame)[0]

        # Extract bounding boxes and scores from the prediction
        boxes = prediction['boxes'].cpu().data.numpy()
        boxes = boxes.round().astype(int)
        scores = prediction['scores'].cpu().data.numpy()

        # Filter out low-confidence detections
        confidence_threshold = 0.3
        boxes = boxes[scores > confidence_threshold].astype('int')
        scores = scores[scores > confidence_threshold]

        updated_tracked_objects = []
        for box in boxes:

            # Find the best matching tracked object
            best_match = None
            highest_iou = 0.0
            #print(type(box))

            x_min, y_min, x_max, y_max = box
            x = x_min  # Top-left x-coordinate
            y = y_min  # Top-left y-coordinate
            w = x_max - x_min  # Width calculation
            h = y_max - y_min  # Height calculation
            new_box = np.array((x, y, w, h))


            try:
                tracked_objects
            except NameError:
                tracked_objects = []  # Initialize it if it doesn't exist

            if tracked_objects != []:
              #print("tracked_objects defined")
              for tobj in tracked_objects:
                  iou_score = iou(tobj['predicted_box'], new_box)
                  if iou_score > highest_iou:
                      best_match = tobj
                      highest_iou = iou_score

            if highest_iou > 0.3 and best_match is not None:  # If a match with reasonable IoU is found, update it
                best_match['kalman_filter'].update(new_box)  # Only x, y, w, h are used for update

                print(best_match['predicted_box'])
                print([best_match['kalman_filter'].get_state()[i] for i in [0, 1, 4 ,5]])
                best_match['predicted_box'] = np.array([best_match['kalman_filter'].get_state()[i] for i in [0, 1, 4 ,5]])
                best_match['age'] = 0  # Reset the age as it's detected in this frame
                best_match['hits'] += 1  # Increment the hits
                updated_tracked_objects.append(best_match)
            else:  # If no match is found, initialize a new Kalman Filter
                position_measurement_noise = 100  # lower value than before for x, y
                size_measurement_noise = 500  # slightly higher value than position for width and height

                # For process noise, we anticipate potentially rapid changes in velocity and acceleration
                velocity_process_noise = 500  # higher value for dx, dy to allow for quick adjustment
                size_change_process_noise = 100  # reasonable value for dw, dh to adapt to size change quickly

                kf = KalmanFilter2D(init_state=[x, y, 0, 0, w, h, 0, 0],
                                    #init_uncertainty = np.diag([1, 1, 1, 1, 10, 10, 1, 1]),
                                    init_uncertainty = np.diag([100, 100, 100, 100, 10000, 10000, 100, 100]),
                                    process_noise = np.diag([1, 1, velocity_process_noise, velocity_process_noise, 0.5, 0.5, size_change_process_noise, size_change_process_noise]) ** 2,
                                    measurement_noise = np.diag([position_measurement_noise] * 2 + [size_measurement_noise] * 2),
                                    dt=dt)
                obj_id = unique_id
                unique_id += 1
                color_map[obj_id] = (int(np.random.rand() * 255), int(np.random.rand() * 255), int(np.random.rand() * 255))
                updated_tracked_objects.append({'id': obj_id,
                                                'kalman_filter': kf,
                                                'predicted_box': new_box,
                                                'age': 0,
                                                'hits': 1})


        # Update age for tracks that were not matched in the current frame
        for tobj in tracked_objects:
            if tobj not in updated_tracked_objects:
                tobj['age'] += 1
                if tobj['age'] <= MAX_AGE:
                    updated_tracked_objects.append(tobj)  # Keep track if it's not too old


        # Draw bounding boxes on the frame
        for obj_index, obj in enumerate(updated_tracked_objects):
            obj_id = obj['id']
            #print(obj_id)
            #print(color_map)
            color = color_map[obj_id]

            pred_box = obj['predicted_box'].astype(int)
            x, y, w, h = pred_box[:4]  # Extract x, y, width, and height
            print(f'{obj_id} ----- COORDS: {x, y, w, h}')
            cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)

            label = f'ID: {obj_id}'
            label_size = cv2.getTextSize(label, font, font_scale, 2)[0]
            text_x = x
            text_y = y - label_size[1] if y - label_size[1] > 10 else y + label_size[1]  # Ensure text is within frame

            # Put text on the frame
            cv2.putText(frame, label, (text_x, text_y), font, font_scale, color, 2)

        # Replace old list of tracked objects with the updated one
        tracked_objects = updated_tracked_objects

        # Display the frame
        cv2_imshow(frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
