In [1]:
import sys
sys.path.append('..')

In [2]:
import os
import torch 
from torch import nn
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix
from typing import Callable
from tqdm import tqdm
from timeit import default_timer as timer
from matplotlib import pyplot as plt
from CommonFunctions import *

In [3]:
data_transform = transforms.Compose([transforms.Resize((28, 28)), transforms.ToTensor()])
train_data = datasets.MNIST(
    root='./data', train=True, download=True, transform=data_transform, target_transform=None
)
test_data = datasets.MNIST(
    root='./data', train=False, download=True, transform=data_transform, target_transform=None
)
len(train_data), len(test_data)

(60000, 10000)

In [4]:
data_classes = train_data.classes
data_classes

['0 - zero',
 '1 - one',
 '2 - two',
 '3 - three',
 '4 - four',
 '5 - five',
 '6 - six',
 '7 - seven',
 '8 - eight',
 '9 - nine']

In [5]:
class Deep(nn.Module):
    def __init__(self):
        super(Deep, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Linear(in_features=28*28, out_features=20),
            nn.ReLU()
        )
        self.layer2 = nn.Sequential(
            nn.Linear(in_features=20, out_features=10),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.layer2(self.layer1(x))

In [6]:
def train_step(model: nn.Module,
               data_loader: DataLoader,
               loss_function: nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_function: Callable,
               device: torch.device = torch.device('cpu')) -> tuple[float:]: 
    train_loss: float = 0
    train_acc: float = 0
    model.train()
    for batch, (X, y) in enumerate(data_loader):
        X, y = X.to(device), y.to(device)
        y_pred = model(X)
        loss = loss_function(y_pred, y)
        train_loss += loss.item()
        train_acc += accuracy_function(y, y_pred.argmax(dim=1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    return train_loss, train_acc

In [7]:
def test_step(model: nn.Module,
              data_loader: DataLoader,
              loss_function: nn.Module,
              accuracy_function: Callable,
              device: torch.device = torch.device('cpu')) -> tuple[float:]:
    test_loss: float = 0
    test_acc: float = 0
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            X, y = X.to(device), y.to(device)
            test_pred = model(X)
            test_loss += loss_function(test_pred, y).item()
            test_acc += accuracy_function(y, test_pred.argmax(dim=1))
        test_loss /= len(data_loader)
        test_acc /= len(data_loader)
    return test_loss, test_acc

In [8]:
def accuracy_fn(y_true, y_pred):
    return (torch.eq(y_true, y_pred).sum().item() / len(y_pred)) * 100

In [9]:
CPU_COUNT = os.cpu_count()
BATCH_SIZE = 32
loader_kwargs = dict(batch_size=BATCH_SIZE, pin_memory=True, num_workers=CPU_COUNT)
train_loader = DataLoader(train_data, **loader_kwargs, shuffle=True)
test_loader = DataLoader(test_data, **loader_kwargs, shuffle=False)
len(train_data), len(test_data), len(train_loader)

(60000, 10000, 1875)

In [10]:
learning_rate = 0.05
momentum = 0.5
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device('cpu')
model = Deep().to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
model

Deep(
  (layer1): Sequential(
    (0): Linear(in_features=784, out_features=20, bias=True)
    (1): ReLU()
  )
  (layer2): Sequential(
    (0): Linear(in_features=20, out_features=10, bias=True)
    (1): LogSoftmax(dim=1)
  )
)

In [11]:
train_time_start = timer()
epochs = 10
train_loss, train_acc = [], []
test_loss, test_acc = [], []
for epoch in tqdm(range(epochs)):
    train_metrics = train_step(
        model, train_loader, loss_fn, optimizer, accuracy_fn, device
    )
    test_metrics = test_step(model, test_loader, loss_fn, accuracy_fn, device)
    train_loss.append(train_metrics[0])
    train_acc.append(train_metrics[1])
    test_loss.append(test_metrics[0])
    test_acc.append(test_metrics[1])
train_time_end = timer()
f'Time: {train_time_end - train_time_start:.3f} seconds'

  0%|          | 0/10 [00:07<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (896x28 and 784x20)