In [1]:
!pip install timm

Collecting timm
  Downloading timm-0.5.4-py3-none-any.whl (431 kB)
[?25l[K     |▊                               | 10 kB 21.2 MB/s eta 0:00:01[K     |█▌                              | 20 kB 11.7 MB/s eta 0:00:01[K     |██▎                             | 30 kB 9.3 MB/s eta 0:00:01[K     |███                             | 40 kB 8.6 MB/s eta 0:00:01[K     |███▉                            | 51 kB 5.2 MB/s eta 0:00:01[K     |████▋                           | 61 kB 5.7 MB/s eta 0:00:01[K     |█████▎                          | 71 kB 5.7 MB/s eta 0:00:01[K     |██████                          | 81 kB 6.3 MB/s eta 0:00:01[K     |██████▉                         | 92 kB 5.0 MB/s eta 0:00:01[K     |███████▋                        | 102 kB 5.4 MB/s eta 0:00:01[K     |████████▍                       | 112 kB 5.4 MB/s eta 0:00:01[K     |█████████▏                      | 122 kB 5.4 MB/s eta 0:00:01[K     |█████████▉                      | 133 kB 5.4 MB/s eta 0:00:01[K     |█

In [2]:
import math
import os
import time
from ast import literal_eval
from copy import deepcopy

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import timm
import torch as T
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler, SequentialSampler
from torchvision import transforms

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = T.device('cuda' if T.cuda.is_available() else 'cpu')

class Encoder(nn.Module):
    """Pretrained timm backbone that exposes its intermediate feature maps.

    The backbone's child modules, minus the last two, are applied one after
    another. The output of every stage from the third one onward is collected
    and returned as a list (earliest stage first).
    """

    def __init__(self, backbone = 'resnet18', device = 'cuda'):
        super().__init__()
        self.backbone = timm.create_model(backbone, pretrained = True)
        # Kept as a plain Python list: the stages are the same module objects
        # that self.backbone already owns.
        stages = list(self.backbone.children())
        self.List = stages[:-2]
        self.device = device  # stored only; not read inside this class

    def forward(self, X):
        """Run X through the stages and return the collected feature maps."""
        feats = []
        current = X.float()
        for idx, stage in enumerate(self.List):
            current = stage(current)
            if idx < 2:
                # the first two stage outputs are not exposed
                continue
            feats.append(current)
        return feats
 
class objdet_Decoder(nn.Module):
    '''Detection head: four (2x upsample -> 3x3 conv -> ReLU) stages followed by
    three 1x1 output convolutions (heatmap, offsets, sizes), each passed
    through a sigmoid.'''
    def __init__(self, n_classes, stride = 2, device = 'cuda'):
        super().__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode = 'bilinear')
        # Channel width halves at every stage: 512 -> 256 -> 128 -> 64 -> 32.
        self.conv1 = nn.Conv2d(512, 256, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(256, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        # 1x1 output heads on the 32-channel feature map.
        self.hmap = nn.Conv2d(32, n_classes, kernel_size=1)
        self.regs = nn.Conv2d(32, 2, kernel_size=1)
        self.w_h_ = nn.Conv2d(32, 2, kernel_size=1)
        self.device = device  # stored only; `stride` is accepted but not used

    def forward(self,X):
        """Decode the last entry of the feature list X.

        Returns a one-element list holding [heatmap, offsets, sizes].
        """
        feat = X[-1]
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            feat = F.relu(conv(self.upsample(feat)))
        heads = [T.sigmoid(head(feat)) for head in (self.hmap, self.regs, self.w_h_)]
        return [heads]
        
        
class DoubleConv(nn.Module):
    """Two stacked (3x3 convolution -> BatchNorm -> ReLU) stages.

    ``mid_channels`` is the width between the two stages; when it is falsy it
    defaults to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels if mid_channels else out_channels
        layers = [
            nn.Conv2d(in_channels, hidden, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        self.double_conv = nn.Sequential(*layers)

    def forward(self, x):
        """Apply both stages to x."""
        return self.double_conv(x)
        
class up(nn.Module):
    '''2x bilinear upsample ---> concatenate with skip features ---> double conv'''
    def __init__(self,in_channels, out_channels,last_layer=False):
        super().__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode = 'bilinear')
        # Width of the concatenated tensor fed to the double conv:
        # 2x in_channels for the last layer, 1.5x in_channels otherwise.
        merged_channels = in_channels*2 if last_layer else in_channels*3//2
        self.conv = DoubleConv(merged_channels,out_channels)

    def forward(self,x1,x2):
        """Upsample x1, concatenate it with x2 along dim 1 and convolve."""
        merged = T.cat([self.upsample(x1), x2], dim=1)
        return self.conv(merged)
        
class seg_decoder(nn.Module):
    """Decoder that repeatedly upsamples the deepest feature map and merges it
    with shallower encoder outputs, ending in a 3x3 convolution that produces
    ``n_classes`` channels."""

    def __init__(self, n_classes = 23, device="cuda"):
        super().__init__()

        self.up1 = up(512, 256)
        self.up2 = up(256, 128)
        self.up3 = up(128, 64)
        self.up4 = up(64, 32, last_layer=True)
        self.out_conv = nn.Conv2d(32, n_classes, kernel_size=3, padding=1)
        self.device = device  # stored only; not read inside this class

    def forward(self,outputs):
        """Decode the encoder feature list ``outputs`` (deepest entry last)."""
        stages = (self.up1, self.up2, self.up3, self.up4)
        # Position in `outputs` of the skip tensor merged at each stage;
        # note the final stage skips index -5 and uses -6.
        skip_positions = (-2, -3, -4, -6)
        feat = outputs[-1]
        for stage, pos in zip(stages, skip_positions):
            feat = stage(feat, outputs[pos])
        return self.out_conv(feat)
     
class MTL_Model(nn.Module):
    """Multi-task network: one shared encoder feeding a segmentation decoder,
    a single-channel depth decoder and an object-detection decoder."""

    def __init__(self,n_classes = 35,device='cuda'):
        super().__init__()
        self.encoder = Encoder(device=device)
        self.seg_decoder = seg_decoder(n_classes, device=device)
        self.dep_decoder = seg_decoder(n_classes = 1, device=device)
        self.obj_decoder = objdet_Decoder(n_classes = 15, device=device)
        # Move every sub-module to the requested device in one go.
        self.to(device)

    def forward(self,X):
        """Return ``(segmentation output, sigmoid(depth output), detection output)``."""
        feats = self.encoder(X)
        seg_out = self.seg_decoder(feats)
        depth_raw = self.dep_decoder(feats)
        det_out = self.obj_decoder(feats)
        # Only the depth branch is squashed here; segmentation is returned as-is.
        depth_out = T.sigmoid(depth_raw)
        return (seg_out, depth_out, det_out)

# Colour -> class-id lookup table for the segmentation labels (ids 0..34).
# Keys are 3-tuples of channel values; convert_from_color_segmentation builds
# its lookup key as (channel 2, channel 1, channel 0) of each pixel.
# NOTE(review): presumably the keys are RGB and images arrive as BGR from
# OpenCV -- confirm against the dataset's label definition.
# NOTE(review): the key (169, 187, 214) appears twice (ids 27 and 33). In a dict
# literal the later entry wins, so this colour maps to 33 and no colour maps to 27.
PALETTE = {
    (128, 64,128)  : 0 , # road
    (250,170,160) : 1 , # parking
    ( 81,  0, 81) : 2 ,# drivable fallback
    (244, 35,232) : 3 , # sidewalk
    (230,150,140) : 4 , # rail track
    (152,251,152) : 5 ,# non-drivable fallback
    (220, 20, 60) : 6 ,# person
    (246, 198, 145) : 7 ,# animal
    (255,  0,  0) : 8 , # rider
    (  0,  0,230) : 9 ,# motorcycle
    (119, 11, 32) : 10 ,  # bicycle
    (255, 204, 54) : 11,# autorickshaw
    (  0,  0,142) : 12,  # car
    (  0,  0, 70) : 13, # truck
    (  0, 60,100) : 14,    # bus
    (  0,  0, 90) : 15,# caravan
    (  0,  0,110) : 16,# trailer
    (  0, 80,100) : 17,# train
    (136, 143, 153) : 18,# vehicle fallback
    (220, 190, 40) : 19,# curb
    (102,102,156) : 20,# wall
    (190,153,153) : 21,# fence
    (180,165,180) : 22,# guard rail
    (174, 64, 67) : 23,# billboard
    (220,220,  0) : 24,# traffic sign
    (250,170, 30) : 25,# traffic light
    (153,153,153) : 26,# pole
    (169, 187, 214) : 27,# obs-str-bar-fallback (overridden by the identical key for id 33 below)
    ( 70, 70, 70) : 28,# building
    (150,100,100) : 29,# bridge
    (150,120, 90) : 30,# tunnel
    (107,142, 35) : 31,# vegetation
    ( 70,130,180) : 32,# sky
    (169, 187, 214) : 33,# fallback background (same colour as id 27; this entry is the effective one)
    (  0,  0,  0) : 34# unlabeled
}

def convert_from_color_segmentation(arr_3d):
    """Convert a colour-coded segmentation image into a 2-D map of class ids.

    Each pixel's channels are read in reversed order, ``(ch2, ch1, ch0)``, and
    matched against the keys of ``PALETTE``. Pixels whose colour is not a
    ``PALETTE`` key receive class id 34.

    The matching is done with one whole-image boolean mask per palette colour
    instead of a Python-level dict lookup per pixel; the result is the same.

    Args:
        arr_3d: array-like of shape (H, W, C) with at least three channels.

    Returns:
        ``np.uint8`` array of shape (H, W) holding the class id of every pixel.
    """
    arr_3d = np.array(arr_3d)
    fallback_id = 34  # id assigned to colours that are not in PALETTE
    arr_2d = np.full((arr_3d.shape[0], arr_3d.shape[1]), fallback_id, dtype=np.uint8)
    ch0, ch1, ch2 = arr_3d[..., 0], arr_3d[..., 1], arr_3d[..., 2]
    for (k0, k1, k2), class_id in PALETTE.items():
        # The palette key is compared against the pixel with channel order reversed.
        arr_2d[(ch2 == k0) & (ch1 == k1) & (ch0 == k2)] = class_id

    return arr_2d

def labels_to_cityscapes_palette(array):
    """Render a 2-D class-id map as a 3-channel colour image.

    Every pixel whose id is a value of ``PALETTE`` is painted with the matching
    key, written in reversed channel order; all other pixels stay zero. The
    image is divided by 255 before it is returned.
    """
    height, width = array.shape[0], array.shape[1]
    canvas = np.zeros((height, width, 3))
    for colour, class_id in PALETTE.items():
        canvas[array == class_id] = colour[::-1]
    return canvas / 255

def to_one_hot(mask, n_classes=35):
    """One-hot encode a 2-D integer label mask.

    Returns a float array of shape (H, W, n_classes) in which channel ``k`` is 1
    wherever ``mask == k`` and 0 elsewhere.
    """
    encoded = np.zeros((mask.shape[0], mask.shape[1], n_classes))
    for class_id in np.unique(mask):
        # boolean (H, W) selection on the first two axes, fixed channel on the last
        encoded[mask == class_id, class_id] = 1
    return encoded

# Input frame extent along x and y.
# NOTE(review): ctdet_decode assigns its own local input_size_x / input_size_y,
# so it does not read these module-level names.
input_size_x,input_size_y = (640, 480)
# Multiplier applied to heatmap peak coordinates in ctdet_decode before the
# regressed offsets are added.
MODEL_SCALE = 2

def _gather_feature(feat, ind, mask=None):
    dim = feat.size(2)
    ind = ind.unsqueeze(2).expand(ind.size(0), ind.size(1), dim)
    feat = feat.gather(1, ind)
    if mask is not None:
        mask = mask.unsqueeze(2).expand_as(feat)
        feat = feat[mask]
        feat = feat.view(-1, dim)
    return feat

def _tranpose_and_gather_feature(feat, ind):
    """Gather per-location feature vectors from a 4-D map.

    ``feat`` is permuted so that dim 1 becomes the last dimension, flattened
    over its two middle dimensions, and then indexed with ``ind`` through
    ``_gather_feature``.
    """
    channels_last = feat.permute(0, 2, 3, 1).contiguous()
    batch, channels = channels_last.size(0), channels_last.size(3)
    flat = channels_last.view(batch, -1, channels)
    return _gather_feature(flat, ind)

In [4]:
# Build the model on the device selected above; the constructor's own default is
# 'cuda', which fails on a host without a GPU.
model = MTL_Model(device=device)

Downloading: "https://download.pytorch.org/models/resnet18-5c106cde.pth" to /root/.cache/torch/hub/checkpoints/resnet18-5c106cde.pth


In [5]:
# Checkpoint loading is currently disabled, so only the backbone carries
# pretrained weights; re-enable the next line to restore a trained model.
# model.load_state_dict(T.load("/content/model_v3-1.8697537092062144.pth",map_location=device))
# Switch the model to evaluation mode before running inference.
model.eval()

MTL_Model(
  (encoder): Encoder(
    (backbone): ResNet(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act1): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): BasicBlock(
          (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (act1): ReLU(inplace=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (act2): ReLU(inplace=True)
        )
        (1): BasicBlock(
          (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)

In [6]:
def showbox(img, hm, off,regr,box_=None):
    """Draw the boxes decoded from the detection outputs onto a resized image.

    Args:
        img: image array accepted by ``cv2.resize``.
        hm, off, regr: tensors forwarded, in this order, to ``ctdet_decode``.
        box_: unused; kept so existing callers keep working.

    Returns:
        The image resized to 640x480 with one rectangle per decoded detection.
    """
    # NOTE(review): class-index -> name table, presumably for the detector's 15
    # classes (confirm against the training labels); no labels are drawn yet:
    #   0 bicycle, 1 bus, 2 traffic sign, 3 train, 4 motorcycle, 5 car,
    #   6 traffic light, 7 person, 8 vehicle fallback, 9 truck, 10 autorickshaw,
    #   11 animal, 12 caravan, 13 rider, 14 trailer
    sample = cv2.resize(img,(640, 480))
    boxes = ctdet_decode(hm,off,regr)

    color = (250, 0, 0)
    for box in boxes:
        # Each row starts with centre x, centre y, width, height.
        centre_x, centre_y, box_w, box_h = box[0], box[1], box[2], box[3]
        top_left = (int(centre_x-(box_w/2)), int(centre_y-(box_h/2)))
        bottom_right = (int(centre_x+(box_w/2)), int(centre_y+(box_h/2)))
        cv2.rectangle(sample, top_left, bottom_right, color, 2)
    return sample

def _nms(heat, kernel=7):
    hmax = F.max_pool2d(heat, kernel, stride=1, padding=(kernel - 1) // 2)
    keep = (hmax == heat).float()
    return heat * keep


def _topk(scores, K=40, threshold=0.2):
    """Select the strongest heatmap peaks.

    The K highest scores are taken per class, then the K highest among those
    across all classes, and finally only candidates scoring above ``threshold``
    are kept.

    Returns:
        ``(scores, flat indices, class ids, ys, xs, count)`` where the flat
        indices address the ``height * width`` positions of one class plane and
        ``count`` is the number of candidates kept.

    Note: the threshold mask of the first batch row is applied to every row.
    """
    batch, cat, height, width = scores.size()

    # Stage 1: best K positions inside each class plane.
    per_class_scores, per_class_inds = T.topk(scores.view(batch, cat, -1), K)
    per_class_inds = per_class_inds % (height * width)
    per_class_ys = (per_class_inds / width).int().float()
    per_class_xs = (per_class_inds % width).int().float()

    # Stage 2: best K among all per-class candidates; the class id follows from
    # the position in the flattened (class, K) list.
    best_scores, best_pos = T.topk(per_class_scores.view(batch, -1), K)
    best_clses = (best_pos / K).int()

    def pick(values):
        # Re-index a per-class quantity with the stage-2 winners.
        return _gather_feature(values.view(batch, -1, 1), best_pos).view(batch, K)

    best_inds = pick(per_class_inds)
    best_ys = pick(per_class_ys)
    best_xs = pick(per_class_xs)

    keep = (best_scores > threshold)[0]
    kept_scores = best_scores[:, keep]
    return (kept_scores, best_inds[:, keep], best_clses[:, keep],
            best_ys[:, keep], best_xs[:, keep], len(kept_scores[0]))


def ctdet_decode(hmap, regs, w_h_, K=40):
    """Turn the detection head outputs into a table of boxes.

    Peaks are extracted from ``hmap`` with ``_nms`` and ``_topk``; the offset
    (``regs``) and size (``w_h_``) maps are sampled at those peaks.

    Returns:
        NumPy array of shape (M, 6) for the first batch element, with columns
        ``[x, y, width, height, score, class id]``.
    """
    batch, cat, height, width = hmap.shape  # unpacking requires a 4-D heatmap
    batch = 1  # decoding is hard-wired to a single image
    input_size_x, input_size_y = 640, 480

    peaks = _nms(hmap)  # keep local maxima only
    scores, inds, clses, ys, xs, M = _topk(peaks, K=K)

    offsets = _tranpose_and_gather_feature(regs, inds).view(batch, M, 2)
    sizes = _tranpose_and_gather_feature(w_h_, inds).view(batch, M, 2)

    # Peak position scaled by MODEL_SCALE plus the regressed offset.
    centre_x = xs.view(batch, M, 1)*MODEL_SCALE + offsets[:, :, 0:1]
    centre_y = ys.view(batch, M, 1)*MODEL_SCALE + offsets[:, :, 1:2]
    # Sizes are multiplied by the fixed 640 x 480 frame extent.
    box_w = sizes[..., 0:1]*input_size_x
    box_h = sizes[..., 1:2]*input_size_y

    columns = [
        centre_x, centre_y, box_w, box_h,
        scores.view(batch, M, 1),
        clses.view(batch, M, 1).float(),
    ]
    detections = T.cat(columns, dim=2)
    return detections.cpu().numpy()[0]

In [None]:
class MTL_TEST_DETECTOR(Dataset):
    """Dataset over a CSV file listing test images and their box annotations.

    The CSV must provide a ``Path`` column (image file) and a ``bbox`` column
    holding a Python-literal string that evaluates to a sequence of mappings,
    each with a ``"bbox"`` entry.

    Each item is ``(image tensor, list of converted boxes)``.
    """

    def __init__(self, filename=None, input_size=(640, 480), output_size=(320, 240)):
        super().__init__()
        self.filename = filename
        self.dataset = pd.read_csv(self.filename)
        self.input_size = input_size
        self.output_size = output_size
        self.input_size_x = self.input_size[0]
        self.input_size_y = self.input_size[1]
        # Ratio between input and output extent along the first axis.
        self.MODEL_SCALE = self.input_size[0]//self.output_size[0]
        self.preprocess = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        # Not used by the methods of this class; kept as public attributes.
        self.resize1 = transforms.Compose([transforms.Resize(self.input_size)])
        self.resize2 = transforms.Compose([transforms.Resize(self.output_size)])


    def __len__(self): return len(self.dataset)

    def __getitem_internal__(self, idx, preprocess=True):
        """Load row ``idx``; normalise the image only when ``preprocess`` is true.

        Raises:
            FileNotFoundError: if the image referenced by the row cannot be read.
        """
        target = self.dataset.iloc[idx]
        image_path = target["Path"]
        rgb_image = cv2.imread(image_path)
        if rgb_image is None:
            # cv2.imread signals a missing or unreadable file by returning None.
            raise FileNotFoundError("could not read image: {}".format(image_path))
        # Original size is needed by `convert`, so read it before resizing.
        height, width = rgb_image.shape[:2]
        rgb_image = cv2.resize(rgb_image,self.input_size)
        # NOTE(review): `convert` is not defined in the visible part of this
        # notebook; it must be in scope before this dataset is used.
        b = [convert(entry["bbox"], width, height)
             for entry in literal_eval(target["bbox"])]
        if preprocess:
            rgb_image = self.preprocess(np.array(rgb_image))
        else:
            rgb_image = transforms.ToTensor()(np.array(rgb_image))
        return (rgb_image, b)

    def __getitem__(self, idx):
        """Return the normalised image tensor and its boxes."""
        return self.__getitem_internal__(idx, True)

    def raw(self, idx):
        """Return the image tensor without normalisation, plus its boxes."""
        return self.__getitem_internal__(idx, False)

In [7]:
# Read frames from the default camera until reading fails or 'q' is pressed.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; stop instead of looping forever.
            break
        # NOTE(review): the frame is neither displayed nor passed to the model.
        # cv2.waitKey only sees key presses while a HighGUI window is open, so
        # without cv2.imshow the 'q' exit below cannot trigger -- confirm intent.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the camera even if the loop body raises.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# test_dataloader = MTL_TEST_DETECTOR("/home/b170007ec/Programs/MTL/DSD_MTL/Dataset/val_dataset.csv")
# device = T.device("cpu")
# j=10
# for i in range(j,j+30,5):
#     matrix = []
#     for a in range(5):
#         rgb ,box = test_dataloader[i+a]
#         rgb_raw,_ = test_dataloader.raw(i+a)
#         rgb_raw = rgb_raw.permute(1,2,0)
#         rgb = T.unsqueeze(rgb, 0)
#         rgb = rgb.to(device)
#         y_pred = model(rgb)
#         y_pred_ = F.softmax(y_pred[0],dim=1)
#         pseg = T.squeeze(y_pred_,0)
#         pseg = T.argmax(pseg, dim=0)
#         pdepth = T.squeeze(y_pred[1],0)
#         pdepth = pdepth.permute(1,2,0)
#         pdepth = pdepth.reshape(240,320)
#         hmap, regs, w_h_ = zip(*y_pred[2])
#         d = showbox(rgb_raw.numpy(), hmap[0].detach(), regs[0].detach(),w_h_[0].detach())
#         matrix.append([rgb_raw.numpy(),labels_to_cityscapes_palette(pseg.cpu().detach().numpy()),pdepth.cpu().detach().numpy(),d])
    
#     fig, ax = plt.subplots(5, 4,figsize=(40,40))
#     for k in range(5):
#         for j in range(4):
#             ax[k][j].imshow(matrix[k][j])
#             ax[k][j].set_xticks([])
#             ax[k][j].set_yticks([])
#     plt.savefig("END_TO_END_MTL_{}".format(i))
#     plt.show()

In [None]:
# import cv2
# fps = 30
# preprocess = transforms.Compose([
#                 transforms.ToTensor(),
#                 transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#         ])
# video_capture = cv2.VideoCapture("/home/b170007ec/Programs/MTL/DSD_MTL/video.mp4")
# size=(1280,960)
# frames = 0
# model.eval()
# while(video_capture.isOpened()):
#     ret, frame = video_capture.read()
#     if frames == 110:
#         rgb_raw = cv2.resize(frame,(640, 480))
#         rgb = preprocess(rgb_raw)
#         rgb = T.unsqueeze(rgb, 0)
#         rgb = rgb.to(device)
#         y_pred = model(rgb)
#         y_pred_ = F.softmax(y_pred[0],dim=1)
#         pseg = T.squeeze(y_pred_,0)
#         pseg = T.argmax(pseg, dim=0)
#         pseg = labels_to_cityscapes_palette(pseg.cpu().detach().numpy())
#         pdepth = T.squeeze(y_pred[1],0)
#         pdepth = pdepth.permute(1,2,0)
#         pdepth = pdepth.reshape(240,320)

#         hmap, regs, w_h_ = zip(*y_pred[2])
#         d = showbox(rgb_raw, hmap[0].detach(), regs[0].detach(),w_h_[0].detach())
#         fig = plt.figure(figsize =(8, 8))
#         plt.imshow(rgb_raw)
#         plt.tick_params(left = False, right = False , labelleft = False ,
#                 labelbottom = False, bottom = False)
#         plt.savefig("rgb.png")
#         plt.show()
#         fig = plt.figure(figsize =(8, 8))
#         plt.imshow(pseg)
#         plt.tick_params(left = False, right = False , labelleft = False ,
#                 labelbottom = False, bottom = False)
#         plt.savefig("seg.png")
#         plt.show()
#         fig = plt.figure(figsize =(8, 8))
#         plt.imshow(pdepth.cpu().detach().numpy(),cmap='magma')
#         plt.tick_params(left = False, right = False , labelleft = False ,
#                 labelbottom = False, bottom = False)
#         plt.savefig("dep.png")
#         plt.show()
#         fig = plt.figure(figsize =(8, 8))
#         plt.imshow(d)
#         plt.tick_params(left = False, right = False , labelleft = False ,
#                 labelbottom = False, bottom = False)
#         plt.savefig("det.png")
#         plt.show()
#         break
#     frames += 1