In [1]:
import sys, os
sys.path.append("..")
from dataloader import Dataload
from utils.plot import plot_rect
from torchsummary import summary
import torch
import cv2
import matplotlib.pyplot as plt

In [2]:
import torch
from torch import nn
import torch.nn.functional as F
def conv_block(in_channel, out_channel):
    layer = nn.Sequential(
        nn.BatchNorm2d(in_channel),
        nn.ReLU(),
        nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, bias=False)
    )
    return layer

class dense_block(nn.Module):
    def __init__(self, in_channel, growth_rate, num_layers):
        super(dense_block, self).__init__()
        block = []
        channel = in_channel
        for i in range(num_layers):
            block.append(conv_block(channel, growth_rate))
            channel += growth_rate
        self.net = nn.Sequential(*block)
    def forward(self, x):
        for layer in self.net:
            out = layer(x)
            x = torch.cat((out, x), dim=1)
        return x

def transition(in_channel, out_channel):
    trans_layer = nn.Sequential(
        nn.BatchNorm2d(in_channel),
        nn.ReLU(),
        nn.Conv2d(in_channel, out_channel, 1),
        nn.AvgPool2d(2, 2)
    )
    return trans_layer

class densenet(nn.Module):
    def __init__(self, in_channel, num_classes, growth_rate=32, block_layers=[6, 12, 24, 16], need_return_dic = True):
        super(densenet, self).__init__()
        self.need_return_dict = need_return_dic
        self.block1 = nn.Sequential(

            nn.Conv2d(in_channel, 64, 7, 2, 3),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.MaxPool2d(3, 2, padding=1)
            )
        self.DB1 = self._make_dense_block(64, growth_rate,num=block_layers[0])
        self.TL1 = self._make_transition_layer(256)
        self.DB2 = self._make_dense_block(128, growth_rate, num=block_layers[1])
        self.TL2 = self._make_transition_layer(512)
        self.DB3 = self._make_dense_block(256, growth_rate, num=block_layers[2])
        self.TL3 = self._make_transition_layer(1024)
        self.DB4 = self._make_dense_block(512, growth_rate, num=block_layers[3])
        self.global_average = nn.Sequential(
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1,1)),
        )
        self.classifier = nn.Linear(1024, num_classes)
        #self.ea = nn.Linear(1024,2)
    def build_results(self,x):
        return {
            "pred_logits":x,
        }
    def forward(self, x):
        x = self.block1(x)
        x = self.DB1(x)
        x = self.TL1(x)
        x = self.DB2(x)
        x = self.TL2(x)
        x = self.DB3(x)
        x = self.TL3(x)
        x = self.DB4(x)
        x = self.global_average(x)
        x = x.view(x.shape[0], -1)
        #print(x.size())
        #a = self.ea(x)
        #print(a.size())
        x = self.classifier(x)
        #print(x.size())
        return self.build_results(x) if(self.need_return_dict) else x

    def _make_dense_block(self,channels, growth_rate, num):
        block = []
        block.append(dense_block(channels, growth_rate, num))
        channels += num * growth_rate

        return nn.Sequential(*block)
    def _make_transition_layer(self,channels):
        block = []
        block.append(transition(channels, channels // 2))
        return nn.Sequential(*block)

class MLP(nn.Module):
    """ Very simple multi-layer perceptron (also called FFN)"""
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.num_layers = num_layers
        h = [hidden_dim] * (num_layers - 1)
        self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))
    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = F.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
        x = x.view(x.shape[0], -1, 4)
        return x
    
class ShareMLP(MLP):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super(ShareMLP, self).__init__(input_dim, hidden_dim, output_dim, num_layers)
        self.num_layers = num_layers
        h = [hidden_dim] * (num_layers - 1)
        self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))
    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x_src = x
            x = F.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
            if i < self.num_layers - 1:
                x = x + x_src
        x = x.view(x.shape[0], -1, 4)
        return x
    
class DenseCoord(densenet):
    def __init__(self, in_channel, num_classes, num_queries = 25,growth_rate = 32, block_layers=[6, 12, 24, 16], need_return_dic = True):

        super(DenseCoord,self).__init__(in_channel, num_classes, growth_rate=growth_rate, block_layers=block_layers,
                                        need_return_dic = need_return_dic)
        self.num_classes = num_classes + 1
        self.class_embed = nn.Linear(1024, self.num_classes * num_queries)
        self.bbox_embed = MLP(1024, 1024, 4 * num_queries, 3)
        #self.bbox_embed = ShareMLP(1024, 1024, 4 * num_queries, 3)
        
    def build_results(self,x,y):
        return {
            "pred_logits":x,
            "pred_boxes":y,
        }
    def feature(self, x):
        x = self.block1(x)
        x = self.DB1(x)
        x = self.TL1(x)
        x = self.DB2(x)
        x = self.TL2(x)
        x = self.DB3(x)
        x = self.TL3(x)
        x = self.DB4(x)
        x = self.global_average(x)
        x = x.view(x.shape[0], -1)
        return x
    def forward(self, x):
        feature_map = self.feature(x)
        print(feature_map.shape)
        class_feature = self.class_embed(feature_map)
        print(class_feature.shape)
        outputs_class = class_feature.view(class_feature.shape[0], -1, self.num_classes)    # one-hot
        outputs_coord = self.bbox_embed(feature_map).sigmoid()
        print(outputs_class.shape)
        print(outputs_coord.shape)
        return self.build_results(outputs_class, outputs_coord) if (self.need_return_dict) else [outputs_class,outputs_coord]

