In [33]:
import torch
import torch.nn as nn
import torchvision
import math

In [34]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)


Using device: cuda


Roadmap:

Training and inference:
 * Call RPN layers
 * Generate Anchors
 * Convert anchors to proposals using Box transformation prediction
 * Filter Proposals

Training only:
 * Assign Ground Truth boxes to anchors
 * Compute labels and regression targets for anchors
 * Sample positive and negative anchors
 * Compute classification loss using sampled anchors
 * Compute localization loss using sampled positive anchors

In [35]:
def sample_positive_negative(labels, num_positive=256, num_negative=256):
    """Randomly pick a bounded number of positive and negative entries.

    An entry is positive when its label is >= 1 (a foreground anchor labelled
    1.0, or a proposal carrying a class id >= 1) and negative when its label
    is exactly 0. Any other value (e.g. -1 for "ignore") is never sampled.

    Args:
        labels: label tensor; it is indexed along its first dimension
            (assumed to be 1-D by the callers in this notebook).
        num_positive: maximum number of positives to sample.
        num_negative: maximum number of negatives to sample.

    Returns:
        (sampled_pos_ids_mask, sampled_neg_ids_mask): two boolean tensors with
        the shape of ``labels``, True at the sampled positions.
    """
    # `>= 1`, not `> 1`: foreground anchors are labelled exactly 1 and class
    # id 1 is a valid foreground class, so both must count as positive.
    positive = torch.where(labels >= 1)[0]
    negative = torch.where(labels == 0)[0]

    # Never ask for more samples than are available.
    num_pos = min(positive.numel(), num_positive)
    num_neg = min(negative.numel(), num_negative)

    # Build the permutations on the same device as the labels rather than on
    # a notebook-level global, so the function works for any input tensor.
    perm_positive_ids = torch.randperm(positive.numel(), device=positive.device)[:num_pos]
    perm_negative_ids = torch.randperm(negative.numel(), device=negative.device)[:num_neg]

    pos_ids = positive[perm_positive_ids]
    neg_ids = negative[perm_negative_ids]

    sampled_pos_ids_mask = torch.zeros(labels.shape, dtype=torch.bool, device=labels.device)
    sampled_neg_ids_mask = torch.zeros(labels.shape, dtype=torch.bool, device=labels.device)

    sampled_pos_ids_mask[pos_ids] = True
    sampled_neg_ids_mask[neg_ids] = True

    return sampled_pos_ids_mask, sampled_neg_ids_mask
def get_IOU(box1, box2):
    """Pairwise intersection-over-union between two sets of boxes.

    Both inputs are indexed as ``[:, 0..3]`` in (x1, y1, x2, y2) order.

    Args:
        box1: tensor of N boxes.
        box2: tensor of M boxes.

    Returns:
        Tensor of shape (N, M) where entry (i, j) is the IoU of ``box1[i]``
        and ``box2[j]``.
    """
    def _area(boxes):
        return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

    # Broadcast box1 against box2: (N, 1, 2) vs (M, 2) -> (N, M, 2).
    expanded = box1[:, None, :]
    top_left = torch.max(expanded[..., 0:2], box2[:, 0:2])
    bottom_right = torch.min(expanded[..., 2:4], box2[:, 2:4])

    # Negative extents mean the boxes do not overlap along that axis.
    extent = (bottom_right - top_left).clamp(min=0)
    overlap = extent[..., 0] * extent[..., 1]

    total = _area(box1)[:, None] + _area(box2) - overlap
    return overlap / total


def box_to_boundary(boxes, img_shape):
    """Clip boxes so that they lie inside the image.

    Args:
        boxes: tensor whose last dimension holds (x1, y1, x2, y2).
        img_shape: sequence whose last two entries are (height, width).

    Returns:
        Tensor with the same leading dimensions as ``boxes`` and a last
        dimension of 4, with x clipped to [0, width] and y to [0, height].
    """
    height, width = img_shape[-2:]

    # Clamp on both sides: without the lower bound, boxes that extend past
    # the top/left edge keep their negative coordinates.
    boxes_x1 = boxes[..., 0].clamp(min=0, max=width)
    boxes_y1 = boxes[..., 1].clamp(min=0, max=height)
    boxes_x2 = boxes[..., 2].clamp(min=0, max=width)
    boxes_y2 = boxes[..., 3].clamp(min=0, max=height)

    return torch.stack((boxes_x1, boxes_y1, boxes_x2, boxes_y2), dim=-1)
