In [None]:
import torch
from torch.utils.data import DataLoader
from torch import nn
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt


In [None]:
# based on sample code from https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
# Build the train split first, then the test split; both are downloaded into
# ./data if missing and both convert images to tensors via ToTensor().
training_data, test_data = (
    datasets.MNIST(root="data", train=is_train, download=True, transform=ToTensor())
    for is_train in (True, False)
)



In [None]:
batch_size = 64

# Create data loaders: training batches are reshuffled on every pass,
# evaluation batches keep the dataset order.
train_dataloader = DataLoader(
    dataset=training_data,
    batch_size=batch_size,
    shuffle=True,
)
test_dataloader = DataLoader(
    dataset=test_data,
    batch_size=batch_size,
    shuffle=False,
)

# Peek at the first evaluation batch only, to confirm tensor shapes and label dtype.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
# Get cpu, gpu or mps device for training.
# Preference order: CUDA first, then Apple MPS, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SAFPooling(nn.Module):
    """
    SAF-Pooling: max pooling followed by dropout on the pooled activations.

    The strongest activation of every window is kept by max pooling, then a
    random subset of those pooled activations is suppressed as a regulariser.
    The random suppression is active only in training mode; after
    ``model.eval()`` the layer is a plain, deterministic max pool, so
    evaluation metrics and predictions are repeatable.

    Args:
        pool_size: Window size, also used as the stride (non-overlapping windows).
        drop_prob: Probability of zeroing each pooled activation while training.
            Defaults to 0.1, i.e. about 90% of activations are kept.
    """
    def __init__(self, pool_size, drop_prob=0.1):
        super().__init__()
        self.pool_size = pool_size
        self.drop_prob = drop_prob

    def forward(self, x):
        """Return the max-pooled input, with random suppression only while training."""
        # Max pooling for highest activations
        pooled = F.max_pool2d(x, kernel_size=self.pool_size, stride=self.pool_size)
        # F.dropout is a no-op when training=False. While training it zeroes
        # entries with probability drop_prob and rescales the survivors by
        # 1 / (1 - drop_prob), so the expected activation matches eval mode.
        return F.dropout(pooled, p=self.drop_prob, training=self.training)

