First we define some global variables which are used for the whole training process

In [63]:
# Locations of the train and test CSV files; adjust them to your setup.
path_train_csv = "train.csv"
path_test_csv = "test.csv"

# Select the classes we would like to train on (here: the ten digits).
classes = [str(digit) for digit in range(10)]

# All available classes
#classes = ['!','$','&','(',')','+','0','1','2','3','4','5','6','7','8','9','<','>','?','A','B','C','D','E','F','G','H',
#           'I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z','a','b','c','d','e','f','g','h','i',
#           'j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','~','ß','α','β','π','φ','€','∑','√','∞',
#           '∫']
num_classes = len(classes)

# Number of samples per class that are moved from the train split into the validation split.
num_val_samples_per_class = 250

# Standard DL-parameters
batch_size = 64
num_workers = 12
hparams = {
    "num_epochs": 100,
    "early_stopping_patience": 3,
    "early_stopping_threshold": 0.001,
}

For the training later on, we need a one-hot encoder that maps each class label to its one-hot vector

In [64]:
import numpy as np
from sklearn.preprocessing import OneHotEncoder
# Encoder that maps each class label (a string) to a dense one-hot vector.
# scikit-learn renamed `sparse` to `sparse_output` in 1.2 and removed `sparse` in 1.4,
# so try the current keyword first and fall back to the old one on older installations.
try:
    onehot_encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    onehot_encoder = OneHotEncoder(sparse=False)
# The encoder is fitted on a 2-D array with one row per class and a single column.
reshaped_classes = np.array(classes).reshape(-1, 1)
onehot_encoder = onehot_encoder.fit(reshaped_classes)

We define some helper functions for the training

In [65]:
import torchmetrics

class EarlyStopper:
    """Decides when to stop training and remembers the best model weights.

    Args:
        patience: number of non-improving calls that are tolerated. Because the
            counter is checked before it is incremented, ``early_stop`` returns
            ``True`` on the call *after* ``patience`` non-improving calls.
        min_delta: minimum increase over ``best_validation_acc`` that counts as
            an improvement and resets the patience counter.
        model_weights: weights to fall back to if no call ever beats a score of 0.
    """

    def __init__(self, patience=5, min_delta=0.001, model_weights=None):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_validation_acc = 0
        self.best_model_weights = model_weights
        # Score that belongs to `best_model_weights`. It is tracked separately from
        # `best_validation_acc` (which only moves on improvements > min_delta) so that
        # a later, worse epoch can never overwrite better weights.
        self.best_weights_acc = 0

    def early_stop(self, validation_acc, model_weights):
        """Record one validation result and report whether training should stop.

        Args:
            validation_acc: validation score of the current model (higher is better).
            model_weights: weights of the current model; stored when they achieve
                the highest score seen so far.

        Returns:
            True if training should stop, False otherwise.
        """
        # Keep the weights of the strictly best-scoring model seen so far,
        # even if the gain is smaller than min_delta.
        if validation_acc > self.best_weights_acc:
            self.best_weights_acc = validation_acc
            self.best_model_weights = model_weights

        # Only an improvement larger than the threshold resets the patience counter.
        if validation_acc > self.best_validation_acc + self.min_delta:
            self.counter = 0
            self.best_validation_acc = validation_acc
            return False

        if self.counter >= self.patience:
            return True
        self.counter += 1
        return False


