In [None]:
# Basic imports
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

# PyTorch imports
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.transforms import functional as F
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.backbone_utils import BackboneWithFPN
from torchvision.ops.feature_pyramid_network import LastLevelMaxPool

# Utilities for bounding boxes
import torchvision.transforms as T
from torchvision.ops import nms

# Progress bar
from tqdm import tqdm

# Warnings
import warnings
warnings.filterwarnings("ignore")
print("import finish")

In [None]:

# NOTE: This cell is disabled. The whole snippet below is wrapped in a string
# literal, so none of it executes. It shows how a ResNeXt-50 + FPN Faster R-CNN
# is built from ImageNet-pretrained weights (pretrained=True); the following
# cell builds the same architecture with pretrained=False and restores a
# checkpoint instead.
"""
from torchvision.models import resnext50_32x4d

# 選擇裝置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 載入較小的 ResNeXt-50 模型
backbone_model = resnext50_32x4d(pretrained=True)

# 定義 FPN 用的輸出 channel
backbone_model.out_channels = 2048  # ResNeXt-50 的 layer4 輸出

# 包裝成帶有 FPN 的 backbone
backbone_with_fpn = BackboneWithFPN(
    backbone_model,
    return_layers={'layer1': '0', 'layer2': '1', 'layer3': '2', 'layer4': '3'},
    in_channels_list=[256, 512, 1024, 2048],
    out_channels=256,
    extra_blocks=LastLevelMaxPool()
)

# 建立 Faster R-CNN 模型
model = FasterRCNN(backbone=backbone_with_fpn, num_classes=11)  # 10 digits + background
model.to(device)
###
"""

In [None]:
import torch
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.backbone_utils import BackboneWithFPN
from torchvision.models import resnext50_32x4d
from torchvision.ops import misc as misc_nn_ops
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops.feature_pyramid_network import LastLevelMaxPool

# Step 1: Rebuild the architecture the checkpoint was trained with, then restore
# its weights. The structure (backbone, FPN layout, num_classes) has to match the
# saved state dict or load_state_dict() will reject it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ResNeXt-50 trunk created without ImageNet weights; the weights come from the
# checkpoint loaded below.
backbone = resnext50_32x4d(pretrained=False)
backbone.out_channels = 2048

# Wrap the four residual stages in a Feature Pyramid Network. The channel list
# is the output width of layer1..layer4; LastLevelMaxPool adds one extra,
# coarser pyramid level on top.
backbone_with_fpn = BackboneWithFPN(
    backbone,
    return_layers={"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"},
    in_channels_list=[256, 512, 1024, 2048],
    out_channels=256,
    extra_blocks=LastLevelMaxPool()
)

# Build the FasterRCNN model (must match num_classes from training)
model = FasterRCNN(backbone=backbone_with_fpn, num_classes=11)  # 10 digits + background

# map_location remaps the stored tensors onto the device selected above, so a
# checkpoint written on a GPU still loads when this session falls back to CPU.
checkpoint = torch.load(
    '/kaggle/input/version1/pytorch/default/1/fasterrcnn_epoch2.pth',
    map_location=device,
)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)

