In [38]:
import cv2
import os
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from collections import defaultdict

# Kalman Filter Class
class KalmanFilter:
    """Linear Kalman filter over the 2x1 state [disparity, disparity_velocity].

    Attributes:
        F: 2x2 state transition matrix (disparity advances by the velocity
           once per ``predict`` call).
        H: 1x2 measurement matrix; only the disparity component is observed.
        Q: 2x2 process noise covariance.
        R: 1x1 measurement noise covariance.
        P: 2x2 state covariance, initialised to the identity.
        state: 2x1 column vector, initialised to zeros.
    """

    def __init__(self):
        f64 = np.float64
        self.state = np.zeros((2, 1), dtype=f64)        # [disparity, disparity_velocity]
        self.P = np.eye(2, dtype=f64)                   # Initial state covariance
        self.F = np.array([[1, 1], [0, 1]], dtype=f64)  # State transition matrix
        self.H = np.array([[1, 0]], dtype=f64)          # Measurement matrix
        self.Q = 0.01 * np.eye(2, dtype=f64)            # Process noise covariance
        self.R = np.full((1, 1), 0.1, dtype=f64)        # Measurement noise covariance

    def predict(self):
        """Advance the state and covariance one step; return the predicted disparity."""
        transition = self.F
        self.state = transition @ self.state
        self.P = transition @ self.P @ transition.T + self.Q
        return self.state[0, 0]

    def update(self, measurement):
        """Correct the state in place with a disparity measurement (1x1 array or scalar)."""
        observe = self.H
        innovation = measurement - observe @ self.state
        innovation_cov = observe @ self.P @ observe.T + self.R
        gain = self.P @ observe.T @ np.linalg.inv(innovation_cov)
        self.state += gain @ innovation
        self.P = (np.eye(2) - gain @ observe) @ self.P

# Object detector loaded from the 'yolov8s.pt' weights file (presumably YOLOv8-small)
yolo = YOLO('yolov8s.pt')

# DeepSort multi-object tracker; parameter values are tuned for this sequence
tracker = DeepSort(max_age=35, n_init=3, max_cosine_distance=0.2, nn_budget=None)

# Drawing colour per detector class ID (channel order as used by the cv2 drawing calls):
# 0 -> green (humans), 1 -> blue, 2 -> red
CLASS_COLORS = dict(enumerate([(0, 255, 0), (255, 0, 0), (0, 0, 255)]))

# Display label per detector class ID
CLASS_NAMES = dict(enumerate(["Person", "Bike", "Car"]))

# Detections narrower or shorter than this many pixels are discarded
MIN_BBOX_WIDTH = MIN_BBOX_HEIGHT = 35

# Camera parameters
# 3x3 intrinsic matrix: focal lengths on the diagonal, principal point in the last column
K = np.array([
    [986.3925, 0.0, 702.0],
    [0.0, 982.1423, 258.8854],
    [0.0, 0.0, 1.0],
])
B = 0.537    # Stereo baseline (in meters)
f = K[0][0]  # Horizontal focal length (in pixels)

# Function to calculate depth (z) from disparity
def calculate_depth(disparity, baseline, focal_length):
    """Convert a stereo disparity to depth via ``focal_length * baseline / disparity``.

    Args:
        disparity: Horizontal pixel offset between the left and right views.
        baseline: Distance between the two cameras.
        focal_length: Focal length in pixels.

    Returns:
        Depth in the units of ``baseline``; ``float('inf')`` when the
        disparity is exactly zero (avoids a division by zero).
    """
    return float('inf') if disparity == 0 else (focal_length * baseline) / disparity

# Template matching function
def find_disparity(template, search_line):
    """Locate ``template`` along ``search_line`` by minimum sum of absolute differences.

    The template is slid one column at a time across the search strip; at
    each offset the SAD between the template and the equally sized window
    is computed in float32 (so unsigned integer inputs cannot wrap around).

    Args:
        template: Array of shape (rows, width[, channels]).
        search_line: Array with the same rows/channels and at least
            ``width`` columns.

    Returns:
        Column index (NumPy integer) in ``search_line`` of the LEFT edge of
        the best-matching window; the first minimum wins on ties.

    Raises:
        ValueError: If the template is wider than the search strip, so no
            window position exists.
    """
    template_width = template.shape[1]
    n_positions = search_line.shape[1] - template_width + 1
    if n_positions < 1:
        raise ValueError(
            f"template ({template_width} px) is wider than the search line "
            f"({search_line.shape[1]} px)"
        )

    # Cast once instead of on every loop iteration; values are unchanged.
    template_f = template.astype('float32')
    search_f = search_line.astype('float32')

    # Slide the template over the search line
    sad_values = np.empty(n_positions)
    for start_pix in range(n_positions):
        window = search_f[:, start_pix:start_pix + template_width]
        sad_values[start_pix] = np.sum(np.abs(window - template_f))

    # Find the position with the minimum SAD
    return np.argmin(sad_values)

