In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch import optim
from torch import Tensor

import torchvision
import torchvision.transforms as transforms

from torchvision.models import resnet

from torchvision.models.resnet import Bottleneck
from torchvision.models.resnet import BasicBlock
from torchvision.models.resnet import resnet18

from typing import Type, Any, Callable, Union, List, Optional

import time


class ResNet(nn.Module):

    def __init__(
        self,
        block: Type[Union[BasicBlock, Bottleneck]],
        layers: List[int],
        num_classes: int = 10,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)  
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)  # type: ignore[arg-type]
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

    def _make_layer(self, block: Type[Union[BasicBlock, Bottleneck]], planes: int, blocks: int,
                    stride: int = 1, dilate: bool = False) -> nn.Sequential:
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        
        if dilate:
            self.dilation *= stride
            stride = 1
        
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def _forward_impl(self, x: Tensor) -> Tensor:
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)

    
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
])

train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=False, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1024, shuffle=False, num_workers=2)



device = 'cuda'

net = resnet18()
net = net.to(device)

learning_rate = 0.001


optimizer = optim.Adam(net.parameters(), lr=learning_rate)


def train(epoch):
    
    epoch+=1

    print('\n[ Train epoch: %d ]' % epoch)
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    criterion = nn.CrossEntropyLoss()

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        
        optimizer.zero_grad()

        outputs = net(inputs)
        
        loss = criterion(outputs,targets)
        
        loss.backward()

        optimizer.step()
        train_loss += loss.item()
        _, predicted = outputs.max(1)

        total += targets.size(0)
        current_correct = predicted.eq(targets).sum().item()
        
        correct += current_correct


    print('\nTotal average train accuarcy:', correct / total)
    print('Total average train loss:', train_loss / total)


def test(epoch):
    
    epoch+=1
    
    print('\n[ test epoch: %d ]' % epoch)
    net.eval()
    loss = 0
    correct = 0
    total = 0

    criterion = nn.CrossEntropyLoss()

    for batch_idx, (inputs, targets) in enumerate(test_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        total += targets.size(0)

        outputs = net(inputs)
        loss += criterion(outputs, targets).item()

        _, predicted = outputs.max(1)
        correct += predicted.eq(targets).sum().item()

    print('\nTotal average test accuarcy:', correct / total)
    print('Total average test loss:', loss / total)

    
start_time = time.time()


for epoch in range(0, 100):
    train(epoch)
    test(epoch)
    print('\nTime elapsed:', time.time() - start_time)


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=0.0, max=170498071.0), HTML(value='')))


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified

[ Train epoch: 1 ]


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)



Total average train accuarcy: 0.4333
Total average train loss: 0.012444877746105193

[ test epoch: 1 ]

Total average test accuarcy: 0.4418
Total average test loss: 0.0016643280625343322

Time elapsed: 20.874744176864624

[ Train epoch: 2 ]

Total average train accuarcy: 0.56608
Total average train loss: 0.009484183324575424

[ test epoch: 2 ]

Total average test accuarcy: 0.5529
Total average test loss: 0.0012725342750549316

Time elapsed: 41.730533599853516

[ Train epoch: 3 ]

Total average train accuarcy: 0.63132
Total average train loss: 0.00811392553806305

[ test epoch: 3 ]

Total average test accuarcy: 0.6339
Total average test loss: 0.0010656726241111754

Time elapsed: 63.017143964767456

[ Train epoch: 4 ]

Total average train accuarcy: 0.6721
Total average train loss: 0.007305842530727386

[ test epoch: 4 ]

Total average test accuarcy: 0.6777
Total average test loss: 0.0009267971038818359

Time elapsed: 84.57510352134705

[ Train epoch: 5 ]

Total average train accuarcy: 0

In [7]:
def test():
    

    
    net.eval()
    correct_390 = 0
    total_390 = 49920

    correct_last = 0
    total_last = 80

    criterion = nn.CrossEntropyLoss()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

        inputs, targets = inputs.to(device), targets.to(device)

        if batch_idx<390:
            outputs = net(inputs)
          
            _, predicted = outputs.max(1)
            correct_390 += predicted.eq(targets).sum().item()

        else:
            outputs = net(inputs)
            
            _, predicted = outputs.max(1)
            correct_last += predicted.eq(targets).sum().item()


    print('\n average test accuarcy - 390 batches:', correct_390 / total_390)
    print('\n average test accuarcy - last batch:', correct_last / total_last)

test()



 average test accuarcy - 390 batches: 0.9388221153846154

 average test accuarcy - last batch: 0.9625