In [None]:
class DigitDataset(Dataset):
    """Object-detection dataset backed by a COCO-style annotation file.

    Each item is ``(image, target)`` where ``target`` is a dict with:

    * ``"boxes"``    - float32 tensor of shape ``(N, 4)`` in
      ``[x_min, y_min, x_max, y_max]`` order (converted from COCO's
      ``[x_min, y_min, w, h]``),
    * ``"labels"``   - int64 tensor of shape ``(N,)`` holding ``category_id``,
    * ``"image_id"`` - int tensor of shape ``(1,)``.

    Images that have no annotations yield ``N == 0`` (boxes of shape
    ``(0, 4)``) instead of raising.

    Args:
        root_dir: Directory containing the image files.
        annotation_path: Path to the COCO-style JSON file; its ``"images"``
            and ``"annotations"`` lists are read.
        transforms: Optional callable applied to the PIL image only. The
            target is NOT passed through it, so geometric transforms would
            leave the boxes out of sync with the image.
    """

    def __init__(self, root_dir, annotation_path, transforms=None):
        self.root_dir = root_dir
        self.transforms = transforms
        with open(annotation_path) as f:
            coco = json.load(f)

        # image_id -> file_name lookup
        self.image_info = {img["id"]: img["file_name"] for img in coco["images"]}

        # image_id -> list of annotation dicts for that image
        self.annotations = {}
        for ann in coco["annotations"]:
            self.annotations.setdefault(ann["image_id"], []).append(ann)

        # Index order follows the order of the "images" list.
        self.ids = list(self.image_info.keys())

    def __getitem__(self, idx):
        image_id = self.ids[idx]
        img_path = os.path.join(self.root_dir, self.image_info[image_id])
        img = Image.open(img_path).convert("RGB")

        anns = self.annotations.get(image_id, [])

        # reshape(-1, 4) keeps the tensor two-dimensional even when the image
        # has no annotations; without it an empty list becomes shape (0,) and
        # the column indexing below raises IndexError.
        boxes = torch.tensor(
            [ann["bbox"] for ann in anns], dtype=torch.float32
        ).reshape(-1, 4)
        labels = torch.tensor(
            [ann["category_id"] for ann in anns], dtype=torch.int64
        )

        # COCO stores [x_min, y_min, w, h]; Faster R-CNN expects
        # [x_min, y_min, x_max, y_max].
        boxes[:, 2:] += boxes[:, :2]

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([image_id])
        }

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.ids)
print("define dataset finish")

In [None]:
def get_transform(train):
    """Build the image preprocessing pipeline.

    Args:
        train: Whether the pipeline is for training data. Accepted for
            interface compatibility; it currently does not change the result
            (see the note below).

    Returns:
        A ``T.Compose`` that converts a PIL image to a tensor.
    """
    pipeline = [T.ToTensor()]
    # NOTE: no random horizontal flip is added for train=True. The dataset in
    # this notebook passes only the image through this pipeline, so an
    # image-only flip would mirror the pixels while leaving the bounding boxes
    # untouched, silently corrupting the training targets. Geometric
    # augmentation needs a transform that updates image and boxes together.
    return T.Compose(pipeline)

def collate_fn(batch):
    """Regroup a list of samples into one tuple per sample field.

    A batch of ``(image, target)`` pairs becomes ``(images, targets)``, each a
    tuple, so targets of different sizes never have to be stacked. An empty
    batch yields an empty tuple.
    """
    return (*zip(*batch),)
print("define transform finish")

In [None]:
from torch.utils.data import DataLoader
import json
# Datasets for the two annotated splits. Both are built with
# get_transform(train=False), i.e. the training split is also loaded with the
# evaluation-time preprocessing.
train_dataset = DigitDataset(
    '/kaggle/input/dataset/nycu-hw2-data/train',
    '/kaggle/input/dataset/nycu-hw2-data/train.json',
    get_transform(train=False),
)
valid_dataset = DigitDataset(
    '/kaggle/input/dataset/nycu-hw2-data/valid',
    '/kaggle/input/dataset/nycu-hw2-data/valid.json',
    get_transform(train=False),
)

# Batches of 4; only the training loader shuffles. collate_fn keeps each batch
# as tuples of per-sample images and targets.
train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=4,
    shuffle=False,
    collate_fn=collate_fn,
)
print("dataset ready")

In [None]:
# Sanity check: fetch a single training batch and show the container types and
# batch size of the images and targets, then stop.
for images, targets in train_loader:
    print(f"{type(images)!s} {len(images)!s} {type(images[0])!s}")
    print(f"{type(targets)!s} {len(targets)!s} {type(targets[0])!s}")
    break

