In [None]:
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
from torchvision.models.detection.rpn import AnchorGenerator, RPNHead, RegionProposalNetwork
from torchvision.models.detection.image_list import ImageList
from torchvision.ops import MultiScaleRoIAlign
import torchvision.transforms as transforms
import os
import cv2
from torchvision.models import resnet50
from sklearn.model_selection import train_test_split
import torchvision.transforms.functional as TF

In [14]:
import sys
import os
project_root= os.getcwd() + "/../"

src_path= os.path.join(project_root, 'src')
if src_path not in sys.path:
    sys.path.append(src_path)
from config import data_dir, images_train_dir, images_val_dir, labels_train_dir, labels_val_dir, artifacts_dir
import config
from preprocessing import FaceDataset, generate_anchor_boxes, match_anchors_to_gt, filter_valid_bboxes, calculate_iou
from utils import draw_image_with_box, visualize_anchors_and_gt

In [None]:
class FaceDataset(Dataset):
    def __init__(self, image_dir, label_dir, image_list, transform=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.image_list = image_list
        self.transform = transform

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, idx):
        image_name = self.image_list[idx]
        image_path = os.path.join(self.image_dir, image_name)
        label_path = os.path.join(self.label_dir,
                                  os.path.splitext(image_name)[0] + '.txt')

        # ---- load image ----
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # ---- apply transform (resize, normalize, etc.) ----
        if self.transform is not None:
            image = self.transform(image)          # -> C,H,W tensor
        else:
            image = TF.to_tensor(image)

        _, H, W = image.shape

        # ---- parse labels ----
        boxes, labels = [], []
        if os.path.exists(label_path):
            with open(label_path) as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) < 5:
                        continue
                    _, x_c, y_c, w, h = map(float, parts[:5])

                    # YOLO -> absolute coordinates
                    x1 = (x_c - w / 2) * W
                    y1 = (y_c - h / 2) * H
                    x2 = (x_c + w / 2) * W
                    y2 = (y_c + h / 2) * H

                    # clamp to image bounds
                    x1, y1, x2, y2 = max(x1, 0), max(y1, 0), min(x2, W), min(y2, H)
                    if x2 > x1 and y2 > y1:
                        boxes.append([x1, y1, x2, y2])
                        labels.append(1)          # face = 1

        target = {
            "boxes": torch.as_tensor(boxes, dtype=torch.float32),
            "labels": torch.as_tensor(labels, dtype=torch.int64)
        }

        return image, target

In [21]:
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [22]:
backbone = resnet50(pretrained=True)
backbone = nn.Sequential(*list(backbone.children())[:-2])  # Remove classifier
backbone.out_channels = 2048  # ResNet50 output channels
torch.save(backbone, artifacts_dir +  "resnet50_without_classifier.pth")



In [29]:
valid_img_extensions= ('.jpg', '.jpeg', '.png')
image_list= [
    img for img in os.listdir(images_train_dir)
    if img.lower().endswith(valid_img_extensions) and os.path.exists(os.path.join(labels_train_dir, img.rsplit('.', 1)[0] + ".txt"))
]
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_images, val_images= train_test_split(image_list, test_size= 0.2, random_state= 42)
# Create dataset
train_dataset = FaceDataset(images_train_dir, labels_train_dir, train_images, transform=transform)
val_dataset= FaceDataset(images_train_dir, labels_train_dir, val_images, transform= transform)
train_loader = DataLoader(train_dataset, batch_size= config.BATCH_SIZE, shuffle=True, collate_fn= lambda x: tuple(zip(*x)))
val_loader= DataLoader(val_dataset, batch_size= config.BATCH_SIZE, shuffle= False, collate_fn= lambda x: tuple(zip(*x)))

In [24]:
anchor_generator= AnchorGenerator(
    sizes= ((4, 16, 24, 32, 64, 128)),
    aspect_ratios=((0.5, 0.75, 1, 1.5, 2))
)
rpn_head= RPNHead(
    in_channels= backbone.out_channels,
    num_anchors= anchor_generator.num_anchors_per_location()[0]
)
rpn= RegionProposalNetwork(
    anchor_generator= anchor_generator,
    head= rpn_head,
    fg_iou_thresh= 0.7,
    bg_iou_thresh= 0.3,
    batch_size_per_image= 256,
    positive_fraction= 0.5,
    pre_nms_top_n= dict(training= 2000, testing= 1000),
    post_nms_top_n= dict(training= 2000, testing= 1000),
    nms_thresh= 0.7
)

