# **Pipeline for object detection and tracking in 3D:**

### **Required packages:**

In [19]:
import numpy as np
import cv2
import os
from matplotlib import pyplot as plt
from pathlib import Path
from ultralytics import YOLO
from sort import Sort

### **1. Definition of Objects and Functions**

##### 2D Object detection : YOLOV5

In [15]:


# Initialize the YOLO model
model = YOLO("yolov8l.pt")   # Moved to parameters 

# Initialize SORT tracker
mot_tracker = Sort(max_age=1, min_hits=3, iou_threshold=0.3) # Not important for yolo

# Define input and output folders
data_folder_1 = "data/view1"  # Folder containing input frames
data_folder_2 = "data/view2"  # Folder containing input frames
output_folder = "outputs"     # Folder to save output frames
os.makedirs(output_folder, exist_ok=True)  # Create the output folder if it doesn't exist

# Process images in the input folder
for frame_path in sorted(Path(data_folder_1).glob("*.png")):  # Adjust pattern for your image format
    print(f"Processing: {frame_path.name}")
    
    # Load the image
    img = cv2.imread(str(frame_path))
    if img is None:
        print(f"Error: Could not load image {frame_path.name}")
        continue
    
    # Perform object detection
    results = model(img,conf=0.5, classes=[0,1,2,7])
    detections = results[0].boxes  # Access detection results
    
    print(f"Detections for {frame_path.name}:")

    dets = []
    # Process each detection
    for box in detections:
        # Extract bounding box coordinates, confidence, and class
        x1, y1, x2, y2 = box.xyxy[0].tolist()  # Bounding box: [x1, y1, x2, y2]
        confidence = float(box.conf[0])        # Confidence score
        cls = int(box.cls[0])                  # Class index
        label = model.names[cls]               # Class name
        
        # Skip detections with low confidence
        if confidence < 0.5:
            print(f"Skipped detection with low confidence: {confidence:.2f}")
            continue

        # Filter out unwanted classes
        if label not in ["person", "car", "truck"]:
            print(f"Skipped detection for unwanted class: {label}")
            continue

        # Print detection information
        #print(f"Class: {label}, Confidence: {confidence:.2f}, "
        #      f"BBox: ({x1:.2f}, {y1:.2f}, {x2:.2f}, {y2:.2f})")
        
                # Add detection to SORT input
        dets.append([x1, y1, x2, y2, confidence])
    

    dets = np.array(dets)  # Convert list to numpy array for SORT

    # Update SORT tracker with current frame detections
    trackers = mot_tracker.update(dets)

    # Annotate the frame with tracking results
    for d in trackers:
        x1, y1, x2, y2, track_id = map(int, d)  # Tracker output [x1, y1, x2, y2, ID]
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Draw bounding box
        cv2.putText(img, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)  # Add track ID

    # Save the annotated frame
    output_path = os.path.join(output_folder, frame_path.name)
    cv2.imwrite(output_path, img)
    print(f"Saved: {output_path}\n")



    # Annotate the frame with detections
    #annotated_frame = results[0].plot()
    
    # Save the processed frame
    #output_path = os.path.join(output_folder, frame_path.name)
    #cv2.imwrite(output_path, annotated_frame)
    #print(f"Saved: {output_path}\n")



#results = model("./data/view1/000000.png", show=True)
#cv2.waitKey(0)


Processing: 000000.png

0: 224x640 4 persons, 1 bicycle, 1 truck, 666.4ms
Speed: 2.9ms preprocess, 666.4ms inference, 0.7ms postprocess per image at shape (1, 3, 224, 640)
Detections for 000000.png:
Skipped detection for unwanted class: bicycle
Saved: outputs/000000.png

Processing: 000001.png

0: 224x640 4 persons, 1 bicycle, 1 truck, 595.0ms
Speed: 1.4ms preprocess, 595.0ms inference, 0.6ms postprocess per image at shape (1, 3, 224, 640)
Detections for 000001.png:
Skipped detection for unwanted class: bicycle
Saved: outputs/000001.png

