In [1]:
import numpy as np

#just fallback values
LENGTH=20
WIDTH=1920
HEIGHT=1080
FRAMERATE=30
color_purple = (255,0,255)
color_white = (255,255,255)

In [2]:
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

import cv2
import numpy as np

class TrackedVehicle:
    def __init__(self, label, speed,time):
        self.label = label
        self.speed = speed
        self.time = time

class TrackableVehicle:
    def __init__(self, id: int,
                  entered_timestamp: float | None,
                  last_anchor, label, indim) -> None:
        self.id = id
        self.enter_timestamp = entered_timestamp
        self.last_anchor = list()
        self.last_anchor.append((int(last_anchor[0]),int(last_anchor[1])))
        self.labels = dict()
        self.labels[label] = 1
        self.indims = indim


    def label(self):
        return max(self.labels, key=self.labels.get)


    def finish(self, time):
        return TrackedVehicle(self.label(),LENGTH/(time-self.enter_timestamp),time-self.enter_timestamp if self.enter_timestamp is not None else 0)
    
    def update(self, label, anchor, isins):
        if label not in self.labels.keys():
            self.labels[label] = 1
        else:
            self.labels[label] +=1
        self.indim=isins

    def old_anchor(self):
        return self.last_anchor[0]

def avspeed(tracked):
    n = 0
    sp = 0
    for v in tracked:
        if v.time>=0.4: 
            sp+=v.speed
            n+=1
    if n==0: return "NaN"
    return (sp/n)*3.6

def check_in(x, y, poly):
    return Polygon(poly).contains(Point(x,y))

def in_any(x, y, polys):
    for poly in polys:
        if Polygon(poly).contains(Point(x,y)): return True
    return False

In [3]:
from collections import defaultdict
from ultralytics import YOLO
import json
import torch

torch.set_float32_matmul_precision('high')
torch.jit.enable_onednn_fusion(True)
torch.backends.cudnn.benchmark = True

# Load the YOLOv8 model
model = YOLO('yolov8s.pt')
model.to('cuda')
model.fuse()
model.compile()


#markup = r"videos/dataset/markup/jsons/KRA-8-28-2023-08-18-morning.json"
#video_path = r"videos/dataset/Video1/KRA-8-28-2023-08-18-morning.mp4"


def process_video(video_path, markup):
    with open(markup, 'r') as file:
        data = json.load(file)

    zones = data['zones']

    # Open the video file
    cap = cv2.VideoCapture(video_path)

    WIDTH=cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    HEIGHT=cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    FRAMERATE=cap.get(cv2.CAP_PROP_FPS)

    dims = []
    for i, arr in enumerate(zones):
        dims.append([])
        for j, arr2 in enumerate(arr):
            dims[i].append((int(WIDTH*arr2[0]),int(HEIGHT*arr2[1])))

    areas = data['areas']

    bdims = []
    for i, arr in enumerate(areas):
        bdims.append([])
        for j, arr2 in enumerate(arr):
            bdims[i].append((int(WIDTH*arr2[0]),int(HEIGHT*arr2[1])))

    # Store the track history
    track_history = defaultdict(lambda: [])

    needed_classes=[2,3,5,7]

    tracking=dict()
    tracked=dict()

    count=0
    # Loop through the video frames
    while cap.isOpened():
        # Read a frame from the video
        success, frame = cap.read()
        if success:
            # Run YOLOv8 tracking on the frame, persisting tracks between frames
            results = model.track(frame, persist=True, tracker='bytetrack.yaml', classes=needed_classes, verbose=False)

            # Get the boxes and track IDs
            tracks = results[0].boxes.cpu()
            boxes = results[0].boxes.xywh.cpu()
            track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id != None else []

            # Visualize the results on the frame
            annotated_frame = results[0].plot()

            # Plot the tracks
            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                track = track_history[track_id]
                track.append((float(x), float(y+h/2)))  # x, y center point
                if len(track) > 90:  # retain 90 tracks for 90 frames
                    track.pop(0)

                # Draw the tracking lines
                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
            
            
            for box, id, track in zip(boxes, track_ids, tracks):
                x,y,w,h = box
                isin = in_any(float(x), float(y+h/2), bdims)
                isins = in_any(float(x), float(y+h/2), dims)
                if isin: 
                    cv2.circle(annotated_frame, tuple((int(x), int(y+h/2))), 10, (255,0,0), 10)
                if ((id not in tracking.keys()) and isin and isins) or (id in tracking.keys() and (not isin)):


                    if id not in tracking.keys(): tracking[id] = TrackableVehicle(id,None,(int(x), int(y+h/2)),track.cls,isins)
                    
                    if isin and isins:
                        tracking[id].enter_timestamp = (1/FRAMERATE)*count

                    elif (not isin) and tracking[id].indims != isins:
                        if (1/FRAMERATE)*count!=tracking[id].enter_timestamp:
                            tracked[tracking[id].label()] = tracking[id].finish((1/FRAMERATE)*count)
                            tracking.pop(id)
                    
                if id in tracking.keys(): tracking[id].update(track.cls, (int(x), int(y+h/2)), isins)

        else:
            # Break the loop if the end of the video is reached
            break

        count+=1
        
    # Release the video capture object and close the display window
    cap.release()

    speeds=dict()
    speeds[2]=list()
    speeds[3]=list()
    speeds[5]=list()
    speeds[7]=list()
    for track in tracked.values():
        if track.time>=0.35: speeds[int(track.label)].append(track.speed)
    for k, l in speeds.items():
        print(f"label {k}: ", (sum(l)/len(l))*3.6 if len(l)!=0 else 0)
    for k, l in speeds.items():
        print(f"{k}: ", len(l))

    return [(sum(l)/len(l))*3.6 if len(l)!=0 else 0 for k,l in speeds.items()], [len(l) for k, l in speeds.items()]