class RegionalProposalNN(nn.Module):
    """Region Proposal Network (RPN) head.

    Given an image and a backbone feature map it
      * scores every anchor as object / not-object,
      * predicts box deltas for every anchor,
      * decodes the deltas into proposals and filters them (top-k + NMS),
      * in training mode, additionally computes the RPN classification and
        localization losses.

    Anchors are generated for a single image and only ``target['boxes'][0]``
    is read, so the implementation effectively assumes a batch size of 1.
    """

    def __init__(self, num_classes=8, in_channels=512):
        """Build the RPN layers.

        Args:
            num_classes: accepted for signature parity with the other heads;
                not used by the RPN itself.
            in_channels: number of channels of the backbone feature map.
        """
        super(RegionalProposalNN, self).__init__()

        self.scales = [128, 256, 512]
        self.aspect_ratios = [0.5, 1.0, 2.0]
        self.num_anchors = len(self.scales) * len(self.aspect_ratios)

        # 3x3 convolutional layer for RPN
        self.rpn_conv = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1, stride=1)
        # 1x1 convolutional layer for classification (one objectness logit per anchor)
        self.class_layer = nn.Conv2d(in_channels, self.num_anchors, kernel_size=1, stride=1)
        # 1x1 convolutional layer for bounding box regression (4 deltas per anchor)
        self.bbox_layer = nn.Conv2d(in_channels, self.num_anchors * 4, kernel_size=1, stride=1)

    def anchors_to_predictions(self, predictions, anchors):
        """Decode (dx, dy, dw, dh) deltas relative to anchors into boxes.

        Args:
            predictions: deltas; reshaped to (N, -1, 4) where N is its first
                dimension.
            anchors: (N, 4) boxes as (x1, y1, x2, y2).

        Returns:
            Tensor of shape (N, K, 4) with decoded (x1, y1, x2, y2) boxes,
            where K is the second dimension after the reshape above.
        """
        bbox_predictions = predictions.reshape(predictions.size(0), -1, 4)

        # Anchor centre / size from its corner representation.
        w = anchors[:, 2] - anchors[:, 0]
        h = anchors[:, 3] - anchors[:, 1]
        cx = anchors[:, 0] + 0.5 * w
        cy = anchors[:, 1] + 0.5 * h

        dx = bbox_predictions[..., 0]
        dy = bbox_predictions[..., 1]
        # Bound the log-scale deltas so exp() cannot overflow to inf.
        max_log_scale = math.log(1000.0 / 16)
        dw = torch.clamp(bbox_predictions[..., 2], max=max_log_scale)
        dh = torch.clamp(bbox_predictions[..., 3], max=max_log_scale)

        pred_cx = dx * w[:, None] + cx[:, None]
        pred_cy = dy * h[:, None] + cy[:, None]
        pred_w = torch.exp(dw) * w[:, None]
        pred_h = torch.exp(dh) * h[:, None]

        pred_box_x1 = pred_cx - 0.5 * pred_w
        pred_box_y1 = pred_cy - 0.5 * pred_h
        pred_box_x2 = pred_cx + 0.5 * pred_w
        pred_box_y2 = pred_cy + 0.5 * pred_h

        return torch.stack([pred_box_x1, pred_box_y1, pred_box_x2, pred_box_y2], dim=2)

    @staticmethod
    def transform_boxes_to_og_size(boxes, new_size, original_size):
        """Rescale boxes from the resized image back to the original image.

        Args:
            boxes: (N, 4) tensor of (x1, y1, x2, y2) boxes.
            new_size: (height, width) the boxes are currently expressed in.
            original_size: (height, width) to map the boxes to.

        Returns:
            (N, 4) tensor of rescaled boxes.
        """
        ratio_h, ratio_w = [
            float(s_og) / float(s) for s_og, s in zip(original_size, new_size)
        ]
        xmin, ymin, xmax, ymax = boxes.unbind(1)
        return torch.stack(
            [xmin * ratio_w, ymin * ratio_h, xmax * ratio_w, ymax * ratio_h], dim=1
        )

    def generate_anchors(self, image, feature):
        """Create anchors for every feature-map location.

        Args:
            image: tensor whose last two dimensions are (height, width).
            feature: feature map whose last two dimensions are the grid size.

        Returns:
            (grid_h * grid_w * num_anchors, 4) tensor of (x1, y1, x2, y2)
            anchors, ordered row by row over the grid with the anchors of one
            location contiguous. This matches how ``forward`` flattens the
            predictions (H, W, anchors).
        """
        grid_h, grid_w = feature.shape[-2:]
        image_h, image_w = image.shape[-2:]
        stride_h = image_h / grid_h
        stride_w = image_w / grid_w

        scales = torch.as_tensor(self.scales, dtype=torch.float32, device=feature.device)
        aspect_ratios = torch.as_tensor(
            self.aspect_ratios, dtype=torch.float32, device=feature.device
        )

        # Make sure h / w = aspect_ratio and h * w = 1
        h_ratios = torch.sqrt(aspect_ratios)
        w_ratios = 1 / h_ratios

        ws = (w_ratios[:, None] * scales[None, :]).view(-1)
        hs = (h_ratios[:, None] * scales[None, :]).view(-1)

        # Zero-centred anchors, one per (aspect ratio, scale) pair.
        base_anchors = (torch.stack([-ws, -hs, ws, hs], dim=1) / 2).round()

        # Shifts of the anchor centres in image coordinates.
        shifts_x = torch.arange(0, grid_w, device=feature.device) * stride_w
        shifts_y = torch.arange(0, grid_h, device=feature.device) * stride_h

        # 'ij' indexing with y first gives row-major order (x varies fastest).
        shifts_y, shifts_x = torch.meshgrid(shifts_y, shifts_x, indexing='ij')
        shifts_x = shifts_x.reshape(-1)
        shifts_y = shifts_y.reshape(-1)
        shifts = torch.stack([shifts_x, shifts_y, shifts_x, shifts_y], dim=1)

        # (locations, 1, 4) + (1, anchors, 4) -> (locations, anchors, 4)
        anchors = shifts.view(-1, 1, 4) + base_anchors.view(1, -1, 4)
        return anchors.reshape(-1, 4)

    def filter_proposals(self, proposals, class_scores, img_shape):
        """Keep the best proposals: top-k by score, clip to image, then NMS.

        Args:
            proposals: (N, 4) decoded boxes.
            class_scores: objectness logits with N elements.
            img_shape: shape whose last two entries are (height, width).

        Returns:
            (proposals, class_scores): at most 2000 boxes and their sigmoid
            scores, in decreasing score order.
        """
        class_scores = torch.sigmoid(class_scores.reshape(-1))

        # topk raises if k exceeds the number of candidates, so bound it.
        pre_nms_top_n = min(2000, class_scores.numel())
        _, top_idx = torch.topk(class_scores, k=pre_nms_top_n, sorted=True)
        class_scores = class_scores[top_idx]
        proposals = box_to_boundary(proposals[top_idx], img_shape)

        # NMS with IoU threshold 0.7; the returned indices are already sorted
        # by decreasing score, so slicing keeps the best survivors.
        keep_ids = torchvision.ops.nms(proposals, class_scores, iou_threshold=0.7)
        keep_ids = keep_ids[:2000]

        return proposals[keep_ids], class_scores[keep_ids]

    def assign_targets_to_anchors(self, anchors, gt_boxes):
        """Label every anchor as foreground / background / ignored.

        Args:
            anchors: (A, 4) anchors.
            gt_boxes: (G, 4) ground-truth boxes.

        Returns:
            labels: (A,) float tensor; 1 = foreground, 0 = background,
                -1 = ignored.
            matched_gt_boxes: (A, 4) ground-truth box matched to each anchor
                (an arbitrary box for background / ignored anchors).
        """
        if gt_boxes.numel() == 0:
            # No objects: every anchor is background.
            labels = torch.zeros(anchors.size(0), dtype=torch.float32, device=anchors.device)
            return labels, torch.zeros_like(anchors)

        # Rows are ground-truth boxes, columns are anchors.
        iou_matrix = get_IOU(gt_boxes, anchors)

        # Best ground-truth box for each anchor.
        best_match, best_gt_id = iou_matrix.max(dim=0)
        # Keep a copy of the matches before thresholding.
        best_gt_id_pre_threshold = best_gt_id.clone()

        below_threshold_mask = best_match < 0.3
        between_threshold_mask = (best_match >= 0.3) & (best_match < 0.7)
        best_gt_id[below_threshold_mask] = -1   # background
        best_gt_id[between_threshold_mask] = -2  # ignored

        # Low-quality matches: for every ground-truth box, the anchor(s) with
        # the highest IoU are kept as positives even if below the threshold.
        best_anchor_iou_for_gt, _ = iou_matrix.max(dim=1)
        gt_pred_pair_max_iou = torch.where(iou_matrix == best_anchor_iou_for_gt[:, None])
        preds_ids_to_update = gt_pred_pair_max_iou[1]
        best_gt_id[preds_ids_to_update] = best_gt_id_pre_threshold[preds_ids_to_update]

        # The match index is either valid, -1 or -2; clamp so indexing works.
        matched_gt_boxes = gt_boxes[best_gt_id.clamp(min=0)]

        labels = (best_gt_id >= 0).to(torch.float32)
        labels[best_gt_id == -1] = 0.0
        labels[best_gt_id == -2] = -1.0

        return labels, matched_gt_boxes

    def boxes_to_transform_targets(self, groud_truth_boxes, anchors):
        """Encode ground-truth boxes as regression targets relative to anchors.

        Args:
            groud_truth_boxes: (N, 4) boxes as (x1, y1, x2, y2).
            anchors: (N, 4) boxes as (x1, y1, x2, y2).

        Returns:
            (N, 4) tensor of (dx, dy, dw, dh) targets.
        """
        # Centre / size of the anchors.
        widths = anchors[:, 2] - anchors[:, 0]
        heights = anchors[:, 3] - anchors[:, 1]
        cx = anchors[:, 0] + 0.5 * widths
        cy = anchors[:, 1] + 0.5 * heights

        # Centre / size of the ground-truth boxes.
        gt_widths = groud_truth_boxes[:, 2] - groud_truth_boxes[:, 0]
        gt_heights = groud_truth_boxes[:, 3] - groud_truth_boxes[:, 1]
        gt_cx = groud_truth_boxes[:, 0] + 0.5 * gt_widths
        gt_cy = groud_truth_boxes[:, 1] + 0.5 * gt_heights

        target_dx = (gt_cx - cx) / widths
        target_dy = (gt_cy - cy) / heights
        target_dw = torch.log(gt_widths / widths)
        target_dh = torch.log(gt_heights / heights)

        return torch.stack([target_dx, target_dy, target_dw, target_dh], dim=1)

    def forward(self, image, features, target=None):
        """Run the RPN.

        Args:
            image: image tensor; its last two dimensions are (height, width).
            features: backbone feature map fed to the RPN convolutions.
            target: optional dict; ``target['boxes'][0]`` is read as the
                ground-truth boxes when training.

        Returns:
            dict with 'proposals' and 'class_scores'; in training mode with a
            target it also contains 'rpn_classificatoin_loss' and
            'rpn_localization_loss'.
        """
        rpn_feat = nn.functional.relu(self.rpn_conv(features))
        classification_scores = self.class_layer(rpn_feat)
        bbox_predictions = self.bbox_layer(rpn_feat)

        anchors = self.generate_anchors(image, features)

        # (B, A, H, W) -> (B, H, W, A) -> (B*H*W*A, 1)
        anchors_per_location = classification_scores.size(1)
        classification_scores = classification_scores.permute(0, 2, 3, 1)
        classification_scores = classification_scores.reshape(-1, 1)

        # (B, A*4, H, W) -> (B, A, 4, H, W) -> (B, H, W, A, 4) -> (B*H*W*A, 4)
        bbox_predictions = bbox_predictions.view(
            bbox_predictions.size(0),
            anchors_per_location,
            4,
            rpn_feat.shape[-2],
            rpn_feat.shape[-1],
        )
        bbox_predictions = bbox_predictions.permute(0, 3, 4, 1, 2)
        bbox_predictions = bbox_predictions.reshape(-1, 4)

        # Proposals are detached: gradients must not flow through them.
        proposals = self.anchors_to_predictions(
            bbox_predictions.detach().reshape(-1, 1, 4), anchors
        )
        proposals = proposals.reshape(proposals.size(0), 4)

        proposals, class_scores = self.filter_proposals(
            proposals, classification_scores.detach(), image.shape
        )
        rpn_output = {
            'proposals': proposals,
            'class_scores': class_scores
        }
        if not self.training or target is None:
            return rpn_output

        # Assign ground-truth boxes and labels to anchors.
        labels_for_anchors, matched_gt_boxes = self.assign_targets_to_anchors(
            anchors, target['boxes'][0]
        )
        regression_targets = self.boxes_to_transform_targets(matched_gt_boxes, anchors)

        # Sample positive and negative anchors for training.
        sampled_pos_ids_mask, sampled_neg_ids_mask = sample_positive_negative(
            labels_for_anchors, 128, 128
        )
        sampled_ids = torch.where(sampled_pos_ids_mask | sampled_neg_ids_mask)[0]

        # Localization loss on positive anchors only; predictions and targets
        # must be selected with the same mask. Normalised by the number of
        # sampled anchors (guarded against an empty sample).
        localization_loss = nn.functional.smooth_l1_loss(
            bbox_predictions[sampled_pos_ids_mask],
            regression_targets[sampled_pos_ids_mask],
            beta=1 / 9,
            reduction='sum',
        ) / max(1, sampled_ids.numel())

        classification_loss = nn.functional.binary_cross_entropy_with_logits(
            classification_scores[sampled_ids].flatten(),
            labels_for_anchors[sampled_ids].flatten(),
        )

        # NOTE(review): the key below is misspelled ("classificatoin"); it is
        # kept as-is because consumers may look it up by this exact name.
        rpn_output['rpn_classificatoin_loss'] = classification_loss
        rpn_output['rpn_localization_loss'] = localization_loss

        return rpn_output


