# **Pipeline for object detection and tracking in 3D:**

### **Required packages:**

In [2]:
import numpy as np
import cv2
import os
from matplotlib import pyplot as plt
from pathlib import Path
from ultralytics import YOLO
from sort import Sort

### **1. Definition of Objects and Functions**

##### 2D object detection: YOLOv8

In [57]:

# Load the YOLO detector from the "yolov8l.pt" weights file.
# NOTE(review): the parameters cell further down assigns `model` again with the same weights.
model = YOLO("yolov8l.pt")

# Create the SORT tracker that the frame loop below feeds with per-frame detections.
# NOTE(review): presumably max_age / min_hits / iou_threshold carry the usual SORT
# meanings — confirm against the local `sort` module.
mot_tracker = Sort(max_age=1, min_hits=3, iou_threshold=0.3)
def resize_image(input_image_path, width=217, height=225):
    """
    Load an image from disk and return it resized to the requested dimensions.

    Nothing is written to disk; the resized image is only returned.

    Args:
        input_image_path (str or os.PathLike): Path to the input image.
        width (int): Desired width of the resized image. Default is 217.
        height (int): Desired height of the resized image. Default is 225.

    Returns:
        The resized image, as returned by ``cv2.resize``.

    Raises:
        ValueError: If ``cv2.imread`` returns ``None`` for the given path.
    """
    # cv2.imread expects a plain string, so normalise pathlib.Path inputs first
    image = cv2.imread(str(input_image_path))

    if image is None:
        raise ValueError("Input image not found or unable to read.")

    # cv2.resize takes the target size as (width, height)
    resized_image = cv2.resize(image, (width, height))

    return resized_image

# Template image, resized to 217x225; the frame loop below matches it against every frame.
overlap_image = resize_image("Overlap_image.png",width=217, height=225)
# Define input and output folders
output_folder = "outputs"     # Folder where the annotated frames are saved


def select_data_folders(sequence):
    """Return the pair of input folders for a sequence number.

    Args:
        sequence (int): 1 for the first sequence, 2 for the second; any other
            value selects the third sequence.

    Returns:
        tuple[str, str]: The two folders containing the input frames.
    """
    if sequence == 1:  # first sequence
        return "data/view1", "data/view2"
    if sequence == 2:  # second sequence
        return "data/view3", "data/view4"
    return "data/view5", "data/view6"  # third sequence


set_img = 2
# Exactly one branch is taken now; previously two independent `if` statements let
# the trailing `else` overwrite the folders chosen for sequence 1.
data_folder_1, data_folder_2 = select_data_folders(set_img)
os.makedirs(output_folder, exist_ok=True)  # Create the output folder if it doesn't exist

def calculate_occlusion_area(box, overlay_rect):
    """Return the area of the intersection of two axis-aligned rectangles.

    Args:
        box: ``(x1, y1, x2, y2)`` corners of the bounding box.
        overlay_rect: ``(x1, y1, x2, y2)`` corners of the overlay rectangle, or
            ``None`` when no overlay was located.

    Returns:
        The intersection area; ``0`` when the rectangles do not overlap or when
        ``overlay_rect`` is ``None``.
    """
    # No overlay means nothing can cover the box
    if overlay_rect is None:
        return 0

    x1 = max(box[0], overlay_rect[0])
    y1 = max(box[1], overlay_rect[1])
    x2 = min(box[2], overlay_rect[2])
    y2 = min(box[3], overlay_rect[3])

    # Clamp at zero so disjoint rectangles give an empty intersection
    intersection_width = max(0, x2 - x1)
    intersection_height = max(0, y2 - y1)
    return intersection_width * intersection_height