In [3]:
model = DenseCoord(3, 8)

In [4]:
image = torch.zeros((2,3,128,128))
d = model(image)

torch.Size([2, 1024])
torch.Size([2, 225])
torch.Size([2, 25, 9])
torch.Size([2, 25, 4])


In [5]:
d['pred_boxes'].shape

torch.Size([2, 25, 4])

In [6]:
d['pred_logits'].shape

torch.Size([2, 25, 9])

In [31]:
costCross = torch.nn.CrossEntropyLoss()
l2 = torch.nn.MSELoss()

In [16]:
label = torch.zeros((2,25,5))

In [17]:
label.shape

torch.Size([2, 25, 5])

In [59]:
label.unsqueeze(0)

RuntimeError: expand(torch.FloatTensor{[2, 25, 5]}, size=[1]): the number of sizes provided (1) must be greater or equal to the number of dimensions in the tensor (3)

In [26]:
label[:,:,0][0][1]

tensor(0.)

In [34]:
pred = d['pred_logits'].view(-1, 9)

In [35]:
pred.shape

torch.Size([50, 9])

In [81]:
Conv2d = nn.Sequential(
            nn.Conv2d(256,64,2,2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
)

Conv2d(32, 2, kernel_size=(2, 2), stride=(2, 2))

In [52]:
image = torch.zeros([2, 256, 32, 32])

In [158]:
class BoxEmbed(nn.Module):
    def __init__(self, model_in_channel, W, H, num_require = 25, coord_number = 4):
        super(BoxEmbed,self).__init__()
        conv_list = []
        self.coord_number = coord_number
        channel = model_in_channel
        for i in range( int(channel/32) -1 ):
            W = W/2
            if( W > 4 ):
                kernal_size = 2
                stride = 2
            else:
                kernal_size = 1
                stride = 1
            if(channel > 32):
                channel = int(channel/2)
                in_channel = 2*channel
                out_channel = channel
            else:
                in_channel = channel
                out_channel = channel
            conv_list.append(nn.Conv2d(in_channel,out_channel,kernal_size,stride))
            conv_list.append(nn.BatchNorm2d(out_channel))
            conv_list.append(nn.ReLU())
        self.conv_block = nn.Sequential(*conv_list)
        self.embed = nn.Conv1d(512, num_require,1)
    def forward(self, x):
        x = self.conv_block(x)
        x = x.view(x.shape[0],-1, self.coord_number)  # [Batch,512,4]
        x = self.embed(x)
        return x

In [161]:
batch_image = torch.zeros((2, 128, 8, 8))
B,C,W,H = batch_image.shape
b = BoxEmbed(C,W,H,25,4)
b(batch_image).shape

torch.Size([2, 25, 4])

In [147]:
batch_image = torch.zeros((2,512,4))
batch_image.shape

torch.Size([2, 512, 4])

In [153]:
d = nn.Conv1d(512, 25,1)

In [154]:
d(batch_image).shape

torch.Size([2, 25, 4])

In [140]:
b(batch_image)

torch.Size([2, 32, 8, 8])
torch.Size([2, 512, 4])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1024x4 and 512x25)

In [128]:
d.view(d.shape[0],-1,4).shape

torch.Size([2, 512, 4])

In [175]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
 
#ResNet的基本Bottleneck类
class Bottleneck(nn.Module):
    expansion=4#通道倍增数
    def __init__(self,in_planes,planes,stride=1,downsample=None):
        super(Bottleneck,self).__init__()
        self.bottleneck=nn.Sequential(
            nn.Conv2d(in_planes,planes,1,bias=False),
            nn.BatchNorm2d(planes),
            nn.ReLU(inplace=True),
            nn.Conv2d(planes,planes,3,stride,1,bias=False),
            nn.BatchNorm2d(planes),
            nn.ReLU(inplace=True),
            nn.Conv2d(planes,self.expansion*planes,1,bias=False),
            nn.BatchNorm2d(self.expansion*planes),
        )
        self.relu=nn.ReLU(inplace=True)
        self.downsample=downsample
    def forward(self,x):
        identity=x
        out=self.bottleneck(x)
        if self.downsample is not None:
            identity=self.downsample(x)
        out+=identity
        out=self.relu(out)
        return out