Processing: 000002.png

0: 224x640 4 persons, 2 bicycles, 1 truck, 606.2ms
Speed: 1.6ms preprocess, 606.2ms inference, 0.7ms postprocess per image at shape (1, 3, 224, 640)
Detections for 000002.png:
Skipped detection for unwanted class: bicycle
Skipped detection for unwanted class: bicycle
Saved: outputs/000002.png

Processing: 000003.png

0: 224x640 2 persons, 1 bicycle, 1 truck, 593.7ms
Speed: 1.4ms preprocess, 593.7ms inference, 0.8ms postprocess p

usage: ipykernel_launcher.py [-h] [--display] [--seq_path SEQ_PATH]
                             [--phase PHASE] [--max_age MAX_AGE]
                             [--min_hits MIN_HITS]
                             [--iou_threshold IOU_THRESHOLD]
ipykernel_launcher.py: error: unrecognized arguments: --f=/Users/dani/Library/Jupyter/runtime/kernel-v3b918768b0b646ac0cdfc49f9fe046de38f90c00f.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [7]:
# DEFINE PARAMETERS:
model = YOLO("yolov8l.pt")
source_1="data/view1"
source_2="data/view2"
device="cpu"
save_path = "outputs"

image_size=(512, 512)
conf_thres=0.5
max_det=20
line_thickness=2
iou_thres=0.45

save_txt=True
save_csv=False
nosave=False
hide_labels=False 
hide_conf=True

# DEFINE PARAMETERS FOR SORT:


In [16]:
class ObjectDetector:
    def __init__(self, device, weights, source_1, source_2, image_size, save_path, sort_max_age=10, sort_min_hits=3, sort_iou_thresh=0.3):
        self.device = select_device(device)
        #self.model = DetectMultiBackend(weights, self.device)
        #self.model.names = dict(list(self.model.names.items())[:2] + list(self.model.names.items())[4:])
        #self.stride, self.names, self.pt = self.model.stride, self.model.names, self.model.pt
        self.imgsz = image_size
        #self.model.warmup(imgsz=(1, 3, *self.imgsz))
        self.dt = Profile(device=self.device)
        self.source_1 = Path(source_1)
        self.source_2 = Path(source_2)
        self.files_1 = [f for f in self.source_1.glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png']]
        self.files_2 = [f for f in self.source_2.glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png']]
        
        self.save_path = save_path
        if not os.path.exists(self.save_path):
            os.makedirs(self.save_path)
        
        # Initialize SORT Tracker
        self.sort_tracker = Sort(max_age=sort_max_age, min_hits=sort_min_hits, iou_threshold=sort_iou_thresh)
    
    def __iter__(self):
        for file_1, file_2 in zip_longest(self.files_1, self.files_2, fillvalue=None):
            img_1 = cv2.imread(str(file_1))
            img_2 = cv2.imread(str(file_2))
            yield (img_1, img_2)
    
    def draw_boxes(self, img, bbox, identities=None, categories=None, names=None, offset=(0, 0)):
        for i, box in enumerate(bbox):
            x1, y1, x2, y2 = [int(i) for i in box]
            x1 += offset[0]
            x2 += offset[0]
            y1 += offset[1]
            y2 += offset[1]
            id = int(identities[i]) if identities is not None else 0
            data = (int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2))
            label = f"{id} {names[int(categories[i])]}" if categories is not None else str(id)

            color = self.compute_color_for_labels(id)
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            cv2.rectangle(img, (x1, y1 - 20), (x1 + w, y1), (255, 191, 0), -1)
            cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, [255, 255, 255], 1)
            cv2.circle(img, data, 3, color, -1)

        return img
    
    @smart_inference_mode()
    def detect_object_2D(self, im, frame_num, save_images):
        with self.dt:
            image_original = im
            im = letterbox(im, self.imgsz, stride=self.stride, auto=True)[0]
            im = im.transpose((2, 0, 1))[::-1]
            im = np.ascontiguousarray(im)
            im = torch.from_numpy(im).to(self.device)
            im = im.float().unsqueeze(0)
            im /= 255
            
            pred = self.model(im)
            pred = non_max_suppression(pred)
            
            for det in pred:
                gn = torch.tensor(image_original.shape)[[1, 0, 1, 0]] #Normalization
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], image_original.shape).round() # Rescale to original size
                
                dets_to_sort = np.empty((0, 6))
                for *xyxy, conf, cls in reversed(det):
                    if cls not in [0, 1, 4]:
                        continue
                    
                    c = int(cls)
                    coords = ((xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist())  # Normalized xywh
                    #{conf:.2f}", *coords
                    # Agregar detección a dets_to_sort para clases 0 y 1
                    x1, y1, x2, y2 = [int(coord) for coord in xyxy]
                    dets_to_sort = np.vstack((dets_to_sort, np.array([x1, y1, x2, y2, conf, c])))
                
                # Update SORT tracker
                tracked_dets = self.sort_tracker.update(dets_to_sort)

                # Draw tracked bounding boxes
                if len(tracked_dets) > 0:
                    bbox_xyxy = tracked_dets[:, :4]
                    identities = tracked_dets[:, 4]  # Object IDs
                    categories = [0] * len(identities)  # Placeholder for categories if needed
                    self.draw_boxes(image_original, bbox_xyxy, identities, categories, self.names)
            
            # Save or display the resulting frame
            if save_images:
                save_path = os.path.join(self.save_path, f"{frame_num}.png")
                cv2.imwrite(save_path, image_original)

    def compute_color_for_labels(self, label, palette=(2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)):
        color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
        return tuple(color)

In [15]:
detector = ObjectDetector(device, weights, source_1, source_2, image_size, save_path)

fatal: cannot change to '/Users/dani/Desktop/MS_AutonomousSystems/Perception_for_Autonomous_systems/Final': No such file or directory
YOLOv5 🚀 2024-11-21 Python-3.10.14 torch-2.2.2 CPU



##### Z ESTIMATION:

In [18]:
def calculate_distance_to_object(left_center, right_center, im_left, im_right, focal_length = 707.0493, baseline = 0.06):

    # Calculate disparity (horizontal pixel difference between the left and right image)
    disparity = abs(left_center[0] - right_center[0])
    
    if disparity == 0:
        return float('inf')
    
    Z = (focal_length * baseline) / disparity
    
    return Z

In [24]:
for i, images in enumerate(detector):
    detector.detect_object_2D(images[0], frame_num=i, save_images=True)
    #detector.detect_object_2D(images[1], save_path=output, frame_num=i)

KeyboardInterrupt: 

<h2> Video from frames -><h2>

In [22]:
import cv2
import os
from pathlib import Path

# Define paths
data_folder = "./outputs"  # Folder containing the images
output_video = "output_video.mp4"  # Output video file

# Video settings
frame_rate = 30  # Frames per second
frame_size = None

# Collect all image paths, sorted by name
image_paths = sorted(Path(data_folder).glob("*.png"))  # Adjust the pattern to match your image format

# Check if there are images to process
if not image_paths:
    raise ValueError(f"No images found in {data_folder}")

# Read the first image to get the frame size
first_image = cv2.imread(str(image_paths[0]))
if first_image is None:
    raise ValueError("Could not read the first image. Check the image path and format.")
frame_size = (first_image.shape[1], first_image.shape[0])  # (width, height)

# Define the video writer with the correct codec
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for mp4
out = cv2.VideoWriter(output_video, fourcc, frame_rate, frame_size)

# Write each image to the video
for image_path in image_paths:
    frame = cv2.imread(str(image_path))
    if frame is None:
        print(f"Warning: Could not read image {image_path}, skipping.")
        continue
    resized_frame = cv2.resize(frame, frame_size)  # Ensure consistent size
    out.write(resized_frame)

# Release the video writer
out.release()
print(f"Video saved as {output_video}")

Video saved as output_video.mp4


: 