ROI head road map:

Training:
* Assign ground truth boxes to proposals

* Sample positive and negative proposals
* Get classification and regression targets for proposals
* ROI pooling to get proposal features
* Call classification and regression layers
* Compute classification and localization loss

Inference:
* ROI pooling to get proposal features
* Classification and regression
* Convert proposals to predictions with box transformation prediction
* Filter boxes
    

In [36]:
class ROIHead(nn.Module):
    """ROI head: classifies proposals and refines their boxes.

    Proposals are ROI-pooled from the feature map, passed through two fully
    connected layers, and then through a classification layer and a
    per-class box regression layer. In training mode (with a target) the
    forward pass returns losses; otherwise it returns filtered predictions.
    Only ``target['boxes'][0]`` / ``target['labels'][0]`` are read, so the
    implementation effectively assumes a batch size of 1.
    """

    def __init__(self, num_classes=8, in_channels=512):
        """Build the ROI head layers.

        Args:
            num_classes: number of classes; label 0 is treated as background
                by ``forward``.
            in_channels: number of channels of the feature map being pooled.
        """
        super(ROIHead, self).__init__()
        self.num_classes = num_classes
        self.pool_size = 7
        self.fc_inner_dim = 1024

        self.fc1 = nn.Linear(in_channels * self.pool_size * self.pool_size, self.fc_inner_dim)
        self.fc2 = nn.Linear(self.fc_inner_dim, self.fc_inner_dim)
        self.class_layer = nn.Linear(self.fc_inner_dim, num_classes)
        self.bbox_reg_layer = nn.Linear(self.fc_inner_dim, num_classes * 4)

    def assign_targets_to_proposals(self, proposals, gt_boxes, gt_labels):
        """Match every proposal to a ground-truth box and class label.

        Args:
            proposals: (P, 4) proposal boxes.
            gt_boxes: (G, 4) ground-truth boxes.
            gt_labels: (G,) class labels of the ground-truth boxes.

        Returns:
            labels: (P,) int64 labels; 0 for proposals whose best IoU is
                below 0.5 (background).
            matched_gt_boxes: (P, 4) matched ground-truth box per proposal
                (an arbitrary box for background proposals).
        """
        if gt_boxes.numel() == 0:
            # No objects: every proposal is background.
            labels = torch.zeros(proposals.size(0), dtype=torch.int64, device=proposals.device)
            return labels, torch.zeros_like(proposals)

        # Rows are ground-truth boxes, columns are proposals.
        iou_matrix = get_IOU(gt_boxes, proposals)

        # Best ground-truth box for each proposal.
        best_match, best_gt_id = iou_matrix.max(dim=0)

        background_proposals = best_match < 0.5
        best_gt_id[background_proposals] = -1

        # Clamp so that background (-1) entries still index a valid box.
        matched_ids = best_gt_id.clamp(min=0)
        matched_gt_boxes = gt_boxes[matched_ids]
        labels = gt_labels[matched_ids].to(torch.int64)
        labels[background_proposals] = 0  # background proposals are labelled 0

        return labels, matched_gt_boxes

    def boxes_to_transform_targets(self, ground_truth_boxes, proposals):
        """Encode ground-truth boxes as (dx, dy, dw, dh) relative to proposals.

        Args:
            ground_truth_boxes: (N, 4) boxes as (x1, y1, x2, y2).
            proposals: (N, 4) boxes as (x1, y1, x2, y2).

        Returns:
            (N, 4) tensor of regression targets.
        """
        widths = proposals[:, 2] - proposals[:, 0]
        heights = proposals[:, 3] - proposals[:, 1]
        cx = proposals[:, 0] + 0.5 * widths
        cy = proposals[:, 1] + 0.5 * heights

        gt_widths = ground_truth_boxes[:, 2] - ground_truth_boxes[:, 0]
        gt_heights = ground_truth_boxes[:, 3] - ground_truth_boxes[:, 1]
        gt_cx = ground_truth_boxes[:, 0] + 0.5 * gt_widths
        gt_cy = ground_truth_boxes[:, 1] + 0.5 * gt_heights

        return torch.stack(
            [
                (gt_cx - cx) / widths,
                (gt_cy - cy) / heights,
                torch.log(gt_widths / widths),
                torch.log(gt_heights / heights),
            ],
            dim=1,
        )

    @staticmethod
    def _apply_regression_to_proposals(bbox_predictions, proposals):
        """Decode per-class (dx, dy, dw, dh) deltas into boxes.

        Args:
            bbox_predictions: deltas; reshaped to (N, -1, 4).
            proposals: (N, 4) boxes as (x1, y1, x2, y2).

        Returns:
            (N, K, 4) tensor of decoded (x1, y1, x2, y2) boxes.
        """
        bbox_predictions = bbox_predictions.reshape(bbox_predictions.size(0), -1, 4)

        w = proposals[:, 2] - proposals[:, 0]
        h = proposals[:, 3] - proposals[:, 1]
        cx = proposals[:, 0] + 0.5 * w
        cy = proposals[:, 1] + 0.5 * h

        dx = bbox_predictions[..., 0]
        dy = bbox_predictions[..., 1]
        # Bound the log-scale deltas so exp() cannot overflow to inf.
        max_log_scale = math.log(1000.0 / 16)
        dw = torch.clamp(bbox_predictions[..., 2], max=max_log_scale)
        dh = torch.clamp(bbox_predictions[..., 3], max=max_log_scale)

        pred_cx = dx * w[:, None] + cx[:, None]
        pred_cy = dy * h[:, None] + cy[:, None]
        pred_w = torch.exp(dw) * w[:, None]
        pred_h = torch.exp(dh) * h[:, None]

        return torch.stack(
            [
                pred_cx - 0.5 * pred_w,
                pred_cy - 0.5 * pred_h,
                pred_cx + 0.5 * pred_w,
                pred_cy + 0.5 * pred_h,
            ],
            dim=2,
        )

    def filter_predictions(self, pred_boxes, pred_labels, pred_scores):
        """Drop low scores, run per-class NMS and keep the top 100.

        Args:
            pred_boxes: (N, 4) boxes.
            pred_labels: (N,) class labels.
            pred_scores: (N,) scores.

        Returns:
            (pred_boxes, pred_labels, pred_scores) for at most 100 detections
            in decreasing score order.
        """
        # Remove low scoring boxes.
        keep = torch.where(pred_scores > 0.05)[0]
        pred_boxes, pred_scores, pred_labels = pred_boxes[keep], pred_scores[keep], pred_labels[keep]

        # Class-wise NMS: boxes of different classes never suppress each other.
        keep_mask = torch.zeros_like(pred_scores, dtype=torch.bool)
        for class_id in torch.unique(pred_labels):
            ids = torch.where(pred_labels == class_id)[0]
            keep_ids = torchvision.ops.nms(pred_boxes[ids], pred_scores[ids], iou_threshold=0.5)
            keep_mask[ids[keep_ids]] = True

        keep_indices = torch.where(keep_mask)[0]
        post_nms_indices = keep_indices[pred_scores[keep_indices].sort(descending=True)[1]]
        keep = post_nms_indices[:100]  # keep top 100 predictions
        return pred_boxes[keep], pred_labels[keep], pred_scores[keep]

    def forward(self, features, proposals, image_shape, target=None):
        """Run the ROI head.

        Args:
            features: backbone feature map.
            proposals: (P, 4) proposal boxes in image coordinates.
            image_shape: shape whose last two entries are (height, width).
            target: optional dict; ``target['boxes'][0]`` and
                ``target['labels'][0]`` are read when training.

        Returns:
            In training mode with a target: dict with
            'frcnn_classification_loss' and 'frcnn_localization_loss'.
            Otherwise: dict with 'pred_boxes', 'pred_labels', 'pred_scores'.
        """
        is_training = self.training and target is not None

        if is_training:
            gt_boxes = target['boxes'][0]
            gt_labels = target['labels'][0]

            # Assign labels and ground-truth boxes to proposals.
            labels, matched_gt_boxes = self.assign_targets_to_proposals(
                proposals, gt_boxes, gt_labels
            )

            # Sample positive and negative proposals for training.
            sampled_pos_ids_mask, sampled_neg_ids_mask = sample_positive_negative(
                labels, 32, 128 - 32
            )
            sampled_ids = torch.where(sampled_pos_ids_mask | sampled_neg_ids_mask)[0]

            proposals = proposals[sampled_ids]
            labels = labels[sampled_ids]
            matched_gt_boxes = matched_gt_boxes[sampled_ids]

            regression_targets = self.boxes_to_transform_targets(matched_gt_boxes, proposals)

        # ROI pooling. 1/16 is the feature-map stride assumed for the backbone.
        spacial_scale = 0.0625

        # roi_pool expects either a (K, 5) tensor with a batch index column or
        # a list with one (L, 4) tensor per image; wrap the single image's
        # proposals in a list.
        proposal_roi_pool_feats = torchvision.ops.roi_pool(
            features, [proposals], output_size=self.pool_size, spatial_scale=spacial_scale
        )
        proposal_roi_pool_feats = proposal_roi_pool_feats.flatten(start_dim=1)

        box_fc1 = torch.nn.functional.relu(self.fc1(proposal_roi_pool_feats))
        box_fc2 = torch.nn.functional.relu(self.fc2(box_fc1))

        class_scores = self.class_layer(box_fc2)
        bbox_predictions = self.bbox_reg_layer(box_fc2)

        num_boxes, num_classes = class_scores.shape
        bbox_predictions = bbox_predictions.reshape(num_boxes, num_classes, 4)

        frcnn_output = {}

        if is_training:
            classification_loss = torch.nn.functional.cross_entropy(class_scores, labels)

            # Localization loss only for non-background proposals, using the
            # deltas predicted for each proposal's own class.
            fg_proposal_ids = torch.where(labels > 0)[0]
            fg_class_labels = labels[fg_proposal_ids]
            # max(1, ...) avoids 0 / 0 = NaN when no foreground was sampled.
            localization_loss = torch.nn.functional.smooth_l1_loss(
                bbox_predictions[fg_proposal_ids, fg_class_labels],
                regression_targets[fg_proposal_ids],
                beta=1 / 9,
                reduction='sum'
            ) / max(1, fg_proposal_ids.numel())

            frcnn_output['frcnn_classification_loss'] = classification_loss
            frcnn_output['frcnn_localization_loss'] = localization_loss
            return frcnn_output

        # Apply the predicted transformation to the proposals.
        pred_boxes = self._apply_regression_to_proposals(bbox_predictions, proposals)
        pred_scores = torch.nn.functional.softmax(class_scores, dim=1)

        # Clamp boxes to image boundaries.
        pred_boxes = box_to_boundary(pred_boxes, image_shape)

        # One label per (proposal, class) pair.
        pred_labels = torch.arange(num_classes, device=pred_boxes.device)
        pred_labels = pred_labels.view(1, -1).expand_as(pred_scores)

        # Remove background predictions (class 0).
        pred_boxes = pred_boxes[:, 1:]
        pred_scores = pred_scores[:, 1:]
        pred_labels = pred_labels[:, 1:]

        # Flatten so that every class of every proposal is its own prediction.
        pred_boxes = pred_boxes.reshape(-1, 4)
        pred_scores = pred_scores.reshape(-1)
        pred_labels = pred_labels.reshape(-1)

        pred_boxes, pred_labels, pred_scores = self.filter_predictions(
            pred_boxes, pred_labels, pred_scores
        )

        frcnn_output['pred_boxes'] = pred_boxes
        frcnn_output['pred_labels'] = pred_labels
        frcnn_output['pred_scores'] = pred_scores

        return frcnn_output

