In [None]:
# Preparing coco annotation file for training
import pycocotools.coco as coco
import os

# Locations of the training images and of their COCO-format annotation file.
# NOTE(review): these are absolute paths under one user's home directory;
# they have to be adjusted before the notebook can run on another machine.
training_image_dir = '/Users/am/Desktop/HKU/InnoWing/project/CNN/data/training-data/img'
training_annotation_file = '/Users/am/Desktop/HKU/InnoWing/project/CNN/data/training-data/annotations.json'

# Parse the annotation file. Note that this rebinds the name `coco` from the
# imported pycocotools module to the loaded COCO object; the dataset cell
# below receives this object as its annotation index.
coco = coco.COCO(training_annotation_file)

In [50]:
import numpy as np
import cv2
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from torchvision import transforms, ops

In [51]:
# Prepare dataset
class TrainingDataset(Dataset):
    """Dataset of grayscale images with single-class COCO bounding boxes.

    Args:
        image_dir: Directory containing the image files named in the annotations.
        coco_annotations: Loaded COCO object; it must provide ``imgs``,
            ``loadImgs``, ``getAnnIds`` and ``loadAnns``.
        transform: Optional callable applied to the normalized image array.
    """

    def __init__(self, image_dir, coco_annotations, transform=None):
        self.image_dir = image_dir
        self.coco_annotations = coco_annotations
        # Fix the image order once so that an index always maps to the same image.
        self.img_ids = list(self.coco_annotations.imgs.keys())
        self.transform = transform

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.img_ids)

    def __getitem__(self, idx):
        """Return ``(image, boxes, labels)`` for the image at position ``idx``.

        Returns:
            image: float32 array scaled to [0, 1] (or whatever ``transform``
                returns for it).
            boxes: float32 tensor of shape [M, 4] in (x1, y1, x2, y2) form;
                the shape is [0, 4] when the image has no annotations.
            labels: int64 tensor of shape [M]; every annotation is labelled 1.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        # Load image
        img_id = self.img_ids[idx]
        img_info = self.coco_annotations.loadImgs(img_id)[0]
        img_path = os.path.join(self.image_dir, img_info['file_name'])
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = img.astype(np.float32) / 255.0    # Normalize image to [0, 1]

        # Load annotations
        ann_ids = self.coco_annotations.getAnnIds(imgIds=img_id)
        annotations = self.coco_annotations.loadAnns(ann_ids)

        # COCO stores boxes as (x, y, width, height); convert to corner form.
        bboxes = []
        labels = []
        for ann in annotations:
            x, y, w, h = ann['bbox']
            bboxes.append([x, y, x + w, y + h])
            labels.append(1)

        # reshape keeps the trailing dimension of 4 even when the list is empty.
        bboxes = torch.as_tensor(bboxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        # Apply transformations if any
        if self.transform:
            img = self.transform(img)

        return img, bboxes, labels
    
def collate_fn(batch):
    """Collate samples whose number of boxes differs from image to image.

    Images are stacked into one tensor; boxes and labels stay as per-image
    lists (float32 and int64 tensors respectively) because their lengths vary.
    """
    images = torch.stack([sample[0] for sample in batch])

    box_list = []
    label_list = []
    for sample in batch:
        box_list.append(torch.as_tensor(sample[1], dtype=torch.float32))
        label_list.append(torch.as_tensor(sample[2], dtype=torch.int64))

    return images, box_list, label_list
    
# Training dataset: each image array is passed through ToTensor before batching.
training_dataset = TrainingDataset(training_image_dir, coco, transform=transforms.ToTensor())
# Shuffled mini-batches of 32 images; collate_fn stacks the images and keeps
# the boxes/labels as one tensor per image.
training_dataloader = DataLoader(training_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

In [72]:
# Anchor sizes tried at every feature-map cell by FasterRCNN.generate_anchors.
# Their number (times the number of ratios) also fixes the RPN head channel counts.
ANCHOR_SIZE = [3, 4, 5]
# Anchor aspect-ratio factors: generate_anchors uses
# w = size * sqrt(ratio) and h = size / sqrt(ratio), so 1.0 gives square anchors.
ANCHOR_RATIO = [1.0]

# Create model class
class FasterRCNN(nn.Module):
    """Small single-class, Faster R-CNN style detector for 1-channel images.

    Pipeline: conv backbone (one 2x2 max-pool, so the feature map has half the
    input resolution) -> RPN head that scores and regresses a fixed anchor grid
    -> filtered, NMS-reduced proposals -> ROI pooling -> fully connected
    detection head.

    The anchor grid and the proposal validity filter assume input images of
    IMAGE_WIDTH x IMAGE_HEIGHT pixels. All boxes handled by this class are in
    input-image pixel coordinates, in (x1, y1, x2, y2) form.
    """

    # Input resolution (pixels) assumed by the anchor grid and proposal filter.
    IMAGE_WIDTH = 32
    IMAGE_HEIGHT = 24
    # Down-sampling factor between the input image and the backbone feature
    # map (the backbone contains a single 2x2 max-pool).
    FEATURE_STRIDE = 2
    # Proposals must score strictly above this value. Sigmoid outputs are
    # positive, so in practice this keeps every in-bounds proposal.
    RPN_SCORE_THRESHOLD = 0.0
    # IoU threshold used when suppressing overlapping proposals.
    RPN_NMS_IOU_THRESHOLD = 0.2
    # A prediction is a positive sample when its best IoU with a ground-truth
    # box exceeds this value.
    POSITIVE_IOU_THRESHOLD = 0.3

    def __init__(self):
        super(FasterRCNN, self).__init__()

        self.backbone = nn.Sequential(
            nn.Conv2d(1, 8, 3, padding=1),
            nn.MaxPool2d(2, 2),
            # delete max pooling
            nn.ReLU(inplace=True),
            nn.Conv2d(8, 16, 3, padding=1),
            nn.ReLU(inplace=True)
        )

        # Region Proposal Network (RPN)
        self.RPN_conv = nn.Conv2d(16, 32, 3, stride=1, padding=1)
        self.RPN_cls_score = nn.Conv2d(32, 1 * len(ANCHOR_SIZE) * len(ANCHOR_RATIO), 1, stride=1, padding=0)    # Binary classification (blob or not)
        self.RPN_bbox_pred = nn.Conv2d(32, 4 * len(ANCHOR_SIZE) * len(ANCHOR_RATIO), 1, stride=1, padding=0)    # Bounding box regression for 4 coordinates for each proposed region

        # Fully connected layers for classification
        self.fc1 = nn.Linear(16 * 7 * 7, 32)

        self.cls_score = nn.Linear(32, 1)  # Binary classification (object or not)
        self.bbox_pred = nn.Linear(32, 4)  # Bounding box regression

        # Initialize RPN weights
        # nn.init.normal_(self.RPN_cls_score.weight, mean=0, std=0.01)  # Small weights for stability
        # nn.init.zeros_(self.RPN_cls_score.bias)
        # nn.init.normal_(self.RPN_bbox_pred.weight, mean=0, std=0.001)  # Smaller std for regression
        # nn.init.zeros_(self.RPN_bbox_pred.bias)

    def forward(self, x):
        """Run detection on a batch of images.

        Args:
            x: Image batch of shape [B, 1, H, W].

        Returns:
            cls_score: [N, 1] object probabilities (sigmoid already applied).
            bbox_pred: [N, 4] refined boxes.
            batch_indices: [N] index of the image each row belongs to.
            N is the total number of proposals in the batch; all three
            tensors have a leading dimension of 0 when there are none.
        """
        feature_map = self.backbone(x)  # [batch_size, channels, height, width]

        rpn_conv = torch.relu(self.RPN_conv(feature_map))
        rpn_cls_score = self.RPN_cls_score(rpn_conv)
        rpn_bbox_pred = self.RPN_bbox_pred(rpn_conv)
        proposals = self.generate_proposals(rpn_cls_score, rpn_bbox_pred)

        proposal_list = []
        for i, boxes in enumerate(proposals):
            if len(boxes) > 0:
                boxes = boxes.view(-1, 4)
                # roi_pool expects rows of (batch_index, x1, y1, x2, y2).
                batch_column = torch.full(
                    (len(boxes), 1), float(i), dtype=boxes.dtype, device=boxes.device
                )
                proposal_list.append(torch.cat([batch_column, boxes], dim=1))

        if not proposal_list:
            # Same layout as the normal return, just with zero rows, so that
            # callers can mask by batch index without special cases.
            return (
                feature_map.new_zeros((0, 1)),
                feature_map.new_zeros((0, 4)),
                feature_map.new_zeros((0,)),
            )

        # Combine all proposals across batch -> [total_proposals, 5]
        all_proposals = torch.cat(proposal_list, dim=0)

        # ROI Pool -> [total_proposals, 16, 7, 7]. Proposals are in image
        # pixels while the feature map is down-sampled, hence spatial_scale.
        roi_pooled = ops.roi_pool(
            feature_map,
            all_proposals,
            output_size=(7, 7),
            spatial_scale=1.0 / self.FEATURE_STRIDE,
        )

        # Flatten -> [total_proposals, 16*7*7]
        roi_flatten = roi_pooled.flatten(1)

        # FC layers (detection head)
        bbox_head = torch.relu(self.fc1(roi_flatten))       # [total_proposals, 32]
        cls_score = self.cls_score(bbox_head).sigmoid()     # [total_proposals, 1]
        bbox_pred_delta = self.bbox_pred(bbox_head)         # [total_proposals, 4]
        bbox_pred = self.decode_bboxes(all_proposals[:, 1:], bbox_pred_delta)
        batch_indices = all_proposals[:, 0]

        return cls_score, bbox_pred, batch_indices

    @staticmethod
    def _flatten_to_anchor_order(pred, values_per_anchor):
        """Reorder one image's RPN output so that row k belongs to anchor k.

        Args:
            pred: [A * values_per_anchor, H, W] tensor; the channels of one
                anchor are contiguous.
            values_per_anchor: 1 for scores, 4 for box deltas.

        Returns:
            [W * H * A, values_per_anchor] tensor in the order used by
            generate_anchors (x outermost, then y, then anchor).
        """
        channels, height, width = pred.shape
        num_anchors = channels // values_per_anchor
        return (
            pred.reshape(num_anchors, values_per_anchor, height, width)
            .permute(3, 2, 0, 1)
            .reshape(-1, values_per_anchor)
        )

    def generate_proposals(self, rpn_cls_score, rpn_bbox_pred):
        """Turn RPN outputs into a list of proposal boxes, one entry per image.

        Args:
            rpn_cls_score: [B, A, H, W] objectness logits.
            rpn_bbox_pred: [B, 4 * A, H, W] box deltas.

        Returns:
            List of B tensors of shape [K_i, 4]; K_i may be 0.
        """
        batch_size = rpn_cls_score.size(0)
        device = rpn_cls_score.device

        # Anchors depend only on the feature-map size, so build them once.
        anchors = self.generate_anchors(rpn_cls_score.size(2), rpn_cls_score.size(3)).to(device)

        all_proposals = []
        for i in range(batch_size):
            # A plain .view() on the [C, H, W] maps would pair scores and
            # deltas with the wrong anchors, so reorder them explicitly.
            img_scores = self._flatten_to_anchor_order(rpn_cls_score[i], 1).sigmoid().reshape(-1)
            img_deltas = self._flatten_to_anchor_order(rpn_bbox_pred[i], 4)

            # Apply deltas to anchors to get proposals
            proposals = self.decode_bboxes(anchors, img_deltas)

            # Drop proposals with any corner outside the image.
            valid_mask = (
                (proposals[:, 0] >= 0) & (proposals[:, 0] <= self.IMAGE_WIDTH) &   # x1
                (proposals[:, 1] >= 0) & (proposals[:, 1] <= self.IMAGE_HEIGHT) &  # y1
                (proposals[:, 2] >= 0) & (proposals[:, 2] <= self.IMAGE_WIDTH) &   # x2
                (proposals[:, 3] >= 0) & (proposals[:, 3] <= self.IMAGE_HEIGHT)    # y2
            )
            proposals = proposals[valid_mask]
            img_scores = img_scores[valid_mask]

            # Score filter followed by non-maximum suppression.
            keep = img_scores > self.RPN_SCORE_THRESHOLD
            proposals = proposals[keep]
            scores = img_scores[keep]

            if len(proposals) > 0:
                keep = ops.nms(proposals, scores, self.RPN_NMS_IOU_THRESHOLD)
                proposals = proposals[keep]
            else:
                proposals = torch.empty((0, 4), dtype=torch.float32, device=device)

            all_proposals.append(proposals)

        return all_proposals

    def decode_bboxes(self, anchors, bbox_deltas):
        """Apply regression deltas to reference boxes.

        Args:
            anchors: [N, 4] reference boxes.
            bbox_deltas: [N, 4] raw deltas (dx, dy, dw, dh).

        Returns:
            [N, 4] decoded boxes.
        """
        # Ensure bbox_deltas are within a reasonable range
        bbox_deltas = torch.clamp(bbox_deltas, min=-5.0, max=5.0)

        widths = anchors[:, 2] - anchors[:, 0]
        heights = anchors[:, 3] - anchors[:, 1]
        ctr_x = anchors[:, 0] + 0.5 * widths
        ctr_y = anchors[:, 1] + 0.5 * heights

        # Centre shifts are expressed in units of the reference box size.
        pred_ctr_x = ctr_x + bbox_deltas[:, 0] * widths
        pred_ctr_y = ctr_y + bbox_deltas[:, 1] * heights
        # NOTE(review): the decoded size is sigmoid(delta) * 2, i.e. an absolute
        # value in (0, 2) that does not depend on the reference box size.
        # Confirm this is intended rather than a scale factor on widths/heights.
        pred_w = torch.sigmoid(bbox_deltas[:, 2]) * 2.0
        pred_h = torch.sigmoid(bbox_deltas[:, 3]) * 2.0

        pred_boxes = torch.zeros_like(bbox_deltas)
        pred_boxes[:, 0] = pred_ctr_x - 0.5 * pred_w  # x1
        pred_boxes[:, 1] = pred_ctr_y - 0.5 * pred_h  # y1
        pred_boxes[:, 2] = pred_ctr_x + 0.5 * pred_w  # x2
        pred_boxes[:, 3] = pred_ctr_y + 0.5 * pred_h  # y2

        return pred_boxes

    def generate_anchors(self, height, width):
        """Generate anchors for a feature map of the given size.

        Returns:
            float32 tensor of shape [width * height * A, 4], ordered with x
            outermost, then y, then anchor size/ratio. Anchors are clipped to
            the image.
        """
        anchors = []
        stride = (self.FEATURE_STRIDE, self.FEATURE_STRIDE)
        for x in range(0, width):
            for y in range(0, height):
                for scale in ANCHOR_SIZE:
                    for ratio in ANCHOR_RATIO:
                        w = scale * math.sqrt(ratio)
                        h = scale / math.sqrt(ratio)

                        # Centre of the feature-map cell, in image pixels.
                        center_x = (x + 0.5) * stride[1]
                        center_y = (y + 0.5) * stride[0]

                        x1 = max(0, center_x - w / 2)
                        y1 = max(0, center_y - h / 2)
                        x2 = max(0, min(self.IMAGE_WIDTH, center_x + w / 2))
                        y2 = max(0, min(self.IMAGE_HEIGHT, center_y + h / 2))

                        anchors.append([x1, y1, x2, y2])

        anchors = torch.tensor(anchors, dtype=torch.float32)
        return anchors

    def compute_loss(self, pred_cls_score, pred_bbox, gt_bboxes, gt_labels):
        """Compute the classification loss for the predictions of one image.

        Args:
            pred_cls_score: [N, 1] object probabilities as returned by forward
                (N = number of proposals).
            pred_bbox: [N, 4] tensor of predicted boxes.
            gt_bboxes: [M, 4] tensor of ground truth boxes (M = number of gt boxes).
            gt_labels: [M] tensor of ground truth labels; currently unused.

        Returns:
            Scalar focal loss, or a zero tensor when there are no predictions
            or no ground-truth boxes.
        """
        if len(gt_bboxes) == 0 or len(pred_bbox) == 0:
            return torch.tensor(0.0, device=pred_cls_score.device, requires_grad=True)

        ious = ops.box_iou(pred_bbox, gt_bboxes)  # [N, M]

        # A prediction is positive when its best-matching ground-truth box
        # overlaps it by more than the IoU threshold.
        max_ious, _ = ious.max(dim=1)  # [N]
        assigned_labels = (max_ious > self.POSITIVE_IOU_THRESHOLD).float()

        # sigmoid_focal_loss applies a sigmoid itself, so convert the
        # probabilities back to logits instead of squashing them twice.
        logits = torch.logit(pred_cls_score.squeeze(1), eps=1e-6)
        cls_loss = ops.sigmoid_focal_loss(
            logits,
            assigned_labels,
            alpha=0.25,
            gamma=3.0,
            reduction='mean'
        )

        # NOTE(review): no box-regression loss is applied, and this loss does
        # not appear to reach the RPN heads, so bbox_pred and the RPN layers
        # keep their initial weights. Confirm whether that is intended.
        return cls_loss


In [None]:
# Initialize the model instance and its optimizer
model = FasterRCNN()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Gradient clipping is deliberately not invoked here: clip_grad_norm_ only
# rescales gradients that already exist, so calling it on a freshly built
# model (before any backward pass) has no effect. To clip, call
# torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.1)
# between loss.backward() and optimizer.step().

In [None]:
# Train the model
num_epochs = 40
grad_clip_max_norm = 0.1  # upper bound on the global gradient L2 norm per update
model.train()

for epoch in range(num_epochs):
    epoch_loss = 0.0
    for images, gt_bboxes_list, gt_labels_list in training_dataloader:
        # Forward pass over the whole batch. Predictions come back flattened
        # across images; batch_indices names the image each row belongs to.
        pred_cls_score, pred_bbox, batch_indices = model(images)

        # Process each sample in batch separately
        batch_loss = 0
        valid_samples = 0

        for i in range(len(images)):
            # Get ground truth for this sample
            img_gt_bboxes = gt_bboxes_list[i]
            img_gt_labels = gt_labels_list[i]

            # Skip if no ground truth boxes
            if len(img_gt_bboxes) == 0:
                continue

            # Select the predictions that belong to this image
            img_mask = (batch_indices == i)
            loss = model.compute_loss(
                pred_cls_score[img_mask],
                pred_bbox[img_mask],
                img_gt_bboxes,
                img_gt_labels
            )
            if not torch.isnan(loss):
                valid_samples += 1
                batch_loss += loss

        # Only backprop if we had valid samples
        if batch_loss > 0:
            # Average loss over batch
            loss = batch_loss / valid_samples

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            # Clipping must happen after backward() (so gradients exist) and
            # before step() (so the update uses the clipped gradients).
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=grad_clip_max_norm)
            optimizer.step()

            epoch_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(training_dataloader):.4f}")

In [76]:
def export_to_header(tensor, var_name, file_path, flatten=True):
    """Export a tensor to a C++ header file as a ``const float`` array.

    Supported ranks:
        1-D (biases)        -> ``name[n]``, one value per line
        2-D (FC weights)    -> ``name[rows][cols]``, one row per line
        4-D (conv weights)  -> ``name[out][in][kh][kw]``, one kernel per line

    Args:
        tensor: Tensor to export; it is detached and copied to the CPU first.
        var_name: Name of the generated C++ array.
        file_path: Destination file; missing parent directories are created.
        flatten: Currently unused; kept for interface compatibility.

    Raises:
        ValueError: If the tensor rank is not 1, 2 or 4.
    """
    data = tensor.detach().cpu().numpy()
    if data.ndim not in (1, 2, 4):
        # Fail loudly instead of writing a header that contains no array.
        raise ValueError(
            "export_to_header supports 1-D, 2-D and 4-D tensors, got shape {}".format(data.shape)
        )

    # A bare file name has an empty dirname, which os.makedirs rejects.
    out_dir = os.path.dirname(file_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    def fmt(value):
        return "{:.6f}f".format(value)

    lines = ["#pragma once\n\n", "// Auto-generated weights from PyTorch\n\n"]

    if data.ndim == 1:
        # Biases (1D array)
        lines.append("const float {}[{}] = {{\n".format(var_name, len(data)))
        lines.extend("    {},\n".format(fmt(value)) for value in data)
        lines.append("};\n")

    elif data.ndim == 2:
        # FC weights (2D array)
        rows, cols = data.shape
        lines.append("const float {}[{}][{}] = {{\n".format(var_name, rows, cols))
        for i in range(rows):
            row_text = ", ".join(fmt(value) for value in data[i])
            lines.append("    {" + row_text + "}" + ("," if i < rows - 1 else "") + "\n")
        lines.append("};\n")

    else:
        # Conv weights (4D array)
        out_ch, in_ch, k_h, k_w = data.shape
        lines.append("const float {}[{}][{}][{}][{}] = {{\n".format(
            var_name, out_ch, in_ch, k_h, k_w))
        for i in range(out_ch):
            lines.append("    {" + " // Filter %d\n" % i)
            for j in range(in_ch):
                # Each kernel is written row-major as one flat brace list.
                kernel_text = ", ".join(fmt(value) for value in data[i, j].reshape(-1))
                lines.append("        {" + kernel_text + "}" + ("," if j < in_ch - 1 else "") + "\n")
            lines.append("    }" + ("," if i < out_ch - 1 else "") + "\n")
        lines.append("};\n")

    with open(file_path, 'w') as f:
        f.writelines(lines)

In [78]:
def export_model(model, output_dir="exported_weights"):
    """Write every weight and bias tensor of the model to its own header file.

    Backbone convolutions are named after their position in the Sequential
    container (``conv<index + 1>``); the RPN and detection-head layers use
    fixed name prefixes. Each file is ``<output_dir>/<array name>.h``.
    """
    os.makedirs(output_dir, exist_ok=True)

    def write(tensor, array_name):
        export_to_header(tensor, array_name, f"{output_dir}/{array_name}.h")

    # Export backbone
    for position, module in enumerate(model.backbone):
        if not isinstance(module, nn.Conv2d):
            continue
        write(module.weight, f"conv{position+1}_weights")
        if module.bias is not None:
            write(module.bias, f"conv{position+1}_biases")

    # Export RPN, then the detection head: (model attribute, array-name prefix)
    named_layers = (
        ("RPN_conv", "rpn_conv"),
        ("RPN_cls_score", "rpn_cls"),
        ("RPN_bbox_pred", "rpn_bbox"),
        ("fc1", "fc1"),
        ("cls_score", "cls_score"),
        ("bbox_pred", "bbox_pred"),
    )
    for attribute, prefix in named_layers:
        layer = getattr(model, attribute)
        write(layer.weight, f"{prefix}_weights")
        write(layer.bias, f"{prefix}_biases")

    print(f"Model exported to {output_dir}")

In [None]:
# Persist the current parameters to disk, then reload them from that file
# so that the exported weights are the ones stored in model.pth.
torch.save(model.state_dict(), 'model.pth')
model.load_state_dict(torch.load('model.pth'))
model.eval()  # switch to evaluation mode before exporting

# Export all weights and biases
export_model(model, "exported_weights")

print("Export successfully")

In [80]:
import matplotlib.pyplot as plt
import numpy as np

def validate(model, val_loader, device="cpu", show_images=True, score_threshold=0.5):
    """Run the model on a validation loader and visualize its detections.

    Args:
        model: Detector whose forward returns ``(scores, boxes, batch_indices)``,
            where ``scores`` are [N, 1] probabilities, ``boxes`` is [N, 4] and
            ``batch_indices`` gives the image index of every row.
        val_loader: Iterable of ``(images, gt_boxes_list, labels_list)`` batches.
        device: Device the image batch is moved to before the forward pass.
        show_images: When True, plot every image with ground truth (green) and
            detections (red) and print the best IoU.
        score_threshold: Minimum score for a prediction to count as a detection.
    """
    def draw_box(box, style, width):
        plt.plot([box[0], box[2], box[2], box[0], box[0]],
                 [box[1], box[1], box[3], box[3], box[1]], style, linewidth=width)

    model.eval()
    with torch.no_grad():
        for images, gt_boxes_list, _ in val_loader:
            images = images.to(device)

            # Get predictions
            pred_scores, pred_boxes, batch_indices = model(images)

            if not torch.is_tensor(batch_indices):
                # forward() may hand back a non-tensor third value when the
                # batch produced no proposals; treat that as "no predictions".
                pred_scores = torch.zeros((0, 1))
                pred_boxes = torch.zeros((0, 4))
                batch_indices = torch.zeros((0,))

            # Convert to CPU for visualization
            images_np = images.cpu().numpy().squeeze(1)  # [B,1,H,W] -> [B,H,W]
            # The scores are already probabilities; applying another sigmoid
            # would push every value above 0.5 and defeat the threshold.
            pred_scores = pred_scores.cpu().reshape(-1)
            pred_boxes = pred_boxes.cpu()
            batch_indices = batch_indices.cpu()

            # Process each image in batch
            for i in range(len(images)):
                img = images_np[i]
                gt_boxes = gt_boxes_list[i].cpu().numpy()

                # Predictions of this image that clear the score threshold.
                # The mask stays 1-D, so a single prediction still gives [1, 4].
                keep = (batch_indices == i) & (pred_scores > score_threshold)
                detections = pred_boxes[keep].numpy()

                # Visualize
                if show_images:
                    plt.figure(figsize=(10, 4))
                    plt.imshow(img, cmap='gray')

                    # Draw ground truth (green)
                    for box in gt_boxes:
                        draw_box(box, 'g-', 2)

                    # Draw predictions (red)
                    print(f"Detections: {detections}")
                    for box in detections:
                        draw_box(box, 'r--', 1.5)

                    plt.title(f"Ground Truth (Green) vs Predictions (Red)\nDetections: {len(detections)}")
                    plt.axis('off')
                    plt.show()

                    # Print metrics for this image
                    if len(detections) > 0 and len(gt_boxes) > 0:
                        iou = ops.box_iou(torch.tensor(detections), torch.tensor(gt_boxes))
                        print(f"Max IoU with GT: {iou.max().item():.2f}")
                    print("="*50)

In [None]:
import pycocotools.coco as coco

# Locations of the validation images and of their COCO-format annotation file.
# NOTE(review): absolute paths under one user's home directory; adjust before
# running on another machine.
validation_image_dir = '/Users/am/Desktop/HKU/InnoWing/project/CNN/data/validation-data/img'
validation_annotation_file = '/Users/am/Desktop/HKU/InnoWing/project/CNN/data/validation-data/annotations.json'

# `coco` is the pycocotools module here (re-imported at the top of this cell).
validation_coco = coco.COCO(validation_annotation_file)

# Same dataset class and collate function as for training, but with smaller,
# unshuffled batches so the images are visited in a fixed order.
validation_dataset = TrainingDataset(validation_image_dir, validation_coco, transform=transforms.ToTensor())
validation_dataloader = DataLoader(validation_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

# Plot ground truth vs. predictions for every validation image.
validate(model, validation_dataloader, device="cpu", show_images=True)