class EpochInformation:
    """Collects model outputs, labels and loss over one epoch and derives metrics.

    Args:
        model: the network being trained/evaluated; its ``training`` flag decides
            whether gradient and weight norms are reported.
        device: device the torchmetrics objects are moved to.
        num_classes: number of classes passed to the multiclass metrics.
        dataset_sizes: mapping from phase name to number of samples, used to
            average the loss and the accuracy.
    """

    def __init__(self, model, device, num_classes, dataset_sizes):
        self.mcc_metric = torchmetrics.MatthewsCorrCoef(task='multiclass', num_classes=num_classes).to(device)
        self.auc_metric = torchmetrics.AUROC(task='multiclass', num_classes=num_classes).to(device)
        self.dataset_sizes = dataset_sizes
        self.running_loss = 0.0
        self.running_outputs = None
        self.running_labels = None
        self.model = model

    def reset_metrics(self):
        """Clear everything accumulated so far; call at the start of each phase."""
        self.running_loss = 0.0
        self.running_outputs = None
        self.running_labels = None
        # Calling a torchmetrics metric also accumulates into its internal state.
        # Only the per-call value is used here, so drop that state to keep it from
        # growing across epochs.
        self.mcc_metric.reset()
        self.auc_metric.reset()

    def update_metrics_for_batch(self, outputs, loss, inputs, labels):
        """Append one batch of outputs/labels and add its loss to the running sum.

        Args:
            outputs: model outputs for the batch (first dimension is the batch).
            loss: scalar loss tensor of the batch (mean over the batch is assumed,
                since it is re-weighted by the batch size below).
            inputs: the batch inputs; only ``inputs.size(0)`` is used.
            labels: the batch targets, concatenated along dimension 0.
        """
        # Detach before storing: otherwise every batch keeps its autograd graph
        # (and all intermediate activations) alive until the end of the epoch.
        outputs = outputs.detach()
        labels = labels.detach()

        if self.running_outputs is None:
            self.running_outputs = outputs
        else:
            self.running_outputs = torch.cat((self.running_outputs, outputs), dim=0)

        if self.running_labels is None:
            self.running_labels = labels
        else:
            self.running_labels = torch.cat((self.running_labels, labels), dim=0)

        # update the loss (weighted by the number of samples in the batch)
        self.running_loss += loss.item() * inputs.size(0)

    def calculate_metrics(self, phase):
        """Compute the metrics for everything accumulated since the last reset.

        Args:
            phase: key into ``dataset_sizes`` selecting the sample count to divide by.

        Returns:
            Dict with ``loss``, ``acc``, ``mcc`` and ``auc``; when the model is in
            training mode it additionally contains ``l2_grad`` (L2 norm of the
            gradients currently stored on the parameters) and ``l2_weights``
            (L2 norm of all parameters).
        """
        loss = self.running_loss / self.dataset_sizes[phase]

        # Predicted class = arg-max of the outputs; target class = arg-max of the labels.
        _, predictions = torch.max(self.running_outputs, 1)
        _, target_indices = torch.max(self.running_labels, 1)
        corrects = torch.sum(predictions == target_indices)

        acc = corrects.double() / self.dataset_sizes[phase]
        mcc = self.mcc_metric(predictions, target_indices)
        auc = self.auc_metric(self.running_outputs, target_indices)

        result_dict = {
            "loss" : loss,
            "acc"  : acc.item(),
            "mcc"  : mcc.item(),
            "auc"  : auc.item()
        }

        # The gradient norm can only be calculated during training
        # Also we calculate the weight-norm only once in each training epoch
        if self.model.training:
            grads = [param.grad.detach().flatten() for param in self.model.parameters() if param.grad is not None]
            weights = [param.detach().flatten() for param in self.model.parameters()]
            # torch.cat raises on an empty list, e.g. when no backward pass has run yet.
            result_dict["l2_grad"] = torch.linalg.vector_norm(torch.cat(grads)).item() if grads else 0.0
            result_dict["l2_weights"] = torch.linalg.vector_norm(torch.cat(weights)).item() if weights else 0.0

        return result_dict

Now we define a simple CNN-model

