In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.transforms import ToTensor
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import torch.optim as optim
from tqdm.auto import tqdm

In [2]:
device = "cuda" if torch.cuda.is_available() else "cpu"

### I used resnet50 architecture

#### defined two classes to create residual blocks, the first one(IdentityBlock) do not use downsample to change the weight and height scale(in conv layer 1) and change the channel scale(in layer 3) but the other one(ConvBlock) does it

In [3]:
class IdentiyBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3):
        super(IdentiyBlock, self).__init__()

        self.conv1 = nn.Conv2d(
            in_channels=out_channels, out_channels=in_channels, stride=1, kernel_size=1)
        self.conv2 = nn.Conv2d(in_channels=in_channels, out_channels=in_channels,
                               kernel_size=kernel_size, stride=1, padding=kernel_size // 2)
        self.conv3 = nn.Conv2d(
            in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.in_channels = in_channels

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn2(x)
        
        x += identity
        x = self.relu(x)

        return x

In [4]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels,kernel_size = 3, downsample = None, stride = 1):
        super(ConvBlock, self).__init__()

        if in_channels == 64:
            self.conv1 = nn.Conv2d(in_channels= in_channels, out_channels= in_channels, stride= stride, kernel_size= 1)
        # first residual block has input channel the same as the current input channel, however the
        # next one has input channel equal to half of the current channel   
        else: 
            self.conv1 = nn.Conv2d(
                in_channels=int(in_channels * 2), out_channels=in_channels, stride=stride, kernel_size=1)
        self.conv2 = nn.Conv2d(in_channels= in_channels, out_channels= in_channels, kernel_size= kernel_size, stride = 1, padding= kernel_size // 2)
        self.conv3 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size= 1, stride = 1)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace= True)
        self.downsample = downsample
        self.in_channels = in_channels

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            identity = self.downsample(identity)
        x += identity
        x = self.relu(x)

        return x

### because of lack of memory two residual block has been inactivated, if you make them work again, make sure that you have modified the input feature of classifier block

In [5]:
class ResNet(nn.Module):
    def __init__(self, multiplied, blocks, first_layer_channel, input_shape, num_classes=1000):
        super(ResNet, self).__init__()
        self.first_layer_channel = first_layer_channel
        self.multiplied = multiplied
        self.input_channel = input_shape[1]
        self.conv1 = nn.Conv2d(in_channels= self.input_channel, out_channels= first_layer_channel, kernel_size=7, stride = 2, padding= 3)
        self.bn = nn.BatchNorm2d(first_layer_channel)
        self.relu = nn.ReLU(inplace= True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride= 2)
        self.layer1 = self.make_layer(next(iter(blocks)), 1)
        self.layer2 = self.make_layer(next(iter(blocks)), 2)
        #self.layer3 = self.make_layer(next(iter(blocks)), 2)
        #self.layer4 = self.make_layer(next(iter(blocks)), 2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features= 512, out_features= num_classes)
        )

    def make_layer(self, block, stride=1):
        #if stride!= 1 or self.first_layer_channel != int(self.first_layer_channel * self.multiplied):
        #    downsample = nn.Sequential(
        #        nn.Conv2d(in_channels=self.first_layer_channel, out_channels=int(
        #            self.multiplied * self.first_layer_channel), kernel_size=1, stride=stride),
        #        nn.BatchNorm2d(int(self.multiplied * self.first_layer_channel))
        #    )
        #
        if self.first_layer_channel == 64:
            downsample = nn.Sequential(
                nn.Conv2d(in_channels= self.first_layer_channel, out_channels= int(self.multiplied * self.first_layer_channel), kernel_size= 1, stride= stride),
                nn.BatchNorm2d(int(self.multiplied * self.first_layer_channel))
            )
        else:
            downsample = nn.Sequential(
                nn.Conv2d(in_channels=int(self.first_layer_channel * 2), out_channels=int(
                    self.multiplied * self.first_layer_channel), kernel_size=1, stride=stride),
                nn.BatchNorm2d(int(self.multiplied * self.first_layer_channel))
            )
        layers = []
        layers.append(ConvBlock(in_channels= self.first_layer_channel, out_channels= int(self.multiplied * self.first_layer_channel), stride = stride, downsample= downsample))

        for _ in range(block - 1):
            layers.append(IdentiyBlock(in_channels= self.first_layer_channel, out_channels= int(self.first_layer_channel * self.multiplied)))

        self.first_layer_channel = int(
            self.first_layer_channel * (self.multiplied/2))
        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        #x = self.layer3(x)
        #x = self.layer4(x)

        x = self.avgpool(x)
        x = self.classifier(x)

        return x

In [6]:
#class RearrangeTransform:
#    def __call__(self, x):
#        return x.permute(1, 2, 0)
#
#
#transform = transforms.Compose([
#    transforms.ToTensor(),
#    RearrangeTransform()
#])

In [7]:
train_data = datasets.FashionMNIST(
    root='', train=True, download=False, transform=ToTensor(), target_transform=None)
test_data = datasets.FashionMNIST(
    root='', train=False, download=False, transform=ToTensor())

In [8]:
train_size = int(0.2 * len(train_data))
val_size = len(train_data) - train_size


train_dataset, val_dataset = random_split(train_data, [train_size, val_size])

trainloader = DataLoader(train_dataset, batch_size=28,
                         shuffle=True, num_workers=8)
valloader = DataLoader(val_dataset, batch_size=28,
                       shuffle=False, num_workers=8)

In [9]:
#trainloader = DataLoader(train_data, batch_size=28, shuffle=True, num_workers=8)
testloader = DataLoader(test_data, batch_size=28, shuffle=False, num_workers=8)

In [10]:
model = ResNet(num_classes=10, input_shape=(28, 1, 28, 28),
               first_layer_channel=64, multiplied=4, blocks=[3, 4, 6, 3])

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1,
                      momentum=0.9, weight_decay=5e-4)

In [11]:
from helper_function import train_step, test_step,  accuracy_fn

epochs = 3
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n---------")
    train_step(data_loader=trainloader,
               model=model,
               loss_fn=criterion,
               optimizer=optimizer,
               accuracy_fn=accuracy_fn,
               device=device
               )
    test_step(data_loader=testloader,
              model=model,
              loss_fn=criterion,
              accuracy_fn=accuracy_fn,
              device=device
              )

  0%|          | 0/3 [00:00<?, ?it/s]

Epoch: 0
---------
Train loss: 1.58876 | Train accuracy: 52.66%
Test loss: 1.60184 | Test accuracy: 45.66%

Epoch: 1
---------
Train loss: 0.94832 | Train accuracy: 62.38%
Test loss: 0.75853 | Test accuracy: 70.41%

Epoch: 2
---------
Train loss: 0.73779 | Train accuracy: 71.24%
Test loss: 0.69923 | Test accuracy: 72.69%