In [None]:
# Sanity check: fetch a single training batch and show the shape/dtype of the
# first image and of every field in the first target, then stop.
for images, targets in train_loader:
    print("✅ Images:")
    print(f"{type(images)!s} {len(images)!s}")
    print(f"{images[0].shape!s} {images[0].dtype!s}")

    print("\n✅ Targets:")
    print(f"{type(targets)!s} {len(targets)!s}")
    print(f"{targets[0].keys()!s}")
    print(f"boxes: {targets[0]['boxes'].shape!s} {targets[0]['boxes'].dtype!s}")
    print(f"labels: {targets[0]['labels'].shape!s} {targets[0]['labels'].dtype!s}")
    print(f"image_id: {targets[0]['image_id']!s} {targets[0]['image_id'].shape!s}")

    break

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
def get_optimizer(model, lr=0.005, weight_decay=1e-4):
    """Create an SGD optimizer (momentum 0.9) over the model's trainable parameters.

    Parameters whose ``requires_grad`` is False are left out.

    Args:
        model: Module whose parameters are optimized.
        lr: Learning rate.
        weight_decay: L2 penalty coefficient passed to SGD.

    Returns:
        The configured ``torch.optim.SGD`` instance.
    """
    trainable = list(filter(lambda param: param.requires_grad, model.parameters()))
    return torch.optim.SGD(
        trainable,
        lr=lr,
        momentum=0.9,
        weight_decay=weight_decay,
    )

def get_scheduler(optimizer, step_size=5, gamma=0.1):
    """Create a step-decay learning-rate scheduler for ``optimizer``.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        step_size: Number of ``scheduler.step()`` calls between decays.
            Defaults to 5, the value previously hard-coded here.
        gamma: Multiplicative decay factor applied at each decay. Defaults to
            0.1, the value previously hard-coded here.

    Returns:
        The configured ``torch.optim.lr_scheduler.StepLR`` instance.
    """
    return torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
print("define opt sch finish")

