In [1]:
import torch
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import pycocotools
import torchvision.transforms as transforms
from torchvision.models import ResNet50_Weights
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torchvision.ops import MultiScaleRoIAlign,nms,box_convert,box_iou
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.datasets import CocoDetection
from torch.utils.data import DataLoader
from PIL import Image
from typing import List, Tuple





In [2]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
class ImageList:
    """
    Bundle of a batched image tensor and the per-image sizes.

    Images of possibly different sizes are expected to have been padded to a
    common size and stored in one tensor; the sizes recorded next to it allow
    consumers to recover each image's own extent.

    Args:
        tensors (tensor): Tensor containing the images.
        image_sizes (list[tuple[int, int]]): One size tuple per image.
    """

    def __init__(self, tensors: Tensor, image_sizes: List[Tuple[int, int]]) -> None:
        self.image_sizes = image_sizes
        self.tensors = tensors

    def to(self, device: torch.device) -> "ImageList":
        """Return a new ImageList whose tensor lives on `device`.

        The `image_sizes` list is shared with the original, not copied.
        """
        return ImageList(self.tensors.to(device), self.image_sizes)

In [4]:
def find_max_dimensions(dataset):
    """Return `(max_height, max_width)` over all images of `dataset`.

    `dataset` must yield `(image, annotation)` pairs; the height is read from
    `image.shape[1]` and the width from `image.shape[2]`. An empty dataset
    yields `(0, 0)`.
    """
    # Seed both lists with 0 so that an empty dataset still has a maximum.
    heights, widths = [0], [0]
    for img, _ in dataset:
        heights.append(img.shape[1])
        widths.append(img.shape[2])
    return max(heights), max(widths)

In [5]:
def pad_to_max_size(image, target_height, target_width):
    """Zero-pad `image` on the bottom and right up to the target size.

    Args:
        image: tensor whose height is `image.shape[1]` and whose width is
            `image.shape[2]` (e.g. a `(C, H, W)` image).
        target_height: desired height after padding.
        target_width: desired width after padding.

    Returns:
        The padded tensor. The original pixels stay in the top-left corner.
        If the image already exceeds a target dimension, that dimension is
        left untouched (no cropping), matching `PadToMaxSize`.
    """
    height, width = image.shape[1], image.shape[2]
    pad_bottom = max(0, target_height - height)
    pad_right = max(0, target_width - width)

    # F.pad lists the padding starting from the LAST dimension:
    # (left, right, top, bottom) -> width first, then height.
    return F.pad(image, (0, pad_right, 0, pad_bottom), value=0)

In [6]:
class PadToMaxSize:
    """Callable transform that pads an image on the bottom/right.

    The image is grown up to `(max_height, max_width)`; a dimension that is
    already large enough is left unchanged (the padding never goes negative).
    Height is read from `image.shape[1]` and width from `image.shape[2]`.
    """

    def __init__(self, max_height, max_width):
        self.max_height = max_height
        self.max_width = max_width

    def __call__(self, image):
        missing_rows = max(0, self.max_height - image.shape[1])
        missing_cols = max(0, self.max_width - image.shape[2])

        # F.pad order for the last two dims: (left, right, top, bottom).
        pad_spec = (0, missing_cols, 0, missing_rows)
        return F.pad(image, pad_spec)


In [7]:

def compute_mean_std(dataset):
    """Average the per-image channel mean and standard deviation.

    Args:
        dataset: iterable of `(image, annotation)` pairs where `image` is a
            tensor reduced over dimensions 1 and 2 (e.g. `(C, H, W)`).

    Returns:
        `(mean, std)`: two tensors with one entry per channel, located on the
        notebook-level `device`.

    Raises:
        ValueError: if `dataset` yields no samples (the statistics would
            otherwise silently be NaN).

    Note:
        `std` is the mean of the per-image standard deviations, which is only
        an approximation of the standard deviation over all pixels.
    """
    mean_sum = None
    std_sum = None
    n_images = 0

    for image, _ in dataset:
        # Dataset samples are normally CPU tensors; move them next to the
        # accumulators so the sum also works when `device` is a GPU.
        image = image.to(device)
        image_mean = torch.mean(image, [1, 2])
        image_std = torch.std(image, [1, 2])
        # Lazy initialisation keeps the function independent of the number
        # of channels.
        mean_sum = image_mean if mean_sum is None else mean_sum + image_mean
        std_sum = image_std if std_sum is None else std_sum + image_std
        n_images += 1

    if n_images == 0:
        raise ValueError("compute_mean_std() received an empty dataset")

    return mean_sum / n_images, std_sum / n_images



