### **Import and Model**

Reference:

voxel51, “Fiftyone-examples/pytorch_detection_training.ipynb at master · Voxel51/fiftyone-examples,” GitHub, 07-Sep-2022. [Online]. Available: https://github.com/voxel51/fiftyone-examples/blob/master/examples/pytorch_detection_training.ipynb.

In [None]:
!pip install fiftyone
!pip install torch torchvision

In [None]:
%%shell

# Download TorchVision repo to use some files from
# references/detection
git clone https://github.com/pytorch/vision.git
cd vision
git checkout v0.3.0

cp references/detection/utils.py ../
cp references/detection/transforms.py ../
cp references/detection/coco_eval.py ../
cp references/detection/engine.py ../
cp references/detection/coco_utils.py ../

In [None]:
import torch
from torch.utils.data import Dataset
import torchvision
import torchvision.ops as ops
import torch.optim as optim
from torchvision import datasets
from torchvision import transforms
import torch.nn as nn
from torchvision.transforms import ToTensor

class Patch_Embedding(nn.Module):
    def __init__(self, channel, embed_dim, patch_dim):
        super().__init__()
        self.in_dim = channel
        self.out_dim = embed_dim

        self.P = patch_dim

        # this outputs a shape of Batch size, embedding dimension, H, W
        self.linear = nn.Conv2d(
            channel, embed_dim, kernel_size=patch_dim, stride=patch_dim, bias=True)
        # self.norm = nn.LayerNorm([height/self.P, width/self.P, embed_dim])
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):

        # flatten it into 2d, so H and W collapse into number of patches, then we swap the shape
        # from [B, ED, H,W] -> [B, ED, number of patches] -> [B, number of patches, ED]
        # this is done to follow the convention of the paper, where the embedding dimension is the last dimension

        x = self.linear(x)

        x = x.flatten(2).transpose(1, 2)
        x = self.norm(x)
        # output shape should be [B, Number of patches, ED], where number of patches should be HW/4*2

        return x


# Spatial-Reduction Attention
class SRAttention(nn.Module):
    """Spatial-reduction attention (SRA).

    Keys and values are first shortened by ``SR`` (sequence length
    ``HW -> HW / R^2``) and then attended to by the full-length queries:
    ``softmax(q k^T / sqrt(d_head)) v``.

    Args:
        num_heads: number of heads; ``channels`` must be divisible by it.
        channels: token feature size ``C``.
        height, width: token-grid size handed to ``SR``.
        reduction_ratio: spatial reduction ratio ``R`` handed to ``SR``.
        batch_size: forwarded to ``SR``.

    NOTE(review): a single projection ``self.L`` is shared by query, key and
    value and by every head, so all heads compute the same output. Giving each
    head / role its own weights would change the parameter layout (and break
    existing checkpoints), so that is left as a follow-up.
    """

    def __init__(self, num_heads, channels, height, width, reduction_ratio, batch_size):
        super().__init__()
        if channels % num_heads != 0:
            # Otherwise the concatenated heads would not have `channels`
            # features and the output projection below would fail in forward.
            raise ValueError(
                "channels (%d) must be divisible by num_heads (%d)"
                % (channels, num_heads))
        self.num_heads = num_heads
        self.head_dimension = channels // self.num_heads

        self.c = channels

        # The per-head weight is C x d_head.
        self.L = nn.Linear(self.c, self.head_dimension)
        self.sr = SR(height, width, channels, reduction_ratio, batch_size)
        # W_o is C x C: d_head = C / num_heads, so concatenating num_heads
        # heads gives C features again.
        self.L2 = nn.Linear(self.c, self.c)

    def forward(self, query, key, value):
        """Attend ``query`` [B, HW, C] over reduced ``key``/``value``.

        Returns a tensor of shape [B, HW, C].
        """
        q = self.L(query)            # [B, HW, d_head]
        k = self.L(self.sr(key))     # [B, HW/R^2, d_head]
        v = self.L(self.sr(value))   # [B, HW/R^2, d_head]

        # No detach here: gradients must flow through the attention scores.
        scores = q @ k.transpose(1, 2) / (self.head_dimension ** 0.5)
        # Normalise over the key axis (last dim) so each query's weights sum
        # to one; dim=1 would normalise across queries instead.
        head = torch.softmax(scores, dim=-1) @ v  # [B, HW, d_head]

        # Every head uses the same weights (see class note), so compute the
        # head once and tile it instead of redoing identical work per head.
        SRA = torch.cat([head] * self.num_heads, dim=2)  # [B, HW, C]

        return self.L2(SRA)