In [None]:
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10):
    """Run one optimization pass over ``data_loader``.

    The model is switched to training mode and called as
    ``model(images, targets)``, which is expected to return a dict of loss
    tensors; their sum is back-propagated on every batch.

    Args:
        model: Detection model to train.
        optimizer: Optimizer stepping the model's parameters.
        data_loader: Iterable of ``(images, targets)`` batches, where
            ``targets`` is a sequence of dicts of tensors.
        device: Device the batch tensors are moved to.
        epoch: Epoch number, used only for the progress-bar label.
        print_freq: Currently unused; kept so existing callers keep working.

    Returns:
        Mean of the summed loss over the processed batches, or ``0.0`` when
        the loader yields no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    progress = tqdm(data_loader, desc=f"Epoch {epoch}")

    for num_batches, (images, targets) in enumerate(progress, start=1):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # Read the scalar once: .item() forces a device synchronization.
        batch_loss = losses.item()
        running_loss += batch_loss
        progress.set_postfix(loss=batch_loss)

        if num_batches % 100 == 0:
            # tqdm.write prints the counter without breaking the progress bar.
            tqdm.write(str(num_batches))

    # Dividing by the number of batches actually seen avoids a
    # ZeroDivisionError on an empty loader.
    if num_batches == 0:
        return 0.0
    return running_loss / num_batches
print("define train one epoch finish")

In [None]:
def save_checkpoint(model, optimizer, epoch, path="checkpoint"):
    """Write the epoch number plus model and optimizer state to ``path``.

    The directory is created if it does not exist. The file holds a dict with
    the keys ``epoch``, ``model_state_dict`` and ``optimizer_state_dict``.

    Args:
        model: Module whose ``state_dict()`` is saved.
        optimizer: Optimizer whose ``state_dict()`` is saved.
        epoch: Epoch number stored in the file and embedded in its name.
        path: Output directory.
    """
    os.makedirs(path, exist_ok=True)

    # NOTE(review): the file name has a literal "2" directly before the epoch
    # number (epoch 1 -> "fasterrcnn_epoch21.pth"). Confirm this is intended
    # and not a leftover from an earlier file name.
    destination = os.path.join(path, f"fasterrcnn_epoch2{epoch}.pth")

    state = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, destination)
print("define save_chechpoint finish")

In [None]:
from torchvision.ops import box_iou
from collections import defaultdict
import numpy as np
import pycocotools.mask as mask_util
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import json
import tempfile
print("import 2 finish")

In [None]:
@torch.no_grad()
def evaluate_on_validation(model, valid_loader, device):
    """Compute the mean summed loss over ``valid_loader`` without updating weights.

    torchvision detection models return the loss dict only in training mode
    (in eval mode they return predictions), so the model is put in training
    mode for the forward passes while gradients stay disabled. Batch-norm and
    dropout layers are kept in eval behaviour so the pass does not change
    running statistics or inject noise. Because the forward pass is the
    training one, the loss is computed the way it is during training.

    Args:
        model: Detection model called as ``model(images, targets)``.
        valid_loader: Iterable of ``(images, targets)`` batches, where
            ``targets`` is a sequence of dicts of tensors.
        device: Device the batch tensors are moved to.

    Returns:
        Mean of the summed loss over the processed batches, or ``0.0`` when
        the loader yields no batches. The model is left in eval mode.
    """
    model.train()
    frozen_types = (
        torch.nn.modules.batchnorm._BatchNorm,
        torch.nn.modules.dropout._DropoutNd,
    )
    for module in model.modules():
        if isinstance(module, frozen_types):
            module.eval()

    running_loss = 0.0
    num_batches = 0
    try:
        for images, targets in tqdm(valid_loader, desc="Validating"):
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Loss only; no backward pass is run.
            loss_dict = model(images, targets)
            running_loss += sum(loss_dict.values()).item()
            num_batches += 1
    finally:
        # Leave the model in eval mode whether or not the loop succeeded.
        model.eval()

    if num_batches == 0:
        return 0.0
    return running_loss / num_batches
print("define eval finish")

In [None]:
def evaluate_map(model, valid_loader, annotation_file, device):
    """Run inference on ``valid_loader`` and score it with the COCO bbox metric.

    Predictions are converted to COCO result dicts (``bbox`` as
    ``[x_min, y_min, width, height]``) and handed to pycocotools in memory, so
    no temporary file is written.

    Args:
        model: Detection model; called as ``model(images)`` in eval mode and
            expected to return one dict per image with ``boxes``
            (``[x_min, y_min, x_max, y_max]``), ``scores`` and ``labels``.
        valid_loader: Iterable of ``(images, targets)`` batches; every target
            must carry a single-element ``"image_id"`` tensor.
        annotation_file: Path to the COCO ground-truth JSON.
        device: Device the images are moved to.

    Returns:
        ``coco_eval.stats[0]`` (mAP at IoU=0.5:0.95), or ``0.0`` when the
        model produced no detections at all.
    """
    model.eval()
    results = []

    print("🔍 Running inference for mAP...")
    # Inference only: disabling autograd avoids building graphs for every batch.
    with torch.no_grad():
        for images, targets in tqdm(valid_loader, desc="Evaluating mAP"):
            images = [img.to(device) for img in images]
            outputs = model(images)

            for target, output in zip(targets, outputs):
                image_id = int(target["image_id"].item())

                boxes = output["boxes"].detach().cpu()
                # [x_min, y_min, x_max, y_max] -> [x_min, y_min, w, h].
                # .tolist() yields plain Python numbers, which the COCO tools
                # (and JSON) can handle, unlike numpy float32 scalars.
                xywh = torch.cat(
                    [boxes[:, :2], boxes[:, 2:] - boxes[:, :2]], dim=1
                ).tolist()
                scores = output["scores"].detach().cpu().tolist()
                labels = output["labels"].detach().cpu().tolist()

                for bbox, score, label in zip(xywh, scores, labels):
                    results.append({
                        "image_id": image_id,
                        "category_id": int(label),
                        "bbox": bbox,
                        "score": float(score)
                    })

    # COCO.loadRes cannot handle an empty result list; with no detections the
    # average precision is zero anyway.
    if not results:
        print("No detections produced; returning mAP = 0.0")
        return 0.0

    coco_gt = COCO(annotation_file)
    # loadRes accepts the list of result dicts directly.
    coco_dt = coco_gt.loadRes(results)

    coco_eval = COCOeval(coco_gt, coco_dt, iouType='bbox')
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()

    return coco_eval.stats[0]  # mAP at IoU=0.5:0.95
print("define eval map finish")

In [None]:
def train_model(model, train_loader, valid_loader, device, num_epochs=2, save_every=1, val_json_path='/kaggle/input/dataset/nycu-hw2-data/valid.json'):
    """Train ``model`` for ``num_epochs`` epochs with periodic checkpoints.

    A fresh optimizer and scheduler are created from the notebook's factory
    helpers. Each epoch runs one training pass, saves a checkpoint when the
    epoch number is a multiple of ``save_every``, then steps the scheduler.

    Validation is currently switched off (see the commented-out calls), so
    ``valid_loader`` and ``val_json_path`` are accepted but not used.

    Args:
        model: Detection model to train.
        train_loader: Loader of training batches.
        valid_loader: Loader of validation batches (unused for now).
        device: Device to train on.
        num_epochs: Number of epochs; epochs are numbered from 1.
        save_every: Checkpoint interval in epochs.
        val_json_path: Validation annotation file (unused for now).
    """
    optimizer = get_optimizer(model)
    scheduler = get_scheduler(optimizer)

    for completed in range(num_epochs):
        epoch = completed + 1
        print(f"\n🟢 Epoch {epoch}/{num_epochs}")

        train_loss = train_one_epoch(model, optimizer, train_loader, device, epoch)

        checkpoint_due = (epoch % save_every) == 0
        if checkpoint_due:
            save_checkpoint(model, optimizer, epoch)

        # Validation hooks, disabled for now:
        #val_loss = evaluate_on_validation(model, valid_loader, device)
        #map_score = evaluate_map(model, valid_loader, val_json_path, device)
        #print(f"[Epoch {epoch}]  Train Loss: {train_loss:.4f} |  Val Loss: {val_loss:.4f} |  mAP: {map_score:.4f}")

        scheduler.step()
print("define train finish")

In [None]:
# Continue training the restored model for two epochs, checkpointing after each.
train_model(
    model,
    train_loader,
    valid_loader,
    device,
    num_epochs=2,
    save_every=1,
    val_json_path="/kaggle/input/dataset/nycu-hw2-data/valid.json",
)
print("train finish")

In [None]:
# NOTE: This cell is disabled. The snippet below sits inside a string literal
# and never executes. It repeats the model-building / checkpoint-loading cell
# near the top of the notebook, except that it ends with model.eval() instead
# of model.to(device).
"""
import torch
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.backbone_utils import BackboneWithFPN
from torchvision.models import resnext50_32x4d
from torchvision.ops import misc as misc_nn_ops
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops.feature_pyramid_network import LastLevelMaxPool

