<a href="https://colab.research.google.com/github/armanheydari/Advance-Deep-Learning_Winter-2024/blob/main/Assignment3/cmpt489_828_a3_q2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**CMPT 489/828 Assignment 3**

Follow the instructions in this notebook and complete the missing code.

**NOTE: Do Not Change Any Provided Code or Given Variable Names!**


Feel free to reuse any code from previous assignments.

Use torch.nn module to implement your network.

Use the validation set to find the best hyperparameters.

In [13]:
# TODO: your imports
import torch
from torch import nn
from torch.nn import functional as F
import torchvision
from torchvision import transforms
import torchvision.models as models
from torchvision.models import ResNet18_Weights

# select gpu if possible
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [14]:
def create_data_loader(batch_size):
  # TODO: create your data loader, use entire CIFAR-10
  # Split the provided CIFAR-10 train set (50,000 images) into your train and val sets
  # Use the first 40,000 images as your train set and the remaining 10,000 images as val set
  # Use all 10,000 images in the provided test set as your test set

  train_id = list(range(40000))
  val_id = list(range(40000, 50000))
  test_id = list(range(10000))

  # convert to tensor, normalize
  transform = transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616)),
  ])
  # load CIFAR-10 dataset with pytorch
  trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
  testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

  # subset dataset and create dataloader with batch_size
  train_sub_set = torch.utils.data.Subset(trainset, train_id)
  val_sub_set = torch.utils.data.Subset(trainset, val_id)
  test_sub_set = torch.utils.data.Subset(testset, test_id)

  train_loader = torch.utils.data.DataLoader(train_sub_set, batch_size=batch_size, shuffle=True)
  val_loader = torch.utils.data.DataLoader(val_sub_set, batch_size=batch_size, shuffle=True)
  test_loader = torch.utils.data.DataLoader(test_sub_set, batch_size=batch_size, shuffle=True)

  return train_loader, val_loader, test_loader

In [16]:
class ResidualBlock(nn.Module):
  # TODO: implement a residual block with skip connection
  def __init__(self, in_channels, out_channels):
    super(ResidualBlock, self).__init__()
    self.stride = int(out_channels/in_channels)
    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), stride=(self.stride, self.stride), padding=(1, 1), bias=False)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    self.relu = nn.ReLU()
    self.bn = nn.BatchNorm2d(out_channels, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    self.downsample = None
    if out_channels>in_channels:
      self.downsample = nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size=(1, 1), stride=(self.stride, self.stride), bias=False),
                                     nn.BatchNorm2d(out_channels, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True))

  def forward(self, x):
    residual_value = x.clone()
    if self.downsample is not None:
      residual_value = self.downsample(residual_value)
    x = self.conv1(x)
    x = self.bn(x)
    x = self.relu(x)
    x = self.conv2(x)
    x = self.bn(x)
    x = x + residual_value
    return x

In [17]:
class ResNet(nn.Module):
  # TODO: implement a ResNet with num_block residual blocks
  def __init__(self, num_block):
    super(ResNet, self).__init__()
    self.conv1 = nn.Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    self.bn1 = nn.BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    self.residual_blocks = nn.ModuleList()
    for i in range(num_block):
      out_channels = int(2**(6+i))
      if i==0:
        in_channels = out_channels
      else:
        in_channels = int(out_channels/2)
      self.residual_blocks.append(nn.Sequential(ResidualBlock(in_channels, out_channels), ResidualBlock(out_channels, out_channels)))
    self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
    self.fc = nn.Linear(in_features=512, out_features=10, bias=True)

  def forward(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)
    for rb in self.residual_blocks:
      x = rb(x)
    x = self.avgpool(x)
    x = torch.flatten(x, 1)  # Flatten the tensor
    x = self.fc(x)
    return x


In [18]:
def init_train_var(model):
  # TODO: create your criterion, optimizer (use SGD)
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.SGD(model.parameters())
  return criterion, optimizer

In [22]:
def train(train_loader, val_loader, batch_size):
  # TODO: implement your training loop, save your best model at the end
  model = ResNet(4)
  model.to(device)
  # print(model)
  criterion, optimizer = init_train_var(model)

  epochs = 100
  val_accuracy = 0
  for epoch in range(epochs):
    model.train()
    train_correct, val_correct = 0, 0
    for x, y in train_loader:
      x = x.to(device)
      y = y.to(device)
      optimizer.zero_grad()
      prediction = model.forward(x)
      loss = criterion(prediction, y)
      loss.backward()
      optimizer.step()
      train_correct += (prediction.argmax(1) == y).type(torch.float).sum().item()

    # switch off autograd for validation
    with torch.no_grad():
      # set the model in evaluation mode
      model.eval()
      for x, y in val_loader:
        x = x.to(device)
        y = y.to(device)
        prediction = model.forward(x)
        val_correct += (prediction.argmax(1) == y).type(torch.float).sum().item()
    # calculate the training and validation accuracy
    train_accuracy = train_correct / len(train_loader.dataset)
    val_accuracy_new = val_correct / len(val_loader.dataset)
    print(f'In epoch {epoch+1} the training accuracy is {train_accuracy} and the validation accuracy is {val_accuracy_new}')
    if val_accuracy>val_accuracy_new and train_accuracy>0.95:
      # Save entire model
      model_path = 'Resnet18_Cifar10_Arman.pth'
      torch.save(model, model_path)
      break
    val_accuracy = val_accuracy_new
  return model_path

In [20]:
def test(model_path, test_loader):
  # TODO: test function for your trained model , load your best model
  model = torch.load(model_path)
  # switch off autograd for validation
  with torch.no_grad():
    # set the model in evaluation mode
    model.eval()
    test_correct = 0
    for x, y in test_loader:
      x = x.to(device)
      y = y.to(device)
      prediction = model.forward(x)
      test_correct += (prediction.argmax(1) == y).type(torch.float).sum().item()
  # calculate the training and validation accuracy
  test_accuracy = test_correct / len(test_loader.dataset)
  print(f"model's test accuracy is {test_accuracy}")

In [23]:
# This is to check the model archtecture
print("True Resnet:")
print(models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1))
print("\n----------------------------------------------------------------------\n")
print("Implemented model:")
print(ResNet(4))

True Resnet:
ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inp

In [24]:
# TODO: train your model and save the best model
batch_size = 100
train_loader, val_loader, test_loader = create_data_loader(batch_size)
model_path = train(train_loader, val_loader, batch_size)

Files already downloaded and verified
Files already downloaded and verified
In epoch 1 the training accuracy is 0.44345 and the validation accuracy is 0.2721
In epoch 2 the training accuracy is 0.5778 and the validation accuracy is 0.2863
In epoch 3 the training accuracy is 0.642925 and the validation accuracy is 0.3215
In epoch 4 the training accuracy is 0.6967 and the validation accuracy is 0.3067
In epoch 5 the training accuracy is 0.7494 and the validation accuracy is 0.3135
In epoch 6 the training accuracy is 0.79745 and the validation accuracy is 0.3267
In epoch 7 the training accuracy is 0.845 and the validation accuracy is 0.2852
In epoch 8 the training accuracy is 0.8923 and the validation accuracy is 0.2955
In epoch 9 the training accuracy is 0.927625 and the validation accuracy is 0.3148
In epoch 10 the training accuracy is 0.95755 and the validation accuracy is 0.3074


In [25]:
# TODO: report test performance of best model
test(model_path, test_loader)

model's test accuracy is 0.3093
