# FasterRCNN MobileNetV3Large

In [None]:
import torch
import torchvision
from torchvision.transforms import v2
from data import *

In [None]:
def get_weight_filename(epoch: int):
    """Return the checkpoint file name used for the given epoch.

    The epoch number is converted to a string and left-padded with zeros to a
    width of at least four characters, e.g. epoch 7 gives
    ``fasterrcnn_mobilenet_v3_large_fpn_epoch0007.pth``.
    """
    epoch_tag = str(epoch).zfill(4)
    return "fasterrcnn_mobilenet_v3_large_fpn_epoch" + epoch_tag + ".pth"

def build_model(num_classes: int, resume: str, device: str = "cpu"):
    """Build a Faster R-CNN (MobileNetV3-Large FPN) detector and move it to ``device``.

    Args:
        num_classes: Number of object classes. When truthy, the box predictor
            is replaced by a fresh ``FastRCNNPredictor`` with
            ``num_classes + 1`` outputs (the extra output is the background
            class). When falsy, the head of the pretrained model is kept.
        resume: Path of a state-dict file to load into the model, or a falsy
            value to skip loading custom weights.
        device: Device the checkpoint is mapped to and the model is moved to.

    Returns:
        The model, moved to ``device``.
    """
    detection = torchvision.models.detection

    ### Instantiate model (starting from torchvision's DEFAULT weights)
    model = detection.fasterrcnn_mobilenet_v3_large_fpn(
        weights=detection.FasterRCNN_MobileNet_V3_Large_FPN_Weights.DEFAULT
    )

    ### Replace the head of the network
    if num_classes:
        in_features = model.roi_heads.box_predictor.cls_score.in_features
        model.roi_heads.box_predictor = detection.faster_rcnn.FastRCNNPredictor(
            in_features, num_classes + 1
        )

    ### Load custom weights
    if resume:
        # map_location makes a checkpoint that was saved on another device
        # (e.g. CUDA) loadable on the current one (e.g. a CPU-only machine).
        # NOTE: torch.load unpickles the file -- only load trusted checkpoints.
        state_dict = torch.load(resume, map_location=device)
        model.load_state_dict(state_dict)
    return model.to(device)


def init_epoch_losses() -> dict:
    """Create a new loss accumulator with every entry set to ``0.0``.

    Returns:
        A fresh dict holding the total ``loss`` and the four individual
        loss terms tracked during training.
    """
    loss_names = (
        "loss",
        "loss_classifier",
        "loss_objectness",
        "loss_box_reg",
        "loss_rpn_box_reg",
    )
    return dict.fromkeys(loss_names, 0.0)


def update_epoch_losses(epoch_losses: dict, loss_dict: dict):
    """Add the losses of one batch to the running epoch totals.

    Args:
        epoch_losses: Accumulator dict; it is updated in place.
        loss_dict: Loss tensors of the current batch. The sum of all its
            values is added to ``"loss"``; the four individual terms are added
            to their matching entries.

    Returns:
        The same ``epoch_losses`` object that was passed in.
    """
    def _to_numpy(tensor):
        # Drop the autograd graph and copy the value to host memory.
        return tensor.detach().cpu().numpy()

    # Gather every increment before touching the accumulator.
    increments = {"loss": _to_numpy(sum(loss_dict.values()))}
    for name in ("loss_classifier", "loss_box_reg", "loss_objectness", "loss_rpn_box_reg"):
        increments[name] = _to_numpy(loss_dict[name])

    for name, delta in increments.items():
        epoch_losses[name] += delta

    return epoch_losses


def train_one_epoch(model, optim, device="cpu", dataloader=None) -> dict:
    """Run one optimisation pass over the training data.

    Args:
        model: Detection model that returns a dict of loss tensors when called
            with ``(images, targets)``.
        optim: Optimizer that updates the parameters of ``model``.
        device: Device forwarded to ``extract_images_targets``.
        dataloader: Iterable of batches. When ``None``, the module-level
            ``train_dataloader`` is used so that existing callers keep working.

    Returns:
        Dict of losses summed (not averaged) over all batches of the epoch,
        with the keys produced by ``init_epoch_losses``.
    """
    if dataloader is None:
        # Backward-compatible fallback to the notebook-level loader.
        dataloader = train_dataloader

    # Initialize epoch losses
    epoch_losses = init_epoch_losses()

    for data in dataloader:
        images, targets = extract_images_targets(data, device=device)
        ### Compute loss of batch
        loss_dict = model(images, targets)
        epoch_losses = update_epoch_losses(epoch_losses, loss_dict)
        loss = sum(loss_dict.values())
        ### Backpropagation
        optim.zero_grad()
        loss.backward()
        optim.step()
    return epoch_losses


