# HomeWork 7 Problem 1A and 1B

# Matthew Anderson | 801203905

# Github https://github.com/Mand187/Intro-to-ML


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score, confusion_matrix
import time

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [3]:
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32,padding =4),
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
])

In [4]:
train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transforms.ToTensor(), download=True)

Files already downloaded and verified
Files already downloaded and verified


In [5]:
batch_size = 64
train_Loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_Loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Defining the CNN Model
class Cnn(nn.Module):
    def __init__(self):
        super(Cnn, self).__init__()
        self.fc1 = nn.Linear(64 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, 10)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.conv1 = nn.Conv2d(3, 32, kernelSize=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernelSize=3, padding=1)
        self.pool = nn.MaxPool2d(kernelSize=2, stride=2)

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

def trainModel(model, criterion, optimizer, trainLoader, valLoader, nEpochs=200, patience=300):
    trainLosses = []
    valLosses = []
    bestValLoss = float('inf')
    epochsNoImprove = 0

    for epoch in range(1, nEpochs + 1):
        model.train()
        runningLoss = 0.0
        for inputs, targets in trainLoader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            optimizer.zeroGrad()
            loss.backward()
            optimizer.step()

            runningLoss += loss.item()

        avgTrainLoss = runningLoss / len(trainLoader)
        trainLosses.append(avgTrainLoss)

        valLoss, valAccuracy = evaluateModel(model, valLoader, returnLoss=True)

        if valLoss < bestValLoss:
            bestValLoss = valLoss
            epochsNoImprove = 0
        else:
            epochsNoImprove += 1
            if epochsNoImprove >= patience:
                print(f"Early stopping triggered at epoch {epoch}")
                break

        valLosses.append(valLoss)

        if epoch % 10 == 0 or epoch == 1 or epoch == nEpochs:
            print(f"Epoch {epoch}/{nEpochs}, Training Loss: {avgTrainLoss:.4f}")

    return trainLosses, valLosses

def evaluateModel(model, loader, returnLoss=False):
    model.eval()
    totalLoss = 0.0
    correct = 0
    total = 0
    criterion = nn.CrossEntropyLoss()
    trueLabels = []
    predictedLabels = []

    with torch.noGrad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            if returnLoss:
                totalLoss += criterion(outputs, targets).item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
            trueLabels.extend(targets.cpu().numpy())
            predictedLabels.extend(predicted.cpu().numpy())

    accuracy = 100 * correct / total
    if returnLoss:
        averageLoss = totalLoss / len(loader)
        return averageLoss, accuracy
    else:
        return accuracy


In [None]:
model = Cnn().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

n_epochs = 200
train_losses, val_losses = trainModel(model, criterion, optimizer, train_Loader, test_Loader, n_epochs)

test_accuracy = evaluateModel(model, test_Loader)
print(f"Test Accuracy: {test_accuracy:.2f}%")

Epoch 1/200, Training Loss: 1.4740
Epoch 10/200, Training Loss: 0.4993
Epoch 20/200, Training Loss: 0.2267
Epoch 30/200, Training Loss: 0.1598
Epoch 40/200, Training Loss: 0.1323
Epoch 50/200, Training Loss: 0.1071
Epoch 60/200, Training Loss: 0.0996
Epoch 70/200, Training Loss: 0.0962
Epoch 80/200, Training Loss: 0.0853
Epoch 90/200, Training Loss: 0.0785
Epoch 100/200, Training Loss: 0.0770
Epoch 110/200, Training Loss: 0.0750
Epoch 120/200, Training Loss: 0.0711
Epoch 130/200, Training Loss: 0.0646
Epoch 140/200, Training Loss: 0.0677
Epoch 150/200, Training Loss: 0.0632
Epoch 160/200, Training Loss: 0.0577
Epoch 170/200, Training Loss: 0.0648
Epoch 180/200, Training Loss: 0.0628
Epoch 190/200, Training Loss: 0.0622
Epoch 200/200, Training Loss: 0.0626
Test Accuracy: 73.45%