def initialize_kalman(x_center,y_center):
    """Create the filter dictionary for a track that starts at the given centre.

    The six-element state holds ``x_center`` at index 0 and ``y_center`` at
    index 3, with every other entry zero. The transition matrix treats each
    axis as a (value, first derivative, second derivative) triple advanced by
    one unit step.

    Returns:
        dict: Keys ``x`` (state), ``P`` (uncertainty), ``F`` (transition),
        ``u`` (external motion), ``H`` (observation), ``R`` (measurement
        uncertainty) and ``I`` (identity).
    """
    state = np.array([x_center, 0, 0, y_center, 0, 0])

    # Same 3x3 block is used for the x triple and the y triple
    axis_transition = np.array([[1, 1, 0.5],
                                [0, 1, 1],
                                [0, 0, 1]])
    transition = np.zeros((6, 6))
    transition[:3, :3] = axis_transition
    transition[3:, 3:] = axis_transition

    # Only the two positions (state indices 0 and 3) are observed
    observation = np.zeros((2, 6), dtype=int)
    observation[0, 0] = 1
    observation[1, 3] = 1

    return dict(
        x=state,
        P=100000 * np.eye(6),  # initial uncertainty, an arbitrary large number
        F=transition,
        u=np.zeros(6),
        H=observation,
        R=np.eye(2),
        I=np.eye(6),
    )
def update(kalman, Z):
    """Correct the filter state with a measurement.

    Args:
        kalman (dict): Filter dictionary; the keys ``x``, ``P``, ``H``, ``R``
            and ``I`` are read, and ``x`` and ``P`` are overwritten.
        Z: Measurement vector compared against ``H @ x``.

    Returns:
        dict: The same ``kalman`` dictionary, modified in place.
    """
    state = kalman["x"]
    covariance = kalman["P"]
    H = kalman["H"]
    measurement_noise = kalman["R"]
    identity = kalman["I"]

    # Innovation: what the measurement says minus what the state predicts
    innovation = Z - np.dot(H, state)
    innovation_cov = np.dot(H, np.dot(covariance, H.T)) + measurement_noise

    # Kalman gain
    gain = np.dot(covariance, np.dot(H.T, np.linalg.inv(innovation_cov)))
    correction = np.dot(gain, innovation)
    print(f"check update: {correction}")

    kalman["x"] = state + correction
    kalman["P"] = np.dot(identity - np.dot(gain, H), covariance)
    return kalman

def predict(kalman):
    """Advance the filter by one step.

    Args:
        kalman (dict): Filter dictionary; the keys ``x``, ``P``, ``F`` and
            ``u`` are read, and ``x`` and ``P`` are overwritten.

    Returns:
        dict: The same ``kalman`` dictionary, modified in place.
    """
    transition = kalman["F"]
    process_noise = np.eye(6) * 0.1  # small noise added on every step

    # Propagate the state through the transition and add the external motion
    next_state = np.dot(transition, kalman["x"]) + kalman["u"]
    print(f"New x after prediction = {next_state}")

    # Propagate the uncertainty the same way and inflate it by the process noise
    next_cov = np.dot(transition, np.dot(kalman["P"], transition.T)) + process_noise

    kalman["x"] = next_state
    kalman["P"] = next_cov
    return kalman

def update_box(output):
    """Re-centre one output record with a single Kalman update/predict cycle.

    Args:
        output: A record laid out like the tuples appended to ``outputs`` in
            the frame loop: ``(frame_name, label, track_id, x1, y1, x2, y2,
            x_center, y_center, occlusion_level)``.

    Returns:
        tuple: A record with the same layout whose box corners and centre are
        replaced by the predicted ones. The box keeps its width and height.
    """
    frame_name, label, track_id = output[0], output[1], output[2]
    x1, y1, x2, y2 = output[3], output[4], output[5], output[6]
    x_center, y_center = output[7], output[8]
    trailing = tuple(output[9:])

    # Seed the filter at the current centre, then run one correct + predict step
    kalman = initialize_kalman(x_center, y_center)
    kalman = update(kalman, np.array([x_center, y_center]))
    kalman = predict(kalman)

    state = kalman["x"]
    new_x_center = round(float(state[0]))
    new_y_center = round(float(state[3]))

    # Keep the box size and move it onto the new centre
    half_width = (x2 - x1) / 2
    half_height = (y2 - y1) / 2
    new_x1 = new_x_center - half_width
    new_x2 = new_x_center + half_width
    new_y1 = new_y_center - half_height
    new_y2 = new_y_center + half_height

    # Return a plain tuple: np.array() on a record mixing strings and numbers
    # would coerce every field to a string.
    return (frame_name, label, track_id,
            new_x1, new_y1, new_x2, new_y2,
            new_x_center, new_y_center) + trailing