In [66]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PyTorchClassifier(nn.Module):
    """Small CNN: four convolutions (three followed by 2x2 max-pooling) and two linear layers.

    The first convolution takes single-channel input. ``fc1`` expects 256 flattened
    features, which matches the 256 channels of ``conv4`` when the spatial size has
    shrunk to 1x1 (this is the case for 40x40 inputs).

    Args:
        num_classes: number of output units of the final linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.size_fc1 = 256
        # Feature extractor; layers are registered in the order they are applied.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=6)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=2)
        # Classification head
        self.fc1 = nn.Linear(in_features=self.size_fc1, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=num_classes)

    def forward(self, x):
        """Return the raw (un-normalized) class scores for a batch of images."""
        pooled_stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
        )
        for conv, pool in pooled_stages:
            x = pool(F.relu(conv(x)))
        # Last convolution has no pooling; flatten everything except the batch dimension.
        features = torch.flatten(F.relu(self.conv4(x)), 1)
        hidden = F.relu(self.fc1(features))
        return self.fc2(hidden)

The code for generating the dataset

In [67]:
from torch.utils.data import Dataset

class GermanCharacterRecognitionDS(Dataset):
    """Dataset that reads labelled 40x40 images from a CSV file.

    Every CSV row is expected to hold the label in the first field, followed by
    the 1600 pixel values of the image. Only rows whose label is contained in
    ``classes`` are kept.

    Args:
        path_csv: path of the CSV file.
        one_hot_encoder: object whose ``transform`` turns a ``(1, 1)`` array with the
            label into a one-hot row (e.g. a fitted scikit-learn ``OneHotEncoder``).
        transform: optional callable applied to the image array.
        target_transform: optional callable applied to the raw label before encoding.
        classes: labels to keep; ``None`` (the default) keeps nothing, like an empty list.
        num_channels: number of channels of the returned image; the single gray
            channel is repeated when this is not 1.
    """

    IMAGE_SIDE = 40  # images are square with this many pixels per side

    def __init__(self, path_csv, one_hot_encoder, transform=None, target_transform=None, classes=None,
                 num_channels=1):
        self.path_csv = path_csv
        self.transform = transform
        self.target_transform = target_transform
        # Avoid a shared mutable default argument.
        self.classes = [] if classes is None else classes
        self.data_lines = self.read_lines_csv(self.classes)
        self.n = len(self.data_lines)
        self.onehot_encoder = one_hot_encoder
        self.num_channels = num_channels

    def __len__(self):
        """Number of CSV rows that passed the class filter."""
        return self.n

    def __getitem__(self, idx):
        """Return ``(image, one_hot_label)`` for the row at position ``idx``."""
        label, image = self.parse_one_line(idx)
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)

        # The encoder works on 2-D input; take the single resulting row.
        label = self.onehot_encoder.transform(np.array(label).reshape(-1, 1))[0]
        return image, label

    def read_lines_csv(self, classes):
        """Read the CSV file and return the raw lines whose label is in ``classes``.

        The label is the first comma-separated field, i.e. exactly the value that
        ``parse_one_line`` returns as label. Comparing only the first character
        would wrongly accept e.g. a row labelled "10" for class "1".
        """
        wanted = set(classes)
        # The context manager closes the file even if reading fails.
        with open(self.path_csv, 'r', encoding="latin-1") as csv_file:
            return [line for line in csv_file if line.split(',', 1)[0] in wanted]

    def parse_one_line(self, index):
        """Split the stored line at ``index`` into ``(label, image)``.

        Returns:
            The label string and a float32 array of shape
            ``(40, 40, num_channels)``.
        """
        line = self.data_lines[index].split(',')
        side = self.IMAGE_SIDE
        image_np = np.asarray(line[1:1 + side * side], dtype=np.float32)
        image_np = image_np.reshape(side, side, 1)
        if self.num_channels != 1:
            image_np = np.repeat(image_np, self.num_channels, axis=2)
        return line[0], image_np

Then we can define the train loop

In [68]:
import copy
from tqdm import tqdm
from torchvision import transforms

def train_model(data_loaders, model, loss_func, optimizer, device):
    """Train ``model`` with early stopping and return it with the best weights loaded.

    Each epoch first validates the current model and then trains it for one pass
    over the training data. Early stopping monitors the validation MCC; as soon
    as it triggers, training stops immediately.

    Args:
        data_loaders: dict with at least the keys ``"train"`` and ``"val"`` mapping
            to data loaders that yield ``(inputs, labels)`` batches.
        model: the network to train (already moved to ``device``).
        loss_func: callable ``loss_func(outputs, labels)`` returning a scalar loss.
        optimizer: optimizer updating the parameters of ``model``.
        device: device the batches are moved to.

    Returns:
        ``model`` with the best weights recorded by the early stopper loaded.

    Note:
        Reads the notebook-level ``hparams`` and ``num_classes``.
    """
    print("training started")
    num_epochs = hparams["num_epochs"]
    # Derive the sample counts from the loaders themselves instead of relying on a
    # global that is only defined in a later cell.
    sizes = {phase: len(loader.dataset) for phase, loader in data_loaders.items()}
    information = EpochInformation(model, device, num_classes, sizes)
    early_stopper = EarlyStopper(patience=hparams["early_stopping_patience"],
                                 min_delta=hparams["early_stopping_threshold"],
                                 model_weights=copy.deepcopy(model.state_dict()))
    stop_training = False
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)
        # Each epoch has a validation phase followed by a training phase
        for phase in ['val', 'train']:
            is_training = phase == 'train'
            if is_training:
                model.train()
                print("training...")
            else:
                model.eval()
                print("validating...")
            information.reset_metrics()

            for inputs, labels in tqdm(data_loaders[phase]):
                inputs = inputs.to(device, non_blocking=True)
                labels = labels.to(device, non_blocking=True)
                if is_training:
                    optimizer.zero_grad()
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    loss = loss_func(outputs, labels)
                    if is_training:
                        loss.backward()
                        optimizer.step()
                information.update_metrics_for_batch(outputs, loss, inputs, labels)

            result_dict = information.calculate_metrics(phase)
            # prints all metrics of the training and validation phase
            print(" ".join(name + ": " + str(round(value, 4)) for name, value in result_dict.items()))

            if not is_training:
                if early_stopper.early_stop(result_dict["mcc"], copy.deepcopy(model.state_dict())):
                    print('early stopping')
                    stop_training = True
                    # Skip the training phase: its result would be discarded anyway
                    # because the best weights are restored below.
                    break
        if stop_training:
            break
    # load best model
    model.load_state_dict(early_stopper.best_model_weights)
    return model

For loading the data we need some helper methods. As stated in the description of the dataset, every class is represented equally often in the train data set. We also want to make sure that the validation data has the same class distribution as the test data, so we need a function which takes a fixed number of samples from each class of the train data set and puts them into the validation data set. To optimize the run time, we save the indices of the train and validation data sets as NumPy arrays on disk. This eliminates the need to regenerate the data split on every run, thereby significantly reducing processing time.

In [69]:
from torch.utils.data import Subset

def get_train_and_val_loader(train_data_set, num_samples_validation_data=250):
    """Split ``train_data_set`` into a train and a validation loader.

    The first ``num_samples_validation_data`` samples of every class (in data set
    order) go to the validation split, all remaining samples to the train split.
    The resulting index lists are also written to ``val_indices.npy`` and
    ``train_indices.npy`` so the split can be reused without recomputing it.

    Args:
        train_data_set: data set yielding ``(image, one_hot_label)`` pairs.
        num_samples_validation_data: number of validation samples per class.

    Returns:
        Tuple ``(train_loader, val_loader)``.

    Note:
        Reads the notebook-level ``classes`` and ``onehot_encoder``.
    """
    print("Splitting train- and val-data ...")
    # Number of validation samples collected so far, per class
    val_count = dict.fromkeys(classes, 0)
    val_indices = []
    train_indices = []
    for i in range(len(train_data_set)):
        _, label = train_data_set[i]
        # Decode the one-hot label back to its class string
        string_label_list = onehot_encoder.inverse_transform(np.reshape(label, (1, -1)))
        label_string = str(string_label_list[0][0])
        if val_count[label_string] < num_samples_validation_data:
            val_count[label_string] += 1
            val_indices.append(i)
        else:
            train_indices.append(i)

    np.save("val_indices.npy", np.asarray(val_indices))
    np.save("train_indices.npy", np.asarray(train_indices))
    train_loader, val_loader = split_train_loader(train_data_set, train_indices, val_indices)
    print("Splitting done")
    return train_loader, val_loader

def split_train_loader(train_data_set, train_indices, val_indices):
    """Build the train and validation loaders from precomputed index lists.

    Both loaders use the notebook-level ``batch_size`` and ``num_workers``;
    only the train loader shuffles its samples.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    loader_options = {"batch_size": batch_size, "num_workers": num_workers}
    train_subset = Subset(train_data_set, train_indices)
    val_subset = Subset(train_data_set, val_indices)
    return (
        torch.utils.data.DataLoader(train_subset, shuffle=True, **loader_options),
        torch.utils.data.DataLoader(val_subset, shuffle=False, **loader_options),
    )

