In [91]:
import cv2
import torch
import numpy as np
import time
import torch
from ultralytics import YOLO

In [None]:
# # Clona el repositorio de YOLOv5
# !git clone https://github.com/ultralytics/yolov5.git
# %cd yolov5

# # Instala las dependencias
# !pip install -r requirements.txt


## 1. Load YOLO Object Detector

In [92]:
# Cargar el modelo YOLOv5
# model1 = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
# model2 = torch.hub.load('ultralytics/yolov5', 'yolov5n', pretrained=True)
# model3 = torch.load('C:/Users/mirvi/Desktop/mii/UAB/4.1/PSIV2/detect mateicules/repte2_psiv2/object_tracking_yolo/yolov8n.pt')
model3 = YOLO('C:/Users/mirvi/Desktop/mii/UAB/4.1/PSIV2/detect mateicules/repte2_psiv2/object_tracking_yolo/yolov8n.pt')

print(type(model3))

# Definir clases de interés
CLASSES_OF_INTEREST = ['car']

<class 'ultralytics.models.yolo.model.YOLO'>


## 2. Define Tracker Class (per centroides)

In [112]:
import numpy as np
from scipy.spatial.distance import cdist

class Tracker:
    MAX_DISAPPEAR_LIMIT = 14
    def __init__(self,res_p=1):
        self.next_unique_id = 0
        self.trackers = {}
        self.disappear_trackers = {}
        self.tracked_bboxes = {}
        self.MAX_DISTANCE_THRESHOLD = 200*res_p
    
    
    def init_object(self,centroid,boxes):
        global next_unique_id
        self.trackers[self.next_unique_id] = centroid
        self.tracked_bboxes[self.next_unique_id] = boxes
        self.disappear_trackers[self.next_unique_id] = 0
        self.next_unique_id+=1

    def del_object(self,track_id):
        del self.trackers[track_id]
        del self.tracked_bboxes[track_id]
        del self.disappear_trackers[track_id]

    def update_object(self,bboxes):
        
        if(len(bboxes)==0):
            
            for oid in list(self.disappear_trackers.keys()):
                self.disappear_trackers[oid]+=1
                
                if self.disappear_trackers[oid] > Tracker.MAX_DISAPPEAR_LIMIT:
                    self.del_object(oid)
                
            return self.tracked_bboxes
        
        else:   
            input_centroids = np.zeros((len(bboxes),2)) 
            for i in range(len(bboxes)):
                x,y,w,h = bboxes[i][0],bboxes[i][1],bboxes[i][2],bboxes[i][3]
                cx,cy = x + w/2 , y + h/2
                input_centroids[i] = (cx,cy)

            
            if(len(self.trackers)==0):
                for i in range(len(input_centroids)):
                    self.init_object(input_centroids[i],bboxes[i])
            
            else:
                
                tracker_centroids = list(self.trackers.values())

                distance_matrix = cdist(np.array(tracker_centroids) , input_centroids)

                rows = distance_matrix.min(axis=1).argsort()
                cols = distance_matrix.argmin(axis=1)[rows]

                usedRows = set()
                usedCols = set()
                
                tracker_ids = list(self.trackers.keys()) 
                for row,col in zip(rows,cols):
                    if row in usedRows or col in usedCols:
                        continue

                    # # Check if distance is within the threshold of 200 pixels
                    # if distance_matrix[row, col] > self.MAX_DISTANCE_THRESHOLD:
                    #     continue  # Skip association if distance is too large

                    track_id = tracker_ids[row]
                    
                    self.trackers[track_id] = input_centroids[col]
                    self.tracked_bboxes[track_id] = bboxes[col]

                    self.disappear_trackers[track_id] = 0
                    usedRows.add(row)                                
                    usedCols.add(col)

                unusedRows = set(range(0,distance_matrix.shape[0])).difference(usedRows)
                unusedCols = set(range(0,distance_matrix.shape[1])).difference(usedCols)

                if(distance_matrix.shape[0]>=distance_matrix.shape[1]):
                    
                    for r in unusedRows: 
                        track_id = tracker_ids[r]
                        self.disappear_trackers[track_id]+=1
                        if(self.disappear_trackers[track_id] > Tracker.MAX_DISAPPEAR_LIMIT):
                            self.del_object(track_id)
                else:
                    for c in unusedCols:                    
                        self.init_object(input_centroids[c],bboxes[c])

        return self.tracked_bboxes

