In [None]:
# Show the NVIDIA GPU(s) and driver state visible to this runtime.
!nvidia-smi

In [None]:
# Install Detectron2 straight from its GitHub repository. No tag or commit is
# given in the URL, so the installed version depends on when this cell is run.
!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'

In [None]:
import os
import cv2
import json
import copy
import shutil
import logging
import numpy as np
from pathlib import Path
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns

from datetime import datetime
from google.colab.patches import cv2_imshow

# Detectron2
from detectron2.data.datasets import register_coco_instances
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.utils.visualizer import Visualizer
from detectron2.utils.visualizer import ColorMode
from detectron2 import model_zoo
from detectron2.config import get_cfg
from detectron2.engine import DefaultPredictor
from detectron2.evaluation import COCOEvaluator
from detectron2.engine import DefaultTrainer
from detectron2.engine.hooks import HookBase
from detectron2.evaluation import inference_context
from detectron2.utils.logger import log_every_n_seconds
from detectron2.data import DatasetMapper, build_detection_test_loader
import detectron2.utils.comm as comm

# Metrics
import torch
from torchvision.ops import box_iou
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

import time
import datetime

In [None]:
!pip install roboflow
from getpass import getpass

from roboflow import Roboflow

# Credentials must not live in the notebook source: read the key from the
# ROBOFLOW_API_KEY environment variable and, if it is not set, ask for it
# interactively (getpass does not echo the typed value).
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")

rf = Roboflow(api_key=ROBOFLOW_API_KEY)
project = rf.workspace("roboflow-jvuqo").project("football-ball-detection-rejhg")
version = project.version(1)
# Download version 1 in the "coco" export format; later cells read
# `dataset.location` as the root folder of the downloaded splits.
dataset = version.download("coco")


In [None]:
# Base name from which the registered Detectron2 dataset names are derived.
DATA_SET_NAME = "football-ball-detection"

# Folder that receives the ball-only COCO annotation files; created up front.
FILTERED_DIR = "./filtered_ball_only"
Path(FILTERED_DIR).mkdir(parents=True, exist_ok=True)

In [None]:
# For every split of the downloaded dataset, write a COCO file that keeps only
# the ball annotations (re-labelled as category 0) and the images they belong to.
for split in ('train', 'valid', 'test'):
    input_json = os.path.join(dataset.location, split, "_annotations.coco.json")
    output_json = os.path.join(FILTERED_DIR, f"{split}_ball_only.json")

    print(f"Processing split: {split}")

    if not os.path.exists(input_json):
        print(f"File not found: {input_json}")
        continue

    with open(input_json, 'r') as f:
        data = json.load(f)

    categories = data.get('categories', [])

    # A category counts as "ball" when its lower-cased name contains a keyword.
    ball_keywords = ['ball']
    ball_cat_ids = [
        category['id']
        for category in categories
        if any(keyword in category['name'].lower() for keyword in ball_keywords)
    ]

    if not ball_cat_ids:
        print(f"No 'ball' category found in {split}!")
        # With exactly one category in the file, fall back to that category.
        if len(categories) == 1:
            ball_cat_ids = [categories[0]['id']]
            print(f"Using the only available category: ID={ball_cat_ids[0]}")

    # Shallow-copy each kept annotation so the loaded data is not modified.
    new_annotations = [
        {**annotation, 'category_id': 0}
        for annotation in data.get('annotations', [])
        if annotation['category_id'] in ball_cat_ids
    ]

    # Keep only the images referenced by at least one kept annotation.
    images_with_ball = {annotation['image_id'] for annotation in new_annotations}
    new_images = [
        image for image in data.get('images', [])
        if image['id'] in images_with_ball
    ]

    filtered_data = {
        'info': data.get('info', {}),
        'licenses': data.get('licenses', []),
        'categories': [{'id': 0, 'name': 'ball', 'supercategory': 'none'}],
        'images': new_images,
        'annotations': new_annotations
    }

    os.makedirs(os.path.dirname(output_json), exist_ok=True)
    with open(output_json, 'w') as f:
        json.dump(filtered_data, f, indent=2)

    # The file is written either way; only the message differs.
    if new_annotations:
        print(f"Saved filtered data for {split} to: {output_json}")
    else:
        print(f"Warning: {split} has 0 balls after filtering!")

