In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE'
import json

import cv2
import imageio
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

In [2]:
def get_capture_info(cap):
    """Read the basic stream properties from a capture object.

    Args:
        cap: object exposing ``get(prop_id)`` (e.g. ``cv2.VideoCapture``).

    Returns:
        ``(height, width, fps, frame_count)``. Every value goes through
        ``int()``, so a fractional frame rate such as 29.97 is truncated to 29.
    """
    # Queried in width, height, fps, frame-count order; returned height-first.
    prop_ids = (
        cv2.CAP_PROP_FRAME_WIDTH,
        cv2.CAP_PROP_FRAME_HEIGHT,
        cv2.CAP_PROP_FPS,
        cv2.CAP_PROP_FRAME_COUNT,
    )
    width, height, fps, frame_count = [int(cap.get(prop_id)) for prop_id in prop_ids]
    return height, width, fps, frame_count

def prepare_zones(json_data, image):
    """Convert normalised zone vertices into pixel coordinates of ``image``.

    Args:
        json_data: mapping whose ``'areas'`` entry is an iterable of zones;
            each zone is an iterable of ``(x, y)`` pairs expressed as
            fractions of the frame width / height.
        image: object with a ``shape`` of ``(height, width, ...)``.

    Returns:
        A list of zones, each a list of ``[x, y]`` integer pixel pairs.
    """
    height, width = image.shape[:2]

    def _to_pixel(fraction, size):
        # Clamp on both sides: a slightly negative fraction would otherwise
        # become a negative pixel coordinate, just as a value above 1.0
        # would land past the frame edge.
        return int(min(max(fraction, 0.0), 1.0) * size)

    return [
        [[_to_pixel(x_n, width), _to_pixel(y_n, height)] for x_n, y_n in zone]
        for zone in json_data['areas']
    ]

def visialize_points(json_data, img):
    """Return a copy of ``img`` with every zone vertex drawn as a filled dot.

    Args:
        json_data: mapping whose ``'areas'`` entry is an iterable of zones;
            each zone is an iterable of ``(x, y)`` pairs expressed as
            fractions of the frame width / height.
        img: image array of shape ``(height, width, ...)``; it is not modified.

    Returns:
        The annotated copy. Dots use colour ``(0, 0, 255)`` (red on a BGR
        image) and radius 10.
    """
    image = img.copy()
    height, width = image.shape[:2]
    for zone in json_data['areas']:
        for x_n, y_n in zone:
            # Clamp to [0, 1] on both sides so the dot always lands on the frame
            # (same normalised -> pixel conversion as prepare_zones).
            x = int(min(max(x_n, 0.0), 1.0) * width)
            y = int(min(max(y_n, 0.0), 1.0) * height)
            image = cv2.circle(image, (x, y), radius=10, color=(0, 0, 255), thickness=-1)

    return image
    
def shrink_poly(poly, x_shrink, y_shrink):
    """Scale a polygon towards the centre of its bounding box.

    Args:
        poly: sequence of vertices; only the first two components of each
            vertex (x, y) are used.
        x_shrink: fraction removed along x (0 keeps the width, 1 collapses it).
        y_shrink: fraction removed along y, same convention.

    Returns:
        ``np.int32`` array with one ``[x, y]`` row per input vertex; the scaled
        coordinates are truncated with ``int()``.
    """
    xs, ys = [], []
    for vertex in poly:
        xs.append(vertex[0])
        ys.append(vertex[1])

    def _scale_axis(values, shrink):
        # The centre is the midpoint of the axis extent, not the centroid.
        centre = 0.5 * min(values) + 0.5 * max(values)
        return [(value - centre) * (1 - shrink) + centre for value in values]

    scaled_xs = _scale_axis(xs, x_shrink)
    scaled_ys = _scale_axis(ys, y_shrink)

    shrunk = [[int(x), int(y)] for x, y in zip(scaled_xs, scaled_ys)]
    return np.array(shrunk, np.int32)

