# Hybrid CNN-BiLSTM Framework for Real-Time Handwriting Recognition

### 1. Imports

In [70]:
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Sequence, Union
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib as mpl
import matplotlib.pyplot as plt

from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report

### 2. Reproducibility, Data Loading, and Preprocessing

In [71]:
def seed_everything(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def load_raw_splits(data_dir: Path):
    X_train = np.load(data_dir / "X_train.npy")
    X_val   = np.load(data_dir / "X_val.npy")
    X_test  = np.load(data_dir / "X_test.npy")
    y_train = np.load(data_dir / "y_train.npy", allow_pickle=True)
    y_val   = np.load(data_dir / "y_val.npy", allow_pickle=True)
    y_test  = np.load(data_dir / "y_test.npy", allow_pickle=True)
    return X_train, X_val, X_test, y_train, y_val, y_test

def encode_labels(y_train, y_val, y_test, save_path: Path):
    le = LabelEncoder()
    le.fit(np.unique(np.concatenate([y_train, y_val, y_test])))
    y_train_enc = le.transform(y_train).astype(int)
    y_val_enc   = le.transform(y_val).astype(int)
    y_test_enc  = le.transform(y_test).astype(int)
    np.save(save_path, le.classes_)
    return y_train_enc, y_val_enc, y_test_enc, le.classes_

def make_loaders(X_train, y_train, X_val, y_val, X_test, y_test, batch_size: int):
    train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                             torch.tensor(y_train, dtype=torch.long))
    val_ds   = TensorDataset(torch.tensor(X_val, dtype=torch.float32),
                             torch.tensor(y_val, dtype=torch.long))
    test_ds  = TensorDataset(torch.tensor(X_test, dtype=torch.float32),
                             torch.tensor(y_test, dtype=torch.long))

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader   = DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=0)
    test_loader  = DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=0)
    return train_loader, val_loader, test_loader

### 3. Model Definition

In [72]:
class CNN_BiLSTM(nn.Module):
    def __init__(
        self,
        num_features: int = 18,
        num_classes: int = 26,
        hidden_size: int = 128,
        dropout: float = 0.3,
        use_batchnorm: bool = False,
    ):
        super().__init__()
        self.conv1 = nn.Conv1d(num_features, 64, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(64) if use_batchnorm else nn.Identity()

        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(128) if use_batchnorm else nn.Identity()

        self.pool = nn.MaxPool1d(kernel_size=2)

        self.lstm = nn.LSTM(
            input_size=128,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
            dropout=dropout,
        )

        self.fc1 = nn.Linear(hidden_size * 2, 128)
        self.drop = nn.Dropout(dropout)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x.permute(0, 2, 1)
        x = torch.relu(self.bn1(self.conv1(x)))
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))
        x = x.permute(0, 2, 1)

        x, _ = self.lstm(x)
        x = x[:, -1, :]
        x = torch.relu(self.fc1(x))
        x = self.drop(x)
        return self.fc2(x)

### 4. Config

In [73]:
@dataclass
class TrainConfig:
    data_dir: Path
    out_dir: Path
    num_features: int = 18
    num_classes: int = 26
    batch_size: int = 32
    epochs: int = 50
    dropout: float = 0.3
    lr: float = 1e-3
    weight_decay: float = 1e-4
    step_size: int = 10
    gamma: float = 0.8
    patience: int = 10
    seed: int = 42
    use_batchnorm: bool = True

### 5. Trainer Utilities

In [74]:
def train_one_epoch(model, loader, device, criterion, optimizer):
    model.train()
    running_loss = 0.0
    correct, total = 0, 0

    for x, y in loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad(set_to_none=True)
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        running_loss += loss.item()
        pred = torch.argmax(logits, dim=1)
        correct += (pred == y).sum().item()
        total += y.size(0)

    return running_loss / max(1, len(loader)), correct / max(1, total)

