# Model

> API details.

In [None]:
#|hide
from nbdev.showdoc import *
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [None]:
#|export
import cv2
import timm
import torch
import pickle
import random
import torch.nn as nn
from license_plate_detection.bbox_utils import *
from fastai.vision.all import *
from torchvision.ops import roi_align
from license_plate_detection.fpn import *

In [None]:
def pseudo_draw(anchors, im=None, size=128):
    """Draw `anchors` on a canvas, display it and return the drawn canvas.

    When `im` is None a `size` x `size` array of ones is used as the canvas;
    otherwise `im` itself is handed to `draw_bboxes`.
    """
    canvas = im if im is not None else np.ones([size, size])
    canvas = draw_bboxes(canvas, anchors)
    show_image(canvas)
    return canvas

In [None]:
# Machine-specific directory holding one pickled sample batch; change `root`
# when running on another machine.
root= Path("/home/aucsie/Documents/practice/")
# presumably the batch inputs (xb) and targets (yb) — verify against the code that wrote them
xb_path = root/'one_batch.pkl'
yb_path = root/'one_batch_yb.pkl'

In [None]:
class CPU_Unpickler(pickle.Unpickler):
    """Unpickler that forces pickled torch storages onto the CPU.

    When the pickle stream references `torch.storage._load_from_bytes`, that
    loader is replaced by one that calls `torch.load(..., map_location='cpu')`.
    Every other global is resolved by the stock `pickle.Unpickler`.

    NOTE: unpickling executes arbitrary code; only use this on trusted files.
    """

    def find_class(self, module, name):
        """Resolve `module.name`, substituting a CPU-mapping storage loader."""
        if module == 'torch.storage' and name == '_load_from_bytes':
            # Imported here so the class does not depend on `io` leaking in
            # through a star import elsewhere in the notebook.
            import io
            return lambda b: torch.load(io.BytesIO(b), map_location='cpu')
        return super().find_class(module, name)

In [None]:
# Load the cached batch on the CPU first (see CPU_Unpickler), then move it to
# the GPU. `with` closes each file handle once the pickle has been read.
with open(xb_path, 'rb') as f:
    xb = CPU_Unpickler(f).load().cuda()
with open(yb_path, 'rb') as f:
    yb = CPU_Unpickler(f).load().cuda()

In [None]:
# Model name handed to `timm.create_model` when the backbone is built below.
arch= 'resnet50'

In [None]:
# Settings for the multi-scale (FPN) experiment whose cells are commented out below.
device='cuda:0'
# Backbone stages to tap for features (used by the commented-out create_model calls).
out_indices=[1, 2, 3, 4]
# presumably the channel count of each tapped stage for resnet50 — TODO confirm
feature_channels=[256, 512, 1024, 2048]
# backbone= timm.create_model(arch, features_only=False, out_indices=out_indices)
# backbone= timm.create_model(arch, features_only=True, out_indices=out_indices)

In [None]:
# features=backbone(xb)
# [f.shape for f in features]

In [None]:
# fpn=FPN(feature_channels, out_indices)

In [None]:
# fpn_feat = fpn(features)

In [None]:
# for f in fpn_feat: print(f.shape)

In [None]:
def draw_grid(img_shape, stride):
    """Render, display and return a white grid with `stride`-pixel spacing.

    Args:
        img_shape: canvas shape as `(height, width, ...)`; the canvas is
            created with `np.zeros([*img_shape])`.
        stride: integer spacing in pixels between neighbouring grid lines.

    Returns:
        The canvas array with the grid lines drawn into it.
    """
    im = np.zeros([*img_shape])
    # numpy shapes are (rows, cols) while cv2 points are (x=col, y=row), so x
    # must range over the width and y over the height.
    height, width = (int(v) for v in img_shape[:2])
    white = (255, 255, 255)
    # One full-length line per grid position is enough; cv2 clips the end
    # point that lies one pixel past the border.
    for x in range(0, width, stride):
        cv2.line(im, (x, 0), (x, height), white, 1)
    for y in range(0, height, stride):
        cv2.line(im, (0, y), (width, y), white, 1)
    show_image(im, figsize=(6, 6))
    return im

