In [1]:
!pip install -U ultralytics
!pip install -q supervision

import supervision as sv
print(sv.__version__)

import ultralytics
from ultralytics import YOLO
ultralytics.checks()

import tensorflow as tf
import cv2

import os
import multiprocessing
from IPython.display import display, Image
import IPython.display as ipd

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from tqdm import tqdm
import subprocess
plt.style.use('ggplot')

Ultralytics YOLOv8.2.81 🚀 Python-3.10.13 torch-2.1.2 CUDA:0 (Tesla P100-PCIE-16GB, 16269MiB)
Setup complete ✅ (4 CPUs, 31.4 GB RAM, 5845.9/8062.4 GB disk)


In [2]:
# Ask the OS how many logical CPU cores this kernel can see and report the figure.
num_cores = multiprocessing.cpu_count()
print(num_cores)

# Publish that count through OMP_NUM_THREADS (environment values must be strings).
# NOTE(review): this runs after torch/tensorflow were imported in the first cell;
# confirm those libraries still honour the variable when it is set this late.
os.environ["OMP_NUM_THREADS"] = f"{num_cores}"

4


In [None]:
# Install gdown, then download the model checkpoints and the input video by their
# Google Drive file IDs. The trailing `#...` note on each line records what the
# author expects that ID to contain.
# NOTE(review): the model-loading cell reads /kaggle/working/bestn2.pt and
# /kaggle/working/dsp3.h5 -- confirm the downloaded files land under those names.
!conda install -y gdown
!gdown 143m_S8iq35NmJjulMD0hZG-KOKWDrPRb #640model checkpoint (yolon2) best.pt
!gdown 10eIhNwC2udxosfq7j2aRTOtt2CybWxno #dsp_model dsp3.h5
#!gdown --id 1-YXrwIjXc9R_gQImEDoYWsug2kfkP68c #last.pt
!gdown 1dgFF30BqeQFQ08LLt-CunkZ9dYZnUzhM #munich_drive.mp4
# Disabled alternative downloads, kept for reference:
#!gdown 1uoCBpxBFkgHBIXR7CUTSc66aBwWY5nBe #test_video.mp4
#!gdown 1nH0OxPJSUxfaxfnUuqUwr9lDayeLj57u #challenge_video.mp4
#!gdown --id 15EbSOR5sjmOkW-ENZhfbE896EZyZj4Rt #yolo1.1.zip
#!unzip yolo1.1.zip
#!rm -r yolo1.1.zip

In [None]:
# Input clip and annotated-output destination for the test_video run.
# NOTE: the next cell reassigns both names to the munich_drive clip.
VIDEO_PATH = "/kaggle/working/test_video.mp4"
RESULT_VIDEO_PATH = "/kaggle/working/test_video_result.mp4"

In [4]:
# Input clip and annotated-output destination for the munich_drive run.
VIDEO_PATH = "/kaggle/working/munich_drive.mp4"
RESULT_VIDEO_PATH = "/kaggle/working/munich_drive_result.mp4"

In [5]:
# Read the video metadata once and keep the frame size and frame rate as
# module-level shortcuts for the cells below.
vinfo = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
H, W, fps = vinfo.height, vinfo.width, vinfo.fps

In [None]:
# Preview at most 15 PNG frames from the dsp_results directory.
# glob() gives no ordering guarantee, so the paths are sorted first; otherwise the
# 15-file slice could pick a different subset (in a different order) on every run.
for image_path in sorted(glob('/kaggle/working/dsp_results/*.png'))[:15]:
    display(Image(filename=image_path, width=600))
    print("\n")

In [None]:
# Recursively archive the yolon2_results directory (the target of the ImageSink
# cell in this notebook) into yolon2_results.zip.
!zip -r yolon2_results.zip /kaggle/working/yolon2_results

In [32]:
# Recursively delete the dsp_results directory and everything under it.
!rm -r /kaggle/working/dsp_results