In [None]:
# Names under which the three filtered splits are registered with Detectron2.
TRAIN_DATA_SET_NAME = f"{DATA_SET_NAME}-ball-train"
VALID_DATA_SET_NAME = f"{DATA_SET_NAME}-ball-valid"
TEST_DATA_SET_NAME = f"{DATA_SET_NAME}-ball-test"

# Remove any earlier registration of these names before registering them again.
for ds_name in (TRAIN_DATA_SET_NAME, TEST_DATA_SET_NAME, VALID_DATA_SET_NAME):
    if ds_name in DatasetCatalog:
        DatasetCatalog.remove(ds_name)
    if ds_name in MetadataCatalog:
        MetadataCatalog.remove(ds_name)

# Filtered annotation file and image folder for each split.
train_json = os.path.join(FILTERED_DIR, "train_ball_only.json")
train_images = os.path.join(dataset.location, "train")
valid_json = os.path.join(FILTERED_DIR, "valid_ball_only.json")
valid_images = os.path.join(dataset.location, "valid")
test_json = os.path.join(FILTERED_DIR, "test_ball_only.json")
test_images = os.path.join(dataset.location, "test")

split_sources = (
    (TRAIN_DATA_SET_NAME, train_json, train_images),
    (VALID_DATA_SET_NAME, valid_json, valid_images),
    (TEST_DATA_SET_NAME, test_json, test_images),
)

# Register every split whose annotation file exists; report the missing ones.
for split_name, split_json, split_images in split_sources:
    if not os.path.exists(split_json):
        print(f"File not found: {split_json}")
        continue
    register_coco_instances(
        name=split_name,
        metadata={},
        json_file=split_json,
        image_root=split_images
    )
    print(f"Registered: {split_name}")

# The single class name is set on all three names, registered or not.
for split_name, _, _ in split_sources:
    MetadataCatalog.get(split_name).thing_classes = ["ball"]

print("\n--- Dataset Summary ---")

for name in [TRAIN_DATA_SET_NAME, VALID_DATA_SET_NAME, TEST_DATA_SET_NAME]:
    try:
        dataset_dicts = DatasetCatalog.get(name)
        metadata = MetadataCatalog.get(name)

        # Number of annotations attached to each image record.
        annotations_per_image = [len(record.get('annotations', [])) for record in dataset_dicts]
        ball_count = sum(annotations_per_image)
        images_with_balls = sum(1 for count in annotations_per_image if count > 0)

        print(f"\nDataset: {name}")
        print(f"  Classes: {metadata.thing_classes}")
        print(f"  Total images: {len(dataset_dicts)}")
        print(f"  Images with balls: {images_with_balls}")
        print(f"  Total ball annotations: {ball_count}")

        if ball_count == 0:
            print(f"  WARNING: NO BALLS FOUND IN THIS DATASET!")

    except Exception as e:
        # A split that failed to register or load is reported, not fatal.
        print(f"\nError for {name}: {e}")

In [None]:
# --- Experiment settings -----------------------------------------------------
ARCHITECTURE = "retinanet_R_50_FPN_3x"
CONFIG_FILE_PATH = f"COCO-Detection/{ARCHITECTURE}.yaml"
MAX_ITER = 1800
EVAL_PERIOD = 100
BASE_LR = 0.001
NUM_CLASSES = 1

# Every run of this cell gets its own timestamped output folder.
# `datetime` is the module here: the import cell's `import datetime` comes
# after (and therefore replaces) `from datetime import datetime`.
run_timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
OUTPUT_DIR_PATH = os.path.join("ball_only_model", ARCHITECTURE, run_timestamp)
os.makedirs(OUTPUT_DIR_PATH, exist_ok=True)