## 3. Load Video

In [158]:
# resize percent
resize_percent = 0.7

cap = cv2.VideoCapture("/Users/mirvi/Desktop/mii/UAB/4.1/PSIV2/detect mateicules/repte2_psiv2/data_r2/short_uab_flow.mp4")

## 4. Object Detection and Tracking over video

In [159]:
# PROCESS VIDEO IN GRAYSCALE
gray_video_og = []

i=0
while True:
    ret, fr = cap.read()
    if not ret:
        break
    # resize frame
    fr = cv2.resize(fr, (0, 0), fx=resize_percent, fy=resize_percent)

    # # to gray
    # fr2 = cv2.cvtColor(fr1, cv2.COLOR_BGR2GRAY)

    # # apply clahe contrast
    # clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    # fr = clahe.apply(fr2)

    # agafar nomes frames que volem
    if i>0:
        gray_video_og.append(fr)
    # if i == 3200:
    #     break
    i+=1

print(i)

7288


In [160]:
import time
import cv2

time_start_all = time.time()

# Define selected model
model = model3

# Init tracker
tracker2 = Tracker(res_p=resize_percent)

# Define ROI dimensions
roix1 = int(150 * resize_percent)
roix2 = int(480 * resize_percent)
roiy1 = int(340 * resize_percent)
roiy2 = int(800 * resize_percent)

