In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm
from pathlib import Path
from google.colab import files

# Seed PyTorch's global RNG up front, before any model or loader is built.
torch.manual_seed(42)

# Use the GPU when a CUDA device is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Device:", device)

In [None]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 convolutions plus a skip connection.

    Main path: conv3x3 -> BatchNorm -> ReLU -> conv3x3 -> BatchNorm. The skip
    path output is added to the main path and a final ReLU is applied.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution, and of the 1x1 projection on
            the skip path when one is created. Defaults to 1.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # First half of the main path: the only place where the spatial
        # resolution or the channel count can change.
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu1 = nn.ReLU(inplace=True)

        # Second half keeps both resolution and width.
        self.conv2 = nn.Conv2d(
            out_channels,
            out_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # The skip path must match the main path's output shape. When the block
        # changes resolution or width, use a 1x1 conv + BN projection;
        # otherwise pass the input through untouched.
        needs_projection = in_channels != out_channels or stride != 1
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(
                    in_channels,
                    out_channels,
                    kernel_size=1,
                    stride=stride,
                    bias=False,
                ),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Identity()

        self.relu2 = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        residual = self.shortcut(x)

        main = self.relu1(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        main += residual
        return self.relu2(main)


class ResNet(nn.Module):
    """Three-stage residual network taking 3-channel images.

    Layout: 3x3 stem convolution (16 channels) -> BatchNorm -> ReLU -> three
    stages of 16, 32 and 64 channels -> global average pooling -> linear
    classifier. Stages two and three open with a stride-2 block.

    Args:
        block: Callable used to build each residual block. It is invoked as
            ``block(in_channels, out_channels, stride)`` for the first block
            of a stage and ``block(in_channels, out_channels)`` for the rest.
        num_blocks: Indexable with the block count of each of the three stages.
        num_classes: Number of output logits. Defaults to 10.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()

        stem_width = 16
        stage_widths = (16, 32, 64)

        # Running channel count; `_make_layer` reads and updates it as the
        # stages are built, so the stages must be created in order.
        self.in_channels = stem_width

        self.conv1 = nn.Conv2d(
            3, stem_width, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(stem_width)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block, stage_widths[0], num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, stage_widths[1], num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, stage_widths[2], num_blocks[2], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(stage_widths[-1], num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage as an ``nn.Sequential`` of residual blocks.

        Only the first block receives ``stride`` and the incoming channel
        count; the remaining ``num_blocks - 1`` blocks map ``out_channels`` to
        ``out_channels`` with the block's default stride. Updates
        ``self.in_channels`` to ``out_channels`` as a side effect.
        """
        head = block(self.in_channels, out_channels, stride)
        self.in_channels = out_channels

        tail = [
            block(self.in_channels, out_channels)
            for _ in range(num_blocks - 1)
        ]
        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Map a batch of images to class logits of shape ``(batch, num_classes)``."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)

        out = self.avgpool(out)
        out = torch.flatten(out, 1)
        return self.fc(out)


def ResNet18(num_classes=10):
    """Build the shallowest variant: two ``BasicBlock``s in each of the three stages.

    NOTE: despite the name this is not the ImageNet-style ResNet-18. Counting
    the stem convolution, two convolutions per block and the final linear
    layer gives 1 + 3*2*2 + 1 = 14 weighted layers (shortcut projections not
    counted).
    """
    return ResNet(block=BasicBlock, num_blocks=[2, 2, 2], num_classes=num_classes)


def ResNet34(num_classes=10):
    """Build the mid-sized variant: six ``BasicBlock``s in each of the three stages.

    NOTE: despite the name this is not the ImageNet-style ResNet-34. Counting
    the stem convolution, two convolutions per block and the final linear
    layer gives 1 + 3*6*2 + 1 = 38 weighted layers (shortcut projections not
    counted).
    """
    return ResNet(block=BasicBlock, num_blocks=[6, 6, 6], num_classes=num_classes)


def ResNet50(num_classes=10):
    """Build the deepest variant: nine ``BasicBlock``s in each of the three stages.

    NOTE: despite the name this is not the ImageNet-style ResNet-50. It uses
    ``BasicBlock`` (two 3x3 convolutions), not a bottleneck block. Counting
    the stem convolution, two convolutions per block and the final linear
    layer gives 1 + 3*9*2 + 1 = 56 weighted layers (shortcut projections not
    counted).
    """
    return ResNet(block=BasicBlock, num_blocks=[9, 9, 9], num_classes=num_classes)

In [None]:
# Per-channel mean / std used to standardise images for both splits.
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Steps shared by train and test: PIL image -> tensor -> standardised tensor.
normalize_steps = [
    transforms.ToTensor(),
    transforms.Normalize(mean=CIFAR10_MEAN, std=CIFAR10_STD),
]

# Training additionally applies a random 32x32 crop (after 4-pixel padding)
# and a random horizontal flip; the test pipeline is deterministic.
transform_train = transforms.Compose(
    [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
    ]
    + normalize_steps
)
transform_test = transforms.Compose(list(normalize_steps))

# Both splits are stored under ./data and downloaded on first use.
train_set = datasets.CIFAR10(
    "data", train=True, transform=transform_train, download=True
)
test_set = datasets.CIFAR10(
    "data", train=False, transform=transform_test, download=True
)

# Shuffle only the training split; evaluation uses a larger batch and a
# fixed order.
train_loader = DataLoader(
    train_set, batch_size=128, shuffle=True, num_workers=2
)
test_loader = DataLoader(
    test_set, batch_size=256, shuffle=False, num_workers=2
)

In [None]:
def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    The model is switched to training mode and every batch is moved to the
    module-level ``device`` before the usual zero-grad / forward / backward /
    step sequence.

    Args:
        model: Module being optimised.
        loader: Iterable of ``(inputs, targets)`` batches that supports ``len()``.
        optimizer: Optimizer that updates ``model``'s parameters.
        criterion: Callable ``criterion(outputs, targets)`` returning the loss.

    Returns:
        Sum of the per-batch loss values divided by ``len(loader)``.
    """
    model.train()
    running_loss = 0.0

    progress = tqdm(loader, leave=False)
    for inputs, targets in progress:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

        # .item() yields a plain Python float, so no graph is kept alive.
        running_loss += batch_loss.item()

    num_batches = len(loader)
    return running_loss / num_batches


@torch.no_grad()
def evaluate(model, loader):
    """Return the top-1 accuracy of ``model`` on ``loader`` as a percentage.

    Runs with gradient tracking disabled and switches the model to eval mode
    (the mode is not restored afterwards). Batches are moved to the
    module-level ``device`` before the forward pass.

    Args:
        model: Module producing per-class scores along dimension 1.
        loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        ``100 * correct / total`` as a float.
    """
    model.eval()
    num_correct = 0
    num_seen = 0

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        predicted = model(inputs).argmax(dim=1)
        hits = predicted == targets

        num_correct += hits.sum().item()
        num_seen += targets.size(0)

    return 100.0 * num_correct / num_seen

In [None]:
# Train the "ResNet18" variant on CIFAR-10, then save and download the result.
EPOCHS = 150

model = ResNet18().to(device)

optimizer = optim.SGD(
    model.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=5e-4
)

# Divide the learning rate by 10 at 50% and 75% of the run. The milestones are
# derived from EPOCHS because the previous fixed [100, 150] combined with
# EPOCHS = 150 never applied the second decay: the scheduler only reaches
# step 150 after the final training epoch has already finished.
scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[EPOCHS // 2, (3 * EPOCHS) // 4],
    gamma=0.1
)

criterion = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    loss = train_one_epoch(model, train_loader, optimizer, criterion)
    acc = evaluate(model, test_loader)
    # Step once per epoch, after the optimizer updates of that epoch.
    scheduler.step()

    print(
        f"[ResNet18] Epoch {epoch+1:03d} | "
        f"Loss {loss:.3f} | Acc {acc:.2f}%"
    )

# `acc` is the accuracy measured after the last epoch, not the best one seen.
checkpoint = {
    "model": "ResNet18-CIFAR10",
    "epochs": EPOCHS,
    "test_acc": acc,
    "state_dict": model.state_dict()
}

torch.save(checkpoint, "resnet18_cifar10.pt")
print("Saved resnet18_cifar10.pt")
# `files` comes from google.colab: this triggers a browser download in Colab.
files.download("resnet18_cifar10.pt")

In [None]:
# Train the "ResNet34" variant on CIFAR-10, then save and download the result.
EPOCHS = 150

model = ResNet34().to(device)

optimizer = optim.SGD(
    model.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=5e-4
)

# Divide the learning rate by 10 at 50% and 75% of the run. The milestones are
# derived from EPOCHS because the previous fixed [100, 150] combined with
# EPOCHS = 150 never applied the second decay: the scheduler only reaches
# step 150 after the final training epoch has already finished.
scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[EPOCHS // 2, (3 * EPOCHS) // 4],
    gamma=0.1
)

criterion = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    loss = train_one_epoch(model, train_loader, optimizer, criterion)
    acc = evaluate(model, test_loader)
    # Step once per epoch, after the optimizer updates of that epoch.
    scheduler.step()

    print(
        f"[ResNet34] Epoch {epoch+1:03d} | "
        f"Loss {loss:.3f} | Acc {acc:.2f}%"
    )

# `acc` is the accuracy measured after the last epoch, not the best one seen.
checkpoint = {
    "model": "ResNet34-CIFAR10",
    "epochs": EPOCHS,
    "test_acc": acc,
    "state_dict": model.state_dict()
}

torch.save(checkpoint, "resnet34_cifar10.pt")
print("Saved resnet34_cifar10.pt")
# `files` comes from google.colab: this triggers a browser download in Colab.
files.download("resnet34_cifar10.pt")

In [None]:
# Train the "ResNet50" variant on CIFAR-10, then save and download the result.
EPOCHS = 150

model = ResNet50().to(device)

optimizer = optim.SGD(
    model.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=5e-4
)

# Divide the learning rate by 10 at 50% and 75% of the run. The milestones are
# derived from EPOCHS because the previous fixed [100, 150] combined with
# EPOCHS = 150 never applied the second decay: the scheduler only reaches
# step 150 after the final training epoch has already finished.
scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[EPOCHS // 2, (3 * EPOCHS) // 4],
    gamma=0.1
)

criterion = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    loss = train_one_epoch(model, train_loader, optimizer, criterion)
    acc = evaluate(model, test_loader)
    # Step once per epoch, after the optimizer updates of that epoch.
    scheduler.step()

    print(
        f"[ResNet50] Epoch {epoch+1:03d} | "
        f"Loss {loss:.3f} | Acc {acc:.2f}%"
    )

# `acc` is the accuracy measured after the last epoch, not the best one seen.
checkpoint = {
    "model": "ResNet50-CIFAR10",
    "epochs": EPOCHS,
    "test_acc": acc,
    "state_dict": model.state_dict()
}

torch.save(checkpoint, "resnet50_cifar10.pt")
print("Saved resnet50_cifar10.pt")
# `files` comes from google.colab: this triggers a browser download in Colab.
files.download("resnet50_cifar10.pt")