In [1]:
import torch
import numpy as np
from torchvision import datasets, transforms
from abc import ABC, abstractmethod
from sklearn.ensemble import RandomForestClassifier
import torch.nn as nn
from torch.utils.data import DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from torch.amp import autocast, GradScaler
import torch.nn.functional as F

In [2]:
# loading MNIST dataset

transform = transforms.Compose([
    transforms.ToTensor()
])

train_dataset = datasets.MNIST(root='data/', train=True, transform=transform, download=True)

test_dataset = datasets.MNIST(root='data/', train=False, transform=transform, download=True)


# preparing training and test data for Random Forest
X_train = np.array([np.array(image[0]) for image in train_dataset])
y_train = np.array(train_dataset.targets)
X_test = np.array([np.array(image[0]) for image in test_dataset])
y_test = np.array(test_dataset.targets)


# preparing training and test data for FFNN, CNN
train_len = int(len(train_dataset) * 0.8)
valid_len = int(len(train_dataset) - train_len)

train_ds, valid_ds = random_split(train_dataset, [train_len, valid_len])

train_dl = DataLoader(train_ds, batch_size=64, shuffle=True)

valid_dl = DataLoader(valid_ds, batch_size=64, shuffle=False)

test_dl = DataLoader(test_dataset, batch_size=64, shuffle=False)

  X_train = np.array([np.array(image[0]) for image in train_dataset])
  y_train = np.array(train_dataset.targets)
  X_test = np.array([np.array(image[0]) for image in test_dataset])
  y_test = np.array(test_dataset.targets)


In [3]:
# Defining abstract class using interface
class MnistClassifierInterface(ABC):
    @abstractmethod
    def train(self, X_train, y_train):
        pass
    
    @abstractmethod
    def predict(self, X_test):
        pass
    
class TrainPredictAbstract(MnistClassifierInterface):
    def __init__(self, model,  num_epochs=10, lr=0.0001, device='cpu'):
        self.num_epochs = num_epochs
        self.lr = lr
        self.device = device
        self.model = model.to(device) # initializing out model
        self.loss_fn = nn.CrossEntropyLoss() # CrossEntropyLoss is a suitable loss function in our case, as it designed for multi-class classification
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr) # setting up optimizer to update model parameters
        
    def train(self, train_dl, valid_dl):
        
        scaler = GradScaler() # initializing GradScaler for mixed precisions(using float16 where possible) to reduce memory usage, faster computations(works only with GPU). 
        torch.backends.cudnn.benchmark = True # by setting benchmark to True PyTorch will find the fastest algorithm on operations like convolutions for your hardware
        
        writer = SummaryWriter() # initializing SummaryWriter instance to enable tensorboard to track train and validation losses and accuracies

        for epoch in range(self.num_epochs):
            self.model.train() # setting model to train phase
            
            # variables to track model performance on training data
            total_samples_train = 0
            total_loss_train = 0
            total_correct_train = 0
            
            for x_batch, y_batch in train_dl:
                x_batch, y_batch = x_batch.to(self.device), y_batch.to(self.device)
                self.optimizer.zero_grad() # clearing gradients to avoid their summation

                with autocast(device_type=self.device): # enabling mixed precisions
                    pred = self.model(x_batch)
                    loss = self.loss_fn(pred, y_batch)

                scaler.scale(loss).backward() # scaling loss before backpropagation serves stable performance by preventing gradient underflow in float16
                scaler.step(self.optimizer) # unscales gradients 
                scaler.update() # update weights

                total_loss_train += loss.item() * y_batch.size(0)
                total_correct_train += (torch.argmax(pred, dim=1) == y_batch).sum().item()
                total_samples_train += y_batch.size(0)
        
            train_loss = total_loss_train / len(train_dl.dataset)
            train_accuracy = total_correct_train / total_samples_train


            self.model.eval() # setting model to evaluation phase

            # variables to track model performance on validation data
            total_correct_valid = 0
            total_samples_valid = 0
            total_loss_valid = 0

            with torch.inference_mode(): # disabling gradients calculation to measure the model performance during epoch
                for x_batch, y_batch in valid_dl:
                    x_batch, y_batch = x_batch.to(self.device), y_batch.to(self.device)

                    with autocast(device_type=self.device):
                        pred = self.model(x_batch)
                        loss = self.loss_fn(pred, y_batch)

                    total_loss_valid += loss.item() * y_batch.size(0)
                    total_correct_valid += (torch.argmax(pred, dim=1) == y_batch).sum().item()
                    total_samples_valid += y_batch.size(0)

            valid_loss = total_loss_valid / len(valid_dl.dataset)
            valid_accuracy = total_correct_valid / total_samples_valid


            # writing losses and accuracies to folders to track them in tensorboard
            writer.add_scalar("Loss/Train", train_loss, epoch)
            writer.add_scalar("Loss/Validation", valid_loss, epoch)
            writer.add_scalar("Accuracy/Train", train_accuracy, epoch)
            writer.add_scalar("Accuracy/Valid", valid_accuracy, epoch)

            print(f'Epoch {epoch + 1}, train acc: {train_accuracy:.4f}, valid acc: {valid_accuracy:.4f}')

        writer.close()
            
    def predict(self, test_dl):
        self.model.eval()
        total_correct = 0
        total_samples = 0
        with torch.inference_mode():
            for x_batch, y_batch in test_dl:
                x_batch, y_batch = x_batch.to(self.device), y_batch.to(self.device)
                pred = self.model(x_batch)
                
                total_correct += (torch.argmax(pred, dim=1) == y_batch).sum().item()
                total_samples += y_batch.size(0)
                
        return (total_correct / total_samples)
    