# Step 1: Define the model structure (must match the one used in training)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load ResNeXt-50
backbone = resnext50_32x4d(pretrained=False)
backbone.out_channels = 2048

# Wrap in FPN
backbone_with_fpn = BackboneWithFPN(
    backbone,
    return_layers={"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"},
    in_channels_list=[256, 512, 1024, 2048],
    out_channels=256,
    extra_blocks=LastLevelMaxPool()
)

# Build the FasterRCNN model (must match num_classes from training)
model = FasterRCNN(backbone=backbone_with_fpn, num_classes=11)  # 10 digits + background
checkpoint = torch.load('/kaggle/input/version1/pytorch/default/1/fasterrcnn_epoch2.pth')
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()
"""

In [None]:
import json

def _resolve_image_id(target):
    """Return a JSON-serializable image id from one per-image loader entry.

    Accepts a bare id, a tensor / numpy scalar holding the id, or a target
    dict that stores it under ``"image_id"``.
    """
    if isinstance(target, dict):
        target = target["image_id"]
    if hasattr(target, "item"):
        # Single-element tensors and numpy scalars become plain Python numbers.
        return target.item()
    return target


def save_predictions(model, data_loader, device, output_file="pred.json", score_threshold=0.3):
    """Run inference and write detections to a COCO-style results JSON file.

    Every detection whose score is not below ``score_threshold`` is written as
    a dict with ``image_id``, ``bbox`` (``[x_min, y_min, width, height]``),
    ``score`` and ``category_id``.

    Args:
        model: Detection model; called as ``model(images)`` in eval mode and
            expected to return one dict per image with ``boxes``
            (``[x_min, y_min, x_max, y_max]``), ``labels`` and ``scores``.
        data_loader: Iterable of ``(images, targets)`` batches. Each entry of
            ``targets`` may be a bare image id, a tensor holding it, or a
            target dict with an ``"image_id"`` entry.
        device: Device the images are moved to.
        output_file: Path of the JSON file to write.
        score_threshold: Detections scoring below this value are dropped.
    """
    model.eval()
    results = []

    with torch.no_grad():
        for images, targets in tqdm(data_loader, desc="Running inference"):
            images = [img.to(device) for img in images]
            outputs = model(images)

            for target, output in zip(targets, outputs):
                image_id = _resolve_image_id(target)

                # Filter on the tensor, then convert once per image instead of
                # calling .item() for every box.
                scores = output["scores"]
                keep = ~(scores < score_threshold)
                kept_boxes = output["boxes"][keep].cpu().tolist()
                kept_labels = output["labels"][keep].cpu().tolist()
                kept_scores = scores[keep].cpu().tolist()

                for box, label, score in zip(kept_boxes, kept_labels, kept_scores):
                    x_min, y_min, x_max, y_max = box
                    results.append({
                        "image_id": image_id,
                        "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                        "score": score,
                        "category_id": label
                    })

    # Indented output keeps the file human-readable.
    with open(output_file, "w") as f:
        json.dump(results, f, indent=4)

    print(f"✅ Saved {len(results)} predictions to {output_file}")

In [None]:
class TestDataset(torch.utils.data.Dataset):
    """Unlabelled image dataset over the ``*.png`` files of a folder.

    Each item is ``(image, image_id)`` where ``image_id`` is the file stem
    parsed as an integer (``'123.png'`` -> ``123``).

    Args:
        image_folder: Folder containing the images, as ``str`` or
            ``pathlib.Path``.
        transform: Optional callable applied to the PIL image.
    """

    def __init__(self, image_folder, transform=None):
        # Local import so the class does not depend on a later cell's imports.
        from pathlib import Path

        # Path(...) lets callers pass either a string or a Path.
        self.image_folder = Path(image_folder)
        # Paths sort lexicographically, so '10.png' comes before '2.png'.
        self.image_paths = sorted(self.image_folder.glob("*.png"))
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Raises ValueError if a file name is not purely numeric.
        image_id = int(image_path.stem)
        return image, image_id

In [None]:
from torchvision import transforms
from pathlib import Path

# Test-time preprocessing: only the PIL -> tensor conversion.
test_transform = transforms.Compose([transforms.ToTensor()])

test_dataset = TestDataset(
    Path("/kaggle/input/dataset/nycu-hw2-data/test"),
    transform=test_transform,
)

# The collate function regroups a batch of (image, image_id) samples into
# [images, image_ids].
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=4,
    shuffle=False,
    collate_fn=lambda samples: list(zip(*samples)),
)
print("test data ready")

In [None]:

# Ensure the model is on the selected device, then write test-set predictions
# to pred.json.
model = model.to(device)
save_predictions(
    model,
    test_loader,
    device,
    output_file="pred.json",
)