## YOLOv9 Detection + Yaw Estimation + Kalman Tracking

In [1]:
# Base name (without extension) of the clip to process; the Python cells
# below build their input/output paths from this value.
video_name = "video11"

## YOLOv9 + Yaw Estimation

In [2]:
from ultralytics import YOLO
from deepbox.deepbox import Deepbox

import cv2
import json
import torch
import numpy as np

# Load YOLOv9 on the GPU when CUDA is available, otherwise on the CPU.
gpu_available = torch.cuda.is_available()
if gpu_available:
    # Make CUDA device 0 the current device before the weights are loaded.
    torch.cuda.set_device(0)
    device_name, torch_device, banner = '0', 'cuda', 'Detecting using GPU...'
else:
    device_name, torch_device, banner = 'cpu', 'cpu', 'Detecting using CPU...'

yolov9 = YOLO("yolov9c.pt").to(torch_device)
print(banner)

# Load Deepbox (used further down to estimate a yaw angle per detection)
deepbox = Deepbox()

# Read video file
input_file_name = f'video/{video_name}.mp4'
cap = cv2.VideoCapture(input_file_name)
if not cap.isOpened():
    # Without this check a missing/unreadable file silently yields a
    # zero-sized writer and an empty detections JSON.
    raise OSError(f'Could not open input video: {input_file_name}')

# get height, width and frame count of the video
width, height = (
        int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    )
no_of_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 stretches the
# re-encoded video and desynchronises it from the audio muxed in afterwards.
fps = cap.get(cv2.CAP_PROP_FPS)
proc_frames = 0

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
out = cv2.VideoWriter()
output_file_name = f"output/{video_name}_yolov9.mp4"
out.open(output_file_name, fourcc, fps, (width, height), True)

# Per-frame pipeline: YOLOv9 detection -> yaw estimation -> annotation.
# `frames_detections` collects one list per frame; each entry is
# [cx, cy, w, h, yaw_degrees, class_name, conf] with (cx, cy) the box centre.
frames = []
frames_detections = []

# Only detections whose class name is listed here are kept.
# accepted_class_names = ['car', 'bus', 'truck']
accepted_class_names = ['car']

