In [1]:
from ultralytics import YOLO
import os
import cv2
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from collections import defaultdict
from moviepy.editor import VideoFileClip
from matplotlib import pyplot as plt
from scipy.interpolate import UnivariateSpline

KeyboardInterrupt: 

In [None]:
ROOT_DIR = '/content/drive/MyDrive/Data Set/try'

In [None]:
def calculate_speed(track_history, fps):
    speeds = {}
    for track_id, history in track_history.items():
        print((track_id, history))
        if len(history) >= 2:
            x1, y1 = history[-2]
            x2, y2 = history[-1]
            distance = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)
            speed = distance * fps
            speeds[track_id] = speed
    return speeds


def calculate_acceleration(track_history, fps):
    acceleration = {}

    for track_id, track in track_history.items():
        if len(track) > 1:
            speeds = []
            for i in range(1, len(track)):
                x1, y1 = track[i - 1]
                x2, y2 = track[i]
                distance = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
                speed = distance * fps
                speeds.append(speed)

            if len(speeds) > 1:
                acceleration_values = []
                for i in range(1, len(speeds)):
                    acc = (speeds[i] - speeds[i - 1]) * fps
                    acceleration_values.append(acc)
                acceleration[track_id] = acceleration_values[-1]
            else:
                acceleration[track_id] = 0
        else:
            acceleration[track_id] = 0
    return acceleration


def calculate_direction(track_history):
    directions = {}
    for track_id, history in track_history.items():
        if len(history) >= 2:
            x1, y1 = history[-2]
            x2, y2 = history[-1]
            dx, dy = x2 - x1, y2 - y1
            angle = np.arctan2(dy, dx) * 180 / np.pi  
            directions[track_id] = angle
    return directions


def calculate_distance_traveled(track_history):
    distances = {}
    for track_id, history in track_history.items():
        total_distance = 0
        for i in range(1, len(history)):
            x1, y1 = history[i - 1]
            x2, y2 = history[i]
            total_distance += np.sqrt((x2 - x1)**2 + (y2 - y1)**2)
        distances[track_id] = total_distance
    return distances 


def get_dominant_color(image, k=1):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    pixels = image.reshape(-1, 3)
    kmeans = KMeans(n_clusters=k)
    kmeans.fit(pixels)
    dominant_color = kmeans.cluster_centers_[0]
    return tuple(int(c) for c in dominant_color)


def smooth_points(point_series, s=1.0):

  smoothed_points = []
  for point_series in point_series:
    x, y = zip(*point_series)
    x = np.array(x)
    y = np.array(y)

    if len(x) <= 3:  
      smoothed_points.append(point_series)  # No smoothing if not enough points
      continue

    spl = UnivariateSpline(x, y, s=s)
    smoothed_points.append(list(zip(x, spl(x))))

  return smoothed_points

In [None]:
VIDEO_PATH = "../datasets/test_videos/clear_street.mp4"
OUTPUT_VIDEO_PATH = "../datasets/test_videos/result_clear_street.mp4"
MODEL_PATH = "../datasets/detect/pretrained/train_100/best.pt"
SAVE_PHOTOS_PATH = "../datasets/objects_images"

properties_dict = {
    "frame": [],
    "track_id": [],
    "x": [],
    "y": [],
    "w": [],
    "h": [],
    "speed": [],
    "acceleration": [],
    "direction": [],
    "distance": [],
    "color": [], 
    "conf": [],
    "timestamp": [],
    "obj_img": []
}

track_history = defaultdict(lambda: [])

font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2
text_color = (0, 0, 255)
box_color = (0, 0, 255)

model = YOLO(MODEL_PATH)

cap = cv2.VideoCapture(VIDEO_PATH)
fps = cap.get(cv2.CAP_PROP_FPS)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, cv2.VideoWriter_fourcc(*'XVID'), fps, (frame_width, frame_height))

frame_number = 0

