In [None]:
#!unzip PennFudanPed.zip

In [None]:
import os
import numpy as np
import torch
import torch.utils.data
from PIL import Image
from torch.optim.lr_scheduler import StepLR
import timm
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import RetinaNet
from engine_my import train_one_epoch, evaluate
import utils
import torchvision.transforms as T

class PennFudanDataset(torch.utils.data.Dataset):
    """Detection dataset read from a ``PNGImages`` / ``PedMasks`` folder pair.

    Each item is an ``(image, target)`` tuple. ``image`` is the RGB picture
    (passed through ``transforms`` when one is given) and ``target`` is a dict
    with the keys ``boxes``, ``labels``, ``masks``, ``image_id``, ``area`` and
    ``iscrowd`` derived from the instance mask stored alongside the picture.

    Args:
        root: Directory containing the ``PNGImages`` and ``PedMasks`` folders.
        transforms: Optional callable applied to the image only; the target
            is returned untouched.
    """

    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        # Both listings are sorted so that position i in one folder pairs
        # with position i in the other.
        image_dir = os.path.join(root, "PNGImages")
        mask_dir = os.path.join(root, "PedMasks")
        self.imgs = sorted(os.listdir(image_dir))
        self.masks = sorted(os.listdir(mask_dir))

    def __getitem__(self, idx):
        """Return the ``(image, target)`` pair stored at position ``idx``."""
        image_file = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_file = os.path.join(self.root, "PedMasks", self.masks[idx])

        img = Image.open(image_file).convert("RGB")
        # The mask is deliberately NOT converted to RGB: every distinct value
        # identifies one instance, and 0 is the background.
        instance_map = np.array(Image.open(mask_file))

        # np.unique returns sorted values, so dropping the first entry
        # removes the background id.
        instance_ids = np.unique(instance_map)[1:]
        num_objs = len(instance_ids)

        # Broadcast the comparison to get one boolean mask per instance.
        instance_masks = instance_map == instance_ids[:, None, None]

        # Tight [xmin, ymin, xmax, ymax] box around each instance's pixels.
        box_list = []
        for single_mask in instance_masks:
            hits = np.where(single_mask)
            ys, xs = hits[0], hits[1]
            box_list.append([np.min(xs), np.min(ys), np.max(xs), np.max(ys)])

        boxes = torch.as_tensor(box_list, dtype=torch.float32)
        heights = boxes[:, 3] - boxes[:, 1]
        widths = boxes[:, 2] - boxes[:, 0]

        target = {
            "boxes": boxes,
            # single foreground class -> every instance gets label 1
            "labels": torch.ones((num_objs,), dtype=torch.int64),
            "masks": torch.as_tensor(instance_masks, dtype=torch.uint8),
            "image_id": torch.tensor([idx]),
            "area": heights * widths,
            # no instance is treated as a crowd region
            "iscrowd": torch.zeros((num_objs,), dtype=torch.int64),
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of image files found under ``PNGImages``."""
        return len(self.imgs)


def get_transform(train):
    """Build the image transform pipeline.

    Args:
        train: When truthy, a random solarize augmentation is appended after
            the tensor conversion; otherwise only the conversion is applied.

    Returns:
        A ``T.Compose`` wrapping the selected transforms.
    """
    # Always start by turning the PIL image into a PyTorch tensor.
    pipeline = [T.ToTensor()]

    if train:
        # Training-time augmentation: with probability 0.5, invert every
        # pixel value above the 0.6 threshold.
        # Alternatives that were tried and are currently disabled:
        #   T.ColorJitter(hue=(-0.4, 0.4))
        #   T.GaussianBlur(7, sigma=(0.1, 2.0))
        #   T.RandomInvert(p=0.5)
        #   T.RandomAdjustSharpness(5, p=1)
        #   T.RandomAutocontrast(p=0.)
        pipeline.append(T.RandomSolarize(0.6, p=0.5))

    return T.Compose(pipeline)

# Three views over the same folder: plain training images, augmented
# training images, and plain evaluation images.
dataset = PennFudanDataset('PennFudanPed', get_transform(train=False))
dataset2 = PennFudanDataset('PennFudanPed', get_transform(train=True))
dataset_test = PennFudanDataset('PennFudanPed', get_transform(train=False))

# Seeded shuffle of the indices; the last 50 shuffled entries are held out
# for evaluation and everything before them is used for training.
torch.manual_seed(1)
indices = torch.randperm(len(dataset)).tolist()
train_indices, test_indices = indices[:-50], indices[-50:]
dataset = torch.utils.data.Subset(dataset, train_indices)
dataset2 = torch.utils.data.Subset(dataset2, train_indices)
dataset_test = torch.utils.data.Subset(dataset_test, test_indices)

# Loader options shared by the training and evaluation loaders.
loader_kwargs = dict(pin_memory=True, collate_fn=utils.collate_fn)

# Training loaders (plain / augmented): shuffled, two images per batch.
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, **loader_kwargs)
data_loader2 = torch.utils.data.DataLoader(
    dataset2, batch_size=2, shuffle=True, **loader_kwargs)

# Evaluation loader: fixed order, one image at a time.
data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=1, shuffle=False, **loader_kwargs)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# our dataset has two classes only - background and person
num_classes = 2

# Preview the first augmented training sample: convert the tensor to uint8,
# move channels last, and hand it to PIL for display.
img, _ = dataset2[0]
to_uint8 = T.ConvertImageDtype(torch.uint8)
img = to_uint8(img.cpu())
channels_last = img.permute(1, 2, 0).contiguous().numpy()
out = Image.fromarray(channels_last)
out.show()

### RetinaNet with ResNet-50 backbone

In [None]:
# RetinaNet detector (ResNet-50 FPN backbone) created with pretrained=True,
# then moved to the selected device.
model_retina_res = torchvision.models.detection.retinanet_resnet50_fpn(pretrained=True)
model_retina_res.to(device)

# SGD over the parameters that require gradients.
params = [weight for weight in model_retina_res.parameters() if weight.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Multiply the learning rate by 0.1 every 3 scheduler steps (one step per epoch).
lr_scheduler = StepLR(optimizer, step_size=3, gamma=0.1)

for epoch in range(10):
    # One pass over the plain loader, then one over the augmented loader.
    for loader in (data_loader, data_loader2):
        train_one_epoch(model_retina_res, optimizer, loader, device, epoch, print_freq=10)
    lr_scheduler.step()
    # Per-epoch evaluation is disabled; the model is evaluated once below.

evaluate(model_retina_res, data_loader_test, device=device)


### Faster R-CNN with ResNet-50 backbone

In [None]:
#wandb.init(project='228_project', name="faster_res")
# Faster R-CNN detector (ResNet-50 FPN backbone) created with pretrained=True,
# then moved to the selected device.
model_faster_res = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model_faster_res.to(device)

# construct an optimizer over the parameters that require gradients
params = [p for p in model_faster_res.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler which decreases the learning rate by
# 10x every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                               step_size=3,
                                               gamma=0.1)

for epoch in range(10):
    # Train on the plain loader, then on the augmented loader.
    # Both passes must use model_faster_res: the optimizer above only holds
    # this model's parameters, so passing any other model here would leave
    # this one untrained on the augmented data. Looping over the loaders
    # keeps the model argument in a single place.
    for loader in (data_loader, data_loader2):
        train_one_epoch(model_faster_res, optimizer, loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()

# evaluate on the test dataset once training has finished
evaluate(model_faster_res, data_loader_test, device=device)



### Faster R-CNN with MobileNetV3 backbone

In [None]:
#wandb.init(project='228_project', name="faster_mo")
# Faster R-CNN detector (MobileNetV3-Large FPN backbone) created with
# pretrained=True, then moved to the selected device.
model_faster_mo = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)
model_faster_mo.to(device)

# construct an optimizer over the parameters that require gradients
params = [p for p in model_faster_mo.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler which decreases the learning rate by
# 10x every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                               step_size=3,
                                               gamma=0.1)


for epoch in range(10):
    # Train on the plain loader, then on the augmented loader.
    # Both passes must use model_faster_mo: the optimizer above only holds
    # this model's parameters, so passing any other model here would leave
    # this one untrained on the augmented data. Looping over the loaders
    # keeps the model argument in a single place.
    for loader in (data_loader, data_loader2):
        train_one_epoch(model_faster_mo, optimizer, loader, device, epoch, print_freq=10)

    # update the learning rate
    lr_scheduler.step()

# evaluate on the test dataset once training has finished
evaluate(model_faster_mo, data_loader_test, device=device)

In [None]:
# Persist the learned weights (state_dict only) of the three detectors.
# torch.save does not create missing parent directories, so make sure the
# output folder exists first; otherwise the save fails after training ends.
os.makedirs('models', exist_ok=True)
torch.save(model_retina_res.state_dict(), 'models/retina_res_aug.pth')
torch.save(model_faster_res.state_dict(), 'models/faster_res_aug.pth')
torch.save(model_faster_mo.state_dict(), 'models/faster_mo_aug.pth')