# Spatial Reduction
# SR(x) = Norm(Reshape(x,Ri)W^s)
class SR(nn.Module):
    """Spatial reduction: ``SR(x) = Norm(Reshape(x, R) W^s)``.

    Groups every ``R^2`` consecutive tokens of the sequence into one token of
    ``R^2 * C`` features, projects it back to ``C`` features and applies
    layer normalisation.

    Args:
        height, width: token-grid size; the input sequence must hold
            ``height * width`` tokens, and that count must be divisible by
            ``reduction_ratio ** 2``.
        channels: feature size ``C`` of each token.
        reduction_ratio: reduction ratio ``R``.
        batch_size: kept as ``self.B`` for backward compatibility; the actual
            batch size is read from the input in ``forward``.
    """

    def __init__(self, height, width, channels, reduction_ratio, batch_size):
        super().__init__()
        self.H = height
        self.W = width
        self.C = channels
        self.B = batch_size
        self.R = reduction_ratio
        # After reshaping x to [HW/R^2, R^2*C], project R^2*C back to C.
        self.linear_projection = nn.Linear(self.R**2 * self.C, self.C)
        # Layer norm over the channel dimension.
        self.norm = nn.LayerNorm(self.C)

    def forward(self, x):
        """Reduce ``x`` from [B, HW, C] to [B, HW / R^2, C]."""
        # Take the batch size from the input rather than the constructor
        # value so partial / differently sized batches work too.
        reduced_x = torch.reshape(
            x,
            [x.shape[0], self.H * self.W // (self.R**2), self.R**2 * self.C])
        new_x = self.linear_projection(reduced_x)
        return self.norm(new_x)


class Feed_Forward(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear.

    Args:
        in_size: feature size of the input.
        hidden_size: feature size between the two linear layers.
        out_size: feature size of the output.
    """

    def __init__(self, in_size, hidden_size, out_size):
        super().__init__()
        self.l1 = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.relu = nn.ReLU(inplace=False)
        self.l2 = nn.Linear(in_features=hidden_size, out_features=out_size)

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        hidden = self.relu(self.l1(x))
        return self.l2(hidden)


class Transformer_Encoder(nn.Module):
    """Pre-norm encoder layer: spatial-reduction attention then an MLP.

    Both sub-layers are wrapped in a residual connection, each preceded by a
    layer normalisation over the channel dimension.

    Args:
        height, width: size of the feature map *before* patch embedding; the
            attention layer is built for the ``height // patch_dim`` by
            ``width // patch_dim`` token grid.
        channels: token feature size.
        reduction_ratio: spatial reduction ratio passed to the attention.
        patch_dim: patch side length used by the preceding patch embedding.
        batch_size: forwarded to the attention layer.
        num_heads: number of attention heads.
    """

    def __init__(self, height, width, channels, reduction_ratio, patch_dim, batch_size, num_heads):
        super().__init__()
        self.num_heads = num_heads
        grid_h = height // patch_dim
        grid_w = width // patch_dim

        self.norm1 = nn.LayerNorm(channels)
        self.a = SRAttention(
            self.num_heads, channels, grid_h, grid_w, reduction_ratio, batch_size)
        self.norm2 = nn.LayerNorm(channels)
        # Hidden width of the MLP is half the channel count.
        self.ff = Feed_Forward(channels, channels // 2, channels)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual blocks."""
        normed = self.norm1(x)
        # Self-attention: the same normalised tokens act as q, k and v.
        after_attention = x + self.a(normed, normed, normed)
        return after_attention + self.ff(self.norm2(after_attention))


class Stage_Module(nn.Module):
    """One pyramid stage: patch embedding followed by a transformer encoder.

    Args:
        channels: channels of the incoming feature map.
        embedding_dim: channels of the outgoing feature map.
        Height, Width: spatial size of the incoming feature map.
        reduction_ratio: spatial reduction ratio of the attention layer.
        patch_dim: patch side length; the spatial size shrinks by this factor.
        batch_size: kept as ``self.B`` and forwarded to the encoder; the
            final reshape reads the batch size from the input instead.
        num_heads: number of attention heads.
    """

    def __init__(self, channels, embedding_dim, Height, Width, reduction_ratio, patch_dim, batch_size, num_heads):
        super().__init__()
        self.H = Height
        self.W = Width
        self.out_dim = embedding_dim
        self.P = patch_dim
        self.B = batch_size
        self.PE = Patch_Embedding(channels, embedding_dim, patch_dim)
        self.TE = Transformer_Encoder(
            Height, Width, embedding_dim, reduction_ratio, patch_dim, batch_size, num_heads)

    def forward(self, x):
        """Map [B, C, H, W] to [B, embedding_dim, H // P, W // P]."""
        x = self.PE(x)
        x = self.TE(x)
        # Tokens -> [B, H/P, W/P, ED] -> channels-first [B, ED, H/P, W/P].
        # The batch size comes from the tensor itself so this reshape does
        # not depend on the batch size given at construction time.
        x = torch.reshape(
            x, [x.shape[0], self.H // self.P, self.W // self.P, self.out_dim]
        ).permute([0, 3, 1, 2])
        return x


class PVT(nn.Module):
    """Four-stage Pyramid Vision Transformer backbone.

    The stages embed to 64, 128, 256 and 512 channels with patch sizes
    4, 2, 2, 2 (overall stride 4, 8, 16, 32), reduction ratios 8, 4, 2, 1 and
    1, 2, 4, 8 attention heads.

    Args:
        channels: channels of the input image.
        height, width: spatial size of the input image.
        batch_size: batch size forwarded to every stage.
    """

    def __init__(self, channels, height, width, batch_size):
        super().__init__()
        # input at stage 1 is H X W X 3

        self.stg1 = Stage_Module(channels, 64, height,
                                 width, reduction_ratio=8, patch_dim=4, batch_size=batch_size, num_heads=1)

        self.stg2 = Stage_Module(
            64, 128, height//4, width//4, reduction_ratio=4, patch_dim=2, batch_size=batch_size, num_heads=2)

        self.stg3 = Stage_Module(
            128, 256, height//8, width//8, reduction_ratio=2, patch_dim=2, batch_size=batch_size, num_heads=4)

        self.stg4 = Stage_Module(256, 512, height//16,
                                 width//16, reduction_ratio=1, patch_dim=2, batch_size=batch_size, num_heads=8)

        # The previous `nn.linear(512)` does not exist in torch.nn and made
        # construction fail. forward() never applies a head, so keep the
        # attribute as a parameter-free placeholder.
        self.head = nn.Identity()

    def forward(self, x):
        """Run the four stages and return the stage-4 feature map."""
        x = self.stg1(x)

        x = self.stg2(x)

        x = self.stg3(x)

        x = self.stg4(x)

        return x


class classification_pvt(nn.Module):
    """Four-stage PVT that also owns (currently unused) classifier layers.

    Args:
        channels: channels of the input image.
        height, width: spatial size of the input image.
        batch_size: batch size forwarded to every stage.
        num_classes: accepted but not used; ``head2`` always has 100 outputs.

    NOTE(review): ``head``, ``head2`` and ``relu`` are created but never
    called in ``forward``, which returns the stage-4 feature map. ``head``
    expects ``7 * 7 * 512`` inputs. The layer sizes are left untouched because
    changing them would alter the parameter layout used by saved checkpoints.
    """

    def __init__(self, channels, height, width, batch_size, num_classes):
        super().__init__()
        # input at stage 1 is H X W X 3
        self.stg1 = Stage_Module(
            channels=channels, embedding_dim=64, Height=height, Width=width,
            reduction_ratio=8, patch_dim=4, batch_size=batch_size, num_heads=1)
        self.stg2 = Stage_Module(
            channels=64, embedding_dim=128, Height=height // 4, Width=width // 4,
            reduction_ratio=4, patch_dim=2, batch_size=batch_size, num_heads=2)
        self.stg3 = Stage_Module(
            channels=128, embedding_dim=256, Height=height // 8, Width=width // 8,
            reduction_ratio=2, patch_dim=2, batch_size=batch_size, num_heads=4)
        self.stg4 = Stage_Module(
            channels=256, embedding_dim=512, Height=height // 16, Width=width // 16,
            reduction_ratio=1, patch_dim=2, batch_size=batch_size, num_heads=8)

        # Classifier layers (not applied in forward, see class note).
        self.head = nn.Linear(7 * 7 * 512, 128)
        self.head2 = nn.Linear(128, 100)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        """Run the four stages in order and return the last feature map."""
        for stage in (self.stg1, self.stg2, self.stg3, self.stg4):
            x = stage(x)
        return x

### **COCO2017 (detection)**

In [None]:
import fiftyone.utils.coco as fouc
import fiftyone.zoo as foz
import fiftyone as fo
from PIL import Image


class FiftyOneTorchDataset(torch.utils.data.Dataset):
    """Torch dataset that serves detection targets from a FiftyOne dataset.

    Args:
        fiftyone_dataset: a FiftyOne dataset or view; it must support
            ``values("filepath")``, ``distinct(...)`` and lookup by filepath.
        transforms: optional callable ``(img, target) -> (img, target)``.
        gt_field: name of the sample field holding the detections.
        classes: optional list of class names. When omitted, the distinct
            detection labels of the dataset are used. ``"background"`` is
            prepended when it is not already the first entry.
    """

    def __init__(
        self,
        fiftyone_dataset,
        transforms=None,
        gt_field="ground_truth",
        classes=None,
    ):
        self.samples = fiftyone_dataset
        self.transforms = transforms
        self.gt_field = gt_field

        self.img_paths = self.samples.values("filepath")

        self.classes = classes
        if not self.classes:
            self.classes = self.samples.distinct(
                "%s.detections.label" % gt_field
            )

        # Guard the empty case: indexing [0] on an empty list would raise.
        if not self.classes or self.classes[0] != "background":
            self.classes = ["background"] + list(self.classes)

        # Class name -> integer id, with "background" at 0.
        self.labels_map_rev = {c: i for i, c in enumerate(self.classes)}

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at position ``idx``.

        ``target`` holds ``boxes`` ([N, 4] as x1, y1, x2, y2), ``labels``,
        ``image_id``, ``area`` and ``iscrowd`` tensors.
        """
        img_path = self.img_paths[idx]
        sample = self.samples[img_path]
        metadata = sample.metadata
        img = Image.open(img_path).convert("RGB")

        # A sample without annotations may have no label object at all.
        gt = sample[self.gt_field]
        detections = gt.detections if gt is not None else []

        boxes = []
        labels = []
        area = []
        iscrowd = []
        for det in detections:
            category_id = self.labels_map_rev[det.label]
            coco_obj = fouc.COCOObject.from_label(
                det, metadata, category_id=category_id,
            )
            # COCO boxes are (x, y, w, h); convert to corner format.
            x, y, w, h = coco_obj.bbox
            boxes.append([x, y, x + w, y + h])
            labels.append(coco_obj.category_id)
            area.append(coco_obj.area)
            iscrowd.append(coco_obj.iscrowd)

        target = {}
        # reshape keeps the [N, 4] layout even when there are no boxes
        # (an empty list would otherwise become a tensor of shape [0]).
        target["boxes"] = torch.as_tensor(
            boxes, dtype=torch.float32).reshape(-1, 4)
        target["labels"] = torch.as_tensor(labels, dtype=torch.int64)
        target["image_id"] = torch.as_tensor([idx])
        target["area"] = torch.as_tensor(area, dtype=torch.float32)
        target["iscrowd"] = torch.as_tensor(iscrowd, dtype=torch.int64)

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Return the number of images."""
        return len(self.img_paths)

    def get_classes(self):
        """Return the class names, with ``"background"`` first."""
        return self.classes

In [None]:
# Load the COCO-2017 train and validation splits from the FiftyOne zoo,
# requesting only the "detections" label type.
coco_train = foz.load_zoo_dataset(
    "coco-2017",
    split = "train",
    label_types=["detections"])

coco_val = foz.load_zoo_dataset(
    "coco-2017",
    split = "validation",
    label_types=["detections"])

In [None]:
# FiftyOneTorchDataset.__getitem__ reads `sample.metadata` and passes it to
# COCOObject.from_label, so compute the metadata for both splits up front.
coco_train.compute_metadata()
coco_val.compute_metadata()

# Batch size handed to classification_pvt when the backbone is built below.
batch_size = 16

In [None]:
import transforms as T

# Training adds a random horizontal flip (p=0.5); evaluation only converts
# the image to a tensor. `T` is the transforms.py copied from the torchvision
# detection references, whose transforms take and return (image, target).
train_transforms = T.Compose([T.ToTensor(), T.RandomHorizontalFlip(0.5)])
test_transforms = T.Compose([T.ToTensor()])

# Wrap the FiftyOne splits as torch datasets.
torch_dataset = FiftyOneTorchDataset(coco_train, train_transforms)

torch_dataset_test = FiftyOneTorchDataset(coco_val, test_transforms)

In [None]:
# Import functions from the torchvision references we cloned
from engine import train_one_epoch, evaluate

def do_training(model, torch_dataset, torch_dataset_test, num_epochs=4, batch_size=16):
    """Train ``model`` and evaluate it on the test set after every epoch.

    Args:
        model: detection model to train; it is moved to the GPU when one is
            available, otherwise to the CPU.
        torch_dataset: training dataset yielding ``(image, target)`` pairs.
        torch_dataset_test: evaluation dataset of the same form.
        num_epochs: number of training epochs.
        batch_size: batch size of both data loaders.
    """
    # utils.py is one of the torchvision reference files copied next to the
    # notebook; nothing imports it at module level, so bring it in here.
    import utils

    # define training and validation data loaders
    data_loader = torch.utils.data.DataLoader(
        torch_dataset, batch_size=batch_size, shuffle=True, num_workers=2,
        collate_fn=utils.collate_fn, drop_last=True)

    data_loader_test = torch.utils.data.DataLoader(
        torch_dataset_test, batch_size=batch_size, shuffle=False, num_workers=2,
        collate_fn=utils.collate_fn, drop_last=True)

    # train on the GPU or on the CPU, if a GPU is not available
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print("Using device %s" % device)

    # move model to the right device
    model.to(device)

    # construct an optimizer over the trainable parameters only
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)
    # and a learning rate scheduler: multiply the lr by 0.1 every 3 epochs
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                    step_size=3,
                                                    gamma=0.1)

    for epoch in range(num_epochs):
        # train for one epoch, printing every iteration (print_freq=1)
        train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=1)

        # update the learning rate
        lr_scheduler.step()
        # evaluate on the test dataset
        evaluate(model, data_loader_test, device=device)

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) with a fresh box predictor.

    The detector is created with ``pretrained=True`` and its box predictor is
    replaced by a new ``FastRCNNPredictor`` with ``num_classes`` outputs. The
    predictor's input feature size is printed as a side effect.

    Args:
        num_classes: number of outputs of the new box predictor.

    Returns:
        The modified detection model.
    """
    # load a pre-trained detector
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    roi_heads = detector.roi_heads

    # input feature size of the existing classification layer
    in_features = roi_heads.box_predictor.cls_score.in_features
    print(in_features)

    # swap the pre-trained head for a new, untrained one
    roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return detector

In [None]:
# The box predictor needs one output per entry of the dataset's class list,
# which includes the "background" class FiftyOneTorchDataset puts at index 0.
# A hard-coded 80 leaves the highest label id without an output.
model = get_model(len(torch_dataset.get_classes()))

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
save_path = './ckpt_coco2017'

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Backbone built for 3-channel input with height 640 and width 480.
net = classification_pvt(3, 640, 480, batch_size, 80)
net.to(device)
# map_location lets a checkpoint saved from a GPU run load on a CPU-only host
# (and places the tensors directly on the device selected above).
net.load_state_dict(
    torch.load('./ckpt_cifar100/cifar100_new_params.pth', map_location=device))

In [None]:
# Chain the PVT backbone and the Faster R-CNN detector, then train for 30 epochs.
# NOTE(review): nn.Sequential passes a single input from one module to the
# next. The training loop presumably calls the model with (images, targets),
# and `net` returns a 512-channel feature map rather than a list of images,
# so this composition likely fails at the first training step. Confirm, and
# consider plugging `net` in as the detector's backbone instead.
model_final = nn.Sequential(net, model)
do_training(model_final, torch_dataset, torch_dataset_test, num_epochs=30)