The Faster R-CNN model is composed of a pretrained VGG16 backbone, plus the region proposal network (RPN) and the ROI head implemented above.

In [37]:
class FasterRCNN(nn.Module):
    """Faster R-CNN: VGG16 backbone + region proposal network + ROI head.

    ``forward`` normalizes and resizes the image, extracts features, obtains
    proposals from the RPN and feeds them to the ROI head. Predicted boxes
    are mapped back to the original image size.
    """

    def __init__(self, num_classes=8):
        """Build the detector.

        Args:
            num_classes: number of classes forwarded to the RPN and ROI head.
        """
        super(FasterRCNN, self).__init__()
        # NOTE(review): `pretrained=True` is deprecated in recent torchvision
        # releases in favour of `weights=...`; kept so the notebook also runs
        # on older installations.
        vgg16 = torchvision.models.vgg16(pretrained=True)
        self.backbone = vgg16.features[:-1]  # exclude the last max pooling layer
        self.rpn = RegionalProposalNN(num_classes=num_classes, in_channels=512)
        self.roi_head = ROIHead(num_classes=num_classes, in_channels=512)

        # Freeze the first 10 backbone layers.
        for layer in self.backbone[:10]:
            for param in layer.parameters():
                param.requires_grad = False

        self.image_mean = [0.485, 0.456, 0.406]
        self.image_std = [0.229, 0.224, 0.225]
        self.min_size = 600
        self.max_size = 1000

    def normalize_resize(self, image, bboxes=None):
        """Normalize the image and resize it (and optionally its boxes).

        The image is scaled so that its smaller side reaches ``min_size``
        unless that would push the larger side beyond ``max_size``.

        Args:
            image: batched image tensor with channels in dimension -3
                (interpolated with mode='bilinear', i.e. a 4-D tensor).
            bboxes: optional tensor whose dimension 2 holds
                (x1, y1, x2, y2), e.g. (B, N, 4).

        Returns:
            (image, bboxes): the resized image and the rescaled boxes;
            ``bboxes`` is None when no boxes were given.
        """
        # Normalize the image.
        mean = torch.as_tensor(self.image_mean, dtype=image.dtype, device=image.device)
        std = torch.as_tensor(self.image_std, dtype=image.dtype, device=image.device)
        image = (image - mean[:, None, None]) / std[:, None, None]

        # Resize so that the smaller side becomes min_size without the larger
        # side exceeding max_size.
        h, w = image.shape[-2:]
        im_shape = torch.tensor(image.shape[-2:])
        min_size = torch.min(im_shape).to(torch.float32)
        max_size = torch.max(im_shape).to(torch.float32)
        scale = torch.min(
            float(self.min_size) / min_size,
            float(self.max_size) / max_size
        )
        image = torch.nn.functional.interpolate(
            image,
            size=None,
            scale_factor=scale.item(),  # interpolate documents a float here
            mode='bilinear',
            recompute_scale_factor=True,
            align_corners=False
        )

        # Resize the bounding boxes if provided, using the actual size ratio
        # (the output size is rounded, so it can differ slightly from scale).
        if bboxes is not None:
            new_h, new_w = image.shape[-2:]
            ratio_h = float(new_h) / float(h)
            ratio_w = float(new_w) / float(w)
            xmin, ymin, xmax, ymax = bboxes.unbind(2)
            bboxes = torch.stack(
                [xmin * ratio_w, ymin * ratio_h, xmax * ratio_w, ymax * ratio_h], dim=2
            )

        # Always return a pair so callers can unpack unconditionally.
        return image, bboxes

    @staticmethod
    def _transform_boxes_to_og_size(boxes, new_size, original_size):
        """Rescale (N, 4) boxes from ``new_size`` to ``original_size`` (h, w)."""
        ratio_h, ratio_w = [
            float(s_og) / float(s) for s_og, s in zip(original_size, new_size)
        ]
        xmin, ymin, xmax, ymax = boxes.unbind(1)
        return torch.stack(
            [xmin * ratio_w, ymin * ratio_h, xmax * ratio_w, ymax * ratio_h], dim=1
        )

    def forward(self, image, target=None):
        """Run the full detector.

        Args:
            image: batched image tensor.
            target: optional dict with the ground truth. Boxes are read from
                'boxes' (the key the RPN and ROI head use) or, if absent,
                from 'bboxes'.

        Returns:
            (rpn_output, frcnn_output): the dicts returned by the RPN and by
            the ROI head. When the ROI head returns predictions, its boxes
            are expressed in the original image size.
        """
        old_shape = image.shape[-2:]
        if self.training and target is not None:
            # Work on a shallow copy so the caller's dict is not rescaled
            # again on a later call.
            target = dict(target)
            box_key = 'boxes' if 'boxes' in target else 'bboxes'
            image, bboxes = self.normalize_resize(image, target[box_key])
            # The RPN and ROI head read 'boxes'; keep 'bboxes' in sync when
            # the caller supplied it.
            target['boxes'] = bboxes
            if 'bboxes' in target:
                target['bboxes'] = bboxes
        else:
            image, _ = self.normalize_resize(image, None)

        # Call backbone and RPN.
        features = self.backbone(image)
        rpn_output = self.rpn(image, features, target)
        proposals = rpn_output['proposals']

        frcnn_output = self.roi_head(features, proposals, image.shape[-2:], target)

        if 'pred_boxes' in frcnn_output:
            # Transform the predicted boxes to the original image shape.
            boxes = self._transform_boxes_to_og_size(
                frcnn_output['pred_boxes'], image.shape[-2:], old_shape
            )
            frcnn_output['pred_boxes'] = boxes
            # Also exposed under 'boxes', the key this method originally wrote.
            frcnn_output['boxes'] = boxes

        return rpn_output, frcnn_output

In [38]:
# Your code runs without errors. The previous cells have set up the environment and device.
# You can start building your model or loading data here.


TypeError: list indices must be integers or slices, not str

ModuleNotFoundError: No module named 'your_model_file'