In [8]:
# COCO train2017 images with their instance annotations; every image is
# converted to a tensor by ToTensor when it is loaded.
dataset = CocoDetection(
    root='./content/train2017',
    annFile='./content/annotations/instances_train2017.json',
    transform=transforms.ToTensor(),
)

# Work on the first ten samples only.
small_train_ds = torch.utils.data.Subset(dataset, range(10))



loading annotations into memory...
Done (t=51.73s)
creating index...
index created!


In [9]:
# NOTE: `torch.utils.data.Subset` only forwards indexing to the dataset it
# wraps, so assigning `small_train_ds.transform` has no effect. CocoDetection
# applies `self.transforms` in `__getitem__`, therefore the transforms are
# installed on the wrapped dataset instead.

# Statistics must be computed on raw [0, 1] tensors. Re-installing the plain
# ToTensor transform first keeps this cell idempotent when it is re-run.
small_train_ds.dataset.transform = transforms.ToTensor()
small_train_ds.dataset.transforms = torchvision.datasets.vision.StandardTransform(
    small_train_ds.dataset.transform
)

max_height, max_width = find_max_dimensions(small_train_ds)
# Kept available for batching images of different sizes. It is deliberately
# NOT applied while computing the statistics: zero padding would bias them.
pad_transform = transforms.Compose([PadToMaxSize(max_height, max_width)])

mean, std = compute_mean_std(small_train_ds)

# Normalize accepts plain sequences; converting avoids keeping device tensors
# inside the transform.
image_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean.tolist(), std.tolist()),
])
small_train_ds.dataset.transform = image_transform
small_train_ds.dataset.transforms = torchvision.datasets.vision.StandardTransform(
    image_transform
)

# Default batch_size=1: images and annotation lists differ in size per sample.
small_train_loader = DataLoader(small_train_ds, shuffle=False)


In [10]:
class RPN(nn.Module):
    """Region-proposal head shared by every feature level.

    A 3x3 convolution followed by two sibling 1x1 convolutions that emit, for
    each spatial location, `anchors * 2` classification logits and
    `anchors * 4` box-regression values.

    Args:
        in_channels: number of channels of each incoming feature map.
        anchors: number of anchors per spatial location.
    """

    def __init__(self, in_channels, anchors):
        super().__init__()

        self.conv = nn.Conv2d(in_channels, 512, kernel_size=3, stride=1, padding=1)
        self.cls_logits = nn.Conv2d(512, anchors * 2, kernel_size=1, stride=1)
        self.bbox_pred = nn.Conv2d(512, anchors * 4, kernel_size=1, stride=1)

        # Small-variance Gaussian weights and zero biases for all three layers.
        for module in (self.conv, self.cls_logits, self.bbox_pred):
            nn.init.normal_(module.weight, std=0.01)
            nn.init.constant_(module.bias, 0)

        self.anchors = anchors

    @staticmethod
    def _flatten(prediction, last_dim):
        """Reshape `(N, K*last_dim, H, W)` into `(N, H*W*K, last_dim)`."""
        batch = prediction.shape[0]
        return prediction.permute(0, 2, 3, 1).reshape(batch, -1, last_dim)

    def forward(self, x):
        """Run the head on every feature map of `x` (an iterable of tensors).

        Returns:
            `(logits, bbox_preds)` with shapes `(N, total_anchors, 2)` and
            `(N, total_anchors, 4)`, the per-level results being concatenated
            along dimension 1 in the order of `x`.
        """
        hidden = [F.relu(self.conv(fmap)) for fmap in x]

        logits = torch.cat([self._flatten(self.cls_logits(h), 2) for h in hidden], dim=1)
        bbox_preds = torch.cat([self._flatten(self.bbox_pred(h), 4) for h in hidden], dim=1)

        return logits, bbox_preds

