## Preparando l'ambiente

#### Ultralytics 

In [None]:
!pip install ultralytics==8.3.19 --quiet

from IPython.display import clear_output
clear_output()

!yolo settings sync=False

#### Supervision

In [None]:
!pip install "supervision[assets]>=1.0.0,<2.0.0" --quiet

#### Motmetrics

In [None]:
!pip install motmetrics --quiet

#### import librerie

In [None]:
import os
import shutil
import re

import numpy as np
import cv2
import matplotlib.pyplot as plt
from ultralytics import YOLO
import supervision as sv
import motmetrics as mm

from tqdm.notebook import tqdm
from IPython.display import display, Image

import ultralytics
ultralytics.checks()
print(sv.__version__)

#### Verifica GPU

In [1]:
!nvidia-smi

"nvidia-smi" non � riconosciuto come comando interno o esterno,
 un programma eseguibile o un file batch.


#### Connessione a drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

#### Definizione percorsi

In [None]:
base_dir = '/content/drive/MyDrive/Tracking_project'
data_yaml_path = os.path.join(base_dir, 'dataset', 'data.yaml')

load_previous_model = True
if load_previous_model:
  results_dir = os.path.join(base_dir, 'runs', 'detect', 'first_train18')
  print(f"Caricamento del modello da {results_dir}")
else:
  results_dir = None
  print("Un nuovo training verrà eseguito")

## Semi-supervised learning

#### Pseudo labelling

In [None]:
%cd {base_dir}

import os
import cv2
from ultralytics import YOLO
import supervision as sv
from tqdm.notebook import tqdm

model_path = os.path.join(results_dir, 'weights', 'best.pt')
source = os.path.join(base_dir, "video_tagliato_finale.mp4")

pseudo_labels_dir = os.path.join(base_dir, "dataset", "labels", "train_pseudo")
os.makedirs(pseudo_labels_dir, exist_ok=True)

confidence_threshold = 0.3

model = YOLO(model_path)
video_info = sv.VideoInfo.from_video_path(source)
frame_generator = sv.get_video_frames_generator(source_path=source)

print("Inizio generazione pseudo-labels...")

for frame_index, frame in enumerate(tqdm(frame_generator, total=video_info.total_frames)):
    results = model(frame, verbose=False)[0]

    yolo_format_labels = []
    for box in results.boxes:
        if box.conf[0] >= confidence_threshold:
            class_id = int(box.cls[0])
            xywhn = box.xywhn[0]
            x_center, y_center, width, height = xywhn

            yolo_format_labels.append(f"{class_id} {x_center} {y_center} {width} {height}")

    if yolo_format_labels:
        label_path = os.path.join(pseudo_labels_dir, f"frame_{frame_index:06d}.txt")
        with open(label_path, 'w') as f:
            f.write("\n".join(yolo_format_labels))

print(f"Pseudo-labels salvate in: {pseudo_labels_dir}")

#### Rinomina file pseudo-labels

In [None]:
%cd {base_dir}

import os
import re

pseudo_labels_path = os.path.join(base_dir, "dataset", "labels", "pseudo")
START_INDEX = 270

print(f"Inizio la rinomina dei file in: {pseudo_labels_path}")
print(f"Il nuovo indice di partenza sarà: {START_INDEX}")

# Ottieni e ordina i file .txt validi
file_list = sorted(
    [f for f in os.listdir(pseudo_labels_path) if re.match(r'^frame_\d{6}\.txt$', f)]
)

# Prima passata: rinomina temporaneamente per evitare conflitti
for i, filename in enumerate(file_list):
    old_path = os.path.join(pseudo_labels_path, filename)
    temp_path = os.path.join(pseudo_labels_path, f"temp_{i:06d}.txt")
    os.rename(old_path, temp_path)

# Seconda passata: rinomina mantenendo distanza originale + offset
temp_files = sorted(
    [f for f in os.listdir(pseudo_labels_path) if f.startswith("temp_")]
)