@torch.no_grad()
def evaluate(model, loader, device, criterion):
    model.eval()
    loss_sum = 0.0
    correct, total = 0, 0
    y_true, y_pred = [], []

    for x, y in loader:
        x, y = x.to(device), y.to(device)
        logits = model(x)
        loss = criterion(logits, y)
        loss_sum += loss.item()

        pred = torch.argmax(logits, dim=1)
        correct += (pred == y).sum().item()
        total += y.size(0)

        y_true.append(y.cpu().numpy())
        y_pred.append(pred.cpu().numpy())

    y_true = np.concatenate(y_true) if y_true else np.array([])
    y_pred = np.concatenate(y_pred) if y_pred else np.array([])
    return loss_sum / max(1, len(loader)), correct / max(1, total), y_true, y_pred

### 6. Plot Functions

In [75]:
def plot_training_curves_csv(
    log_csv: Path,
    out_dir: Path,
    prefix: str,
    *,
    ema: Optional[float] = None,
    figsize=(3.5, 2.4),
    dpi: int = 600,
    use_color: bool = False,
    show_grid: bool = False,
    acc_in_percent: bool = True,
):
    """
    Plot training and validation loss/accuracy curves. Saves PDF and PNG.
    Expects columns: epoch, train_loss, val_loss, train_acc, val_acc
    """

    with mpl.rc_context({
        "font.family": "serif",
        "font.serif": ["Times New Roman", "STIXGeneral", "DejaVu Serif"],
        "mathtext.fontset": "stix",
        "font.size": 10,
        "axes.labelsize": 10,
        "legend.fontsize": 9,
        "xtick.labelsize": 9,
        "ytick.labelsize": 9,
        "lines.linewidth": 1.6,
        "axes.linewidth": 1.0,
        "xtick.direction": "in",
        "ytick.direction": "in",
        "xtick.minor.visible": True,
        "ytick.minor.visible": True,
        "axes.spines.top": False,
        "axes.spines.right": False,
        "axes.grid": show_grid,
        "grid.alpha": 0.15,
        "grid.linestyle": "--",
        "grid.linewidth": 0.6,
        "savefig.bbox": "tight",
        "savefig.pad_inches": 0.02,
        "pdf.fonttype": 42,
        "ps.fonttype": 42,
    }):
        log_csv = Path(log_csv)
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)

        df = pd.read_csv(log_csv)
        req = ["epoch", "train_loss", "val_loss", "train_acc", "val_acc"]
        missing = [c for c in req if c not in df.columns]
        if missing:
            raise ValueError(f"Missing columns in {log_csv}: {missing}")

        x = df["epoch"].to_numpy(dtype=float)

        def smooth(y: np.ndarray) -> np.ndarray:
            y = np.asarray(y, dtype=float)
            if ema is None:
                return y
            a = float(ema)
            if not (0.0 < a < 1.0):
                raise ValueError("ema must be in (0,1) or None")
            ys = np.empty_like(y, dtype=float)
            ys[0] = y[0]
            for i in range(1, len(y)):
                ys[i] = a * y[i] + (1.0 - a) * ys[i - 1]
            return ys

        tr_loss = smooth(df["train_loss"].to_numpy())
        va_loss = smooth(df["val_loss"].to_numpy())
        tr_acc  = smooth(df["train_acc"].to_numpy())
        va_acc  = smooth(df["val_acc"].to_numpy())

        if acc_in_percent:
            tr_acc *= 100.0
            va_acc *= 100.0

        if use_color:
            c_train, c_val = "#004488", "#DDAA33"
        else:
            c_train, c_val = "black", "black"

        def finalize_axes(ax: plt.Axes):
            ax.tick_params(which="both", top=False, right=False)
            ax.set_xlim(float(np.min(x)), float(np.max(x)))

        def save(fig: plt.Figure, base: Path):
            fig.savefig(base.with_suffix(".pdf"))
            fig.savefig(base.with_suffix(".png"), dpi=dpi)
            plt.close(fig)

        # Loss curve
        fig, ax = plt.subplots(figsize=figsize, constrained_layout=True)
        ax.plot(x, tr_loss, color=c_train, linestyle="-",  marker=None, label="Train")
        ax.plot(x, va_loss, color=c_val,   linestyle="--", marker=None, label="Validation")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Loss")
        ax.legend(frameon=False, loc="best")
        finalize_axes(ax)
        save(fig, out_dir / f"{prefix}_loss")

        # Accuracy curve
        fig, ax = plt.subplots(figsize=figsize, constrained_layout=True)
        ax.plot(x, tr_acc, color=c_train, linestyle="-",  marker=None, label="Train")
        ax.plot(x, va_acc, color=c_val,   linestyle="--", marker=None, label="Validation")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Accuracy (%)" if acc_in_percent else "Accuracy")
        if acc_in_percent:
            ax.set_ylim(0.0, 100.0)
        else:
            ax.set_ylim(0.0, 1.0)
        ax.legend(frameon=False, loc="best")
        finalize_axes(ax)
        save(fig, out_dir / f"{prefix}_accuracy")

