In [None]:
# load dataset -- copying them from Google Drive
!cp drive/MyDrive/coco.zip .
!cp drive/MyDrive/PennFudanPed.zip .
!cp drive/MyDrive/engine.py .
!cp drive/MyDrive/coco_eval.py .
!cp drive/MyDrive/coco_utils.py .
!cp drive/MyDrive/transforms.py .
!cp drive/MyDrive/utils.py .
!unzip coco.zip
!unzip PennFudanPed.zip

In [None]:
import torch
from torch import nn

class Classifier(nn.Module):
    """Prediction head made of two parallel linear layers.

    ``cls_score`` maps the input features to ``num_classes`` scores and
    ``bbox_pred`` maps the same features to ``num_classes * 4`` box values.
    """

    def __init__(self, input_channel, num_classes):
        super().__init__()
        self.cls_score = nn.Linear(input_channel, num_classes)
        self.bbox_pred = nn.Linear(input_channel, num_classes * 4)

    def forward(self, x):
        """Return ``(scores, bbox_coord)`` for a batch of feature vectors.

        A 4-D input is accepted only when its two trailing dimensions are
        exactly 1x1; any input is flattened to ``(batch, -1)`` before being
        fed to both linear layers.
        """
        has_spatial_dims = x.dim() == 4
        if has_spatial_dims:
            assert list(x.shape[2:]) == [1, 1]

        flat = x.flatten(start_dim=1)
        return self.cls_score(flat), self.bbox_pred(flat)

In [None]:
from torch.utils.data import Dataset
from pycocotools.coco import COCO
from PIL import Image
import os
import torchvision.transforms as T

class CocoDataset(Dataset):
    """COCO detection dataset restricted to the 'person' category.

    Only images with at least one 'person' annotation are indexed, and only
    'person' annotations are returned for each image.

    Args:
        annotation_file: path to a COCO-format annotation JSON file.
        images_file: directory that contains the image files.
        show_bbox: stored as ``self.showbbox``; not used by this class.
    """

    def __init__(self, annotation_file, images_file, show_bbox=False):
        self.showbbox = show_bbox
        self.coco = COCO(annotation_file)
        self.images_file = images_file

        # Category ids for 'person'; used both to pick the images and to
        # filter the annotations in __getitem__ (instead of a hard-coded 1).
        self.cat_ids = self.coco.getCatIds(catNms=['person'])
        self.ids = list(sorted(self.coco.getImgIds(catIds=self.cat_ids)))

        # To use every image instead:
        # self.ids = list(sorted(self.coco.imgs.keys()))

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        """Return ``(image_tensor, target)`` for the image at ``index``.

        ``target`` is a dict with keys ``boxes`` (float32, ``[N, 4]`` in
        ``x_min, y_min, x_max, y_max`` order), ``labels`` (int64, ``[N]``),
        ``image_id`` (``[1]``), ``area`` (float32, ``[N]``) and ``iscrowd``
        (int64 zeros, ``[N]``), where N is the number of kept annotations.
        """
        id_image = self.ids[index]

        img_name = self.coco.loadImgs(id_image)[0]["file_name"]
        # Force 3 channels so non-RGB files produce the same tensor layout
        # as every other image (matches PennFudanDataset).
        img = Image.open(os.path.join(self.images_file, img_name)).convert("RGB")

        annotations = self.coco.loadAnns(self.coco.getAnnIds(imgIds=id_image))
        kept = [ann for ann in annotations if ann['category_id'] in self.cat_ids]

        boxes = []
        areas = []
        labels = []
        for ann in kept:
            # COCO stores boxes as [x, y, width, height]; convert to corners.
            x_min, y_min, width, height = ann['bbox'][:4]
            boxes.append([x_min, y_min, x_min + width, y_min + height])
            areas.append(ann['area'])
            labels.append(ann['category_id'])

        # reshape(-1, 4) keeps the [N, 4] layout even when N == 0.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.long)
        areas = torch.as_tensor(areas, dtype=torch.float32)
        # Sized by the kept annotations so every per-box field has length N
        # (previously sized by *all* annotations of the image).
        iscrowd = torch.zeros((len(kept),), dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([id_image]),
            "area": areas,
            "iscrowd": iscrowd,
        }

        return T.ToTensor()(img), target

