# YOLOv8 Installation

In [None]:
import os
import pandas as pd

os.makedirs('models', exist_ok=True)


folder_url = 'https://drive.google.com/drive/folders/1Dc0W67KQAJ34QCSI_PaZYqPFu1V1651_?usp=sharing'

!pip install --upgrade gdown

import gdown

folder_id = '1Dc0W67KQAJ34QCSI_PaZYqPFu1V1651_'


gdown.download_folder(f"https://drive.google.com/drive/folders/{folder_id}", quiet=False, use_cookies=False)

print("File nella cartella scaricata:")
print(os.listdir('models'))

In [None]:
import os
import pandas as pd

# Google Drive folder that holds the data files; the URL is derived from the
# id so the two can never drift apart.
folder_id = '1Vz2TCsDS_bbENCkWykH5Mz1T0OxpSrks'
folder_url = f"https://drive.google.com/drive/folders/{folder_id}"

# Local directory that receives the downloaded files.
data_dir = 'data'
os.makedirs(data_dir, exist_ok=True)

import gdown

# Pass `output` explicitly: without it gdown creates a directory named after
# the remote folder, which is not guaranteed to be the directory listed below.
gdown.download_folder(folder_url, output=data_dir, quiet=False, use_cookies=False)

print("File nella cartella scaricata:")
print(os.listdir(data_dir))

In [None]:
!pip install opencv-python
!pip install inference==0.9.17
!pip install supervision>=0.20.0
!pip install ultralytics

# YOLOv8 Architecture


In [None]:
# Download the stock YOLOv8 architecture definition (yolov8.yaml) from the main
# branch of the Ultralytics repository into the current directory.
!wget https://raw.githubusercontent.com/ultralytics/ultralytics/main/ultralytics/cfg/models/v8/yolov8.yaml

## Modified YOLOv8 Architecture for Small Objects

In [None]:
# Copy the stock architecture file to yolov8l-small.yaml.
# NOTE(review): this cell only copies the file; any small-object changes to the
# architecture have to be made in yolov8l-small.yaml separately.
!cp yolov8.yaml yolov8l-small.yaml

# YOLOv8 Training

In [None]:
# Training Original Model
from ultralytics import YOLO

# Build the model from a weights file or architecture YAML (placeholder path).
model = YOLO("path_model")

# Train with an explicit, fully spelled-out configuration.
# NOTE: `flipud` is set to 0.0 (vertical flip disabled, the Ultralytics
# default); the value had been cut off mid-literal and fused with the next
# line, which made the whole call a syntax error.
results = model.train(
    data="path_data.yaml", epochs=250, imgsz=640, device=0, batch=16, workers=2, resume = False, lr0=0.01, lrf=0.001, momentum=0.95,
    weight_decay=0.0001, warmup_epochs=10, warmup_momentum=0.5, warmup_bias_lr=0.1, optimizer='SGD', patience=30, plots=True,
    name='path_save_dir', hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, degrees=0.0, translate=0.1, scale=0.5, shear=0.0, perspective=0.0, flipud=0.0,
    fliplr=0.1, mosaic=1, mixup=0.2, copy_paste=0.0, cache=False, save=True, save_period=-1, project=None, exist_ok=False, pretrained=True,
    verbose=True, seed=0, deterministic=True, single_cls=False, rect=False, cos_lr=False, close_mosaic=10, amp=True, fraction=1.0, profile=False,
    freeze=None, multi_scale=False, overlap_mask=True, mask_ratio=4, dropout=0.0, val=True, split='val', save_json=False, save_hybrid=False,
    conf=None, iou=0.7, max_det=300, half=False, dnn=False, source=None, vid_stride=1, stream_buffer=False, visualize=False, augment=False,
    agnostic_nms=False, classes=None, retina_masks=False, embed=None, show=False, save_frames=False, save_txt=False, save_conf=False,
    save_crop=False, show_labels=True, show_conf=True, show_boxes=True, line_width=None, format='torchscript', keras=False, optimize=False,
    int8=False, dynamic=False, simplify=False, opset=None, workspace=4, nms=False, box=7.5, cls=0.5, dfl=1.5, pose=12.0, kobj=1.0,
    label_smoothing=0.0, nbs=64, auto_augment='randaugment', erasing=0.4, crop_fraction=1.0, cfg=None, tracker='botsort.yaml',
    save_dir='path_save_dir'
)
print(results)