def plot_confusion_matrix(
    y_true: Union[Sequence[int], np.ndarray],
    y_pred: Union[Sequence[int], np.ndarray],
    out_path: Path,
    *,
    class_names: Optional[Sequence[str]] = None,
    labels: Optional[Sequence[int]] = None,
    normalize: bool = True,
    show_counts: bool = True,
    min_show_pct: float = 2.0,
    dpi: int = 600,
    cmap="Blues",
):
    """
    Plot confusion matrix. Saves PDF and PNG. If normalize=True, values are in %.
    """

    with mpl.rc_context({
        "font.family": "serif",
        "font.serif": ["Times New Roman", "STIXGeneral", "DejaVu Serif"],
        "mathtext.fontset": "stix",
        "font.size": 9,
        "axes.titlesize": 10,
        "axes.labelsize": 10,
        "xtick.labelsize": 8,
        "ytick.labelsize": 8,
        "axes.linewidth": 1.0,
        "axes.spines.top": False,
        "axes.spines.right": False,
        "savefig.bbox": "tight",
        "savefig.pad_inches": 0.02,
        "pdf.fonttype": 42,
        "ps.fonttype": 42,
    }):
        out_path = Path(out_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)

        y_true = np.asarray(y_true, dtype=int).reshape(-1)
        y_pred = np.asarray(y_pred, dtype=int).reshape(-1)

        if labels is None:
            labels = np.unique(np.concatenate([y_true, y_pred])).tolist()
        labels = list(labels)

        cm = confusion_matrix(y_true, y_pred, labels=labels).astype(float)

        if class_names is None:
            class_names = [str(l) for l in labels]
        else:
            if len(class_names) != len(labels):
                raise ValueError("len(class_names) must match len(labels)")
            class_names = [str(c) for c in class_names]

        if normalize:
            row_sums = cm.sum(axis=1, keepdims=True)
            row_sums[row_sums == 0] = 1.0
            cm_show = (cm / row_sums) * 100.0
        else:
            cm_show = cm

        n = len(labels)
        fig_w = min(6.8, max(3.8, 0.45 * n + 1.6))
        fig_h = min(6.8, max(3.4, 0.45 * n + 1.3))
        fig, ax = plt.subplots(figsize=(fig_w, fig_h), constrained_layout=True)

        im = ax.imshow(cm_show, interpolation="nearest", cmap=cmap, aspect="equal")
        cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
        cbar.ax.tick_params(labelsize=8)
        if normalize:
            cbar.set_label("%", fontsize=9)

        ax.set_title("Confusion Matrix" + (" (Row-normalized)" if normalize else ""))
        ax.set_xlabel("Predicted")
        ax.set_ylabel("True")

        ticks = np.arange(n)
        ax.set_xticks(ticks)
        ax.set_yticks(ticks)
        ax.set_xticklabels(class_names, rotation=45, ha="right")
        ax.set_yticklabels(class_names)

        bbox_kw = dict(
            boxstyle="round,pad=0.15",
            facecolor="white",
            edgecolor="none",
            alpha=0.75,
        )

        for i in range(n):
            for j in range(n):
                cnt = cm[i, j]

                if normalize:
                    pct = float(cm_show[i, j])
                    if pct < float(min_show_pct):
                        continue
                    text = f"{pct:.1f}%"
                    if show_counts:
                        text += f"\n({int(cnt)})"
                else:
                    if cnt == 0:
                        continue
                    text = f"{int(cnt)}"

                ax.text(
                    j, i, text,
                    ha="center", va="center",
                    fontsize=8 if n <= 8 else 7,
                    color="black",
                    bbox=bbox_kw,
                )

        out_base = out_path.with_suffix("")
        fig.savefig(out_base.with_suffix(".pdf"))
        fig.savefig(out_base.with_suffix(".png"), dpi=dpi)
        plt.close(fig)