def _rect_area(rect):
    """Return the area of an ``(x1, y1, x2, y2)`` rectangle, never negative."""
    return max(0, rect[2] - rect[0]) * max(0, rect[3] - rect[1])


def _occlusion_rate(box, box_area, overlay_rect):
    """Return the percentage (0-100) of ``box_area`` covered by ``overlay_rect``.

    Returns 0.0 when no overlay was found or when the box has no area, so the
    caller never divides by zero or passes ``None`` to the intersection helper.
    """
    if overlay_rect is None or box_area <= 0:
        return 0.0
    return (calculate_occlusion_area(box, overlay_rect) / box_area) * 100


def _occlusion_level(rate):
    """Map an occlusion percentage to the level stored in ``outputs``.

    3 = not visible at all, 2 = partially visible, 1 = mostly visible,
    0 = totally visible.
    """
    if rate >= 100:
        return 3
    if rate >= 50:
        return 2
    if rate > 0:
        return 1
    return 0


def _label_for_track(track_box, det_boxes, det_labels):
    """Return the label of the detection with the highest IoU against ``track_box``.

    Returns ``"unknown"`` when no detection overlaps the tracked box.
    """
    best_label, best_iou = "unknown", 0.0
    track_area = _rect_area(track_box)
    for det_box, det_label in zip(det_boxes, det_labels):
        intersection = calculate_occlusion_area(track_box, det_box)
        union = track_area + _rect_area(det_box) - intersection
        if union > 0 and intersection / union > best_iou:
            best_iou = intersection / union
            best_label = det_label
    return best_label


# Kalman predictions for boxes that are currently occluded, keyed by track id
tracked_predictions = {}
# One record per tracked box and frame:
# (frame name, label, track id, x1, y1, x2, y2, x_center, y_center, occlusion level)
outputs = []

# Minimum normalised template-matching score for the overlay to count as found
threshold = 0.4
template_h, template_w = overlap_image.shape[:2]