try:
    # Read until the capture is exhausted instead of trusting
    # CAP_PROP_FRAME_COUNT, which is container metadata and may be inaccurate.
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # yolov9 detection
        result = yolov9.predict(frame)[0]
        frame_detections = []
        for i, _box in enumerate(result.boxes):
            class_name = result.names[int(result.boxes.cls[i])]
            if class_name not in accepted_class_names:
                continue
            conf = float(result.boxes.conf[i])
            # [cx, cy, w, h, class_name, conf]
            frame_detections.append(result.boxes.xywh[i].tolist() + [class_name, conf])

        # yaw estimation -- runs on the still un-annotated frame so that the
        # boxes and labels drawn below cannot leak into the estimator's input
        detc_2ds = []
        for det in frame_detections:
            x1 = int(det[0] - det[2]/2)
            y1 = int(det[1] - det[3]/2)
            x2 = int(det[0] + det[2]/2)
            y2 = int(det[1] + det[3]/2)
            detc_2ds.append([det[4], [x1, y1, x2, y2]])
        yaws = deepbox.predict(frame, detc_2ds)
        yaw_degrees = [np.rad2deg(yaw) for yaw in yaws]

        # annotate the frame only after every model has seen the clean pixels
        for det, (_, (x1, y1, x2, y2)) in zip(frame_detections, detc_2ds):
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(frame, f'{det[4]} ({det[5]:.2f})', (int(det[0]), int(det[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # convert to frame detections [x, y, w, h, yaw, class_name, conf]
        frame_detections = [
            [det[0], det[1], det[2], det[3], yaw_degrees[i], det[4], det[5]]
            for i, det in enumerate(frame_detections)
        ]

        out.write(frame)
        frames_detections.append(frame_detections)

    # Persist all detections; only reached when the loop finished without error
    with open(f'output/{video_name}_yolov9.json', 'w', encoding='utf-8') as json_file:
        json.dump({ "final_frames_detections": frames_detections }, json_file, ensure_ascii=False, indent=4)

except Exception:
    # Print the traceback instead of re-raising so the cell still reaches
    # the cleanup below and the notebook keeps running.
    import traceback
    print(traceback.format_exc())
finally:
    # Release resources
    cap.release()
    out.release()


Detecting using GPU...


Instructions for updating:
dim is deprecated, use axis instead

0: 384x640 7 cars, 491.0ms
Speed: 31.7ms preprocess, 491.0ms inference, 4455.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 472.0ms
Speed: 6.0ms preprocess, 472.0ms inference, 5.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 370.7ms
Speed: 3.5ms preprocess, 370.7ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 370.1ms
Speed: 3.0ms preprocess, 370.1ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 train, 1 truck, 371.3ms
Speed: 3.0ms preprocess, 371.3ms inference, 3.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 371.7ms
Speed: 3.0ms preprocess, 371.7ms inference, 2.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 train, 1 clock, 370.9ms
Speed: 3.5ms preprocess, 370.9ms inference, 3.0ms postprocess per image at sha

In [3]:
%%capture
!ffmpeg -i video/video11.mp4 -f mp3 -ab 192000 -vn output/video11_audio.mp3 -y
!ffmpeg -i output/video11_yolov9.mp4 -i output/video11_audio.mp3 -c:v libx264 -c:a copy -map 0:v:0 -map 1:a:0 output/video11_yolov9_audio.mp4 -y

## Kalman Tracking

In [2]:
from sort.vehicle_tracking import SortVehicle
import json
import cv2
import numpy as np

# Source clip, annotated tracking video, and per-frame tracker state dump
input_file_name = f"video/{video_name}.mp4"
output_file_name = f"output/{video_name}_tracked.mp4"
output_state_file_name = f"output/{video_name}_states.json"

# Tracker instance; the meaning of zc / offset_x is defined by SortVehicle
# (presumably camera-geometry parameters -- TODO confirm against sort.vehicle_tracking).
sort = SortVehicle(max_age=5, min_hits=5, iou_threshold=0.3, zc=750, offset_x=-100)

cap = cv2.VideoCapture(input_file_name)
if not cap.isOpened():
    # Fail loudly: otherwise the loop below reads nothing and an empty
    # states file is written without any hint of what went wrong.
    raise OSError(f'Could not open input video: {input_file_name}')

# Per-frame detections produced by the detection cell
with open(f'output/{video_name}_yolov9.json') as f:
    detections = json.load(f)['final_frames_detections']

fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 stretches the
# re-encoded video and desynchronises it from the audio muxed in afterwards.
fps = cap.get(cv2.CAP_PROP_FPS)
width, height = (
            int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
)
out = cv2.VideoWriter()
out.open(output_file_name, fourcc, fps, (width, height), True)

# Feed the stored detections to the tracker frame by frame, annotate the
# video, and collect the tracker output for every frame.
proc_frames = 0
state_rotations = []  # per frame: list of [*X, x1, y1, x2, y2, vehicle_id, [r, g, b]]
color_mapping = {}    # vehicle_id -> [r, g, b], fixed the first time the id is seen
try:
    while proc_frames < len(detections):
        ret, frame = cap.read()
        if not ret:
            break

        im = frame

        # Stored rows start with [cx, cy, w, h, yaw]; the tracker is given
        # corner boxes plus yaw: [x1, y1, x2, y2, yaw]
        frame_detections = detections[proc_frames]
        frame_detections_converted = np.array([
            [Z[0]-Z[2]/2, Z[1]-Z[3]/2, Z[0]+Z[2]/2, Z[1]+Z[3]/2, Z[4]]
            for Z in frame_detections
        ])

        # tracking (an explicitly shaped empty array is passed when the frame
        # has no detections, so the tracker is still updated every frame)
        if len(frame_detections_converted) > 0:
            Zs, Xs = sort.update(frame_detections_converted)
        else:
            Zs, Xs = sort.update(np.empty((0, 5)))

        temp = []
        for (Z, X) in zip(Zs, Xs):
            # Z[0:4] are used as box corners and Z[-1] as the track id;
            # X is the tracker state (layout defined by SortVehicle).
            vehicle_id = int(Z[-1])
            if vehicle_id not in color_mapping:
                # Clamp the box to non-negative indices: tracked boxes may
                # extend past the frame, and a negative index would wrap
                # around in NumPy slicing and sample the wrong pixels.
                left, top = max(int(Z[0]), 0), max(int(Z[1]), 0)
                right, bottom = max(int(Z[2]), 0), max(int(Z[3]), 0)
                bounding_rect = im[top:bottom, left:right]
                if bounding_rect.size == 0:
                    # No visible pixels to sample. Averaging an empty crop
                    # yields NaN (a warning, not an exception), so check
                    # explicitly and skip this track for the current frame.
                    continue

                bgr_color = np.average(np.average(bounding_rect, axis=0), axis=0)

                # Fix the saturation of the average colour at 200
                hsv_color = cv2.cvtColor(np.array([[bgr_color]]).astype(np.uint8), cv2.COLOR_BGR2HSV)
                hsv_color[:, :, 1] = 200
                bgr_color = cv2.cvtColor(hsv_color, cv2.COLOR_HSV2BGR)[0, 0]

                rgb_color = [int(bgr_color[2]), int(bgr_color[1]), int(bgr_color[0])]
                color_mapping[vehicle_id] = rgb_color

            temp.append(X.tolist() + [Z[0], Z[1], Z[2], Z[3], int(Z[-1]), color_mapping[vehicle_id]])
            cv2.putText(im, f'{int(Z[6])} ({int(X[0])},0,{int(X[2])},{int(X[3])})', (int((Z[0] + Z[2])//2), int((Z[1] + Z[3])//2)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
            cv2.rectangle(im, (int(Z[0]), int(Z[1])), (int(Z[2]), int(Z[3])), (0, 255, 0), 3)
        state_rotations.append(temp)

        # write the frame
        out.write(im)

        proc_frames += 1
except Exception as error:
    # Print the traceback instead of re-raising so the states gathered so
    # far are still written below.
    import traceback
    print(traceback.format_exc())
finally:
    # Release resources
    out.release()
    cap.release()

# Written even after a failure above, in which case it holds a partial run
with open(output_state_file_name, 'w', encoding='utf-8') as f:
    json.dump({ "states": state_rotations }, f, ensure_ascii=False, indent=4)

  w = np.sqrt(x[4] * x[5])


In [3]:
%%capture
!ffmpeg -i output/video11_tracked.mp4 -i output/video11_audio.mp3 -c:v libx264 -c:a copy -map 0:v:0 -map 1:a:0 output/video11_tracked_audio.mp4 -y