In [1]:
import cv2
import torch
import time
import colorsys
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
from ultralytics import YOLO
import supervision as sv
from tqdm import tqdm
from supervision.assets import VideoAssets, download_assets
from collections import defaultdict, deque

In [2]:
print(torch.__config__.parallel_info())
import os
os.environ['OMP_NUM_THREADS'] = "12"
os.environ['OMP_DISPLAY_ENV'] = "TRUE"
os.environ['OMP_PROC_BIND'] = "TRUE"
os.environ['OMP_SCHEDULE'] = "STATIC"
os.environ['GOMP_CPU_AFFINITY'] = "0-12"

ATen/Parallel:
	at::get_num_threads() : 12
	at::get_num_interop_threads() : 16
OpenMP 201511 (a.k.a. OpenMP 4.5)
	omp_get_max_threads() : 12
Intel(R) oneAPI Math Kernel Library Version 2022.2-Product Build 20220804 for Intel(R) 64 architecture applications
	mkl_get_max_threads() : 12
Intel(R) MKL-DNN v3.3.6 (Git Hash 86e6af5974177e513fd3fee58425e1063e7f1361)
std::thread::hardware_concurrency() : 16
Environment variables:
	OMP_NUM_THREADS : 1
	MKL_NUM_THREADS : [not set]
ATen parallel backend: OpenMP



In [55]:
SOURCE_VIDEO_PATH = "video/bacheha.mp4"
OUTPUT_VIDEO_PATH = "test/test_6.mp4"
BLUR_ID = None
CONF = 0.6
CLASS_ID = None

In [56]:
FRAME_WIDTH=30
FRAME_HEIGHT=100
# SOURCE_POLYGONE = np.array([[18, 550], [1852, 608],[1335, 370], [534, 343]], dtype=np.float32)
SOURCE_POLYGONE = np.array([
    [50, 400],
    [900, 400],
    [900, 1400],
    [-900, 1400]
], dtype=np.float32)
BIRD_EYE_VIEW = np.array([[0, 0], [FRAME_WIDTH, 0], [FRAME_WIDTH, FRAME_HEIGHT],[0, FRAME_HEIGHT]], dtype=np.float32)
M = cv2.getPerspectiveTransform(SOURCE_POLYGONE, BIRD_EYE_VIEW)

In [57]:
def read_frames(frame_skip_count = 1):
    i = 0
    while True:
        ret, frame = cap.read()
        if i % frame_skip_count == 0 :
            if not ret:
                break
            yield frame
        else:
            continue

def calculate_speed(distance, fps):
    return (distance *fps)

def calculate_distance(p1, p2):
    return np.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)

In [58]:
def draw_corner_rect(img, bbox, line_length=30, line_thickness=5, rect_thickness=1,
                     rect_color=(255, 0, 255), line_color=(0, 255, 0)):
    x, y, w, h = bbox
    x1, y1 = x + w, y + h

    if rect_thickness != 0:
        cv2.rectangle(img, bbox, rect_color, rect_thickness)

    # Top Left  x, y
    cv2.line(img, (x, y), (x + line_length, y), line_color, line_thickness)
    cv2.line(img, (x, y), (x, y + line_length), line_color, line_thickness)

    # Top Right  x1, y
    cv2.line(img, (x1, y), (x1 - line_length, y), line_color, line_thickness)
    cv2.line(img, (x1, y), (x1, y + line_length), line_color, line_thickness)

    # Bottom Left  x, y1
    cv2.line(img, (x, y1), (x + line_length, y1), line_color, line_thickness)
    cv2.line(img, (x, y1), (x, y1 - line_length), line_color, line_thickness)

    # Bottom Right  x1, y1
    cv2.line(img, (x1, y1), (x1 - line_length, y1), line_color, line_thickness)
    cv2.line(img, (x1, y1), (x1, y1 - line_length), line_color, line_thickness)

    return img

In [59]:
# Initialize the video capture
cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)
frame_generator = read_frames()
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

#Create mask to filter detections
pts = SOURCE_POLYGONE.astype(np.int32) 
pts = pts.reshape((-1, 1, 2))
polygon_mask = np.zeros((frame_height, frame_width), dtype=np.uint8)
cv2.fillPoly(polygon_mask, [pts], 255)

# Initialize the video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, fps, (frame_width, frame_height))

# Initialize the DeepSort tracker
tracker = DeepSort(max_age=250)
# Load YOLO model
device = torch.device('cuda')
model = YOLO("yolov10s.pt")

classes_path = "configs/coco.names"
with open(classes_path, "r") as f:
    class_names = f.read().strip().split("\n")
np.random.seed(42)
colors = np.random.randint(0, 255, size=(len(class_names), 3))

In [60]:
torch.cuda.is_available()

True

In [61]:
frame_count = 0
start_time = time.time()
prev_positions={}
speed_accumulator={}

