### Import necessary libraries

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
import torch.optim.lr_scheduler as lr_scheduler
from tqdm import tqdm

#### hyperparameters

In [2]:
in_channels = 3
num_classes = 10
learning_rate = 0.001
batch_size = 32
num_epochs = 5
load_model = False

### Architecture

In [3]:
class ConvolutionNeuralNetwork(nn.Module):
    def __init__(self, in_channels = None,out_channels = None):
        super(ConvolutionNeuralNetwork, self). __init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(32))
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU())
        
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64))
        
        self.layer4 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=0, bias=False))
        
        self.layer5 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU())
        
        self.avgpool = nn.AvgPool2d(7)
        self.dropout = nn.Dropout(0.5)
        
        self.fc1 = nn.Linear(128 * 2 * 2, num_classes)
      
        
    def forward(self, x):
        x = F.relu(self.layer1(x)) 
        x = self.pool1(self.layer2(x))
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.dropout(self.layer5(x))
        x = self.avgpool(x)
        x = x.view(-1, 128 *2 * 2)
        x = self.fc1(x)
        return x

### Datasets and preprocessing

In [4]:
class CIFAR10DataLoader(torch.utils.data.Dataset):
    def __init__(self, root, train=True, transform=None):
        self.transform = transform
        self.cifar10 = CIFAR10(root=root, train = train, download = True)
        
    def __getitem__(self, index):
        img, label = self.cifar10[index]
        
        if self.transform is not None:
            img = self.transform(img)
        return img, label
    
    def __len__(self):
        return len(self.cifar10)
        

In [5]:
# the Normalize has 3 values because the CIFAR10 has 3 channels, if we were dealing with a 1 channel dataset we would have just 1 value
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32),
    transforms.ToTensor(),
    transforms.Normalize(mean = [0.5, 0.5, 0.5], std = [0.5, 0.5, 0.5])
])

In [6]:
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean = [0.5, 0.5, 0.5], std = [0.5, 0.5, 0.5])
])

In [7]:
train_dataset = CIFAR10DataLoader(root='C:/Users/personal/Documents/CIFAR10', train=True, transform=train_transform)
test_dataset = CIFAR10DataLoader(root = 'C:/Users/personal/Documents/CIFAR10', train=False, transform=test_transform)

Files already downloaded and verified
Files already downloaded and verified


In [8]:
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [9]:
dataiter = iter(train_dataloader)
images, labels = next(dataiter)

In [10]:
print(images[0].shape)

torch.Size([3, 32, 32])


In [11]:
images.shape

torch.Size([32, 3, 32, 32])

#### Instantiate a Neural Network

In [12]:
model = ConvolutionNeuralNetwork()

#### set device

In [13]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

ConvolutionNeuralNetwork(
  (layer1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (layer2): Sequential(
    (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU()
  )
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (layer3): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (layer4): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), bias=False)
  )
  (layer5): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU()
  )
  (avgpool): AvgPool2d(kernel_size=7, stride=7, padding=0)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc1): Linear(i

### Loss Function and Optimizer

In [14]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=1, T_mult=1, eta_min=0.001)

### Checkpoint Loading

In [15]:
def load_checkpoint(checkpoint):
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['state_dict'])
    epoch.load_state_dict(checkpoint['state_dict'])

### Training loop

The learning rate scheduler adjusts the learning rate over time to improe the training performance

In [16]:
for epoch in range(num_epochs):
    running_loss = 0.0
    print(f'Epoch [{epoch+1}/{num_epochs}]')
    
    if epoch%2 == 0:
        checkpoint = {
            'state_dict' : model.state_dict(), 
            'optimizer' : optimizer.state_dict(),
            'epoch': epoch
        }
        torch.save(checkpoint, 'C:/Users/personal/Documents/my_checkpoint.pth')
        
        
    for batch_idx, (inputs, labels) in tqdm(enumerate(train_dataloader), total = len(train_dataloader)):
        #get data to cuda if possible
        inputs = inputs.to(device)
        labels = labels.to(device)

        # forward
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        running_loss += loss.item()

        # backward
        optimizer.zero_grad()
        loss.backward()

        # gradient descent or adam step
        optimizer.step()
        # update the learning rate scheduler
        scheduler.step()
    epoch_loss = running_loss / len(train_dataloader)
    print('Epoch {} loss: {:.3f}'.format(epoch + 1, epoch_loss))

Epoch [1/5]


100%|██████████| 1563/1563 [03:29<00:00,  7.45it/s]


Epoch 1 loss: 1.247
Epoch [2/5]


100%|██████████| 1563/1563 [03:13<00:00,  8.07it/s]


Epoch 2 loss: 0.925
Epoch [3/5]


100%|██████████| 1563/1563 [03:08<00:00,  8.29it/s]


Epoch 3 loss: 0.818
Epoch [4/5]


100%|██████████| 1563/1563 [03:11<00:00,  8.18it/s]


Epoch 4 loss: 0.754
Epoch [5/5]


100%|██████████| 1563/1563 [03:10<00:00,  8.19it/s]

Epoch 5 loss: 0.705





#### Set model to eval to test

In [17]:
model.eval()

ConvolutionNeuralNetwork(
  (layer1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (layer2): Sequential(
    (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU()
  )
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (layer3): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (layer4): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), bias=False)
  )
  (layer5): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU()
  )
  (avgpool): AvgPool2d(kernel_size=7, stride=7, padding=0)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc1): Linear(i

##### initialize

In [18]:
test_loss = 0
correct = 0
total = 0

### Test loop/accuracy

In [19]:
with torch.no_grad():
    for inputs, labels in test_dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        test_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum()
        
test_loss /= len(test_dataloader.dataset)
accuracy = correct / total

print('Test Loss: {:.3f} | Accuracy: {:.3f}'.format(test_loss, accuracy))

Test Loss: 0.694 | Accuracy: 0.764
