In [2]:
!pip install pycocotools
!pip install torchmetrics
!pip install faster-coco-eval



In [3]:
import numpy as np 
import pandas as pd 
import json 
from tqdm import tqdm 
from pprint import pprint 
import os
import matplotlib.pyplot as plt
import glob
import shutil
import cv2
from PIL import Image
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor, Compose, Normalize, RandomAffine, ColorJitter
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from torchmetrics.detection.mean_ap import MeanAveragePrecision

In [4]:
dataset_path = "/kaggle/input/pklot-dataset"


In [5]:
device = ("cuda" if torch.cuda.is_available() else "cpu")
device


'cuda'

In [6]:
train_dir = '/kaggle/input/pklot-dataset/train'
test_dir = '/kaggle/input/pklot-dataset/test'
val_dir = '/kaggle/input/pklot-dataset/valid'
output_path = "/kaggle/working/"

In [7]:
def get_json_file(directory):
    json_files = glob.glob(os.path.join(directory, "**", "*.json"), recursive = True)
    return str(json_files[0]) if json_files else None

In [8]:
anno_train = get_json_file(train_dir)
anno_val = get_json_file(val_dir)
anno_test = get_json_file(test_dir)

In [9]:
with open(anno_train, "r") as file:
    data = json.load(file)

In [9]:
data['categories']

[{'id': 0, 'name': 'spaces', 'supercategory': 'none'},
 {'id': 1, 'name': 'space-empty', 'supercategory': 'spaces'},
 {'id': 2, 'name': 'space-occupied', 'supercategory': 'spaces'}]

In [10]:
class CustomDataset(Dataset):
    def __init__(self, img_dir, ann_file, transforms=None):
        self.img_dir = img_dir
        self.transforms = transforms

        with open(ann_file, 'r') as f:
            coco = json.load(f)

        self.images = coco['images']
        self.annotations = coco['annotations']
        self.categories = coco['categories']

        self.image_to_annotations = {}
        for ann in self.annotations:
            img_id = ann['image_id']
            if img_id not in self.image_to_annotations:
                self.image_to_annotations[img_id] = []
            self.image_to_annotations[img_id].append(ann)

        self.images = [
            img for img in self.images
            if img['id'] in self.image_to_annotations and
               len(self.image_to_annotations[img['id']]) > 0
        ]

    def __getitem__(self, idx):
        img_info = self.images[idx]
        img_id = img_info['id']
        img_path = os.path.join(self.img_dir, img_info['file_name'])
        img = Image.open(img_path).convert("RGB")

        annotations = self.image_to_annotations[img_id]

        boxes = []
        labels = []

        for ann in annotations:
            boxes.append(ann['bbox'])
            labels.append(ann['category_id'])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        boxes[:, 2:] += boxes[:, :2]  
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([img_id], dtype=torch.int64)  
            }


        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.images)

In [11]:
train_transform = Compose([
    RandomAffine(
        degrees=(-5, 5),
        translate=(0.15, 0.15),
        scale=(0.85, 1.15),
        shear=10
    ),
    ColorJitter(
        brightness=0.125,
        contrast=0.5,
        saturation=0.5,
        hue=0.05
    ),
    ToTensor(), 
])

In [12]:
val_transform = (ToTensor())

In [13]:
train_dataset = CustomDataset(train_dir, anno_train, train_transform)
val_dataset = CustomDataset(test_dir, anno_test, val_transform)

In [14]:
def collate_fn(batch):
    batch = [item for item in batch if item is not None]
    if not batch:  
        return None, None
    image, labels = zip(*batch)
    return list(image), list(labels) 

In [15]:
train_dataloader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    collate_fn=  collate_fn
)

val_dataloader = DataLoader(
    val_dataset,
    batch_size=8,
    shuffle=False,
    collate_fn= collate_fn
)

In [16]:
def calculate_iou(box1, box2):
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    intersection = max(0, x2 - x1) * max(0, y2 - y1)
    area_box1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_box2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area_box1 + area_box2 - intersection

    return intersection / union if union != 0 else 0

In [17]:

