In [1]:
import torch
import torchvision
import numpy as np
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Compose, Normalize, Resize
# import matplotlib.pyplot as plt

In [2]:
# Preprocessing shared by both splits: resize to 32x32, convert to a tensor,
# then normalise each of the three channels with mean 0.5 and std 0.5.
# NOTE(review): if the stored images are already 32x32 the Resize step is a
# no-op — confirm before removing it.
transform = Compose([
    Resize(size=(32, 32)),
    ToTensor(),
    Normalize(mean=(0.5,) * 3, std=(0.5,) * 3),
])

# Training split of CIFAR-10 (download=True fetches it into the root if needed).
training_data = datasets.CIFAR10("datasets/cifar10", train=True, transform=transform, download=True)

# Test split, stored under the same root and preprocessed identically.
test_data = datasets.CIFAR10("datasets/cifar10", train=False, transform=transform, download=True)

Files already downloaded and verified
Files already downloaded and verified


In [3]:
# Create data loaders
BATCH_SIZE = 32

# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation gains nothing from shuffling; a fixed order keeps per-batch
# results repeatable from run to run.
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [4]:
class Block(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm stages plus an additive shortcut.

    Main path: conv3x3 -> BN -> ReLU -> conv3x3 -> BN. The shortcut branch is
    added to that result and a final ReLU is applied.

    Args:
        in_channels: Number of channels in the incoming feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution, and of the 1x1 projection
            shortcut when one is built.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # An empty Sequential passes its input through unchanged, so it acts as
        # the identity shortcut when the main path keeps stride and width.
        shape_preserved = stride == 1 and in_channels == out_channels
        if shape_preserved:
            self.shortcut = nn.Sequential()
        else:
            # 1x1 projection so the skip tensor matches the main path's output.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """Return ReLU(main_path(x) + shortcut(x))."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        residual += self.shortcut(x)
        return self.relu(residual)

class ResNet18(nn.Module):
    """Residual classifier built from four stages of two ``Block`` units each.

    Layout: 3x3 stem convolution (3 -> 64 channels) with batch norm and ReLU,
    a 3x3 stride-2 max-pool, stages of width 64, 128, 256 and 512 (stages 2-4
    start with a stride-2 block), adaptive average pooling to 1x1 and a linear
    layer producing ``num_classes`` outputs.

    Args:
        num_classes: Size of the output of the final linear layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Channel count feeding the next stage; _make_layer reads and updates it.
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages
        self.layer1 = self._make_layer(Block, 64, 2, stride=1)
        self.layer2 = self._make_layer(Block, 128, 2, stride=2)
        self.layer3 = self._make_layer(Block, 256, 2, stride=2)
        self.layer4 = self._make_layer(Block, 512, 2, stride=2)

        # Classification head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage as an ``nn.Sequential`` of ``block`` instances.

        The first block maps ``self.in_channels`` to ``out_channels`` using
        ``stride``; the remaining ``num_blocks - 1`` blocks keep the width and
        use stride 1. ``self.in_channels`` is set to ``out_channels`` afterwards.
        """
        head = block(self.in_channels, out_channels, stride)
        tail = [block(out_channels, out_channels, 1) for _ in range(num_blocks - 1)]
        self.in_channels = out_channels
        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Map a batch of 3-channel images to a ``(batch, num_classes)`` tensor."""
        out = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)

        pooled = self.avgpool(out)
        # Collapse the pooled (C, 1, 1) feature map into one vector per sample.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [5]:
def train(dataloader, model, loss_function, optimizer):
    """Run one optimisation pass over ``dataloader`` and print the mean batch loss.

    Args:
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        model: ``nn.Module`` to optimise; it is switched to training mode.
        loss_function: Callable ``(predictions, targets) -> scalar loss tensor``.
        optimizer: Optimizer that holds ``model``'s parameters.

    Returns:
        The mean of the per-batch losses as a float, or ``nan`` if the
        dataloader yielded no batches.
    """
    # Put the model in training mode explicitly so mode-dependent layers
    # (e.g. BatchNorm) behave correctly even if a previous call left the
    # model in eval mode.
    model.train()

    # Move batches to wherever the model lives instead of assuming the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    running_loss = 0.0
    num_batches = 0
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()

        pred = model(X)
        loss = loss_function(pred, y)

        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        num_batches += 1

    # Guard the empty-loader case instead of raising ZeroDivisionError.
    avg_loss = running_loss / num_batches if num_batches else float("nan")
    print(f"Training loss: {avg_loss}")
    return avg_loss

def test(dataloader, model, loss_function):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to("cpu"), y.to("cpu")
            pred = model(X)
            test_loss += loss_function(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    return correct, test_loss

In [6]:
# Seed PyTorch's global RNG before the model is built so weight initialisation
# and any shuffling that draws from the global RNG repeat across
# Restart & Run All.
SEED = 0
torch.manual_seed(SEED)

model = ResNet18()
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

epochs = 10
# Per-epoch test accuracy and test loss, kept for plotting.
acc, loss = [], []
for epoch in range(epochs):
    print(f"-------------------------------\nEpoch {epoch+1}")
    train(train_dataloader, model, loss_function, optimizer)
    temp_acc, temp_loss = test(test_dataloader, model, loss_function)
    acc.append(temp_acc)
    loss.append(temp_loss)
    print(f"Accuracy: {(100*temp_acc):>0.1f}%, Avg loss: {temp_loss:>8f}")

-------------------------------
Epoch 1
Training loss: 1.244345134561518
Accuracy: 66.6%, Avg loss: 0.948033
-------------------------------
Epoch 2
Training loss: 0.8035291257151715
Accuracy: 71.7%, Avg loss: 0.806491
-------------------------------
Epoch 3
Training loss: 0.5871629234086376
Accuracy: 74.9%, Avg loss: 0.737363
-------------------------------
Epoch 4
Training loss: 0.4431214547088645
Accuracy: 75.8%, Avg loss: 0.739565
-------------------------------
Epoch 5
Training loss: 0.31633745472322883
Accuracy: 76.2%, Avg loss: 0.772278
-------------------------------
Epoch 6
Training loss: 0.22506103076288622
Accuracy: 77.9%, Avg loss: 0.768082
-------------------------------
Epoch 7
Training loss: 0.15845616323717288
Accuracy: 77.5%, Avg loss: 0.842494
-------------------------------
Epoch 8
Training loss: 0.1257612915055365
Accuracy: 76.7%, Avg loss: 0.912368
-------------------------------
Epoch 9
Training loss: 0.0882571160481076
Accuracy: 77.7%, Avg loss: 0.938873
--------

In [7]:
# fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))

# ax1.plot(np.arange(1, epochs+1), acc, color="blue", marker="o")
# ax1.set_title("Accuracy over epochs")
# ax1.set_xlabel("Epochs")
# ax1.set_ylabel("Accuracy")

# ax2.plot(np.arange(1, epochs+1), loss, color="red", marker="o")
# ax2.set_title("Loss over epochs")
# ax2.set_xlabel("Epochs")
# ax2.set_ylabel("Loss")

# plt.tight_layout()
# plt.show()

In [8]:
'''
def classify(model, image):
    diff = infinity
    while diff > threshold:
        values = model.predict(image)
        top2 = values.sort()[0:2]
        diff = abs(top2[0] - top2[1])
        model.add_layer() / model.switch()
'''

'\ndef classify(model, image):\n    diff = infinity\n    while diff > threshold:\n        values = model.predict(image)\n        top2 = values.sort()[0:2]\n        diff = abs(top2[0] - top2[1])\n        model.add_layer() / model.switch()\n'