# Define reference line in the middle of the ROI
line_y_position = ((roiy2 - roiy1) // 2) - int(40 * resize_percent)

# Counters for cars moving up and down
count_up = 0
count_down = 0

# Dictionary to store previous positions of each car
previous_positions = {}

# Dictionary to track crossed status
crossed_up = {id: False for id in range(200)}
crossed_down = {id: False for id in range(200)}
parked = {id: False for id in range(200)}

# Distance threshold for movement in pixels
movement_threshold = 50*resize_percent

# Initialize last positions dictionary
last_positions = {}

frame_id = 0

# init box_id var
box_id = 0

# right roi margin
right_margin = 35*resize_percent


while frame_id < len(gray_video_og) - 1:
    # Start time measurement
    time_start = time.time()

    # Process the frame
    fr = gray_video_og[frame_id]
    frame = fr.copy()
    height, width, _ = frame.shape

    # Extract ROI
    roi = frame[roiy1:roiy2, roix1:roix2]

    # 1. Object Detection
    ts_od = time.time()
    results = model(roi)

    # Filter detections based on size and movement
    detections = []
    for idx,det in enumerate(results[0].boxes):
        x1, y1, x2, y2, conf, cls = det.data[0]
        if cls == 2:
            w = int(x2 - x1)
            h = int(y2 - y1)
            x = int(x1)
            y = int(y1)

            # Only keep detections with a certain size
            if w > 55 * resize_percent and h > 55 * resize_percent:
                detections.append([x, y, w, h])

    te_od = time.time()

    # Draw text and ROI with reference line
    cv2.putText(frame, f'ObjDet time: {(te_od - ts_od):.2f}', 
                (int(10 * resize_percent), int(30 * resize_percent)), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
    print('count up:', count_up)
    cv2.putText(frame, f"UP: {count_up}", (int(10 * resize_percent), int(700 * resize_percent)), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
    cv2.putText(frame, f"DOWN: {count_down}", (int(10 * resize_percent), int(750 * resize_percent)), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    cv2.rectangle(frame, (roix1, roiy1), (roix2, roiy2), (0, 255, 125), 2)

    # Draw line with increments
    line_inc = [-100, 0, 100]
    for inc in line_inc:
        cv2.line(roi, (0, line_y_position + inc), (width, line_y_position + inc), (0, 255, 255), 2)

    # 2. Object Tracking
    boxes_ids = tracker2.update_object(detections)

    # Draw bounding boxes and text for tracked objects
    for box_id, box in boxes_ids.items():
        x, y, w, h = box
        id = box_id

        # check if car is parked
        if x + w + roix1 > roix2 - right_margin:
            crossed_up[id] = True
            crossed_down[id] = True
            parked[id] = True

        # Draw ID and bounding box
        if parked[id]:
            cv2.putText(roi, f"PARKED {str(id)}", (x, y - 10), cv2.FONT_HERSHEY_PLAIN, 0.7, (255, 127, 0), 2)
            cv2.rectangle(roi, (x, y), (x + w, y + h), (200, 0, 255), 2)
        else:
            cv2.putText(roi, str(id), (x, y - 10), cv2.FONT_HERSHEY_PLAIN, 0.7, (255, 127, 0), 2)
            cv2.rectangle(roi, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Check if the car crossed the line
        current_y = y + h // 2
        if id not in previous_positions:
            previous_positions[id] = current_y
            continue

        previous_y = previous_positions[id]

        # Check if object has not yet crossed any line
        if not crossed_up[id] and not crossed_down[id]:
            for inc in line_inc:
                # Check crossing of reference line
                if previous_y < line_y_position + int(inc * resize_percent) and current_y > line_y_position + int(inc * resize_percent) and not crossed_down[id]:
                    count_down += 1
                    # cv2.putText(frame, f"DOWN: {count_down}", (int(10 * resize_percent), int(750 * resize_percent)), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 255), 2)
                    crossed_down[id] = True
                    break
                elif previous_y > line_y_position and current_y < line_y_position and not crossed_up[id]:
                    count_up += 1
                    # cv2.putText(frame, f"UP: {count_up}", (int(10 * resize_percent), int(750 * resize_percent)), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    crossed_up[id] = True
                    break

        # Update the car's previous position
        previous_positions[id] = current_y

    # Calculate FPS
    time_end = time.time()
    time_per_frame = time_end - time_start
    fps = 1 / time_per_frame if time_per_frame > 0 else 0
    cv2.putText(frame, f'Processing time: {fps:.2f} FPS', (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)

    # Show frames
    cv2.imshow("roi", roi)
    cv2.imshow("Frame", frame)

    # Advance frame
    frame_id += 7

    # Exit with ESC key
    if cv2.waitKey(30) == 27:
        break

time_end_all = time.time()

cap.release()
cv2.destroyAllWindows()

print('count up:', count_up)
print('count down:', count_down)




0: 640x480 2 cars, 1 remote, 604.4ms
Speed: 107.8ms preprocess, 604.4ms inference, 59.8ms postprocess per image at shape (1, 3, 640, 480)
count up: 0

0: 640x480 3 cars, 1 remote, 300.2ms
Speed: 14.0ms preprocess, 300.2ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 480)
count up: 0

0: 640x480 3 cars, 1 remote, 250.3ms
Speed: 12.0ms preprocess, 250.3ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 480)
count up: 0

0: 640x480 3 cars, 1 remote, 236.4ms
Speed: 9.0ms preprocess, 236.4ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 480)
count up: 0

0: 640x480 3 cars, 1 remote, 259.3ms
Speed: 7.0ms preprocess, 259.3ms inference, 5.0ms postprocess per image at shape (1, 3, 640, 480)
count up: 0

0: 640x480 5 cars, 1 remote, 241.4ms
Speed: 6.0ms preprocess, 241.4ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 480)
count up: 0

0: 640x480 4 cars, 1 remote, 210.4ms
Speed: 5.0ms preprocess, 210.4ms inference, 2.0ms postprocess per ima

## CALCUL TEMPS FINAL