In [None]:
class AnchorGenerator:
    """Generate dense `(x1, y1, x2, y2)` anchor boxes for one feature map.

    Args:
        stride: distance in input pixels between neighbouring feature-map cells.
        ratios: height/width aspect ratios of the anchors.
        scales: anchor side lengths (in pixels) before the ratio is applied.
        feature_map: any object with a `.shape` whose last two entries are
            `(height, width)`; only the shape is read.
    """

    def __init__(self, stride, ratios, scales, feature_map):
        self.stride = stride
        self.ratios = np.array(ratios)
        self.scales = np.array(scales)
        self.feature_map = feature_map

    def generate_base_anchors(self):
        """Return the `(len(ratios) * len(scales), 4)` anchors centred on the origin."""
        h_ratios = np.sqrt(self.ratios)
        w_ratios = 1 / h_ratios
        # Every ratio is combined with every scale (ratio-major order).
        hs = (h_ratios[:, None] * self.scales[None, :]).ravel()
        ws = (w_ratios[:, None] * self.scales[None, :]).ravel()
        # Half extents on each side of the centre.
        return np.stack([-ws, -hs, ws, hs], axis=1) / 2

    def generate_all_anchors(self):
        """Return `(all_anchors, num_k)`.

        `all_anchors` has shape `(height * width * num_k, 4)`, ordered
        row-major over feature-map cells with the `num_k` base anchors of a
        cell stored contiguously. `num_k` is the number of anchors per cell.
        """
        base_anchors = self.generate_base_anchors()
        num_k = base_anchors.shape[0]
        # Use the trailing two dims so both a 2-D placeholder and an
        # (N, C, H, W) feature tensor are accepted.
        h, w = self.feature_map.shape[-2:]
        shift_x = np.arange(0, w) * self.stride
        shift_y = np.arange(0, h) * self.stride
        shift_xx, shift_yy = np.meshgrid(shift_x, shift_y)
        xs, ys = shift_xx.ravel(), shift_yy.ravel()
        # The same (x, y) offset is applied to both corners of each box.
        shifts = np.stack([xs, ys, xs, ys], axis=1)
        all_anchors = base_anchors[None, :, :] + shifts[:, None, :]
        return all_anchors.reshape(-1, 4), num_k

In [None]:
# valid_flags= assign_anchor(all_anchors, test_bbox)

In [None]:
# valid_anchors=all_anchors[valid_flags==1]

In [None]:
class RPN(nn.Module):
    """Region proposal head: a shared 3x3 conv followed by 1x1 regression and
    classification convs, plus the anchors for the feature map it runs on.

    Args:
        anchor_generator: callable invoked as
            `anchor_generator(stride, ratios, scales, feature_map)`; the result
            must provide `generate_all_anchors() -> (all_anchors, num_k)`.
        stride, ratios, scales, feature_map: forwarded to `anchor_generator`.
        in_channels: channel count of the input feature map (default 512,
            the value that used to be hard-coded).
    """

    def __init__(self, anchor_generator, stride, ratios, scales, feature_map,
                 in_channels=512):
        super().__init__()
        generator = anchor_generator(stride, ratios, scales, feature_map)
        self.all_anchors, self.num_k = generator.generate_all_anchors()
        self.shared_conv = nn.Conv2d(in_channels, in_channels, 3, 1, 1)
        # 4 box offsets and 2 class scores per anchor at every location.
        self.reg_conv = nn.Conv2d(in_channels, 4 * self.num_k, 1, 1)
        self.cls_conv = nn.Conv2d(in_channels, 2 * self.num_k, 1, 1)

    def get_rpn_gt(self, gt_bboxes):
        """Build RPN targets for `gt_bboxes`.

        Returns `(gt_rpn_cls, gt_rpn_reg, anchors)`, where `anchors` are the
        entries of `self.all_anchors` whose sampled label equals 1 and
        `gt_rpn_reg` is `calc_offset(gt_bboxes, anchors)`.
        """
        valid_flags = assign_anchor(self.all_anchors, gt_bboxes)
        gt_rpn_cls = sample_anchor(self.all_anchors, valid_flags)
        anchors = self.all_anchors[gt_rpn_cls == 1]
        gt_rpn_reg = calc_offset(gt_bboxes, anchors)
        return gt_rpn_cls, gt_rpn_reg, anchors

    def forward(self, x):
        """Return `(reg_feats, cls_feats)` for an `(N, in_channels, H, W)` input."""
        x = self.shared_conv(x)
        reg_feats = self.reg_conv(x)
        cls_feats = self.cls_conv(x)
        return reg_feats, cls_feats

In [None]:
def reg2loc(reg_feats, n):
    """Move the channel axis of a 4-D map last, then view it as `(n, -1, 4)`."""
    channels_last = reg_feats.permute(0, 2, 3, 1)
    # `view` needs contiguous memory after the permute.
    packed = channels_last.contiguous()
    return packed.view(n, -1, 4)

