In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import os

In [None]:
# Training hyperparameters shared by every expert model trained in this notebook.
EPOCHS = 50  # upper bound on training epochs; early stopping may end a run sooner
BATCH_SIZE = 128  # mini-batch size used by both the training and validation loaders
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer
VALIDATION_SPLIT = 0.2  # fraction of each expert's samples held out for validation
EARLY_STOPPING_PATIENCE = 10  # epochs without validation-loss improvement before training stops

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Header-less CSV with the training data: column 0 is read as the label, the remaining columns as pixels.
DATASET_PATH = "/kaggle/input/emnist/emnist-byclass-train.csv"
# Directory where each expert's checkpoint is written as "<expert_name>_model.pth".
MODEL_OUTPUT_DIR = "/kaggle/working/"

# Inclusive (first_label, last_label) range of raw dataset labels handled by each expert model.
# The number of classes per expert is last_label - first_label + 1.
MAPPING = {
    'digits': (0, 9),
    'uppercase': (10, 35),
    'lowercase': (36, 61)
}

In [None]:
class EarlyStopping:
    """Stop training when the validation loss stops improving, checkpointing the best model.

    Call the instance once per epoch with the epoch's validation loss and the model.
    Whenever the loss improves, the model's ``state_dict`` is saved to ``path``; after
    ``patience`` consecutive calls without improvement, ``early_stop`` becomes True.

    Args:
        patience: Number of consecutive non-improving calls tolerated before stopping.
        verbose: If True, print a message on every checkpoint and on every non-improving call.
        delta: Margin added to the best score when deciding whether a new score counts
            as an improvement.
        path: File path the best ``state_dict`` is written to.

    Attributes:
        counter: Consecutive non-improving calls seen since the last improvement.
        best_score: Negated best validation loss so far (None before the first valid call).
        early_stop: True once ``counter`` has reached ``patience``.
        val_loss_min: Validation loss of the most recently saved checkpoint.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pth'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # float('inf') rather than np.Inf: the np.Inf alias was removed in NumPy 2.0.
        self.val_loss_min = float('inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss and checkpoint ``model`` if it improved."""
        score = -val_loss

        # NaN is the only value that is not equal to itself. A NaN loss must never count
        # as an improvement: every comparison against it is False, so it would otherwise
        # overwrite the best checkpoint and poison best_score for all later epochs.
        loss_is_nan = val_loss != val_loss

        if loss_is_nan:
            improved = False
        elif self.best_score is None:
            improved = True
        else:
            improved = not (score < self.best_score + self.delta)

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Write ``model.state_dict()`` to ``self.path`` and remember ``val_loss`` as the new minimum."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss


In [None]:
def filter_and_prepare_data(df, class_name):
    """Extract one expert's samples from the master frame as image and label tensors.

    Args:
        df: Frame whose column 0 holds the raw label and whose remaining columns hold
            the pixel values of one image per row.
        class_name: Key into ``MAPPING`` selecting the inclusive label range to keep.

    Returns:
        A ``(images, labels)`` tuple: ``images`` is a float32 tensor reshaped to
        ``(N, 1, 28, 28)`` with every value divided by 255, and ``labels`` is an int64
        tensor of labels shifted so the first label of the range becomes 0.
    """
    first_label, last_label = MAPPING[class_name]

    # Keep only the rows whose label falls inside the expert's inclusive range.
    in_range = df[0].between(first_label, last_label)
    subset = df[in_range]

    # Re-base the labels so that each expert's classes start at 0.
    targets = subset.iloc[:, 0].values - first_label

    # Everything after the label column is pixel data; divide by 255 and restore the image shape.
    pixels = subset.iloc[:, 1:].values.astype('float32') / 255.0
    pixels = pixels.reshape(-1, 1, 28, 28)

    image_tensor = torch.tensor(pixels, dtype=torch.float32)
    label_tensor = torch.tensor(targets, dtype=torch.long)
    return image_tensor, label_tensor


In [None]:
class EMNISTDataset(Dataset):
    """Map-style dataset pairing pre-built images with their labels.

    Args:
        images: Indexable collection of images (e.g. a tensor whose first axis is the sample).
        labels: Indexable collection of labels, one per image.

    Raises:
        ValueError: If ``images`` and ``labels`` do not contain the same number of items.
    """

    def __init__(self, images, labels):
        # Fail at construction time: a length mismatch would otherwise surface only as an
        # IndexError in the middle of an epoch, or as silently ignored trailing images.
        if len(images) != len(labels):
            raise ValueError(
                f"images and labels must have the same length, "
                f"got {len(images)} images and {len(labels)} labels"
            )
        self.images = images
        self.labels = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        return self.images[idx], self.labels[idx]

