In [None]:
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as f
import torch.utils.data as data
import torch.optim as optim
import numpy as np
from datasets import load_dataset

In [None]:
# Load MNIST through `datasets.load_dataset` and pick out its two splits by name.
mnist = load_dataset("mnist")
train = mnist.get("train")
test = mnist.get("test")

In [None]:
# Expose the image/label columns as NumPy arrays.
train.set_format(type="numpy", columns=["image", "label"])
test.set_format(type="numpy", columns=["image", "label"])
num_train_samples = 10000
num_test_samples = 1000

# Draw the subsets from the *whole* split. Sampling N indices out of range(N)
# without replacement only permutes the first N rows, so no random subset was
# ever taken. A seeded generator keeps the chosen subset stable across kernel
# restarts; the size is clamped so a small split cannot make `choice` raise.
rng = np.random.default_rng(0)
train_indices = rng.choice(len(train), size=min(num_train_samples, len(train)), replace=False)
test_indices = rng.choice(len(test), size=min(num_test_samples, len(test)), replace=False)

# Rename the image column to "input" and keep only the sampled rows.
train = train.rename_column("image", "input").select(train_indices)
test = test.rename_column("image", "input").select(test_indices)

In [None]:
def preprocess(example):
    """Flatten the ``"input"`` entry of ``example`` into a 1-D array.

    The mapping is updated in place and the same object is returned.
    """
    example["input"] = np.reshape(example["input"], (-1,))
    return example

# Flatten every example of both splits, using two worker processes per split.
train = train.map(function=preprocess, num_proc=2)
test = test.map(function=preprocess, num_proc=2)

In [None]:
# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Inputs become float32 tensors (size-1 dimensions dropped); labels become int64.
train_inputs = torch.from_numpy(train["input"]).to(torch.float32).squeeze()
test_inputs = torch.from_numpy(test["input"]).to(torch.float32).squeeze()
train_labels = torch.from_numpy(train["label"]).to(torch.int64)
test_labels = torch.from_numpy(test["label"]).to(torch.int64)


In [None]:
# Pair each input tensor with its label so a DataLoader can batch them together.
train_dataset = data.TensorDataset(train_inputs, train_labels)
test_dataset = data.TensorDataset(test_inputs, test_labels)

In [None]:
class Model(nn.Module):
    """Four-layer fully connected classifier for flattened 28x28 inputs.

    Each of the three hidden layers is Linear -> LayerNorm -> ReLU; the first
    two are followed by dropout (p=0.4 and p=0.2). The last layer produces 10
    raw scores (no softmax is applied here).
    """

    def __init__(self):
        super().__init__()
        width = 512

        # Hidden layer 1
        self.linear_1 = nn.Linear(28 * 28, width)
        self.norm_1 = nn.LayerNorm(width)
        self.drop_1 = nn.Dropout(p=0.4)

        # Hidden layer 2
        self.linear_2 = nn.Linear(width, width)
        self.norm_2 = nn.LayerNorm(width)
        self.drop_2 = nn.Dropout(p=0.2)

        # Hidden layer 3 (no dropout)
        self.linear_3 = nn.Linear(width, width)
        self.norm_3 = nn.LayerNorm(width)

        # Output layer
        self.linear_4 = nn.Linear(width, 10)

    def forward(self, x):
        """Return the 10 class scores for a batch of flattened inputs."""
        hidden = self.linear_1(x)
        hidden = self.norm_1(hidden)
        hidden = f.relu(hidden)
        hidden = self.drop_1(hidden)

        hidden = self.linear_2(hidden)
        hidden = self.norm_2(hidden)
        hidden = f.relu(hidden)
        hidden = self.drop_2(hidden)

        hidden = self.linear_3(hidden)
        hidden = self.norm_3(hidden)
        hidden = f.relu(hidden)

        return self.linear_4(hidden)


In [None]:
def create_dataloader(dataset, batch_size, shuffle=True, num_workers=2):
    """Build a ``DataLoader`` over ``dataset``.

    Args:
        dataset: Dataset to iterate over.
        batch_size: Number of samples per batch; the last, smaller batch is kept.
        shuffle: Whether to reshuffle the samples on every pass (default True).
        num_workers: Number of loader worker processes (default 2).

    Returns:
        The configured ``torch.utils.data.DataLoader``.
    """
    return data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=False,
        num_workers=num_workers,
        # Pinned host memory only helps host-to-GPU copies; requesting it on a
        # CPU-only machine does nothing useful and makes PyTorch warn.
        pin_memory=torch.cuda.is_available(),
    )