In [None]:
import os
import numpy as np
import torch
import torch.utils.data
from PIL import Image
import torchvision.transforms as T

class PennFudanDataset(torch.utils.data.Dataset):
    """Detection dataset built from ``<root>/PNGImages`` and ``<root>/PedMasks``.

    Images and masks are paired by position in the sorted directory listings.
    Each distinct non-zero value in a mask is treated as one object instance.

    Args:
        root: dataset directory containing ``PNGImages`` and ``PedMasks``.
        transforms: stored as ``self.transforms``; ``__getitem__`` does not
            apply it and always converts the image with ``T.ToTensor()``.
    """

    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))
        self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))

    def __getitem__(self, idx):
        """Return ``(image_tensor, target)`` for sample ``idx``.

        ``target`` holds ``boxes`` (float32 ``[N, 4]``), ``labels`` (int64
        ones), ``masks`` (uint8 ``[N, H, W]``), ``image_id``, ``area``,
        ``iscrowd`` (int64 zeros) and ``img_path`` (a ``str``, so this target
        cannot be moved wholesale with ``v.to(device)``).
        """
        img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        mask = np.array(Image.open(mask_path))

        # 0 is the background value. Filtering by value (rather than dropping
        # the first unique id) stays correct when a mask has no background.
        obj_ids = np.unique(mask)
        obj_ids = obj_ids[obj_ids != 0]
        # One boolean mask per instance: shape [N, H, W].
        masks = mask == obj_ids[:, None, None]
        num_objs = len(obj_ids)

        boxes = []
        for instance_mask in masks:
            ys, xs = np.where(instance_mask)
            boxes.append([int(xs.min()), int(ys.min()), int(xs.max()), int(ys.max())])

        # reshape(-1, 4) keeps the [N, 4] layout when there are no objects,
        # so the area computation below no longer raises IndexError.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.ones((num_objs,), dtype=torch.int64)
        masks = torch.as_tensor(masks, dtype=torch.uint8)

        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "masks": masks,
            "image_id": image_id,
            "area": area,
            "iscrowd": iscrowd,
            "img_path": img_path,
        }

        img = T.ToTensor()(img)
        return img, target

    def __len__(self):
        return len(self.imgs)

In [None]:
import torch
import torchvision.transforms as transforms

# COCO val2017 annotations/images (extracted from coco.zip).
annotation_file='coco/annotations/instances_val2017.json'
images_file='coco/val2017'

# Parse the annotation file once and share the resulting dataset between the
# train / validation / test subsets. The three splits below are disjoint index
# ranges over the same images, so separate CocoDataset instances only
# repeated the JSON load and tripled its memory use.
coco_person_dataset = CocoDataset(annotation_file, images_file)
pennfudan_dataset = PennFudanDataset('PennFudanPed')

# Fixed seed so the shuffled splits are reproducible across runs.
torch.manual_seed(1)
indices = torch.randperm(len(coco_person_dataset)).tolist()
indices_pennfudan = torch.randperm(len(pennfudan_dataset)).tolist()

dataset = torch.utils.data.Subset(coco_person_dataset, indices[:100])
dataset_validation = torch.utils.data.Subset(coco_person_dataset, indices[700:720])
dataset_test_coco = torch.utils.data.Subset(coco_person_dataset, indices[500:520])
dataset_test_pennfudan = torch.utils.data.Subset(pennfudan_dataset, indices_pennfudan[0:100])