# --- Base config and starting weights from the model zoo ----------------------
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file(CONFIG_FILE_PATH))
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url(CONFIG_FILE_PATH)
cfg.OUTPUT_DIR = OUTPUT_DIR_PATH

# --- Data ----------------------------------------------------------------------
cfg.DATASETS.TRAIN = (TRAIN_DATA_SET_NAME,)
cfg.DATASETS.TEST = (VALID_DATA_SET_NAME,)
cfg.DATALOADER.NUM_WORKERS = 2

# --- Input resolution (same limits for training and testing) -------------------
cfg.INPUT.MIN_SIZE_TRAIN = (1080,)
cfg.INPUT.MAX_SIZE_TRAIN = 1920
cfg.INPUT.MIN_SIZE_TEST = 1080
cfg.INPUT.MAX_SIZE_TEST = 1920

# --- Optimisation --------------------------------------------------------------
cfg.SOLVER.IMS_PER_BATCH = 5
cfg.SOLVER.BASE_LR = BASE_LR
cfg.SOLVER.MAX_ITER = MAX_ITER
cfg.SOLVER.WARMUP_ITERS = 200
cfg.SOLVER.WEIGHT_DECAY = 0.0001

# --- Evaluation ----------------------------------------------------------------
cfg.TEST.EVAL_PERIOD = EVAL_PERIOD
cfg.TEST.DETECTIONS_PER_IMAGE = 1

# --- RetinaNet head and anchors ------------------------------------------------
cfg.MODEL.RETINANET.NUM_CLASSES = NUM_CLASSES
# NOTE(review): DETECTIONS_PER_IMAGE does not look like a stock
# MODEL.RETINANET key (the per-image cap is TEST.DETECTIONS_PER_IMAGE above);
# confirm whether this assignment has any effect.
cfg.MODEL.RETINANET.DETECTIONS_PER_IMAGE = 1

# NOTE(review): a single size list and a single ratio list are given,
# presumably shared by all feature levels — confirm against the anchor generator.
cfg.MODEL.ANCHOR_GENERATOR.SIZES = [[6,8, 10, 12, 16, 20, 24]]
cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS = [[0.5, 0.7, 1.0, 1.5]]

In [None]:
class LossEvalHook(HookBase):
    """Trainer hook that periodically records the mean loss over a data loader.

    Every ``eval_period`` iterations, and on the final iteration, the model is
    run over ``data_loader`` and the mean of the per-batch total losses is
    stored in the trainer's event storage under ``validation_loss``.

    Args:
        eval_period: Run the evaluation every this many iterations; a value
            <= 0 disables the periodic runs (the final-iteration run remains).
        model: Callable that takes a batch and returns a dict of loss values.
            Assumes the model is in a mode where it returns losses when the
            hook fires — TODO confirm against the trainer.
        data_loader: Iterable of batches to evaluate on.
    """

    def __init__(self, eval_period, model, data_loader):
        self._model = model
        self._period = eval_period
        self._data_loader = data_loader

    def _do_loss_eval(self):
        """Evaluate every batch once and log the mean loss."""
        losses = [self._get_loss(inputs) for inputs in self._data_loader]

        # An empty loader has no mean; skip logging rather than storing NaN.
        if losses:
            mean_loss = float(np.mean(losses))
            self.trainer.storage.put_scalar('validation_loss', mean_loss)

        # Keep all workers in step, whether or not anything was logged.
        comm.synchronize()

    def _get_loss(self, data):
        """Return the sum of all loss terms the model reports for one batch."""
        # Only the loss values are needed, so no autograd graph should be
        # built. no_grad does not switch the model between train/eval mode.
        with torch.no_grad():
            metrics_dict = self._model(data)

        return sum(
            value.detach().cpu().item() if isinstance(value, torch.Tensor) else float(value)
            for value in metrics_dict.values()
        )

    def after_step(self):
        """Trigger the loss evaluation on period boundaries and on the last step."""
        next_iter = self.trainer.iter + 1
        is_final = next_iter == self.trainer.max_iter
        is_periodic = self._period > 0 and next_iter % self._period == 0
        if is_final or is_periodic:
            self._do_loss_eval()

