In [None]:
import os
import json
import glob
import torch
import torchvision
import pandas as pd

from PIL import Image
from tqdm import tqdm
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

In [None]:
import os
import json
import glob
import torch
import torchvision
import pandas as pd

from PIL import Image
from tqdm import tqdm
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

In [None]:
# Prefer the first CUDA device when one is visible, otherwise run on the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(device)

# Report which GPU was picked so the run log records the hardware.
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))

In [None]:
class HW2Data(Dataset):
    """Detection dataset read from an image folder plus a COCO-style JSON file.

    The last component of ``img_root`` names the split. For every split other
    than ``'test'`` the JSON at ``json_path`` must provide ``images``,
    ``annotations`` and ``categories``; items are ``(image, target)`` where
    ``target`` holds ``boxes`` as ``[x_min, y_min, x_max, y_max]`` and
    ``labels``. For the ``'test'`` split items are ``(image, image_id)``.

    Images are returned as tensors already normalized with the ImageNet
    mean/std, so a downstream model should not normalize them a second time.

    Args:
        img_root: Directory containing the images of one split.
        json_path: Annotation file; ignored for the ``'test'`` split.
    """

    def __init__(self, img_root, json_path=''):
        super().__init__()
        # normpath drops a trailing separator, so './data/test/' is still
        # recognized as the test split.
        self.mode = os.path.basename(os.path.normpath(img_root))
        self.img_root = img_root
        self.preprocess = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

        if self.mode != 'test':
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.img_list = data['images']
            self.anno_list = data['annotations']
            self.cat_list = data['categories']

            # Group annotations by image once; scanning the full annotation
            # list inside __getitem__ made every epoch O(images * annotations).
            self._annos_by_image = {}
            for anno in self.anno_list:
                self._annos_by_image.setdefault(anno['image_id'], []).append(anno)
        else:
            # glob returns files in arbitrary order; sort so the item order is
            # reproducible (numeric file names are ordered numerically).
            self.img_list = sorted(glob.glob(f'{img_root}/*.png'),
                                   key=self._natural_key)

    @staticmethod
    def _natural_key(path):
        """Sort key placing numeric file stems first, in numeric order."""
        stem = os.path.splitext(os.path.basename(path))[0]
        if stem.isascii() and stem.isdecimal():
            return (0, int(stem), '')
        return (1, 0, stem)

    @staticmethod
    def _image_id_from_path(path, fallback):
        """Return the numeric file stem as the image id, else ``fallback``.

        Assumes test images are named ``<image_id>.png`` -- TODO confirm
        against the dataset description.
        """
        stem = os.path.splitext(os.path.basename(path))[0]
        if stem.isascii() and stem.isdecimal():
            return int(stem)
        return fallback

    @staticmethod
    def _build_target(annos):
        """Convert annotations with ``[x, y, w, h]`` boxes into a target dict."""
        boxes = []
        labels = []
        for anno in annos:
            x, y, w, h = anno['bbox'][:4]
            boxes.append([x, y, x + w, y + h])
            labels.append(anno['category_id'])

        return {
            # Explicit float dtype: integer pixel coordinates would otherwise
            # give an int64 tensor, which torchvision detection heads reject.
            # reshape keeps the (N, 4) layout even when N == 0.
            "boxes": torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            "labels": torch.tensor(labels, dtype=torch.int64),
        }

    def __len__(self):
        return len(self.img_list)

    def __getitem__(self, index):
        if self.mode != 'test':
            record = self.img_list[index]
            img_path = f"{self.img_root}/{record['file_name']}"

            img = Image.open(img_path).convert('RGB')
            img = self.preprocess(img)

            annos = self._annos_by_image.get(record['id'], [])
            return img, self._build_target(annos)

        img_path = self.img_list[index]
        img = Image.open(img_path).convert('RGB')
        img = self.preprocess(img)
        # Prefer the id encoded in the file name; fall back to the 1-based
        # position for non-numeric names.
        return img, self._image_id_from_path(img_path, index + 1)