In [11]:

# One tuple of anchor sizes per feature level; the same three aspect ratios
# are used on every level (see the printed values below).
anchor_generator = AnchorGenerator(
    sizes=(
        (32, 64, 128),
        (64, 128, 256),
        (128, 256, 512),
    ),
    aspect_ratios=(0.5, 1.0, 2.0),
)
print(anchor_generator.sizes)
print(anchor_generator.aspect_ratios)

((32, 64, 128), (64, 128, 256), (128, 256, 512))
((0.5, 1.0, 2.0), (0.5, 1.0, 2.0), (0.5, 1.0, 2.0))


In [55]:
class DetectionHead(nn.Module):
    """Two-layer MLP head producing class scores and per-class box deltas.

    Args:
        in_channels: channels of each pooled region feature.
        num_classes: number of output classes.
        roi_output_size: side length of the (square) pooled region feature.
    """

    def __init__(self, in_channels, num_classes, roi_output_size):
        super().__init__()

        flattened_size = in_channels * roi_output_size ** 2
        self.fc1 = nn.Linear(flattened_size, 1024)
        self.fc2 = nn.Linear(1024, 1024)

        self.cls_score = nn.Linear(1024, num_classes)
        self.bbox_pred = nn.Linear(1024, num_classes * 4)

        # Small-variance Gaussian weights and zero biases everywhere.
        for fc in (self.fc1, self.fc2, self.cls_score, self.bbox_pred):
            nn.init.normal_(fc.weight, std=0.01)
            nn.init.constant_(fc.bias, 0)

    def forward(self, x):
        """Return `(cls_logits, bbox_deltas)` for a batch of region features.

        `x` is flattened from dimension 1 on; the outputs have
        `num_classes` and `num_classes * 4` columns respectively.
        """
        hidden = torch.flatten(x, start_dim=1)
        for fc in (self.fc1, self.fc2):
            hidden = F.relu(fc(hidden))

        return self.cls_score(hidden), self.bbox_pred(hidden)


In [None]:

class FasterRcnn(nn.Module):
    """Simplified Faster R-CNN that returns training losses.

    Pipeline of `forward`: backbone -> RPN -> proposal decoding/NMS ->
    RoI pooling -> detection head -> IoU-matched classification and
    box-regression losses.

    Limitations (NOTE(review)):
        * Only a batch size of 1 is supported.
        * There is no loss on the RPN outputs, so the RPN layers receive no
          training signal; proposals are treated as constants.
        * `forward` always returns losses; there is no inference path.

    Args:
        backbone: module returning a mapping of feature maps that contains
            the keys '1', '2' and '3' and exposing `out_channels`.
        num_classes: number of classes including background (label 0).
        anchor_generator: callable `(image_list, feature_maps) -> list of
            (A, 4) anchor tensors in xyxy format`, one per image.
    """

    # Upper bound for the predicted log-scale factors (= log(1000 / 16)) so
    # that exp() cannot overflow while decoding boxes.
    _BBOX_XFORM_CLIP = 4.135166556742356

    def __init__(self, backbone, num_classes, anchor_generator):
        super().__init__()

        self.backbone = backbone
        # anchors=9 must match the anchor generator (3 sizes x 3 aspect ratios
        # per location in this notebook).
        self.rpn = RPN(backbone.out_channels, anchors=9)
        self.head = DetectionHead(in_channels=backbone.out_channels, num_classes=num_classes, roi_output_size=7)
        # The attribute name (including its spelling) is kept so that existing
        # attribute accesses and state-dict keys stay valid.
        self.roi_poooler = MultiScaleRoIAlign(featmap_names=['1', '2', '3'], output_size=7, sampling_ratio=2)

        self.num_classes = num_classes

        self.anchor_generator = anchor_generator
        self.classification_loss_fn = nn.CrossEntropyLoss()
        self.regression_loss_fn = nn.SmoothL1Loss()

    def gen_anchors(self, features, images):
        """Generate the anchors for every image of `images`.

        Args:
            features: list of feature maps (one per pyramid level).
            images: `ImageList` holding the batch and the image sizes.

        Returns:
            A list with one `(num_anchors, 4)` tensor per image.
        """
        # The anchor generator reads the spatial size from each feature map
        # itself, so the maps are passed through unchanged.
        return self.anchor_generator(images, list(features))

    @staticmethod
    def _apply_deltas(boxes, deltas):
        """Decode `(dx, dy, dw, dh)` deltas relative to xyxy `boxes`.

        This is the inverse of `compute_bbox_targets`: centres are shifted by
        `d * size` and sizes are scaled by `exp(d)`.

        Args:
            boxes: `(N, 4)` reference boxes in xyxy format.
            deltas: `(N, 4)` predicted deltas.

        Returns:
            `(N, 4)` decoded boxes in xyxy format.
        """
        widths = boxes[:, 2] - boxes[:, 0]
        heights = boxes[:, 3] - boxes[:, 1]
        ctr_x = boxes[:, 0] + 0.5 * widths
        ctr_y = boxes[:, 1] + 0.5 * heights

        dx, dy, dw, dh = deltas.unbind(dim=1)
        dw = dw.clamp(max=FasterRcnn._BBOX_XFORM_CLIP)
        dh = dh.clamp(max=FasterRcnn._BBOX_XFORM_CLIP)

        new_ctr_x = ctr_x + dx * widths
        new_ctr_y = ctr_y + dy * heights
        new_w = widths * torch.exp(dw)
        new_h = heights * torch.exp(dh)

        return torch.stack(
            (
                new_ctr_x - 0.5 * new_w,
                new_ctr_y - 0.5 * new_h,
                new_ctr_x + 0.5 * new_w,
                new_ctr_y + 0.5 * new_h,
            ),
            dim=1,
        )

    @staticmethod
    def _collect_targets(targets, device):
        """Turn a list of COCO annotation dicts into box/label tensors.

        Each annotation provides `'bbox'` as `[x, y, width, height]` and
        `'category_id'`; values may be plain numbers or one-element tensors
        (as produced by the default collate function with batch size 1).

        Returns:
            `(gt_boxes, gt_labels)`: `(G, 4)` float boxes in xyxy format and
            `(G,)` long labels on `device`. Boxes with a non-positive width
            or height are dropped because their regression targets would not
            be finite.
        """
        raw_boxes = [[float(v) for v in ann['bbox']] for ann in targets]
        raw_labels = [int(ann['category_id']) for ann in targets]

        gt_boxes = torch.tensor(raw_boxes, dtype=torch.float32, device=device).reshape(-1, 4)
        gt_labels = torch.tensor(raw_labels, dtype=torch.long, device=device)

        valid = (gt_boxes[:, 2] > 0) & (gt_boxes[:, 3] > 0)
        gt_boxes, gt_labels = gt_boxes[valid], gt_labels[valid]

        # COCO stores (x, y, w, h); the IoU / encoding code expects xyxy.
        x, y, w, h = gt_boxes.unbind(dim=1)
        return torch.stack((x, y, x + w, y + h), dim=1), gt_labels

    def process_proposals(self, objectness_logits, bbox_deltas, anchors, pos_thresh=0.7, neg_thresh=0.3, num_proposals=1000):
        """Decode the RPN output into a filtered set of proposals.

        Args:
            objectness_logits: RPN logits, reshaped to `(A, 2)`
                (column 0 = background, column 1 = foreground).
            bbox_deltas: RPN box deltas, reshaped to `(A, 4)`.
            anchors: list of `(A_i, 4)` anchor tensors in xyxy format.
            pos_thresh: currently unused; kept for interface compatibility.
            neg_thresh: IoU threshold passed to NMS.
            num_proposals: maximum number of proposals kept after NMS.

        Returns:
            `(P, 4)` proposals in xyxy format, sorted by decreasing score.
            The result is detached from the autograd graph.
        """
        # Proposals act as constants for the second stage: there is no RPN
        # loss, and regression targets must not carry gradients.
        bbox_deltas = bbox_deltas.detach().reshape(-1, 4)
        objectness_logits = objectness_logits.detach().reshape(-1, 2)
        anchors = torch.cat(anchors, dim=0)

        # Foreground probability of the two-way (background, foreground) logits.
        scores = torch.softmax(objectness_logits, dim=1)[:, 1]

        # Anchors are already xyxy; decode the deltas relative to them.
        proposals = self._apply_deltas(anchors, bbox_deltas)

        keep = scores > 0.5
        # If nothing passes the score threshold keep every candidate instead
        # of handing an empty proposal set to the second stage.
        if keep.any():
            proposals = proposals[keep]
            scores = scores[keep]

        keep = nms(proposals, scores, neg_thresh)[:num_proposals]
        return proposals[keep]

    def filter_proposals(self, proposals, cls_logits, bbox_pred, gt_boxes, gt_labels):
        """Match proposals to ground truth and compute the head losses.

        A proposal whose best IoU is at least 0.4 is foreground and takes the
        label of its best-matching box; a best IoU in [0.2, 0.4) makes it
        background (label 0); everything else is ignored.

        Args:
            proposals: `(P, 4)` boxes in xyxy format.
            cls_logits: `(P, num_classes)` class logits.
            bbox_pred: `(P, num_classes * 4)` deltas, four consecutive columns
                per class.
            gt_boxes: `(G, 4)` ground-truth boxes in xyxy format.
            gt_labels: `(G,)` long class indices.

        Returns:
            `(classification_loss, regression_loss)` scalar tensors. A loss
            without any contributing proposal is a zero that is still
            connected to the graph, so `backward()` keeps working.
        """
        fg_iou_thresh = 0.4
        bg_iou_thresh = 0.2

        zero_cls_loss = cls_logits.sum() * 0.0
        zero_reg_loss = bbox_pred.sum() * 0.0

        num_proposals = proposals.size(0)
        if num_proposals == 0 or gt_boxes.size(0) == 0:
            # Nothing to match: every proposal would fall in the ignore band.
            return zero_cls_loss, zero_reg_loss

        iou_matrix = box_iou(proposals, gt_boxes)  # (P, G)
        max_iou, matched_idxs = iou_matrix.max(dim=1)

        fg_mask = max_iou >= fg_iou_thresh
        bg_mask = (max_iou < fg_iou_thresh) & (max_iou >= bg_iou_thresh)

        labels = torch.full((num_proposals,), -1, dtype=torch.long, device=proposals.device)  # -1 <=> ignore
        labels[bg_mask] = 0  # background
        labels[fg_mask] = gt_labels[matched_idxs[fg_mask]]

        # Classification loss over foreground and background proposals.
        valid_mask = labels >= 0
        classification_loss = zero_cls_loss
        if valid_mask.any():
            classification_loss = self.classification_loss_fn(cls_logits[valid_mask], labels[valid_mask])

        # Regression loss over foreground proposals, using only the four
        # deltas that belong to each proposal's ground-truth class.
        regression_loss = zero_reg_loss
        if fg_mask.any():
            fg_idx = fg_mask.nonzero(as_tuple=True)[0]
            fg_labels = labels[fg_idx]
            fg_targets = self.compute_bbox_targets(proposals[fg_idx], gt_boxes[matched_idxs[fg_idx]])
            fg_deltas = bbox_pred.reshape(num_proposals, -1, 4)[fg_idx, fg_labels]
            regression_loss = self.regression_loss_fn(fg_deltas, fg_targets)

        return classification_loss, regression_loss

    def compute_bbox_targets(self, proposals, gt_boxes):
        """
        Computes regression targets (dx, dy, dw, dh) for proposals.

        Both inputs are `(N, 4)` boxes in xyxy format, row i of `gt_boxes`
        being the box matched to row i of `proposals`. Centre offsets are
        expressed relative to the proposal size and sizes as log ratios.
        """
        proposals = box_convert(proposals, 'xyxy', 'cxcywh')
        gt_boxes = box_convert(gt_boxes, 'xyxy', 'cxcywh')

        targets_dx = (gt_boxes[:, 0] - proposals[:, 0]) / proposals[:, 2]
        targets_dy = (gt_boxes[:, 1] - proposals[:, 1]) / proposals[:, 3]
        targets_dw = torch.log(gt_boxes[:, 2] / proposals[:, 2])
        targets_dh = torch.log(gt_boxes[:, 3] / proposals[:, 3])

        return torch.stack((targets_dx, targets_dy, targets_dw, targets_dh), dim=1)

    def forward(self, images, targets):
        """Compute the detection-head losses for one image.

        Args:
            images: `(1, C, H, W)` image batch.
            targets: list of COCO annotation dicts of that image (possibly
                empty), each with `'bbox'` and `'category_id'`.

        Returns:
            `(cls_loss, regr_loss)` scalar tensors.

        Raises:
            ValueError: if the batch contains more than one image.
        """
        if images.shape[0] != 1:
            raise ValueError(
                f"FasterRcnn.forward supports a batch size of 1, got {images.shape[0]}"
            )

        featmaps = self.backbone(images)
        # Levels 1, 2 and 3 of the pyramid (each level has a different scale).
        features = [featmaps[f'{i}'] for i in range(1, 4)]
        image_sizes = [(image.shape[1], image.shape[2]) for image in images]
        image_list = ImageList(images, image_sizes)

        anchors = self.gen_anchors(features, image_list)  # list with one (A, 4) tensor
        logits, bbox_deltas = self.rpn(features)  # (1, A, 2) and (1, A, 4)
        proposals = self.process_proposals(logits, bbox_deltas, anchors)  # (P, 4)

        # The pooler takes the feature mapping (it selects its own levels),
        # one box tensor per image and the image sizes.
        pooled_features = self.roi_poooler(featmaps, [proposals], image_list.image_sizes)

        cls_logits, bbox_pred = self.head(pooled_features)  # (P, C) and (P, C * 4)

        gt_boxes, gt_labels = self._collect_targets(targets, proposals.device)
        cls_loss, regr_loss = self.filter_proposals(proposals, cls_logits, bbox_pred, gt_boxes, gt_labels)

        return cls_loss, regr_loss


