## Kalman filter
#### Load video use object detection and apply Kalman filter

In [1]:
import cv2
import numpy as np
import os
import matplotlib.pyplot as plt
import os
import torch
from torchvision import models, transforms
from PIL import Image
from ultralytics import YOLO
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment
import torch.nn as nn
#import selectivesearch

In [2]:
class MyPretrainedModel(nn.Module):
    def __init__(self, num_classes=3):
        super(MyPretrainedModel, self).__init__()
        # Load a pretrained ResNet18 model
        self.model = models.resnet18(weights='ResNet18_Weights.DEFAULT')
        # Modify the final fully connected layer to match the number of classes
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        return self.model(x)

In [3]:
model = MyPretrainedModel()
model.load_state_dict(torch.load("models/resnet_model.pth"))
# Drop the fully connected layer
model.model.fc = nn.Identity() 
dummy_input = torch.randn(1, 3, 224, 224)  # Batch of 1, with image size 224x224
features = model(dummy_input)
print("Extracted features shape:", features.shape)
model.eval()

Extracted features shape: torch.Size([1, 512])


MyPretrainedModel(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, t

## Load data

In [4]:
def load_images_from_folder(folder):
    images = []
    for filename in sorted(os.listdir(folder)):
        img = cv2.imread(os.path.join(folder,filename))
        if img is not None:
            images.append(img)
    return images

seq_01 = load_images_from_folder("../34759_final_project_rect/seq_02/image_02/data")


## Use of YOLO

In [5]:
preprocess = transforms.Compose([
    transforms.Resize((128, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std =[0.229, 0.224, 0.225]),
])


In [6]:
def extract_features(img, bbox):
    x1, y1, x2, y2 = map(int, bbox)
    # Crop the image to the bounding box
    cropped_img = img[y1:y2, x1:x2, :]
    # Convert to PIL Image
    cropped_img = cv2.cvtColor(cropped_img, cv2.COLOR_BGR2RGB)
    pil_img = Image.fromarray(cropped_img)
    # Preprocess the image
    input_tensor = preprocess(pil_img)
    input_batch = input_tensor.unsqueeze(0)  # Create a mini-batch as expected by the model
    # Move the input to the same device as the model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    input_batch = input_batch.to(device)
    model.to(device)
    # Extract features
    with torch.no_grad():
        features = model(input_batch)
    # Flatten the features
    features = features.cpu().numpy().flatten()
    # Normalize the features
    features = features / np.linalg.norm(features)
    return features


In [7]:
def compute_feature_distance(feature1, feature2):
    # Compute cosine distance (1 - cosine similarity)
    distance = 1 - np.dot(feature1, feature2)
    return distance


## Everything below does not work!

In [None]:
# KalmanBoundingBoxTracker Class (including 'confirmed' attribute)
class KalmanBoundingBoxTracker:
    count = 0
    def __init__(self, bbox):
        x1, y1, x2, y2 = bbox
        self.w = x2 - x1
        self.h = y2 - y1
        center_x = x1 + self.w / 2.
        center_y = y1 + self.h / 2.

        self.x = np.array([
            [center_x],       # Index 0: x_position
            [0],              # Index 1: x_velocity
            [0],              # Index 2: x_acceleration
            [center_y],       # Index 3: y_position
            [0],              # Index 4: y_velocity
            [0],              # Index 5: y_acceleration
            [self.w],         # Index 6: width
            [0],              # Index 7: width_velocity
            [self.h],         # Index 8: height
            [0]               # Index 9: height_velocity
        ])

        # State covariance matrix
        # self.P = np.eye(8) * 1
        pos_unc = 1    # Position uncertainty
        vel_unc = 100    # Velocity uncertainty
        acc_unc = 100     # Acceleration uncertainty
        size_unc = 1   # Width/height uncertainty
        size_vel_unc = 100  # Width/height velocity uncertainty

        self.P = np.diag([
            pos_unc, vel_unc, acc_unc,
            pos_unc, vel_unc, acc_unc,
            size_unc, size_vel_unc,
            size_unc, size_vel_unc
        ])
        # Time step
        dt = 0.1035
        #dt = 0.05

        self.F = np.array([
            [1, dt, 0.5 * dt**2, 0,  0,  0, 0, 0, 0, 0],  # x_position
            [0, 1, dt,           0,  0,  0, 0, 0, 0, 0],  # x_velocity
            [0, 0, 1,            0,  0,  0, 0, 0, 0, 0],  # x_acceleration
            [0, 0, 0,            1, dt, 0.5 * dt**2, 0, 0, 0, 0],  # y_position
            [0, 0, 0,            0, 1, dt,           0, 0, 0, 0],  # y_velocity
            [0, 0, 0,            0, 0, 1,            0, 0, 0, 0],  # y_acceleration
            [0, 0, 0,            0, 0, 0, 1, dt, 0, 0],            # width
            [0, 0, 0,            0, 0, 0, 0, 1, 0, 0],             # width_velocity
            [0, 0, 0,            0, 0, 0, 0, 0, 1, dt],            # height
            [0, 0, 0,            0, 0, 0, 0, 0, 0, 1]              # height_velocity
        ])


        self.H = np.array([
            [1, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # Measure x_position
            [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],  # Measure y_position
            [0, 0, 0, 0, 0, 0, 1, 0, 0, 0],  # Measure width
            [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]   # Measure height
        ])


        Q = np.zeros((10, 10))
        q_acc = (1) ** 2  # Process noise acceleration
        Q_pos = q_acc * np.array([
            [dt**5 / 20, dt**4 / 8, dt**3 / 6],
            [dt**4 / 8,  dt**3 / 3, dt**2 / 2],
            [dt**3 / 6,  dt**2 / 2, dt]
        ])

        q_box_vel = (0.01) ** 2  # Process noise box velocity 
        # Process noise covariance for width and height
        Q_box = q_box_vel * np.array([
            [dt**3 / 3, dt**2 / 2],
            [dt**2 / 2, dt]
        ])

        # Assign Q_pos to x components (indices 0-2)
        Q[0:3, 0:3] = Q_pos
        # Assign Q_size to width components (indices 3-4)
        Q[3:5, 3:5] = Q_box
        # Assign Q_pos to y components (indices 5-7)
        Q[5:8, 5:8] = Q_pos
        # Assign Q_size to height components (indices 8-9)
        Q[8:10, 8:10] = Q_box

        self.Q = Q
        # Measurement noise covariance matrix
        r_pos = 1  # Adjust this value based on measurement accuracy
        r_box = 1
        self.R = np.diag([r_pos, r_pos, r_box, r_box])

        self.time_since_update = 0
        self.id = KalmanBoundingBoxTracker.count
        KalmanBoundingBoxTracker.count += 1

        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0
        self.confirmed = False  # Add confirmed flag

    def predict(self):
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        self.age += 1
        self.history.append(self.x)
        return self.get_state()

    def update(self, bbox):
        self.time_since_update = 0
        self.hits += 1
        self.hit_streak += 1

        x1, y1, x2, y2 = bbox
        self.w = x2 - x1
        self.h = y2 - y1
        center_x = x1 + self.w / 2.
        center_y = y1 + self.h / 2.

        self.Z = np.array([[center_x], [center_y], [self.w], [self.h]])

        y = self.Z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x = self.x + K @ y
        self.P = (np.eye(self.F.shape[0]) - K @ self.H) @ self.P

    def get_state(self):
        x = self.x[0][0]
        y = self.x[3][0]
        w = self.x[6][0]
        h = self.x[8][0]

        x1 = x - w / 2.
        y1 = y - h / 2.
        x2 = x + w / 2.
        y2 = y + h / 2.
        return [x1, y1, x2, y2]

# Function to compute IoU
def iou(bb_test, bb_gt):
    """
    Computes IoU between two bounding boxes.
    """
    xx1 = np.maximum(bb_test[0], bb_gt[0])
    yy1 = np.maximum(bb_test[1], bb_gt[1])
    xx2 = np.minimum(bb_test[2], bb_gt[2])
    yy2 = np.minimum(bb_test[3], bb_gt[3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) +
              (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1]) - wh)
    return o

# Data association function using IoU
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """
    Assigns detections to tracked objects using IoU.
    Returns 3 lists of matches, unmatched_detections, and unmatched_trackers.
    """
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), []
    
    iou_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
    
    for d, det in enumerate(detections):
        for t, trk in enumerate(trackers):
            iou_matrix[d, t] = iou(det, trk.get_state())
    
    matched_indices = linear_assignment(-iou_matrix)
    
    unmatched_detections = []
    for d in range(len(detections)):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    
    unmatched_trackers = []
    for t in range(len(trackers)):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    
    # Filter out matches with low IoU
    matches = []
    for m in matched_indices:
        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1, 2))
    
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)
    
    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)