# Random Forest implementation   
class RandomForestMnistClassifier(MnistClassifierInterface):
    def __init__(self):
        self.model = RandomForestClassifier()
        
    def train(self, X_train, y_train):
        X_train_flattened = X_train.reshape(X_train.shape[0], -1) # flatten to receive 1d array, as the model train with 2d array
        self.model.fit(X_train_flattened, y_train)
        
    def predict(self, X_test):
        X_test_flattened = X_test.reshape(X_test.shape[0], -1) # flatten to receive 1d array, as the model predict with 2d array
        return self.model.predict(X_test_flattened)

# Feed-Forward Neural Network      
class FFNN(nn.Module):
    def __init__(self, input_dim=784, hidden_dim=64, output_dim=10):
        super().__init__() # call parent constructor to initialize arguments
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
    
    # forward method to pass input data thought fully connected layers and introduce non-linearity by activation function ReLU    
    def forward(self, x):
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        return self.fc2(x)
    
class FFNNMnistClassifier(TrainPredictAbstract):
   def __init__(self, num_epochs=10, lr=0.0001, device='cpu'):
       model = FFNN()
       super().__init__(model=model, num_epochs=num_epochs, lr=lr, device=device)
    
    
class CNN(nn.Module):
    def __init__(self, output_dim=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1) # input_channel = 1 because image is grayscale, [batch(64), 32, 28, 28]
        self.batchnorm1 = nn.BatchNorm2d(32) # by normalizing layers inputs we make training faster and more stable
        self.pool1 = nn.MaxPool2d(kernel_size=2) # [batch, 32, 14, 14]

        self.conv2 = nn.Conv2d(32, 64, 3, padding=1) #  [batch, 64, 14, 14]
        self.batchnorm2 = nn.BatchNorm2d(64) 
        self.pool2 = nn.MaxPool2d(kernel_size=2) # [batch, 64, 7, 7]
        
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.relu3 = nn.ReLU()
        self.drop1 = nn.Dropout(p=0.4) # disable some neurons while training to reduce overfitting
        self.fc2 = nn.Linear(128, output_dim)
        
    def forward(self, x):
        x = self.pool1(F.relu(self.batchnorm1(self.conv1(x)))) # stack methods to reduce space 
        
        x = self.pool2(F.relu(self.batchnorm2(self.conv2(x))))
        
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.drop1(x)
        
        return self.fc2(x)
    
class CNNMnistClassifier(TrainPredictAbstract):
    def __init__(self, num_epochs=10, lr=0.0001, device='cpu'):
        model = CNN()
        super().__init__(model=model, num_epochs=num_epochs, lr=lr, device=device)
        
        
class MnistClassifier:
    def __init__(self, algorithm='rf', **kwargs):
        if algorithm == 'rf':
            self.model = RandomForestMnistClassifier(**kwargs)
        elif algorithm == 'nn':
            self.model = FFNNMnistClassifier(**kwargs)
        elif algorithm == 'cnn':
            self.model = CNNMnistClassifier(**kwargs)
        else:
            raise ValueError("Incorrect alogrithm, pick one of those: 'cnn', 'nn', 'rf'")
    
    def train(self, X_train=None, y_train=None, train_dl=None, valid_dl=None):
        if isinstance(self.model, RandomForestMnistClassifier):
            self.model.train(X_train, y_train)
        else:
            self.model.train(train_dl, valid_dl)
            
    def predict(self, X_test=None, test_dl=None):
        if isinstance(self.model, RandomForestMnistClassifier):
            return self.model.predict(X_test)
        else:
            return self.model.predict(test_dl)

# Random Forest Test

In [4]:
rf = MnistClassifier('rf')
rf.train(X_train=X_train, y_train=y_train)

In [None]:
pred = rf.predict(X_test)
print(f"Random forest accuracy: {(pred == y_test).sum() / y_test.shape[0]}")

Random forest accuracy: 0.9694


# FFNN

In [None]:
ffnn = MnistClassifier('nn', num_epochs=5)
ffnn.train(train_dl=train_dl, valid_dl=valid_dl)

Epoch 1, train acc: 0.7473, valid acc: 0.8605
Epoch 2, train acc: 0.8819, valid acc: 0.8942
Epoch 3, train acc: 0.9007, valid acc: 0.9040
Epoch 4, train acc: 0.9088, valid acc: 0.9099
Epoch 5, train acc: 0.9146, valid acc: 0.9141


In [None]:
print(f"FFNN accuracy: {ffnn.predict(test_dl=test_dl)}")

FFNN accuracy: 0.9179


# CNN


In [None]:
cnn = MnistClassifier('cnn', num_epochs=5)
cnn.train(train_dl=train_dl, valid_dl=valid_dl)

Epoch 1, train acc: 0.9024, valid acc: 0.9701
Epoch 2, train acc: 0.9680, valid acc: 0.9800
Epoch 3, train acc: 0.9772, valid acc: 0.9828
Epoch 4, train acc: 0.9814, valid acc: 0.9862
Epoch 5, train acc: 0.9848, valid acc: 0.9864


In [None]:
print(f"CNN accuracy: {cnn.predict(test_dl=test_dl)}")

CNN accuracy: 0.9875
