In [33]:
import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [34]:
class ResidualBlock(nn.Module):
  def __init__(self, in_channels, out_channels, stride=1, downsample=None):
    super(ResidualBlock, self).__init__()

    self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.downsample = downsample

  def forward(self, x):
    identity = x

    if self.downsample:
      identity = self.downsample(x)

    out = F.relu(self.bn1(self.conv1(x)))
    out = self.bn2(self.conv2(out))

    out += identity
    out = F.relu(out)

    return out

In [35]:
class ResNet(nn.Module):
  def __init__(self, block, layers, num_classes=1000):
    super(ResNet, self).__init__()

    self.in_channels = 64

    self.maxpool = nn.MaxPool2d(3, 2, 1)
    self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
    self.bn = nn.BatchNorm2d(64)
    self.conv = nn.Conv2d(3, 64, 7, 2, 3)

    self.layer1 = self._make_layer(block, 64, layers[0])
    self.layer2 = self._make_layer(block, 128, layers[1], 2)
    self.layer3 = self._make_layer(block, 256, layers[2], 2)
    self.layer4 = self._make_layer(block, 512, layers[3], 2)

    self.fc = nn.Linear(512, num_classes)

  def _make_layer(self, block, out_channels, blocks, stride=1):
    downsample = None

    if stride != 1 or self.in_channels != out_channels:
      downsample = nn.Sequential(
        nn.Conv2d(self.in_channels, out_channels, 1, stride),
        nn.BatchNorm2d(out_channels)
      )

    layers = [block(self.in_channels, out_channels, stride, downsample)]
    self.in_channels = out_channels

    for _ in range(1, blocks):
      layers.append(block(out_channels, out_channels))

    return nn.Sequential(*layers)

  def forward(self, x):
    c1 = self.maxpool(F.relu(self.bn(self.conv(x))))

    l1 = self.layer1(c1)
    l2 = self.layer2(l1)
    l3 = self.layer3(l2)
    l4 = self.layer4(l3)

    l4 = self.avgpool(l4)
    a = l4.view(l4.size(0), -1) # Flatten
    z = self.fc(a)

    return z

In [36]:
import torch.optim as optim

model = ResNet(ResidualBlock, [2, 2, 2, 2], num_classes=10)
model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), 0.1, 0.9, weight_decay=0.0001)

In [37]:
import torchvision
import torchvision.transforms as transforms

transform = transforms.Compose([
    transforms.Resize(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5),
                         (0.5, 0.5, 0.5))
])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=128, shuffle=False)

In [38]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)

    model.train()

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        y_pred = model(X)
        loss = loss_fn(y_pred, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * 64 + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [39]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    model.eval()

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)

            y_pred = model(X)
            test_loss += loss_fn(y_pred, y).item()
            correct += (y_pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [40]:
epochs = 10
for i in range(epochs):
    print(f"Epoch {i+1}\n-------------------------------")
    train(trainloader, model, loss_fn, optimizer)
    test(testloader, model, loss_fn)

Epoch 1
-------------------------------
loss: 2.334243  [  128/50000]
loss: 2.076459  [ 6528/50000]
loss: 1.880416  [12928/50000]
loss: 1.610763  [19328/50000]
Test Error: 
 Accuracy: 45.5%, Avg loss: 1.499903 

Epoch 2
-------------------------------
loss: 1.403290  [  128/50000]
loss: 1.436733  [ 6528/50000]
loss: 1.325108  [12928/50000]
loss: 1.135547  [19328/50000]
Test Error: 
 Accuracy: 55.2%, Avg loss: 1.213256 

Epoch 3
-------------------------------
loss: 1.054446  [  128/50000]
loss: 1.124259  [ 6528/50000]
loss: 1.131295  [12928/50000]
loss: 1.105598  [19328/50000]
Test Error: 
 Accuracy: 64.0%, Avg loss: 1.005865 

Epoch 4
-------------------------------
loss: 0.990032  [  128/50000]
loss: 0.840571  [ 6528/50000]
loss: 0.910266  [12928/50000]
loss: 0.784782  [19328/50000]
Test Error: 
 Accuracy: 73.6%, Avg loss: 0.757268 

Epoch 5
-------------------------------
loss: 0.683259  [  128/50000]
loss: 0.587918  [ 6528/50000]
loss: 0.640490  [12928/50000]
loss: 0.700672  [19328