# Linear assignment function
def linear_assignment(cost_matrix):
    x, y = linear_sum_assignment(cost_matrix)
    return np.array(list(zip(x, y)))

# Function to load images from a folder
def load_images_from_folder(folder_path):
    images = []
    for filename in sorted(os.listdir(folder_path)):
        img = cv2.imread(os.path.join(folder_path, filename))
        if img is not None:
            images.append(img)
    return images

# Function to get class colors
def getColours(id_num, is_prediction=False, confirmed=True):
    # Use different colors based on confirmation status and prediction
    if not confirmed:
        colour = (0, 0, 255)  # Red for unconfirmed trackers
    elif is_prediction:
        colour = (255, 0, 0)  # Blue for predictions
    else:
        colour = (0, 255, 0)  # Green for updates
    return colour

# Initialize the YOLO model
yolo = YOLO('yolov8s.pt')

# Load the sequence of images
seq_01 = load_images_from_folder("../34759_final_project_rect/seq_02/image_02/data")

# Initialize trackers list and parameters
trackers = []
max_age = 35     # Adjust as needed
min_hits = 3     # Adjust as needed
frame_count = 0

for frame in seq_01:
    frame_count += 1

    # Get detections from YOLO model
    results = yolo.predict(frame)
    detected_bounding_boxes = []

    # Process YOLO detections
    for result in results:
        # Get the class names
        classes_names = result.names

        # Iterate over each box
        for box in result.boxes:
            # Check if confidence is greater than 40 percent
            if box.conf[0] > 0.61:
                # Get coordinates
                x1, y1, x2, y2 = box.xyxy[0]
                # Convert to int
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)

                # Get the class
                cls = int(box.cls[0])

                # Get the class name
                class_name = classes_names[cls]

                if class_name in ["person", "bicycle", "car"]:
                    detected_bounding_boxes.append([x1, y1, x2, y2])

    # Predict all trackers
    for trk in trackers:
        trk.predict()

    # Collect predicted bounding boxes for data association
    trks = []
    for trk in trackers:
        pos = trk.get_state()
        trks.append(pos)
    trks = np.array(trks)

    # Associate detections to trackers using IoU
    matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(detected_bounding_boxes, trackers, iou_threshold=0.3)

    # Update matched trackers with assigned detections
    for m in matched:
        trk = trackers[m[1]]
        det = detected_bounding_boxes[m[0]]
        trk.update(det)
        if trk.hit_streak >= min_hits:
            trk.confirmed = True

    # Create and initialize new trackers for unmatched detections
    for i in unmatched_dets:
        trk = KalmanBoundingBoxTracker(detected_bounding_boxes[i])
        trackers.append(trk)

    # Handle unmatched trackers
    for t, trk in enumerate(trackers):
        if t in unmatched_trks:
            trk.time_since_update += 1
            trk.hit_streak = 0
        else:
            trk.time_since_update = 0
            trk.hit_streak += 1

    # Remove dead trackers
    #trackers = [trk for trk in trackers if trk.time_since_update <= max_age]

    # Remove dead trackers based on time since update and hit streak
    new_trackers = []
    for trk in trackers:
        # Remove the tracker if it has been in predicted state longer than confirmed state
        if (trk.time_since_update / 2) > trk.hits:
            # This means the tracker has been in a predicted state more than it has been confirmed
            print(f"Removing tracker ID {trk.id} - Predicted time exceeds confirmed state.")
            continue

        # Remove tracker if it has exceeded max_age (too long without updates)
        if trk.time_since_update <= max_age:
            new_trackers.append(trk)

    # Update the trackers list
    trackers = new_trackers

    # Draw trackers
    for trk in trackers:
        if not trk.confirmed:
            continue
        bbox = trk.get_state()
        x1, y1, x2, y2 = map(int, bbox)
        colour = getColours(trk.id, is_prediction=trk.time_since_update > 0, confirmed=trk.confirmed)
        if trk.confirmed:
            label = f'ID {trk.id}'
        else:
            label = f'Unconfirmed ID {trk.id}'
        cv2.rectangle(frame, (x1, y1), (x2, y2), colour, 2)
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.65, colour, 2)

    # Show the frame
    cv2.imshow('frame', frame)

    # Break the loop if 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources and close windows
