In [39]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
from array import array
import struct, os, random, math, pickle
import numpy as np
from tqdm import tqdm

In [40]:
# Select the compute device once for the whole notebook: the GPU when CUDA is
# available, the CPU otherwise. (To force CPU execution, assign
# torch.device("cpu") unconditionally.)
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [41]:
# Custom Dataset class for locally stored MNIST files, even though PyTorch already ships MNIST.
class MNISTDataset(Dataset):
    """MNIST images and labels read from raw binary files under ``data_path``.

    The training split is read from ``train-images`` / ``train-labels`` and the
    test split from ``test-images`` / ``test-labels``. Both files are loaded
    completely into memory when the dataset is constructed.

    Attributes:
        train: the ``train`` flag passed to the constructor.
        images: float32 tensor of shape (N, 1, rows, cols) with values in [0, 1].
        labels: int64 tensor of shape (N,).
        transform: normalisation applied to each image in ``__getitem__``.
    """

    def __init__(self, data_path, train=True):
        """Load one split of the dataset.

        Args:
            data_path: directory containing the image and label files.
            train: truthy selects the ``train-*`` files, falsy the ``test-*`` files.

        Raises:
            ValueError: if a file has an unexpected magic number or its payload
                does not match the counts declared in its header.
        """
        self.train = train

        prefix = 'train' if self.train else 'test'
        images_filepath = os.path.join(data_path, f'{prefix}-images')
        labels_filepath = os.path.join(data_path, f'{prefix}-labels')

        self.images, self.labels = self.load_and_preprocess_data(images_filepath, labels_filepath)

        # Normalize(mean, std) for the single channel; 0.1307 / 0.3081 are
        # presumably the MNIST pixel mean and std -- not verified here.
        self.transform = transforms.Compose([
            transforms.Normalize((0.1307,), (0.3081,))
        ])

    def load_and_preprocess_data(self, images_filepath, labels_filepath):
        """Read a label file and an image file and return them as tensors.

        Each file starts with a big-endian header of unsigned 32-bit integers
        (magic number and item count; the image file additionally stores rows
        and cols) followed by unsigned-byte data.

        Args:
            images_filepath: path of the image file (magic number 2051).
            labels_filepath: path of the label file (magic number 2049).

        Returns:
            ``(images, labels)`` where ``images`` is a float32 tensor of shape
            (N, 1, rows, cols) scaled to [0, 1] and ``labels`` is an int64
            tensor of shape (N,).

        Raises:
            ValueError: on a wrong magic number, when a payload is shorter or
                longer than its header declares, or when the number of images
                differs from the number of labels.
        """
        # Read labels
        with open(labels_filepath, 'rb') as file:
            magic, label_count = struct.unpack(">II", file.read(8))
            if magic != 2049:
                raise ValueError(f'Magic number mismatch, expected 2049, got {magic}')
            # astype() already returns a fresh, writable array.
            labels = np.frombuffer(file.read(), dtype=np.uint8).astype(np.int64)
        if labels.size != label_count:
            raise ValueError(
                f'Label file {labels_filepath} declares {label_count} labels '
                f'but contains {labels.size}'
            )

        # Read images
        with open(images_filepath, 'rb') as file:
            magic, image_count, rows, cols = struct.unpack(">IIII", file.read(16))
            if magic != 2051:
                raise ValueError(f'Magic number mismatch, expected 2051, got {magic}')
            pixels = np.frombuffer(file.read(), dtype=np.uint8)
        if pixels.size != image_count * rows * cols:
            raise ValueError(
                f'Image file {images_filepath} declares {image_count} images of '
                f'{rows}x{cols} pixels but contains {pixels.size} bytes of pixel data'
            )
        if image_count != label_count:
            raise ValueError(
                f'Image/label count mismatch: {image_count} images vs {label_count} labels'
            )
        # frombuffer yields a read-only view; copy so the tensor owns writable memory.
        images = pixels.reshape(image_count, rows, cols).copy()

        # Convert to PyTorch tensors: add a channel axis and scale bytes to [0, 1]
        images = torch.from_numpy(images).float().unsqueeze(1) / 255.0
        labels = torch.from_numpy(labels)

        return images, labels

    def __len__(self):
        """Return the number of samples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(normalised image, label)`` for position ``idx``."""
        image = self.transform(self.images[idx])
        label = self.labels[idx]
        return image, label