def train(n_epochs: int, lr: float, train_dataloader, start_epoch: int = 0, resume: str = None, save_every: int = None, lr_step_every: int = 20, num_classes = None, device="cpu"):
    """Train the detector, printing the losses and optionally saving checkpoints.

    Args:
        n_epochs: Number of the last epoch to run (epochs are 1-based).
        lr: Initial learning rate of the SGD optimizer.
        train_dataloader: Iterable of training batches used for every epoch.
        start_epoch: Last epoch that was already completed; training continues
            with ``start_epoch + 1``. ``0`` (or ``None``) starts from scratch.
        resume: Path of a state-dict file to start from. When not given and
            ``start_epoch > 0``, the checkpoint name returned by
            ``get_weight_filename(start_epoch)`` is used instead.
        save_every: Save a checkpoint every this many epochs; falsy disables
            saving.
        lr_step_every: Advance the learning-rate scheduler every this many
            epochs; falsy keeps the learning rate constant.
        num_classes: Forwarded to ``build_model``.
        device: Device used for the model and the batches.

    Note:
        The optimizer and scheduler state are not checkpointed, so the
        learning-rate schedule starts again from ``lr`` when resuming.
    """
    start_epoch = start_epoch or 0

    # An explicitly passed checkpoint path wins; otherwise fall back to the
    # checkpoint that belongs to ``start_epoch`` (if any).
    if not resume:
        resume = get_weight_filename(start_epoch) if start_epoch > 0 else None

    # Instantiate model
    model = build_model(num_classes=num_classes, resume=resume, device=device)

    # Optimizer
    optim = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    # The scheduler is advanced once every ``lr_step_every`` epochs (see the
    # loop below), so the learning rate is multiplied by 0.1 after every
    # 3 * lr_step_every epochs.
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optim, step_size=3, gamma=0.1)

    model.train()
    print("Start training ...")

    for epoch in range(start_epoch + 1, n_epochs + 1):
        # Train the model for one epoch on the loader passed to this function
        epoch_metrics = train_one_epoch(model, optim, device=device, dataloader=train_dataloader)

        print(
            f"Epoch {epoch}/{n_epochs}: loss={epoch_metrics['loss']}, "
            f"loss_classifier={epoch_metrics['loss_classifier']}, "
            f"loss_objectness={epoch_metrics['loss_objectness']}, "
            f"loss_box_reg={epoch_metrics['loss_box_reg']}, "
            f"loss_rpn_box_reg={epoch_metrics['loss_rpn_box_reg']}"
        )
        if save_every and epoch > 0 and epoch % save_every == 0:
            torch.save(model.state_dict(), get_weight_filename(epoch))
        # Update learning rate. This is the only place the scheduler is
        # stepped: stepping it on every epoch as well would shrink the
        # learning rate to practically zero after a few dozen epochs.
        if lr_step_every and epoch > 0 and epoch % lr_step_every == 0:
            lr_scheduler.step()

In [None]:
# Location of the COCO-style dataset (images + annotation file)
root_dir = "C:\\Users\\tilof\\PycharmProjects\\DeepLearningProjects\\DETR\\data\\spine"
train_data_dir = f"{root_dir}\\train2017"
train_annotation_file = f"{root_dir}\\annotations\\instances_train2017.json"

# Training hyper-parameters
n_epochs = 200
start_epoch = 0  # > 0 resumes from the checkpoint saved for that epoch
num_classes = 1
lr = 1e-2
batch_size = 64
save_every = 10
lr_step_every = 15

# Dataset
# torchvision detection models normalise the images themselves (ImageNet
# mean/std inside the model's own `transform` module) and expect input tensors
# in the 0-1 range. Normalising here as well would apply the normalisation
# twice, so the pipeline only converts the images to float tensors.
transforms = v2.Compose([
    v2.ToTensor(),
    v2.ToDtype(torch.float32),
])

train_dataset = build_coco_dataset(root=train_data_dir, annFile=train_annotation_file, transform=transforms)
train_dataloader = build_dataloader(dataset=train_dataset, batch_size=batch_size, collate_fn=collate_fn_coco)

device = "cuda" if torch.cuda.is_available() else "cpu"

print("Device", device)

In [None]:
# Launch training with the settings defined in the configuration cell.
train(
    # data / model
    train_dataloader=train_dataloader,
    num_classes=num_classes,
    device=device,
    # schedule
    n_epochs=n_epochs,
    start_epoch=start_epoch,
    lr=lr,
    lr_step_every=lr_step_every,
    # checkpointing
    save_every=save_every,
)