cv2.destroyAllWindows()



0: 224x640 7 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bench, 1 bird, 44.9ms
Speed: 1.9ms preprocess, 44.9ms inference, 158.8ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 7 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bird, 8.7ms
Speed: 1.8ms preprocess, 8.7ms inference, 3.9ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 2 bicycles, 4 cars, 1 motorcycle, 1 bird, 9.8ms
Speed: 3.6ms preprocess, 9.8ms inference, 1.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bird, 9.0ms
Speed: 2.3ms preprocess, 9.0ms inference, 3.7ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bird, 8.9ms
Speed: 2.2ms preprocess, 8.9ms inference, 1.6ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 7 persons, 2 bicycles, 4 cars, 1 bird, 9.0ms
Speed: 3.5ms preprocess, 9.0ms inference, 3.9ms postprocess per image at shape (1, 3, 224, 640)

0:

In [82]:
# KalmanBoundingBoxTracker Class (including 'confirmed' attribute)
class KalmanBoundingBoxTracker:
    count = 0
    def __init__(self, bbox, feature):
        x1, y1, x2, y2 = bbox
        self.w = x2 - x1
        self.h = y2 - y1
        center_x = x1 + self.w / 2.
        center_y = y1 + self.h / 2.
        self.feature = feature

        self.x = np.array([
            [center_x],       # Index 0: x_position
            [0],              # Index 1: x_velocity
            [0],              # Index 2: x_acceleration
            [center_y],       # Index 3: y_position
            [0],              # Index 4: y_velocity
            [0],              # Index 5: y_acceleration
            [self.w],         # Index 6: width
            [0],              # Index 7: width_velocity
            [self.h],         # Index 8: height
            [0]               # Index 9: height_velocity
        ])

        # State covariance matrix
        # self.P = np.eye(8) * 1
        pos_unc = 1    # Position uncertainty
        vel_unc = 100    # Velocity uncertainty
        acc_unc = 100     # Acceleration uncertainty
        size_unc = 1   # Width/height uncertainty
        size_vel_unc = 100  # Width/height velocity uncertainty

        self.P = np.diag([
            pos_unc, vel_unc, acc_unc,
            pos_unc, vel_unc, acc_unc,
            size_unc, size_vel_unc,
            size_unc, size_vel_unc
        ])
        # Time step
        dt = 0.1035

        self.F = np.array([
            [1, dt, 0.5 * dt**2, 0,  0,  0, 0, 0, 0, 0],  # x_position
            [0, 1, dt,           0,  0,  0, 0, 0, 0, 0],  # x_velocity
            [0, 0, 1,            0,  0,  0, 0, 0, 0, 0],  # x_acceleration
            [0, 0, 0,            1, dt, 0.5 * dt**2, 0, 0, 0, 0],  # y_position
            [0, 0, 0,            0, 1, dt,           0, 0, 0, 0],  # y_velocity
            [0, 0, 0,            0, 0, 1,            0, 0, 0, 0],  # y_acceleration
            [0, 0, 0,            0, 0, 0, 1, dt, 0, 0],            # width
            [0, 0, 0,            0, 0, 0, 0, 1, 0, 0],             # width_velocity
            [0, 0, 0,            0, 0, 0, 0, 0, 1, dt],            # height
            [0, 0, 0,            0, 0, 0, 0, 0, 0, 1]              # height_velocity
        ])


        self.H = np.array([
            [1, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # Measure x_position
            [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],  # Measure y_position
            [0, 0, 0, 0, 0, 0, 1, 0, 0, 0],  # Measure width
            [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]   # Measure height
        ])


        Q = np.zeros((10, 10))
        q_acc = (1) ** 2  # Process noise acceleration
        Q_pos = q_acc * np.array([
            [dt**5 / 20, dt**4 / 8, dt**3 / 6],
            [dt**4 / 8,  dt**3 / 3, dt**2 / 2],
            [dt**3 / 6,  dt**2 / 2, dt]
        ])

        q_box_vel = (10) ** 2  # Process noise box velocity 
        # Process noise covariance for width and height
        Q_box = q_box_vel * np.array([
            [dt**3 / 3, dt**2 / 2],
            [dt**2 / 2, dt]
        ])

        # Assign Q_pos to x components (indices 0-2)
        Q[0:3, 0:3] = Q_pos
        # Assign Q_size to width components (indices 3-4)
        Q[3:5, 3:5] = Q_box
        # Assign Q_pos to y components (indices 5-7)
        Q[5:8, 5:8] = Q_pos
        # Assign Q_size to height components (indices 8-9)
        Q[8:10, 8:10] = Q_box

        self.Q = Q
        # Measurement noise covariance matrix
        r_pos = 1  # Adjust this value based on measurement accuracy
        r_box = 1
        self.R = np.diag([r_pos, r_pos, r_box, r_box])

        self.time_since_update = 0
        self.id = KalmanBoundingBoxTracker.count
        KalmanBoundingBoxTracker.count += 1

        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0
        self.confirmed = False  # Add confirmed flag

    def predict(self):
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        self.age += 1
        self.history.append(self.x)
        return self.get_state()

    def update(self, bbox, feature):
        self.feature = feature
        self.time_since_update = 0
        self.hits += 1
        self.hit_streak += 1

        x1, y1, x2, y2 = bbox
        self.w = x2 - x1
        self.h = y2 - y1
        center_x = x1 + self.w / 2.
        center_y = y1 + self.h / 2.

        self.Z = np.array([[center_x], [center_y], [self.w], [self.h]])

        y = self.Z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x = self.x + K @ y
        self.P = (np.eye(self.F.shape[0]) - K @ self.H) @ self.P

    def get_state(self):
        x = self.x[0][0]
        y = self.x[3][0]
        w = self.x[6][0]
        h = self.x[8][0]

        x1 = x - w / 2.
        y1 = y - h / 2.
        x2 = x + w / 2.
        y2 = y + h / 2.
        return [x1, y1, x2, y2]

# Function to compute IoU
def iou(bb_test, bb_gt):
    """
    Computes IoU between two bounding boxes.
    """
    xx1 = np.maximum(bb_test[0], bb_gt[0])
    yy1 = np.maximum(bb_test[1], bb_gt[1])
    xx2 = np.minimum(bb_test[2], bb_gt[2])
    yy2 = np.minimum(bb_test[3], bb_gt[3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) +
              (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1]) - wh)
    return o