In [42]:
def train_epoch(model, loader, loss_fn, optimizer, device):
    """Run one full pass over ``loader``, updating ``model`` after every batch.

    For each batch the function moves the data to ``device``, clears the old
    gradients, runs the forward pass, computes the loss, back-propagates and
    lets the optimizer apply one update. The number of updates therefore equals
    the number of batches the loader yields.

    Args:
        model: the network being trained.
        loader: iterable yielding ``(images, labels)`` batches.
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar loss tensor.
        optimizer: optimizer that owns the model's parameters.
        device: device each batch is moved to before the forward pass.

    Returns:
        None.
    """
    # put the model into training mode
    model.train()

    # tqdm only draws a progress bar around the loader
    progress = tqdm(loader, desc="Training")
    for batch_images, batch_labels in progress:
        # each item is a whole batch: a stack of images and the matching labels
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        # PyTorch accumulates gradients across backward() calls, so reset them
        optimizer.zero_grad()

        # forward pass: one row of outputs per image in the batch
        logits = model(batch_images)

        # loss for this batch, computed from the outputs and the true labels
        batch_loss = loss_fn(logits, batch_labels)

        # backpropagation: gradients of the loss w.r.t. the weights and biases
        batch_loss.backward()

        # apply the update computed from those gradients
        optimizer.step()
        

In [43]:
# Evaluation function
def evaluate(model, testing_loader, device):
    """Return the classification accuracy of ``model`` over ``testing_loader``.

    Args:
        model: module mapping a batch of inputs to per-class scores, one row
            per sample (the predicted class is the arg-max along dimension 1).
        testing_loader: iterable yielding ``(images, labels)`` batches.
        device: device each batch is moved to before the forward pass.

    Returns:
        The fraction of correctly classified samples as a float, or ``0.0``
        when the loader yields no samples at all.

    Note:
        The model is switched to evaluation mode and left in that mode.
    """
    # Set the model to evaluation mode
    model.eval()

    total_correct = 0
    total_samples = 0

    # No gradients are needed for evaluation, so do not track them
    with torch.no_grad():
        for images, labels in testing_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass: per-class scores for every image in the batch
            outputs = model(images)

            # Predicted class = index of the highest score in each row
            predicted = outputs.argmax(dim=1)

            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    # An empty loader would otherwise divide by zero
    if total_samples == 0:
        return 0.0

    return total_correct / total_samples

In [44]:
def train_for_n_epochs(n, model, train_loader, test_loader, loss_fn, optimizer, device):
    """Train ``model`` for ``n`` epochs, printing the test accuracy after each one.

    Args:
        n: number of epochs to run.
        model: the network being trained.
        train_loader: batches passed to ``train_epoch``.
        test_loader: batches passed to ``evaluate``.
        loss_fn: loss function passed to ``train_epoch``.
        optimizer: optimizer passed to ``train_epoch``.
        device: device passed to both ``train_epoch`` and ``evaluate``.

    Returns:
        A list with the test accuracy measured after each epoch, in order.
    """
    accuracies = []
    for epoch in range(n):
        train_epoch(model, train_loader, loss_fn, optimizer, device)
        test_accuracy = evaluate(model, test_loader, device)
        accuracies.append(test_accuracy)

        # Report progress against the requested epoch count `n`, not a global.
        print(f"Epoch {epoch+1}/{n}")
        print(f"Test Accuracy: {test_accuracy:.4f}")
        print("-" * 20)

    return accuracies

In [45]:
# --- Data -------------------------------------------------------------------
# Directory handed to MNISTDataset as its data_path.
INPUT_PATH = "./data"

# --- MLP layer widths -------------------------------------------------------
# Images are flattened before the first Linear layer, hence 28 * 28 inputs.
INPUT_LAYER_SIZE = 28 * 28
HIDDEN_LAYER_SIZE = 16
OUTPUT_LAYER_SIZE = 10

# --- Optimisation -----------------------------------------------------------
BATCH_SIZE = 128        # samples per training batch (DataLoader batch_size)
LEARNING_RATE = 0.01    # step size given to the SGD optimizer
EPOCHS = 3              # number of passes over the training data

