In [63]:
import matplotlib.pyplot as plt
from pandas.core.common import flatten
import numpy as np
from numpy.ma.core import sqrt
import random
import json

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR

import time

import cv2

import glob
from tqdm import tqdm

from torch import nn
from torch import Tensor
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor
from torchvision.models import resnet50, resnet101
from torchsummary import summary
from torchvision.models._utils import IntermediateLayerGetter

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
device

'cpu'

### Sinusoidal Spatial Encoding
https://github.com/tatp22/multidim-positional-encoding

In [163]:
def get_emb(sin_inp):
  """
  Build an embedding whose last dimension alternates sin and cos values.

  For every entry ``v`` of ``sin_inp`` the output holds ``sin(v)`` immediately
  followed by ``cos(v)``, so the last dimension is twice as long as the input's.
  """
  sines = torch.sin(sin_inp)
  cosines = torch.cos(sin_inp)
  # stack on a new trailing axis, then merge it into the previous one so the
  # values come out as sin, cos, sin, cos, ...
  paired = torch.stack([sines, cosines], dim=-1)
  return paired.flatten(-2, -1)


class SinePositionalEncoding(nn.Module):
  """
  1D sinusoidal positional encoding for 3D inputs.

  ``forward`` only looks at the shape of its input and returns an encoding of
  shape ``(1, seq_len, channels)`` that broadcasts over the batch dimension
  when added to a ``(batch, seq_len, channels)`` tensor.
  """

  def __init__(self, channels):
    """
    Args:
      channels: size of the last dimension of the tensors the encoding is
        meant to be added to.
    """
    super(SinePositionalEncoding, self).__init__()

    self.org_channels = channels
    # sin/cos values come in pairs, so work with an even width internally and
    # trim back to `org_channels` in forward()
    channels = int(np.ceil(channels / 2) * 2)
    self.channels = channels
    inv_freq = 1.0 / (10000 ** (torch.arange(0, channels, 2).float() / channels))
    self.register_buffer("inv_freq", inv_freq)
    # The cache is derived data: keep it out of state_dict so checkpoints do
    # not depend on the sequence length of the last forward pass.
    self.register_buffer("cached_penc", None, persistent=False)

  def get_emb(self, sin_inp):
    """
    Gets a base embedding for one dimension with sin and cos intertwined
    """
    emb = torch.stack((sin_inp.sin(), sin_inp.cos()), dim=-1)
    return torch.flatten(emb, -2, -1)

  def forward(self, tensor):
    """
    Args:
      tensor: 3D tensor; only its second dimension (sequence length) and its
        device are used.

    Returns:
      Tensor of shape ``(1, seq_len, org_channels)``.

    Raises:
      RuntimeError: if ``tensor`` is not 3D.
    """
    if len(tensor.shape) != 3:
      raise RuntimeError("The input tensor has to be 3d!")

    _, seq_len, _ = tensor.shape

    # The encoding depends only on the sequence length, so the cache can be
    # reused for any batch size.
    cached = self.cached_penc
    if cached is not None and cached.shape[1] == seq_len:
      return cached

    self.cached_penc = None
    pos_x = torch.arange(seq_len, device=tensor.device).type(self.inv_freq.type())
    sin_inp_x = torch.einsum("i, j->ij", pos_x, self.inv_freq)

    # drop the padding channel that was added when `org_channels` is odd
    emb = self.get_emb(sin_inp_x)[:, :self.org_channels]
    self.cached_penc = emb.unsqueeze(0)

    return self.cached_penc

### CNN Backbone
https://github.com/DanieleVeri/fair-DETR/blob/main/DETR.ipynb

In [135]:
class FrozenBatchNorm2d(torch.nn.Module):
  """
  BatchNorm2d variant whose statistics and affine terms never change.

  All four tensors (weight, bias, running mean, running variance) are stored
  as buffers, so none of them is a trainable parameter. An epsilon is added to
  the variance before the reciprocal square root is taken.
  """

  def __init__(self, n):
    super(FrozenBatchNorm2d, self).__init__()
    # identity normalisation until a checkpoint is loaded: unit weight/variance,
    # zero bias/mean
    initial_buffers = (
        ("weight", torch.ones),
        ("bias", torch.zeros),
        ("running_mean", torch.zeros),
        ("running_var", torch.ones),
    )
    for buffer_name, make in initial_buffers:
      self.register_buffer(buffer_name, make(n))

  def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
                              missing_keys, unexpected_keys, error_msgs):
    # this module has no `num_batches_tracked` buffer; drop that entry (if
    # present) before delegating to the default loader
    state_dict.pop(prefix + 'num_batches_tracked', None)

    super(FrozenBatchNorm2d, self)._load_from_state_dict(
        state_dict, prefix, local_metadata, strict,
        missing_keys, unexpected_keys, error_msgs)

  def forward(self, x):
    # reshape every per-channel vector to (1, C, 1, 1) up front so it
    # broadcasts against x
    per_channel = (1, -1, 1, 1)
    eps = 1e-5
    scale = self.weight.reshape(per_channel) * (self.running_var.reshape(per_channel) + eps).rsqrt()
    shift = self.bias.reshape(per_channel) - self.running_mean.reshape(per_channel) * scale

    return x * scale + shift