# Data association function using IoU
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.01, lambda_iou=0.5, lambda_app=0.5, app_threshold=0.0007):
    """
    Assigns detections to tracked objects using IoU.
    Returns 3 lists of matches, unmatched_detections, and unmatched_trackers.
    """
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), []
    
    iou_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
    appearance_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
    
    for d, det in enumerate(detections):
        for t, trk in enumerate(trackers):
            iou_cost = 1 - iou(det['bbox'], trk.get_state())
            iou_matrix[d, t] = iou_cost

            appearance_cost = compute_feature_distance(det['feature'], trk.feature)
            appearance_matrix[d, t] = appearance_cost

    # Normalize cost matrices to [0, 1]
    iou_matrix /= np.max(iou_matrix) if np.max(iou_matrix) > 0 else 1
    appearance_matrix /= np.max(appearance_matrix) if np.max(appearance_matrix) > 0 else 1

    # Compute total cost matrix
    total_cost = lambda_iou * iou_matrix + lambda_app * appearance_matrix

    matched_indices = linear_assignment(total_cost)
    
    unmatched_detections = []
    for d in range(len(detections)):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    
    unmatched_trackers = []
    for t in range(len(trackers)):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    
    # Filter out matches with low IoU
    matches = []
    print(matched_indices)
    for m in matched_indices:
        detection_idx, tracker_idx = m[0], m[1]
        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1, 2))

        if appearance_matrix[detection_idx, tracker_idx] > app_threshold:
            print("gotget")

    
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)
    
    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)