# YOLOv8 Validation

In [None]:
from ultralytics import YOLO

# Load the weights to evaluate (placeholder path).
model = YOLO("path_your_model")

# Evaluate on the "test" split with explicit confidence and IoU thresholds.
validation_results = model.val(
    data="path_your_data.yaml",
    imgsz=640,
    batch=16,
    conf=0.3,
    iou=0.5,
    device="0",
    split="test",
)

# TensorRT


In [None]:
# Install the TensorRT Python package together with its lean and dispatch variants.
!pip install tensorrt
!pip install tensorrt_lean
!pip install tensorrt_dispatch
# Install the ONNX tooling: onnx, onnxsim and the GPU build of ONNX Runtime.
!pip install onnx onnxsim onnxruntime-gpu

In [None]:
import tensorrt
# Report the installed TensorRT version.
print(tensorrt.__version__)

# Smoke test: a Builder must be constructible from a fresh Logger.
trt_logger = tensorrt.Logger()
trt_builder = tensorrt.Builder(trt_logger)
assert trt_builder

In [None]:
# Export the model with the Ultralytics CLI as format=engine on device 0,
# with half precision disabled and workspace=12.
!yolo export model="path_your_model"  format=engine half=False device=0 workspace=12

## Speed


In [None]:
# Download the helper-modules archive from Google Drive with the gdown CLI.
# Presumably this is saved as modules.zip, which the next cell unpacks.
!gdown https://drive.google.com/uc?id=1RskX1wXVF0xSMAPgpkU-EsaUv8tD7lvS

In [None]:
# Unpack modules.zip into the current working directory.
!unzip modules.zip

In [None]:
import cv2
import random
import time
import argparse
import os
from ultralytics import YOLO

def get_name(file_path):
  """Return the last component (the file name) of ``file_path``.

  Both the backslash and the forward slash are treated as separators, so
  Windows-style, POSIX-style and mixed paths are all handled.

  Args:
    file_path: Path string to split.

  Returns:
    Everything after the last separator, or the whole string when the path
    contains no separator at all.
  """
  # rfind() yields -1 when the character is absent, so max() picks the last
  # separator of either kind and "-1 + 1 == 0" keeps a bare file name intact
  # instead of dropping its first character.
  last_separator = max(file_path.rfind('\\'), file_path.rfind('/'))

  return file_path[last_separator + 1:]

def get_save_path(file_name, folder_name):
  """Build the output path ``result/<folder_name>/<file_name>``.

  The directory ``result/<folder_name>`` is created if it does not exist yet.

  Args:
    file_name: Name of the file to be saved.
    folder_name: Sub-directory of ``result`` that will hold the file.

  Returns:
    The joined path of the output file (the file itself is not created).
  """
  save_dir = os.path.join("result", folder_name)

  # exist_ok=True replaces the separate exists() check, which could race with
  # another process creating the same directory between check and creation.
  os.makedirs(save_dir, exist_ok=True)

  return os.path.join(save_dir, file_name)