In [46]:
def train_mlp():
    """Train the fully connected classifier and save it to ``saved/model.pth``.

    If ``saved/model.pth`` already exists, the pickled model is loaded and
    training continues from it; otherwise a new network with two hidden layers
    (Flatten -> Linear -> ReLU -> Linear -> ReLU -> Linear) is built from the
    ``*_LAYER_SIZE`` constants. The model is trained for ``EPOCHS`` epochs with
    SGD and cross-entropy loss, then the whole module is saved again.
    """
    model_path = "saved/model.pth"

    # Check if a saved version of the model exists
    if os.path.exists(model_path):
        # The checkpoint is a whole pickled module (see torch.save below), so it
        # must be loaded with weights_only=False. Unpickling can execute
        # arbitrary code: only load checkpoint files you trust.
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
        model = torch.load(model_path, map_location=device, weights_only=False)
    else:
        model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(INPUT_LAYER_SIZE, HIDDEN_LAYER_SIZE),
            nn.ReLU(),
            nn.Linear(HIDDEN_LAYER_SIZE, HIDDEN_LAYER_SIZE),
            nn.ReLU(),
            nn.Linear(HIDDEN_LAYER_SIZE, OUTPUT_LAYER_SIZE),
        )

    # The batches are moved to `device` during training, so the model must live there too.
    model = model.to(device)

    # Create datasets
    train_dataset = MNISTDataset(INPUT_PATH, train=True)
    test_dataset = MNISTDataset(INPUT_PATH, train=False)

    # Create dataloaders; the test set is evaluated as one single batch
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

    # Train the model
    train_for_n_epochs(EPOCHS, model, train_loader, test_loader, loss_fn, optimizer, device)

    print("Training completed! Saving...")
    # torch.save does not create missing directories
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    torch.save(model, model_path)
    print("Model saved to ", model_path)

In [47]:
def train_cnn():
    """Train the convolutional classifier and save it to ``saved/cnn_model.pth``.

    If ``saved/cnn_model.pth`` already exists, the pickled model is loaded and
    training continues from it; otherwise a new network is built from three
    Conv2d -> ReLU -> MaxPool2d stages followed by two fully connected layers.
    The model is trained for ``EPOCHS`` epochs with SGD and cross-entropy loss,
    then the whole module is saved again.
    """
    model_path = "saved/cnn_model.pth"

    # Check if a saved version of the model exists
    if os.path.exists(model_path):
        # The checkpoint is a whole pickled module (see torch.save below), so it
        # must be loaded with weights_only=False. Unpickling can execute
        # arbitrary code: only load checkpoint files you trust.
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
        model = torch.load(model_path, map_location=device, weights_only=False)
    else:
        # Define a CNN model
        model = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),  # Convolutional layer 1
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Pooling layer 1

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),  # Convolutional layer 2
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Pooling layer 2

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),  # Convolutional layer 3
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Pooling layer 3

            nn.Flatten(),  # Flatten the tensor to feed into fully connected layers
            # 128 channels x 3 x 3: assumes 28x28 inputs, halved by each pool (28 -> 14 -> 7 -> 3)
            nn.Linear(128 * 3 * 3, 256),  # Fully connected layer 1
            nn.ReLU(),
            nn.Linear(256, 10)  # Fully connected layer 2 (output layer)
        )

    # The batches are moved to `device` during training, so the model must live there too.
    model = model.to(device)

    # Create datasets
    train_dataset = MNISTDataset(INPUT_PATH, train=True)
    test_dataset = MNISTDataset(INPUT_PATH, train=False)

    # Create dataloaders; the test set is evaluated as one single batch
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

    # Train the model
    train_for_n_epochs(EPOCHS, model, train_loader, test_loader, loss_fn, optimizer, device)

    print("Training completed! Saving...")
    # torch.save does not create missing directories
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    torch.save(model, model_path)
    print("Model saved to ", model_path)

In [50]:
# Entry point: train the CNN (continuing from saved/cnn_model.pth when that
# file exists) and save the result.
if __name__ == "__main__":
    train_cnn()

  model = torch.load("saved/cnn_model.pth")
Training: 100%|██████████| 469/469 [00:04<00:00, 101.15it/s]


Epoch 1/3
Test Accuracy: 0.9818
--------------------


Training: 100%|██████████| 469/469 [00:04<00:00, 101.29it/s]


Epoch 2/3
Test Accuracy: 0.9779
--------------------


Training: 100%|██████████| 469/469 [00:04<00:00, 101.76it/s]


Epoch 3/3
Test Accuracy: 0.9837
--------------------
Training completed! Saving...
Model saved to  saved/cnn_model.pth