# Linear assignment function
def linear_assignment(cost_matrix):
    x, y = linear_sum_assignment(cost_matrix)
    return np.array(list(zip(x, y)))

# Function to load images from a folder
def load_images_from_folder(folder_path):
    images = []
    for filename in sorted(os.listdir(folder_path)):
        img = cv2.imread(os.path.join(folder_path, filename))
        if img is not None:
            images.append(img)
    return images

# Function to get class colors
def getColours(id_num, is_prediction=False, confirmed=True):
    # Use different colors based on confirmation status and prediction
    if not confirmed:
        colour = (0, 0, 255)  # Red for unconfirmed trackers
    elif is_prediction:
        colour = (255, 0, 0)  # Blue for predictions
    else:
        colour = (0, 255, 0)  # Green for updates
    return colour

# Initialize the YOLO model
yolo = YOLO('yolov8s.pt')

# Load the sequence of images
seq_01 = load_images_from_folder("../34759_final_project_rect/seq_02/image_02/data")

# Initialize trackers list and parameters
trackers = []
max_age = 40     # Adjust as needed
min_hits = 2     # Adjust as needed
frame_count = 0

for frame in seq_01:
    frame_count += 1

    # Get detections from YOLO model
    results = yolo.predict(frame)
    detections = []

    # Process YOLO detections
    for result in results:
        # Get the class names
        classes_names = result.names

        # Iterate over each box
        for box in result.boxes:
            # Check if confidence is greater than 40 percent
            if box.conf[0] > 0.6:
                # Get coordinates
                x1, y1, x2, y2 = box.xyxy[0]
                # Convert to int
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)

                # Get the class
                cls = int(box.cls[0])

                # Get the class name
                class_name = classes_names[cls]

                if class_name in ["person", "bicycle"]:
                    bbox = [x1, y1, x2, y2]
                    feature = extract_features(frame, bbox)
                    detections.append({'bbox': bbox, 'feature': feature})
            
    # Predict all trackers
    for trk in trackers:
        trk.predict()

    # Collect predicted bounding boxes for data association
    trks = []
    for trk in trackers:
        pos = trk.get_state()
        trks.append(pos)
    trks = np.array(trks)

    # Associate detections to trackers using IoU
    matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(
        detections, trackers, iou_threshold=0.2, lambda_iou=0.5, lambda_app=0, app_threshold=0.1)

    # Update matched trackers with assigned detections
    for m in matched:
        trk = trackers[m[1]]
        det = detections[m[0]]
        trk.update(det['bbox'], det['feature'])
        if trk.hit_streak >= min_hits:
            trk.confirmed = True

    # Create and initialize new trackers for unmatched detections
    for i in unmatched_dets:
        det = detections[i]
        trk = KalmanBoundingBoxTracker(det['bbox'], det['feature'])
        trackers.append(trk)

    # Handle unmatched trackers
    for t in unmatched_trks:
        trk = trackers[t]
        trk.time_since_update += 1
        trk.hit_streak = 0

    # Remove dead trackers
    # trackers = [trk for trk in trackers if trk.time_since_update <= max_age]
    # Remove dead trackers based on time since update and hit streak
    new_trackers = []
    for trk in trackers:
        # Remove the tracker if it has been in predicted state longer than confirmed state
        if ((trk.time_since_update / 2) > trk.hits) and (3 < trk.time_since_update):
            print(trk.hits)
            # This means the tracker has been in a predicted state more than it has been confirmed
            print(f"Removing tracker ID {trk.id} - Predicted time exceeds confirmed state.")
            continue

        # Remove tracker if it has exceeded max_age (too long without updates)
        if (trk.time_since_update/2) <= max_age:
            new_trackers.append(trk)

    # Update the trackers list
    trackers = new_trackers
    # Draw trackers
    for trk in trackers:
        # if not trk.confirmed:
        #     continue
        bbox = trk.get_state()
        x1, y1, x2, y2 = map(int, bbox)
        colour = getColours(trk.id, is_prediction=trk.time_since_update > 0, confirmed=trk.confirmed)
        if trk.confirmed:
            label = f'ID {trk.id}'
        else:
            label = f'Unconfirmed ID {trk.id}'
        cv2.rectangle(frame, (x1, y1), (x2, y2), colour, 2)
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.65, colour, 2)

    # Show the frame
    cv2.imshow('frame', frame)

    # Break the loop if 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources and close windows
