In [2]:
import torch
import torchvision
import torch.nn as nn
# import sys
# sys.path.append('/opt/conda/lib/python3.7/site-packages')

from torchvision.models.detection import RetinaNet
from torchvision.models.detection.anchor_utils import AnchorGenerator
# from torchvision.transforms import GeneralizedRCNNTransform
# import albumentations as A

import math
from collections import OrderedDict
import warnings

In [2]:
# class ImageTransform(nn.Module) :
#     def __init__(self, image_mean, image_std, size_divisible=32, fixed_size=None):
#         #super(ImageTransform, self).__init__()  
#         self.image_mean = image_mean
#         self.image_std = image_std
#         self.size_divisible = size_divisible
#         self.fixed_size = fixed_size      
        
#     def forward(self,
#                 images,       # type: List[Tensor]
#                 targets=None  # type: Optional[List[Dict[str, Tensor]]]
#                 ):
#         # type: (...) -> Tuple[ImageList, Optional[List[Dict[str, Tensor]]]]
#         images = [img for img in images]
#         if targets is not None:
#             # make a copy of targets to avoid modifying it in-place
#             # once torchscript supports dict comprehension
#             # this can be simplified as follows
#             # targets = [{k: v for k,v in t.items()} for t in targets]
#             targets_copy: List[Dict[str, Tensor]] = []
#             for t in targets:
#                 data: Dict[str, Tensor] = {}
#                 for k, v in t.items():
#                     data[k] = v
#                 targets_copy.append(data)
#             targets = targets_copy
#         for i in range(len(images)):
#             image = images[i]
#             target_index = targets[i] if targets is not None else None

#             if image.dim() != 3:
#                 raise ValueError("images is expected to be a list of 3d tensors "
#                                  "of shape [C, H, W], got {}".format(image.shape))
#             image = self.normalize(image)
# #            image, target_index = self.resize(image, target_index)
#             images[i] = image
#             if targets is not None and target_index is not None:
#                 targets[i] = target_index

#         image_sizes = [img.shape[-2:] for img in images]
#         images = self.batch_images(images, size_divisible=self.size_divisible)
#         image_sizes_list: List[Tuple[int, int]] = []
#         for image_size in image_sizes:
#             assert len(image_size) == 2
#             image_sizes_list.append((image_size[0], image_size[1]))

#         image_list = ImageList(images, image_sizes_list)
#         return image_list, targets        

In [3]:
def transforms():
    return A.Compose(
        [
            A.OneOf(
            [
                A.HueSaturationValue(hue_shift_limit=0.2, sat_shift_limit= 0.2, val_shift_limit=0.2, p=0.9),      
                A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.9)
            ],
            p=0.9),         
            #A.ToGray(p=0.01),         
            A.HorizontalFlip(p=0.5),         
            A.VerticalFlip(p=0.5),         
            A.Resize(height=512, width=512, p=1),      
            A.Normalize(max_pixel_value=1),
            #A.Cutout(num_holes=8, max_h_size=32, max_w_size=32, fill_value=0, p=0.5),
            ToTensorV2(p=1.0)
        ], 
        p=1.0,         
        bbox_params=A.BboxParams(format='coco',min_area=0, min_visibility=0.99,label_fields=['labels'])
        )

def get_valid_transforms():
    return A.Compose([A.Resize(height=512, width=512, p=1.0),
                      A.Normalize(max_pixel_value=1),
                      ToTensorV2(p=1.0),
                      ], 
                      p=1.0, 
                      bbox_params=A.BboxParams(format='coco',min_area=0, min_visibility=0,label_fields=['labels'])
                      )



In [4]:
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)

class BlockAfterFPN(nn.Module) :
    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BlockAfterFPN, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv3 = conv3x3(planes, inplanes)
        self.bn3 = nn.BatchNorm2d(inplanes)
        self.relu3 = nn.ReLU(inplace=True)        


    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu2(out)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.relu3(out)

        out += residual
#         out = self.relu(out)
        return out    

In [10]:
class mobilenetv2_retinanet(RetinaNet) :
    def __init__(self, backbone, num_classes,
                 anchor_generator=None):
        
        super().__init__(backbone, num_classes,
                 anchor_generator=anchor_generator)
    
