# Import 

In [1]:
import torch
import torch.nn as nn
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops import MultiScaleRoIAlign
from torchvision.datasets import CocoDetection
from torch.utils.data import DataLoader
import torch
import torchvision
import numpy as np
from torchvision.transforms import functional as F

# Class and Function

### ResVGG16 Lite

In [2]:
# Pertama definisikan ResidualBlock
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, num_convs):
        super().__init__()
        self.convs = nn.Sequential()
        
        for i in range(num_convs):
            input_channels = in_channels if i == 0 else out_channels
            self.convs.add_module(f'conv{i+1}', nn.Conv2d(
                input_channels, out_channels, kernel_size=3, padding=1))
            self.convs.add_module(f'bn{i+1}', nn.BatchNorm2d(out_channels))
            self.convs.add_module(f'relu{i+1}', nn.ReLU(inplace=True))
        
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        self.skip = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=2),
            nn.BatchNorm2d(out_channels)
        )
            
    def forward(self, x):
        identity = self.skip(x)
        out = self.convs(x)
        out = self.pool(out)
        out += identity
        return nn.functional.relu(out)

### Backbone

In [3]:

# Kemudian baru definisikan DetectorBackbone yang menggunakan ResidualBlock
class DetectorBackbone(nn.Module):
    def __init__(self, pretrained_weights=None):
        super().__init__()
        self.blocks = nn.Sequential(
            ResidualBlock(3, 32, 2),
            ResidualBlock(32, 64, 2),
            ResidualBlock(64, 128, 3),
            ResidualBlock(128, 256, 3),
            ResidualBlock(256, 256, 3)
        )
        self.out_channels = 256
        
        if pretrained_weights:
            self.load_pretrained(pretrained_weights)
            
            # Freeze parameters
            for param in self.parameters():
                param.requires_grad = False

    def load_pretrained(self, weight_path):
        pretrained_dict = torch.load(weight_path)
        model_dict = self.state_dict()
        pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict}
        self.load_state_dict(pretrained_dict, strict=False)
        print(f"Loaded {len(pretrained_dict)}/{len(model_dict)} parameters")

    def forward(self, x):
        features = self.blocks(x)
        return {'0': features}

# Configuration

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# 1. Inisialisasi backbone kustom
backbone = DetectorBackbone(pretrained_weights='best_model.pth').to(device)
backbone

  pretrained_dict = torch.load(weight_path)


Loaded 126/126 parameters


DetectorBackbone(
  (blocks): Sequential(
    (0): ResidualBlock(
      (convs): Sequential(
        (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
      )
      (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (skip): Sequential(
        (0): Conv2d(3, 32, kernel_size=(1, 1), stride=(2, 2))
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): ResidualBlock(
      (convs): Sequential(
        (conv1): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1

###  RPN Config

In [5]:
# 2. Konfigurasi Anchor Generator untuk RPN
anchor_sizes = ((32, 64, 128, 256, 512),)  # Anchor sizes untuk single feature map
aspect_ratios = ((0.5, 1.0, 2.0),)         # Aspect ratios untuk tiap anchor size

rpn_anchor_gen = AnchorGenerator(
    sizes=anchor_sizes,
    aspect_ratios=aspect_ratios
)

# 3. Konfigurasi ROI Pooling
roi_pooler = MultiScaleRoIAlign(
    featmap_names=['0'],    # Sesuai dengan key output backbone
    output_size=7,          # Ukuran output ROI pooling
    sampling_ratio=2
)

# Merging ResVGG16 Lite With RPN

In [6]:
# 4. Bangun model Faster R-CNN lengkap
model = FasterRCNN(
    backbone,
    num_classes=2,          # Sesuaikan dengan jumlah kelas (+ background)
    rpn_anchor_generator=rpn_anchor_gen,
    box_roi_pool=roi_pooler,
    min_size=224,           # Sesuaikan dengan ukuran input
    max_size=224
).to(device)

model


FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(224,), max_size=224, mode='bilinear')
  )
  (backbone): DetectorBackbone(
    (blocks): Sequential(
      (0): ResidualBlock(
        (convs): Sequential(
          (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu1): ReLU(inplace=True)
          (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu2): ReLU(inplace=True)
        )
        (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        (skip): Sequential(
          (0): Conv2d(3, 32, kernel_size=(1, 1), stride=(2, 2))
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=Tru

# Load Dataset

In [7]:
class CocoTransform:
    def __call__(self, image, target):
        image = F.to_tensor(image)
        return image, target

def get_coco_dataset(img_dir, ann_file):
    return CocoDetection(
        root=img_dir,
        annFile=ann_file,
        transforms=CocoTransform()
    )

In [8]:
train_dataset = get_coco_dataset(
    img_dir='Dataset/COCODataset/train', 
    ann_file='Dataset/COCODataset/train/_annotations.coco.json'
)

val_dataset = get_coco_dataset(
    img_dir='Dataset/COCODataset/valid', 
    ann_file='Dataset/COCODataset/valid/_annotations.coco.json'
)

loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!


In [9]:
train_loader = DataLoader(
    train_dataset,
    batch_size=10,
    collate_fn=lambda x: tuple(zip(*x)),  # Menggabungkan batch
    shuffle=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=10,
    collate_fn=lambda x: tuple(zip(*x)),  # Menggabungkan batch
    shuffle=False  # Tidak perlu shuffle untuk validation
)

# Training

In [10]:
param = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(param, lr=0.0001)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [11]:
def train_one_epoch(model, optimizer, data_loader, device):
    model.train()
    for images, targets in data_loader:
        images = list(image.to(device) for image in images)
        
        processed_targets = []
        valid_images = []
        for i, target in enumerate(targets):
            boxes = []
            labels = []
            for obj in target:
                bbox = obj['bbox']
                x, y, w, h = bbox

                if w > 0 and h > 0:
                    boxes.append([x, y, x + w, y + h])
                    labels.append(obj['category_id'])

            if boxes:
                processed_target = {
                    'boxes': torch.tensor(boxes, dtype=torch.float32).to(device),
                    'labels': torch.tensor(labels, dtype=torch.int64).to(device)
                }
                processed_targets.append(processed_target)
                valid_images.append(images[i])

        if not processed_targets:
            continue

        images = valid_images

        #Forward pass
        loss_dict = model(images, processed_targets)
        losses = sum(loss for loss in loss_dict.values())

        #Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
    
    return losses.item()


In [12]:
num_epochs = 10
best_loss = 9999999
for epoch in range(num_epochs):
    losses = train_one_epoch(model, optimizer, train_loader, device)
    lr_scheduler.step()

    print(f"Epoch [{epoch}], Loss: {losses}")

    if losses < best_loss:
        best_loss = losses
        # Save the model state
        torch.save(model.state_dict(), f"Best_RPNmodel.pth")

Epoch [0], Loss: 0.12288139760494232
Epoch [1], Loss: 0.11931924521923065
Epoch [2], Loss: 0.08696804195642471
Epoch [3], Loss: 0.0859341025352478
Epoch [4], Loss: 0.11385681480169296
Epoch [5], Loss: 0.08097130060195923
Epoch [6], Loss: 0.1056571900844574
Epoch [7], Loss: 0.09296265244483948
Epoch [8], Loss: 0.09486984461545944
Epoch [9], Loss: 0.11524958163499832
