<a href="https://colab.research.google.com/github/MdA-Saad/MaskRCNN-Pedestrian-Detection-and-Segmentation/blob/main/pedistrain_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

import torch
import torchvision
from google.colab import drive
from matplotlib import pyplot as plt
from torchvision import tv_tensors
from torchvision.io import read_image
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.ops.boxes import masks_to_boxes
from torchvision.transforms import v2 as T
from torchvision.transforms.v2 import functional as tF
from torchvision.utils import draw_bounding_boxes, draw_segmentation_masks

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Project folder on the mounted Drive.
path_to_folder='/content/drive/My Drive/Colab Notebooks/projects/pennFudan-Peds/'
# Last expression of the cell: displays the entries found in that folder.
os.listdir(path_to_folder)

['data']

In [None]:
class PennFudanDataset(torch.utils.data.Dataset):
  """Pedestrian images and instance masks served as ``(image, target)`` pairs.

  Images are read from ``<root>/PNGImages`` and masks from ``<root>/PedMasks``.
  Both listings are sorted, so the i-th image is paired with the i-th mask.

  Args:
    root: Directory containing the ``PNGImages`` and ``PedMasks`` folders.
    transform: Callable invoked as ``transform(img, target)`` that returns the
      transformed pair, or ``None`` to return the sample unchanged.
  """

  def __init__(self, root, transform):
    self.root = root
    self.transform = transform
    self.imgs = sorted(os.listdir(os.path.join(root, "PNGImages")))
    self.masks = sorted(os.listdir(os.path.join(root, "PedMasks")))

  def __getitem__(self, idx):
    """Load sample ``idx`` and build its detection/segmentation target.

    Returns:
      ``(img, target)`` where ``target`` is a dict with the keys ``boxes``,
      ``masks``, ``labels``, ``image_id``, ``area`` and ``iscrowd``.
    """
    img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
    mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])
    img = read_image(img_path)
    mask = read_image(mask_path)

    # Each distinct pixel value in the mask is one instance id. torch.unique
    # returns sorted values; the first one is treated as background and
    # dropped (assumes every mask contains background pixels).
    obj_ids = torch.unique(mask)
    obj_ids = obj_ids[1:]
    num_objs = len(obj_ids)  # number of pedestrians

    # Broadcasting: obj_ids becomes (N, 1, 1) and is compared against the mask
    # (presumably single-channel, i.e. (1, H, W)), producing one binary
    # (H, W) mask per pedestrian.
    masks = (mask == obj_ids[:, None, None]).to(dtype=torch.uint8)

    boxes = masks_to_boxes(masks)
    # There is a single foreground class: 1 == pedestrian.
    # The model expects int64 labels.
    labels = torch.ones((num_objs,), dtype=torch.int64)

    image_id = idx
    area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
    iscrowd = torch.zeros((num_objs,), dtype=torch.int64)
    img = tv_tensors.Image(img)

    target = {}
    target["boxes"] = tv_tensors.BoundingBoxes(
        boxes, format="XYXY", canvas_size=tF.get_size(img)
    )
    # Wrapping the masks lets geometric transforms (e.g. flips) move them
    # together with the image and the boxes.
    target["masks"] = tv_tensors.Mask(masks)
    target["labels"] = labels
    target["image_id"] = image_id
    target["area"] = area
    target["iscrowd"] = iscrowd

    if self.transform is not None:
      img, target = self.transform(img, target)
    return img, target

  def __len__(self):
    """Number of images found under ``PNGImages``."""
    return len(self.imgs)


In [None]:
def collate_fn(batch):
  """Transpose a batch of samples into per-field tuples.

  A list such as ``[(img0, tgt0), (img1, tgt1)]`` becomes
  ``((img0, img1), (tgt0, tgt1))``; nothing is stacked, so samples may
  differ in size.
  """
  per_field = zip(*batch)
  return tuple(per_field)

def get_transform(train):
  """Build the preprocessing pipeline applied to each ``(image, target)`` pair.

  Args:
    train: When true, a random horizontal flip (p=0.5) is added in front of
      the conversion steps as data augmentation.

  Returns:
    A ``transforms.v2.Compose`` that converts the image to scaled float32 and
    then unwraps the tv_tensors into plain tensors.
  """
  # The transform classes live in torchvision.transforms.v2 (imported as T);
  # the functional module (tF) does not provide them.
  transforms = []
  if train:
    transforms.append(T.RandomHorizontalFlip(0.5))
  transforms.append(T.ToDtype(torch.float32, scale=True))
  transforms.append(T.ToPureTensor())
  return T.Compose(transforms)

