In [1]:
import torch
print(torch.__version__)
import torch.nn as nn
from torch.nn import init
from torchvision import models
from torch.nn import functional as F
from torch.nn import Parameter
from torch.autograd import Variable
from torch_geometric.nn import knn
from net.pspnet import PSPNet
from hrnet.seg_hrnet import HighResolutionNet, HRNet_2Head
import pretrainedmodels
import timm
import os
from torchinfo import summary
from typing import Optional
from gem_pooling import GeneralizedMeanPoolingP
#from .backbones.resnet import BasicBlock, Bottleneck, ResNet
#from .backbones.resnet_ibn_a import resnet50_ibn_a, resnet101_ibn_a

# Point both the lower- and upper-case HTTP(S) proxy environment variables at
# the same proxy so downloads made later from this process go through it.
proxy = 'http://10.0.0.107:3128'
os.environ.update({
    'http_proxy': proxy,
    'HTTP_PROXY': proxy,
    'https_proxy': proxy,
    'HTTPS_PROXY': proxy,
})

2.3.0


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class Swin(nn.Module):
    """Swin-Base backbone with pooled features, a BN neck, classifiers and a memory-bank head.

    Args:
        mb_h: Output width of the memory-bank projection (``memorybank_fc`` + ``mbn``).
        with_nl: Unused; kept so existing call sites keep working.
        pretrained: Forwarded to ``timm.create_model``.
        cut_at_pooling: When true, ``forward`` returns the pooled feature as-is.
        num_features: Width of an optional linear embedding placed before the
            BN neck. ``0`` disables the embedding and the backbone width is used.
        norm: When true, L2-normalise the BN feature on the training path.
        dropout: Dropout probability applied on the training path when ``> 0``.
        num_classes: Optional iterable of classifier output sizes. One bias-free
            linear classifier is registered per entry as ``classifier{i}_{size}``.
        sour_class: Unused; kept so existing call sites keep working.
    """

    def __init__(self, mb_h=2048, with_nl=False,pretrained=True, cut_at_pooling=False,
                 num_features=0, norm=False, dropout=0, num_classes=None, sour_class=751):
        super().__init__()

        # Honour the caller's `pretrained` flag instead of always loading weights.
        model_ft = timm.create_model('swin_base_patch4_window7_224', pretrained=pretrained, drop_path_rate=0.2)
        model_ft.head = nn.Sequential()  # drop the classification head to save memory
        self.model = model_ft
        self.cut_at_pooling = cut_at_pooling

        # Resolve the feature width up front: the memory-bank projection below
        # consumes the BN-neck output, whose width is `self.num_features`.
        out_planes = model_ft.norm.normalized_shape[0]
        self.has_embedding = num_features > 0
        self.num_features = num_features if self.has_embedding else out_planes
        self.norm = norm
        self.dropout = dropout
        self.num_classes = num_classes

        print("GeneralizedMeanPoolingP")
        self.gap = GeneralizedMeanPoolingP(3)

        # Input width follows the BN-neck output rather than `mb_h`; the two only
        # coincide when the caller happens to pass mb_h == feature width.
        self.memorybank_fc = nn.Linear(self.num_features, mb_h)
        self.mbn = nn.BatchNorm1d(mb_h)
        init.kaiming_normal_(self.memorybank_fc.weight, mode='fan_out')
        init.constant_(self.memorybank_fc.bias, 0)

        # Optional embedding followed by the BN neck.
        if self.has_embedding:
            self.feat = nn.Linear(out_planes, self.num_features)
            self.feat_bn = nn.BatchNorm1d(self.num_features)
            init.kaiming_normal_(self.feat.weight, mode='fan_out')
            init.constant_(self.feat.bias, 0)
        else:
            self.feat_bn = nn.BatchNorm1d(self.num_features)
        self.feat_bn.bias.requires_grad_(False)

        if self.dropout > 0:
            self.drop = nn.Dropout(self.dropout)

        if self.num_classes is not None:
            for i, num_cluster in enumerate(self.num_classes):
                # Same attribute names as before (`classifier{i}_{n}`), registered
                # through normal attribute assignment instead of `exec`.
                classifier = nn.Linear(self.num_features, num_cluster, bias=False)
                init.normal_(classifier.weight, std=0.001)
                setattr(self, self._classifier_name(i, num_cluster), classifier)

    @staticmethod
    def _classifier_name(index, num_cluster):
        """Return the attribute name under which a classifier head is stored."""
        return "classifier{}_{}".format(index, num_cluster)

    def forward(self, x, feature_withbn=False, training=False):
        """Run the backbone and heads.

        Returns, depending on configuration and flags:
            * ``cut_at_pooling`` set: the pooled feature ``x``.
            * ``training is False``: the L2-normalised BN feature.
            * ``num_classes is None``: ``(x, bn_x)``.
            * ``feature_withbn`` set: ``(bn_x, prob)``.
            * otherwise: ``(x, prob, mb_x, None)`` where ``prob`` is the list of
              classifier outputs and ``mb_x`` the memory-bank projection.
        """
        x = self.model.forward_features(x)  # [batchsize, 7, 7, 1024] (channels-last, timm>0.6.0)

        # Move channels first for pooling: [batchsize, 7, 7, 1024] -> [batchsize, 1024, 7, 7]
        x = x.permute(0, 3, 1, 2)

        x = self.gap(x)
        x = x.view(x.size(0), -1)

        if self.cut_at_pooling:
            return x

        if self.has_embedding:
            bn_x = self.feat_bn(self.feat(x))
        else:
            bn_x = self.feat_bn(x)

        if training is False:
            bn_x = F.normalize(bn_x)
            return bn_x

        if self.norm:
            bn_x = F.normalize(bn_x)
        elif self.has_embedding:
            bn_x = F.relu(bn_x)

        if self.dropout > 0:
            bn_x = self.drop(bn_x)

        if self.num_classes is None:
            return x, bn_x

        prob = [
            getattr(self, self._classifier_name(i, num_cluster))(bn_x)
            for i, num_cluster in enumerate(self.num_classes)
        ]

        if feature_withbn:
            return bn_x, prob
        mb_x = self.mbn(self.memorybank_fc(bn_x))
        return x, prob, mb_x, None

