Imports + dataset + loaders

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

from torchvision import transforms, datasets
from torch.utils.data import DataLoader

# Preprocessing applied to every image: resize to 32×32, convert to a tensor,
# then normalize with mean 0.1307 and standard deviation 0.3081.
transform = transforms.Compose(
    [
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ]
)

# MNIST train/test splits, downloaded into the current directory if missing.
train_dataset = datasets.MNIST(
    root=".", train=True, download=True, transform=transform
)
test_dataset = datasets.MNIST(
    root=".", train=False, download=True, transform=transform
)

# Training batches are reshuffled on every pass; test batches keep their order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

VGG11 Architecture

In [2]:
class VGG11(nn.Module):
    """VGG11-style convolutional network with batch normalization.

    The feature extractor stacks eight conv blocks, five of which end in a
    2×2 max-pool. The classifier's first ``Linear`` layer takes 512 inputs,
    so the network expects single-channel 32×32 images (five halvings bring
    32×32 down to 1×1 with 512 channels). The output holds 10 scores per
    sample.
    """

    def __init__(self):
        super().__init__()

        block = self.feature_extraction

        # Feature extraction following the VGG11 architecture.
        self.features = nn.Sequential(
            block(1, 64, pool=True),      # → 16×16
            block(64, 128, pool=True),    # → 8×8
            block(128, 256, pool=False),  # → 8×8
            block(256, 256, pool=True),   # → 4×4
            block(256, 512, pool=False),  # → 4×4
            block(512, 512, pool=True),   # → 2×2
            block(512, 512, pool=False),  # → 2×2
            block(512, 512, pool=True),   # → 1×1
        )

        # Classification with fully connected layers and dropout.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512, 4096), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(4096, 4096), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(4096, 10),
        )

    @staticmethod
    def feature_extraction(in_channels, out_channels, pool=False):
        """Build one conv block: 3×3 conv (padding 1) → batch norm → ReLU.

        Defined at class level on purpose: a ``@staticmethod`` applied to a
        function nested inside ``__init__`` yields a bare ``staticmethod``
        object, which cannot be called on Python versions before 3.10.

        Args:
            in_channels: Number of channels entering the convolution.
            out_channels: Number of channels produced by the convolution.
            pool: When true, append a 2×2 max-pool that halves height/width.

        Returns:
            An ``nn.Sequential`` holding the block's layers in order.
        """
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool2d(2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Forward propagation: feature extractor, then classifier."""
        x = self.features(x)
        x = self.classifier(x)
        return x


Model + Loss Function + update rule

In [3]:
# Use the GPU if CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the network, then move it onto the selected device.
model = VGG11()
model = model.to(device)

# Loss function: cross-entropy.
loss_function = nn.CrossEntropyLoss()

# Update rule: stochastic gradient descent with momentum.
update_rule = optim.SGD(model.parameters(), momentum=0.9, lr=1e-3)

Training + predicting + output

In [7]:
def _run_epoch(loader, model, loss_function, device, update_rule=None):
    """Make one pass over ``loader`` and return ``(mean_loss, accuracy_pct)``.

    Trains when ``update_rule`` is given; otherwise evaluates with gradients
    disabled. Returns ``(nan, nan)`` if the loader yields no samples.
    """
    training = update_rule is not None
    model.train(training)  # train(False) is the same as eval()

    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.set_grad_enabled(training):
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            if training:
                update_rule.zero_grad()  # clear gradients from the last step

            # Forward -> loss (-> backward -> update when training)
            outputs = model(images)
            loss = loss_function(outputs, labels)
            if training:
                loss.backward()
                update_rule.step()

            batch_size = labels.size(0)
            # Weight by batch size so unequal batches average correctly;
            # assumes loss_function returns a per-batch mean.
            loss_sum += loss.item() * batch_size
            correct += (outputs.argmax(1) == labels).sum().item()
            seen += batch_size

    if seen == 0:
        return float("nan"), float("nan")
    return loss_sum / seen, 100 * correct / seen


def run(train_loader, test_loader, model, loss_function, update_rule,
        max_epochs=10, device=None):
    """Train ``model`` and evaluate it after every epoch.

    Args:
        train_loader: Iterable of ``(images, labels)`` training batches.
        test_loader: Iterable of ``(images, labels)`` evaluation batches.
        model: Module mapping an image batch to per-class scores; the
            predicted class is the argmax over dimension 1.
        loss_function: Callable ``(outputs, labels) -> scalar loss tensor``.
        update_rule: Optimizer providing ``zero_grad()`` and ``step()``.
        max_epochs: Number of train/evaluate rounds (default 10).
        device: Device batches are moved to. When ``None``, the device of the
            model's first parameter is used (CPU if it has no parameters).

    Returns:
        Dict with keys ``"train_loss"``, ``"train_accuracy"``,
        ``"test_loss"`` and ``"test_accuracy"``; each maps to a list with one
        value per epoch. Accuracies are percentages in ``[0, 100]``.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = (
            first_param.device if first_param is not None
            else torch.device("cpu")
        )

    data = {
        "train_loss": [], "train_accuracy": [],
        "test_loss":  [], "test_accuracy":  []
    }

    # Epochs are counted from 1 so the final progress line reads "N/N".
    for epoch in range(1, max_epochs + 1):
        # ---------------------- Training -----------------------------------
        train_loss, train_accuracy = _run_epoch(
            train_loader, model, loss_function, device, update_rule
        )
        data["train_loss"].append(train_loss)
        data["train_accuracy"].append(train_accuracy)

        # ---------------------- Prediction --------------------------------
        test_loss, test_accuracy = _run_epoch(
            test_loader, model, loss_function, device
        )
        data["test_loss"].append(test_loss)
        data["test_accuracy"].append(test_accuracy)

        print(f"Epoch {epoch}/{max_epochs} done")

    return data



In [None]:
# Q3.1, 2, 3: train and evaluate the model with the loaders, loss function
# and update rule configured above.
run(
    train_loader=train_loader,
    test_loader=test_loader,
    model=model,
    loss_function=loss_function,
    update_rule=update_rule,
)