In [None]:
def get_model_instance_segementation(num_classes=2, hidden_layer=256):
  """Create a Mask R-CNN whose prediction heads are sized for ``num_classes``.

  The model is built with torchvision's default pretrained weights; its box
  and mask predictors are then replaced with freshly initialised heads.

  Args:
    num_classes: Number of output classes including background. The default
      of 2 corresponds to background + pedestrian.
    hidden_layer: Number of channels in the hidden layer of the mask
      predictor.

  Returns:
    The modified model.
  """
  model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")

  # Replace the box head, keeping the input width of the existing classifier.
  in_features = model.roi_heads.box_predictor.cls_score.in_features
  model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

  # Replace the mask head, keeping the input channels of the existing one.
  in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
  model.roi_heads.mask_predictor = MaskRCNNPredictor(
      in_features_mask,
      hidden_layer,
      num_classes,
  )

  return model

In [None]:
def train(model, optimizer, data_loader, device):
  """Run one pass over ``data_loader``, updating ``model`` after every batch.

  Args:
    model: Module that, in training mode, returns a dict of losses when
      called as ``model(images, targets)``.
    optimizer: Optimizer holding the parameters of ``model``.
    data_loader: Iterable yielding ``(images, targets)`` batches, where
      ``targets`` is a sequence of dicts.
    device: Device that the batch tensors are moved to.

  The summed loss of each batch is printed.
  """
  model.train()
  for images, targets in data_loader:
    images = [image.to(device) for image in images]
    # Targets may hold non-tensor entries (e.g. an integer image_id); only
    # tensors have .to(), so everything else is passed through untouched.
    targets = [
        {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()}
        for t in targets
    ]
    loss_dict = model(images, targets)
    losses = sum(loss for loss in loss_dict.values())

    optimizer.zero_grad()
    losses.backward()
    optimizer.step()

    print(f"loss:{losses.item()}")

In [None]:
@torch.inference_mode()
def evaluate(model, data_loader, device):
  """Run ``model`` in eval mode over ``data_loader`` and collect its outputs.

  Args:
    model: Module that, in eval mode, returns a list of dicts of tensors when
      called as ``model(images)``.
    data_loader: Iterable yielding ``(images, targets)`` batches; it must
      expose a sized ``dataset`` attribute (used for the summary message).
    device: Device that the images are moved to.

  Returns:
    A list with one entry per batch; each entry is the list of per-image
    output dicts with every tensor moved to the CPU.
  """
  model.eval()
  results = []

  for images, _targets in data_loader:
    # The loop variable is ``img``; the previous code referenced an undefined
    # name here and failed on the first batch.
    images = [img.to(device) for img in images]
    outputs = model(images)
    outputs = [{k: v.to("cpu") for k, v in t.items()} for t in outputs]
    results.append(outputs)
  print(f"Inference finished on {len(data_loader.dataset)} images.")
  return results

In [None]:
# Use the current accelerator when one is available, otherwise the CPU.
device = torch.accelerator.current_accelerator() if torch.accelerator.is_available() else torch.device('cpu')

# The data folder listed earlier lives on the mounted Drive, so resolve the
# dataset root against path_to_folder rather than the working directory.
# NOTE(review): assumes the dataset sits at <path_to_folder>/data/PennFudan — confirm.
data_root = os.path.join(path_to_folder, 'data', 'PennFudan')

# Same files twice: augmentation is enabled only for the training view.
dataset = PennFudanDataset(data_root, get_transform(train=True))
dataset_test = PennFudanDataset(data_root, get_transform(train=False))

# One shared permutation keeps the two subsets disjoint: the last 50 samples
# are held out for testing. A seeded local generator makes the split
# reproducible without touching the global RNG state.
split_generator = torch.Generator().manual_seed(0)
indices = torch.randperm(len(dataset), generator=split_generator).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])

data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=collate_fn
)
# Evaluate on the held-out subset (previously this loader wrapped the
# training subset by mistake).
data_loader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn
)

model = get_model_instance_segementation()
model.to(device)

# Optimise only the parameters that require gradients.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005
)

# Multiply the learning rate by 0.1 every 3 epochs.
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)
num_epochs = 1
for epoch in range(num_epochs):
  train(model, optimizer, data_loader, device)
  lr_scheduler.step()
  evaluate(model, data_loader_test, device=device)
print("That's it")