def draw_box(img, result, class_list, colors, label_size):
  """Draw every detection in ``result`` on a copy of ``img``.

  Each detection gets a 2-pixel box in its class colour and a filled caption
  strip above the box showing the class name and the confidence with two
  decimals.

  Args:
    img: Source image; it is copied and left unmodified.
    result: Detection result whose ``boxes`` expose ``xyxy``, ``conf`` and
      ``cls``; each is converted with ``.numpy()``.
    class_list: Lookup from integer class id to class name.
    colors: Lookup from integer class id to the box colour.
    label_size: Font scale of the caption text.

  Returns:
    The annotated copy of ``img``.
  """
  detections = result.boxes
  corners_all = detections.xyxy.numpy()
  scores_all = detections.conf.numpy()
  class_ids_all = detections.cls.numpy().astype(int)

  # Work on a copy so the caller keeps the untouched frame.
  canvas = img.copy()
  font = cv2.FONT_HERSHEY_SIMPLEX
  caption_color = (255, 255, 255)

  for class_id, score, corners in zip(class_ids_all, scores_all, corners_all):
    class_id = int(class_id)
    fill = colors[class_id]
    left, top = int(corners[0]), int(corners[1])
    right, bottom = int(corners[2]), int(corners[3])

    # Bounding box.
    cv2.rectangle(canvas, (left, top), (right, bottom), fill, 2)

    # Caption text and its rendered size.
    caption = '{label} {con:.2f}'.format(label=class_list[class_id], con=score)
    (caption_w, caption_h), baseline = cv2.getTextSize(caption, font, label_size, 1)

    # Filled strip behind the caption, sitting 10 px above the box top.
    # The offsets are applied before int() so truncation matches the float maths.
    strip_top_left = (left, int(corners[1] - caption_h - 10))
    strip_bottom_right = (left + caption_w, int(corners[1] + baseline - 10))
    cv2.rectangle(canvas, strip_top_left, strip_bottom_right, fill, cv2.FILLED)

    # Caption itself.
    caption_origin = (left, int(corners[1] - 10))
    cv2.putText(canvas, caption, caption_origin, font, label_size,
                caption_color, 2, cv2.LINE_AA)

  return canvas

def draw_fps(avg_fps, combined_img):
  """Stamp the average FPS onto the top-left corner of ``combined_img``.

  A white filled panel is drawn first, then the green "FPS: <value>" text on
  top of it. The image is modified in place and also returned.

  Args:
    avg_fps: Average frames per second to display.
    combined_img: Image to draw on.

  Returns:
    The same ``combined_img`` object, now annotated.
  """
  # Round to two decimals, then go back through float so that trailing zeros
  # are dropped when the value is turned into text (12.30 -> "12.3").
  fps_value = float(f"{avg_fps:.2f}")
  caption = "FPS: " + str(fps_value)

  panel_color = (255, 255, 255)
  caption_color = (0, 255, 0)

  cv2.rectangle(combined_img, (10, 2), (660, 110), panel_color, -1)
  cv2.putText(combined_img, caption, (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 3.5,
              caption_color, thickness=6)

  return combined_img


def detection(source, model, folder_name, half=False, label_size=1):
  """Run a YOLOv8 model over a video and save an annotated copy with an FPS overlay.

  Every frame is run through the detector, annotated with ``draw_box`` and
  ``draw_fps`` and buffered in memory. When the input is exhausted, the
  buffered frames are written with the XVID codec to
  ``result/<folder_name>/<file name of source>``, using the measured average
  processing rate as the playback rate of the output.

  Args:
    source: Path of the input video, passed to ``cv2.VideoCapture``.
    model: Path of the model, passed to ``YOLO``.
    folder_name: Sub-directory of ``result`` that receives the output video.
    half: Forwarded to ``predict`` as its ``half`` argument.
    label_size: Font scale of the box captions.

  Returns:
    None. Progress and the final save path are printed.
  """
  # Initialize video
  cap = cv2.VideoCapture(source)

  # Initialize YOLOv8 model
  yolov8_detector = YOLO(model)

  # Class names and one random colour per class
  label_map = yolov8_detector.names
  COLORS = [[random.randint(0, 255) for _ in range(3)] for _ in label_map]

  # Running FPS statistics
  frame_count = 0
  total_fps = 0
  avg_fps = 0

  video_frames = []

  try:
    # Properties of the input video
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    while cap.isOpened():
      # Press key q to stop
      if cv2.waitKey(1) == ord('q'):
        break

      try:
        # Read frame from the video
        ret, frame = cap.read()
        if not ret:
          break
      except Exception as e:
        print(e)
        continue

      # Time detection plus box drawing. perf_counter is a monotonic,
      # high-resolution clock, so very fast frames do not measure as zero.
      start = time.perf_counter()
      results = yolov8_detector.predict(frame, half=half, conf=0.5, verbose=False, device="0")
      result = results[0].cpu()

      # Draw detection results
      combined_img = draw_box(frame, result, label_map, COLORS, label_size)
      end = time.perf_counter()

      # Update the running average; the lower bound rules out a division by zero.
      frame_count += 1
      fps = 1 / max(end - start, 1e-9)
      total_fps = total_fps + fps
      avg_fps = total_fps / frame_count

      combined_img = draw_fps(avg_fps, combined_img)

      # Keep the frame until the final average FPS is known
      video_frames.append(combined_img)

      print("(%2d / %2d) Frames Processed" % (frame_count, total_frames))
  finally:
    # Always free the capture handle, even when inference fails midway.
    cap.release()

  if not video_frames:
    # Nothing was decoded (unreadable source or immediate stop): do not create
    # an empty output file and then report it as saved.
    print("No frames were processed; no video was written.")
    return

  print("\nCreate a Video:")

  # Get a file name
  file_name = get_name(source)
  # Get Save Path
  save_path = get_save_path(file_name, folder_name)
  # A writer needs a positive frame rate; int() alone yields 0 below 1 FPS.
  output_fps = max(int(avg_fps), 1)
  out = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'XVID'), output_fps, (frame_width, frame_height))

  try:
    for frame in video_frames:
      out.write(frame)
  finally:
    out.release()

  print("Video is saved in: "+save_path)