class ConvBlock(nn.Module):
    """
    Basic building block: Conv2d, then BatchNorm2d, then an in-place ReLU.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the convolution
            (also the number of features normalised by the batch norm).
        kernel_size: Convolution kernel size (default 3).
        stride: Convolution stride (default 1).
        padding: Zero padding added by the convolution (default 1).
    """
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super().__init__()
        # Attribute names double as state_dict key prefixes, so they are fixed.
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )
        self.bn = nn.BatchNorm2d(num_features=out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply convolution, normalisation and activation, in that order."""
        return self.relu(self.bn(self.conv(x)))

class SimpNet(nn.Module):
    """
    Small CNN: three convolutional groups followed by a two-layer classifier.

    Each group is two ConvBlocks and a 2x2 SAFPooling, with group widths of
    64, 128 and 256 channels. The classifier's first Linear layer expects
    256 * 3 * 3 flattened features, which is what three halvings of a 28x28
    input produce (28 -> 14 -> 7 -> 3).

    Args:
        num_classes: Size of the output layer (default 10).
        in_channels: Number of channels of the input images (default 1).
    """
    def __init__(self, num_classes=10, in_channels=1):
        super().__init__()

        # Modules are appended in the order group 1, group 2, group 3 so that
        # their positions inside the Sequential (features.0 ... features.8)
        # are fixed.
        stages = []
        prev_width = in_channels
        for width in (64, 128, 256):
            stages.append(ConvBlock(prev_width, width))
            stages.append(ConvBlock(width, width))
            stages.append(SAFPooling(pool_size=2))
            prev_width = width
        self.features = nn.Sequential(*stages)

        flat_features = 256 * 3 * 3  # channels x height x width after group 3
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),  # Output layer
        )

    def forward(self, x):
        """Run the feature extractor, then the classifier, and return its output."""
        return self.classifier(self.features(x))

In [None]:
# Instantiate the model, then move its parameters and buffers to the selected device.
# nn.Module.to() modifies the module in place and returns the same object.
model = SimpNet(in_channels=1, num_classes=10)
model = model.to(device)
print(model)

In [None]:
# Classification loss computed from the model's raw class scores and integer labels.
loss_fn = nn.CrossEntropyLoss()
# Adam over every parameter of the model, learning rate 0.001.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
def train(dataloader, model, loss_fn, optimizer, best_accuracy, label):
    """
    Train for one pass over ``dataloader``, then evaluate and checkpoint.

    After the pass the model is evaluated with ``test()`` on the module-level
    ``test_dataloader``. If that accuracy beats ``best_accuracy`` the model's
    state_dict is written to ``best_model-{label}.pth``.

    Relies on the module-level names ``device``, ``test_dataloader`` and ``test``.

    Args:
        dataloader: Iterable of (inputs, targets) batches; must expose ``.dataset``.
        model: Network to optimise (switched to training mode here).
        loss_fn: Callable mapping (predictions, targets) to a scalar loss tensor.
        optimizer: Optimizer whose step()/zero_grad() are called once per batch.
        best_accuracy: Best evaluation accuracy seen so far.
        label: Tag embedded in the checkpoint file name.

    Returns:
        The updated best accuracy (unchanged if this pass did not improve on it).
    """
    size = len(dataloader.dataset)
    model.train()

    for batch, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch
        batch_loss = loss_fn(model(inputs), targets)

        # Backward pass, parameter update, then clear gradients for the next batch
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Progress report on every 100th batch only
        if batch % 100 != 0:
            continue
        loss = batch_loss.item()
        current = (batch + 1) * len(inputs)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

    # Evaluate on test set (the returned loss is not needed here)
    test_accuracy, _ = test(test_dataloader, model, loss_fn)

    # Checkpoint the best model
    improved = test_accuracy > best_accuracy
    if improved:
        best_accuracy = test_accuracy
        torch.save(model.state_dict(), f"best_model-{label}.pth")
        print(f"New best model saved with accuracy: {best_accuracy:.1%}")

    return best_accuracy

In [None]:
def test(dataloader, model, loss_fn):
    """
    Evaluate ``model`` on every batch of ``dataloader`` without tracking gradients.

    Relies on the module-level ``device``. Predictions are taken as the argmax
    over dimension 1 of the model output.

    Args:
        dataloader: Iterable of (inputs, targets) batches; must expose ``.dataset``.
        model: Network to evaluate (switched to eval mode here).
        loss_fn: Callable mapping (predictions, targets) to a scalar loss tensor.

    Returns:
        A tuple ``(accuracy, avg_loss)``: the fraction of correctly classified
        samples, and the mean of the per-batch losses.
    """
    num_samples = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()

    loss_sum = 0
    hits = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            loss_sum += loss_fn(logits, targets).item()
            matches = logits.argmax(1) == targets
            hits += matches.type(torch.float).sum().item()

    test_loss = loss_sum / num_batches
    correct = hits / num_samples
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return correct, test_loss

In [None]:
epochs = 10
best_accuracy = 0
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    # train() finishes every epoch by evaluating on test_dataloader (printing the
    # report) and saving "best_model-test.pth" on improvement, so no separate
    # test() call is needed here; a second one would only repeat the same pass.
    best_accuracy = train(train_dataloader, model, loss_fn, optimizer, best_accuracy, 'test')
    # Periodic snapshot, independent of the best-model checkpoint. t is
    # zero-based, so with epochs = 10 this fires at t = 0 and t = 5.
    if t % 5 == 0:
        torch.save(model.state_dict(), f"model-{t}.pth")
        print(f"Saved PyTorch Model State to model-{t}")
print("Done!")

In [None]:
# Restore the best checkpoint written during training. map_location remaps the
# stored tensors onto the current device, so a checkpoint saved from a GPU run
# still loads in a session where that GPU is not available.
model.load_state_dict(
    torch.load("best_model-test.pth", map_location=device, weights_only=True)
)

In [None]:
# Human-readable label for each class index: "0" through "9".
classes = [str(digit) for digit in range(10)]

model.eval()
x, y = test_data[0]
with torch.no_grad():
    # A single dataset sample has no batch dimension, but the model's
    # BatchNorm2d layers only accept 4-D (N, C, H, W) input, so add a
    # leading batch axis of size 1 before the forward pass.
    x = x.unsqueeze(0).to(device)
    pred = model(x)
    # pred[0] holds the class scores of the only sample in the batch;
    # .item() turns the argmax tensor into a plain int for list indexing.
    predicted, actual = classes[pred[0].argmax(0).item()], classes[y]
    print(f'Predicted: "{predicted}", Actual: "{actual}"')

In [None]:
# TODO: load an image file
# TODO: convert the image file to a tensor
# TODO: get a label from human input
# TODO: give the tensor to the model to make a prediction, then report whether it was correct

In [None]:
# Reload the best checkpoint and report its accuracy/loss on the test set.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved from a GPU run still loads when that GPU is not available.
model.load_state_dict(
    torch.load("best_model-test.pth", map_location=device, weights_only=True)
)
test(test_dataloader, model, loss_fn)