In [77]:
import torch
import torch.nn as nn

from IPython.display import Image

import torchvision
from torchview import draw_graph

In [78]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [79]:
from PIL import Image
from torchvision import transforms
import pandas as pd
import os

In [90]:
df = pd.read_csv('../data/data.csv')
df.head()

Unnamed: 0,filename,label,tag,int_label,image_path
0,00_DSCN9645_2,Leukoplakia,train,3,data/train/images/00_DSCN9645_2.jpg
1,00_20d5f703-IMG_20190430_213225,Erythroleukoplakia,train,1,data/train/images/00_20d5f703-IMG_20190430_213...
2,109_DSCN9487,Ulcer,train,5,data/train/images/109_DSCN9487.jpg
3,137_DSCN0889,Leukoplakia,train,3,data/train/images/137_DSCN0889.jpg
4,46_DSCN9587,Leukoplakia,train,3,data/train/images/46_DSCN9587.jpg


In [112]:
import shutil

labels = list(set(df['label']))

if not os.path.exists('data'):
    os.mkdir('data')
    os.mkdir('data/train')
    os.mkdir('data/test')
    
    for lab in labels:
        os.mkdir(f'data/train/{lab}')    
        os.mkdir(f'data/test/{lab}')
imga = []
for name, tag, img, lab in zip(df['filename'], df['tag'], df['image_path'], df['label']):
    src = '../' + img
    tar = f"data/{tag}/{lab}/"
    imga.append(tar + name + '.jpg')
    shutil.copy2(src, tar)

df.drop(columns=['image_path'], inplace=True)
df['image_path'] = imga

In [113]:
df.head()

Unnamed: 0,filename,label,tag,int_label,image_path
0,00_DSCN9645_2,Leukoplakia,train,3,data/train/Leukoplakia/00_DSCN9645_2.jpg
1,00_20d5f703-IMG_20190430_213225,Erythroleukoplakia,train,1,data/train/Erythroleukoplakia/00_20d5f703-IMG_...
2,109_DSCN9487,Ulcer,train,5,data/train/Ulcer/109_DSCN9487.jpg
3,137_DSCN0889,Leukoplakia,train,3,data/train/Leukoplakia/137_DSCN0889.jpg
4,46_DSCN9587,Leukoplakia,train,3,data/train/Leukoplakia/46_DSCN9587.jpg


In [115]:
df.to_csv('data.csv', index=False)

: 

In [111]:
! tree -L 3

