In [None]:
# Install the Ultralytics package (provides the YOLO class imported by later cells)
!pip install ultralytics


from google.colab import drive

# Mount Google Drive at /content/drive
# NOTE(review): the mount call below is commented out, yet later cells read and write
# under drive/MyDrive/... — confirm Drive is mounted some other way before running them.
#drive.mount('/content/drive')

Collecting ultralytics
  Downloading ultralytics-8.3.59-py3-none-any.whl.metadata (35 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.13-py3-none-any.whl.metadata (9.4 kB)
Downloading ultralytics-8.3.59-py3-none-any.whl (906 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m906.8/906.8 kB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.13-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.59 ultralytics-thop-2.0.13


### GENERAZIONE DATASET

In [None]:
import cv2
import os
import xml.etree.ElementTree as ET

# Training videos; the list position (0-3) is the prefix used in every frame file name.
video_path = [f'/content/ID-{video_id}.avi' for video_id in range(1, 5)]
images_dir = 'drive/MyDrive/dataset/images/'  # Directory where extracted frames are saved
os.makedirs(images_dir, exist_ok=True)

# Dump every frame of every video as "<video index>_<frame number, 5 digits>.jpg".
for i, video_file in enumerate(video_path):
    cap = cv2.VideoCapture(video_file)
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # End of the video (or the read failed)
        cv2.imwrite(os.path.join(images_dir, f'{i}_{frame_count:05d}.jpg'), frame)
        frame_count += 1

    cap.release()
# END OF FRAME EXTRACTION


# Annotation files, index-aligned with the video list used for frame extraction.
xml_path = [f'/content/ID-{video_id}.xml' for video_id in range(1, 5)]
labels_dir = 'drive/MyDrive/dataset/labels/'  # Directory that receives the YOLO label files
os.makedirs(labels_dir, exist_ok=True)

# Fixed bounding-box size (in pixels) drawn around every annotated point.
box_width = box_height = 25



def normalize_coordinates(x, y, img_width, img_height, box_width, box_height):
    """Scale a pixel-space point and box size into fractions of the image size.

    Args:
        x, y: point coordinates in pixels (used as the box centre).
        img_width, img_height: image dimensions in pixels.
        box_width, box_height: box dimensions in pixels.

    Returns:
        Tuple ``(x_center, y_center, width_norm, height_norm)`` where horizontal
        quantities are divided by ``img_width`` and vertical ones by ``img_height``.
    """
    return (
        x / img_width,
        y / img_height,
        box_width / img_width,
        box_height / img_height,
    )

# Convert the point annotations of each XML file into one YOLO label file per frame.
for i, annotation_file in enumerate(xml_path):

    # Parse the XML annotation file
    root = ET.parse(annotation_file).getroot()

    # Original image size, used to normalise the coordinates
    original_width = int(root.find(".//original_size/width").text)
    original_height = int(root.find(".//original_size/height").text)

    for track in root.findall('track'):
        for points in track.findall('points'):
            frame_id = points.get('frame')  # Frame number as a string
            outside = int(points.get('outside'))
            attributes = {attr.attrib["name"]: attr.text for attr in points.findall("attribute")}
            used_in_game = int(attributes.get("used_in_game", 1))  # Defaults to 1 when absent

            # Only keep points that are inside the frame and used in the game
            if outside != 1 and used_in_game != 0:
                raw_coords = points.get('points').split(',')
                x, y = float(raw_coords[0]), float(raw_coords[1])

                # YOLO expects centre and size as fractions of the image size
                x_center, y_center, w, h = normalize_coordinates(
                    x, y, original_width, original_height, box_width, box_height
                )

                # One label file per frame; opening in 'w' mode replaces any earlier
                # label written for the same video index and frame number.
                yolo_filename = os.path.join(labels_dir, f'{i}_{int(frame_id):05d}.txt')
                with open(yolo_filename, 'w') as f:
                    f.write(f"0 {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}\n")  # Class ID = 0




# REMOVAL OF UNANNOTATED IMAGES

# Base names (without extension) of every .txt label file
label_files = {
    os.path.splitext(label_name)[0]
    for label_name in os.listdir(labels_dir)
    if label_name.endswith('.txt')
}

# Delete every image file whose base name has no matching label file
for image_file in os.listdir(images_dir):
    image_base, ext = os.path.splitext(image_file)
    if image_base in label_files:
        continue
    image_path = os.path.join(images_dir, image_file)
    if not os.path.isfile(image_path):  # Skip sub-directories and other non-files
        continue
    os.remove(image_path)

### TRAINING MODELLO

In [None]:
from ultralytics import YOLO
!mv /content/training_yolo.yaml /content/drive/MyDrive/dataset/
import torch
torch.cuda.empty_cache()


# Load the pretrained model (e.g. yolov8s, yolov8m, etc.)
model = YOLO('yolov8m.pt')  # Other variants can be used as well (yolov8n.pt, yolov8m.pt, etc.)

model.train(
    data='/content/drive/MyDrive/dataset/training_yolo.yaml',  # Path to the dataset YAML file
    epochs=50,                    # Number of epochs
    batch=16,                     # Batch size
    imgsz=800,
    patience=5,
    project='/content/drive/MyDrive/yolov8_project',  # Output directory for the results
    name='sports_ball_model',     # Name of the training session
    save=True,                    # Save the model when training finishes
)

### FUNZIONI PER POST-PROCESSING CON TECNICA DI EMBEDDING

In [None]:
import torch
import torchvision.transforms as transforms
from torchvision.models import efficientnet_b0
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity
import math
import os

# Embedding model: EfficientNet-B0 with ImageNet weights and its last child module
# (the classification head) removed, so a forward pass yields pooled features.
# `weights=` replaces the deprecated `pretrained=True`; "IMAGENET1K_V1" is the same
# checkpoint that `pretrained=True` downloads.
model_emb = efficientnet_b0(weights="IMAGENET1K_V1")
model_emb = torch.nn.Sequential(*list(model_emb.children())[:-1])
model_emb.eval()  # Inference mode: dropout is off and batch-norm uses its running statistics


# Image preprocessing applied before the embedding model
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),  # Every crop is resized to 224x224 regardless of aspect ratio
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def extract_embedding(image_path):
    """Return the embedding of the image stored at ``image_path``.

    The image is loaded as RGB, run through ``preprocess`` and then through
    ``model_emb`` with gradients disabled.

    Args:
        image_path: path of the image file to embed.

    Returns:
        The model output flattened from dimension 1 onwards, i.e. one row per
        image (the original comment states [1, 1280, 1, 1] -> [1, 1280]).
    """
    # The context manager closes the file handle as soon as the pixels are converted.
    with Image.open(image_path) as source:
        image = source.convert('RGB')
    input_tensor = preprocess(image).unsqueeze(0)  # Add the batch dimension

    with torch.no_grad():  # No gradients are needed for inference
        features = model_emb(input_tensor)
    return features.flatten(start_dim=1)


def extract_and_save_bbox(image_path, output_directory, indice, x, y, w, h):
    """Crop a ``w`` x ``h`` box from an image and save it as ``bounding_box<indice>.jpg``.

    Args:
        image_path: path of the source image.
        output_directory: directory that receives the cropped JPEG.
        indice: suffix used in the output file name.
        x, y: left and top edge of the crop, in pixels.
        w, h: width and height of the crop, in pixels.

    NOTE(review): ``(x, y)`` is used as the top-left corner of the crop, while
    ``prediciCoordinateSuccessive`` is called with bounding-box centre coordinates;
    confirm that the resulting offset is intended.
    """
    # The context manager closes the file handle once the pixels are in memory.
    with Image.open(image_path) as source:
        image = source.convert("RGB")

    # PIL crop box is (left, top, right, bottom)
    cropped_image = image.crop((x, y, x + w, y + h))

    output_path = os.path.join(output_directory, f'bounding_box{indice}.jpg')
    cropped_image.save(output_path)



def generate_coordinates(x, y, num_points=8):
    """Generate candidate positions on three concentric ellipses around ``(x, y)``.

    For each of ``num_points`` equally spaced angles, three points are emitted,
    from the innermost ellipse to the outermost. Coordinates are truncated with ``int``.

    Args:
        x, y: centre of the ellipses.
        num_points: number of angles sampled over 360 degrees.

    Returns:
        List of ``3 * num_points`` ``(x, y)`` integer tuples, grouped by angle.
    """
    # (horizontal, vertical) semi-axes of the three ellipses, innermost first
    ring_radii = ((15, 10), (30, 20), (45, 25))
    angle_step = 360 / (num_points)  # Angular step for an even spread

    coordinates = []
    for i in range(num_points):
        angle_rad = math.radians(angle_step * i)
        cos_a, sin_a = math.cos(angle_rad), math.sin(angle_rad)
        coordinates.extend(
            (int(x + rx * cos_a), int(y + ry * sin_a)) for rx, ry in ring_radii
        )
    return coordinates

def prediciCoordinateSuccessive(previous_image, current_image, x, y, w, h):
    """Estimate the next position by comparing image-crop embeddings.

    A reference crop is taken from ``previous_image`` at ``(x, y)``. Candidate crops
    are taken from ``current_image`` at each position returned by
    ``generate_coordinates(x, y)``. The candidate whose embedding has the highest
    cosine similarity to the reference is returned.

    Crops are written to the module-level ``tmp_directory`` and read back from there.

    Args:
        previous_image: path of the previous frame image.
        current_image: path of the current frame image.
        x, y: last known position.
        w, h: crop size in pixels.

    Returns:
        ``(x_ris, y_ris)``, the coordinates of the best-matching candidate.
        The names are only bound when some candidate's similarity exceeds -1.
    """
    # Embedding of the reference crop from the previous frame
    extract_and_save_bbox(previous_image, tmp_directory, 0, x, y, w, h)
    embedding_0 = extract_embedding(os.path.join(tmp_directory, "bounding_box0.jpg"))

    maxSimilarity = -1
    # Keep the candidate crop that maximises the cosine similarity
    for indice, (cand_x, cand_y) in enumerate(generate_coordinates(x, y), start=1):
        extract_and_save_bbox(current_image, tmp_directory, indice, cand_x, cand_y, w, h)
        embedding_1 = extract_embedding(os.path.join(tmp_directory, f'bounding_box{indice}.jpg'))
        similarity = cosine_similarity(embedding_0, embedding_1)
        if similarity > maxSimilarity:
            maxSimilarity, x_ris, y_ris = similarity, cand_x, cand_y
    return x_ris, y_ris





Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth
100%|██████████| 20.5M/20.5M [00:00<00:00, 141MB/s]


### PREDIZIONE VIDEO DI TEST

In [None]:
from ultralytics import YOLO

import cv2
import json
import numpy as np
import math

def calculate_distance(center_x, center_y, last_predictions):
    """Distance between a position and the mean of the previous positions.

    The vertical offset is multiplied by 3 before the Euclidean norm is taken,
    so vertical deviations weigh more than horizontal ones.

    Args:
        center_x, center_y: position to evaluate.
        last_predictions: non-empty sequence of ``(x, y)`` pairs.

    Returns:
        ``sqrt(dx**2 + (3*dy)**2)`` where ``dx, dy`` are the offsets from the mean.

    Raises:
        ZeroDivisionError: if ``last_predictions`` is empty.
    """
    total_x = total_y = 0
    for prev_x, prev_y in last_predictions:
        total_x, total_y = total_x + prev_x, total_y + prev_y

    count = len(last_predictions)
    dx = center_x - total_x / count
    dy = center_y - total_y / count
    return math.sqrt(dx ** 2 + (3 * dy) ** 2)



# Load the YOLOv8 weights used for inference
# (presumably the best checkpoint of the training run — confirm where best.pt comes from).
model = YOLO('best.pt')
video_path = ["ID-5.avi", "ID-6.avi"]

# Scratch directory for the current/previous frame and the embedding crops.
tmp_directory= 'tmp/'
os.makedirs(tmp_directory, exist_ok=True)
# Both paths derive from tmp_directory so the frames are written to the same
# location they are later read from, whatever the working directory is.
current_frame_path = os.path.join(tmp_directory, "immagine_corrente.jpg")
previous_frame_path = os.path.join(tmp_directory, "immagine_precedente.jpg")

# One dict per video: zero-padded frame index -> {"x": ..., "y": ...}; (-1, -1) means "no ball".
raccoltaRisultato = []

for j in range(len(video_path)):

    cap = cv2.VideoCapture(video_path[j])

    if not cap.isOpened():
        print("Errore nell'apertura del video!")
        # NOTE(review): exit() inside a notebook terminates the session; consider raising instead.
        exit()

    risultato = {}
    i = 0
    x_curr = -1
    y_curr = -1
    max_it = 20  # Remaining frames for which the embedding fallback may replace a missing detection

    last_predictions = []  # Up to 10 most recent valid positions

    # Read the video frame by frame
    while True:
        ret, frame = cap.read()
        if not ret:  # No more frames
            break
        cv2.imwrite(current_frame_path, frame)

        key = f"{i:05}"
        i += 1
        results = model.predict(source=frame, conf=0.69, save=False, save_txt=False)

        palla_trovata = False

        # Among the class-0 detections keep the one closest to the recent history;
        # detections at distance >= 2000 never replace the current position.
        min_distance = 2000
        for result in results[0].boxes:
            class_index = result[0].cls.item()  # Class index of the detection
            if class_index == 0:  # Class 0 is the ball
                max_it = 20
                palla_trovata = True
                x1, y1, x2, y2 = result[0].xyxy[0]  # Bounding-box corners
                center_x = float((x1 + x2) / 2)
                center_y = float((y1 + y2) / 2)

                if last_predictions != []:
                    distance = calculate_distance(center_x, center_y, last_predictions)
                    if distance < min_distance:
                        min_distance = distance
                        x_curr, y_curr = center_x, center_y
                else:
                    x_curr, y_curr = center_x, center_y

        if not palla_trovata and x_curr != -1 and max_it > 0:
            # No detection: fall back to embedding matching around the last known position
            x_p, y_p = prediciCoordinateSuccessive(
                previous_frame_path, current_frame_path, x_curr, y_curr, 40, 40
            )
            if x_p > 0 and x_p <= 1920 and y_p > 0 and y_p <= 1080:
                max_it -= 1
                x_curr, y_curr = x_p, y_p
            else:
                x_curr, y_curr = -1, -1
        elif not palla_trovata:
            x_curr, y_curr = -1, -1

        risultato[key] = {"x": round(x_curr, 6), "y": round(y_curr, 6)}

        if x_curr == -1:
            # No position for this frame: age the history, and never store the
            # (-1, -1) sentinel, which would skew the mean used by calculate_distance.
            if last_predictions:
                last_predictions.pop(0)
        else:
            last_predictions.append((x_curr, y_curr))
            if len(last_predictions) > 10:
                last_predictions.pop(0)
        cv2.imwrite(previous_frame_path, frame)

        # Stop when 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Store a copy of this video's results
    raccoltaRisultato.append(dict(risultato))

    cap.release()
    cv2.destroyAllWindows()





[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m
Speed: 4.4ms preprocess, 21.2ms inference, 0.7ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 21.2ms
Speed: 5.2ms preprocess, 21.2ms inference, 0.6ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 21.2ms
Speed: 6.6ms preprocess, 21.2ms inference, 0.7ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 21.2ms
Speed: 5.0ms preprocess, 21.2ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 21.3ms
Speed: 8.2ms preprocess, 21.3ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 21.2ms
Speed: 4.5ms preprocess, 21.2ms inference, 0.6ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 21.2ms
Speed: 5.1ms preprocess, 21.2ms inference, 0.6ms postprocess per image at shape (1, 3, 480, 800)

0: 480x800 (no detections), 26.5ms
Speed

In [None]:
import os
import re
import json
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET

# Default size for every matplotlib figure created afterwards
plt.rcParams["figure.figsize"] = (10, 8)


class TrackingEvaluator:
    """Compare predicted positions (JSON file) with ground-truth points (XML file).

    Typical use: ``load_data()``, then ``compute_frame_indices()``, then
    ``evaluate_metrics()`` and/or ``compute_mse()``.
    """

    def __init__(self, gt_ann_file, pred_file):
        """Store the two file paths; nothing is read until ``load_data`` is called."""
        self.gt_ann_file, self.pred_file = gt_ann_file, pred_file
        self.gt_data_points = {}   # frame number -> ground-truth record
        self.pred_points = {}      # frame number -> predicted {"x", "y"} entry
        self.frames_idx = None     # (first, last) frame covered by either source

    @staticmethod
    def extract_points_data(xml_content):
        """Parse XML text and return ``{frame: record}`` for every ``points`` element.

        Each record holds ``frame``, ``outside``, ``occluded``, ``keyframe``,
        ``z_order`` as ints and ``points`` as a tuple of floats. When a frame
        appears more than once an alert is printed and the later entry wins.
        """
        points_data = {}
        root = ET.fromstring(xml_content)

        for track in root.findall(".//track"):
            for point in track.findall("points"):
                record = {
                    'frame': int(point.get("frame")),
                    'outside': int(point.get("outside")),
                    'occluded': int(point.get("occluded")),
                    'keyframe': int(point.get("keyframe")),
                    'points': tuple(float(value) for value in point.get("points").split(",")),
                    'z_order': int(point.get("z_order")),
                }
                frame = record['frame']
                if frame in points_data:
                    print(f'Alert: multiple frame entries for ID {record["frame"]}')
                points_data[frame] = record

        return points_data

    @staticmethod
    def _convert_key(k):
        """Turn a prediction key into an int frame number, using the path stem of ``k``."""
        stem = Path(k).stem
        return int(stem)

    def load_data(self):
        """Read both files and populate ``gt_data_points`` and ``pred_points``.

        ``Infinity`` / ``-Infinity`` tokens in the prediction file are replaced by
        ``-1`` before JSON parsing, and only entries with ``x >= 0`` are kept.
        """
        # Ground truth
        self.gt_data_points = self.extract_points_data(Path(self.gt_ann_file).read_text())

        # Predictions
        raw_text = Path(self.pred_file).read_text()
        sanitized = raw_text.replace('-Infinity', '-1').replace('Infinity', '-1')
        kept = {}
        for name, entry in json.loads(sanitized).items():
            if entry['x'] >= 0:
                kept[self._convert_key(name)] = entry
        self.pred_points = kept

    def compute_frame_indices(self):
        """Print the frame span of each source and store their union in ``frames_idx``."""
        pred_frames = sorted(self.pred_points.keys())
        gt_frames = sorted(self.gt_data_points.keys())

        print(f'GT   frames: {gt_frames[0]} - {gt_frames[-1]}')
        print(f'PRED frames: {pred_frames[0]} - {pred_frames[-1]}')

        first = min(gt_frames[0], pred_frames[0])
        last = max(gt_frames[-1], pred_frames[-1])
        self.frames_idx = (first, last)
        print(f'Frame Index Range: {self.frames_idx}')

    @staticmethod
    def is_match(x1, y1, x2, y2, threshold=4):
        """Return whether the Euclidean distance between the two points is below ``threshold``."""
        delta = np.array((x1, y1)) - np.array((x2, y2))
        return np.sqrt(np.dot(delta.T, delta)) < threshold

    def evaluate_metrics(self):
        """Print match / no-match / missing counts over the ``frames_idx`` range.

        Ratios for the first three counters are relative to the number of
        ground-truth frames; the "no frame data" ratio is relative to the range length.
        """
        cnt_match = cnt_no_match = cnt_no_pred = cnt_no_frame = 0

        for frame in range(self.frames_idx[0], self.frames_idx[1] + 1):
            if frame not in self.gt_data_points:
                cnt_no_frame += 1
            elif frame not in self.pred_points:
                cnt_no_pred += 1
            else:
                gt_entry = self.gt_data_points[frame]
                pred_entry = self.pred_points[frame]
                if self.is_match(*gt_entry['points'], pred_entry['x'], pred_entry['y']):
                    cnt_match += 1
                else:
                    cnt_no_match += 1

        total_frames = len(self.gt_data_points)
        print(f'Total frames: {total_frames}')
        print(f'Total predictions: {len(self.pred_points)}')
        print(f'Matches: {cnt_match} ({cnt_match / total_frames:.3f})')
        print(f'No matches: {cnt_no_match} ({cnt_no_match / total_frames:.3f})')
        print(f'No predictions: {cnt_no_pred} ({cnt_no_pred / total_frames:.3f})')
        print(f'No frame data: {cnt_no_frame} ({cnt_no_frame / (self.frames_idx[1] - self.frames_idx[0] + 1):.3f})')

    def compute_tracking_sequence(self):
        """Build aligned, normalised ground-truth and prediction sequences.

        Only frames present in the ground truth are included, in increasing order.
        Coordinates are divided by 1920 (x) and 1080 (y); a frame without a
        prediction contributes ``(0, 0)`` to the prediction sequence.

        Returns:
            ``(gt_seq, pred_seq)``, two lists of ``(x, y)`` tuples of equal length.
        """
        norm_width, norm_height = 1920, 1080
        first = min(self.gt_data_points.keys())
        last = max(self.gt_data_points.keys())

        gt_seq, pred_seq = [], []
        for frame in range(first, last + 1):
            if frame not in self.gt_data_points:
                continue

            if frame in self.pred_points:
                pred_entry = self.pred_points[frame]
                pred_seq.append((pred_entry['x'] / norm_width, pred_entry['y'] / norm_height))
            else:
                pred_seq.append((0, 0))

            x, y = self.gt_data_points[frame]['points']
            gt_seq.append((x / norm_width, y / norm_height))

        return gt_seq, pred_seq

    def compute_mse(self):
        """Print and return the mean squared error between the two normalised sequences."""
        mse = mean_squared_error(*self.compute_tracking_sequence())
        print(f'Mean Squared Error: {mse}')
        return mse

In [None]:
# Post-processing: when a run of positions ends and the ball does not reappear within
# the look-ahead window, the last `past_erase` frames before the gap are reset to (-1, -1).
future_len = 5    # Look-ahead bound: frames i+1 .. i+future_len-1 are inspected
past_erase = 18   # Number of frames before the gap that get erased

for risultato in raccoltaRisultato:

    x_past = -1

    for i in range(len(risultato)):
        key = f"{i:05}"
        # Current frame is (-1, -1) and the previous frame had a position: a run just ended
        if risultato[key]["x"] == -1 and x_past != -1:
            # Inspect the FUTURE frames. The original tested the current frame (`key`)
            # here, which is always -1 in this branch, so the look-ahead never had any effect.
            erase = all(
                risultato[f"{m:05}"]["x"] == -1
                for m in range(i + 1, min(i + future_len, len(risultato)))
            )

            if erase:
                for n in range(max(0, i - past_erase), i):
                    risultato[f"{n:05}"] = {"x": round(-1, 6), "y": round(-1, 6)}

        x_past = risultato[key]["x"]


In [None]:
# Write the results of the first two videos to annotazione0.json and annotazione1.json
for video_idx in (0, 1):
    with open(f"annotazione{video_idx}.json", "w") as f:
        json.dump(raccoltaRisultato[video_idx], f, indent=4)

In [None]:
# Evaluate the predictions saved in annotazione0.json against the ID-5 annotations
gt_ann_file = 'ID-5.xml'
pred_file = 'annotazione0.json'

evaluator = TrackingEvaluator(gt_ann_file, pred_file)
evaluator.load_data()               # Must run first: fills the ground-truth and prediction dicts
evaluator.compute_frame_indices()   # Sets frames_idx, which evaluate_metrics reads
evaluator.evaluate_metrics()
mse5 = evaluator.compute_mse();     # Kept for the average computed in a later cell

GT   frames: 420 - 2580
PRED frames: 424 - 2986
Frame Index Range: (420, 2986)
Total frames: 346
Total predictions: 376
Matches: 243 (0.702)
No matches: 91 (0.263)
No predictions: 12 (0.035)
No frame data: 2221 (0.865)
Mean Squared Error: 0.01901327778898359


In [None]:
# Evaluate the predictions saved in annotazione1.json against the ID-6 annotations
gt_ann_file = 'ID-6.xml'
pred_file = 'annotazione1.json'

evaluator = TrackingEvaluator(gt_ann_file, pred_file)
evaluator.load_data()               # Must run first: fills the ground-truth and prediction dicts
evaluator.compute_frame_indices()   # Sets frames_idx, which evaluate_metrics reads
evaluator.evaluate_metrics()
mse6 = evaluator.compute_mse();     # Kept for the average computed in a later cell

GT   frames: 417 - 2495
PRED frames: 418 - 2495
Frame Index Range: (417, 2495)
Total frames: 263
Total predictions: 265
Matches: 227 (0.863)
No matches: 34 (0.129)
No predictions: 2 (0.008)
No frame data: 1816 (0.873)
Mean Squared Error: 0.00509335086108523


In [None]:
# Mean of the two per-video MSE values
print((mse5+mse6)/2)

0.01205331432503441