class BackboneBase(nn.Module):
  """
  Wraps a CNN so that calling it returns selected intermediate feature maps.

  Parameters outside 'layer2'/'layer3'/'layer4' are always frozen; when
  `train_backbone` is False every parameter is frozen.
  """

  def __init__(self, backbone: nn.Module, train_backbone: bool, return_interm_layers: bool):
    super(BackboneBase, self).__init__()

    trainable_stages = ('layer2', 'layer3', 'layer4')
    for param_name, param in backbone.named_parameters():
      in_trainable_stage = any(stage in param_name for stage in trainable_stages)
      if not (train_backbone and in_trainable_stage):
        param.requires_grad_(False)

    # keys are backbone sub-module names, values are the keys of the output dict
    return_layers = (
        {'layer1': '0', 'layer2': '1', 'layer3': '2', 'layer4': '3'}
        if return_interm_layers
        else {'layer4': '0'}
    )

    self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)

  def forward(self, tensor):
    """Return whatever `self.body` produces for `tensor`."""
    return self.body(tensor)


class Backbone(BackboneBase):
  """
  Backbone built from a `torchvision.models` constructor looked up by name,
  with `FrozenBatchNorm2d` as its normalisation layer.

  Attributes:
    num_channels: 512 for 'resnet18'/'resnet34', 2048 for any other name.
    return_interm_layers: copy of the constructor flag, so callers can tell
      which keys the output dict will contain.
  """

  def __init__(self, name: str, train_backbone: bool, return_interm_layers: bool, dilation:bool):
    """
    Args:
      name: attribute name of the model constructor in `torchvision.models`.
      train_backbone: forwarded to `BackboneBase`.
      return_interm_layers: forwarded to `BackboneBase`.
      dilation: used as the last entry of `replace_stride_with_dilation`.
    """
    backbone = getattr(models, name)(
        replace_stride_with_dilation = [False, False, dilation],
        pretrained = True,
        norm_layer = FrozenBatchNorm2d
    )

    # Initialise the nn.Module machinery before assigning any attribute on
    # self, as nn.Module requires.
    super().__init__(backbone, train_backbone, return_interm_layers)

    self.num_channels = 512 if name in ('resnet18', 'resnet34') else 2048
    self.return_interm_layers = return_interm_layers

### Multi Head Attention Module
https://www.datacamp.com/tutorial/building-a-transformer-with-py-torch