### 7. Trainer

In [76]:
class Trainer:
    def __init__(self, cfg: TrainConfig):
        self.cfg = cfg
        cfg.out_dir.mkdir(parents=True, exist_ok=True)
        seed_everything(cfg.seed)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Data
        X_train, X_val, X_test, y_train, y_val, y_test = load_raw_splits(cfg.data_dir)
        y_train, y_val, y_test, self.class_names = encode_labels(
            y_train, y_val, y_test, save_path=cfg.data_dir / "label_encoder.npy"
        )
        self.train_loader, self.val_loader, self.test_loader = make_loaders(
            X_train, y_train, X_val, y_val, X_test, y_test, cfg.batch_size
        )

        # Model and optimizer
        self.model = CNN_BiLSTM(
            num_features=cfg.num_features,
            num_classes=cfg.num_classes,
            hidden_size=128,
            dropout=cfg.dropout,
            use_batchnorm=cfg.use_batchnorm
        ).to(self.device)

        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=cfg.step_size, gamma=cfg.gamma)

        self.best_val_acc = -1.0
        self.best_path = cfg.out_dir / ("best_bn.pth" if cfg.use_batchnorm else "best_no_bn.pth")
        self.log_path  = cfg.out_dir / ("train_log_bn.csv" if cfg.use_batchnorm else "train_log_no_bn.csv")

    def fit(self):
        patience_left = self.cfg.patience
        rows = []

        for epoch in range(1, self.cfg.epochs + 1):
            tr_loss, tr_acc = train_one_epoch(self.model, self.train_loader, self.device, self.criterion, self.optimizer)
            val_loss, val_acc, _, _ = evaluate(self.model, self.val_loader, self.device, self.criterion)

            rows.append({
                "epoch": epoch,
                "train_loss": tr_loss,
                "train_acc": tr_acc,
                "val_loss": val_loss,
                "val_acc": val_acc,
                "lr": self.optimizer.param_groups[0]["lr"],
            })
            pd.DataFrame(rows).to_csv(self.log_path, index=False)

            print(
                f"Epoch {epoch}/{self.cfg.epochs} | "
                f"train_loss={tr_loss:.4f} train_acc={tr_acc:.4f} | "
                f"val_loss={val_loss:.4f} val_acc={val_acc:.4f}"
            )


            if val_acc > self.best_val_acc:
                self.best_val_acc = val_acc
                torch.save(self.model.state_dict(), self.best_path)
                patience_left = self.cfg.patience
            else:
                patience_left -= 1
                if patience_left <= 0:
                    print(f"Early stopping at epoch {epoch} (best val_acc={self.best_val_acc:.4f})")
                    break

            self.scheduler.step()

        return self.log_path

    def test(self):
        self.model.load_state_dict(torch.load(self.best_path, map_location=self.device))
        test_loss, test_acc, y_true, y_pred = evaluate(self.model, self.test_loader, self.device, self.criterion)

        report_txt = classification_report(
            y_true, y_pred,
            digits=4,
            zero_division=0,
            target_names=[str(c) for c in self.class_names]
        )
        (self.cfg.out_dir / "classification_report.txt").write_text(
            f"Best val_acc: {self.best_val_acc:.4f}\n"
            f"Test loss: {test_loss:.4f}\n"
            f"Test acc: {test_acc:.4f}\n\n"
            f"{report_txt}\n"
        )

        np.save(self.cfg.out_dir / "y_true_test.npy", y_true)
        np.save(self.cfg.out_dir / "y_pred_test.npy", y_pred)

        print(f"Test: loss={test_loss:.4f} acc={test_acc:.4f}")
        return test_loss, test_acc, y_true, y_pred