[01;34m.[0m
├── [01;34mdata[0m
│   ├── [01;34mtest[0m
│   │   ├── [01;34mErythroleukoplakia[0m
│   │   ├── [01;34mErythroplakia[0m
│   │   ├── [01;34mKeratosis[0m
│   │   ├── [01;34mLeukoplakia[0m
│   │   ├── [01;34mTumor[0m
│   │   └── [01;34mUlcer[0m
│   └── [01;34mtrain[0m
│       ├── [01;34mErythroleukoplakia[0m
│       ├── [01;34mErythroplakia[0m
│       ├── [01;34mKeratosis[0m
│       ├── [01;34mLeukoplakia[0m
│       ├── [01;34mTumor[0m
│       └── [01;34mUlcer[0m
├── [00mdata.csv[0m
├── [00mmodel.ipynb[0m
└── [00mtransfer_learning_tutorial.ipynb[0m

15 directories, 3 files


In [100]:
idx = 0
sample_filename = f"{df['image_path'].iloc[idx]}" 
os.path.exists(sample_filename)

True

In [101]:
# resnetX = (Num of channels, repetition, Bottleneck_expansion , Bottleneck_layer)
model_parameters={}
model_parameters['resnet18'] = ([64,128,256,512],[2,2,2,2],1,False)
model_parameters['resnet34'] = ([64,128,256,512],[3,4,6,3],1,False)
model_parameters['resnet50'] = ([64,128,256,512],[3,4,6,3],4,True)
model_parameters['resnet101'] = ([64,128,256,512],[3,4,23,3],4,True)
model_parameters['resnet152'] = ([64,128,256,512],[3,8,36,3],4,True)

In [102]:
class Bottleneck(nn.Module):
    def __init__(self,in_channels,intermediate_channels,expansion,is_Bottleneck,stride):
        super(Bottleneck,self).__init__()

        self.expansion = expansion
        self.in_channels = in_channels
        self.intermediate_channels = intermediate_channels
        self.is_Bottleneck = is_Bottleneck
        
        # i.e. if dim(x) == dim(F) => Identity function
        if self.in_channels==self.intermediate_channels*self.expansion:
            self.identity = True
        else:
            self.identity = False
            projection_layer = []
            projection_layer.append(nn.Conv2d(in_channels=self.in_channels, out_channels=self.intermediate_channels*self.expansion, kernel_size=1, stride=stride, padding=0, bias=False ))
            projection_layer.append(nn.BatchNorm2d(self.intermediate_channels*self.expansion))
            # Only conv->BN and no ReLU
            # projection_layer.append(nn.ReLU())
            self.projection = nn.Sequential(*projection_layer)

        # commonly used relu
        self.relu = nn.ReLU()
        
        # is_Bottleneck = True for all ResNet 50+
        if self.is_Bottleneck:
            # bottleneck
            # 1x1
            self.conv1_1x1 = nn.Conv2d(in_channels=self.in_channels, out_channels=self.intermediate_channels, kernel_size=1, stride=1, padding=0, bias=False )
            self.batchnorm1 = nn.BatchNorm2d(self.intermediate_channels)
            # 3x3
            self.conv2_3x3 = nn.Conv2d(in_channels=self.intermediate_channels, out_channels=self.intermediate_channels, kernel_size=3, stride=stride, padding=1, bias=False )
            self.batchnorm2 = nn.BatchNorm2d(self.intermediate_channels)
            # 1x1
            self.conv3_1x1 = nn.Conv2d(in_channels=self.intermediate_channels, out_channels=self.intermediate_channels*self.expansion, kernel_size=1, stride=1, padding=0, bias=False )
            self.batchnorm3 = nn.BatchNorm2d( self.intermediate_channels*self.expansion )
        else:
            # basicblock
            # 3x3
            self.conv1_3x3 = nn.Conv2d(in_channels=self.in_channels, out_channels=self.intermediate_channels, kernel_size=3, stride=stride, padding=1, bias=False )
            self.batchnorm1 = nn.BatchNorm2d(self.intermediate_channels)
            
            # 3x3
            self.conv2_3x3 = nn.Conv2d(in_channels=self.intermediate_channels, out_channels=self.intermediate_channels, kernel_size=3, stride=1, padding=1, bias=False )
            self.batchnorm2 = nn.BatchNorm2d(self.intermediate_channels)

    def forward(self,x):
        # input stored to be added before the final relu
        in_x = x
        if self.is_Bottleneck:
            # conv1x1->BN->relu
            x = self.relu(self.batchnorm1(self.conv1_1x1(x)))         
            # conv3x3->BN->relu
            x = self.relu(self.batchnorm2(self.conv2_3x3(x)))
            # conv1x1->BN
            x = self.batchnorm3(self.conv3_1x1(x))
        else:
            # conv3x3->BN->relu
            x = self.relu(self.batchnorm1(self.conv1_3x3(x)))
            # conv3x3->BN
            x = self.batchnorm2(self.conv2_3x3(x))
        # identity or projected mapping
        if self.identity:
            x += in_x
        else:
            x += self.projection(in_x)
        
        # final relu
        x = self.relu(x)
        return x

In [103]:
# Bottleneck(64*4,64,4,stride=1)
def test_Bottleneck():
    x = torch.randn(1,64,112,112)
    model = Bottleneck(64,64,4,True,2)
    print(model(x).shape)
    del model

test_Bottleneck()

torch.Size([1, 256, 56, 56])


In [104]:
class ResNet(nn.Module):
    def __init__(self, resnet_variant,in_channels,num_classes):
        super(ResNet,self).__init__()
        self.channels_list = resnet_variant[0]
        self.repeatition_list = resnet_variant[1]
        self.expansion = resnet_variant[2]
        self.is_Bottleneck = resnet_variant[3]

        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False )
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()

        self.maxpool = nn.MaxPool2d(kernel_size=3,stride=2,padding=1)

        self.block1 = self._make_blocks( 64 , self.channels_list[0], self.repeatition_list[0], self.expansion, self.is_Bottleneck, stride=1 )
        self.block2 = self._make_blocks( self.channels_list[0]*self.expansion , self.channels_list[1], self.repeatition_list[1], self.expansion, self.is_Bottleneck, stride=2 )
        self.block3 = self._make_blocks( self.channels_list[1]*self.expansion , self.channels_list[2], self.repeatition_list[2], self.expansion, self.is_Bottleneck, stride=2 )
        self.block4 = self._make_blocks( self.channels_list[2]*self.expansion , self.channels_list[3], self.repeatition_list[3], self.expansion, self.is_Bottleneck, stride=2 )

        self.average_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear( self.channels_list[3]*self.expansion , num_classes)

    def forward(self,x):
        x = self.relu(self.batchnorm1(self.conv1(x)))
        x = self.maxpool(x)        
        x = self.block1(x)        
        x = self.block2(x)        
        x = self.block3(x)        
        x = self.block4(x)        
        x = self.average_pool(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc1(x)
        return x

    def _make_blocks(self,in_channels,intermediate_channels,num_repeat, expansion, is_Bottleneck, stride):
        """
        Args:
            in_channels : #channels of the Bottleneck input
            intermediate_channels : #channels of the 3x3 in the Bottleneck
            num_repeat : #Bottlenecks in the block
            expansion : factor by which intermediate_channels are multiplied to create the output channels
            is_Bottleneck : status if Bottleneck in required
            stride : stride to be used in the first Bottleneck conv 3x3
        Attributes:
            Sequence of Bottleneck layers
        """
        layers = []
        layers.append(Bottleneck(in_channels,intermediate_channels,expansion,is_Bottleneck,stride=stride))
        for num in range(1,num_repeat):
            layers.append(Bottleneck(intermediate_channels*expansion,intermediate_channels,expansion,is_Bottleneck,stride=1))

        return nn.Sequential(*layers)

In [105]:
def test_ResNet(params):
    model = ResNet( params , in_channels=3, num_classes=1000)
    x = torch.randn(1,3,224,224)
    output = model(x)
    print(output.shape)
    return model

In [106]:
architecture = 'resnet50'
model = test_ResNet(model_parameters[architecture])

torch.Size([1, 1000])