# Main loop for processing frames
for frame_path in sorted(Path(data_folder_1).glob("*.png")):
    img = cv2.imread(str(frame_path))
    if img is None:
        print(f"Error: Could not load image {frame_path.name}")
        continue

    # Perform object detection
    results = model(img, conf=0.5, classes=[0, 1, 2, 7])
    detections = results[0].boxes

    # `dets` and `labels` are kept aligned: only detections that pass the
    # filter contribute to either list.
    dets = []
    labels = []
    for box in detections:
        x1, y1, x2, y2 = box.xyxy[0].tolist()
        confidence = float(box.conf[0])
        label = model.names[int(box.cls[0])]

        if confidence < 0.5 or label not in ["person", "car", "truck"]:
            continue

        dets.append([x1, y1, x2, y2, confidence])
        labels.append(label)

    # Keep a 2-D (n, 5) array even when nothing was detected
    dets = np.array(dets) if dets else np.empty((0, 5))

    # Update SORT tracker
    trackers = mot_tracker.update(dets)

    # Locate the overlay template in the frame
    result = cv2.matchTemplate(img, overlap_image, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, max_loc = cv2.minMaxLoc(result)
    overlay_rect = None
    if max_val >= threshold:
        top_left = (max_loc[0], max_loc[1])
        # The rectangle is 20 px wider than the template (value kept from the original code)
        bottom_right = (top_left[0] + template_w + 20, top_left[1] + template_h)
        overlay_rect = (top_left[0], top_left[1], bottom_right[0], bottom_right[1])
        cv2.rectangle(img, top_left, bottom_right, (0, 255, 0), 2)

    # Process each tracked object.
    # `i` is kept because a later scratch cell reads `outputs[i]`.
    for i, d in enumerate(trackers):
        x1, y1, x2, y2, track_id = map(int, d)
        track_box = (x1, y1, x2, y2)
        x_center = (x1 + x2) / 2
        y_center = (y1 + y2) / 2
        # The tracker is not guaranteed to return one row per detection in
        # detection order, so the label is recovered by overlap, not by index.
        label = _label_for_track(track_box, dets, labels)

        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Draw bounding box
        cv2.putText(img, f": {label}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Exactly one record per tracked box, tagged with its occlusion level
        occlusion_rate = _occlusion_rate(track_box, (x2 - x1) * (y2 - y1), overlay_rect)
        outputs.append((frame_path.name, label, track_id, x1, y1, x2, y2,
                        x_center, y_center, _occlusion_level(occlusion_rate)))

        if occlusion_rate > 0:
            print(f"Box {track_id} is occluded with rate: {occlusion_rate}. Starting prediction.")

            # Start a filter for this box unless it is already being predicted
            if track_id not in tracked_predictions:
                kalman = initialize_kalman(x_center, y_center)
                tracked_predictions[track_id] = {"kalman": kalman, "width": x2 - x1, "height": y2 - y1, "occlusion_rate": occlusion_rate}

            # Correct the filter with the centre observed in this frame
            Z = np.array([x_center, y_center])
            tracked_predictions[track_id]['kalman'] = update(tracked_predictions[track_id]['kalman'], Z)

    # For each frame, advance every box that is being predicted
    for track_id, prediction in tracked_predictions.items():
        kalman = prediction['kalman']
        print(f"Frame: {frame_path.name}")
        print(f"Prediction for the ID: {track_id}")
        old_center = [kalman['x'][0], kalman['x'][3]]
        print(f"old center: {old_center}")

        # Predict the next state
        kalman = predict(kalman)
        x, y = kalman["x"][0], kalman["x"][3]
        prediction["kalman"] = kalman

        # Rebuild the bounding box around the predicted centre
        width = prediction["width"]
        height = prediction["height"]
        new_x1 = int(x - width / 2)
        new_x2 = int(x + width / 2)
        new_y1 = int(y - height / 2)
        new_y2 = int(y + height / 2)
        print(prediction['occlusion_rate'])
        print(f"Predicted Center: ({x}, {y})")

        # Draw the predicted bounding box
        cv2.rectangle(img, (new_x1, new_y1), (new_x2, new_y2), (255, 0, 0), 2)
        cv2.putText(img, f"Pred: {track_id}", (new_x1, new_y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        # Recalculate the occlusion of the predicted box
        occlusion_rate = _occlusion_rate((new_x1, new_y1, new_x2, new_y2), width * height, overlay_rect)
        print(f"New Occlusion_rate for {track_id} in {frame_path.name} is {occlusion_rate}")
        prediction['occlusion_rate'] = occlusion_rate

    # Stop predicting boxes whose predicted position is no longer occluded
    finished = [tid for tid, prediction in tracked_predictions.items()
                if prediction['occlusion_rate'] == 0]
    for track_id in finished:
        print(f"Box {track_id} is no longer occluded. Stopping prediction.")
        del tracked_predictions[track_id]

    # Save annotated frame
    output_path = os.path.join(output_folder, frame_path.name)
    cv2.imwrite(output_path, img)



0: 224x640 6 persons, 1 bicycle, 2 cars, 198.1ms
Speed: 1.4ms preprocess, 198.1ms inference, 1.3ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 6 persons, 2 bicycles, 2 cars, 177.1ms
Speed: 0.8ms preprocess, 177.1ms inference, 1.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 4 persons, 2 bicycles, 3 cars, 274.5ms
Speed: 1.0ms preprocess, 274.5ms inference, 1.0ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 2 bicycles, 2 cars, 264.6ms
Speed: 0.7ms preprocess, 264.6ms inference, 0.5ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 2 bicycles, 3 cars, 260.2ms
Speed: 1.1ms preprocess, 260.2ms inference, 0.5ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 5 persons, 2 bicycles, 2 cars, 223.3ms
Speed: 0.8ms preprocess, 223.3ms inference, 0.6ms postprocess per image at shape (1, 3, 224, 640)

0: 224x640 4 persons, 2 bicycles, 2 cars, 217.6ms
Speed: 0.9ms preprocess, 217.6ms inference, 0.6ms postpr

In [63]:
# Scratch check: compare F applied to x (F·x) with x applied to F (x·F)
x = [59, 59, 29.5, 194.5, 194.5, 97.25]

# F is block-diagonal: the same 3x3 block for the first and the last three entries
axis_block = np.array([[1, 1, 0.5],
                       [0, 1, 1],
                       [0, 0, 1]])
F = np.zeros((6, 6))
F[:3, :3] = axis_block
F[3:, 3:] = axis_block
print(x)

print(F.dot(x))
print(np.asarray(x).dot(F))

[59, 59, 29.5, 194.5, 194.5, 97.25]
[     132.75        88.5        29.5      437.62      291.75       97.25]
[         59         118         118       194.5         389         389]


In [79]:
# Scratch cell: show one record of `outputs`.
# NOTE(review): `i` is not assigned in this cell; presumably it is whatever value
# an earlier loop left behind — confirm before relying on the printed record.
print(outputs[i])
# Disabled experiment: replace the record with the result of update_box.
#outputs[i] = update_box(outputs[i])

[('p',), ('1', 0.0, 0.0), ('2', 7.0, 5.0), ('3', 0.0, 0.0), ('8', 2.0, 5.0), ('2', 0.0), ('5', 5.0), ('0',)]


In [None]:
# DEFINE PARAMETERS:
# YOLO detector loaded from the "yolov8l.pt" weights file
model = YOLO("yolov8l.pt")
# Folders, device and output location later passed to ObjectDetector
source_1="data/view1"
source_2="data/view2"
device="cpu"
save_path = "outputs"

# image_size is also passed to ObjectDetector.
# NOTE(review): the remaining values below are not referenced by any cell visible
# here; presumably they are detection/display options kept for later use — confirm.
image_size=(512, 512)
conf_thres=0.5
max_det=20
line_thickness=2
iou_thres=0.45

save_txt=True
save_csv=False
nosave=False
hide_labels=False 
hide_conf=True

# DEFINE PARAMETERS FOR SORT:


In [None]:
class ObjectDetector:
    """Pair up the frames of two folders and run detection + SORT tracking on frames.

    NOTE(review): ``select_device``, ``Profile``, ``letterbox``,
    ``non_max_suppression``, ``scale_boxes``, ``smart_inference_mode`` and
    ``torch`` are used by this class but are not imported in the visible part
    of the file; presumably they come from the YOLOv5 code base — confirm.
    """

    # File suffixes (lower-case) that are treated as input frames
    IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png')

    def __init__(self, device, weights, source_1, source_2, image_size, save_path, sort_max_age=10, sort_min_hits=3, sort_iou_thresh=0.3):
        """Collect the input frames, create the output folder and the tracker.

        Args:
            device: Passed to ``select_device``.
            weights: Only referenced by the disabled model-loading lines below.
            source_1: Folder holding the frames of the first view.
            source_2: Folder holding the frames of the second view.
            image_size: Stored as ``self.imgsz`` and passed to ``letterbox``.
            save_path: Folder where annotated frames are written.
            sort_max_age, sort_min_hits, sort_iou_thresh: Forwarded to ``Sort``.
        """
        self.device = select_device(device)
        # NOTE(review): model loading is disabled, so `weights` is unused and
        # self.model, self.stride and self.names — all read by
        # detect_object_2D — are never assigned. Confirm how the model is
        # meant to be provided before calling detect_object_2D.
        #self.model = DetectMultiBackend(weights, self.device)
        #self.model.names = dict(list(self.model.names.items())[:2] + list(self.model.names.items())[4:])
        #self.stride, self.names, self.pt = self.model.stride, self.model.names, self.model.pt
        self.imgsz = image_size
        #self.model.warmup(imgsz=(1, 3, *self.imgsz))
        self.dt = Profile(device=self.device)
        self.source_1 = Path(source_1)
        self.source_2 = Path(source_2)
        # Sorted so that the n-th frame of one view is paired with the n-th
        # frame of the other; glob() alone gives no ordering guarantee.
        self.files_1 = self._list_images(self.source_1)
        self.files_2 = self._list_images(self.source_2)

        self.save_path = save_path
        os.makedirs(self.save_path, exist_ok=True)

        # Initialize SORT Tracker
        self.sort_tracker = Sort(max_age=sort_max_age, min_hits=sort_min_hits, iou_threshold=sort_iou_thresh)

    @classmethod
    def _list_images(cls, folder):
        """Return the image files directly inside ``folder``, sorted by path."""
        return sorted(f for f in folder.glob('*') if f.suffix.lower() in cls.IMAGE_SUFFIXES)

    def __iter__(self):
        """Yield ``(img_1, img_2)`` pairs, one per frame index.

        When one folder has fewer frames than the other, the missing image of
        the pair is ``None``; an unreadable file also yields ``None``.
        """
        from itertools import zip_longest

        for file_1, file_2 in zip_longest(self.files_1, self.files_2, fillvalue=None):
            img_1 = cv2.imread(str(file_1)) if file_1 is not None else None
            img_2 = cv2.imread(str(file_2)) if file_2 is not None else None
            yield (img_1, img_2)

    def draw_boxes(self, img, bbox, identities=None, categories=None, names=None, offset=(0, 0)):
        """Draw each box, its label and its centre point onto ``img``.

        Args:
            img: Image that is drawn on in place.
            bbox: Iterable of ``(x1, y1, x2, y2)`` boxes.
            identities: Optional per-box ids; 0 is used when omitted.
            categories: Optional per-box indices into ``names``.
            names: Lookup from category index to display name.
            offset: ``(dx, dy)`` added to every drawn coordinate.

        Returns:
            The same ``img`` object.
        """
        for idx, box in enumerate(bbox):
            x1, y1, x2, y2 = [int(v) for v in box]
            x1 += offset[0]
            x2 += offset[0]
            y1 += offset[1]
            y2 += offset[1]
            track_id = int(identities[idx]) if identities is not None else 0
            # The centre is shifted by the same offset as the rectangle
            center = (int((box[0] + box[2]) / 2) + offset[0],
                      int((box[1] + box[3]) / 2) + offset[1])
            if categories is not None and names is not None:
                label = f"{track_id} {names[int(categories[idx])]}"
            else:
                label = str(track_id)

            color = self.compute_color_for_labels(track_id)
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            # Filled strip behind the label text
            cv2.rectangle(img, (x1, y1 - 20), (x1 + w, y1), (255, 191, 0), -1)
            cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, [255, 255, 255], 1)
            cv2.circle(img, center, 3, color, -1)

        return img

    @smart_inference_mode()
    def detect_object_2D(self, im, frame_num, save_images):
        """Detect and track objects in one frame, drawing the tracks onto it.

        Args:
            im: Frame to process; ``None`` (a missing frame) is skipped.
            frame_num: Used to name the saved image ``{frame_num}.png``.
            save_images: When true, the annotated frame is written to
                ``self.save_path``.
        """
        if im is None:
            return

        with self.dt:
            image_original = im
            im = letterbox(im, self.imgsz, stride=self.stride, auto=True)[0]
            # Move the channel axis first and reverse the channel order
            im = im.transpose((2, 0, 1))[::-1]
            im = np.ascontiguousarray(im)
            im = torch.from_numpy(im).to(self.device)
            im = im.float().unsqueeze(0)
            im /= 255

            pred = self.model(im)
            pred = non_max_suppression(pred)

            for det in pred:
                # Rescale to original size
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], image_original.shape).round()

                rows = []
                for *xyxy, conf, cls in reversed(det):
                    c = int(cls)
                    # Only class indices 0, 1 and 4 are passed on to the tracker
                    if c not in (0, 1, 4):
                        continue

                    # Add the detection to the rows handed to SORT
                    x1, y1, x2, y2 = [int(coord) for coord in xyxy]
                    rows.append([x1, y1, x2, y2, float(conf), c])
                # reshape keeps the (n, 6) layout even when no row was kept
                dets_to_sort = np.array(rows, dtype=float).reshape(-1, 6)

                # Update SORT tracker
                tracked_dets = self.sort_tracker.update(dets_to_sort)

                # Draw tracked bounding boxes
                if len(tracked_dets) > 0:
                    bbox_xyxy = tracked_dets[:, :4]
                    identities = tracked_dets[:, 4]  # Object IDs
                    categories = [0] * len(identities)  # Placeholder for categories if needed
                    self.draw_boxes(image_original, bbox_xyxy, identities, categories, self.names)

            # Save the resulting frame
            if save_images:
                save_path = os.path.join(self.save_path, f"{frame_num}.png")
                cv2.imwrite(save_path, image_original)

    def compute_color_for_labels(self, label, palette=(2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)):
        """Derive a deterministic colour tuple from a numeric label.

        Each component is ``(p * (label**2 - label + 1)) % 255`` for one
        palette entry ``p``, so the same label always gets the same colour.
        """
        color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
        return tuple(color)

In [None]:
# Build the detector from the parameters defined in the cell above.
# NOTE(review): `weights` is not assigned in any cell visible here — confirm it
# is defined before running this cell.
detector = ObjectDetector(device, weights, source_1, source_2, image_size, save_path)

fatal: cannot change to '/Users/dani/Desktop/MS_AutonomousSystems/Perception_for_Autonomous_systems/Final': No such file or directory
YOLOv5 🚀 2024-11-21 Python-3.10.14 torch-2.2.2 CPU



##### Z ESTIMATION:

In [None]:
def calculate_distance_to_object(left_center, right_center, im_left, im_right, focal_length = 707.0493, baseline = 0.06):
    """Estimate the depth Z of a point from its horizontal disparity.

    Args:
        left_center: Point in the left image; only its first element is used.
        right_center: Point in the right image; only its first element is used.
        im_left: Unused.
        im_right: Unused.
        focal_length: Focal length used in ``Z = focal_length * baseline / disparity``.
        baseline: Baseline used in the same formula.

    Returns:
        float: The estimated Z, or ``inf`` when the disparity is zero.
    """
    x_left = left_center[0]
    x_right = right_center[0]

    # Disparity is the absolute horizontal shift between the two views
    pixel_shift = abs(x_left - x_right)

    # Zero disparity would divide by zero; report it as infinitely far away
    if pixel_shift != 0:
        return (focal_length * baseline) / pixel_shift
    return float('inf')

In [None]:
# Run 2D detection on the first image of every pair yielded by the detector,
# with save_images=True so each processed frame is written out.
for i, images in enumerate(detector):
    detector.detect_object_2D(images[0], frame_num=i, save_images=True)
    # The second image of the pair is currently not processed.
    #detector.detect_object_2D(images[1], save_path=output, frame_num=i)

KeyboardInterrupt: 

<h2>Video from frames</h2>

In [None]:
import cv2
import os
from pathlib import Path

# Define paths
data_folder = "./outputs"  # Folder containing the images
output_video = "output_video.mp4"  # Output video file

# Video settings
frame_rate = 30  # Frames per second
frame_size = None

# Collect all image paths, sorted by name
image_paths = sorted(Path(data_folder).glob("*.png"))  # Adjust the pattern to match your image format

# Check if there are images to process
if not image_paths:
    raise ValueError(f"No images found in {data_folder}")

# Read the first image to get the frame size
first_image = cv2.imread(str(image_paths[0]))
if first_image is None:
    raise ValueError("Could not read the first image. Check the image path and format.")
frame_size = (first_image.shape[1], first_image.shape[0])  # (width, height)

# Define the video writer with the correct codec
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for mp4
out = cv2.VideoWriter(output_video, fourcc, frame_rate, frame_size)
# A writer that failed to open drops every frame silently, so fail loudly instead
if not out.isOpened():
    raise RuntimeError(f"Could not open video writer for {output_video}")

try:
    # Write each image to the video
    for image_path in image_paths:
        frame = cv2.imread(str(image_path))
        if frame is None:
            print(f"Warning: Could not read image {image_path}, skipping.")
            continue
        resized_frame = cv2.resize(frame, frame_size)  # Ensure consistent size
        out.write(resized_frame)
finally:
    # Release the writer even if a frame fails, so the file is closed properly
    out.release()
print(f"Video saved as {output_video}")

Video saved as output_video.mp4


: 