def train(model, optimizer, train_dataloader, test_dataloader, epochs=10, device=None, verbose=False):
    """Train ``model`` and record the mean batch loss of every epoch.

    NOTE(review): this function is named ``train``, the same name the notebook
    uses for the training split, so defining it rebinds that variable. Cells
    that still need the dataset must run before this one.

    Args:
        model: Module to optimise; it is moved to ``device`` first.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_dataloader: Sized iterable of ``(inputs, labels)`` training batches.
        test_dataloader: Sized iterable of ``(inputs, labels)`` evaluation batches.
        epochs: Number of passes over ``train_dataloader``.
        device: Target passed to ``.to(...)`` for the model and every batch.
        verbose: When true, print start/end messages and periodic progress.

    Returns:
        ``(train_losses, test_losses)``: two lists with one float per epoch,
        each the cross-entropy loss averaged over that loader's batches.
    """
    if verbose:
        print("Training has started")

    model = model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    train_losses = []
    test_losses = []

    # Progress is reported about four times per run. The interval is clamped
    # to 1 because int(epochs * 0.25) is 0 for epochs < 4, which made the
    # modulo below raise ZeroDivisionError.
    log_every = max(1, int(epochs * 0.25))

    for epoch in range(epochs):
        # Optimisation pass (dropout active).
        train_loss = 0
        model.train()

        for inputs, labels in train_dataloader:
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Evaluation pass (dropout disabled, no gradient tracking).
        test_loss = 0
        model.eval()
        with torch.no_grad():
            for inputs, labels in test_dataloader:
                inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

                outputs = model(inputs)
                loss = loss_fn(outputs, labels)
                test_loss += loss.item()

        # Average the accumulated per-batch losses over the number of batches.
        train_loss /= len(train_dataloader)
        test_loss /= len(test_dataloader)
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        # The first epoch (index 0) is never reported, as before.
        if verbose and epoch and ((epoch + 1) % log_every == 0):
            print(f"Epoch {epoch + 1} complete, train loss: {train_loss:.3f}, test loss: {test_loss:.3f}")

    if verbose:
        print("Training is complete")

    return train_losses, test_losses

In [None]:
def plot_loss(train_loss, test_loss, path):
    """Plot training and testing loss per epoch, save the figure, and show it.

    Args:
        train_loss: Sequence with one training-loss value per epoch.
        test_loss: Sequence with one testing-loss value per epoch.
        path: Destination passed to ``savefig`` (written at 300 dpi).
    """
    epochs = range(1, len(train_loss) + 1)

    # A style sheet only affects artists created while it is active, so it has
    # to be in effect before the figure exists. Calling `style.use` after
    # plotting left the first figure half-styled and every later one dark.
    # The context manager also restores the global rcParams on exit.
    with plt.style.context("dark_background"):
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(epochs, train_loss, label="Training Loss", color="dodgerblue")
        ax.plot(epochs, test_loss, label="Testing Loss", color="red")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Loss")
        ax.legend(loc="upper left")
        ax.set_title("Loss vs. Epoch")
        fig.savefig(path, dpi=300)
        plt.show()


## Batch Gradient Descent

In [None]:
# Full-batch setting: each loader yields its entire dataset as one batch.
train_dataloader = create_dataloader(dataset=train_dataset, batch_size=len(train_dataset))
test_dataloader = create_dataloader(dataset=test_dataset, batch_size=len(test_dataset))

# Fresh model and plain SGD (no momentum) for this experiment.
model = Model()
optimizer = optim.SGD(params=model.parameters(), lr=0.1, momentum=0)

In [None]:
# Report how many of the model's parameter elements are trainable (requires_grad=True).
print(f"Parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

In [None]:
# Run the full-batch experiment for 100 epochs and keep the per-epoch losses.
train_loss, test_loss = train(
    model=model,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    epochs=100,
    device=device,
    verbose=True,
)

In [None]:
# Plot and save the loss curves of the full-batch run.
plot_loss(train_loss=train_loss, test_loss=test_loss, path="./batch_metrics.png")

## Mini-Batch Gradient Descent

In [None]:
# Mini-batch setting: 512 training samples per step; the test set stays one batch.
train_dataloader = create_dataloader(dataset=train_dataset, batch_size=512)
test_dataloader = create_dataloader(dataset=test_dataset, batch_size=len(test_dataset))

# Fresh model and plain SGD (no momentum) for this experiment.
model = Model()
optimizer = optim.SGD(params=model.parameters(), lr=0.1, momentum=0)

In [None]:
# Run the mini-batch experiment for 100 epochs and keep the per-epoch losses.
train_loss, test_loss = train(
    model=model,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    epochs=100,
    device=device,
    verbose=True,
)

In [None]:
# Plot and save the loss curves of the mini-batch run.
plot_loss(train_loss=train_loss, test_loss=test_loss, path="./mini_batch_metrics.png")

## Stochastic Gradient Descent

In [None]:
# Stochastic setting: one training sample per step; the test set stays one batch.
train_dataloader = create_dataloader(dataset=train_dataset, batch_size=1)
test_dataloader = create_dataloader(dataset=test_dataset, batch_size=len(test_dataset))

# Fresh model and plain SGD (no momentum) for this experiment.
model = Model()
optimizer = optim.SGD(params=model.parameters(), lr=0.1, momentum=0)

In [None]:
# Run the single-sample experiment for 100 epochs and keep the per-epoch losses.
train_loss, test_loss = train(
    model=model,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    epochs=100,
    device=device,
    verbose=True,
)

In [None]:
# Plot and save the loss curves of the single-sample run.
plot_loss(train_loss=train_loss, test_loss=test_loss, path="./stochastic_metrics.png")