In [None]:
# Run the speed benchmark on a video (placeholder paths). The annotated video
# is written under result/path_output_directory/, and half=True is forwarded
# to the model's predict call.
detection("path_your_video.mp4", "path_your_model", "path_output_directory", half=True)

In [None]:
from google.colab import files

# Download the given path through the Colab file helper.
# NOTE(review): detection() saves its video under result/<folder_name>/<file name>,
# so this placeholder presumably has to point at that file - confirm.
files.download( "path_output_directory")

## Inference on Image


In [None]:
# Inference Using YOLOv8 Model
!yolo detect predict model= "path_your_model" source= "path_your_image" device=0

# Supervision


In [None]:
import json
from typing import Generator, List

import cv2
import numpy as np


def load_zones_config(file_path: str) -> List[np.ndarray]:
    """
    Read polygon zones from a JSON file.

    The file is parsed as JSON and every element of the top-level value is
    converted into an ``int32`` NumPy array.

    Args:
        file_path (str): Location of the JSON configuration file.

    Returns:
        List[np.ndarray]: One ``int32`` array per polygon, in file order.
    """
    with open(file_path, "r") as config_file:
        raw_polygons = json.load(config_file)

    zones: List[np.ndarray] = []
    for points in raw_polygons:
        zones.append(np.array(points, np.int32))
    return zones


def find_in_list(array: np.ndarray, search_list: List[int]) -> np.ndarray:
    """Determines if elements of a numpy array are present in a list.

    An empty ``search_list`` (or ``None``) means "no filter" and selects every
    element. A single scalar is treated as a one-element list, so ``0`` filters
    on the value ``0`` instead of being mistaken for an empty list, and NumPy
    arrays are accepted as well.

    Args:
        array (np.ndarray): The numpy array of integers to check.
        search_list (List[int]): The integers to search within. A scalar, a
            NumPy array or ``None`` is also accepted.

    Returns:
        np.ndarray: A numpy array of booleans, where each boolean indicates whether
        the corresponding element in `array` is found in `search_list`.
    """
    if search_list is None:
        return np.ones(array.shape, dtype=bool)

    # Normalise to a 1-D array first: a truthiness test would treat the scalar 0
    # as "empty" and would raise for multi-element NumPy arrays.
    candidates = np.atleast_1d(np.asarray(search_list))
    if candidates.size == 0:
        return np.ones(array.shape, dtype=bool)

    return np.isin(array, candidates)