In [42]:
class MultiHeadAttention(nn.Module):
  """
  Multi-head scaled dot-product attention for batch-first inputs.

  Q has shape (batch, len_q, d_model); K and V have shape
  (batch, len_kv, d_model). The output has shape (batch, len_q, d_model).
  Batch dimensions of size 1 broadcast against larger ones.
  """

  def __init__(self, d_model, num_heads):
    """
    Args:
      d_model: width of the input and output features.
      num_heads: number of attention heads; must divide `d_model`.
    """
    super(MultiHeadAttention, self).__init__()

    # Ensure d_model and num_heads are perfectly divisible
    assert d_model % num_heads == 0, 'd_model must be divisible by num_heads'

    # Initialize dimensions
    self.d_model = d_model
    self.num_heads = num_heads
    self.d_k = d_model // num_heads

    # Linear Layers to transform the inputs
    self.W_q = nn.Linear(d_model, d_model)  # Query transformation
    self.W_k = nn.Linear(d_model, d_model)  # Key transformation
    self.W_v = nn.Linear(d_model, d_model)  # Value transformation
    self.W_o = nn.Linear(d_model, d_model)  # Output transformation

  def scaled_dot_product_attention(self, Q, K, V, mask=None):
    """
    Attention over tensors already split into heads: (batch, heads, len, d_k).

    Args:
      mask: optional tensor broadcastable to (batch, heads, len_q, len_kv);
        positions where it equals 0 are excluded from attention.
    """
    # Calculate attention scores
    attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.d_k ** 0.5)

    # Masked positions get a large negative score so that softmax assigns them
    # (numerically) zero probability.
    if mask is not None:
      attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

    # Softmax is applied to attention scores to obtain attention probabilities
    attn_probs = attn_scores.softmax(dim=-1)

    # Multiply by value
    output = torch.matmul(attn_probs, V)
    return output

  def split_heads(self, x):
    """(batch, seq_len, d_model) -> (batch, num_heads, seq_len, d_k)."""
    batch_size, seq_len, _ = x.shape
    # Split the feature axis into heads first, then move the head axis in
    # front of the sequence axis; a bare view to the target shape would mix
    # features of different sequence positions into the same head.
    return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

  def combine_heads(self, x):
    """(batch, num_heads, seq_len, d_k) -> (batch, seq_len, d_model)."""
    batch_size, _, seq_len, _ = x.shape
    # Inverse of split_heads: bring the sequence axis back in front of the
    # heads, then merge heads and d_k (contiguous() is required before view).
    return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

  def forward(self, Q, K, V, mask=None):
    """
    Args:
      Q, K, V: query, key and value tensors (see class docstring for shapes).
      mask: optional attention mask, see `scaled_dot_product_attention`.

    Returns:
      Tensor of shape (batch, len_q, d_model).
    """
    # Apply Linear Transformations and split heads
    Q = self.split_heads(self.W_q(Q))
    K = self.split_heads(self.W_k(K))
    V = self.split_heads(self.W_v(V))

    # Scaled dot product attention (the caller's mask is honoured)
    attn = self.scaled_dot_product_attention(Q, K, V, mask=mask)
    output = self.W_o(self.combine_heads(attn))

    return output




### Feed Forward Network (with 2 layers)

In [43]:
class FeedForwardNetwrok(nn.Module):
  """
  Two-layer feed-forward block: Linear -> ReLU -> Linear.

  The class name keeps its original spelling because other cells refer to it.
  """

  def __init__(self, d_model, d_ffn):
    super(FeedForwardNetwrok, self).__init__()

    # expand to the hidden width, then project back to the model width
    self.fc1 = nn.Linear(in_features=d_model, out_features=d_ffn)
    self.fc2 = nn.Linear(in_features=d_ffn, out_features=d_model)
    self.relu = nn.ReLU()

  def forward(self, x):
    hidden = self.fc1(x)
    activated = self.relu(hidden)
    return self.fc2(activated)

### Encoder

In [44]:
class EncoderLayer(nn.Module):
  """
  Transformer encoder layer (post-norm): self-attention followed by a
  feed-forward network, each wrapped in dropout, a residual connection and
  LayerNorm.

  The positional encoding is added to the queries and keys only; the values
  and the residual path use the un-encoded input.
  """

  def __init__(self, d_model, num_heads, d_ffn, dropout=0):
    """
    Args:
      d_model: feature width of the layer input and output.
      num_heads: number of attention heads.
      d_ffn: hidden width of the feed-forward network.
      dropout: dropout probability applied to both sub-layer outputs.
    """
    super(EncoderLayer, self).__init__()

    self.self_attn = MultiHeadAttention(d_model, num_heads)
    self.ffn = FeedForwardNetwrok(d_model, d_ffn)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)

  def with_pos_enc(self, tensor, pos=None):
    """Return `tensor + pos`, or `tensor` unchanged when `pos` is None."""
    return tensor if pos is None else tensor + pos

  def forward(self, src, pos=None, mask=None):
    """
    Args:
      src: layer input.
      pos: positional encoding added to the queries and keys, or None.
      mask: optional mask passed to the self-attention module.

    Returns:
      Tensor with the same shape as `src`.
    """
    Q = K = self.with_pos_enc(src, pos)
    V = src

    attn_output = self.self_attn(Q, K, V, mask)
    # The residual uses `src`, not `src + pos`: otherwise the positional
    # encoding would be added into the residual stream again at every layer.
    out = self.norm1(src + self.dropout(attn_output))
    ffn_output = self.ffn(out)
    out = self.norm2(out + self.dropout(ffn_output))

    return out

### Decoder