def precision_recall(pred_boxes, gt_boxes, iou_threshold=0.5):
    tp, fp, fn = 0, 0, 0
    gt_matched = set()

    for pred_box in pred_boxes:
        matched = False
        for i, gt_box in enumerate(gt_boxes):
            if i in gt_matched:
                continue
            if calculate_iou(pred_box, gt_box) >= iou_threshold:
                tp += 1
                gt_matched.add(i)
                matched = True
                break
        if not matched:
            fp += 1

    fn = len(gt_boxes) - len(gt_matched)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0

    return precision, recall

In [18]:
num_classes = len(train_dataset.categories)

In [19]:
def get_model(num_classes):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    
    return model

In [20]:
check_point_folder = os.path.join(output_path, "check_point")
os.makedirs(check_point_folder, exist_ok=True)

In [21]:
num_epochs = 10

In [22]:
def train():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model = get_model(num_classes)
    model.to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
    best_map = -1
    metric = MeanAveragePrecision(iou_type="bbox").to(device)
    
    check_point_folder = "checkpoints"
    if not os.path.exists(check_point_folder):
        os.makedirs(check_point_folder)

    for epoch in range(num_epochs):
        model.train()
        train_loss = []
        progress_bar = tqdm(train_dataloader, colour="cyan")

        for iter, (images, targets) in enumerate(progress_bar):
            images = [image.to(device) for image in images]
            targets = [{"boxes": target["boxes"].to(device), "labels": target["labels"].to(device)} for target in targets]

            losses = model(images, targets)
            loss = sum([loss for loss in losses.values()])
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
            mean_loss = sum(train_loss) / len(train_loss)
            progress_bar.set_description(f"Epoch {epoch+1}/{num_epochs}. Loss {mean_loss:.4f}")

        # Evaluation phase
        model.eval()
        metric.reset()

        precision_list, recall_list = [], []

        with torch.no_grad():
            for images, targets in val_dataloader:
                images = [img.to(device) for img in images]
                targets = [
                    {"boxes": target["boxes"].to(device), "labels": target["labels"].to(device)}
                    for target in targets
                ]
                outputs = model(images)

                preds = [
                    {
                        "boxes": output["boxes"].to("cpu"),
                        "scores": output["scores"].to("cpu"),
                        "labels": output["labels"].to("cpu"),
                    }
                    for output in outputs
                ]
                gts = [
                    {
                        "boxes": target["boxes"].to("cpu"),
                        "labels": target["labels"].to("cpu"),
                    }
                    for target in targets
                ]

                # Update metric for mAP
                metric.update(preds, gts)

                # Calculate precision and recall using your function
                for pred, gt in zip(preds, gts):
                    precision, recall = precision_recall(pred["boxes"], gt["boxes"], iou_threshold=0.5)
                    precision_list.append(precision)
                    recall_list.append(recall)

        # Compute average precision and recall
        avg_precision = sum(precision_list) / len(precision_list) if precision_list else 0
        avg_recall = sum(recall_list) / len(recall_list) if recall_list else 0

        # Compute mAP
        result = metric.compute()
        map_50 = result["map_50"].item()
        map_50_95 = result["map"].item()  

        print(f"Epoch {epoch+1}/{num_epochs} - mAP@0.5: {map_50:.4f} - mAP@0.5:0.95: {map_50_95:.4f}, Precision: {avg_precision:.4f}, Recall: {avg_recall:.4f}")


        checkpoint = {
            "model_state_dict": model.state_dict(),
            "epoch": epoch + 1,
            "optimizer_state_dict": optimizer.state_dict(),
            "map": map_50
        }

        if map_50_95 > best_map:
            best_map = map_50_95
            torch.save(checkpoint, f"{check_point_folder}/best.pt")

        torch.save(checkpoint, f"{check_point_folder}/last.pt")
        torch.save(model.state_dict(), f"{check_point_folder}/last_model.pth")


In [23]:
pip install torchmetrics[detection]

Note: you may need to restart the kernel to use updated packages.


