# Hello, Welcome!

This notebook uses Machine Learning / PyTorch to recognize handwritten digits, but it can easily be modified for other classification tasks. You may use it as a kickstarter or skeleton for your own projects, or steal it entirely. Just give me credit when you do so, please.  

This notebook is based on the Pytorch tutorial by Patrick Loeber:  
https://www.youtube.com/watch?v=c36lUUr864M

I'm using the Dataset from this link:  
https://www.kaggle.com/datasets/jcprogjava/handwritten-digits-dataset-not-in-mnist

### Known Issues:
- Output flickers for Visual Studio. However, this seems to be an issue with VS and might be impossible to fix from my side.

### If you are using Windows:  
Copy/paste the line below into the Windows cmd console if tensorboard won't start or is giving you a hard time in general.
It clears tensorboard's leftover session files, which gets rid of a stuck tensorboard session that can be really stubborn and persist even after a restart of your machine.

    del /q %TMP%\.tensorboard-info\*

Also, Microsoft Defender slows down data loading significantly.

### If you are using Tensorboard:  
Tensorboard creates new folders in your working directory. Make sure you have write permissions there, or turn tensorboard off by setting

    tb_analytics = False

Don't forget to delete everything in the folder once you're done here!

### If you are using Jupyter Notebooks:  
Use this link to start a tensorboard session in your browser:  
http://localhost:6006/


In [None]:
### ALL THE IMPORTS ###
#######################

# PyTorch imports
import torch as tc
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, RandomSampler
from torch.optim import lr_scheduler

import torchvision as tv
import torchvision.io as io

from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Tensorboard imports
import tensorboard
from tensorboard import notebook
from torch.utils.tensorboard import SummaryWriter
# Tensorboard setup: create a writer for ./runs/ and open tensorboard on that folder.
logdir =  './runs/'                 # dir in which to save run data
writer = SummaryWriter(logdir)      # init tensorboard data writer

# IPython magics (only valid inside a notebook): load the tensorboard extension and start it on the runs folder
%load_ext tensorboard
%tensorboard --logdir=runs
dir_counter = 0                     # counter for setting up tensorboard folders; different training runs will be saved in different folders

# PIL Imports
from PIL import Image
from PIL import ImageStat
from PIL import ImageOps
from PIL import ImageShow
from PIL import ImageFilter

# Standard pkg imports
import sys
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import time
from IPython.display import clear_output
from threading import Thread as Thread
from threading import Event as Event

clear_output()

# CUDA
# Use the GPU whenever PyTorch reports a usable CUDA device, otherwise stay on the CPU.
device = tc.device("cuda" if tc.cuda.is_available() else "cpu")

print(f"CUDA is available: {tc.cuda.is_available()}")


#### MODEL CLASSES ####
#######################
    