In [80]:
class DecoderLayer(nn.Module):
  """
  Transformer decoder layer (post-norm): self-attention over the targets,
  cross-attention from the targets to the encoder output, then a
  feed-forward network. Each sub-layer is wrapped in dropout, a residual
  connection and LayerNorm.
  """

  def __init__(self, d_model, num_heads, d_ffn, dropout=0):
    """
    Args:
      d_model: feature width of the layer input and output.
      num_heads: number of attention heads.
      d_ffn: hidden width of the feed-forward network.
      dropout: dropout probability applied to every sub-layer output.
    """
    super(DecoderLayer, self).__init__()

    self.self_attn = MultiHeadAttention(d_model, num_heads)
    self.cross_attn = MultiHeadAttention(d_model, num_heads)
    self.ffn = FeedForwardNetwrok(d_model, d_ffn)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.norm3 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)

  def with_pos_enc(self, tensor, pos=None):
    """Return `tensor + pos`, or `tensor` unchanged when `pos` is None."""
    return tensor if pos is None else tensor + pos

  def forward(self, tgt, enc_output, pos, query_pos, src_mask=None, tgt_mask=None):
    """
    Args:
      tgt: decoder input.
      enc_output: output of the encoder stack.
      pos: positional encoding added to the cross-attention keys, or None.
      query_pos: encoding added to the queries (and self-attention keys),
        or None.
      src_mask: optional mask forwarded to `self.cross_attn` as its `mask`.
      tgt_mask: optional mask forwarded to `self.self_attn` as its `mask`.

    Returns:
      The layer output after the third LayerNorm.
    """
    # self-attention: positions are added to queries/keys, not to the values
    Q = K = self.with_pos_enc(tgt, query_pos)
    V = tgt
    attn_output = self.self_attn(Q, K, V, tgt_mask)
    norm1_output = self.norm1(tgt + self.dropout(attn_output))

    # cross-attention: queries come from the decoder, keys/values from the encoder
    Q = self.with_pos_enc(norm1_output, query_pos)
    K = self.with_pos_enc(enc_output, pos)
    V = enc_output
    attn_output = self.cross_attn(Q, K, V, src_mask)
    norm2_output = self.norm2(norm1_output + self.dropout(attn_output))

    ffn_output = self.ffn(norm2_output)
    out = self.norm3(norm2_output + self.dropout(ffn_output))

    return out

### Transformer

In [98]:
class Transformer(nn.Module):
  """
  Stack of encoder layers followed by a stack of decoder layers.

  The decoder starts from an all-zero target with the shape of `query_pos`.
  """

  def __init__(self, d_model, num_heads, num_encoder_layers, num_decoder_layers, d_ffn, return_intermediate=False, dropout=0):
    super().__init__()

    encoders = [EncoderLayer(d_model, num_heads, d_ffn, dropout) for _ in range(num_encoder_layers)]
    decoders = [DecoderLayer(d_model, num_heads, d_ffn, dropout) for _ in range(num_decoder_layers)]
    self.encoder_layers = nn.ModuleList(encoders)
    self.decoder_layers = nn.ModuleList(decoders)

    self._reset_parameters()

    self.d_model = d_model
    self.num_heads = num_heads
    self.return_intermediate = return_intermediate

  def _reset_parameters(self):
    """Xavier-uniform initialise every parameter with more than one dimension."""
    for param in self.parameters():
      if param.dim() <= 1:
        continue
      nn.init.xavier_uniform_(param)

  def forward(self, src, pos, query_pos):
    """
    Returns the stacked outputs of every decoder layer when
    `return_intermediate` is set; otherwise the last decoder output with a
    new leading dimension of size 1.
    """
    memory = src
    for layer in self.encoder_layers:
      memory = layer(memory, pos)

    hidden = torch.zeros_like(query_pos)
    per_layer = []
    for layer in self.decoder_layers:
      hidden = layer(hidden, memory, pos, query_pos)
      if self.return_intermediate:
        per_layer.append(hidden)

    if self.return_intermediate:
      return torch.stack(per_layer)
    return hidden.unsqueeze(0)

### Multi Layer Perceptron Layer

In [122]:
class MLP(nn.Module):
  """
  Plain multi-layer perceptron: a chain of Linear layers with a ReLU after
  every layer except the last one.
  """

  def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
    super().__init__()
    self.num_layers = num_layers
    # widths seen along the chain: input, (num_layers - 1) hidden widths, output
    hidden_widths = [hidden_dim] * (num_layers - 1)
    in_widths = [input_dim] + hidden_widths
    out_widths = hidden_widths + [output_dim]
    self.layers = nn.ModuleList(
        [nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(in_widths, out_widths)]
    )

  def forward(self, x):
    last_index = self.num_layers - 1
    for index, linear in enumerate(self.layers):
      x = linear(x)
      # no activation after the final layer
      if index < last_index:
        x = F.relu(x)
    return x

### DETR
The model returns the predicted class logits and bounding boxes for each object query.