class FPN(nn.Module):
    def __init__(self,in_channel = 3, layers = [2,2,2,2]):
        super(FPN,self).__init__()
        self.inplanes=64
        #处理输入的C1模块（C1代表了RestNet的前几个卷积与池化层）
        self.conv1=nn.Conv2d(in_channel,64,7,2,3,bias=False)
        self.bn1=nn.BatchNorm2d(64)
        self.relu=nn.ReLU(inplace=True)
        self.maxpool=nn.MaxPool2d(3,2,1)
        #搭建自下而上的C2，C3，C4，C5
        self.layer1=self._make_layer(64,layers[0])
        self.layer2=self._make_layer(128,layers[1],2)
        self.layer3=self._make_layer(256,layers[2],2)
        self.layer4=self._make_layer(512,layers[3],2)
        #对C5减少通道数，得到P5
        self.toplayer=nn.Conv2d(2048,256,1,1,0)
        #3x3卷积融合特征
        self.smooth1=nn.Conv2d(256,256,3,1,1)
        self.smooth2=nn.Conv2d(256,256,3,1,1)
        self.smooth3=nn.Conv2d(256,256,3,1,1)
        #横向连接，保证通道数相同
        self.latlayer1=nn.Conv2d(1024,256,1,1,0)
        self.latlayer2=nn.Conv2d(512,256,1,1,0)
        self.latlayer3=nn.Conv2d(256,256,1,1,0)
        
    def _make_layer(self,planes,blocks,stride=1):
        downsample=None
        if stride!=1 or self.inplanes != Bottleneck.expansion * planes:
            downsample=nn.Sequential(
                nn.Conv2d(self.inplanes,Bottleneck.expansion*planes,1,stride,bias=False),
                nn.BatchNorm2d(Bottleneck.expansion*planes)
            )
        layers=[]
        layers.append(Bottleneck(self.inplanes,planes,stride,downsample))
        self.inplanes=planes*Bottleneck.expansion
        for i in range(1,blocks):
            layers.append(Bottleneck(self.inplanes,planes))
        return nn.Sequential(*layers)
    
    #自上而下的采样模块
    def _upsample_add(self,x,y):
        _,_,H,W=y.shape
        return F.upsample(x,size=(H,W),mode='bilinear')+y
    def forward(self,x):
        #自下而上
        c1=self.maxpool(self.relu(self.bn1(self.conv1(x))))
        c2=self.layer1(c1)
        c3=self.layer2(c2)
        c4=self.layer3(c3)
        c5=self.layer4(c4)
        #自上而下
        p5=self.toplayer(c5)
        p4=self._upsample_add(p5,self.latlayer1(c4))
        p3=self._upsample_add(p4,self.latlayer2(c3))
        p2=self._upsample_add(p3,self.latlayer3(c2))
        #卷积的融合，平滑处理
        p4=self.smooth1(p4)
        p3=self.smooth2(p3)
        p2=self.smooth3(p2)
        return p2,p3,p4,p5

class CBL(nn.Module):
    def __init__(self, in_channel, out_channel, kernal_size = 3, stride = 1, padding = 1):
        super(CBL,self).__init__()
        self.cblblock = nn.Sequential(
            nn.Conv2d(in_channel,out_channel,kernal_size,stride,padding),
            nn.BatchNorm2d(out_channel),
            nn.LeakyReLU(),
        )
    def forward(self,x):
        return self.cblblock(x)

class BoxEmbed(nn.Module):
    def __init__(self, model_in_channel, W,num_require = 25, coord_number = 4):
        super(BoxEmbed,self).__init__()
        conv_list = []
        self.coord_number = coord_number
        channel = model_in_channel
        for i in range( int(channel/32) -1 ):
            W = W/2
            if( W > 4 ):
                kernal_size = 2
                stride = 2
            else:
                kernal_size = 1
                stride = 1
            if(channel > 32):
                channel = int(channel/2)
                in_channel = 2*channel
                out_channel = channel
            else:
                in_channel = channel
                out_channel = channel
            conv_list.append(nn.Conv2d(in_channel,out_channel,kernal_size,stride))
            conv_list.append(nn.BatchNorm2d(out_channel))
            conv_list.append(nn.ReLU())
        self.conv_block = nn.Sequential(*conv_list)
        self.embed = nn.Conv1d(512, num_require,1)
    def forward(self, x):
        x = self.conv_block(x)
        x = x.view(x.shape[0],-1, self.coord_number)  # [Batch,512,4]
        x = self.embed(x)
        return x
        
    