In [None]:
class CNNModel_Small(nn.Module):
    """Small CNN: two conv stages, one 128-unit hidden layer and a linear classifier.

    Each conv stage is Conv2d(3x3, padding 1) -> BatchNorm2d -> ReLU -> MaxPool2d(2).
    The hidden layer expects 64 feature maps of size 7x7, which is what two 2x poolings
    leave of a single-channel 28x28 input.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.flatten = nn.Flatten()
        self.fc1 = nn.Sequential(
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
        )
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        features = self.layer2(self.layer1(x))
        hidden = self.fc1(self.flatten(features))
        return self.fc2(self.dropout(hidden))

class CNNModel_Medium(nn.Module):
    """Medium CNN: three conv stages, one 256-unit hidden layer and a linear classifier.

    The hidden layer expects 128 feature maps of size 3x3, which is what three 2x
    poolings leave of a single-channel 28x28 input.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv_block1 = self._conv_stage(1, 32)
        self.conv_block2 = self._conv_stage(32, 64)
        self.conv_block3 = self._conv_stage(64, 128)
        self.flatten = nn.Flatten()
        self.fc_block = nn.Sequential(
            nn.Linear(128 * 3 * 3, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        self.classifier = nn.Linear(256, num_classes)

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one Conv2d(3x3, padding 1) -> BatchNorm2d -> ReLU -> MaxPool2d(2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        out = x
        for stage in (self.conv_block1, self.conv_block2, self.conv_block3):
            out = stage(out)
        out = self.fc_block(self.flatten(out))
        return self.classifier(out)

class CNNModel_Large(nn.Module):
    """Large CNN: three conv stages, two hidden layers (512 then 256 units) and a classifier.

    Each conv stage is Conv2d(3x3, padding 1) -> BatchNorm2d -> ReLU -> MaxPool2d(2).
    The first hidden layer expects 128 feature maps of size 3x3, which is what three 2x
    poolings leave of a single-channel 28x28 input.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.conv_block3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.flatten = nn.Flatten()
        self.fc_block = nn.Sequential(
            nn.Linear(128 * 3 * 3, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        # Convolutional feature extractor.
        features = self.conv_block1(x)
        features = self.conv_block2(features)
        features = self.conv_block3(features)

        # Fully connected head.
        hidden = self.fc_block(self.flatten(features))
        return self.classifier(hidden)


In [None]:
def train_and_save_expert_model(expert_name, model_class, num_classes, train_loader, val_loader):
    """Train one expert classifier with early stopping and checkpoint its best weights.

    The model is trained with Adam and cross-entropy loss for at most ``EPOCHS`` epochs.
    After every epoch the sample-weighted mean validation loss is computed and handed to
    an ``EarlyStopping`` instance, which writes the model's ``state_dict`` to
    ``<MODEL_OUTPUT_DIR>/<expert_name>_model.pth`` whenever the loss improves and ends
    training after ``EARLY_STOPPING_PATIENCE`` epochs without improvement.

    Args:
        expert_name: Name of the expert; used in log messages and in the checkpoint file name.
        model_class: ``nn.Module`` subclass constructed as ``model_class(num_classes)``.
        num_classes: Number of output classes for this expert.
        train_loader: DataLoader yielding ``(images, labels)`` training batches.
        val_loader: DataLoader yielding ``(images, labels)`` validation batches.

    Raises:
        ValueError: If the validation dataset is empty.
    """
    # Fail fast: an empty validation set would otherwise only show up as a
    # ZeroDivisionError after a complete (and wasted) training epoch.
    num_val_samples = len(val_loader.dataset)
    if num_val_samples == 0:
        raise ValueError(
            f"Validation set for '{expert_name}' is empty; cannot compute a validation loss."
        )

    print("-" * 60)
    print(f"Training FINAL model for: {expert_name.upper()}")
    print(f"Using Architecture: {model_class.__name__}")
    print("-" * 60)

    model = model_class(num_classes).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Make sure the checkpoint directory exists so the first torch.save cannot fail on it.
    if MODEL_OUTPUT_DIR:
        os.makedirs(MODEL_OUTPUT_DIR, exist_ok=True)
    model_save_path = os.path.join(MODEL_OUTPUT_DIR, f"{expert_name}_model.pth")
    early_stopper = EarlyStopping(patience=EARLY_STOPPING_PATIENCE, verbose=True, path=model_save_path)

    for epoch in range(EPOCHS):
        # --- training pass ---
        model.train()
        for images, labels in train_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # --- validation pass (no gradients, dropout/batch-norm in eval mode) ---
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(DEVICE), labels.to(DEVICE)
                outputs = model(images)
                loss = criterion(outputs, labels)
                # criterion returns the batch mean; weight it by the batch size so that a
                # smaller final batch does not skew the epoch average.
                running_val_loss += loss.item() * images.size(0)

        epoch_val_loss = running_val_loss / num_val_samples
        print(f"Epoch {epoch+1}/{EPOCHS} | Validation Loss: {epoch_val_loss:.4f}")

        early_stopper(epoch_val_loss, model)
        if early_stopper.early_stop:
            print("Early stopping triggered!")
            break

    print(f"Final model for '{expert_name}' has been saved to {model_save_path}\n")


In [None]:
if __name__ == '__main__':
    # Fixed seed so that weight initialisation, batch shuffling and the train/validation
    # split are repeatable from run to run. Some GPU kernels may still be nondeterministic.
    SEED = 42
    torch.manual_seed(SEED)

    print(f"Using device: {DEVICE}")
    print("Loading and preparing master dataset...")
    df = pd.read_csv(DATASET_PATH, header=None)
    print("Dataset loaded.")

    # One (expert name, architecture) pair per model to train; names are keys of MAPPING.
    final_model_setup = [
        ('digits', CNNModel_Small),
        ('uppercase', CNNModel_Medium),
        ('lowercase', CNNModel_Medium)
    ]

    for expert_name, model_class in final_model_setup:
        images, labels = filter_and_prepare_data(df, expert_name)
        full_dataset = EMNISTDataset(images, labels)

        # MAPPING ranges are inclusive on both ends, hence the +1.
        start_label, end_label = MAPPING[expert_name]
        num_classes = end_label - start_label + 1

        val_size = int(len(full_dataset) * VALIDATION_SPLIT)
        train_size = len(full_dataset) - val_size
        # A dedicated generator keeps each expert's split independent of how much
        # randomness earlier experts consumed from the global RNG.
        train_dataset, val_dataset = random_split(
            full_dataset,
            [train_size, val_size],
            generator=torch.Generator().manual_seed(SEED),
        )

        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

        train_and_save_expert_model(expert_name, model_class, num_classes, train_loader, val_loader)