# Paths to the folders containing the stereo image sequence
left_image_folder = 'img_rect/seq_02/image_02/data'
right_image_folder = 'img_rect/seq_02/image_03/data'

# Get sorted lists of image filenames (frames are paired by sorted position)
left_image_files = sorted(name for name in os.listdir(left_image_folder) if name.endswith('.png'))
right_image_files = sorted(name for name in os.listdir(right_image_folder) if name.endswith('.png'))

# Ensure there are images to process
if not left_image_files or not right_image_files:
    print("No images found in the specified folders.")
    exit()

# zip() below stops at the shorter list without complaint, so surface a mismatch.
if len(left_image_files) != len(right_image_files):
    print(f"Warning: {len(left_image_files)} left vs {len(right_image_files)} right images; "
          "unpaired trailing frames are ignored.")

# ---------------------------------------------------------------------------
# State that must persist ACROSS frames (created once, outside the frame loop)
# ---------------------------------------------------------------------------
kalman_filters = defaultdict(KalmanFilter)  # {track_id: KalmanFilter over disparity}
class_map = {}                              # {track_id: (class_id, confidence)}
previous_depths = {}                        # {track_id: last valid smoothed depth in meters}

# Reasonable constraints for disparity and depth
min_disparity = 1e-6
max_disparity = 100  # Adjust based on expected scene
min_depth = 0.6      # Minimum depth in meters
max_depth = 30       # Maximum depth in meters

# Weight of the newest measurement in the exponential depth smoothing
alpha = 0.9

# Side length (pixels) of the square patch matched between the two views
template_size = 15

# Minimum IoU for a detection to be considered the one belonging to a track
min_match_iou = 0.3


def _box_iou(det_ltwh, box_ltrb):
    """Return the IoU of a [left, top, width, height] box and an (x1, y1, x2, y2) box."""
    det_x1, det_y1, det_w, det_h = det_ltwh
    det_x2, det_y2 = det_x1 + det_w, det_y1 + det_h
    box_x1, box_y1, box_x2, box_y2 = box_ltrb
    inter_w = min(det_x2, box_x2) - max(det_x1, box_x1)
    inter_h = min(det_y2, box_y2) - max(det_y1, box_y1)
    if inter_w <= 0 or inter_h <= 0:
        return 0.0
    inter = inter_w * inter_h
    union = det_w * det_h + (box_x2 - box_x1) * (box_y2 - box_y1) - inter
    return inter / union if union > 0 else 0.0


def _measure_disparity(left_img, right_img, center_x, center_y):
    """Return the disparity (pixels) of the patch centred on (center_x, center_y).

    A template_size x template_size patch is cut from the left image and
    matched along the same rows of the right image. Only columns that can
    yield a disparity in [0, max_disparity] are searched. Returns None when
    the patch lies outside the image or no window fits.
    """
    half = template_size // 2
    x_start = max(0, center_x - half)
    x_end = min(left_img.shape[1], center_x + half + 1)
    y_start = max(0, center_y - half)
    y_end = min(left_img.shape[0], center_y + half + 1)
    if x_end <= x_start or y_end <= y_start:
        return None  # track centre is outside the frame: nothing to match

    template = left_img[y_start:y_end, x_start:x_end]
    # A point at column x in the left view appears at x - disparity in the
    # right view, so the match can only lie at or to the left of x_start.
    search_lo = max(0, x_start - int(max_disparity))
    search_line = right_img[y_start:y_end, search_lo:x_end]
    if search_line.shape[0] != template.shape[0] or search_line.shape[1] < template.shape[1]:
        return None

    match_left_edge = search_lo + int(find_disparity(template, search_line))
    # find_disparity reports the LEFT edge of the match, so compare it with
    # the left edge of the template (not with the patch centre).
    return x_start - match_left_edge


def _smoothed_depth(track_id, disparity):
    """Return the smoothed depth (meters) for a track, or its last valid depth.

    A new depth is accepted only if both the disparity and the resulting
    depth are inside their configured ranges; it is then blended with the
    previous depth and stored. Otherwise the stored value is returned
    unchanged (None if the track never had a valid depth).
    """
    if disparity is not None and min_disparity <= disparity <= max_disparity:
        depth = calculate_depth(disparity, B, f)
        if min_depth <= depth <= max_depth:
            if track_id in previous_depths:
                depth = alpha * depth + (1 - alpha) * previous_depths[track_id]
            previous_depths[track_id] = depth
            return depth
    return previous_depths.get(track_id)