cv2.destroyAllWindows()



0: 224x640 7 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bench, 1 bird, 9.7ms
Speed: 0.9ms preprocess, 9.7ms inference, 1.2ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 7 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bird, 8.7ms
Speed: 1.1ms preprocess, 8.7ms inference, 1.3ms postprocess per image at shape (1, 3, 224, 640)
[[0 0]
 [1 2]
 [2 3]
 [3 1]
 [4 4]
 [5 5]]

0: 224x640 5 persons, 2 bicycles, 4 cars, 1 motorcycle, 1 bird, 8.7ms
Speed: 1.5ms preprocess, 8.7ms inference, 1.9ms postprocess per image at shape (1, 3, 224, 640)
[[0 0]
 [1 2]
 [2 6]
 [3 3]
 [4 8]
 [5 7]]

0: 224x640 5 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bird, 9.0ms
Speed: 3.7ms preprocess, 9.0ms inference, 1.3ms postprocess per image at shape (1, 3, 224, 640)
[[0 0]
 [1 2]
 [2 9]
 [3 3]]

0: 224x640 5 persons, 1 bicycle, 4 cars, 1 motorcycle, 1 bird, 9.0ms
Speed: 3.3ms preprocess, 9.0ms inference, 1.1ms postprocess per image at shape (1, 3, 224, 640)
[[0 0]
 [1 2]
 [2 3]
 [3 1]]
0
Removing track

In [17]:
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment


## Depth mapping

In [37]:

seq_02_left = load_images_from_folder("../34759_final_project_rect/seq_02/image_03/data")
seq_02_right = load_images_from_folder("../34759_final_project_rect/seq_02/image_02/data")
min_disp = 2
num_disp = 32
block_size = 15

stereo = cv2.StereoBM_create(numDisparities = num_disp, blockSize = block_size)
#stereo.setMinDisparity(min_disp)
stereo.setDisp12MaxDiff(200)
stereo.setUniquenessRatio(1)
stereo.setSpeckleRange(3)
stereo.setSpeckleWindowSize(3)

for img_left,img_right in zip(seq_02_left[:5],seq_02_right[:5]):
    gray_left = cv2.cvtColor(img_left,cv2.COLOR_RGB2GRAY)
    gray_right = cv2.cvtColor(img_right,cv2.COLOR_RGB2GRAY)
    disp = stereo.compute(gray_left, gray_right).astype(np.float32) / 16.0

    plt.figure(figsize=(18,18))
    plt.imshow(disp)
    plt.show()

NameError: name 'seq_02_left' is not defined