In [162]:
class DETR(nn.Module):
  """
  DETR detector: CNN backbone -> 1x1 projection -> transformer -> class and
  box prediction heads applied to every object query.
  """

  def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False):
    """
    Args:
      backbone: module returning a dict of feature maps; must expose
        `num_channels` and `return_interm_layers`.
      transformer: module exposing `d_model`, called as
        `transformer(src, pos, query_pos)`.
      num_classes: number of object classes; the classifier has one extra
        output on top of these.
      num_queries: number of learned object queries.
      aux_loss: when True, predictions of the earlier decoder layers are
        returned under 'aux_outputs'.
    """
    super().__init__()

    self.num_queries = num_queries
    self.transformer = transformer
    self.hidden_dim = transformer.d_model
    self.class_embed = nn.Linear(self.hidden_dim, num_classes + 1)
    self.bbox_embed = MLP(self.hidden_dim, self.hidden_dim, 4, 3)
    self.query_embed = nn.Embedding(num_queries, self.hidden_dim)
    self.input_proj = nn.Conv2d(backbone.num_channels, self.hidden_dim, kernel_size=1)
    self.backbone = backbone
    self.position = SinePositionalEncoding(self.hidden_dim)
    self.aux_loss = aux_loss

  def forward(self, x):
    """
    Args:
      x: batch of images fed to the backbone.

    Returns:
      Dict with 'pred_logits' and 'pred_boxes' taken from the last decoder
      layer (boxes are passed through a sigmoid), plus 'aux_outputs' when
      `aux_loss` is set.
    """
    # the deepest feature map is stored under '3' when intermediate layers
    # are returned, otherwise under '0'
    feature_key = '3' if self.backbone.return_interm_layers else '0'
    features = self.backbone(x)[feature_key]

    x = self.input_proj(features)

    # (bs, hidden_dim, H, W) -> (bs, H*W, hidden_dim): flatten the spatial
    # grid and move channels last so each sequence element is the feature
    # vector of one location. A plain view() to this shape would interleave
    # channels and locations.
    src = x.flatten(2).permute(0, 2, 1)
    pos = self.position(src)
    query_pos = self.query_embed.weight.unsqueeze(0)

    hs = self.transformer(src, pos, query_pos)

    outputs_class = self.class_embed(hs)
    outputs_coord = self.bbox_embed(hs).sigmoid()
    out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
    if self.aux_loss:
      out['aux_outputs'] = self._set_aux_loss(outputs_class, outputs_coord)
    return out

  def _set_aux_loss(self, outputs_class, outputs_coord):
    """Pair up class/box predictions of every decoder layer except the last."""
    # this is a workaround to make torchscript happy, as torchscript
    # doesn't support dictionary with non-homogeneous values, such
    # as a dict having both a Tensor and a list.
    return [{'pred_logits': a, 'pred_boxes': b}
            for a, b in zip(outputs_class[:-1], outputs_coord[:-1])]


In [136]:
# ResNet-50 backbone with all weights frozen, only the last feature map, no dilation.
backbone = Backbone(
    name='resnet50',
    train_backbone=False,
    return_interm_layers=False,
    dilation=False,
)



In [156]:
# Transformer hyper-parameters
d_model = 512                                   # feature width
num_heads = 8                                   # attention heads per layer
num_encoder_layers = num_decoder_layers = 6     # depth of each stack
d_ffn = 2048                                    # feed-forward hidden width
return_intermediate = False                     # keep only the last decoder output

In [157]:
# Build the encoder/decoder stack from the hyper-parameters defined above.
transformer = Transformer(
    d_model=d_model,
    num_heads=num_heads,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
    d_ffn=d_ffn,
    return_intermediate=return_intermediate,
)

In [158]:
# Detection head sizes: number of object classes and of learned object queries.
num_classes, num_queries = 91, 100

In [164]:
# Assemble the full detector from the backbone and transformer built above.
detr = DETR(
    backbone=backbone,
    transformer=transformer,
    num_classes=num_classes,
    num_queries=num_queries,
)

In [165]:
# Random batch used as a smoke test: 2 images, 3 channels, 800 x 1000 pixels.
test_batch_shape = (2, 3, 800, 1000)
x_test = torch.rand(test_batch_shape)

In [166]:
# Forward pass on the random batch; `y` is a dict holding 'pred_logits' and 'pred_boxes'.
y = detr(x_test)

In [167]:
# Class logits: one row per object query, with num_classes + 1 scores each.
y['pred_logits'].shape

torch.Size([2, 100, 92])

In [168]:
# Box predictions: 4 sigmoid-squashed values per object query.
y['pred_boxes'].shape

torch.Size([2, 100, 4])