# Problem 1B

In [8]:
# Load CIFAR-10 Dataset
train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transform, download=True)

# Create DataLoaders
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [9]:
for inputs, targets in train_loader:
    print(f"Sample Input Shape: {inputs.shape}")  # Expected: [batch_size, 3, 32, 32]
    print(f"Sample Target Shape: {targets.shape}")  # Expected: [batch_size]
    break

Sample Input Shape: torch.Size([64, 3, 32, 32])
Sample Target Shape: torch.Size([64])


In [None]:
def createCnnModel():
    """Defines and returns a CNN model."""
    class Cnn(nn.Module):
        def __init__(self):
            super(Cnn, self).__init__()
            self.fc1 = nn.Linear(64 * 8 * 8, 512)
            self.fc2 = nn.Linear(512, 10)
            self.relu = nn.ReLU()
            self.dropout = nn.Dropout(0.5)
            self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
            self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
            self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        def forward(self, x):
            x = self.pool(self.relu(self.conv1(x)))
            x = self.pool(self.relu(self.conv2(x)))
            x = x.view(-1, 64 * 8 * 8)
            x = self.relu(self.fc1(x))
            x = self.dropout(x)
            x = self.fc2(x)
            return x

    return Cnn()

def trainModel(model, criterion, optimizer, trainLoader, valLoader, nEpochs=200, patience=300):
    """Trains the CNN model with early stopping."""
    trainLosses, valLosses = [], []
    bestValLoss, epochsNoImprove = float('inf'), 0

    for epoch in range(1, nEpochs + 1):
        model.train()
        avgTrainLoss = processBatch(trainLoader, model, criterion, optimizer)
        trainLosses.append(avgTrainLoss)

        valLoss, valAccuracy = evaluateModel(model, valLoader, returnLoss=True)
        valLosses.append(valLoss)

        if valLoss < bestValLoss:
            bestValLoss, epochsNoImprove = valLoss, 0
        else:
            epochsNoImprove += 1
            if epochsNoImprove >= patience:
                print(f"Early stopping triggered at epoch {epoch}")
                break

        if epoch % 10 == 0 or epoch in {1, nEpochs}:
            print(f"Epoch {epoch}/{nEpochs}, Training Loss: {avgTrainLoss:.4f}, Validation Loss: {valLoss:.4f}")

    return trainLosses, valLosses

def processBatch(dataLoader, model, criterion, optimizer):
    """Processes a single batch of data during training."""
    runningLoss = 0.0
    for inputs, targets in dataLoader:
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        runningLoss += loss.item()

    return runningLoss / len(dataLoader)

def evaluateModel(model, loader, returnLoss=False):
    """Evaluates the model and optionally returns loss and accuracy."""
    model.eval()
    totalLoss, correct, total = 0.0, 0, 0
    criterion = nn.CrossEntropyLoss()

    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)

            if returnLoss:
                totalLoss += criterion(outputs, targets).item()

            _, predicted = torch.max(outputs, 1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)

    accuracy = 100 * correct / total
    return (totalLoss / len(loader), accuracy) if returnLoss else accuracy


In [None]:
# Train and Test Extended CNN
extended_model = createCnnModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(extended_model.parameters(), lr=0.001)

print("\nTraining Extended CNN...")
train_losses, val_losses = trainModel(extended_model, criterion, optimizer, train_loader, test_loader, n_epochs=200)

extended_test_accuracy = evaluateModel(extended_model, test_loader)
print(f"Extended CNN Test Accuracy: {extended_test_accuracy:.2f}%")
print("\nEvaluating Extended CNN...")
avg_loss, test_accuracy, f1 = evaluate_model(extended_model, test_loader)


Training Extended CNN...
Evaluation Results:
Average Loss: 1.3343
Accuracy: 52.23%
F1 Score: 0.5122


ValueError: not enough values to unpack (expected 4, got 3)