for temp_filename in temp_files:
    # Estrai l'indice originale dal nome temporaneo associato al file
    i = int(re.search(r'temp_(\d+)\.txt', temp_filename).group(1))
    original_filename = file_list[i]

    try:
        original_index = int(re.search(r'_(\d{6})\.txt', original_filename).group(1))
    except AttributeError:
        print(f"Errore nel leggere l'indice da '{original_filename}', salto.")
        continue

    new_index = START_INDEX + original_index
    new_filename = f"frame_{new_index:06d}.txt"

    old_path = os.path.join(pseudo_labels_path, temp_filename)
    new_path = os.path.join(pseudo_labels_path, new_filename)
    os.rename(old_path, new_path)

    print(f"Rinominato: '{original_filename}' -> '{new_filename}'")

print("\nRinomina completata!")

#### Stampa di un frame con pseudo-labels

In [None]:
%cd {base_dir}

import os
import cv2
import supervision as sv
import numpy as np
import matplotlib.pyplot as plt

source = os.path.join(base_dir, "video_tagliato_finale.mp4")
pseudo_labels_path = os.path.join(base_dir, "dataset", "labels", "pseudo")

START_INDEX = 270
FRAME_TO_INSPECT = 274

cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_POS_FRAMES, FRAME_TO_INSPECT-START_INDEX)
ret, frame = cap.read()
cap.release()

h, w, _ = frame.shape

label_file_name = f"frame_{FRAME_TO_INSPECT:06d}.txt"
label_path = os.path.join(pseudo_labels_path, label_file_name)

if os.path.exists(label_path) and os.path.getsize(label_path) > 0:
    yolo_data = np.loadtxt(label_path, ndmin=2)

    class_ids = yolo_data[:, 0].astype(int)
    boxes_yolo = yolo_data[:, 1:5]

    boxes_xyxy = np.zeros_like(boxes_yolo)
    boxes_xyxy[:, 0] = (boxes_yolo[:, 0] - boxes_yolo[:, 2] / 2) * w
    boxes_xyxy[:, 1] = (boxes_yolo[:, 1] - boxes_yolo[:, 3] / 2) * h
    boxes_xyxy[:, 2] = (boxes_yolo[:, 0] + boxes_yolo[:, 2] / 2) * w
    boxes_xyxy[:, 3] = (boxes_yolo[:, 1] + boxes_yolo[:, 3] / 2) * h

    detections = sv.Detections(xyxy=boxes_xyxy, class_id=class_ids)

    box_annotator = sv.BoxAnnotator(thickness=2)
    label_annotator = sv.LabelAnnotator(text_scale=0.5)
    labels = [model.model.names[class_id] for class_id in detections.class_id]
    annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
    annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

    sv.plot_image(image=annotated_frame, size=(10, 10))
else:
    print(f"Nessun file di etichetta trovato per il frame {FRAME_TO_INSPECT}.")
    sv.plot_image(image=frame, size=(10, 10))

## Yolov8 Training

In [None]:
%cd {base_dir}

from ultralytics import YOLO

if not load_previous_model:

    print("Esecuzione di un nuovo training in corso...")

    model = YOLO('yolov8m.pt')
    results = model.train(
    task='detect',
    data=data_yaml_path,
    epochs=100,
    imgsz=960,
    plots=True,
    patience=10,
    name='first_train'
    )

    results_dir = results.save_dir
    print(f"I risultati sono stati salvati in: {results_dir}")
else:
    results_dir = os.path.join(base_dir, 'runs', 'detect', 'first_train18')
    print(f"Caricamento del modello da {results_dir}")



#### Matrice di confusione

In [None]:
display(Image(filename=f'{results_dir}/confusion_matrix.png', width=600))

#### Grafici metriche d'apprendimento

In [None]:
display(Image(filename=f'{results_dir}/results.png', width=600))

#### Batch di rilevazioni

In [None]:
display(Image(filename=f'{results_dir}/val_batch0_pred.jpg'))

#### Detection test

In [None]:
%cd {base_dir}

model_path = os.path.join(results_dir, 'weights', 'best.pt')