In [51]:
import time
def train(model, train_loader, optimizer, num_epochs=10):
    """Train `model` and print the mean loss and duration of every epoch.

    Args:
        model: module called as `model(images, targets)` that returns
            `(classification_loss, bbox_regression_loss)`.
        train_loader: iterable of `(images, targets)` batches.
        optimizer: optimizer updating the parameters of `model`.
        num_epochs: number of passes over `train_loader`.

    The images are moved to the device that holds the model's parameters, so
    inputs and weights can never end up on different devices. An epoch
    without batches reports a loss of `nan` instead of raising.
    """
    model.train()

    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    for epoch in range(num_epochs):
        start_time = time.time()

        running_loss = 0.0
        num_batches = 0
        for images, targets in train_loader:
            if model_device is not None:
                images = images.to(model_device)
            optimizer.zero_grad()

            classification_loss, bbox_regression_loss = model(images, targets)
            loss = classification_loss + bbox_regression_loss
            loss.backward()

            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        epoch_time = time.time() - start_time

        # Counting the batches avoids a ZeroDivisionError on an empty loader.
        mean_loss = running_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {mean_loss}, Time: {epoch_time:.2f} seconds")


In [None]:
# 91 classes: the COCO category ids used as labels go up to 90, plus the
# background label 0.
model = FasterRcnn(
    # `backbone_name` is passed by keyword; the positional form is deprecated.
    backbone=resnet_fpn_backbone(backbone_name='resnet50', weights=ResNet50_Weights.DEFAULT),
    num_classes=91,
    anchor_generator=anchor_generator,
).to(device)  # the weights must live on the same device as the input images
optimizer = optim.Adam(model.parameters(), lr=0.001)
train(model, small_train_loader, optimizer, num_epochs=1000)
 