In [None]:
# Feature extractor: a pretrained ResNet-50 with its last two child modules
# removed, leaving a convolutional trunk that outputs a spatial feature map.
backbone = torch.nn.Sequential(
    *list(
        torchvision.models.resnet50(
            weights=torchvision.models.ResNet50_Weights.DEFAULT
        ).children()
    )[:-2]
)
# FasterRCNN reads the channel count of the feature map from this attribute.
backbone.out_channels = 2048

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /Users/boan/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:11<00:00, 9.29MB/s]


In [None]:
# The backbone yields a single feature map, so the anchor generator takes one
# tuple of sizes and one matching tuple of aspect ratios (5 sizes x 3 ratios =
# 15 anchors per location). Passing five ratio tuples for one size tuple is a
# length mismatch that is either silently truncated or rejected, depending on
# the torchvision version.
anchor_generator = AnchorGenerator(
    sizes=((16, 32, 64, 128, 256),),
    aspect_ratios=((0.5, 1.0, 2.0),)
)
# '0' is the name given to the feature map of a backbone that returns a plain
# tensor.
roi_pooler = torchvision.ops.MultiScaleRoIAlign(
    featmap_names=['0'],
    output_size=7,
    sampling_ratio=2
)
model = FasterRCNN(
    backbone,
    # presumably 10 digit classes plus background -- confirm against the
    # categories in train.json
    num_classes=11,
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler,
    # HW2Data already normalizes images with the ImageNet mean/std. By default
    # FasterRCNN's built-in transform would normalize them again, so make its
    # normalization the identity. Checkpoints trained before this change saw
    # doubly-normalized inputs and should be retrained.
    image_mean=[0.0, 0.0, 0.0],
    image_std=[1.0, 1.0, 1.0],
)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [None]:
def train_model(model, data_loader, optimizer, num_epochs=5, device='cuda', checkpoint_dir='checkpoints'):
    """Train a detection model and write a checkpoint after every epoch.

    Args:
        model: Module that, in training mode, maps ``(images, targets)`` to a
            dict of loss tensors.
        data_loader: Iterable yielding ``(images, targets)``. ``images`` is a
            tensor with a leading batch dimension of size 1 and ``targets`` is
            a dict with ``'boxes'`` and ``'labels'`` tensors batched the same
            way (what the default collate produces for ``batch_size=1``).
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of passes over ``data_loader``.
        device: Device the model and each batch are moved to.
        checkpoint_dir: Directory receiving ``epoch_<k>.pth`` after epoch k.
    """
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        num_batches = 0

        pbar = tqdm(data_loader, desc=f"[Train] Epoch {epoch+1}")
        # Count steps explicitly: tqdm only refreshes ``pbar.n`` at display
        # intervals, so dividing by it overstates the running mean.
        for num_batches, (images, targets) in enumerate(pbar, start=1):
            # Drop the batch dimension added by the loader; the model expects
            # a list of per-image tensors and a list of per-image targets.
            images = [images.to(device).squeeze(0)]
            targets = [{
                'boxes': targets['boxes'].to(device).squeeze(0),
                'labels': targets['labels'].to(device).squeeze(0)
            }]

            loss_dict = model(images, targets)
            loss = sum(loss_dict.values())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            pbar.set_postfix(loss=total_loss / num_batches)

        # An empty loader would otherwise raise ZeroDivisionError.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

        # save_checkpoint names the file after ``epoch + 1``, matching the
        # 1-based epoch shown in the log line above.
        save_checkpoint(model, optimizer, epoch, checkpoint_dir)