while cap.isOpened():
    success, frame = cap.read()

    if success:
        frame_number += 1

        x_set = 0 
        y_set = 200
        w_set = 1100
        h_set = 1080

        roi = frame[y_set:h_set, x_set:w_set]

        cv2.rectangle(frame, (x_set, y_set), (w_set, h_set), (95, 150, 124), thickness=2)

        # apply track on ROI
        results = model.track(roi, persist=True)

        # process the tracking result
        if (len(results) > 0) & (results[0].boxes.id != None):
            # initialize the object parameters place
            scores = results[0].boxes.conf.cpu().numpy()
            track_ids = results[0].boxes.id.int().cpu().tolist()
            boxes = results[0].boxes.xywh.cpu()
            detections = np.hstack((boxes, scores[:, np.newaxis]))

            # get the object properties calculation parameters
            speeds = calculate_speed(track_history, fps)
            accelerations = calculate_acceleration(track_history, fps)
            directions = calculate_direction(track_history)
            distances = calculate_distance_traveled(track_history)

            # move over the objects to apply the operations on them
            for track_id, box in zip(track_ids, boxes):
                # get the object location and show it on the output video
                x, y, w, h = box

                x1, y1 = int(x), int(y)  
                x2, y2 = x1 + w, y1 + h  

                adjusted_x1 = x1 - int(w / 2)
                adjusted_x2 = adjusted_x1 + w
                adjusted_y1 = y1 - int(h / 2)
                adjusted_y2 = adjusted_y1 + h

                detected_object_image = roi[int(adjusted_y1):int(adjusted_y2), int(adjusted_x1):int(adjusted_x2)]

                # gather statistics that belong to each id
                speed = speeds.get(track_id, 0)
                acceleration = accelerations.get(track_id, 0)
                direction = directions.get(track_id, 0)
                distance = distances.get(track_id, 0)
                bbox_area = roi[int(adjusted_y1):int(adjusted_y2), int(adjusted_x1):int(adjusted_x2)] 
                color = get_dominant_color(bbox_area)
                detection_idx = np.where((np.abs(detections[:, :4] - [x, y, w, h]) < 1).all(axis=1))[0]
                confidence = detections[detection_idx, 4][0] if detection_idx.size > 0 else 0
                timestamp = frame_number / fps

                # save detected object image
                obj_img_save_path = os.path.join(SAVE_PHOTOS_PATH, f"frame_{frame_number}_id_{track_id}.jpg")
                
                if detected_object_image.size > 0:
                    cv2.imwrite(obj_img_save_path, detected_object_image)

                #draw the object detected
                cv2.rectangle(roi, (int(adjusted_x1), int(adjusted_y1)), (int(adjusted_x2), int(adjusted_y2)), box_color, thickness=2)

                # put the text for each object (put the id)
                text = f"ID: {track_id}"

                (text_width, text_height) = cv2.getTextSize(text, font, font_scale, font_thickness)[0]
                adjusted_text_x = adjusted_x1 + int((w - text_width) / 2)  
                adjusted_text_y = adjusted_y1 - 10  

                cv2.putText(roi, text, (adjusted_text_x, adjusted_text_y), font, font_scale, text_color, font_thickness)
                
                track = track_history[track_id]
                track.append((float(x), float(y))) # here also update track_history because i apply [call by value] variable

                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))

                if len(track) > 2:
                    smoothed_points = np.array(smooth_points(points)).astype(np.int32).reshape((-1, 1, 2))
                    cv2.polylines(roi, [smoothed_points], isClosed=False, color=(0,255,0), thickness=2)
                
                # draw speed vector for object detected
                speed_v = (int(x + speed * np.cos(direction * np.pi / 180)), int(y + speed * np.sin(direction * np.pi / 180)))
                cv2.arrowedLine(roi, (x1, y1), speed_v, color=(0, 0, 0), thickness= 2)
                
                # draw normal tracking lines
                cv2.polylines(roi, [points], isClosed=False, color=(0, 0, 255), thickness=2)

                # save the statistics
                properties_dict["frame"].append(frame_number)
                properties_dict["track_id"].append(track_id)
                properties_dict["x"].append(x)
                properties_dict["y"].append(y)
                properties_dict["w"].append(w)
                properties_dict["h"].append(h)
                properties_dict["speed"].append(speed)
                properties_dict["acceleration"].append(acceleration)
                properties_dict["direction"].append(direction)
                properties_dict["distance"].append(distance)
                properties_dict["color"].append(color)
                properties_dict["conf"].append(confidence)
                properties_dict["timestamp"].append(timestamp)
                properties_dict["obj_img"].append(obj_img_save_path)
            
        out.write(frame)

        cv2.imshow("YOLOv8 Tracking", frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        break

cap.release()
out.release()
cv2.destroyAllWindows()

properties_df = pd.DataFrame(properties_dict)
properties_df.to_csv('../datasets/object_properties/clear_street_properties.csv', index=False)


0: 512x640 2 cars, 460.7ms
Speed: 25.2ms preprocess, 460.7ms inference, 17.8ms postprocess per image at shape (1, 3, 512, 640)

0: 512x640 2 cars, 614.6ms
Speed: 4.4ms preprocess, 614.6ms inference, 3.5ms postprocess per image at shape (1, 3, 512, 640)
(1, [(635.961181640625, 17.913551330566406)])
(2, [(640.2093505859375, 96.55947875976562)])

0: 512x640 2 cars, 439.7ms
Speed: 1.0ms preprocess, 439.7ms inference, 0.0ms postprocess per image at shape (1, 3, 512, 640)
(1, [(635.961181640625, 17.913551330566406), (635.441162109375, 20.03464698791504)])
(2, [(640.2093505859375, 96.55947875976562), (639.6746826171875, 100.44627380371094)])

0: 512x640 2 cars, 373.5ms
Speed: 8.2ms preprocess, 373.5ms inference, 0.0ms postprocess per image at shape (1, 3, 512, 640)
(1, [(635.961181640625, 17.913551330566406), (635.441162109375, 20.03464698791504), (635.3912353515625, 21.99607276916504)])
(2, [(640.2093505859375, 96.55947875976562), (639.6746826171875, 100.44627380371094), (638.5530395507812,