def find_corners(poly):
    """Describe the axis-aligned bounding box of a set of 2-D points.

    Args:
        poly: array-like of ``[x, y]`` rows.

    Returns:
        ``[x1, y1, x2, y2, w, h, xc, yc]``: the min corner, the max corner,
        the box width and height, and the box centre computed with floor
        division.
    """
    upper = np.max(poly, axis=0)
    lower = np.min(poly, axis=0)
    x2, y2 = upper
    x1, y1 = lower
    w, h = x2 - x1, y2 - y1
    xc = w // 2 + x1
    yc = h // 2 + y1
    return [x1, y1, x2, y2, w, h, xc, yc]

def move_poly(poly, gap_x, gap_y):
    """Translate every vertex of ``poly`` by ``(-gap_x, -gap_y)``.

    Args:
        poly: iterable of ``(x, y)`` pairs.
        gap_x: amount subtracted from each x coordinate.
        gap_y: amount subtracted from each y coordinate.

    Returns:
        ``np.int32`` array holding the shifted vertices.
    """
    shifted = [[x - gap_x, y - gap_y] for x, y in poly]
    return np.array(shifted, np.int32)


def move_and_shrink_poly(poly, bbox):
    """Fit a scaled copy of a zone polygon onto the lower half of a box.

    The polygon is scaled so that its bounding box has the width of ``bbox``
    and half of its height, then translated so that the centre of its bounding
    box coincides with the centre of the lower half of ``bbox``.

    Args:
        poly: zone polygon as a sequence of ``[x, y]`` vertices. At least three
            vertices are required, and the zone must have non-zero width and
            height (both are used as divisors).
        bbox: ``[x1, y1, x2, y2]`` box of the tracked object.

    Returns:
        ``(new_poly_moved, (point_x, point_y))``: the fitted polygon as an
        ``np.int32`` array and the tracking point as two built-in ints.
        ``point_x`` is the horizontal centre of ``bbox``; ``point_y`` lies
        halfway between vertex 2 of the fitted polygon and the bottom of
        ``bbox``.
    """
    zone_w, zone_h = find_corners(poly)[4:6]

    x1b, y1b, x2b, y2b = bbox
    wb = x2b - x1b
    # Only the lower half of the box is used as the target rectangle.
    hb = (y2b - y1b) // 2
    y1b = y1b + hb
    xcb, ycb = wb // 2 + x1b, hb // 2 + y1b

    # Shrink factors that bring the zone's extent down to the target size.
    coef_x, coef_y = 1 - (wb / zone_w), 1 - (hb / zone_h)
    new_poly = shrink_poly(poly, coef_x, coef_y)

    # Shift so that the bounding-box centres of polygon and target coincide.
    new_poly_info = find_corners(new_poly)
    gap_x = new_poly_info[6] - xcb
    gap_y = new_poly_info[7] - ycb
    new_poly_moved = move_poly(new_poly, gap_x, gap_y)

    # Vertex 2 is taken as the lower reference edge.
    # NOTE(review): presumably zones list a bottom corner third - confirm.
    # The previous lookups of vertex 3 fed only unused variables and made
    # 3-vertex zones fail with IndexError, so they are gone.
    reference_y = new_poly_moved[2][1]

    poly_center_y = reference_y + (y2b - reference_y) // 2
    poly_center_x = x1b + wb // 2

    return new_poly_moved, (int(poly_center_x), int(poly_center_y))

In [3]:
# Detector weights file handed to YOLO.
# NOTE(review): the '.engine' suffix is presumably a TensorRT export - confirm.
model_name = 'yolov8l.engine'
# Loaded once; the tracking cell calls `model.track(...)` on every frame.
model  = YOLO(model_name)



In [4]:
# Drawing colour per vehicle class; applied to frames before the BGR -> RGB
# conversion in the rendering cell, so the tuples are in BGR order.
classes2color = dict(car=(255, 0, 0), bus=(0, 255, 0), truck=(0, 0, 255))
# Detector class id -> class name.
# NOTE(review): presumably COCO ids - confirm against the model.
label2name = dict(zip((2, 5, 7), ('car', 'bus', 'truck')))
# Reverse lookup: class name -> detector class id.
name2label = {name: label for label, name in label2name.items()}