source_test_image = os.path.join(base_dir, 'dataset', 'images', 'test')

model = YOLO(model_path)

prediction_results = model.predict(source=source_test_image, conf=0.25, save=True)

### Stampa rilevazioni su frame

In [None]:
display(Image(filename=f'{base_dir}/result_image_detection.png', width=600))

## Tracking

In [None]:
%cd {base_dir}

import os
from ultralytics import YOLO
import numpy as np
import supervision as sv
import tqdm.notebook as tqdm

model_path = os.path.join(results_dir, 'weights', 'best.pt')
INPUT_VIDEO_PATH = os.path.join(base_dir, "output120.mp4")
OUTPUT_VIDEO_PATH = os.path.join(base_dir, "risultato_tracking120_3.mp4")
results_tracking_path = os.path.join(base_dir, "tracking_12.txt")

model = YOLO(model_path)
video_info = sv.VideoInfo.from_video_path(INPUT_VIDEO_PATH)

byte_tracker = sv.ByteTrack(
    track_activation_threshold=0.25,
    lost_track_buffer=video_info.fps * 2,
    frame_rate=video_info.fps
)

box_annotator = sv.BoxAnnotator(thickness=2)
trace_annotator = sv.TraceAnnotator(thickness=2, trace_length=video_info.fps)

label_annotator = sv.LabelAnnotator(
    text_position=sv.Position.TOP_CENTER,
    text_scale=0.5,
    text_color=sv.Color.WHITE,
    text_thickness=1)

mot_results = []

def process_frame(frame: np.ndarray, index: int) -> np.ndarray:

    results = model(frame, imgsz=960, verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    detections = byte_tracker.update_with_detections(detections)

    annotated_frame = frame.copy()
    labels = []

    for box, confidence, class_id, tracker_id, in zip(
        detections.xyxy,
        detections.confidence,
        detections.class_id,
        detections.tracker_id
    ):
      if tracker_id is not None:
        x_min, y_min, x_max, y_max = box
        width = x_max - x_min
        height = y_max - y_min
        mot_results.append(f"{index + 1}, {tracker_id}, {x_min}, {y_min}, {width}, {height}, {confidence}, -1, -1, -1\n")

        labels.append(f"ID: {tracker_id}")

    annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
    annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)
    annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

    return annotated_frame

sv.process_video(
    source_path=INPUT_VIDEO_PATH,
    target_path=OUTPUT_VIDEO_PATH,
    callback=process_frame
)

with open(results_tracking_path, 'w') as f:
    f.writelines(mot_results)

print(f"Il video risultato è stato salvato in: {OUTPUT_VIDEO_PATH}")
print(f"I risultati per la valutazione sono stati salvati in: {results_tracking_path}")

#### Valutazione Tracking

In [None]:
import motmetrics as mm
import numpy as np
import os

truth_path = os.path.join(base_dir, "dataset", "test_set.txt")
tracking_path = os.path.join(base_dir, "risultato_tracking.txt")

truth_loader = mm.io.loadtxt(truth_path, fmt="mot15-2D")
tracking_loader = mm.io.loadtxt(tracking_path, fmt="mot15-2D")

acc = mm.MOTAccumulator(auto_id=True)

for frame_id in truth_loader.index.get_level_values('FrameId').unique():
  if frame_id not in tracking_loader.index.get_level_values('FrameId'):
        continue

  truth_frame = truth_loader.loc[frame_id]
  tracking_frame = tracking_loader.loc[frame_id]

  dist = mm.distances.iou_matrix(truth_frame[['X', 'Y', 'Width', 'Height']].values, tracking_frame[['X', 'Y', 'Width', 'Height']].values, max_iou=0.5)
  acc.update(truth_frame.index.get_level_values('Id').tolist(), tracking_frame.index.get_level_values('Id').tolist(), dist, frameid = frame_id)

m_handler = mm.metrics.create()

met = m_handler.compute(acc, metrics = mm.metrics.motchallenge_metrics, name='acc')
print(met)