In [None]:
class ROIHead(nn.Module):
    """Second-stage head: RoI-align the features, then predict per-class box
    offsets and class scores.

    Args:
        in_channels: channel count of the incoming feature map.
        num_classes: number of classes; the regression branch emits
            `num_classes * 4` channels.
        roi_feat_size: `(height, width)` of each pooled RoI feature.
        spatial_scale: factor mapping RoI coordinates onto the feature map,
            forwarded to `roi_align`. The default 1.0 matches `roi_align`'s own
            default, i.e. the previous behaviour.
            NOTE(review): if the RoIs are in input-image pixels this likely
            needs to be 1 / backbone stride — confirm.
    """

    def __init__(self, in_channels, num_classes, roi_feat_size=(7,7), spatial_scale=1.0):
        super().__init__()
        self.roi_head = roi_align
        self.roi_feat_size = roi_feat_size
        self.spatial_scale = spatial_scale
        self.shared_conv = nn.Conv2d(in_channels, in_channels, 3, 1, 1)
        self.reg_conv = nn.Conv2d(in_channels, num_classes * 4, 1, 1)
        self.cls_conv = nn.Conv2d(in_channels, num_classes, 1, 1)

    def forward(self, x, rois):
        """Return `(roi_reg_feats, roi_cls_feats)` for feature map `x` and `rois`."""
        roi_feats = self.roi_head(x, rois, self.roi_feat_size,
                                  spatial_scale=self.spatial_scale)
        shared_feats = self.shared_conv(roi_feats)
        roi_reg_feats = self.reg_conv(shared_feats)
        roi_cls_feats = self.cls_conv(shared_feats)
        # Reshape this head's own regression output; the previous code passed
        # the notebook-global `reg_feats` (the RPN output) by mistake.
        roi_reg_feats = reg2loc(roi_reg_feats, x.shape[0])
        return roi_reg_feats, roi_cls_feats

In [None]:
# Feature-extractor backbone on the GPU; only the feature map at out index 2 is requested.
backbone= timm.create_model(arch, features_only=True, out_indices=[2], ).cuda()

In [None]:
# Ground-truth boxes for the first 3 targets, flattened to rows of 4 values.
# presumably `unscale_pnts(..., 128)` maps normalised points back to 128-px image coordinates — TODO confirm
test_bbox =unscale_pnts(yb[:3], 128).detach().cpu().numpy().reshape(-1, 4)
test_bbox= tensor(test_bbox)

In [None]:
# Placeholder array: AnchorGenerator only reads its shape (16 x 16 cells).
feature_map= np.zeros([16, 16])
stride=16
# A single ratio and a single scale give one anchor per feature-map cell.
ratios=[0.5]
scales=[35]
rpn=RPN(AnchorGenerator, stride, ratios, scales, feature_map).cuda()

In [None]:
# The backbone returns a list of feature maps; take the only one requested.
x= backbone(xb)[0]

In [None]:
# Raw RPN regression / classification maps for the sample batch.
reg_feats, cls_feats= rpn(x)

In [None]:
# Anchors are stored on the RPN as a numpy array; convert them to a tensor for inspection.
all_anchors= rpn.all_anchors
all_anchors= tensor(all_anchors)

In [None]:
# RoI head for 512-channel features and 4 classes.
# NOTE(review): unlike `backbone` and `rpn`, this module is not moved to CUDA here.
roi_head= ROIHead(512, 4)

In [None]:
class FasterRCNN(nn.Module):
    """Two-stage detector wiring a backbone, an RPN head and an RoI head.

    The model only produces raw head outputs; computing the losses is left to
    the caller.

    Args:
        backbone: module returning a sequence of feature maps; the first is used.
        rpn: RPN head exposing `forward(x)` and `get_rpn_gt(gt_bboxes)`.
        roi: RoI head called as `roi(features, rois)`.
    """

    def __init__(self, backbone, rpn, roi):
        super().__init__()
        # TODO: add fpn
        self.backbone = backbone
        self.rpn_head = rpn
        self.roi_head = roi

    def forward(self, x, gt_bboxes):
        """Run both stages and return their raw outputs.

        Returns:
            dict with keys `rpn_reg_feats`, `rpn_cls_feats`, `roi_reg_feats`
            and `roi_cls_feats`.
        """
        x = self.backbone(x)[0]
        rpn_reg_feats, rpn_cls_feats = self.rpn_head(x)
        # RoIs are currently rebuilt from the ground-truth RPN offsets, not
        # from the RPN predictions.
        _, gt_rpn_reg, anchors = self.rpn_head.get_rpn_gt(gt_bboxes)
        rois = reverse_offset(anchors, gt_rpn_reg)
        # roi_align needs the boxes on the same device and with the same dtype
        # as the feature map, so match them to `x`.
        rois = [tensor(r).unsqueeze(0).to(x) for r in rois]
        roi_reg_feats, roi_cls_feats = self.roi_head(x, rois)
        return dict(rpn_reg_feats=rpn_reg_feats,
                    rpn_cls_feats=rpn_cls_feats,
                    roi_reg_feats=roi_reg_feats,
                    roi_cls_feats=roi_cls_feats)

In [None]:
# Assemble the full detector from the components built in the cells above.
frcc= FasterRCNN(backbone, rpn, roi_head)

In [None]:
"""
#TODO: add the RoI pooling layer, the RoI outputs, and roi_head
#TODO: add rpn_loss and roi_loss
The Faster R-CNN model outputs features only;
the loss function will handle backprop.
"""