while True:
    try:
        frame = next(frame_generator)
    except StopIteration:
        break
    # Run model on each frame
    results = model(frame)
    detect = []
    for pred in results:
        for box in pred.boxes:    
            x1, y1, x2, y2 = map(int, box.xyxy[0] )
            confidence = box.conf[0]     
            label = box.cls[0]  
            
            # Filter out weak detections by confidence threshold and class_id
            if CLASS_ID is None:
                if confidence < CONF:
                    continue
            else:
                if class_id != CLASS_ID or confidence < CONF:
                    continue            
                
            if polygon_mask[(y1 + y2) // 2, (x1 + x2) // 2] == 255:
                detect.append([[x1, y1, x2 - x1, y2 - y1], confidence, int(label)])   
                
    tracks = tracker.update_tracks(detect, frame=frame)
    for track in tracks:
        if not track.is_confirmed():
            continue
        track_id = track.track_id    
        ltrb = track.to_ltrb()
        class_id = track.get_det_class()
        x1, y1, x2, y2 = map(int, ltrb)
        if polygon_mask[(y1+y2)//2,(x1+x2)//2] == 0:
            tracks.remove(track)
        color = colors[class_id]
        B, G, R = map(int, color)
        text = f"{track_id} - {class_names[class_id]}"
        center_pt = np.array([[(x1+x2)//2, (y1+y2)//2]], dtype=np.float32)
        transformed_pt = cv2.perspectiveTransform(center_pt[None, :, :], M)

        #Process distance and speed calculations by using previous predictions
        if track_id in prev_positions:
            prev_position = prev_positions[track_id]
            distance = calculate_distance(prev_position, transformed_pt[0][0])
            speed = calculate_speed(distance, fps)
            if track_id in speed_accumulator:
                speed_accumulator[track_id].append(speed)
                if len(speed_accumulator[track_id]) > 100:
                    speed_accumulator[track_id].pop(0)
            else:
                speed_accumulator[track_id] = []
                speed_accumulator[track_id].append(speed)
        prev_positions[track_id] = transformed_pt[0][0]

        # Draw bounding box and text
        frame = draw_corner_rect(frame, (x1, y1, x2 - x1, y2 - y1), line_length=15, line_thickness=3, rect_thickness=1, rect_color=(B, G, R), line_color=(R, G, B))
        #cv2.rectangle(frame, (x1, y1), (x2, y2), (B, G, R), 2)
        cv2.rectangle(frame, (x1 - 1, y1 - 20), (x1 + len(text) * 10, y1), (B, G, R), -1)
        cv2.putText(frame, text, (x1 + 5, y1 - 7), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        cv2.polylines(frame, [pts], isClosed=True, color=(255, 0, 0), thickness=2)
        if track_id in speed_accumulator :
            avg_speed = sum(speed_accumulator[track_id]) / len(speed_accumulator[track_id])
            cv2.rectangle(frame, (x1 - 1, y1-40 ), (x1 + len(f"Speed: {avg_speed:.0f} km/h") * 10, y1-20), (0, 0, 255), -1)
            cv2.putText(frame, f"Speed: {avg_speed:.0f} km/h", (x1, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        # Apply Gaussian Blur
        if BLUR_ID is not None and class_id == BLUR_ID:
            if 0 <= x1 < x2 <= frame.shape[1] and 0 <= y1 < y2 <= frame.shape[0]:
                frame[y1:y2, x1:x2] = cv2.GaussianBlur(frame[y1:y2, x1:x2], (99, 99), 3)
    writer.write(frame)
    frame_count += 1
    if frame_count % 10 == 0:
        elapsed_time = time.time() - start_time
        fps_calc = frame_count / elapsed_time
        print(f"FPS: {fps_calc:.2f}")

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
# Release video capture and writer
cap.release()
writer.release()


0: 640x384 7 cars, 1 truck, 18.6ms
Speed: 1.5ms preprocess, 18.6ms inference, 0.3ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 6 cars, 1 truck, 18.3ms
Speed: 2.0ms preprocess, 18.3ms inference, 0.4ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 6 cars, 1 truck, 15.7ms
Speed: 1.6ms preprocess, 15.7ms inference, 0.5ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 cars, 1 truck, 15.7ms
Speed: 1.7ms preprocess, 15.7ms inference, 0.5ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 cars, 1 truck, 15.7ms
Speed: 2.3ms preprocess, 15.7ms inference, 0.4ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 cars, 1 truck, 14.1ms
Speed: 2.1ms preprocess, 14.1ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 cars, 1 truck, 14.1ms
Speed: 2.2ms preprocess, 14.1ms inference, 0.5ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 cars, 1 truck, 14.0ms
Speed: 1.8ms preprocess, 14.0ms i

In [None]:
frame_count = 0
start_time = time.time()
prev_positions={}
speed_accumulator={}

while True:
    try:
        frame = next(frame_generator)
    except StopIteration:
        break
    # Run model on each frame
    results = model(frame)
    detections = sv.Detections.from_ultralytics(result)

    # filter out detections by class and confidence
    detections = detections[detections.confidence > CONFIDENCE_THRESHOLD]
    detections = detections[detections.class_id != 0]

    # filter out detections outside the zone
    detections = detections[polygon_zone.trigger(detections)]

    # refine detections using non-max suppression
    detections = detections.with_nms(IOU_THRESHOLD)

    # pass detection through the tracker
    detections = byte_track.update_with_detections(detections=detections)

    points = detections.get_anchors_coordinates(
        anchor=sv.Position.BOTTOM_CENTER
    )

    # calculate the detections position inside the target RoI
    points = view_transformer.transform_points(points=points).astype(int)

    # store detections position
    for tracker_id, [_, y] in zip(detections.tracker_id, points):
        coordinates[tracker_id].append(y)
    
    
    annotated_frame = frame.copy()
    annotated_frame = trace_annotator.annotate(
        scene=annotated_frame, detections=detections
    )
    annotated_frame = bounding_box_annotator.annotate(
        scene=annotated_frame, detections=detections
    )
    annotated_frame = label_annotator.annotate(
        scene=annotated_frame, detections=detections, labels=labels
    )
    # add frame to target video
    sink.write_frame(annotated_frame)
    
    
    detect = []
    for pred in results:
        for box in pred.boxes:    
            x1, y1, x2, y2 = map(int, box.xyxy[0] )
            confidence = box.conf[0]     
            label = box.cls[0]  
            
            # Filter out weak detections by confidence threshold and class_id
            if CLASS_ID is None:
                if confidence < CONF:
                    continue
            else:
                if class_id != CLASS_ID or confidence < CONF:
                    continue            
                
            if polygon_mask[(y1 + y2) // 2, (x1 + x2) // 2] == 255:
                detect.append([[x1, y1, x2 - x1, y2 - y1], confidence, int(label)])   
                
    tracks = tracker.update_tracks(detect, frame=frame)
    for track in tracks:
        if not track.is_confirmed():
            continue
        track_id = track.track_id    
        ltrb = track.to_ltrb()
        class_id = track.get_det_class()
        x1, y1, x2, y2 = map(int, ltrb)
        if polygon_mask[(y1+y2)//2,(x1+x2)//2] == 0:
            tracks.remove(track)
        color = colors[class_id]
        B, G, R = map(int, color)
        text = f"{track_id} - {class_names[class_id]}"
        center_pt = np.array([[(x1+x2)//2, (y1+y2)//2]], dtype=np.float32)
        transformed_pt = cv2.perspectiveTransform(center_pt[None, :, :], M)

        #Process distance and speed calculations by using previous predictions
        if track_id in prev_positions:
            prev_position = prev_positions[track_id]
            distance = calculate_distance(prev_position, transformed_pt[0][0])
            speed = calculate_speed(distance, fps)
            if track_id in speed_accumulator:
                speed_accumulator[track_id].append(speed)
                if len(speed_accumulator[track_id]) > 100:
                    speed_accumulator[track_id].pop(0)
            else:
                speed_accumulator[track_id] = []
                speed_accumulator[track_id].append(speed)
        prev_positions[track_id] = transformed_pt[0][0]

        # Draw bounding box and text
        frame = draw_corner_rect(frame, (x1, y1, x2 - x1, y2 - y1), line_length=15, line_thickness=3, rect_thickness=1, rect_color=(B, G, R), line_color=(R, G, B))
        #cv2.rectangle(frame, (x1, y1), (x2, y2), (B, G, R), 2)
        cv2.rectangle(frame, (x1 - 1, y1 - 20), (x1 + len(text) * 10, y1), (B, G, R), -1)
        cv2.putText(frame, text, (x1 + 5, y1 - 7), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        cv2.polylines(frame, [pts], isClosed=True, color=(255, 0, 0), thickness=2)
        if track_id in speed_accumulator :
            avg_speed = sum(speed_accumulator[track_id]) / len(speed_accumulator[track_id])
            cv2.rectangle(frame, (x1 - 1, y1-40 ), (x1 + len(f"Speed: {avg_speed:.0f} km/h") * 10, y1-20), (0, 0, 255), -1)
            cv2.putText(frame, f"Speed: {avg_speed:.0f} km/h", (x1, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        # Apply Gaussian Blur
        if BLUR_ID is not None and class_id == BLUR_ID:
            if 0 <= x1 < x2 <= frame.shape[1] and 0 <= y1 < y2 <= frame.shape[0]:
                frame[y1:y2, x1:x2] = cv2.GaussianBlur(frame[y1:y2, x1:x2], (99, 99), 3)
    writer.write(frame)
    frame_count += 1
    if frame_count % 10 == 0:
        elapsed_time = time.time() - start_time
        fps_calc = frame_count / elapsed_time
        print(f"FPS: {fps_calc:.2f}")

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
# Release video capture and writer
cap.release()
writer.release()