def get_stream_frames_generator(rtsp_url: str) -> Generator[np.ndarray, None, None]:
    """
    Yield frames from a video stream until it ends or a read fails.

    The capture is released when the generator finishes, is closed early, or
    an error propagates out of it.

    Args:
        rtsp_url (str): URL of the RTSP video stream.

    Yields:
        np.ndarray: The next frame from the video stream.

    Raises:
        Exception: If the stream cannot be opened.
    """
    capture = cv2.VideoCapture(rtsp_url)
    if not capture.isOpened():
        raise Exception("Error: Could not open video stream.")

    try:
        grabbed, image = capture.read()
        while grabbed:
            yield image
            grabbed, image = capture.read()
        # Reached only when a read reports failure, never on early close.
        print("End of stream or error reading frame.")
    finally:
        capture.release()


In [None]:
from datetime import datetime
from typing import Dict

import numpy as np

import supervision as sv


class FPSBasedTimer:
    """
    A timer that calculates the duration each object has been detected based on frames
    per second (FPS).

    Elapsed time is derived from the number of ``tick`` calls, so ``tick`` is
    expected to be called exactly once per video frame.

    Attributes:
        fps (int): The frame rate of the video stream, used to calculate time durations.
        frame_id (int): The current frame number in the sequence.
        tracker_id2frame_id (Dict[int, int]): Maps each tracker's ID to the frame number
            at which it was first detected.
    """

    def __init__(self, fps: int = 30) -> None:
        """Initializes the FPSBasedTimer with the specified frames per second rate.

        Args:
            fps (int, optional): The frame rate of the video stream. Defaults to 30.
        """
        self.fps = fps
        self.frame_id = 0
        self.tracker_id2frame_id: Dict[int, int] = {}

    def tick(self, detections: sv.Detections) -> np.ndarray:
        """Processes the current frame, updating time durations for each tracker.

        Args:
            detections: The detections for the current frame, including tracker IDs.

        Returns:
            np.ndarray: Time durations (in seconds) for each detected tracker, since
                their first detection. Empty when the detections carry no tracker
                IDs (``tracker_id`` is ``None`` or empty).
        """
        # The frame counter advances on every call, with or without detections.
        self.frame_id += 1

        tracker_ids = detections.tracker_id
        if tracker_ids is None:
            # Untracked detections: nothing to time, and None is not iterable.
            return np.array([])

        times = []
        for tracker_id in tracker_ids:
            # setdefault records the first-seen frame once and returns it afterwards.
            start_frame_id = self.tracker_id2frame_id.setdefault(tracker_id, self.frame_id)
            times.append((self.frame_id - start_frame_id) / self.fps)

        return np.array(times)


class ClockBasedTimer:
    """
    A timer that calculates the duration each object has been detected based on the
    system clock.

    Attributes:
        tracker_id2start_time (Dict[int, datetime]): Maps each tracker's ID to the
            datetime when it was first detected.
    """

    def __init__(self) -> None:
        """Initializes the ClockBasedTimer."""
        self.tracker_id2start_time: Dict[int, datetime] = {}

    def tick(self, detections: sv.Detections) -> np.ndarray:
        """Processes the current frame, updating time durations for each tracker.

        Args:
            detections: The detections for the current frame, including tracker IDs.

        Returns:
            np.ndarray: Time durations (in seconds) for each detected tracker, since
                their first detection. Empty when the detections carry no tracker
                IDs (``tracker_id`` is ``None`` or empty).
        """
        tracker_ids = detections.tracker_id
        if tracker_ids is None:
            # Untracked detections: nothing to time, and None is not iterable.
            return np.array([])

        # One timestamp per call so every tracker in the frame shares the same "now".
        current_time = datetime.now()

        times = []
        for tracker_id in tracker_ids:
            # setdefault records the first-seen time once and returns it afterwards.
            start_time = self.tracker_id2start_time.setdefault(tracker_id, current_time)
            times.append((current_time - start_time).total_seconds())

        return np.array(times)