In [None]:
class MyTrainer(DefaultTrainer):
    """DefaultTrainer with a COCO evaluator and a periodic validation-loss hook."""

    @classmethod
    def build_evaluator(cls, cfg, dataset_name, output_folder=None):
        """Create the COCO evaluator for ``dataset_name``.

        Args:
            cfg: Config node; ``cfg.OUTPUT_DIR`` provides the default output location.
            dataset_name: Name of the registered dataset to evaluate on.
            output_folder: Where evaluation files go; defaults to
                ``<cfg.OUTPUT_DIR>/inference``.
        """
        if output_folder is None:
            output_folder = os.path.join(cfg.OUTPUT_DIR, "inference")
        return COCOEvaluator(dataset_name, cfg, True, output_folder)

    def build_hooks(self):
        """Return the default hooks plus a ``LossEvalHook`` on the first test dataset."""
        hooks = super().build_hooks()

        # The mapper is built with its second argument set to True;
        # presumably this keeps the annotations so a loss can be computed —
        # confirm against DatasetMapper.
        validation_loader = build_detection_test_loader(
            self.cfg,
            self.cfg.DATASETS.TEST[0],
            DatasetMapper(self.cfg, True)
        )

        # Use this trainer's own config, not a notebook-level global, so the
        # hook period always matches the config the trainer was built with.
        loss_hook = LossEvalHook(
            self.cfg.TEST.EVAL_PERIOD,
            self.model,
            validation_loader
        )

        # insert(-1) places the hook just before the last default hook.
        hooks.insert(-1, loss_hook)
        return hooks

In [None]:
# Build the trainer from the notebook-level config.
trainer = MyTrainer(cfg)
# NOTE(review): resume=False is expected to start from cfg.MODEL.WEIGHTS rather
# than from a checkpoint in cfg.OUTPUT_DIR — confirm against the Detectron2 docs.
trainer.resume_or_load(resume=False)
# Run the training loop.
trainer.train()


In [None]:
# Re-create the ball-only COCO files for every split, this time matching a
# wider set of category names. The output paths are the same
# "<split>_ball_only.json" files inside FILTERED_DIR, so existing files are
# overwritten.
for split in ['train', 'valid', 'test']:
    input_json = os.path.join(dataset.location, split, "_annotations.coco.json")
    output_json = os.path.join(FILTERED_DIR, f"{split}_ball_only.json")

    print(f"Processing split: {split}")

    if not os.path.exists(input_json):
        print(f"File not found: {input_json}")
        continue

    # Read explicitly as UTF-8 so non-ASCII category names decode the same
    # way on every platform.
    with open(input_json, 'r', encoding='utf-8') as f:
        data = json.load(f)

    categories = data.get('categories', [])

    # The Polish keyword is spelled with a real "ł"; the previous mojibake
    # spelling could never match an actual category name.
    ball_keywords = ['ball', 'football', 'soccer', 'pilka', 'piłka']
    ball_cat_ids = [
        cat['id']
        for cat in categories
        if any(keyword in cat['name'].lower() for keyword in ball_keywords)
    ]

    if not ball_cat_ids:
        print(f"No 'ball' category found in {split}!")
        # With exactly one category in the file, fall back to that category.
        if len(categories) == 1:
            ball_cat_ids = [categories[0]['id']]
            print(f"Using the only available category: ID={ball_cat_ids[0]}")

    # Set membership keeps the per-annotation check constant-time.
    ball_cat_id_set = set(ball_cat_ids)

    # Copy each kept annotation and map it to the single output category 0.
    new_annotations = []
    for ann in data.get('annotations', []):
        if ann['category_id'] in ball_cat_id_set:
            ann_copy = ann.copy()
            ann_copy['category_id'] = 0
            new_annotations.append(ann_copy)

    # Keep only the images referenced by at least one kept annotation.
    images_with_ball = {ann['image_id'] for ann in new_annotations}
    new_images = [img for img in data.get('images', []) if img['id'] in images_with_ball]

    filtered_data = {
        'info': data.get('info', {}),
        'licenses': data.get('licenses', []),
        'categories': [{'id': 0, 'name': 'ball', 'supercategory': 'none'}],
        'images': new_images,
        'annotations': new_annotations
    }

    os.makedirs(os.path.dirname(output_json), exist_ok=True)
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(filtered_data, f, indent=2)

    # The file is written either way; only the message differs.
    if len(new_annotations) == 0:
        print(f"Warning: {split} has 0 balls after filtering!")
    else:
        print(f"Saved filtered data for {split} to: {output_json}")