#         super().__init__(backbone, num_classes=num_classes)
#         if image_mean is None:
#             image_mean = [0.485, 0.456, 0.406]
#         if image_std is None:
#             image_std = [0.229, 0.224, 0.225]
#         self.transform = ImageTransform(image_mean, image_std)
        self.BlockAfterFPN = BlockAfterFPN(inplanes=1280, planes=int(1280/4))
        
    def forward(self, images, targets=None):
        # type: (List[Tensor], Optional[List[Dict[str, Tensor]]]) -> Tuple[Dict[str, Tensor], List[Dict[str, Tensor]]]
        """
        Args:
            images (list[Tensor]): images to be processed
            targets (list[Dict[Tensor]]): ground-truth boxes present in the image (optional)

        Returns:
            result (list[BoxList] or dict[Tensor]): the output from the model.
                During training, it returns a dict[Tensor] which contains the losses.
                During testing, it returns list[BoxList] contains additional fields
                like `scores`, `labels` and `mask` (for Mask R-CNN models).

        """
        if self.training and targets is None:
            raise ValueError("In training mode, targets should be passed")

        if self.training:
            assert targets is not None
            for target in targets:
                boxes = target["boxes"]
                if isinstance(boxes, torch.Tensor):
                    if len(boxes.shape) != 2 or boxes.shape[-1] != 4:
                        raise ValueError("Expected target boxes to be a tensor"
                                         "of shape [N, 4], got {:}.".format(
                                             boxes.shape))
                else:
                    raise ValueError("Expected target boxes to be of type "
                                     "Tensor, got {:}.".format(type(boxes)))

        # get the original image sizes
        original_image_sizes: List[Tuple[int, int]] = []
        for img in images:
            val = img.shape[-2:]
            assert len(val) == 2
            original_image_sizes.append((val[0], val[1]))

        # transform the input
        print(type(images))
#         images, targets = self.transform(images, targets)
        print(len(images))
        features = torch.cat([torch.unsqueeze(torch.tensor(image), 0) for image in images], dim=0)
#        image_sizes_list = []
#        for image in images :
#             print(image.shape)
#            image_sizes_list.append([image.shape[1], image.shape[2]])
#        images = ImageList(images, image_sizes_list)
        
        
#         print(images.size())

        # Check for degenerate boxes
        # TODO: Move this to a function
        if targets is not None:
            for target_idx, target in enumerate(targets):
                boxes = target["boxes"]
                degenerate_boxes = boxes[:, 2:] <= boxes[:, :2]
                if degenerate_boxes.any():
                    # print the first degenerate box
                    bb_idx = torch.where(degenerate_boxes.any(dim=1))[0][0]
                    degen_bb: List[float] = boxes[bb_idx].tolist()
                    raise ValueError("All bounding boxes should have positive height and width."
                                     " Found invalid box {} for target at index {}."
                                     .format(degen_bb, target_idx))

        # get the features from the backbone
        features = self.backbone(features)
        print('feature size', features.size())
#         print(type(images.tensors))
#         print((images.tensors.size()))
        if isinstance(features, torch.Tensor):
            features = OrderedDict([('0', features)])

        # TODO: Do we want a list or a dict?
        features = list(features.values())

        # compute the retinanet heads outputs using the features
        head_outputs = self.head(features)

        # create the set of anchors
        anchors = self.anchor_generator(images, features)
        print(anchors)

        losses = {}
        detections: List[Dict[str, Tensor]] = []
        if self.training:
            assert targets is not None

            # compute the losses
            losses = self.compute_loss(targets, head_outputs, anchors)
        else:
            # recover level sizes
            num_anchors_per_level = [x.size(2) * x.size(3) for x in features]
            HW = 0
            for v in num_anchors_per_level:
                HW += v
            HWA = head_outputs['cls_logits'].size(1)
            A = HWA // HW
            num_anchors_per_level = [hw * A for hw in num_anchors_per_level]

            # split outputs per level
            split_head_outputs: Dict[str, List[Tensor]] = {}
            for k in head_outputs:
                split_head_outputs[k] = list(head_outputs[k].split(num_anchors_per_level, dim=1))
            split_anchors = [list(a.split(num_anchors_per_level)) for a in anchors]

            # compute the detections
            detections = self.postprocess_detections(split_head_outputs, split_anchors, images.image_sizes)
            detections = self.transform.postprocess(detections, images.image_sizes, original_image_sizes)

        if torch.jit.is_scripting():
            if not self._has_warned:
                warnings.warn("RetinaNet always returns a (Losses, Detections) tuple in scripting")
                self._has_warned = True
            return losses, detections
        return self.eager_outputs(losses, detections)
        

In [11]:
backbone = torchvision.models.mobilenet_v2(pretrained=True).features
# RetinaNet needs to know the number of
# output channels in a backbone. For mobilenet_v2, it's 1280
# so we need to add it here
backbone.out_channels = 1280

# let's make the network generate 5 x 3 anchors per spatial
# location, with 5 different sizes and 3 different aspect
# ratios. We have a Tuple[Tuple[int]] because each feature
# map could potentially have different sizes and
# aspect ratios
anchor_generator = AnchorGenerator(
    sizes=((32, 64, 128, 256, 512),),
    aspect_ratios=((0.5, 1.0, 2.0),)
)

# put the pieces together inside a RetinaNet model
model = mobilenetv2_retinanet(backbone,
                  num_classes=2,
                  anchor_generator=anchor_generator)
model.eval()
x = [torch.rand(3, 1012, 1012), torch.rand(3, 1012, 1012)]
predictions = model(x) 

<class 'list'>
2




feature size torch.Size([2, 1280, 32, 32])


AttributeError: 'list' object has no attribute 'tensors'

In [None]:
predictions

In [None]:
x = [torch.rand(3, 1012, 1012), torch.rand(3, 1012, 1012)]

In [None]:
type(x[0])

In [None]:
anchor_generator

In [None]:
model.anchor_generator