In [6]:
def masked_mae(y_true, y_pred):
    """Mean absolute error over the entries whose target is not -1.

    Entries of ``y_true`` equal to -1 are excluded: their absolute error is
    multiplied by zero and they do not count towards the denominator.

    Args:
        y_true: tensor of targets; -1 marks entries to ignore.
        y_pred: tensor of predictions, broadcast-compatible with ``y_true``.

    Returns:
        Scalar tensor: sum of absolute errors over unmasked entries divided by
        the number of unmasked entries, or 0 when every entry is masked
        (the plain division would yield NaN in that case).
    """
    valid = tf.cast(tf.not_equal(y_true, -1), dtype=tf.float32)
    abs_error = tf.abs(y_true - y_pred)
    masked_error = tf.multiply(abs_error, valid)
    # divide_no_nan returns 0 instead of NaN when no entry is valid (0 / 0).
    return tf.math.divide_no_nan(tf.reduce_sum(masked_error), tf.reduce_sum(valid))

# Detector weights and the Keras distance/speed model, both read from the
# Kaggle working directory.
yolo_model = YOLO('/kaggle/working/bestn2.pt')
dsp_model = tf.keras.models.load_model('/kaggle/working/dsp3.h5')

# Re-compile with masked_mae as both the loss and the metric of each named output.
_dsp_output_names = ('dist_m', 'speed_kmph')
dsp_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss={output_name: masked_mae for output_name in _dsp_output_names},
    metrics={output_name: masked_mae for output_name in _dsp_output_names},
)



In [None]:
# Show the layer-by-layer summary of the loaded distance/speed model.
dsp_model.summary()

In [7]:
# Function to process YOLO results
def process_yolo_results(result):
    """Turn one YOLO result into the three input arrays of the distance/speed model.

    Args:
        result: a single detection result exposing ``orig_img`` (BGR frame as an
            array), ``boxes.xywh`` and ``boxes.cls`` tensors, and ``names``
            (one entry per class).

    Returns:
        Tuple ``(images, bboxes, classes)`` of float32 arrays, each with a
        leading batch axis of size 1:

        * ``images``  -- (1, 224, 224, 1) grayscale frame scaled to [0, 1].
        * ``bboxes``  -- (1, 20, 4) boxes as (x_center, y_center, width, height)
          divided by the frame width/height; unused slots are zero.
        * ``classes`` -- (1, 20, len(result.names)) one-hot class rows; unused
          slots are zero.

        Only the first 20 detections are kept, for boxes and classes alike.
    """
    max_boxes = 20
    input_size = 224

    # Image branch: grayscale, fixed-size, single channel, scaled to [0, 1].
    orig_img = result.orig_img
    frame_h, frame_w = orig_img.shape[:2]
    image = cv2.cvtColor(orig_img, cv2.COLOR_BGR2GRAY)
    image = cv2.resize(image, (input_size, input_size))
    image = np.expand_dims(image, axis=-1).astype('float32') / 255.0

    # Box branch: pixel xywh -> fractions of the frame size. Scaling by 224/size
    # and dividing by 224 again cancels out, so divide by the frame size directly.
    # reshape(-1, 4) keeps a (0, 4) shape when nothing was detected.
    xywh = result.boxes.xywh.cpu().numpy().reshape(-1, 4)[:max_boxes]
    frame_scale = np.array([frame_w, frame_h, frame_w, frame_h], dtype='float32')
    padded_xywhn = np.zeros((max_boxes, 4), dtype='float32')
    padded_xywhn[:xywh.shape[0]] = xywh / frame_scale

    # Class branch: one-hot rows, truncated to the same slots as the boxes so a
    # frame with more than max_boxes detections cannot index out of range.
    class_id = result.boxes.cls.cpu().numpy().astype(int)[:max_boxes]
    class_one_hot = np.zeros((max_boxes, len(result.names)), dtype='float32')
    class_one_hot[np.arange(class_id.shape[0]), class_id] = 1

    images = np.array([image], dtype='float32')
    bboxes = np.array([padded_xywhn], dtype='float32')
    classes = np.array([class_one_hot], dtype='float32')

    return images, bboxes, classes