In [None]:
# Point the config at "model_final.pth" inside the run's output directory
# (presumably the checkpoint written at the end of training — confirm it exists).
cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
# Test-time score and NMS thresholds for the RetinaNet head.
cfg.MODEL.RETINANET.SCORE_THRESH_TEST = 0.3
cfg.MODEL.RETINANET.NMS_THRESH_TEST = 0.4
# NOTE: `cfg` is modified in place, so these values stay set for later cells.
predictor = DefaultPredictor(cfg)

In [None]:
def evaluate_ball_detection(predictor, dataset_name, iou_threshold=0.5, conf_threshold=0.3,
                            output_dir=None):
    """Evaluate a predictor on a registered dataset with box- and image-level metrics.

    Box level: within each image, predictions are visited from highest to
    lowest score and each one is matched to the still-unmatched ground-truth
    box with the highest IoU. A match needs IoU >= ``iou_threshold`` and counts
    as a TP; an unmatched prediction is an FP and an unmatched ground-truth
    box is an FN.

    Image level: a 2x2 confusion matrix of "image has a ground-truth box"
    versus "image has at least one kept prediction". It is printed, plotted
    and saved as ``confusion_matrix.png``.

    Args:
        predictor: Callable taking an image array and returning a dict whose
            "instances" entry exposes ``pred_boxes.tensor`` and ``scores``.
        dataset_name: Name of a dataset registered in ``DatasetCatalog``.
        iou_threshold: Minimum IoU for a prediction to match a ground-truth box.
        conf_threshold: Predictions scoring below this value are discarded.
        output_dir: Folder for the confusion-matrix image. Defaults to the
            notebook-level ``cfg.OUTPUT_DIR``.

    Returns:
        dict with keys ``TP``, ``FP``, ``FN``, ``precision``, ``recall``,
        ``f1`` and ``confusion_matrix`` (2x2 array, rows = ground truth).

    Raises:
        FileNotFoundError: If an image listed in the dataset cannot be read.
    """
    dataset_dicts = DatasetCatalog.get(dataset_name)
    if output_dir is None:
        output_dir = cfg.OUTPUT_DIR

    y_true_images = []
    y_pred_images = []

    TP = 0
    FP = 0
    FN = 0

    print(f"Evaluating on {len(dataset_dicts)} images...")

    for idx, d in enumerate(dataset_dicts):
        img = cv2.imread(d["file_name"])
        # cv2.imread signals failure by returning None; fail with a clear
        # message instead of an obscure error inside the predictor.
        if img is None:
            raise FileNotFoundError(f"Could not read image: {d['file_name']}")

        outputs = predictor(img)
        instances = outputs["instances"].to("cpu")

        pred_boxes = instances.pred_boxes.tensor.numpy()
        pred_scores = instances.scores.numpy()

        # Drop low-confidence predictions before matching.
        keep = pred_scores >= conf_threshold
        pred_boxes = pred_boxes[keep]
        pred_scores = pred_scores[keep]

        # Ground-truth boxes are treated as [x, y, width, height] and converted
        # to corner format — NOTE(review): confirm the dataset's bbox mode.
        gt_boxes = np.array(
            [[x, y, x + w, y + h] for x, y, w, h in (anno["bbox"] for anno in d.get("annotations", []))],
            dtype=np.float32,
        ).reshape(-1, 4)

        has_gt = len(gt_boxes) > 0
        has_pred = len(pred_boxes) > 0
        y_true_images.append(1 if has_gt else 0)
        y_pred_images.append(1 if has_pred else 0)

        if has_pred and has_gt:
            ious = box_iou(
                torch.as_tensor(pred_boxes, dtype=torch.float32),
                torch.as_tensor(gt_boxes, dtype=torch.float32),
            ).numpy()

            gt_taken = np.zeros(len(gt_boxes), dtype=bool)
            # Highest-scoring predictions get first pick of the ground truth.
            for pred_idx in np.argsort(-pred_scores, kind="stable"):
                # Hide already-matched boxes so a prediction can still claim
                # another ground-truth box that overlaps it enough.
                candidate_ious = np.where(gt_taken, -1.0, ious[pred_idx])
                best_gt_idx = int(np.argmax(candidate_ious))

                if candidate_ious[best_gt_idx] >= iou_threshold:
                    TP += 1
                    gt_taken[best_gt_idx] = True
                else:
                    FP += 1

            FN += int(len(gt_boxes) - gt_taken.sum())

        elif has_pred:
            FP += len(pred_boxes)

        elif has_gt:
            FN += len(gt_boxes)

        if (idx + 1) % 50 == 0:
            print(f"  Processed {idx + 1}/{len(dataset_dicts)} images...")

    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    # Fix the label set so the matrix is always 2x2, even when every image
    # falls into a single class (e.g. a dataset where all images contain a ball).
    cm = confusion_matrix(y_true_images, y_pred_images, labels=[0, 1])

    print("\n--- Evaluation Results - Ball Detection ---")
    print(f"\nBounding Box Metrics (IoU={iou_threshold}):")
    print(f"  True Positives (TP):  {TP}")
    print(f"  False Positives (FP): {FP}")
    print(f"  False Negatives (FN): {FN}")
    print(f"\n  Precision: {precision:.4f}")
    print(f"  Recall:    {recall:.4f}")
    print(f"  F1 Score:  {f1:.4f}")

    print(f"\nConfusion Matrix (Image-level - ball detected):")
    print(f"  TN (no ball, not detected): {cm[0,0]}")
    print(f"  FP (no ball, detected):     {cm[0,1]}")
    print(f"  FN (ball present, not detected): {cm[1,0]}")
    print(f"  TP (ball present, detected):   {cm[1,1]}")

    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Not Detected', 'Detected'],
                yticklabels=['No Ball', 'Ball Present'],
                ax=ax)
    ax.set_ylabel('Ground Truth')
    ax.set_xlabel('Prediction')
    ax.set_title('Confusion Matrix - Ball Detection (Image-level)')
    fig.tight_layout()

    os.makedirs(output_dir, exist_ok=True)
    save_path = os.path.join(output_dir, "confusion_matrix.png")
    fig.savefig(save_path, dpi=150, bbox_inches='tight')
    print(f"\nConfusion matrix saved to: {save_path}")
    plt.show()

    return {
        'TP': TP, 'FP': FP, 'FN': FN,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'confusion_matrix': cm
    }

In [None]:
# Score the predictor on the validation split.
results = evaluate_ball_detection(
    predictor,
    VALID_DATA_SET_NAME,
    iou_threshold=0.5,
    conf_threshold=0.3
)

# Keep only the scalar metrics, cast to plain Python numbers so they can be
# written as JSON (the confusion matrix array is left out).
count_keys = ('TP', 'FP', 'FN')
ratio_keys = ('precision', 'recall', 'f1')
serializable_results = {key: int(results[key]) for key in count_keys}
serializable_results.update({key: float(results[key]) for key in ratio_keys})

results_file = os.path.join(cfg.OUTPUT_DIR, "evaluation_results.json")
with open(results_file, 'w') as f:
    json.dump(serializable_results, f, indent=2)

print(f"Evaluation results saved to: {results_file}")