In [3]:
class ResNet50(nn.Module):
    """ResNet-50 backbone with pooled features, a BN neck, classifiers and a memory-bank head.

    Args:
        mb_h: Output width of the memory-bank projection (``memorybank_fc`` + ``mbn``).
        with_nl: Unused; kept so existing call sites keep working.
        pretrained: Forwarded to ``torchvision.models.resnet50``.
        cut_at_pooling: When true, ``forward`` returns the pooled feature as-is.
        num_features: Width of an optional linear embedding placed before the
            BN neck. ``0`` disables the embedding and the backbone width is used.
        norm: When true, L2-normalise the BN feature on the training path.
        dropout: Dropout probability applied on the training path when ``> 0``.
        num_classes: Optional iterable of classifier output sizes. One bias-free
            linear classifier is registered per entry as ``classifier{i}_{size}``.
        sour_class: Unused; kept so existing call sites keep working.
    """

    def __init__(self, mb_h=2048, with_nl=False,pretrained=True, cut_at_pooling=False,
                 num_features=0, norm=False, dropout=0, num_classes=None, sour_class=751):
        super().__init__()
        # NOTE(review): newer torchvision prefers the `weights=` argument over
        # `pretrained=`; kept as-is so the loaded weights do not change.
        model_ft = models.resnet50(pretrained=pretrained)
        # Set the stride of the first layer4 block (its conv2 and its
        # downsample conv) to 1.
        model_ft.layer4[0].downsample[0].stride = (1,1)
        model_ft.layer4[0].conv2.stride = (1,1)
        self.model = model_ft

        self.cut_at_pooling = cut_at_pooling

        # Resolve the feature width up front: the memory-bank projection below
        # consumes the BN-neck output, whose width is `self.num_features`.
        out_planes = model_ft.fc.in_features
        self.has_embedding = num_features > 0
        self.num_features = num_features if self.has_embedding else out_planes
        self.norm = norm
        self.dropout = dropout
        self.num_classes = num_classes

        print("GeneralizedMeanPoolingP")
        self.gap = GeneralizedMeanPoolingP(3)

        # Input width follows the BN-neck output instead of a hard-coded 2048,
        # which only matched when no embedding layer was requested.
        self.memorybank_fc = nn.Linear(self.num_features, mb_h)
        self.mbn = nn.BatchNorm1d(mb_h)
        init.kaiming_normal_(self.memorybank_fc.weight, mode='fan_out')
        init.constant_(self.memorybank_fc.bias, 0)

        # Optional embedding followed by the BN neck.
        if self.has_embedding:
            self.feat = nn.Linear(out_planes, self.num_features)
            self.feat_bn = nn.BatchNorm1d(self.num_features)
            init.kaiming_normal_(self.feat.weight, mode='fan_out')
            init.constant_(self.feat.bias, 0)
        else:
            self.feat_bn = nn.BatchNorm1d(self.num_features)
        self.feat_bn.bias.requires_grad_(False)

        if self.dropout > 0:
            self.drop = nn.Dropout(self.dropout)

        if self.num_classes is not None:
            for i, num_cluster in enumerate(self.num_classes):
                # Same attribute names as before (`classifier{i}_{n}`), registered
                # through normal attribute assignment instead of `exec`.
                classifier = nn.Linear(self.num_features, num_cluster, bias=False)
                init.normal_(classifier.weight, std=0.001)
                setattr(self, self._classifier_name(i, num_cluster), classifier)

    @staticmethod
    def _classifier_name(index, num_cluster):
        """Return the attribute name under which a classifier head is stored."""
        return "classifier{}_{}".format(index, num_cluster)

    def forward(self, x, feature_withbn=False, training=False):
        """Run the backbone and heads.

        Returns, depending on configuration and flags:
            * ``cut_at_pooling`` set: the pooled feature ``x``.
            * ``training is False``: the L2-normalised BN feature.
            * ``num_classes is None``: ``(x, bn_x)``.
            * ``feature_withbn`` set: ``(bn_x, prob)``.
            * otherwise: ``(x, prob, mb_x, None)`` where ``prob`` is the list of
              classifier outputs and ``mb_x`` the memory-bank projection.
        """
        # Run the backbone stages explicitly, skipping torchvision's avgpool/fc.
        x = self.model.conv1(x)
        x = self.model.bn1(x)
        x = self.model.relu(x)
        x = self.model.maxpool(x)
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.gap(x)

        x = x.view(x.size(0), -1)

        if self.cut_at_pooling:
            return x

        if self.has_embedding:
            bn_x = self.feat_bn(self.feat(x))
        else:
            bn_x = self.feat_bn(x)

        if training is False:
            bn_x = F.normalize(bn_x)
            return bn_x

        if self.norm:
            bn_x = F.normalize(bn_x)
        elif self.has_embedding:
            bn_x = F.relu(bn_x)

        if self.dropout > 0:
            bn_x = self.drop(bn_x)

        if self.num_classes is None:
            return x, bn_x

        prob = [
            getattr(self, self._classifier_name(i, num_cluster))(bn_x)
            for i, num_cluster in enumerate(self.num_classes)
        ]

        if feature_withbn:
            return bn_x, prob
        mb_x = self.mbn(self.memorybank_fc(bn_x))
        return x, prob, mb_x, None