class FeedForward(nn.Module):
    """Fully connected network: linear layer -> ReLU -> linear layer.

    Args:
        input_dim: number of input features of the first linear layer.
        hidden_dim: number of features between the two linear layers.
        output_dim: number of output features of the last linear layer.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # NOTE: lin2 is the activation, not a linear layer. The attribute names
        # are left untouched so that the state-dict keys stay the same.
        self.lin1 = nn.Linear(in_features = input_dim, out_features = hidden_dim)
        self.lin2 = nn.ReLU()
        self.lin3 = nn.Linear(in_features = hidden_dim, out_features = output_dim)

    def forward(self, x):
        """Return the raw output of the last linear layer for input ``x``."""
        hidden = self.lin2(self.lin1(x))
        return self.lin3(hidden)
    

class ConvNN(nn.Module):
    """Small CNN: two (conv -> ReLU -> max-pool) stages followed by three linear layers.

    Args:
        img_dim: shape of one input image as (channels, height, width). May be a
            tuple, list, torch.Size or a 1-D integer tensor.
        fc1_dim: number of output features of the first linear layer.
        fc2_dim: number of output features of the second linear layer.
        output_dim: number of output features of the last linear layer.

    Raises:
        ValueError: if the image is too small to pass through the conv/pool stack.
    """

    # Conv Output Size:
    # OutputWidth = floor((Width - FilterSize + 2*Padding) / Stride) + 1
    def conv2d_out_dim(self, input_dim, kernel_size, padding, stride):
        """Return the spatial output size of a conv or pooling layer as a float tensor.

        All arguments may be numbers, sequences or tensors; they are broadcast
        against each other. Dilation is not taken into account.
        """
        # as_tensor reuses an existing tensor instead of copying it
        span = tc.as_tensor(input_dim) - tc.as_tensor(kernel_size) + 2 * tc.as_tensor(padding)
        # The layers drop incomplete windows, so the quotient has to be floored.
        # Without the floor, odd sizes (e.g. 27x27 images) give a wrong adapter_dim.
        return tc.floor(span / tc.as_tensor(stride)) + 1

    def __init__(self, img_dim, fc1_dim, fc2_dim, output_dim):
        super().__init__()

        # plain ints, so the layers do not end up storing tensors as hyper-parameters
        in_channels = int(img_dim[0])
        spatial_dim = [int(d) for d in img_dim[1:]]

        self.conv1 = nn.Conv2d(in_channels = in_channels,
                               out_channels = 3,
                               kernel_size = 5,
                               )

        self.conv2 = nn.Conv2d(in_channels = self.conv1.out_channels,
                               out_channels = 16,
                               kernel_size = 5,
                               )

        self.pool = nn.MaxPool2d(kernel_size = 2,
                                 stride = 2)

        # Follow the image size through the layers in the order forward() applies them
        feature_dim = spatial_dim
        for layer in (self.conv1, self.pool, self.conv2, self.pool):
            feature_dim = self.conv2d_out_dim(feature_dim, layer.kernel_size, layer.padding, layer.stride)
            if bool((feature_dim < 1).any()):
                raise ValueError(f"image size {tuple(spatial_dim)} is too small for this network")

        # number of features handed from the conv part to the first linear layer
        self.adapter_dim = int((self.conv2.out_channels * feature_dim[0] * feature_dim[1]).item())

        self.fc1 = nn.Linear(in_features = self.adapter_dim, out_features = fc1_dim)
        self.fc2 = nn.Linear(in_features = self.fc1.out_features, out_features = fc2_dim)
        self.fc3 = nn.Linear(in_features = self.fc2.out_features, out_features = output_dim)

    def forward(self, x):
        """Return the raw output of the last linear layer, shaped (-1, output_dim)."""
        out = self.pool(F.relu(self.conv1(x)))
        out = self.pool(F.relu(self.conv2(out)))
        out = out.view(-1, self.adapter_dim)                # flatten the feature maps of each sample
        out = F.relu(self.fc1(out))
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        return out


###### FUNCTIONS ######
#######################

def loading_animation(event, message = 'loading'):                  # Thread, needs a halting event as input!
    """Print ``message`` followed by up to three dots, one per second, until ``event`` is set.

    Meant to be run as the target of a Thread; the caller stops it by calling
    ``event.set()``. The output is cleared when the animation stops.

    Args:
        event: threading.Event that signals the animation to stop.
        message: text printed at the start of every animation cycle.
    """
    # event.wait(1) doubles as an interruptible one-second sleep: it returns True
    # as soon as the event is set, so the animation stops without finishing the
    # current sleep or starting another cycle.
    stop_requested = False
    while not stop_requested:
        print(message, sep='', end='')
        stop_requested = event.wait(1)
        for _ in range(3):
            if stop_requested:
                break
            print('.', sep='', end='')
            stop_requested = event.wait(1)
        if not stop_requested:
            clear_output(wait = True)                               # restart the line on the next print
    clear_output()


def get_batch(batches,                              # input dataset, batch-length must be > 0
              batch_idx = 0,                        # get batch at batch_idx...
              get_random = False):                  # ...or pick a random batch
    """Return one batch of ``batches`` as a tuple (batch of samples, batch of labels).

    Args:
        batches: iterable of (samples, labels) batches; must support ``len()``
            when ``get_random`` is used.
        batch_idx: zero-based index of the batch to return.
        get_random: if True, ``batch_idx`` is ignored and a random batch is returned.

    Raises:
        IndexError: if ``batch_idx`` does not address a batch in ``batches``.
    """

    # Start loading screen
    event = Event()
    thread = Thread(target = loading_animation, daemon=True, args=(event, "Loading Batches. This might take a while"))
    thread.start()

    try:
        if get_random:                                                          # set random index
            batch_idx = random.randrange(len(batches))

        for current_idx, batch in enumerate(batches):                           # iterate through dataset...
            if current_idx == batch_idx:                                        # ...until idx...
                batch_spl, batch_lbl = batch
                return (batch_spl, batch_lbl)                                   # ...and return a tuple of form (batch of samples, batch of labels)

        raise IndexError(f"batch index {batch_idx} is out of range")
    finally:
        # Stop loading screen, no matter how the function is left
        event.set()
        thread.join()


def get_sample(batches,                             # input dataset, batch-length must be > 0
               sample_idx = 0,                      # get sample at sample_idx,...
               get_random = False):                 # ...or pick a random sample throughout all batches
    """Return a single (sample, label) tuple taken from ``batches``.

    Args:
        batches: iterable of (samples, labels) batches, e.g. a DataLoader.
        sample_idx: zero-based index of the sample, counted across all batches.
        get_random: if True, ``sample_idx`` is ignored and a random sample of a
            random batch is returned.
    """

    if get_random:
        batch = get_batch(batches, get_random = True)
        sample_idx = random.randrange(len(batch[0]))                            # number of samples in the batch, not len of the (samples, labels) tuple
    else:
        # A global sample index is split by the batch SIZE, not by the number of batches.
        batch_size = getattr(batches, 'batch_size', None)
        if batch_size is None:                                                  # no batch_size attribute: measure the first batch instead
            batch_size = len(next(iter(batches))[0])
        batch_idx, sample_idx = divmod(sample_idx, batch_size)
        batch = get_batch(batches, batch_idx)

    return (batch[0][sample_idx], batch[1][sample_idx])                         # returns a tuple of form (sample, label)
    

def tb_write_model(model, batches):
    """Write the graph of ``model`` to tensorboard, using the default sample of ``get_sample(batches)`` as example input."""
    example_input = get_sample(batches)[0].to(device)
    writer.add_graph(model, example_input)

    # write pending data to disk and close the writer
    writer.flush()
    writer.close()


def training_loop(model,                            # model input
                  batches_trn,                      # training batches input
                  criterion,                        # cost/loss/criterion function input
                  optimizer,                        # ...
                  scheduler,
                  n_epochs = 1,                     # number of iterations through all batches
                  tb_analytics = False,             # tensorboard plugin
                  print_fps = 30.):                 # output fps. Needed for not overwhelming the kernel. Also serves to limit tensorboard datasize.
    """Train ``model`` on ``batches_trn`` and return it.

    For every batch the model output is compared with the labels (argmax of
    both) to keep a running accuracy, then the cost is back-propagated and the
    optimizer is stepped. ``scheduler.step()`` is called once per epoch.

    Args:
        model: the model to train; called with one batch of inputs.
        batches_trn: iterable of (inputs, labels) batches supporting ``len()``.
        criterion: callable ``criterion(output, labels)`` returning the cost.
        optimizer: optimizer holding the parameters of ``model``.
        scheduler: learning rate scheduler, stepped after every epoch.
        n_epochs: number of passes through ``batches_trn``.
        tb_analytics: if True, cost and running accuracy are written to the
            module-level tensorboard ``writer`` whenever a status line is printed.
        print_fps: maximum number of status lines per second.

    Returns:
        The model that was passed in.
    """

    # Start loading screen
    event = Event()
    thread = Thread(target = loading_animation, daemon=True, args=(event, "Loading Batches. This might take a while"))
    thread.start()

    # init func-global variables
    n_batches = len(batches_trn)                                                    # number of batches
    acc = 0.                                                                        # accuracy counter
    n_iter = 0                                                                      # iter counter
    cost = None                                                                     # stays None if no batch is processed
    print_interval = 1. / print_fps                                                 # min. time between two outputs
    t_0 = time.time()                                                               # save current time value
    t_fps = time.time()                                                             # -- " -- for output fps

    try:
        for epoch in range(n_epochs):                                               # iter through epochs
            for batch_idx, (inputs, labels) in enumerate(batches_trn):              # iter through batches in epoch

                # Stop loading screen
                if not event.is_set():
                    event.set()
                    thread.join()

                # Compute prediction and true value Block
                output_prd = model(inputs.to(device))                               # calc model output
                output_tru = labels.to(device)                                      # get label

                # Running average accuracy Block
                # One row per sample, so a single argmax call handles the whole batch
                prd_classes = output_prd.reshape(len(output_prd), -1).argmax(dim = 1)
                tru_classes = output_tru.reshape(len(output_tru), -1).argmax(dim = 1)
                compare_results = (prd_classes == tru_classes).sum()                # number of correct predictions in batch

                batch_len = len(labels)                                             # get size of current batch. Not necessarily equal to batch_size!
                acc = (compare_results + n_iter * acc) / (n_iter + batch_len)       # calc running average accuracy
                n_iter += batch_len                                                 # advance iteration counter

                # Gradient Block
                optimizer.zero_grad()                                               # reset gradient calc
                cost = criterion(output_prd, output_tru)                            # calc cost value
                cost.backward()                                                     # backward propagation
                optimizer.step()                                                    # apply optimizer

                # Output Block
                if time.time() - t_fps >= print_interval:
                    t_fps = time.time()
                    print(f'epoch {epoch + 1}/{n_epochs}; batch {batch_idx + 1}/{n_batches}; learning rate = {optimizer.param_groups[0]["lr"]}; Cost: {cost:.6f}; Running Accuracy: {100 * acc:.2f} %')
                    clear_output(wait = True)
                    # Define your tensorboard data here
                    if tb_analytics:
                        global_step = batch_idx + epoch * n_batches
                        writer.add_scalar('Training Loss', cost, global_step)
                        writer.add_scalar('Training Accuracy', acc, global_step)

            scheduler.step()                                                        # diminish learning rate after every epoch
    finally:
        # Stop loading screen if no batch ever arrived (empty input or an error)
        if not event.is_set():
            event.set()
            thread.join()

        # tidy up tensorboard writer once, instead of after every single batch
        if tb_analytics:
            writer.flush()
            writer.close()

    t_1 = time.time()                                                               # get time value after training
    if cost is None:                                                                # nothing was trained, so there is no cost to report
        print(f'Done. No batches were processed. Time: {(t_1 - t_0):.2f}s.')
    else:
        print(f'Done. Final Cost: {cost:.6f}. Time: {(t_1 - t_0):.2f}s.')           # final output
    return model                                                                    # returns the trained model
    

def validation_loop(model,                                                          # model input
                    batches_tst,                                                    # test batches input
                    print_miss = False,                                             # option for showing wrong predictions
                    print_fps = 30.):                                               # output fps. Needed for not overwhelming the kernel
    """Evaluate ``model`` sample by sample on ``batches_tst`` and return the accuracy.

    A prediction counts as correct when the argmax of the model output equals
    the argmax of the label. No gradients are computed.

    Args:
        model: the model to evaluate; called with one sample at a time.
        batches_tst: iterable of (inputs, labels) batches supporting ``len()``.
        print_miss: if True, every wrong prediction is printed and its image is
            shown for two seconds.
        print_fps: maximum number of status lines per second.

    Returns:
        The running average accuracy over all samples (0. if there were none).
    """

    # Start loading screen
    event = Event()
    thread = Thread(target = loading_animation, daemon=True, args=(event, "Loading Batches. This might take a while"))
    thread.start()

    # init func-global variables
    n_batches = len(batches_tst)                                                    # number of batches
    acc = 0.                                                                        # running accuracy counter
    n_iter = 0                                                                      # iter counter
    print_interval = 1. / print_fps                                                 # min. time between two outputs
    t_0 = time.time()                                                               # get current time value
    t_fps = time.time()                                                             # -- " -- for output fps

    try:
        with tc.no_grad():                                                          # don't train the model anymore!
            for batch_idx, (inputs, labels) in enumerate(batches_tst):              # iter through batches

                # Stop loading screen
                if not event.is_set():
                    event.set()
                    thread.join()

                for inpt, label in zip(inputs, labels):                             # iter through samples in batches

                    output_prd = model(inpt.to(device))                             # calc model output
                    output_tru = label.to(device)                                   # get label
                    compare_results = output_prd.argmax() == output_tru.argmax()
                    acc = (compare_results + n_iter * acc) / (1 + n_iter)           # running average accuracy
                    n_iter += 1

                    # Print miss Block
                    if print_miss and not compare_results:
                        # Softmax over the flattened output, like argmax() above. With dim=0 on an
                        # output of shape (1, n_classes) every entry would become 1, i.e. "100 %".
                        confidence = tc.softmax(output_prd.flatten(), dim = 0).max().item()
                        print(f'batch {batch_idx + 1}/{n_batches}; Accuracy: {100*acc:.2f} %')
                        print(f'Miss at Iteration {n_iter}! Predicted: {output_prd.argmax().item()} (Confidence = {100 * confidence:.2f} %), True: {output_tru.argmax().item()}')
                        # Use the sample's own height/width; flat vectors fall back to 28x28
                        image = inpt.reshape(inpt.shape[-2:]) if inpt.dim() >= 2 else inpt.reshape(28, 28)
                        plt.imshow(image, cmap='gray')
                        plt.axis('off')
                        plt.show()
                        time.sleep(2)
                        clear_output(wait = True)
                    # Output Block
                    if time.time() - t_fps >= print_interval:
                        t_fps = time.time()
                        print(f'batch {batch_idx + 1}/{n_batches}; Accuracy: {100*acc:.2f} %')
                        clear_output(wait = True)
    finally:
        # Stop loading screen if no batch ever arrived (empty input or an error)
        if not event.is_set():
            event.set()
            thread.join()

    print(f'Done. Final Accuracy: {100*acc:.2f} %. Time: {(time.time() - t_0):.2f}s.')  # final output
    return acc                                                                      # returns the final accuracy

# Confirms that the cell with the imports, model classes and functions ran through.
print('Done.')

In [None]:
#### PREPARE DATA ####
###### - 0.1 - #######


# Adapter from your Dataset to Dataloader and thus specific for each Dataset.
# Microsoft Defender might slow things down here significantly. Take a look at your task manager.
# Images are read from f"{datapath}{label}/{label}{sample}.png"; only their alpha channel is used.

# init func-global variables
datapath = './dataset/'
n_classes = 10                                               # define number of digits / classes
n_samples_per_class = 10772                                  # define number of samples per class
n_samples = n_classes * n_samples_per_class                  # define total number of samples
train_test_ratio = 0.75                                      # define ratio of train/total samples

# Start loading screen
event = Event()
thread = Thread(target = loading_animation, daemon=True, args=(event, "Loading Data into Memory. This might take a few minutes"))
thread.start()

try:
    # create a tuple-list with entries of form (Digit, Digit_sample), e.g. (1, 9001)...
    address_list = [(label, sample) for label in range(n_classes) for sample in range(n_samples_per_class)]

    np.random.shuffle(address_list)                         # ...and shuffle it.

    # loop through samples and labels
    # and load them into memory as tuples (samples, labels).
    to_tensor = tv.transforms.ToTensor()                    # build the transform once instead of once per image
    for idx, (label, sample) in enumerate(address_list):
        digit = tc.zeros(n_classes, dtype = tc.float32)     # create a vector of zeros...
        digit[label] = 1.                                   # ...and change a zero to a one in the spot representing the current sample digit.

        # Load the image as matrix. To load it as flattened matrix (=vector) instead,
        # wrap the tensor in tc.flatten(...).
        with Image.open(f"{datapath}{label}/{label}{sample}.png") as img:      # closes the file right after reading
            sample_cpy = (to_tensor(img.getchannel('A')), digit)

        address_list[idx] = sample_cpy                      # replace the address by the loaded (sample, label) tuple

    # Split the address list into a training and a test list and delete the original list afterwards.
    # Both slices share one index so that no sample is dropped between them.
    split_idx = math.floor(len(address_list) * train_test_ratio)
    trn_list = address_list[:split_idx]
    tst_list = address_list[split_idx:]
    del address_list
finally:
    # Stop loading screen, also when loading failed (e.g. a missing file)
    event.set()
    thread.join()

# final output
clear_output(wait = True)
print(f'Length of Training List: {len(trn_list)} items.')
print(f'Length of Test List: {len(tst_list)} items.')

print("Done.")

In [None]:
##### DATALOADER #####
###### - 0.2 - #######


# Shove both training and test lists into the dataloader.

batch_size = 4                                                  # define batch_size

# Settings shared by both loaders
loader_kwargs = dict(batch_size = batch_size,
                     shuffle = False,                           # no need to shuffle the samples, we already did that.
                     num_workers = 0,                           # strongly recommended to keep = 0
                     persistent_workers = False,
                     pin_memory = tc.cuda.is_available())       # pinned memory only helps copies to the GPU; without CUDA it just triggers a warning

batches_trn = DataLoader(dataset = trn_list, **loader_kwargs)   # training samples input
batches_tst = DataLoader(dataset = tst_list, **loader_kwargs)   # test samples input

# tidy up (the loaders keep their own reference to the lists)
del trn_list
del tst_list

print('Done.')

In [None]:
##### DEFINE MODEL ####
######## - 1 - ########

# The network is sized after the shape of one training sample.
model = ConvNN(
    tc.tensor(get_sample(batches_trn)[0].shape),
    fc1_dim = 64,
    fc2_dim = 64,
    output_dim = 10,
).to(device)

# Optional Block for loading in a model and/or model-state from disk
# (file names as written by the SAVE MODEL cell)
# filepath = './'
# model_name = "Tutorial_CNN"
# model = tc.load(f"{filepath}{model_name}.pth").to(device)
# model.load_state_dict(tc.load(f"{filepath}{model_name}_state_dict.pth"))

###### LOSS/COST ######
###### OPTIMIZER ######
###### SCHEDULER ######
######## - 2 - ########

criterion = nn.CrossEntropyLoss()

# lr: initial learning rate
optimizer = tc.optim.Adam(model.parameters(), lr = .001)

# step_size: diminish learning rate every n-th epoch...
# gamma:     ...by this diminishing factor
step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size = 1, gamma = .1)

print('Done.')

In [None]:
###### TRAINING #######
######## - 3 - ########


# Optional Block for using tensorboard
tb_analytics = True

# Every run gets its own numbered sub-folder below ./runs/
if tb_analytics:
    logdir = f"./runs/{dir_counter}/"
    writer = SummaryWriter(logdir)
    tb_write_model(model, batches_trn)
    dir_counter += 1


# Training Loop.
# Note that loading the batches into memory might take a few minutes. Take a look at your task manager.

# See func definition for input details
model_trn = training_loop(
    model.to(device),
    batches_trn,
    criterion.to(device),
    optimizer,
    step_lr_scheduler,
    n_epochs = 2,
    tb_analytics = tb_analytics,
    print_fps = 5.,
)

In [None]:
####### TESTING #######
######## - 4 - ########


# Testing Loop.
# Again, loading batches into memory may take a few minutes. Stay strong.

# validation_loop reads the global `device`, so it is switched to the cpu for the test run.
device = tc.device("cpu")                                           # optional, but runs faster on cpu for some reason
try:
    accuracy = validation_loop(print_fps = 5.,                      # See func definition for input details
                               print_miss = True,
                               model = model_trn.to(device),
                               batches_tst = batches_tst)
finally:
    # change back to gpu, also when the test run was interrupted or failed,
    # so that later cells do not silently keep running on the cpu
    device = tc.device("cuda" if tc.cuda.is_available() else "cpu")
    model_trn = model_trn.to(device)

In [None]:
##### SAVE MODEL ######
######## - 5 - ########

# Target folder and base name of the files to write
filepath = './'
model_name = 'Tutorial_CNN'

# Save the whole model and/or...
tc.save(model_trn, f"{filepath}{model_name}.pth")
# ...save only the model state
tc.save(model_trn.state_dict(), f"{filepath}{model_name}_state_dict.pth")

print('Done.')

In [None]:
# I'm just here so that the screen doesn't flicker when printing images

