In [None]:
from supervision.geometry.core import Position

# Lazy frame source over the input video, yielding every 5th frame.
frame_generator = sv.get_video_frames_generator(source_path=VIDEO_PATH, stride=5)

# Default-configured drawing helpers for boxes and text labels.
box_annotator, label_annotator = sv.BoxAnnotator(), sv.LabelAnnotator()

In [None]:
# --- Single-frame check: detect, predict distance/speed, plot the overlay ---

# Pull one frame from the generator and run the detector on it.
frame = next(iter(frame_generator))
result = yolo_model(frame, conf=0.68, verbose=False)[0]
detections = sv.Detections.from_ultralytics(result)

# Build the model inputs; bboxes/classes are padded to a fixed number of slots.
image, bboxes, classes = process_yolo_results(result)
print(bboxes)

# Number of boxes in this frame; used to drop the padded prediction slots.
l = result.boxes.xyxy.cpu().numpy().shape[0]
print(l)

# pred is indexed positionally: element 0 is read as distance, element 1 as speed.
pred = dsp_model.predict({'image_path': image, 'bboxes': bboxes, 'classes': classes})
dist_m, speed_kmph = pred[0][:, :l], pred[1][:, :l]
print(dist_m)
print(dist_m.shape)

# "<class name> <confidence>" per detection.
labels1 = [
    f"{yolo_model.model.names[class_id]} {confidence:.2f}"
    for class_id, confidence in zip(detections.class_id, detections.confidence)
]

# "<distance>m <speed>km/h" per detection.
# NOTE(review): raw outputs are divided by 10000 here, while the video-rendering
# cell divides distances by 100000 -- confirm which scale is intended.
labels2 = [
    f"{abs(dist_m[0, i])/10000:.2f}m {abs(speed_kmph[0, i])/10000:.2f}km/h"
    for i in range(l)
]

# Draw boxes first, then class labels (bottom-left) and predictions (top-left).
class_label_annotator = sv.LabelAnnotator(text_position=Position.BOTTOM_LEFT)
prediction_label_annotator = sv.LabelAnnotator(text_position=Position.TOP_LEFT)
annotated_image = box_annotator.annotate(frame.copy(), detections=detections)
annotated_image = class_label_annotator.annotate(annotated_image.copy(), detections, labels=labels1)
annotated_image = prediction_label_annotator.annotate(annotated_image.copy(), detections, labels=labels2)

sv.plot_image(image=annotated_image, size=(12, 10))

In [19]:
from supervision.geometry.core import Position

# Tunables for this rendering pass.
MAX_FRAMES = 70       # number of frames written before the loop stops
MAX_BOXES = 20        # box slots produced by process_yolo_results
DIST_SCALE = 100000   # divisor applied to the raw distance output before display

video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)

# Annotators do not depend on the frame, so build them once outside the loop.
box_annotator = sv.BoxAnnotator()
class_label_annotator = sv.LabelAnnotator(text_position=Position.BOTTOM_LEFT)
prediction_label_annotator = sv.LabelAnnotator(text_position=Position.TOP_LEFT)

# Scaled distances of the previous frame; None until the first frame is processed.
previous_distances = None