class MixFpn(nn.Module):
    def __init__(self,in_channel = 3, layers = [2,2,2,2], num_class = 2, num_require = 25, need_return_dict = True):
        super(MixFpn,self).__init__()
        self.fpn = FPN(in_channel, layers)
        self.conv1 = nn.Sequential(
            nn.Conv2d(256,128,3,1,1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
        )
        self.hidden = 128
        self.cbl_same = CBL(self.hidden,self.hidden)
        self.cbl_down1 = CBL(2*self.hidden,self.hidden)
        self.cbl_down = CBL(self.hidden,self.hidden,2,2,0)
        self.conv = nn.Sequential(
            nn.Conv2d(2*self.hidden,64,2,2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
        )
        self.num_require = num_require
        self.need_return_dict = need_return_dict
        self.softmax = nn.Linear(1024, num_require * (num_class + 1) * 2)
        self.class_embed = nn.Linear(num_require * (num_class + 1) * 2, num_require * (num_class + 1))
        self.box_embed1 = BoxEmbed(self.hidden*2, 32, num_require=num_require)
        self.box_embed2 = BoxEmbed(self.hidden*2, 16, num_require=num_require)
        self.box_embed3 = BoxEmbed(self.hidden*2, 8, num_require=num_require)
    def _upsample(self, x, H, W):
        return F.upsample(x,size=(H,W),mode='bilinear')
    def _upsample_add(self,x,y):
        _,_,H,W=y.shape
        return self._upsample(x,H,W)+y
    def feature(self, x):
        p2,p3,p4,p5 = self.fpn(x)
        B,C,H,W = p2.shape
        
        for i in range(int(C/self.hidden)-1):
            p2 = self.conv1(p2)
            p3 = self.conv1(p3)
            p4 = self.conv1(p4)
            p5 = self.conv1(p5)
         
        x1 = torch.cat([self.cbl_same(p2), self._upsample(p5,H,W)], dim=1)
        feature1 = self.cbl_down1(x1)
        x2 = torch.cat([p3, self.cbl_down(feature1)],dim=1)
        feature2 = self.cbl_down1(x2)
        x3 = torch.cat([p4, self.cbl_down(feature2)],dim=1)
        return x1,x2,x3
    def build_results(self,x,y):
        return {
            "pred_logits":x,
            "pred_boxes":y,
        }
    def forward(self,x):
        x = self.feature(x)
        box_coord_1 = self.box_embed1(x[0])
        box_coord_2 = self.box_embed2(x[1])
        box_coord_3 = self.box_embed3(x[2])
        pred_coord = (box_coord_1 * box_coord_2 * box_coord_3).sigmoid()
        x = self.conv(x[-1])
        x = x.view(x.shape[0],-1)
        x = self.softmax(x)
        pred_class = self.class_embed(x)
        return self.build_results(pred_class,pred_coord) if(self.need_return_dict) else [pred_class,pred_coord]

In [176]:
batch_image = torch.zeros(2,3,128,128)

In [178]:
m = MixFpn(3,[2,2,2,2],8)

In [179]:
d = m(batch_image)



In [180]:
d

{'pred_logits': tensor([[ 4.0452e-01,  4.0035e-01,  5.7632e-01,  1.5997e-01,  1.3133e-01,
          -2.3799e-01,  1.3313e-01, -1.5027e-01, -3.7701e-01, -7.0471e-02,
          -2.0735e-01,  3.1206e-01,  3.0096e-02, -7.2557e-02,  2.2506e-01,
          -1.7077e-01,  1.0950e-01,  3.4032e-01,  1.2843e-01, -1.1341e-01,
           9.0018e-02,  1.9510e-01,  6.1548e-02, -1.8315e-01, -1.2946e-01,
          -7.0517e-02,  1.6340e-01,  3.9783e-01,  3.3278e-01, -8.3161e-02,
          -9.4468e-02,  3.3800e-02, -1.3770e-01,  2.2086e-01,  1.5825e-01,
           1.5790e-01,  3.5834e-01,  2.5845e-01, -3.1832e-01,  1.9514e-01,
           2.2260e-01, -3.7083e-01, -4.5805e-01, -2.8093e-03, -8.9936e-02,
          -1.6657e-02, -1.8714e-01, -9.7226e-02,  1.2922e-01,  5.5807e-02,
          -6.3892e-02,  1.0089e-01, -7.8274e-02,  2.1929e-02,  5.3395e-01,
           1.7657e-01, -2.8625e-01, -4.5904e-02, -5.3479e-01, -1.4283e-01,
          -1.5624e-03,  1.0799e-01,  4.6423e-01,  1.9305e-01, -1.9696e-01,
          