In [None]:
import argparse
from typing import List
import cv2
import numpy as np
from ultralytics import YOLO

import supervision as sv

# Annotators share one palette so a zone and the objects inside it use the same colour.
COLORS = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
COLOR_ANNOTATOR = sv.ColorAnnotator(color=COLORS)
LABEL_ANNOTATOR = sv.LabelAnnotator(
    color=COLORS, text_color=sv.Color.from_hex("#000000")
)

# --- Configuration (placeholder paths) ---
source_video_path = "path_your_video"
zone_configuration_path = "/content/data/config.json"
# Passed to cv2.VideoWriter as the output file name, so this has to be a video
# file path rather than a directory.
output_video_path = "path_output_directory"
weights = "path_your_model"
device = "cuda"
confidence = 0.3
iou = 0.7
# Class ids to keep. This must be a list: find_in_list treats a falsy value as
# "no filter", so a bare 0 would silently keep every class.
classes: List[int] = [0]

model = YOLO(weights,task = "detect")
tracker = sv.ByteTrack(minimum_matching_threshold=0.5)
video_info = sv.VideoInfo.from_video_path(video_path=source_video_path)
frames_generator = sv.get_video_frames_generator(source_video_path)

# One polygon zone, and one timer, per entry of the zone configuration.
polygons = load_zones_config(file_path=zone_configuration_path)
zones = [
    sv.PolygonZone(
        polygon=polygon,
        triggering_anchors=(sv.Position.CENTER,),
    )
    for polygon in polygons
]
timers = [FPSBasedTimer(video_info.fps) for _ in zones]

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, video_info.fps, (video_info.width, video_info.height))

try:
    for frame in frames_generator:
        # Detect, keep the requested classes, suppress overlaps, then track.
        results = model(frame, verbose=False, device=device, conf=confidence)[0]
        detections = sv.Detections.from_ultralytics(results)
        detections = detections[find_in_list(detections.class_id, classes)]
        detections = detections.with_nms(threshold=iou)
        detections = tracker.update_with_detections(detections)

        annotated_frame = frame.copy()

        for idx, zone in enumerate(zones):
            annotated_frame = sv.draw_polygon(
                scene=annotated_frame, polygon=zone.polygon, color=COLORS.by_idx(idx)
            )

            detections_in_zone = detections[zone.trigger(detections)]
            time_in_zone = timers[idx].tick(detections_in_zone)
            # Colour every detection by the index of the zone it is in.
            custom_color_lookup = np.full(detections_in_zone.class_id.shape, idx)

            annotated_frame = COLOR_ANNOTATOR.annotate(
                scene=annotated_frame,
                detections=detections_in_zone,
                custom_color_lookup=custom_color_lookup,
            )
            # Label format: "#<tracker id> MM:SS" spent inside the zone.
            labels = [
                f"#{tracker_id} {int(seconds // 60):02d}:{int(seconds % 60):02d}"
                for tracker_id, seconds in zip(detections_in_zone.tracker_id, time_in_zone)
            ]
            annotated_frame = LABEL_ANNOTATOR.annotate(
                scene=annotated_frame,
                detections=detections_in_zone,
                labels=labels,
                custom_color_lookup=custom_color_lookup,
            )

        out.write(annotated_frame)
finally:
    # Finalise the output file even if a frame fails midway.
    out.release()

cv2.destroyAllWindows()


In [None]:
from google.colab import files

# Download the given path through the Colab file helper.
# NOTE(review): the previous cell writes its result to `output_video_path`,
# while this placeholder matches the source video path - confirm which file is meant.
files.download("path_your_video")