def get_class_counts_of_data_loader(data_loader):
    """Count how many samples of each class ``data_loader`` yields.

    Iterates once over the whole loader, decodes every one-hot label batch with
    the notebook-level ``onehot_encoder`` and tallies the decoded labels.

    Returns:
        Dict mapping every entry of ``classes`` to its number of samples.
    """
    counts_per_class = dict.fromkeys(classes, 0)
    for _, batch_labels in data_loader:
        for decoded in onehot_encoder.inverse_transform(batch_labels):
            counts_per_class[decoded[0]] += 1
    return counts_per_class

Now, we can proceed to construct all the necessary data loaders.

In [None]:
# Convert every image to a tensor and normalize it with the two constants below
# (presumably the pixel mean and standard deviation of the data set — TODO confirm).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(35.37502147246886, 75.87412766890324),
])

# Train and test data sets share the same preprocessing and class selection.
train_set = GermanCharacterRecognitionDS(
    path_train_csv,
    one_hot_encoder=onehot_encoder,
    transform=transform,
    classes=classes,
    num_channels=1,
)
test_set = GermanCharacterRecognitionDS(
    path_test_csv,
    one_hot_encoder=onehot_encoder,
    transform=transform,
    classes=classes,
    num_channels=1,
)

train_loader, val_loader = get_train_and_val_loader(train_set, num_val_samples_per_class)
# TODO uncomment this line if you want to use the precalculated indices which speeds up the run time
#train_loader, val_loader = split_train_loader(train_set, np.load("train_indices.npy"), np.load("val_indices.npy"))
test_loader = torch.utils.data.DataLoader(test_set, shuffle=False, batch_size=batch_size, num_workers=num_workers)

