In [105]:
import torch
import torch.nn as nn
import torchvision
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import time
from typing import Type, Any, Callable, Union, List, Optional
from torch import Tensor 
if torch.cuda.is_available():
    print("Using GPUs")
    device = torch.device("cuda") 
else:
    device = torch.device("cpu")

Using GPUs


In [106]:
torch.manual_seed(43)

batch_size = 32
# mean, std = (0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)

transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
#     torchvision.transforms.Normalize(mean, std),
    torchvision.transforms.RandomCrop(32, padding=4, padding_mode='constant'),
    torchvision.transforms.RandomHorizontalFlip(p=0.5)
])

train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform)
train_size = len(train_set)
test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)
test_set, validation_set = torch.utils.data.random_split(test_set, [5000, 5000])
test_size = len(test_set)
validation_size = len(validation_set)


train_loader = torch.utils.data.DataLoader(train_set, batch_size, shuffle=True, num_workers=4, pin_memory=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size, num_workers=4, pin_memory=True)
validation_loader = torch.utils.data.DataLoader(validation_set, batch_size, num_workers=4, pin_memory=True)

data_loaders = {"train": train_loader, "test": test_loader, "validation": validation_loader}
dataset_sizes = {"train": train_size, "test": test_size, "validation": validation_size}
print(dataset_sizes)

Files already downloaded and verified
Files already downloaded and verified
{'train': 50000, 'test': 5000, 'validation': 5000}


In [107]:
### from https://pytorch.org/hub/pytorch_vision_resnet/

class BasicBlock(nn.Module):

    def __init__(self, inplanes, planes, stride=1, down=False):
        super().__init__()
            
        self.conv1 = nn.Conv2d(inplanes, planes, stride=stride, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(planes)
        
        self.downsample = None
        
        if down:
            self.downsample = nn.Conv2d(inplanes, planes, kernel_size=1, stride=stride)
        

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        
        if self.downsample is not None:
            identity = self.downsample(x)
        
        out += identity
        out = self.relu(out)
        
        return out

    

class ResNet(nn.Module):
    def __init__(self, model_n, num_classes: int = 10):
        super().__init__()

        self.residual_layers = nn.ModuleList([])
        self.model_n = model_n

        ### begining layers
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding='same')
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        
        
        ######## ResNet blocks [16, 32, 64]
        ### first block, 16 channels
        for i in range(self.model_n):
            self.residual_layers.append(BasicBlock(16, 16).to(device))
            
        
        ### second block, 32 channels
        for i in range(self.model_n):
            if i == 0:
                self.residual_layers.append(BasicBlock(16, 32, stride=2, down=True).to(device))
            else:
                self.residual_layers.append(BasicBlock(32, 32).to(device))
                
                
        ### third block, 64 channels
        for i in range(self.model_n):
            if i == 0:
                self.residual_layers.append(BasicBlock(32, 64, stride=2, down=True).to(device))
                self.inplanes = 64
            else:
                self.residual_layers.append(BasicBlock(64, 64).to(device))
        
    
        ### output layers
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, num_classes)


    def forward(self, x: Tensor) -> Tensor:
        

        ### begining layers
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        ##### ResNet blocks
        for i, layer in enumerate(self.residual_layers):
            x = layer (x)
            
            
        ### output layers
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    
model = ResNet(model_n=3)
model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[80], gamma=0.1)


In [109]:
epochs = 100

for epoch in range(epochs):
    start_time = time.time()
    print("Epoch {}/{}".format(epoch, epochs - 1))
    print("-" * 30)
    
    
    epoch_loss = {"train": 0.0, "validation": 0.0}
    epoch_acc = {"train": 0.0, "validation": 0.0}
    
    running_loss = {"train": 0.0, "validation": 0.0}
    running_corrects = {"train": 0, "validation": 0}
    
    for phase in ["train", "validation"]:
        if phase == "train":
            model.train(True)
        else:
            model.train(False)
        
        for data in data_loaders[phase]:
            inputs, labels = data 
            
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad() # clear all gradients
            
            outputs = model(inputs) # batch_size x num_classes
            _, preds = torch.max(outputs.data, 1) # values, indices
            loss = loss_fn(outputs, labels)
            
            if phase == "train":
                loss.backward()  # compute gradients
                optimizer.step() # update weights/biases
               
            running_loss[phase] += loss.data.item() * inputs.size(0)
            running_corrects[phase] += torch.sum(preds == labels.data).item()
        
        epoch_loss[phase] = running_loss[phase] / dataset_sizes[phase]
        epoch_acc[phase] =  running_corrects[phase] / dataset_sizes[phase]

    # Visualize the loss and accuracy values.
    print({
        'time': np.round(time.time()-start_time, 5),
        'train_loss': np.round(epoch_loss["train"], 5),
        'train_acc': np.round(epoch_acc["train"], 5),
        'val_loss': np.round(epoch_loss["validation"], 5),
        'val_acc': np.round(epoch_acc["validation"], 5),
    })
    
    scheduler.step()


Epoch 0/99
------------------------------
{'time': 27.23867, 'train_loss': 0.90516, 'train_acc': 0.6806, 'val_loss': 0.83081, 'val_acc': 0.7124}
Epoch 1/99
------------------------------
{'time': 29.44841, 'train_loss': 0.76974, 'train_acc': 0.73092, 'val_loss': 0.72863, 'val_acc': 0.7468}
Epoch 2/99
------------------------------
{'time': 28.2496, 'train_loss': 0.68516, 'train_acc': 0.7627, 'val_loss': 0.69881, 'val_acc': 0.7534}
Epoch 3/99
------------------------------
{'time': 32.9939, 'train_loss': 0.6268, 'train_acc': 0.78626, 'val_loss': 0.65467, 'val_acc': 0.7718}
Epoch 4/99
------------------------------
{'time': 31.12182, 'train_loss': 0.58508, 'train_acc': 0.79632, 'val_loss': 0.58427, 'val_acc': 0.796}
Epoch 5/99
------------------------------
{'time': 30.68745, 'train_loss': 0.54633, 'train_acc': 0.80996, 'val_loss': 0.58102, 'val_acc': 0.7996}
Epoch 6/99
------------------------------
{'time': 29.11135, 'train_loss': 0.523, 'train_acc': 0.82008, 'val_loss': 0.5447, 'val_a

In [114]:
### evaluating the model with test set

with torch.no_grad():
    model.eval()
    running_loss = 0
    running_corrects = 0

    for data in test_loader:
        inputs, labels = data 

        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad() # clear all gradients

        outputs = model(inputs) # batch_size x num_classes
        _, preds = torch.max(outputs.data, 1) # values, indices
        loss = loss_fn(outputs, labels)

        running_loss += loss.data.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data).item()

    # Visualize the loss and accuracy values.
    print({
    'time': np.round(time.time()-start_time, 5),
    'test_loss': np.round(running_loss/ dataset_sizes['test'], 5),
    'test_acc': np.round(running_corrects/ dataset_sizes['test'], 5),
    })

{'time': 27321.16112, 'test_loss': 0.39032, 'test_acc': 0.8982}