def evaluate_model(model, data_loader, device='cuda', output_file='pred.json'):
    """Run inference over ``data_loader`` and dump the detections as JSON.

    One record is written per image, holding that image's id together with
    the lists of predicted boxes (as returned by the model), scores and
    category ids.

    Args:
        model: Detection model returning, per image, a dict with ``'boxes'``,
            ``'scores'`` and ``'labels'`` tensors.
        data_loader: Iterable yielding ``(images, image_ids)`` batches.
        device: Device used for inference.
        output_file: Path of the JSON file to write.
    """
    model.eval()
    model.to(device)

    def _to_record(img_id, detection):
        # Move everything to the CPU and convert to plain Python values so the
        # record is JSON-serializable.
        return {
            'image_id': int(img_id),
            'bbox': detection['boxes'].cpu().tolist(),
            'score': detection['scores'].cpu().tolist(),
            'category_id': detection['labels'].cpu().tolist(),
        }

    results = []
    with torch.no_grad():
        for images, image_ids in data_loader:
            batch = [img.to(device) for img in images]
            detections = model(batch)
            results.extend(
                _to_record(img_id, detection)
                for img_id, detection in zip(image_ids, detections)
            )

    with open(output_file, 'w') as f:
        json.dump(results, f, indent=4)
    print(f"Evaluation results saved to {output_file}")


def load_checkpoint(model, optimizer, checkpoint_path, map_location='cpu'):
    """Restore model and optimizer state from a checkpoint file.

    Args:
        model: Module whose parameters are overwritten in place.
        optimizer: Optimizer whose state is overwritten in place.
        checkpoint_path: File holding a dict with ``'epoch'``,
            ``'model_state_dict'`` and ``'optimizer_state_dict'``.
        map_location: Forwarded to ``torch.load``. The default ``'cpu'`` lets
            a checkpoint written on a GPU be read on a machine without one;
            ``load_state_dict`` then copies the values onto whatever device
            the model's parameters already live on.

    Returns:
        The epoch number stored in the checkpoint.
    """
    checkpoint = torch.load(checkpoint_path, map_location=map_location)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    print(f"Checkpoint loaded: {checkpoint_path}, Starting from epoch {start_epoch}")
    return start_epoch

# Now 3
def save_checkpoint(model, optimizer, epoch, checkpoint_dir):
    """Write model and optimizer state to ``<checkpoint_dir>/epoch_<epoch+1>.pth``.

    Args:
        model: Module whose ``state_dict`` is stored.
        optimizer: Optimizer whose ``state_dict`` is stored.
        epoch: Zero-based epoch index; the file name and the stored ``'epoch'``
            value both use ``epoch + 1``.
        checkpoint_dir: Target directory, created if it does not exist.
    """
    finished_epoch = epoch + 1
    os.makedirs(checkpoint_dir, exist_ok=True)
    target_file = os.path.join(checkpoint_dir, f"epoch_{finished_epoch}.pth")

    state = {
        'epoch': finished_epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, target_file)
    print(f"Checkpoint saved: {target_file}")

In [None]:
# batch_size=1 with the default collate matches what train_model expects: a
# batch tensor and a dict of batched target tensors.
train_loader = DataLoader(
    HW2Data('./nycu-hw2-data/train', './nycu-hw2-data/train.json'),
    batch_size=1,
    shuffle=True,
    num_workers=0,
    pin_memory=True
)
# train_model takes the optimizer as its third positional argument; leaving it
# out raises TypeError before any training starts.
train_model(model, train_loader, optimizer, num_epochs=5, device=device)