In [77]:
def sanity_check_data():
    X_train = np.load("X_train.npy")
    y_train = np.load("y_train.npy", allow_pickle=True)
    print("X_train shape:", X_train.shape)
    print("y_train sample:", y_train[:5])
    print("Label range:", np.unique(y_train))

### 8. Train and Evaluate Model

In [None]:
def train_and_evaluate():
    DATA_DIR = Path(".")
    OUT_DIR  = Path("outputs/cnn_bilstm_no_bn_drop05_seed42_run")

    cfg = TrainConfig(
        data_dir=DATA_DIR,
        out_dir=OUT_DIR,
        epochs=50,
        batch_size=32,
        use_batchnorm=False,
        dropout=0.5,
        seed=42,
    )

    trainer = Trainer(cfg)
    log_csv = trainer.fit()
    trainer.test()

    print("Training complete.")
    print("Outputs saved to:", cfg.out_dir)
    print("Training log:", log_csv)

train_and_evaluate()

Epoch 1/50 | train_loss=3.2621 train_acc=0.0362 | val_loss=3.2587 val_acc=0.0385
Epoch 2/50 | train_loss=3.2607 train_acc=0.0307 | val_loss=3.2588 val_acc=0.0385
Epoch 3/50 | train_loss=3.2605 train_acc=0.0348 | val_loss=3.2583 val_acc=0.0385
Epoch 4/50 | train_loss=3.2600 train_acc=0.0370 | val_loss=3.2582 val_acc=0.0385
Epoch 5/50 | train_loss=3.2572 train_acc=0.0407 | val_loss=3.2328 val_acc=0.0503
Epoch 6/50 | train_loss=3.2343 train_acc=0.0451 | val_loss=3.0799 val_acc=0.0592
Epoch 7/50 | train_loss=2.8650 train_acc=0.0936 | val_loss=2.6213 val_acc=0.1272
Epoch 8/50 | train_loss=2.6340 train_acc=0.1309 | val_loss=2.5416 val_acc=0.1243
Epoch 9/50 | train_loss=2.5658 train_acc=0.1402 | val_loss=2.4423 val_acc=0.1568
Epoch 10/50 | train_loss=2.4569 train_acc=0.1612 | val_loss=2.3268 val_acc=0.1864
Epoch 11/50 | train_loss=2.3725 train_acc=0.1775 | val_loss=2.2629 val_acc=0.2189
Epoch 12/50 | train_loss=2.2718 train_acc=0.1993 | val_loss=2.2299 val_acc=0.2278
Epoch 13/50 | train_loss=

### 9. Plotting 

In [None]:
def plot_results_only():
    DATA_DIR = Path(".")
    OUT_DIR  = Path("outputs/cnn_bilstm_no_bn_drop05_seed42_run")

    log_csv = OUT_DIR / "train_log_bn.csv"

    y_true = np.load(OUT_DIR / "y_true_test.npy")
    y_pred = np.load(OUT_DIR / "y_pred_test.npy")
    class_names = np.load(DATA_DIR / "label_encoder.npy", allow_pickle=True).tolist()

    prefix = "cnn_bilstm_bn_drop05_seed123"

    plot_training_curves_csv(log_csv, OUT_DIR, prefix)

    labels = list(range(len(class_names)))

    plot_confusion_matrix(
        y_true, y_pred,
        OUT_DIR / "confusion_matrix_counts",
        class_names=class_names,
        labels=labels,
        normalize=False
)

    plot_confusion_matrix(
        y_true, y_pred,
        OUT_DIR / "confusion_matrix_normalized",
        class_names=class_names,
        labels=labels,
        normalize=True,
        show_counts=True,
        min_show_pct=10.0
)

    print("Plots generated successfully.")

plot_results_only()

Plots generated successfully.