In [32]:
images, targets= next(iter(train_loader))
images

(tensor([[[-1.6727, -1.7240, -1.7583,  ...,  0.7248,  0.7077,  0.6906],
          [-1.8097, -1.7754, -1.6727,  ...,  0.6049,  0.5364,  0.4679],
          [-1.6384, -1.5528, -1.4329,  ...,  0.3823,  0.3652,  0.2796],
          ...,
          [ 1.8893,  1.8893,  1.8893,  ...,  1.0331,  0.9132,  0.7591],
          [ 1.7694,  1.7523,  1.7523,  ...,  1.0502,  0.9646,  0.8618],
          [ 1.6495,  1.6324,  1.5982,  ...,  1.1187,  1.0159,  0.8789]],
 
         [[-1.1954, -1.2304, -1.2479,  ...,  1.8333,  1.8333,  1.8333],
          [-1.2479, -1.1954, -1.0903,  ...,  1.6933,  1.6408,  1.5882],
          [-0.9153, -0.8452, -0.7227,  ...,  1.4657,  1.4307,  1.3606],
          ...,
          [ 2.3936,  2.3936,  2.3936,  ...,  1.0455,  0.7654,  0.5378],
          [ 2.3410,  2.3410,  2.3410,  ...,  1.0980,  0.8880,  0.6954],
          [ 2.2710,  2.2535,  2.2360,  ...,  1.2031,  0.9755,  0.7654]],
 
         [[-0.6367, -0.6715, -0.6367,  ...,  1.9951,  2.0125,  2.0125],
          [-0.5495, -0.4973,

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
backbone.to(device)
rpn.to(device)

optimizer = torch.optim.Adam(list(backbone.parameters()) + list(rpn.parameters()), lr=1e-4)
train_loss_history, val_loss_history= [], []
for epoch in range(config.NUM_EPOCHS):
    num_batches= 0.0
    epoch_total_loss= 0.0
    epoch_cls_loss= 0.0
    epoch_reg_loss= 0.0 
    rpn.train()
    for images, targets in train_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        
        # Forward pass through backbone
        features= backbone(images)
        
        # Create image list (required by RPN)
        image_sizes= [img.shape[-2:] for img in images]
        image_list= ImageList(images, image_sizes)
        
        # RPN forward pass
        features_dict= {'0': features}  # Single scale features
        proposals, losses= rpn(image_list, features_dict, targets)
        
        # Compute total loss
        total_loss= sum(loss for loss in losses.values())
        
        # Backward pass
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        epoch_total_loss += total_loss.item()
        epoch_cls_loss+= losses['loss_objectness'].item()
        epoch_reg_loss+= losses['loss_rpn_box_reg'].item()
        num_batches+= 1
    train_loss_history.append(epoch_total_loss / num_batches)
    print(f"Epoch: {epoch + 1} / {config.NUM_EPOCHS} | Train Objectness Loss: {epoch_cls_loss / num_batches:.4f} | Train Regression Loss: {epoch_reg_loss / num_batches:.4f}")
    print(f"Total Train Loss: {epoch_total_loss / num_batches:.4f}")
    rpn.eval()
    num_batches= 0.0
    epoch_total_loss= 0.0
    epoch_cls_loss= 0.0
    epoch_reg_loss= 0.0 

    with torch.no_grad():
        for images, targets in val_loader:
            images= images.to(device)
            targets= [{k: v.to(device) for k, v in t.items()} for t in targets]

            features= backbone(images)
            image_sizes= [img.shape[-2:] for img in images]
            image_list= ImageList(images, image_sizes)
            features_dict= {'0': features}
            
            proposals, losses= rpn(image_list, features_dict, targets)
            total_loss= sum(loss for loss in losses.values())

            epoch_total_loss += total_loss.item()
            epoch_cls_loss+= losses['loss_objectness'].item()
            epoch_reg_loss+= losses['loss_rpn_box_reg'].item()
            num_batches+= 1 
    val_loss_history.append(epoch_total_loss / num_batches)
    print(f"Epoch: {epoch + 1} / {config.NUM_EPOCHS} | Val Objectness Loss: {epoch_cls_loss / num_batches:.4f} | Val Regression Loss: {epoch_reg_loss / num_batches:.4f}")
    print(f"Total Val Loss: {epoch_total_loss / num_batches:.4f}")