with sv.VideoSink(target_path=RESULT_VIDEO_PATH, video_info=video_info) as sink:
    for frame_idx, frame in enumerate(sv.get_video_frames_generator(source_path=VIDEO_PATH)):
        # Stop before running inference on a frame that would never be written.
        if frame_idx == MAX_FRAMES:
            break

        result = yolo_model(frame, conf=0.67, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(result)
        image, bboxes, classes = process_yolo_results(result)

        # Only the first MAX_BOXES detections have a prediction slot; keep the
        # detections and the labels aligned with them.
        l = min(result.boxes.xyxy.cpu().numpy().shape[0], MAX_BOXES)
        if len(detections) > l:
            detections = detections[:l]

        pred = dsp_model.predict(
            {'image_path': image, 'bboxes': bboxes, 'classes': classes},
            verbose=0,
        )
        # pred is indexed positionally; element 0 is read as the distance output.
        dist_m = abs(pred[0][:, :l])
        current_distances = dist_m / DIST_SCALE

        # Speed is the frame-to-frame change in distance, converted with this
        # video's own frame rate. It is only computed when the previous frame had
        # the same number of boxes.
        # NOTE(review): boxes are matched by slot index, not tracked across
        # frames -- confirm this pairing is acceptable.
        speed_kmph = None
        if previous_distances is not None and previous_distances.shape == current_distances.shape:
            distance_change = current_distances - previous_distances
            speed_kmph = (distance_change * video_info.fps) * 3.6  # m/s -> km/h
        previous_distances = current_distances

        labels1 = [
            f"{yolo_model.model.names[class_id]} {confidence:.2f}"
            for class_id, confidence
            in zip(detections.class_id, detections.confidence)
        ]

        # Empty labels when no speed could be computed for this frame.
        labels2 = [
            f"{current_distances[0, i]:.2f}m {speed_kmph[0, i]:.2f}km/h" if speed_kmph is not None else ""
            for i in range(l)
        ]

        annotated_image = box_annotator.annotate(frame.copy(), detections=detections)
        annotated_image = class_label_annotator.annotate(annotated_image, detections, labels=labels1)
        annotated_image = prediction_label_annotator.annotate(annotated_image, detections, labels=labels2)
        sink.write_frame(frame=annotated_image)

In [20]:
# Re-encode the rendered video with libx264 into a second file.
tmp_output_path = RESULT_VIDEO_PATH
output_path = "out_test_compressed4.mp4"
subprocess.run(
    [
        "ffmpeg",
        # Overwrite an existing output so re-running the cell does not stall or
        # fail on ffmpeg's overwrite confirmation.
        "-y",
        # Global options go before the input/output files.
        "-loglevel",
        "quiet",
        "-i",
        tmp_output_path,
        "-crf",
        "18",
        "-preset",
        "veryfast",
        "-vcodec",
        "libx264",
        output_path,
    ],
    # Raise CalledProcessError on a non-zero exit instead of silently continuing
    # to the playback cell with a missing or stale file.
    check=True,
)

CompletedProcess(args=['ffmpeg', '-i', '/kaggle/working/munich_drive_result.mp4', '-crf', '18', '-preset', 'veryfast', '-vcodec', 'libx264', 'out_test_compressed4.mp4', '-loglevel', 'quiet'], returncode=0)

In [21]:
# Display the re-encoded video (the output file name used by the ffmpeg cell) at 1000 px width.
ipd.Video('out_test_compressed4.mp4', width=1000)

In [None]:
# YOLO model results: save detector-only annotated frames as images.
from supervision.geometry.core import Position

box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()

# Every 25th frame of the input video is annotated and written to yolon2_results.
with sv.ImageSink(target_dir_path='/kaggle/working/yolon2_results') as sink:
    for frame in sv.get_video_frames_generator(source_path=VIDEO_PATH, stride=25):
        result = yolo_model(frame, conf=0.68, verbose=False)[0]
        l = result.boxes.xyxy.cpu().numpy().shape[0]
        detections = sv.Detections.from_ultralytics(result)

        # "<class name> <confidence>" per detection.
        labels1 = [
            f"{yolo_model.model.names[class_id]} {confidence:.2f}"
            for class_id, confidence in zip(detections.class_id, detections.confidence)
        ]

        # Boxes first, then the class labels anchored at the top-left corner.
        boxed_frame = box_annotator.annotate(frame.copy(), detections=detections)
        top_left_labels = sv.LabelAnnotator(text_position=Position.TOP_LEFT)
        annotated_image = top_left_labels.annotate(boxed_frame, detections, labels=labels1)

        sink.save_image(image=annotated_image)