# Show the class distribution of every split as a sanity check.
print(f"train_loader: {get_class_counts_of_data_loader(train_loader)}")
print(f"val_loader: {get_class_counts_of_data_loader(val_loader)}")
print(f"test_loader: {get_class_counts_of_data_loader(test_loader)}")

data_loaders = {"train": train_loader, "val": val_loader, "test": test_loader}
# Number of samples per split
dataset_sizes = {split: len(loader.dataset) for split, loader in data_loaders.items()}

Splitting train- and val-data ...


Now we can start the training

In [None]:
# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# One output unit per selected class
model = PyTorchClassifier(len(classes)).to(device)

loss_func = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.NAdam(model.parameters(), lr=1e-3)

# train_model returns the model with the best weights found during training loaded.
model = train_model(data_loaders, model, loss_func, optimizer, device)

After the training we evaluate the model

In [None]:
# Evaluate the trained model on the test set. No gradients are needed here, so the
# whole loop runs under torch.no_grad() and does not touch the optimizer.
information_test = EpochInformation(model, device, num_classes, dataset_sizes)
model.eval()
with torch.no_grad():
    for inputs, labels in data_loaders["test"]:
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        outputs = model(inputs)
        loss = loss_func(outputs, labels)
        information_test.update_metrics_for_batch(outputs, loss, inputs, labels)

result_dict = information_test.calculate_metrics("test")
print("Test metrics:")
print(" ".join(name + ": " + str(round(value, 4)) for name, value in result_dict.items()))