for left_image_file, right_image_file in zip(left_image_files, right_image_files):
    # Load the current pair of stereo frames
    left_frame = cv2.imread(os.path.join(left_image_folder, left_image_file))
    right_frame = cv2.imread(os.path.join(right_image_folder, right_image_file))

    if left_frame is None or right_frame is None:
        print(f"Failed to load image pair: {left_image_file}, {right_image_file}")
        continue

    # Detect objects in the current frame
    results = yolo(left_frame, stream=True, classes=[0, 1, 2])  # 0: human, 1: bike, 2: car
    detections = []  # Detections in DeepSort format: ([left, top, w, h], conf, cls)

    # Extract bounding boxes, confidence, and class ID
    for result in results:
        for box in result.boxes:
            x1, y1, x2, y2 = map(float, box.xyxy[0].cpu().numpy())
            conf = float(box.conf[0].cpu().numpy())
            cls = int(box.cls[0].cpu().numpy())

            width = x2 - x1
            height = y2 - y1

            # Filter out small detections
            if width < MIN_BBOX_WIDTH or height < MIN_BBOX_HEIGHT:
                continue

            detections.append(([x1, y1, width, height], conf, cls))

    # Update DeepSort tracker
    tracks = tracker.update_tracks(detections, frame=left_frame)

    # Draw on a copy so annotations never end up inside a matching template.
    display_frame = left_frame.copy()

    for track in tracks:
        if not track.is_confirmed():
            continue

        track_id = track.track_id
        x1, y1, x2, y2 = map(int, track.to_ltrb())
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2

        if track.time_since_update <= 1:
            # Visible (recently updated) track.
            # Associate the track with the detection that overlaps it best.
            best_iou, matched_detection = min_match_iou, None
            for bbox, conf, cls in detections:
                overlap = _box_iou(bbox, (x1, y1, x2, y2))
                if overlap >= best_iou:
                    best_iou, matched_detection = overlap, (cls, conf)

            # Keep the most confident class seen so far for this track
            if matched_detection is not None:
                if track_id not in class_map or matched_detection[1] > class_map[track_id][1]:
                    class_map[track_id] = matched_detection

            cls, _ = class_map.get(track_id, (None, 0))
            class_name = CLASS_NAMES.get(cls, "Unknown")
            color = CLASS_COLORS.get(cls, (255, 255, 255))

            # Measure disparity on the clean frames; discard out-of-range matches
            disparity = _measure_disparity(left_frame, right_frame, center_x, center_y)
            if disparity is not None and not (min_disparity <= disparity <= max_disparity):
                disparity = None

            # Step the Kalman filter once per frame: predict, then correct.
            if track_id in kalman_filters:
                kalman_filter = kalman_filters[track_id]
                kalman_filter.predict()
                if disparity is not None:
                    kalman_filter.update(np.array([[disparity]], dtype=np.float64))
            elif disparity is not None:
                # Seed a new filter with the first valid measurement instead of 0,
                # otherwise the jump from 0 is mistaken for a large velocity.
                kalman_filters[track_id].state[0, 0] = disparity

            depth = _smoothed_depth(track_id, disparity)

            # Draw bounding box for visible tracks
            cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(display_frame, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
            if depth is not None:
                cv2.putText(display_frame, f"{depth:.2f}m", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
            cv2.putText(display_frame, f"{class_name}", (x1, y2 + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        else:
            # Predicted (occluded) track: extrapolate disparity with the filter,
            # if this track ever produced a valid measurement.
            if track_id in kalman_filters:
                disparity = kalman_filters[track_id].predict()
            else:
                disparity = None

            depth = _smoothed_depth(track_id, disparity)

            # Draw bounding box for occluded tracks
            color = (200, 200, 200)  # Gray for occluded tracks
            cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(display_frame, f"ID: {track_id} (predicted)", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
            if depth is not None:
                cv2.putText(display_frame, f"{depth:.2f}m (predicted)", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

    # Show the frame in a live window
    cv2.imshow("Live Tracking with Depth", display_frame)

    # Break the loop if 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cv2.destroyAllWindows()



0: 224x640 7 persons, 1 bicycle, 4 cars, 69.4ms
Speed: 5.4ms preprocess, 69.4ms inference, 1.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 7 persons, 1 bicycle, 4 cars, 74.1ms
Speed: 1.0ms preprocess, 74.1ms inference, 8.4ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 2 bicycles, 4 cars, 70.7ms
Speed: 1.0ms preprocess, 70.7ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 1 bicycle, 4 cars, 77.5ms
Speed: 1.8ms preprocess, 77.5ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 1 bicycle, 4 cars, 100.4ms
Speed: 0.0ms preprocess, 100.4ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 7 persons, 2 bicycles, 4 cars, 61.3ms
Speed: 0.0ms preprocess, 61.3ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 1 bicycle, 4 cars, 64.6ms
Speed: 1.1ms preprocess, 64.6ms inference, 0.0ms postprocess per image 