def collate_fn(batch):
    """Transpose a batch of samples into one tuple per field.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``,
    so samples are grouped without being stacked into a single tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)


# Training batches: two images at a time, reshuffled every epoch.
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_fn,
)

# Validation batches use the same settings as training.
data_loader_validation = torch.utils.data.DataLoader(
    dataset_validation,
    batch_size=2,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_fn,
)

# Test loaders: one image per batch, in fixed order.
data_loader_test_coco = torch.utils.data.DataLoader(
    dataset_test_coco,
    batch_size=1,
    shuffle=False,
    num_workers=2,
    collate_fn=collate_fn,
)
data_loader_test_pennfudan = torch.utils.data.DataLoader(
    dataset_test_pennfudan,
    batch_size=1,
    shuffle=False,
    num_workers=2,
    collate_fn=collate_fn,
)

loading annotations into memory...
Done (t=0.57s)
creating index...
index created!
loading annotations into memory...
Done (t=0.75s)
creating index...
index created!
loading annotations into memory...
Done (t=0.70s)
creating index...
index created!


In [None]:
'''
function for validation loss
https://stackoverflow.com/questions/71288513/how-can-i-determine-validation-loss-for-faster-rcnn-pytorch
'''
from typing import Tuple, List, Dict, Optional
import torch
from torch import Tensor
from collections import OrderedDict
from torchvision.models.detection.roi_heads import fastrcnn_loss
from torchvision.models.detection.rpn import concat_box_prediction_layers
def eval_forward(model, images, targets):
    """Compute detection losses and detections for one batch with ``model`` in eval mode.

    The function walks through the detector's sub-modules by hand (transform,
    backbone, RPN, ROI heads) so that the loss terms can be computed while the
    model as a whole has been switched to eval mode.

    Args:
        model: detection model exposing ``transform``, ``backbone``, ``rpn``
            and ``roi_heads`` attributes, as accessed below.
        images: iterable of image tensors; the last two dimensions of each are
            recorded as its original size.
        targets: per-image target dicts; each must contain ``"boxes"``.
            Must not be ``None`` (asserted further down).

    Returns:
        ``(losses, detections)`` where ``losses`` is a dict with keys
        ``loss_classifier``, ``loss_box_reg``, ``loss_objectness`` and
        ``loss_rpn_box_reg``, and ``detections`` is a list with one dict per
        image holding ``boxes``, ``labels`` and ``scores``.

    Raises:
        ValueError: if a target box does not have positive width and height.
        AssertionError: if ``targets`` is ``None``.

    Side effects:
        Calls ``model.eval()`` and, before returning, sets
        ``model.rpn.training`` and ``model.roi_heads.training`` to ``False``.
        Gradient tracking is not disabled here; wrap the call in
        ``torch.no_grad()`` if gradients are not needed.
    """
    model.eval()

    # Remember each image's (height, width) so detections can be mapped back
    # to the original resolution at the end.
    original_image_sizes: List[Tuple[int, int]] = []
    for img in images:
        val = img.shape[-2:]
        assert len(val) == 2
        original_image_sizes.append((val[0], val[1]))

    images, targets = model.transform(images, targets)
    # Reject boxes whose max corner is not strictly greater than the min corner.
    if targets is not None:
        for target_idx, target in enumerate(targets):
            boxes = target["boxes"]
            degenerate_boxes = boxes[:, 2:] <= boxes[:, :2]
            if degenerate_boxes.any():
                # Report the first offending box.
                bb_idx = torch.where(degenerate_boxes.any(dim=1))[0][0]
                degen_bb: List[float] = boxes[bb_idx].tolist()
                raise ValueError(
                    "All bounding boxes should have positive height and width."
                    f" Found invalid box {degen_bb} for target at index {target_idx}."
                )

    features = model.backbone(images.tensors)
    # A backbone returning a single tensor is wrapped so the rest of the code
    # can always treat `features` as a mapping.
    if isinstance(features, torch.Tensor):
        features = OrderedDict([("0", features)])
    # Flip only the RPN's training flag (the rest of the model stays in eval).
    # NOTE(review): presumably so the RPN helpers called below use their
    # training-time settings -- confirm against the installed torchvision.
    model.rpn.training=True
    features_rpn = list(features.values())
    objectness, pred_bbox_deltas = model.rpn.head(features_rpn)
    anchors = model.rpn.anchor_generator(images, features_rpn)

    num_images = len(anchors)
    # Per feature level: product of the three dimensions of one image's
    # objectness map, i.e. the number of anchors on that level.
    num_anchors_per_level_shape_tensors = [o[0].shape for o in objectness]
    num_anchors_per_level = [s[0] * s[1] * s[2] for s in num_anchors_per_level_shape_tensors]
    objectness, pred_bbox_deltas = concat_box_prediction_layers(objectness, pred_bbox_deltas)
    # Deltas are detached before decoding, so no gradient flows through the proposals.
    proposals = model.rpn.box_coder.decode(pred_bbox_deltas.detach(), anchors)
    proposals = proposals.view(num_images, -1, 4)
    proposals, scores = model.rpn.filter_proposals(proposals, objectness, images.image_sizes, num_anchors_per_level)

    # --- RPN losses ---
    proposal_losses = {}
    assert targets is not None
    labels, matched_gt_boxes = model.rpn.assign_targets_to_anchors(anchors, targets)
    regression_targets = model.rpn.box_coder.encode(matched_gt_boxes, anchors)
    loss_objectness, loss_rpn_box_reg = model.rpn.compute_loss(
        objectness, pred_bbox_deltas, labels, regression_targets
    )
    proposal_losses = {
        "loss_objectness": loss_objectness,
        "loss_rpn_box_reg": loss_rpn_box_reg,
    }

    # --- ROI-head losses and detections ---
    image_shapes = images.image_sizes
    # Note: `labels` and `regression_targets` are rebound here to the ROI-head
    # values; the RPN ones above are no longer needed.
    proposals, matched_idxs, labels, regression_targets = model.roi_heads.select_training_samples(proposals, targets)
    box_features = model.roi_heads.box_roi_pool(features, proposals, image_shapes)
    box_features = model.roi_heads.box_head(box_features)
    class_logits, box_regression = model.roi_heads.box_predictor(box_features)

    result: List[Dict[str, torch.Tensor]] = []
    detector_losses = {}
    loss_classifier, loss_box_reg = fastrcnn_loss(class_logits, box_regression, labels, regression_targets)
    detector_losses = {"loss_classifier": loss_classifier, "loss_box_reg": loss_box_reg}
    # `labels` is rebound once more, now to the predicted labels per image.
    boxes, scores, labels = model.roi_heads.postprocess_detections(class_logits, box_regression, proposals, image_shapes)
    num_images = len(boxes)
    for i in range(num_images):
        result.append(
            {
                "boxes": boxes[i],
                "labels": labels[i],
                "scores": scores[i],
            }
        )
    detections = result
    # Map detections from the transformed image sizes back to the original ones.
    detections = model.transform.postprocess(detections, images.image_sizes, original_image_sizes)
    # Restore the sub-module flags. NOTE(review): roi_heads.training is never
    # set to True in this function, so only the rpn reset undoes a change made here.
    model.rpn.training=False
    model.roi_heads.training=False
    losses = {}
    losses.update(detector_losses)
    losses.update(proposal_losses)
    return losses, detections

In [None]:
import torchvision
from engine import evaluate


def get_model(num_classes, trainable_backbone_layers=5):
    """Build a pretrained Faster R-CNN (ResNet-50 FPN) with a fresh box predictor.

    Args:
        num_classes: number of output classes of the new predictor head
            (the caller in this notebook passes 2: person + background).
        trainable_backbone_layers: forwarded to the torchvision constructor.
            Defaults to 5, the value that was previously hard-coded in the
            call while an identically named local went unused.

    Returns:
        The model with ``roi_heads.box_predictor`` replaced by a ``Classifier``
        sized from the original predictor's input features.
    """
    # NOTE(review): newer torchvision releases document a `weights=` argument
    # in place of `pretrained=`; confirm against the installed version.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        pretrained=True,
        pretrained_backbone=True,
        trainable_backbone_layers=trainable_backbone_layers,
    )
    # Alternative backbone:
    # model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(
    #    pretrained=True, pretrained_backbone=True, trainable_backbone_layers=trainable_backbone_layers)

    # Swap the pretrained head for one with the requested number of classes.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = Classifier(in_features, num_classes)

    return model

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Two classes: person + background.
# num_classes = 91 # all coco classes
num_classes = 2

model = get_model(num_classes)
model.to(device)

# Optimise only the parameters that are not frozen.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
# Alternative optimiser:
# optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
optimizer = torch.optim.Adam(
    params,
    lr=0.005,
    betas=(0.9, 0.999),
    eps=0.1,
    weight_decay=0,
)

# Multiply the learning rate by 0.1 every 3 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)


# Training: one pass over `data_loader` per epoch, followed by a validation
# pass over `data_loader_validation`.
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    print(f"Epoch {epoch}")
    for i, (images, targets) in enumerate(data_loader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In train mode the model returns a dict of loss terms.
        loss_dict = model(images, targets)
        loss = sum(loss_dict.values())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # .item() prints the plain number instead of the tensor repr.
        print(f"iteration: {i}\tloss: {loss.item()}")

    # Validation loss, averaged over the held-out validation batches.
    # (Previously this was computed on the last *training* batch, and the
    # validation loader was never read.) eval_forward switches the model to
    # eval mode; model.train() at the top of the loop switches it back.
    validation_batch_losses = []
    with torch.no_grad():  # no gradients needed for validation
        for val_images, val_targets in data_loader_validation:
            val_images = [image.to(device) for image in val_images]
            val_targets = [{k: v.to(device) for k, v in t.items()} for t in val_targets]
            loss_validation_dict, dets = eval_forward(model, val_images, val_targets)
            validation_batch_losses.append(sum(loss_validation_dict.values()).item())

    if validation_batch_losses:
        loss_validation = sum(validation_batch_losses) / len(validation_batch_losses)
    else:
        loss_validation = float("nan")  # empty validation loader

    print(f"validation_loss: {loss_validation}\n\n")
    print(f"[test] evaluating coco")
    # evaluate(model, data_loader_test_coco, device)
    print(f"\n[test] evaluating pennfudan")
    # evaluate(model, data_loader_test_pennfudan, device)
    lr_scheduler.step()
    print("\n\n")

In [None]:
# Write a checkpoint (last epoch index, model and optimizer state, last
# training loss) to Google Drive.
weights_path = 'drive/MyDrive/5-performance-loss--model_resnet50-lr_0_005-epoch_10-optim_pth.pth'
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss,
}
torch.save(checkpoint, weights_path)

In [None]:
import cv2
from google.colab.patches import cv2_imshow # used for plotting images on Colab

tr = T.ToTensor()  # not used in this cell; kept in case later cells rely on it

# Only detections scoring above this value are drawn.
score_threshold = 0.8

model.eval()
for images, targets in data_loader_test_pennfudan:
    images = [img.to(device) for img in images]

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        predictions = model(images)

    # coco
    # img_name = '0' * (12 - len(str(targets[0]['image_id'].item()))) + str(targets[0]['image_id'].item()) + '.jpg'
    # img = cv2.imread('coco/val2017/' + img_name)

    # pennfudan: the dataset stores the source path in the target dict
    img_name = targets[0]['img_path']
    print(img_name)
    img = cv2.imread(img_name)

    for prediction in predictions:
        boxes = prediction['boxes']
        labels = prediction['labels']
        scores = prediction['scores']
        for box, label, score in zip(boxes, labels, scores):
            if score > score_threshold:
                print(box, label, score)
                # cv2.rectangle needs plain integer pixel coordinates, not
                # tensor elements (which may also live on the GPU).
                x_min, y_min, x_max, y_max = (int(round(v)) for v in box.tolist())
                cv2.rectangle(img, (x_min, y_min), (x_max, y_max), (255, 0, 0), 2)

        cv2_imshow(img)