In [None]:
# Loader over the validation split: one image per batch, loaded in the main
# process.
valid_loader = DataLoader(
    dataset=HW2Data(
        img_root='./nycu-hw2-data/valid',
        json_path='./nycu-hw2-data/valid.json',
    ),
    batch_size=1,
    shuffle=True,
    num_workers=0,
    pin_memory=True,
)
def validate_model(model, data_loader, device='cuda'):
    """Return the mean detection loss of ``model`` over ``data_loader``.

    torchvision detection models return the loss dict only in training mode
    (eval mode returns detections), so the model is switched to training mode
    for the loss computation, with batch-norm layers kept in eval mode so
    their running statistics are not updated, and no gradients are recorded.
    The model is left in eval mode on return. Because training mode samples
    anchors and proposals randomly, the value can vary slightly between calls.

    Args:
        model: Detection model mapping ``(images, targets)`` to a dict of
            losses in training mode.
        data_loader: Iterable yielding ``(images, targets)``. ``targets`` may
            be a dict of batched tensors (default collate) or a sequence of
            per-image dicts (custom collate).
        device: Device used for the computation.

    Returns:
        The average of the summed losses per batch, or NaN for an empty loader.
    """
    model.to(device)
    model.train()
    for module in model.modules():
        if isinstance(module, (torch.nn.BatchNorm1d, torch.nn.BatchNorm2d, torch.nn.BatchNorm3d)):
            module.eval()

    total_val_loss = 0.0
    num_batches = 0

    try:
        with torch.no_grad():
            val_pbar = tqdm(data_loader, desc='[Validation]')
            # Count batches explicitly: tqdm refreshes ``val_pbar.n`` only at
            # display intervals, so it is not a reliable step counter.
            for num_batches, (images, targets) in enumerate(val_pbar, start=1):
                images = [img.to(device) for img in images]
                if isinstance(targets, dict):
                    # Default collate: one dict whose tensors carry a leading
                    # batch dimension; split it into one dict per image.
                    targets = [
                        {k: v[i].to(device) for k, v in targets.items()}
                        for i in range(len(images))
                    ]
                else:
                    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                loss = sum(loss_dict.values())
                total_val_loss += loss.item()

                val_pbar.set_postfix(val_loss=total_val_loss / num_batches)
    finally:
        # Callers expect the model to be in eval mode afterwards.
        model.eval()

    avg_val_loss = total_val_loss / num_batches if num_batches else float('nan')
    print(f"Validation Loss: {avg_val_loss:.4f}")
    return avg_val_loss

validate_model(model, valid_loader, device=device)

In [None]:
checkpoint_path = './checkpoints/epoch_5.pth'
# map_location lets a checkpoint written on a GPU be restored on a CPU-only
# machine.
checkpoint = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
# Move the model first: the optimizer places its restored state on the device
# of the parameters it is bound to at load time.
model.to(device)
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
model.eval()

test_loader = DataLoader(
    HW2Data('./nycu-hw2-data/test'),
    batch_size=1,
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

results = []
# Detections scoring below this value are dropped; the CSV post-processing
# cell reads the same name.
score_threshold = 0.05

with torch.no_grad():
    for images, image_ids in test_loader:
        images = [img.to(device) for img in images]
        outputs = model(images)

        for img_id, output in zip(image_ids, outputs):
            boxes = output['boxes'].cpu()
            scores = output['scores'].cpu()
            labels = output['labels'].cpu()

            for box, score, label in zip(boxes, scores, labels):
                if score < score_threshold:
                    continue
                # The model returns corner coordinates; the output file stores
                # [x, y, width, height].
                x_min, y_min, x_max, y_max = box.tolist()
                results.append({
                    "image_id": int(img_id),
                    "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                    "score": float(score),
                    "category_id": int(label)
                })


with open("pred.json", "w") as f:
    json.dump(results, f, indent=4)


In [None]:
# Turn the per-detection predictions into one number per image: detections are
# ordered left to right and their digits (category_id - 1) are concatenated.
with open('pred.json') as f:
    preds = json.load(f)

# image_id -> list of (left edge of the box, category_id)
image_predictions = {}

for pred in preds:
    if pred["score"] < score_threshold:
        continue
    image_predictions.setdefault(pred["image_id"], []).append(
        (pred["bbox"][0], pred["category_id"])
    )

all_image_ids = list(range(1, 13069))
result_rows = []

for image_id in all_image_ids:
    digits = image_predictions.get(image_id, [])

    # -1 marks an image with no usable detection.
    pred_label = -1
    if digits:
        # Order the detections by their left edge so digits read left to right.
        digits.sort(key=lambda item: item[0])
        try:
            pred_label = int(''.join(str(category - 1) for _, category in digits))
        except ValueError:
            # A joined string that is not a valid integer also falls back to -1.
            pred_label = -1

    result_rows.append({"image_id": image_id, "pred_label": pred_label})

df = pd.DataFrame(result_rows)
df.to_csv("pred.csv", index=False)