In [8]:
def load_json(file_path):
    """Read a JSON file and return the parsed content.

    Args:
        file_path: path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for the file's content.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's default
    # text encoding, which would garble non-ASCII content on some systems.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

# Zone description file; its 'areas' entry is read by prepare_zones and
# visialize_points.
json_path = './KRA-14-45-2023-08-31-morning.json'
# Input video opened by the tracking cell.
video_path = './KRA-14-45-2023-08-31-morning-Trim.mp4'

# Parsed zone description, kept in `data` for the cells below.
data = load_json(json_path)

In [9]:
# Outputs of this cell, consumed by the rendering cell:
zoned_frames = []      # zone-highlighted copy of every processed frame
tracks_info = {}       # frame_num -> {track_id: per-frame track record}
objects_in_area = {}   # track_id -> {'class', 'frames_in_area', 'crossed'}

cap = cv2.VideoCapture(video_path) # '../videos/KRA-3-12-2023-08-21-evening_Trim_1.mp4'
height, width, fps, frame_count = get_capture_info(cap)

# Zone geometry depends only on the frame size, so it is built once from the
# first decoded frame instead of on every iteration.
zones = None
polygons = None
zones_mask = None

try:
    for frame_num in tqdm(range(frame_count)):
        success, img = cap.read()
        # The reported frame count can exceed what is actually decodable.
        if not success or img is None:
            break
        tracks_info[frame_num] = {}

        results = model.track(img, persist=True, classes=[2, 5, 7], verbose=False, conf=0.5, tracker='bytetrack.yaml')

        if zones_mask is None:
            zones = prepare_zones(data, img)
            polygons = [Polygon(zone) for zone in zones]
            # fillPoly takes a list of int32 point arrays. A single
            # np.array(zones) is int64 on most platforms and cannot be built
            # at all when zones have different numbers of vertices.
            zone_arrays = [np.asarray(zone, dtype=np.int32) for zone in zones]
            zones_mask = np.zeros_like(img, dtype=np.uint8)
            zones_mask = cv2.fillPoly(zones_mask, pts=zone_arrays, color=(255, 255, 255)).astype(bool)

        # Brighten the zone pixels (x2, saturated at 255) and mark the vertices.
        zoned_img = img.copy()
        zoned_img[zones_mask] = np.clip(zoned_img[zones_mask].astype(np.uint16) * 2, 0, 255).astype(np.uint8)
        zoned_img = visialize_points(data, zoned_img)
        zoned_frames.append(zoned_img)

        previous_tracks = tracks_info.get(frame_num - 1, {})

        for track in results[0].boxes:
            if not track.is_track:
                continue
            # .item() yields floats; ids are stored as ints so that labels read
            # "Id: 1" rather than "Id: 1.0" (equal ints and floats hash alike,
            # so dictionary lookups are unaffected).
            track_id = int(track.id[0].cpu().item())
            class_id = int(track.cls[0].cpu().item())
            x1, y1, x2, y2 = [int(x) for x in track.xyxy[0].cpu().numpy()]

            # The tracking point is derived from the first zone only.
            _, (point_x, point_y) = move_and_shrink_poly(zones[0], [x1, y1, x2, y2])
            point = Point(point_x, point_y)

            in_area = any(point.within(polygon) for polygon in polygons)
            tracks_info[frame_num][track_id] = {
                'class': class_id,
                'box': [x1, y1, x2, y2],
                'speed_vector': None,  # placeholder, never computed in this notebook
                'in_area': in_area,
                'track_point': (point_x, point_y),
            }

            if in_area:
                if track_id not in objects_in_area:
                    objects_in_area[track_id] = {
                        'class': class_id,
                        'frames_in_area': 1,
                        'crossed': False
                    }
                else:
                    objects_in_area[track_id]['frames_in_area'] += 1

            # A track has "crossed" once it is outside the zones on a frame
            # directly following one on which it was inside.
            was_in_area = previous_tracks.get(track_id, {}).get('in_area', False)
            if was_in_area and (not in_area) and (track_id in objects_in_area):
                objects_in_area[track_id]['crossed'] = True
finally:
    # Release the capture even when detection or tracking raises mid-video.
    cap.release()

100%|██████████| 568/568 [00:35<00:00, 15.84it/s]


In [16]:
# NOTE(review): 20 is presumably the real-world zone length in metres - confirm.
ZONE_LENGTH_M = 20
# Tracks seen inside a zone for this many frames or fewer are ignored.
MIN_FRAMES_IN_AREA = 10
# Multiplying by 3.6 converts m/s to km/h (assuming the length above is in metres).
MS_TO_KMH = 3.6

frames = []

object_speed = {'car': [], 'bus': [], 'truck': []}
object_count = {'car': 0, 'bus': 0, 'truck': 0}

# Aggregate counts and speeds over tracks that were inside a zone and left it.
for track_id, object_info in objects_in_area.items():
    if not object_info['crossed']:
        continue
    class_name = label2name[object_info['class']]

    if object_info['frames_in_area'] <= MIN_FRAMES_IN_AREA:
        continue

    object_count[class_name] += 1
    seconds_in_area = object_info['frames_in_area'] / fps
    speed = (ZONE_LENGTH_M / seconds_in_area) * MS_TO_KMH
    object_speed[class_name].append(speed)


# Mean speed per class; 0 when no track of that class qualified.
mean_speeds = {
    class_name: np.mean(speeds) if len(speeds) else 0
    for class_name, speeds in object_speed.items()
}

for frame_num in tqdm(range(len(zoned_frames))):
    zoned_img = zoned_frames[frame_num].copy()

    # Per-track annotations: box, id / zone flag, and tracking point.
    for track_id, track_info in tracks_info[frame_num].items():
        x1, y1, x2, y2 = track_info['box']
        in_area = track_info['in_area']
        current_class = label2name[track_info['class']]
        color = classes2color.get(current_class, (255, 120, 50))
        cv2.rectangle(zoned_img, (x1, y1), (x2, y2), color, 3)
        cv2.putText(zoned_img, f'Id: {track_id} | IN AREA: {in_area}', (x1, y1 - 15), cv2.FONT_HERSHEY_PLAIN, 3, color, 3)

        point_x, point_y = track_info['track_point']
        zoned_img = cv2.circle(zoned_img, (point_x, point_y), radius=10, color=(255, 0, 255), thickness=-1)

    # Summary overlay, drawn once per frame. It used to sit inside the track
    # loop, so it was redrawn per track and missing on frames with no tracks.
    y_text = 100
    y_delta = 40
    for class_name in ['car', 'bus', 'truck']:
        general_info = f'{class_name.upper()} COUNT: {object_count[class_name]}'
        cv2.putText(zoned_img, general_info, (100, y_text), cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 0), 2)
        y_text += y_delta

        general_info = f'{class_name.upper()} SPEED: {mean_speeds[class_name]:3.2f}'
        cv2.putText(zoned_img, general_info, (100, y_text), cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 0), 2)
        y_text += y_delta

    # Downscale to a fixed 640x480 and convert to RGB for imageio.
    zoned_img = cv2.resize(zoned_img, (640, 480))
    frames.append(cv2.cvtColor(zoned_img, cv2.COLOR_BGR2RGB))


imageio.mimwrite("../videos/KRA-14-45-2023-08-31-morning-Trim.mp4", frames, fps=fps)

100%|██████████| 568/568 [00:04<00:00, 138.67it/s]


In [1]:
# Preview the rendered video inline.
# NOTE(review): this import sits outside the notebook's top import cell, and the
# cell's execution count (In [1]) is out of sequence with the cells above -
# confirm the notebook survives Restart & Run All.
from denku import show_video_in_jupyter
show_video_in_jupyter("../videos/KRA-14-45-2023-08-31-morning-Trim.mp4")