YOLOv8s summary (fused): 168 layers, 11156544 parameters, 0 gradients, 28.6 GFLOPs


In [4]:
import pathlib

def find_mp4(directory):
    """
    Recursively search for .mp4 files in a directory and its subdirectories.
    
    Returns a list of file paths.
    """
    mp4_files = []
    for path in pathlib.Path(directory).rglob('*.mp4'):
        mp4_files.append(path)
    return mp4_files

In [5]:
import pandas as pd

csv = pd.read_csv("submission.csv")
df = csv.head(0)
df

Unnamed: 0,file_name;quantity_car;average_speed_car;quantity_van;average_speed_van;quantity_bus;average_speed_bus


In [6]:
data_path=r"videos/dataset/Test_Newest_2"
markup_path=r"videos/dataset/Test_markup"
from tqdm import tqdm

data_mp4=find_mp4(data_path)
ids=[]
result=[]
for file in tqdm(data_mp4):
    json_path = markup_path+"/"+file.stem+".json"
    result.append(process_video(str(file), json_path))
    ids.append(file.stem)

  1%|          | 1/100 [14:19<23:37:41, 859.21s/it]

label 2:  32.76295519059939
label 3:  0
label 5:  22.989130434782883
label 7:  25.11816617384253
2:  590
3:  0
5:  2
7:  69


  2%|▏         | 2/100 [27:29<22:17:22, 818.80s/it]

label 2:  51.62888646577882
label 3:  0
label 5:  45.41231884057819
label 7:  47.243881725537676
2:  422
3:  0
5:  4
7:  33


  3%|▎         | 3/100 [43:31<23:49:36, 884.30s/it]

label 2:  23.65558424470924
label 3:  0
label 5:  22.313200981774404
label 7:  24.92530389672982
2:  662
3:  0
5:  12
7:  32


  4%|▍         | 4/100 [58:27<23:41:44, 888.59s/it]

label 2:  31.00087153046881
label 3:  0
label 5:  28.6605591699457
label 7:  43.27855576281764
2:  301
3:  0
5:  10
7:  15


  5%|▌         | 5/100 [1:10:53<22:05:41, 837.28s/it]

label 2:  58.61391825396593
label 3:  0
label 5:  48.490767867117405
label 7:  56.47763829032704
2:  590
3:  0
5:  10
7:  89