In [None]:
train()

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:00<00:00, 199MB/s] 
Epoch 1/10. Loss 1.5016: 100%|[36m██████████[0m| 1063/1063 [20:36<00:00,  1.16s/it]


Epoch 1/10 - mAP@0.5: 0.0441 - mAP@0.5:0.95: 0.0111, Precision: 0.1410, Recall: 0.3325


Epoch 2/10. Loss 1.3768: 100%|[36m██████████[0m| 1063/1063 [20:15<00:00,  1.14s/it]


Epoch 2/10 - mAP@0.5: 0.0687 - mAP@0.5:0.95: 0.0231, Precision: 0.1692, Recall: 0.3808


Epoch 3/10. Loss 1.3292: 100%|[36m██████████[0m| 1063/1063 [20:18<00:00,  1.15s/it]


Epoch 3/10 - mAP@0.5: 0.1041 - mAP@0.5:0.95: 0.0419, Precision: 0.1942, Recall: 0.4313


Epoch 4/10. Loss 1.3050:   5%|[36m▍         [0m| 50/1063 [00:57<19:27,  1.15s/it]

In [61]:
import cv2
import pickle
import torch
from torchvision.transforms.functional import to_tensor
from shapely.geometry import box as shapely_box

slot_width = 130
slot_height = 65
pad_x, pad_y = 3, 3              # More generous padding
ios_thresh = 0.05                  # Allow low IoU but still call it occupied
score_thresh = 0.35                # Keep low-confidence cars
target_size = (1280, 720)  # match the size used in carposition.pkl collection


In [62]:
# ==== LOAD MODEL ====
def get_model(num_classes):
    from torchvision.models.detection import fasterrcnn_resnet50_fpn
    from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

    model = fasterrcnn_resnet50_fpn(weights="DEFAULT")
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

def load_trained_model(checkpoint_path, num_classes):
    model = get_model(num_classes)
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()
    return model

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [63]:
# ==== LOAD PARKING SLOT POSITIONS ====
with open("/kaggle/input/positions/carposition.pkl", "rb") as f:
    top_left_coords = pickle.load(f)

slot_boxes = [[x - pad_x, y - pad_y, x + slot_width + pad_x, y + slot_height + pad_y] for x, y in top_left_coords]

In [64]:
# ==== IoSA Function ====
def calculate_ios(slot_box, pred_box):
    slot = shapely_box(*slot_box)
    pred = shapely_box(*pred_box)
    inter_area = slot.intersection(pred).area
    slot_area = slot.area
    return inter_area / slot_area if slot_area != 0 else 0


# ==== PREDICTION LOOP ====
def predict_on_video_with_ios(video_path, output_path, model, slot_boxes,
                              threshold=score_thresh, ios_thresh=ios_thresh):
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)

    out_vid = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, target_size)

    print("🔄 Processing video with IoSA logic...")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.resize(frame, target_size)
        frame_tensor = to_tensor(frame).unsqueeze(0).to(device)

        with torch.no_grad():
            outputs = model(frame_tensor)[0]

        pred_boxes = outputs["boxes"].cpu().numpy()
        scores = outputs["scores"].cpu().numpy()
        pred_boxes = [box for box, score in zip(pred_boxes, scores) if score >= threshold]

        free_count = 0

        for i, slot in enumerate(slot_boxes):
            is_occupied = any(calculate_ios(slot, pred_box) > ios_thresh for pred_box in pred_boxes)

            x1, y1, x2, y2 = map(int, slot)
            color = (0, 0, 255) if is_occupied else (0, 255, 0)
            label = "Occupied" if is_occupied else "Empty"
            text_color = (255, 255, 255) if is_occupied else (0, 0, 0)

            if not is_occupied:
                free_count += 1

            # Draw slot box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"{label}", (x1 + 2, y1 + 15),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color, 1)
            cv2.putText(frame, f"{i+1}", (x1 + 3, y1 + 35),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        # Free space counter
        cv2.putText(frame, f"Free Spaces: {free_count}/{len(slot_boxes)}",
                    (40, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

        out_vid.write(frame)

    cap.release()
    out_vid.release()
    print("✅ Saved IoSA-based output to:", output_path)

In [65]:
# ==== RUN EVERYTHING ====
checkpoint_path = "/kaggle/input/best/other/default/1/best.pt"
video_path = "/kaggle/input/parking-video/car_test.mp4"
output_path = "/kaggle/working/parking_final_fixed.mp4"

num_classes = 3  # 0: spaces, 1: empty, 2: occupied (as per your dataset)
model = load_trained_model(checkpoint_path, num_classes)

predict_on_video_with_slots_only(video_path, output_path, model, slot_boxes)

  checkpoint = torch.load(checkpoint_path, map_location=device)


🔄 Processing video...
✅ Saved final video to: /kaggle/working/parking_final_fixed.mp4