# Smoke test: build the ResNet50 wrapper with its defaults and push one random
# batch through it. `forward` defaults to training=False, so this exercises the
# inference path. The batch variable is not called `input`, to avoid shadowing
# the Python builtin.
model = ResNet50()
resnet_input = torch.rand((32, 3, 256, 128))
output = model(resnet_input)



GeneralizedMeanPoolingP
3.0


In [4]:
# Settings forwarded to the Swin wrapper below.
mb_h = 1024          # passed as `mb_h` (memory-bank projection width)
sour_class = 751     # number of source-dataset classes
num_features = 0     # embedding width; 0 means no extra embedding layer
dropout = 0          # dropout probability (0 disables the dropout layer)

# One classifier head of `fc_len` outputs per entry parsed from the
# comma-separated cluster spec.
ncs = list(map(int, '60'.split(',')))
fc_len = 3500
num_classes = [fc_len] * len(ncs)

model = Swin(
    mb_h=mb_h,
    with_nl=False,
    pretrained=True,
    cut_at_pooling=False,
    num_features=num_features,
    norm=False,
    dropout=dropout,
    num_classes=num_classes,
    sour_class=sour_class,
).to('cpu')

GeneralizedMeanPoolingP
3.0


In [5]:
# Run one random batch through the model on the training path. The batch
# variable is not called `input`, to avoid shadowing the Python builtin.
swin_input = torch.rand((32, 3, 224, 224)).to('cpu')
output = model(swin_input, training=True)

In [7]:
# `model(..., training=True)` returned a tuple, which has no `.shape`; report the
# container type and length here instead of raising AttributeError.
type(output).__name__, len(output)

In [8]:
# Sizes of the tensors inside the returned tuple: element 0, the first entry of
# the list at element 1, element 2, and the raw value of element 3.
(
    output[0].size(),
    output[1][0].size(),
    output[2].size(),
    output[3],
)

(torch.Size([32, 1024]), torch.Size([32, 3500]), torch.Size([32, 1024]), None)

In [None]:
# Standalone backbone, created with the same timm call the Swin wrapper makes,
# so it can be inspected on its own in the cells below.
model_ft = timm.create_model(
    'swin_base_patch4_window7_224',
    pretrained=True,
    drop_path_rate=0.2,
)

In [None]:
# First entry of `model_ft.norm.normalized_shape`; `Swin.__init__` reads the
# same attribute to obtain `out_planes`.
model_ft.norm.normalized_shape[0]

1024

In [None]:
# Per-layer summary of the standalone backbone for a batch of 32 RGB 224x224 inputs.
summary(
    model=model_ft,
    input_size=(32, 3, 224, 224),  # the keyword is "input_size", not "input_shape"
    # col_names=["input_size"],  # uncomment for smaller output
    col_names=["input_size", "output_size", "num_params", "trainable"],
    col_width=20,
    row_settings=["var_names"],
)

Layer (type (var_name))                            Input Shape          Output Shape         Param #              Trainable
SwinTransformer (SwinTransformer)                  [32, 3, 224, 224]    [32, 1000]           --                   True
├─PatchEmbed (patch_embed)                         [32, 3, 224, 224]    [32, 56, 56, 128]    --                   True
│    └─Conv2d (proj)                               [32, 3, 224, 224]    [32, 128, 56, 56]    6,272                True
│    └─LayerNorm (norm)                            [32, 56, 56, 128]    [32, 56, 56, 128]    256                  True
├─Sequential (layers)                              [32, 56, 56, 128]    [32, 7, 7, 1024]     --                   True
│    └─SwinTransformerStage (0)                    [32, 56, 56, 128]    [32, 56, 56, 128]    --                   True
│    │    └─Identity (downsample)                  [32, 56, 56, 128]    [32, 56, 56, 128]    --                   --
│    │    └─Sequential (blocks)              