In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load


# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# Walk the current directory tree and print the full path of every file found.
for current_dir, _subdirs, file_names in os.walk("./"):
    for file_name in file_names:
        print(os.path.join(current_dir, file_name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# 1. Frame the problem
Using the customer description, define the problem you're trying to solve in your own words (remember this is not technical but must be specific so the customer understands the project).

We are tasked with developing an algorithm to detect certain lowercase cursive letters. The program should be able to detect which letter is in an imported image and output the confidence that the image shows the predicted letter.

# 2. Get the Data 
Define how you received the data (provided, gathered..)

In [None]:
import zipfile
import os

# Define base and data paths
base_path = "/home/jupyter-1016468/Git-Repo-Tour/Project_5"
data_dir = os.path.join(base_path, "Data")

# Define ZIP file paths
cursive_zip = os.path.join(data_dir, "Cursive.zip")
kaggle_zip = os.path.join(data_dir, "archive.zip")

# Define extraction directories
cursive_extract_path = os.path.join(data_dir, "Cursive")
kaggle_extract_path = os.path.join(data_dir, "Signatures")

# Make sure folders exist
os.makedirs(cursive_extract_path, exist_ok=True)
os.makedirs(kaggle_extract_path, exist_ok=True)


def extract_zip(zip_path, extract_path):
    """Extract one ZIP archive and report the outcome on stdout.

    Args:
        zip_path: path of the archive to extract.
        extract_path: directory the archive contents are written into
            (created if it does not exist).

    Returns:
        True when extraction succeeded, False when the archive is missing or
        is not a valid ZIP file. Both failures are reported with a printed
        message rather than raised, so the rest of the notebook keeps running.
    """
    zip_name = os.path.basename(zip_path)
    os.makedirs(extract_path, exist_ok=True)
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
    except FileNotFoundError:
        print(f"Error: Could not find {zip_name} in the Data folder.")
        return False
    except zipfile.BadZipFile:
        print(f"Error: {zip_name} appears to be corrupted or invalid.")
        return False
    print(f"Successfully extracted {zip_name} to: {extract_path}")
    return True


# --- Unzip the Cursive dataset ---
extract_zip(cursive_zip, cursive_extract_path)

# --- Unzip the Kaggle signature dataset ---
# This step was disabled in the notebook (it had been wrapped in a string
# literal). Uncomment the call below to extract archive.zip as well.
# extract_zip(kaggle_zip, kaggle_extract_path)

# 3. Explore the Data
Gain insights into the data you have from step 2, making sure to identify any bias

In [None]:
import glob
import cv2
import random
import matplotlib.pyplot as plt

import os

# `extract_path` was never defined anywhere in the notebook, so this cell
# raised NameError. The folder explored here is the one the Cursive archive
# was extracted into in step 2.
extract_path = cursive_extract_path

jpg_files = glob.glob(f"{extract_path}/**/*.jpg", recursive=True)
heic_files = glob.glob(f"{extract_path}/**/*.heic", recursive=True)
png_files = glob.glob(f"{extract_path}/**/*.png", recursive=True)

print("JPG count:", len(jpg_files))
print("HEIC count:", len(heic_files))
print("PNG count:", len(png_files))

if not jpg_files:
    # random.choice raises IndexError on an empty list.
    print("No JPG images found; skipping sample preview.")
else:
    sample = random.choice(jpg_files)
    img = cv2.imread(sample)
    if img is None:
        # cv2.imread signals an unreadable file by returning None, not by raising.
        print(f"Could not read sample image: {sample}")
    else:
        # OpenCV loads images as BGR; matplotlib expects RGB.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        plt.imshow(img_rgb)
        plt.title(f"Sample image: {os.path.basename(sample)}")
        plt.axis('off')
        plt.show()



In [None]:
import os

# Locations of the two extracted datasets
base_dir = "/home/jupyter-1016468/Git-Repo-Tour/Project_5/Data"
cursive_dir = os.path.join(base_dir, "Cursive")
signatures_dir = os.path.join(base_dir, "Signatures", "sign_data")


def count_images(folder, exts=(".jpg", ".jpeg", ".png")):
    """Count the files under ``folder`` (recursively) whose name ends with one of ``exts``.

    The extension match is case-insensitive. A folder that does not exist
    simply yields a count of 0, because ``os.walk`` produces nothing for it.
    """
    return sum(
        1
        for _root, _subdirs, names in os.walk(folder)
        for name in names
        if name.lower().endswith(exts)
    )

# Tally the images found in each dataset folder
cursive_count = count_images(cursive_dir)
signature_count = count_images(signatures_dir)

for summary_line in (
    "Dataset Summary",
    f"Cursive dataset path: {cursive_dir}",
    f"Number of Cursive images: {cursive_count:,}",
    f"\nSignature dataset path: {signatures_dir}",
    f"Number of Signature images: {signature_count:,}",
):
    print(summary_line)

# Warn about folders that are missing, or present but without any images
dataset_checks = (
    (
        cursive_dir,
        cursive_count,
        "\nWarning: Cursive directory not found!",
        "\nCursive directory found but contains no images.",
    ),
    (
        signatures_dir,
        signature_count,
        "\nWarning: Signatures directory not found!",
        "\nSignature directory found but contains no images.",
    ),
)
for checked_dir, image_count, missing_msg, empty_msg in dataset_checks:
    if not os.path.exists(checked_dir):
        print(missing_msg)
    elif image_count == 0:
        print(empty_msg)



# 4. Prepare the Data


Apply any data transformations and explain what and why


In [None]:
import os
import glob
import cv2
from PIL import Image

# NOTE(review): this relative path has "sign_data" twice, while the explore
# cell uses an absolute path with it once - confirm which layout is on disk.
base_path = "Data/Signatures/sign_data/sign_data/train"
subject_folders = [f"S{i}" for i in range(1, 36)]

viable_images = []      # paths OpenCV could decode
corrupted_images = []   # paths that failed conversion or decoding
converted_count = 0     # number of non-JPG files rewritten as JPG

for folder in subject_folders:
    folder_path = os.path.join(base_path, folder)
    if not os.path.isdir(folder_path):
        continue

    # sorted() makes the processing order (and the result lists) stable, and
    # guarantees "x.jpg" is seen before a same-stem "x.png".
    for img_file in sorted(glob.glob(f"{folder_path}/*.*")):
        if not os.path.isfile(img_file):
            # "*.*" also matches directories whose name contains a dot.
            continue

        # Convert to JPG if not already
        if not img_file.lower().endswith(".jpg"):
            new_path = os.path.splitext(img_file)[0] + ".jpg"
            if os.path.exists(new_path):
                # Converting would overwrite an existing image and then count
                # it twice; leave both files untouched.
                print(f"Skipping conversion of {img_file}: {new_path} already exists")
                continue
            try:
                with Image.open(img_file) as im:
                    im.convert("RGB").save(new_path, "JPEG")  # ensure it's RGB
            except Exception as e:
                print(f"Failed to convert {img_file}: {e}")
                corrupted_images.append(img_file)
                if os.path.exists(new_path):
                    os.remove(new_path)  # don't leave a partially written JPG behind
                continue
            # Delete the source only after PIL has closed it and the JPG is on disk.
            os.remove(img_file)
            converted_count += 1
            img_file = new_path  # update path to new jpg

        # Check if OpenCV can read it (imread returns None for unreadable files)
        try:
            img = cv2.imread(img_file, cv2.IMREAD_GRAYSCALE)
        except Exception:
            img = None
        if img is None:
            corrupted_images.append(img_file)
        else:
            viable_images.append(img_file)

print(f"Total files converted to JPG: {converted_count}")
print(f"Total viable images: {len(viable_images)}")
print(f"Total corrupted/unreadable images: {len(corrupted_images)}")


In [None]:
import os
import cv2
import numpy as np
from torch.utils.data import Dataset
import torch

# -------------------------
# Elastic transform
# -------------------------
def elastic_transform(image, alpha, sigma, random_state=None):
    """Warp ``image`` with a smooth random displacement field.

    Per-pixel offsets are drawn uniformly from [-1, 1], smoothed with a
    Gaussian blur of standard deviation ``sigma`` and multiplied by ``alpha``;
    the image is then resampled at the displaced coordinates.

    Args:
        image: array to deform; ``image.shape[0]`` and ``image.shape[1]`` are
            used as the row and column counts.
        alpha: multiplier applied to the blurred offsets.
        sigma: Gaussian sigma passed to ``cv2.GaussianBlur``.
        random_state: optional ``np.random.RandomState``; a fresh unseeded one
            is created when omitted.

    Returns:
        The resampled image produced by ``cv2.remap`` (bilinear interpolation,
        edge pixels replicated outside the source).
    """
    rng = np.random.RandomState(None) if random_state is None else random_state
    field_shape = image.shape

    # Draw the x field, then the y field, before any smoothing.
    raw_dx = rng.uniform(-1, 1, size=field_shape)
    raw_dy = rng.uniform(-1, 1, size=field_shape)
    offset_x = alpha * cv2.GaussianBlur(raw_dx, ksize=(0, 0), sigmaX=sigma)
    offset_y = alpha * cv2.GaussianBlur(raw_dy, ksize=(0, 0), sigmaX=sigma)

    # Base pixel grid: `cols` varies along axis 1, `rows` along axis 0.
    cols, rows = np.meshgrid(np.arange(field_shape[1]), np.arange(field_shape[0]))
    sample_x = (cols + offset_x).astype(np.float32)
    sample_y = (rows + offset_y).astype(np.float32)

    return cv2.remap(
        image,
        sample_x,
        sample_y,
        interpolation=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )

# -------------------------
# Dataset class with augmentation
# -------------------------
class LetterDataset(Dataset):
    """Grayscale letter-image dataset with optional on-the-fly augmentation.

    Each item is ``(img_tensor, label_idx)`` where ``img_tensor`` is a float
    tensor of shape ``(1, img_size, img_size)`` with values scaled to [0, 1].
    For unlabeled data ``label_idx`` is the dummy value 0.
    """

    def __init__(self, data_list, label2idx=None, img_size=128, augment=False, elastic_prob=0.2):
        """
        data_list: [(path, label), ...] or [path, ...] (for unlabeled)
        label2idx: dict mapping label->int (None if unlabeled)
        img_size: side length every image is resized to
        augment: apply random augmentation in __getitem__ when True
        elastic_prob: base probability for the elastic deformation step
        """
        self.data_list = data_list
        self.label2idx = label2idx
        self.img_size = img_size
        self.augment = augment
        self.elastic_prob = elastic_prob
        # An empty list has no first element to inspect (the original raised
        # IndexError here); treat it as unlabeled.
        self.has_labels = (
            len(data_list) > 0
            and isinstance(data_list[0], tuple)
            and len(data_list[0]) == 2
        )

    def __len__(self):
        """Number of samples."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Load, optionally augment, and tensorize the sample at ``idx``."""
        if self.has_labels:
            img_path, label = self.data_list[idx]
            label_idx = self.label2idx[label]
        else:
            img_path = self.data_list[idx]
            label_idx = 0  # dummy

        img = self._load_image(img_path)
        if self.augment:
            img = self._augment(img)

        # unsqueeze(0) adds the single channel dimension.
        img_tensor = torch.FloatTensor(img).unsqueeze(0)
        return img_tensor, label_idx

    def _load_image(self, path):
        """Read ``path`` as grayscale, resize to ``img_size`` and scale to [0, 1].

        An unreadable file does not raise: it is replaced by an all-zero image.
        """
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            img = np.zeros((self.img_size, self.img_size), dtype=np.float32)
        if img.shape != (self.img_size, self.img_size):
            img = cv2.resize(img, (self.img_size, self.img_size), interpolation=cv2.INTER_AREA)
        return img.astype(np.float32) / 255.0

    def _augment(self, img):
        """Apply a random chain of augmentations to a [0, 1] float image.

        Each step fires independently with its own probability; the result is
        returned as float32.
        """
        # 1) Rotation ±15°
        if np.random.rand() > 0.3:
            angle = np.random.uniform(-15, 15)
            center = (self.img_size // 2, self.img_size // 2)
            M = cv2.getRotationMatrix2D(center, angle, 1.0)
            img = cv2.warpAffine(img, M, (self.img_size, self.img_size), borderMode=cv2.BORDER_REPLICATE)

        # 2) Scaling 0.85 - 1.15
        if np.random.rand() > 0.3:
            scale = np.random.uniform(0.85, 1.15)
            new_size = max(2, int(self.img_size * scale))
            resized = cv2.resize(img, (new_size, new_size), interpolation=cv2.INTER_AREA)
            output = np.zeros((self.img_size, self.img_size), dtype=np.float32)
            if scale >= 1.0:
                # Enlarged: keep the central img_size x img_size crop.
                start = (new_size - self.img_size) // 2
                output = resized[start:start+self.img_size, start:start+self.img_size]
            else:
                # Shrunk: paste into the centre of a zero (black) canvas.
                # NOTE(review): the other steps replicate border pixels; confirm
                # that a black frame is intended here.
                start = (self.img_size - new_size) // 2
                output[start:start+new_size, start:start+new_size] = resized
            img = output

        # 3) Translation ±8 px
        if np.random.rand() > 0.35:
            tx = int(np.random.randint(-8, 9))
            ty = int(np.random.randint(-8, 9))
            M = np.float32([[1, 0, tx], [0, 1, ty]])
            img = cv2.warpAffine(img, M, (self.img_size, self.img_size), borderMode=cv2.BORDER_REPLICATE)

        # 4) Elastic deformation (probability is elastic_prob + 0.05, capped at 0.3)
        if np.random.rand() < min(self.elastic_prob + 0.05, 0.3):
            alpha = np.random.uniform(6, 14)
            sigma = np.random.uniform(3, 5)
            img = elastic_transform(img, alpha, sigma)

        # 5) Brightness/contrast jitter
        if np.random.rand() > 0.35:
            alpha = np.random.uniform(0.85, 1.15)
            beta = np.random.uniform(-0.07, 0.07)
            img = np.clip(alpha * img + beta, 0, 1)

        # 6) Gaussian noise
        if np.random.rand() > 0.55:
            noise = np.random.normal(0, 0.02, img.shape)
            img = np.clip(img + noise, 0, 1)

        # 7) Optional slight blur
        if np.random.rand() > 0.8:
            img = cv2.GaussianBlur(img, (3, 3), 0)

        return img.astype(np.float32)

# -------------------------
# VISUALIZE AUGMENTED SAMPLES
# -------------------------
import random
import matplotlib.pyplot as plt

# The original cell used train_data, label2idx, idx2label and IMG_SIZE, which
# exist only as locals of main() / constants of a later cell, so it raised
# NameError on a fresh kernel. The preview now builds what it needs itself.
VIS_CURSIVE_DIR = "/home/jupyter-1016468/Git-Repo-Tour/Project_5/Data/Processed/Cursive"
VIS_IMG_SIZE = 96
VIS_NUM_SAMPLES = 10
VIS_EXTS = (".png", ".jpg", ".jpeg")

# Collect (path, letter) pairs; the first character of the file name is the label.
vis_labeled = []
if os.path.isdir(VIS_CURSIVE_DIR):
    for vis_folder in sorted(os.listdir(VIS_CURSIVE_DIR)):
        vis_folder_path = os.path.join(VIS_CURSIVE_DIR, vis_folder)
        if not os.path.isdir(vis_folder_path):
            continue
        for vis_name in sorted(os.listdir(vis_folder_path)):
            vis_letter = vis_name[0].lower()
            if vis_name.lower().endswith(VIS_EXTS) and 'a' <= vis_letter <= 'z':
                vis_labeled.append((os.path.join(vis_folder_path, vis_name), vis_letter))

if not vis_labeled:
    print(f"No labelled images found under {VIS_CURSIVE_DIR}; skipping augmentation preview.")
else:
    vis_labels = sorted({letter for _, letter in vis_labeled})
    vis_label2idx = {letter: i for i, letter in enumerate(vis_labels)}
    vis_idx2label = {i: letter for letter, i in vis_label2idx.items()}

    # Take up to VIS_NUM_SAMPLES random samples (random.sample raises if asked
    # for more items than exist).
    augmented_samples = random.sample(vis_labeled, min(VIS_NUM_SAMPLES, len(vis_labeled)))
    augmented_dataset = LetterDataset(augmented_samples, vis_label2idx, VIS_IMG_SIZE, augment=True)

    # squeeze=False keeps `axes` two-dimensional even for a single sample.
    fig, axes = plt.subplots(1, len(augmented_samples), figsize=(15, 3), squeeze=False)
    for i in range(len(augmented_dataset)):
        img_tensor, label_idx = augmented_dataset[i]
        img = img_tensor.squeeze(0).numpy()
        axes[0][i].imshow(img, cmap='gray')
        axes[0][i].set_title(f"Label: {vis_idx2label[label_idx]}")
        axes[0][i].axis('off')
    plt.suptitle("Augmented Samples")
    plt.show()



# 5. Model the data
Using selected ML models, experiment with your choices and describe your findings. Finish by selecting a model to continue with.


In [None]:
#!/usr/bin/env python3
"""
Ensemble Cursive Letter Classifier with Test-Time Augmentation
Trains multiple models and ensembles predictions for max accuracy
"""

import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Configuration
_PROCESSED_ROOT = "/home/jupyter-1016468/Git-Repo-Tour/Project_5/Data/Processed"
CURSIVE_DIR = f"{_PROCESSED_ROOT}/Cursive"        # labelled letter images
SIGN_DIR = f"{_PROCESSED_ROOT}/Signatures"        # unlabelled signature images

IMG_SIZE = 96                  # images are resized to IMG_SIZE x IMG_SIZE
BATCH_SIZE = 16
INITIAL_EPOCHS = 80            # phase 1 epoch budget
FINE_TUNE_EPOCHS = 25          # phase 3 epoch budget
LR = 0.001
CONFIDENCE_THRESHOLD = 0.80    # minimum ensemble confidence to keep a pseudo-label
NUM_MODELS = 3  # Train 3 models for ensemble
TTA_ROUNDS = 5  # Test-time augmentation rounds

# Use the GPU when one is visible to PyTorch
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"

print(f"Device: {DEVICE}, Image size: {IMG_SIZE}x{IMG_SIZE}")
print(f"Training {NUM_MODELS} models for ensemble")


def load_cursive_data(cursive_dir):
    """Collect labelled letter images from ``cursive_dir``.

    Only files directly inside the immediate sub-folders of ``cursive_dir``
    are considered. A file is kept when its extension is .png/.jpg/.jpeg
    (case-insensitive) and its first character, lowercased, is in 'a'..'z';
    that character is used as the label.

    Args:
        cursive_dir: root folder containing one sub-folder per writer/set.

    Returns:
        A list of ``(path, label)`` tuples, ordered by folder name and then
        file name. Empty when ``cursive_dir`` is not an existing directory.
    """
    labeled_data = []
    valid_exts = ('.png', '.jpg', '.jpeg')
    if not os.path.isdir(cursive_dir):
        return labeled_data

    for s_folder in sorted(os.listdir(cursive_dir)):
        s_path = os.path.join(cursive_dir, s_folder)
        if not os.path.isdir(s_path):
            continue

        # os.listdir order depends on the filesystem; sorting keeps the
        # seeded train/val/test split identical across machines.
        for f in sorted(os.listdir(s_path)):
            if not f.lower().endswith(valid_exts):
                continue
            label = f[0].lower()
            if 'a' <= label <= 'z':
                labeled_data.append((os.path.join(s_path, f), label))
    return labeled_data


def load_signatures(sign_dir):
    """Recursively collect image paths (.png/.jpg/.jpeg, case-insensitive) under ``sign_dir``.

    Args:
        sign_dir: root folder to search.

    Returns:
        A list of file paths in a deterministic order: directories are visited
        top-down in sorted order and files are sorted within each directory.
        Empty when ``sign_dir`` does not exist (``os.walk`` yields nothing).
    """
    valid_exts = ('.png', '.jpg', '.jpeg')
    paths = []
    for root, dirs, files in os.walk(sign_dir):
        # Sorting `dirs` in place makes os.walk descend in a stable order.
        dirs.sort()
        for f in sorted(files):
            if f.lower().endswith(valid_exts):
                paths.append(os.path.join(root, f))
    return paths


def apply_augmentation(img, strength='medium'):
    """Randomly augment a square image whose values lie in [0, 1].

    'light' (used for test-time augmentation) may rotate by up to ±5° and
    shift by up to ±3 px. 'medium' (used for training) may rotate by up to
    ±15°, shift by up to ±10 px, rescale by 0.85-1.15, jitter
    brightness/contrast and add Gaussian noise. Any other ``strength`` returns
    ``img`` untouched.
    """
    img_size = img.shape[0]

    # Rotation skip-threshold, maximum angle and maximum shift per strength.
    if strength == 'light':
        rotate_threshold, max_angle, max_shift = 0.5, 5, 3
    elif strength == 'medium':
        rotate_threshold, max_angle, max_shift = 0.4, 15, 10
    else:
        return img

    out_size = (img_size, img_size)

    # Rotation about the image centre
    if np.random.rand() > rotate_threshold:
        angle = np.random.uniform(-max_angle, max_angle)
        rotation = cv2.getRotationMatrix2D((img_size//2, img_size//2), angle, 1.0)
        img = cv2.warpAffine(img, rotation, out_size, borderMode=cv2.BORDER_REPLICATE)

    # Integer translation
    if np.random.rand() > 0.5:
        tx = np.random.randint(-max_shift, max_shift + 1)
        ty = np.random.randint(-max_shift, max_shift + 1)
        shift = np.float32([[1, 0, tx], [0, 1, ty]])
        img = cv2.warpAffine(img, shift, out_size, borderMode=cv2.BORDER_REPLICATE)

    if strength == 'light':
        return img

    # --- medium-only steps ---

    # Rescale, then centre-crop (enlarged) or pad with ones (shrunk)
    if np.random.rand() > 0.5:
        scale = np.random.uniform(0.85, 1.15)
        new_size = int(img_size * scale)
        scaled = cv2.resize(img, (new_size, new_size))
        if scale > 1:
            offset = (new_size - img_size) // 2
            img = scaled[offset:offset+img_size, offset:offset+img_size]
        else:
            canvas = np.ones((img_size, img_size), dtype=np.float32)
            offset = (img_size - new_size) // 2
            canvas[offset:offset+new_size, offset:offset+new_size] = scaled
            img = canvas

    # Brightness/contrast jitter
    if np.random.rand() > 0.5:
        gain = np.random.uniform(0.85, 1.15)
        bias = np.random.uniform(-0.1, 0.1)
        img = np.clip(gain * img + bias, 0, 1)

    # Additive Gaussian noise
    if np.random.rand() > 0.7:
        img = np.clip(img + np.random.normal(0, 0.02, img.shape), 0, 1)

    return img


class LetterDataset(Dataset):
    """Grayscale letter-image dataset used for training and evaluation.

    Each item is ``(img_tensor, label_idx)``: a float tensor of shape
    ``(1, img_size, img_size)`` scaled to [0, 1] and an integer class index
    (0 as a dummy value when the data is unlabeled).

    Note: this redefines the ``LetterDataset`` declared in the data
    preparation cell; whichever cell ran last wins.
    """

    def __init__(self, data_list, label2idx, img_size=96, augment=False):
        """
        data_list: [(path, label), ...] or [path, ...] (for unlabeled)
        label2idx: dict mapping label -> class index
        img_size: side length every image is resized to
        augment: apply 'medium' augmentation in __getitem__ when True
        """
        self.data_list = data_list
        self.label2idx = label2idx
        self.img_size = img_size
        self.augment = augment
        # Guard the empty case: data_list[0] raised IndexError, which main()
        # hit whenever no signature images were found.
        self.has_labels = len(data_list) > 0 and isinstance(data_list[0], tuple)

    def __len__(self):
        """Number of samples."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Load, normalise, optionally augment, and tensorize sample ``idx``."""
        if self.has_labels:
            img_path, label = self.data_list[idx]
            label_idx = self.label2idx[label]
        else:
            img_path = self.data_list[idx]
            label_idx = 0

        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # Unreadable files do not raise; they become an all-zero image.
            img = np.zeros((self.img_size, self.img_size), dtype=np.float32)
        if img.shape != (self.img_size, self.img_size):
            img = cv2.resize(img, (self.img_size, self.img_size))

        img = img.astype(np.float32) / 255.0

        if self.augment:
            img = apply_augmentation(img, strength='medium')

        # unsqueeze(0) adds the single channel dimension.
        img_tensor = torch.FloatTensor(img).unsqueeze(0)
        return img_tensor, label_idx


class SimplerCNN(nn.Module):
    """Four-stage convolutional classifier for single-channel images.

    Each stage is conv(3x3) -> batch norm -> ReLU -> 2x2 max-pool -> dropout,
    with 32, 64, 128 and 256 channels. The feature map is then
    adaptively average-pooled to 2x2 and classified by a three-layer MLP.
    ``img_size`` is accepted but not used by the network.
    """

    def __init__(self, num_classes, img_size=96):
        super().__init__()

        # Attribute names and creation order are kept as-is so saved
        # state_dicts and seeded initialisation stay compatible.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(num_features=32)

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(num_features=64)

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(num_features=128)

        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(num_features=256)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(p=0.4)

        self.global_pool = nn.AdaptiveAvgPool2d(output_size=(2, 2))

        self.fc1 = nn.Linear(in_features=256 * 2 * 2, out_features=256)
        self.bn_fc = nn.BatchNorm1d(num_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=128)
        self.fc3 = nn.Linear(in_features=128, out_features=num_classes)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, norm in stages:
            x = self.dropout(self.pool(F.relu(norm(conv(x)))))

        # Fixed 2x2 spatial size, then flatten to (batch, 256*2*2)
        features = self.global_pool(x)
        features = features.view(features.size(0), -1)

        hidden = self.dropout(F.relu(self.bn_fc(self.fc1(features))))
        hidden = self.dropout(F.relu(self.fc2(hidden)))
        return self.fc3(hidden)


def train_epoch(model, loader, criterion, optimizer, device, use_mixup=True):
    """Run one training pass over ``loader``.

    With ``use_mixup`` enabled, roughly half of the batches are mixed with a
    shuffled copy of themselves (mixing weight drawn from Beta(0.2, 0.2)); the
    loss and the accuracy contribution of those batches are weighted between
    the two label sets accordingly.

    Returns:
        (mean loss per sample, accuracy in percent) for the epoch.
    """
    model.train()
    running_loss, running_correct, seen = 0, 0, 0

    for imgs, labels in tqdm(loader, leave=False, desc="Training"):
        imgs, labels = imgs.to(device), labels.to(device)
        batch_size = imgs.size(0)

        # Decide per batch whether to apply Mixup
        mix_this_batch = use_mixup and np.random.rand() > 0.5
        if mix_this_batch:
            lam = np.random.beta(0.2, 0.2)
            perm = torch.randperm(batch_size)
            inputs = lam * imgs + (1 - lam) * imgs[perm]
            labels_a, labels_b = labels, labels[perm]
        else:
            inputs = imgs

        optimizer.zero_grad()
        outputs = model(inputs)
        if mix_this_batch:
            loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)
        else:
            loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * batch_size

        predicted = outputs.argmax(1)
        if mix_this_batch:
            # Partial credit: a hit on either label set counts by its mixing weight.
            running_correct += (lam * (predicted == labels_a).float() +
                                (1 - lam) * (predicted == labels_b).float()).sum().item()
        else:
            running_correct += (predicted == labels).sum().item()

        seen += batch_size

    return running_loss / seen, running_correct / seen * 100


def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    The model is switched to eval mode and gradients are disabled.

    Returns:
        (mean loss per sample, accuracy in percent).
    """
    model.eval()
    loss_sum = 0
    hits = 0
    count = 0

    with torch.no_grad():
        for batch_imgs, batch_labels in loader:
            batch_imgs = batch_imgs.to(device)
            batch_labels = batch_labels.to(device)
            n_batch = batch_imgs.size(0)

            logits = model(batch_imgs)
            # criterion returns the batch mean; weight it by the batch size
            loss_sum += criterion(logits, batch_labels).item() * n_batch
            hits += (logits.argmax(1) == batch_labels).sum().item()
            count += n_batch

    return loss_sum / count, hits / count * 100


def predict_with_tta(models, img_tensor, device, n_augments=TTA_ROUNDS):
    """Average softmax predictions over all models and light augmentations.

    For every model, the un-augmented input is scored once and then
    ``n_augments`` lightly augmented copies are scored; all resulting
    probability tensors are averaged with equal weight.

    Returns:
        The averaged class-probability tensor.
    """
    collected = []

    for net in models:
        net.eval()
        with torch.no_grad():
            # Score the image as given
            collected.append(F.softmax(net(img_tensor.to(device)), dim=1))

            # Score randomly perturbed copies
            for _ in range(n_augments):
                plain_img = img_tensor.squeeze().cpu().numpy()
                perturbed = apply_augmentation(plain_img, strength='light')
                # Restore the (batch, channel) dimensions removed by squeeze()
                batch = torch.FloatTensor(perturbed).unsqueeze(0).unsqueeze(0).to(device)
                collected.append(F.softmax(net(batch), dim=1))

    # Equal-weight mean over every collected prediction
    return torch.stack(collected).mean(dim=0)


def train_single_model(train_loader, val_loader, test_loader, num_classes, model_idx, random_seed):
    """Train one ensemble member from scratch and return its best checkpoint.

    Args:
        train_loader: batches used for optimisation (Mixup enabled).
        val_loader: batches used for model selection and LR scheduling.
        test_loader: accepted for interface compatibility; not used here.
        num_classes: size of the output layer.
        model_idx: index of this model; used in log output and in the
            checkpoint file name ``best_model_{model_idx}_phase1.pth``.
        random_seed: seed applied to both torch and numpy.

    Returns:
        (model, best_val_acc): the model with its best-validation weights
        loaded, and that validation accuracy in percent.
    """
    print(f"\n{'='*60}")
    print(f"Training Model {model_idx + 1}/{NUM_MODELS} (seed={random_seed})")
    print(f"{'='*60}")

    # Set seed for reproducibility
    torch.manual_seed(random_seed)
    np.random.seed(random_seed)

    model = SimplerCNN(num_classes, IMG_SIZE).to(DEVICE)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.05)
    optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)
    # mode='max': the scheduler is stepped on validation accuracy
    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=10)

    checkpoint_path = f'best_model_{model_idx}_phase1.pth'
    # Track whether THIS run wrote the checkpoint, so a stale file from an
    # earlier run is never loaded and a missing file never crashes the run.
    checkpoint_saved = False

    best_val_acc = 0
    patience = 0
    max_patience = 20

    for epoch in range(1, INITIAL_EPOCHS + 1):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE, use_mixup=True)
        val_loss, val_acc = validate(model, val_loader, criterion, DEVICE)
        scheduler.step(val_acc)

        should_log = epoch % 10 == 0 or epoch == 1
        if should_log:
            print(f"Epoch {epoch}/{INITIAL_EPOCHS}")
            print(f"  Train - Loss: {train_loss:.4f}, Acc: {train_acc:.2f}%")
            print(f"  Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.2f}%")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            patience = 0
            if should_log:
                print(f"  ✓ New best model saved! Val Acc: {val_acc:.2f}%")
        else:
            patience += 1
            if patience >= max_patience:
                print(f"  Early stopping triggered after {epoch} epochs")
                break

    if checkpoint_saved:
        # map_location keeps the load valid on whichever device is in use.
        model.load_state_dict(torch.load(checkpoint_path, map_location=DEVICE))
    else:
        print("  No checkpoint was saved in this run; keeping the final-epoch weights")
    print(f"Best Phase 1 Val Accuracy: {best_val_acc:.2f}%")

    return model, best_val_acc


def main():
    """Train, pseudo-label, fine-tune and evaluate the letter-classifier ensemble.

    Phases:
        1. Train NUM_MODELS models on the labelled cursive images.
        2. Pseudo-label signature images whose ensemble confidence reaches
           CONFIDENCE_THRESHOLD (skipped when no signature images exist).
        3. Fine-tune every model on labelled + pseudo-labelled data when more
           than 50 pseudo-labels were produced.
        4. Evaluate the ensemble with test-time augmentation, print a
           classification report and save a confusion-matrix image.

    Raises:
        RuntimeError: when no labelled cursive images are found.
    """
    labeled_data = load_cursive_data(CURSIVE_DIR)
    signature_paths = load_signatures(SIGN_DIR)
    print(f"Loaded {len(labeled_data)} cursive images")
    print(f"Loaded {len(signature_paths)} signature images")

    if len(labeled_data) == 0:
        raise RuntimeError("No labeled cursive images found!")

    labels = sorted(set(lbl for _, lbl in labeled_data))
    label2idx = {lbl: i for i, lbl in enumerate(labels)}
    idx2label = {i: lbl for lbl, i in label2idx.items()}
    num_classes = len(labels)
    class_names = [idx2label[i] for i in range(num_classes)]
    print(f"Number of classes: {num_classes}")

    # 80/10/10 stratified split
    train_data, temp_data = train_test_split(labeled_data, test_size=0.2,
                                             stratify=[lbl for _, lbl in labeled_data],
                                             random_state=42)
    val_data, test_data = train_test_split(temp_data, test_size=0.5,
                                           stratify=[lbl for _, lbl in temp_data],
                                           random_state=42)

    print(f"Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}")

    def make_train_loader(dataset):
        # BatchNorm1d raises in train mode on a single-sample batch, so drop
        # the last batch only when it would contain exactly one sample.
        lone_tail = len(dataset) > BATCH_SIZE and len(dataset) % BATCH_SIZE == 1
        return DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=0, drop_last=lone_tail)

    train_dataset = LetterDataset(train_data, label2idx, IMG_SIZE, augment=True)
    val_dataset = LetterDataset(val_data, label2idx, IMG_SIZE, augment=False)
    test_dataset = LetterDataset(test_data, label2idx, IMG_SIZE, augment=False)

    train_loader = make_train_loader(train_dataset)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

    # Train ensemble of models
    models = []
    val_accs = []

    for i in range(NUM_MODELS):
        model, val_acc = train_single_model(train_loader, val_loader, test_loader,
                                           num_classes, i, random_seed=42 + i * 100)
        models.append(model)
        val_accs.append(val_acc)

    print(f"\n{'='*60}")
    print(f"Ensemble Training Complete!")
    print(f"Individual model val accuracies: {[f'{acc:.2f}%' for acc in val_accs]}")
    print(f"Average val accuracy: {np.mean(val_accs):.2f}%")
    print(f"{'='*60}")

    # Pseudo-label with ensemble
    print("\n" + "="*60)
    print("PHASE 2: Pseudo-labeling with ensemble")
    print("="*60)

    pseudo_data = []
    if signature_paths:
        # The label 'a' is a placeholder so the dataset yields (image, label)
        # pairs; it is ignored below.
        sig_dataset = LetterDataset([(p, 'a') for p in signature_paths], label2idx, IMG_SIZE, augment=False)
        sig_loader = DataLoader(sig_dataset, batch_size=1, shuffle=False, num_workers=0)

        for model in models:
            model.eval()

        with torch.no_grad():
            for idx, (imgs, _) in enumerate(tqdm(sig_loader, desc="Pseudo-labeling")):
                # Get ensemble prediction
                predictions = []
                for model in models:
                    pred = F.softmax(model(imgs.to(DEVICE)), dim=1)
                    predictions.append(pred)

                avg_pred = torch.stack(predictions).mean(dim=0)
                conf, pred_idx = avg_pred.max(1)

                if conf.item() >= CONFIDENCE_THRESHOLD:
                    pseudo_data.append((signature_paths[idx], idx2label[pred_idx.item()]))

        print(f"Pseudo-labeled {len(pseudo_data)}/{len(signature_paths)} signatures ({len(pseudo_data)/len(signature_paths)*100:.1f}%)")
    else:
        # Without this guard an empty list crashed the dataset and the
        # percentage above divided by zero.
        print("No signature images found; skipping pseudo-labeling.")

    # Fine-tune each model
    if len(pseudo_data) > 50:
        print("\n" + "="*60)
        print("PHASE 3: Fine-tuning ensemble with pseudo-labels")
        print("="*60)

        combined_train = train_data + pseudo_data
        combined_dataset = LetterDataset(combined_train, label2idx, IMG_SIZE, augment=True)
        combined_loader = make_train_loader(combined_dataset)

        for i, model in enumerate(models):
            print(f"\nFine-tuning Model {i+1}/{NUM_MODELS}")

            criterion = nn.CrossEntropyLoss(label_smoothing=0.05)
            optimizer = optim.AdamW(model.parameters(), lr=LR * 0.1, weight_decay=0.01)
            scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=7)

            final_ckpt = f'best_model_{i}_final.pth'
            # Start from the phase-1 model as the baseline so fine-tuning can
            # only replace it with something that validates better; this also
            # guarantees the checkpoint file exists.
            _, best_val_acc = validate(model, val_loader, criterion, DEVICE)
            torch.save(model.state_dict(), final_ckpt)
            patience = 0
            max_patience = 15

            for epoch in range(1, FINE_TUNE_EPOCHS + 1):
                train_loss, train_acc = train_epoch(model, combined_loader, criterion, optimizer, DEVICE)
                val_loss, val_acc = validate(model, val_loader, criterion, DEVICE)
                scheduler.step(val_acc)

                if epoch % 5 == 0:
                    print(f"  Epoch {epoch}/{FINE_TUNE_EPOCHS} - Val Acc: {val_acc:.2f}%")

                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    torch.save(model.state_dict(), final_ckpt)
                    patience = 0
                else:
                    patience += 1
                    if patience >= max_patience:
                        break

            model.load_state_dict(torch.load(final_ckpt, map_location=DEVICE))
            print(f"  Best fine-tuned val acc: {best_val_acc:.2f}%")

    # Final evaluation with TTA and ensemble
    print("\n" + "="*60)
    print("FINAL EVALUATION WITH ENSEMBLE + TTA")
    print("="*60)

    all_preds, all_labels = [], []

    for imgs, batch_labels in tqdm(test_loader, desc="Testing with TTA"):
        for i in range(imgs.size(0)):
            img = imgs[i:i+1]  # keep the batch dimension
            label = batch_labels[i]

            # Get ensemble + TTA prediction
            pred_probs = predict_with_tta(models, img, DEVICE, n_augments=TTA_ROUNDS)
            pred = pred_probs.argmax(1).item()

            all_preds.append(pred)
            all_labels.append(label.item())

    test_acc = (np.array(all_preds) == np.array(all_labels)).mean() * 100
    print(f"\n{'*'*60}")
    print(f"FINAL TEST ACCURACY (Ensemble + TTA): {test_acc:.2f}%")
    print(f"{'*'*60}\n")

    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds,
                              target_names=class_names,
                              zero_division=0))

    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(16, 14))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names,
                cbar_kws={'label': 'Count'})
    plt.xlabel('Predicted Label', fontsize=12)
    plt.ylabel('True Label', fontsize=12)
    plt.title(f'Confusion Matrix - Test Accuracy: {test_acc:.2f}%', fontsize=14)
    plt.tight_layout()
    plt.savefig('confusion_matrix_ensemble.png', dpi=300, bbox_inches='tight')

    # Row-normalised diagonal = recall per class
    per_class_acc = cm.diagonal() / cm.sum(axis=1)
    print("\nPer-class accuracy:")
    for i, letter in enumerate(class_names):
        print(f"  {letter}: {per_class_acc[i]*100:.1f}%")

    print(f"\n✓ Training complete! Best models saved")
    print(f"✓ Confusion matrix saved as 'confusion_matrix_ensemble.png'")


if __name__ == '__main__':
    main()

# 6. Fine Tune the Model

With the selected model, describe the steps taken to achieve the best results possible.


In [None]:
#!/usr/bin/env python3
"""
Ensemble Cursive Letter Classifier with Test-Time Augmentation
Trains multiple models and ensembles predictions for max accuracy
"""

import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import joblib

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# ------------------------
# CONFIGURATION
# ------------------------
# NOTE(review): the two dataset roots below are absolute, machine-specific
# paths; they must be adjusted before running this cell anywhere else.
CURSIVE_DIR = "/home/jupyter-1016468/Git-Repo-Tour/Project_5/Data/Processed/Cursive"  # passed to load_cursive_data() by main()
SIGN_DIR = "/home/jupyter-1016468/Git-Repo-Tour/Project_5/Data/Processed/Signatures"  # passed to load_signatures() by main()
IMG_SIZE = 96  # side length (pixels) that LetterDataset resizes every image to
BATCH_SIZE = 16  # batch size of the DataLoaders built in main()
INITIAL_EPOCHS = 80  # epochs each ensemble member is trained for in main()
FINE_TUNE_EPOCHS = 25  # NOTE(review): not referenced by any code in this cell
LR = 0.001  # learning rate handed to AdamW in main()
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU only when PyTorch can see one
CONFIDENCE_THRESHOLD = 0.80  # NOTE(review): not referenced by any code in this cell
NUM_MODELS = 3  # number of models trained by the loop in main()
TTA_ROUNDS = 5  # NOTE(review): not referenced by any code in this cell

# Echo the effective settings so they are visible in the cell output.
print(f"Device: {DEVICE}, Image size: {IMG_SIZE}x{IMG_SIZE}")
print(f"Training {NUM_MODELS} models for ensemble")

# ------------------------
# DATA LOADING FUNCTIONS
# ------------------------
def load_cursive_data(cursive_dir):
    """Collect labelled cursive-letter images from per-writer sub-folders.

    Every immediate sub-directory of ``cursive_dir`` is scanned (not
    recursively) for ``.png`` / ``.jpg`` / ``.jpeg`` files, matched
    case-insensitively. The label of a file is the lower-cased first character
    of its name; files whose first character is not in ``a``-``z`` are skipped.

    Both the sub-folders and the files inside them are visited in sorted order,
    so the returned list does not depend on the order in which the filesystem
    happens to list entries. This matters because a seeded split of the list is
    only reproducible when the list itself is.

    Parameters
    ----------
    cursive_dir : str
        Root folder that contains one sub-folder per writer/sample set.

    Returns
    -------
    list[tuple[str, str]]
        ``(image_path, label)`` pairs; empty when ``cursive_dir`` is missing or
        is not a directory.
    """
    valid_exts = ('.png', '.jpg', '.jpeg')
    labeled_data = []
    # isdir (rather than exists) also covers a root that is a plain file,
    # which would otherwise make os.listdir raise.
    if not os.path.isdir(cursive_dir):
        return labeled_data

    for s_folder in sorted(os.listdir(cursive_dir)):
        s_path = os.path.join(cursive_dir, s_folder)
        if not os.path.isdir(s_path):
            continue

        # Sorted for a deterministic sample order across machines/filesystems.
        for f in sorted(os.listdir(s_path)):
            if not f.lower().endswith(valid_exts):
                continue
            label = f[0].lower()
            if 'a' <= label <= 'z':
                labeled_data.append((os.path.join(s_path, f), label))
    return labeled_data

def load_signatures(sign_dir):
    """Recursively collect image paths below ``sign_dir`` in a stable order.

    Files ending in ``.png`` / ``.jpg`` / ``.jpeg`` (case-insensitive) are
    returned. Directories are traversed in sorted order and files within each
    directory are sorted, so the result is reproducible regardless of the
    filesystem's native listing order.

    Parameters
    ----------
    sign_dir : str
        Root folder to search.

    Returns
    -------
    list[str]
        Image paths; empty when nothing matches or ``sign_dir`` does not exist
        (``os.walk`` yields nothing for a missing directory).
    """
    valid_exts = ('.png', '.jpg', '.jpeg')
    paths = []
    for root, dirs, files in os.walk(sign_dir):
        # Sorting in place steers the (top-down) traversal order of os.walk.
        dirs.sort()
        for f in sorted(files):
            if f.lower().endswith(valid_exts):
                paths.append(os.path.join(root, f))
    return paths

# ------------------------
# AUGMENTATION
# ------------------------
def apply_augmentation(img, strength='medium'):
    """Randomly perturb a grayscale image for training-time augmentation.

    The side length is read from ``img.shape[0]`` and used for both
    dimensions, so the input is treated as square. Each transform is applied
    independently with its own probability:

    * ``'light'``  - rotation within +/-5 degrees, shift within +/-3 px.
    * ``'medium'`` - rotation within +/-15 degrees, shift within +/-10 px,
      zoom by a factor in [0.85, 1.15] (centre-cropped or padded with 1.0
      back to the original size), brightness/contrast jitter and Gaussian
      noise, the last two clipped to [0, 1].

    Any other ``strength`` returns ``img`` untouched.

    Parameters
    ----------
    img : numpy.ndarray
        2-D image array; the clipping to [0, 1] and the padding value of 1.0
        suggest intensities are expected in that range.
    strength : str
        ``'light'`` or ``'medium'``.

    Returns
    -------
    numpy.ndarray
        The augmented image, or the input object itself when nothing fired.
    """
    if strength not in ('light', 'medium'):
        return img

    side = img.shape[0]
    canvas = (side, side)

    def _warp(image, matrix):
        # Edge pixels are replicated into the area uncovered by the transform.
        return cv2.warpAffine(image, matrix, canvas, borderMode=cv2.BORDER_REPLICATE)

    gentle = strength == 'light'
    rotate_gate, max_angle, max_shift = (0.5, 5, 3) if gentle else (0.4, 15, 10)

    # Rotation about the image centre.
    if np.random.rand() > rotate_gate:
        theta = np.random.uniform(-max_angle, max_angle)
        img = _warp(img, cv2.getRotationMatrix2D((side // 2, side // 2), theta, 1.0))

    # Integer pixel shift; dx is drawn before dy.
    if np.random.rand() > 0.5:
        dx = np.random.randint(-max_shift, max_shift + 1)
        dy = np.random.randint(-max_shift, max_shift + 1)
        img = _warp(img, np.float32([[1, 0, dx], [0, 1, dy]]))

    if gentle:
        return img

    # Zoom in (centre crop) or out (paste onto a canvas filled with 1.0).
    if np.random.rand() > 0.5:
        factor = np.random.uniform(0.85, 1.15)
        zoomed_side = int(side * factor)
        zoomed = cv2.resize(img, (zoomed_side, zoomed_side))
        if factor > 1:
            offset = (zoomed_side - side) // 2
            img = zoomed[offset:offset + side, offset:offset + side]
        else:
            offset = (side - zoomed_side) // 2
            padded = np.ones(canvas, dtype=np.float32)
            padded[offset:offset + zoomed_side, offset:offset + zoomed_side] = zoomed
            img = padded

    # Contrast (gain) and brightness (bias) jitter.
    if np.random.rand() > 0.5:
        gain = np.random.uniform(0.85, 1.15)
        bias = np.random.uniform(-0.1, 0.1)
        img = np.clip(gain * img + bias, 0, 1)

    # Additive Gaussian noise.
    if np.random.rand() > 0.7:
        img = np.clip(img + np.random.normal(0, 0.02, img.shape), 0, 1)

    return img

# ------------------------
# DATASET CLASS
# ------------------------
class LetterDataset(Dataset):
    """Grayscale letter images served as ``(1, img_size, img_size)`` float tensors.

    ``data_list`` holds either ``(path, label)`` tuples (labelled data) or
    bare path strings (unlabelled data). Which of the two it is gets decided
    once, from the first entry; an empty list is treated as unlabelled so that
    constructing a dataset from "no files found" does not raise.

    Parameters
    ----------
    data_list : sequence
        ``(path, label)`` tuples or plain paths.
    label2idx : dict
        Maps a label to its integer class index (only read for labelled data).
    img_size : int
        Side length every image is resized to.
    augment : bool
        When true, ``apply_augmentation(..., strength='medium')`` is applied
        to each image on access.
    """

    def __init__(self, data_list, label2idx, img_size=96, augment=False):
        self.data_list = data_list
        self.label2idx = label2idx
        self.img_size = img_size
        self.augment = augment
        # Guard the peek at element 0: an empty list used to raise IndexError.
        self.has_labels = len(data_list) > 0 and isinstance(data_list[0], tuple)

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label_index)`` for sample ``idx``.

        Unlabelled samples get the placeholder label index 0.
        """
        if self.has_labels:
            img_path, label = self.data_list[idx]
            label_idx = self.label2idx[label]
        else:
            img_path = self.data_list[idx]
            label_idx = 0

        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # NOTE(review): an unreadable file silently becomes an all-zero
            # image that keeps its label; consider logging or filtering these.
            img = np.zeros((self.img_size, self.img_size), dtype=np.float32)
        if img.shape != (self.img_size, self.img_size):
            img = cv2.resize(img, (self.img_size, self.img_size))
        # Scale 8-bit intensities to [0, 1].
        img = img.astype(np.float32) / 255.0

        if self.augment:
            img = apply_augmentation(img, strength='medium')

        # Add the channel dimension expected by Conv2d: (H, W) -> (1, H, W).
        img_tensor = torch.FloatTensor(img).unsqueeze(0)
        return img_tensor, label_idx

# ------------------------
# MODEL DEFINITION
# ------------------------
class SimplerCNN(nn.Module):
    """Four-stage convolutional classifier for single-channel images.

    Each stage is Conv(3x3, padding 1) -> BatchNorm -> ReLU -> 2x2 max-pool ->
    dropout, widening the channels 1 -> 32 -> 64 -> 128 -> 256. The feature
    map is then adaptively average-pooled to 2x2, flattened to 1024 values and
    classified by a 1024 -> 256 -> 128 -> ``num_classes`` head.

    Attribute names and their creation order are part of the checkpoint
    format (``state_dict`` keys), so they must not be renamed.

    Parameters
    ----------
    num_classes : int
        Number of output logits.
    img_size : int
        Accepted for interface compatibility; not used by the layers, because
        the adaptive pooling fixes the flattened size.
    """

    def __init__(self, num_classes, img_size=96):
        super().__init__()
        kernel, pad = 3, 1

        # Convolutional feature extractor.
        self.conv1 = nn.Conv2d(1, 32, kernel, padding=pad)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel, padding=pad)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel, padding=pad)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel, padding=pad)
        self.bn4 = nn.BatchNorm2d(256)

        # Layers shared by every stage.
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.4)
        self.global_pool = nn.AdaptiveAvgPool2d((2, 2))

        # Classification head.
        self.fc1 = nn.Linear(256 * 2 * 2, 256)
        self.bn_fc = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Map a ``(N, 1, H, W)`` batch to ``(N, num_classes)`` logits."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, norm in stages:
            x = self.dropout(self.pool(F.relu(norm(conv(x)))))

        x = self.global_pool(x)
        x = x.view(x.shape[0], -1)
        x = self.dropout(F.relu(self.bn_fc(self.fc1(x))))
        x = self.dropout(F.relu(self.fc2(x)))
        return self.fc3(x)

# ------------------------
# TRAINING / VALIDATION FUNCTIONS
# ------------------------
def train_epoch(model, loader, criterion, optimizer, device, use_mixup=True):
    """Run one optimisation pass over ``loader`` and report loss and accuracy.

    With ``use_mixup`` enabled, each batch has a 50% chance of being replaced
    by a mixup batch: a convex combination ``lam * x + (1 - lam) * x[perm]``
    with ``lam ~ Beta(0.2, 0.2)``. The loss, and the accuracy credit, are then
    the same ``lam``-weighted combination over the two label sets.

    Parameters
    ----------
    model : torch.nn.Module
        Network to train; switched to training mode.
    loader : iterable
        Yields ``(images, labels)`` batches.
    criterion : callable
        Loss function called as ``criterion(outputs, labels)``.
    optimizer : torch.optim.Optimizer
        Optimiser stepping ``model``'s parameters.
    device : str or torch.device
        Device batches are moved to.
    use_mixup : bool
        Whether mixup may be applied.

    Returns
    -------
    tuple[float, float]
        ``(mean_loss_per_sample, accuracy_percent)``; ``(0.0, 0.0)`` when the
        loader yields no samples (previously a ``ZeroDivisionError``).
    """
    model.train()
    total_loss, total_correct, total_samples = 0.0, 0.0, 0
    for imgs, labels in tqdm(loader, leave=False, desc="Training"):
        imgs, labels = imgs.to(device), labels.to(device)
        batch_size = imgs.size(0)

        # Describe the batch as (weight, targets) pairs so that the plain and
        # the mixup case share a single optimisation step below.
        if use_mixup and np.random.rand() > 0.5:
            lam = np.random.beta(0.2, 0.2)
            # Build the permutation on the batch's device to avoid a
            # host-to-device index copy on every mixup batch.
            perm = torch.randperm(batch_size, device=imgs.device)
            inputs = lam * imgs + (1 - lam) * imgs[perm]
            weighted_targets = ((lam, labels), (1 - lam, labels[perm]))
        else:
            inputs = imgs
            weighted_targets = ((1.0, labels),)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = sum(w * criterion(outputs, t) for w, t in weighted_targets)
        loss.backward()
        optimizer.step()

        preds = outputs.argmax(1)
        total_loss += loss.item() * batch_size
        total_correct += sum((w * (preds == t).float()).sum().item()
                             for w, t in weighted_targets)
        total_samples += batch_size

    if total_samples == 0:
        return 0.0, 0.0
    return total_loss / total_samples, total_correct / total_samples * 100

def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    The loss is computed with the ``criterion`` that is passed in. The
    argument used to be ignored in favour of a fresh ``nn.CrossEntropyLoss()``,
    so the reported validation loss was not comparable with the training loss
    whenever the caller's criterion was configured differently (e.g. label
    smoothing). Passing ``None`` keeps the former plain cross-entropy.

    Parameters
    ----------
    model : torch.nn.Module
        Network to evaluate; switched to eval mode.
    loader : iterable
        Yields ``(images, labels)`` batches.
    criterion : callable or None
        Loss function called as ``criterion(outputs, labels)``.
    device : str or torch.device
        Device batches are moved to.

    Returns
    -------
    tuple[float, float]
        ``(mean_loss_per_sample, accuracy_percent)``; ``(0.0, 0.0)`` when the
        loader yields no samples (previously a ``ZeroDivisionError``).
    """
    model.eval()
    loss_fn = criterion if criterion is not None else nn.CrossEntropyLoss()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    with torch.no_grad():
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = loss_fn(outputs, labels)
            batch_size = imgs.size(0)
            total_loss += loss.item() * batch_size
            total_correct += (outputs.argmax(1) == labels).sum().item()
            total_samples += batch_size

    if total_samples == 0:
        return 0.0, 0.0
    return total_loss / total_samples, total_correct / total_samples * 100

# ------------------------
# MAIN PIPELINE
# ------------------------
def main():
    """Train ``NUM_MODELS`` independent SimplerCNN models on the cursive data.

    Steps:
      1. Load labelled cursive images and list the signature images (the
         latter are only counted here; this phase does not train on them).
      2. Build a stratified 80/10/10 train/validation/test split
         (``random_state=42``). The test part is held out and not touched.
      3. For each ensemble member, train for ``INITIAL_EPOCHS`` epochs and
         write ``best_model_{i}_phase1.joblib`` whenever validation accuracy
         improves, then reload that best checkpoint into the model.

    Raises
    ------
    RuntimeError
        If no labelled cursive images are found.
    """
    labeled_data = load_cursive_data(CURSIVE_DIR)
    signature_paths = load_signatures(SIGN_DIR)
    if len(labeled_data) == 0:
        raise RuntimeError("No labeled cursive images found!")

    labels = sorted(set(lbl for _, lbl in labeled_data))
    label2idx = {lbl: i for i, lbl in enumerate(labels)}
    num_classes = len(labels)

    train_data, temp_data = train_test_split(labeled_data, test_size=0.2,
                                             stratify=[lbl for _, lbl in labeled_data],
                                             random_state=42)
    val_data, test_data = train_test_split(temp_data, test_size=0.5,
                                           stratify=[lbl for _, lbl in temp_data],
                                           random_state=42)
    print(f"Samples - train: {len(train_data)}, val: {len(val_data)}, "
          f"test (held out): {len(test_data)}, "
          f"unlabeled signatures found: {len(signature_paths)}")

    # BatchNorm1d raises in training mode when a batch holds a single sample,
    # so drop the trailing batch only in the case where it would have size 1.
    drop_single_tail = len(train_data) % BATCH_SIZE == 1
    train_loader = DataLoader(LetterDataset(train_data, label2idx, IMG_SIZE, augment=True),
                              batch_size=BATCH_SIZE, shuffle=True,
                              drop_last=drop_single_tail)
    val_loader = DataLoader(LetterDataset(val_data, label2idx, IMG_SIZE, augment=False),
                            batch_size=BATCH_SIZE, shuffle=False)

    models = []
    for i in range(NUM_MODELS):
        checkpoint_path = f'best_model_{i}_phase1.joblib'
        model = SimplerCNN(num_classes, IMG_SIZE).to(DEVICE)
        optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)
        criterion = nn.CrossEntropyLoss(label_smoothing=0.05)
        # Start below any reachable accuracy so the first epoch always writes
        # a checkpoint. With a start value of 0, a run whose validation
        # accuracy stayed at 0 wrote nothing, and the reload below either
        # failed or picked up a stale file from an earlier run.
        best_val_acc = float('-inf')
        for epoch in range(1, INITIAL_EPOCHS+1):
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
            val_loss, val_acc = validate(model, val_loader, criterion, DEVICE)
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                # Store CPU tensors so the checkpoint can be loaded on a
                # machine without a GPU; load_state_dict copies them back onto
                # whatever device the target model lives on.
                cpu_state = {name: tensor.detach().cpu()
                             for name, tensor in model.state_dict().items()}
                joblib.dump(cpu_state, checkpoint_path)
                print(f"[model {i}] epoch {epoch}: train loss {train_loss:.4f}, "
                      f"train acc {train_acc:.2f}%, val loss {val_loss:.4f}, "
                      f"val acc {val_acc:.2f}% (new best, checkpoint saved)")
        # Load best model
        model.load_state_dict(joblib.load(checkpoint_path))
        models.append(model)

    print("✓ Ensemble training complete. Models saved with joblib.")

# Entry point: start ensemble training when this cell/script is executed
# directly (the guard keeps main() from running on import).
if __name__ == '__main__':
    main()


# 7. Present
In a customer-facing document, provide a summary of findings and detail the approach taken.


This project uses a convolutional neural network (CNN) to recognize handwritten cursive letters. Images from the “Cursive” and “Signatures” folders were preprocessed by resizing, converting to grayscale, and normalizing pixel values. Data augmentation—including rotations, translations, scaling, brightness adjustments, and noise—was applied during training to improve generalization. The CNN architecture consists of four convolutional layers with batch normalization, followed by adaptive pooling and fully connected layers. Models were trained on labeled cursive data, then pseudo-labeling was applied to signature images for fine-tuning. Test-time augmentation and ensemble averaging were used during inference to improve accuracy.

# 8. Launch the Model System
Define your production run code. This should be self-sufficient and require only your model parameters.


In [None]:
import cv2
import joblib
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# ------------------------
# CONFIG
# ------------------------
# Relative path, resolved against the current working directory. The name
# follows the 'best_model_{i}_phase1.joblib' pattern written by the training
# cell (index 0 = first ensemble member).
MODEL_PATH = "best_model_0_phase1.joblib"  # adjust which model to load
IMG_SIZE = 96  # inference() resizes every input image to IMG_SIZE x IMG_SIZE
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU only when PyTorch can see one

# ------------------------
# MODEL CLASS (must match saved model)
# ------------------------
class SimplerCNN(nn.Module):
    """Inference-side copy of the training network (layer names must match).

    Four Conv(3x3) -> BatchNorm -> ReLU -> max-pool -> dropout stages widen
    the channels 1 -> 32 -> 64 -> 128 -> 256. The feature map is then
    adaptively average-pooled to 2x2 and classified by a
    1024 -> 256 -> 128 -> ``num_classes`` head.

    Parameters
    ----------
    num_classes : int
        Number of output logits. The output layer used to be hard-coded to 26
        and ignored this argument, so any checkpoint trained on a different
        number of letters could not be loaded. It must equal the class count
        of the checkpoint being restored.
    img_size : int
        Accepted for interface compatibility; not used by the layers, because
        the adaptive pooling fixes the flattened size.
    """

    def __init__(self, num_classes, img_size=96):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.4)
        self.global_pool = nn.AdaptiveAvgPool2d((2, 2))
        self.fc1 = nn.Linear(256*2*2, 256)
        self.bn_fc = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        # Honour the requested class count instead of a fixed 26.
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Map a ``(N, 1, H, W)`` batch to ``(N, num_classes)`` logits."""
        x = F.relu(self.bn1(self.conv1(x))); x = self.pool(x); x = self.dropout(x)
        x = F.relu(self.bn2(self.conv2(x))); x = self.pool(x); x = self.dropout(x)
        x = F.relu(self.bn3(self.conv3(x))); x = self.pool(x); x = self.dropout(x)
        x = F.relu(self.bn4(self.conv4(x))); x = self.pool(x); x = self.dropout(x)
        x = self.global_pool(x)
        # Flatten (N, 256, 2, 2) -> (N, 1024) for the fully connected head.
        x = x.view(x.size(0), -1)
        x = F.relu(self.bn_fc(self.fc1(x))); x = self.dropout(x)
        x = F.relu(self.fc2(x)); x = self.dropout(x)
        x = self.fc3(x)
        return x

# ------------------------
# AUGMENTATION FUNCTION (light)
# ------------------------
def apply_augmentation(img, strength='light'):
    """Apply a mild random rotation and/or shift for test-time augmentation.

    Each transform fires independently with probability 0.5: a rotation about
    the centre within +/-5 degrees, and an integer shift within +/-3 pixels
    per axis. Uncovered areas are filled by replicating edge pixels. The side
    length is read from ``img.shape[0]`` and used for both dimensions.

    Parameters
    ----------
    img : numpy.ndarray
        2-D image array.
    strength : str
        Accepted for signature compatibility; it is not consulted, the light
        settings are always used.

    Returns
    -------
    numpy.ndarray
        The transformed image, or the input object itself when nothing fired.
    """
    side = img.shape[0]

    def _warp(image, matrix):
        return cv2.warpAffine(image, matrix, (side, side), borderMode=cv2.BORDER_REPLICATE)

    # Rotation about the image centre.
    if np.random.rand() > 0.5:
        theta = np.random.uniform(-5, 5)
        img = _warp(img, cv2.getRotationMatrix2D((side // 2, side // 2), theta, 1.0))

    # Integer pixel shift; dx is drawn before dy.
    if np.random.rand() > 0.5:
        dx = np.random.randint(-3, 4)
        dy = np.random.randint(-3, 4)
        img = _warp(img, np.float32([[1, 0, dx], [0, 1, dy]]))

    return img

# ------------------------
# LOAD MODEL
# ------------------------
# Build the network with 26 outputs and restore the weights stored at
# MODEL_PATH. The checkpoint must have been trained with the same number of
# classes, otherwise load_state_dict reports a shape mismatch.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code - only load checkpoints from a trusted source.
model = SimplerCNN(num_classes=26, img_size=IMG_SIZE).to(DEVICE)
model.load_state_dict(joblib.load(MODEL_PATH))
# eval() puts dropout and batch-norm layers into inference behaviour.
model.eval()

# ------------------------
# INFERENCE FUNCTION
# ------------------------
def inference(image_path, tta_rounds=5):
    """
    Predict the letter for a single image using the saved CNN model.

    The image is read as grayscale, resized to ``IMG_SIZE`` x ``IMG_SIZE`` and
    scaled to [0, 1]. The model is evaluated on the original image plus
    ``tta_rounds`` lightly augmented copies, and the softmax outputs are
    averaged. All forward passes run under ``torch.no_grad()`` so no autograd
    graph is built during inference.

    Parameters
    ----------
    image_path : str
        Path to the input image.
    tta_rounds : int
        Number of augmented versions for test-time augmentation
        (0 disables augmentation).

    Returns
    -------
    tuple
        (predicted_label, confidence), where ``confidence`` is the averaged
        softmax probability of the predicted class.

    Raises
    ------
    FileNotFoundError
        If the image cannot be read.
    """
    # Load and preprocess image
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise FileNotFoundError(f"Cannot load image: {image_path}")
    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE)).astype(np.float32)/255.0

    def _as_batch(array):
        # (H, W) -> (1, 1, H, W): add the batch and channel dimensions.
        return torch.FloatTensor(array).unsqueeze(0).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        # Original prediction
        preds = [F.softmax(model(_as_batch(img)), dim=1)]

        # TTA predictions
        for _ in range(tta_rounds):
            aug_img = apply_augmentation(img, strength='light')
            preds.append(F.softmax(model(_as_batch(aug_img)), dim=1))

        # Average predictions
        avg_pred = torch.stack(preds).mean(dim=0)

    pred_idx = avg_pred.argmax(1).item()
    confidence = avg_pred[0, pred_idx].item()

    # Class index i is mapped to the i-th lowercase letter; this is only
    # correct when the model was trained on all 26 letters in sorted order.
    predicted_label = chr(ord('a') + pred_idx)  # assuming labels a-z
    return predicted_label, confidence

# ------------------------
# EXAMPLE USAGE
# ------------------------
# NOTE(review): absolute, machine-specific sample path; replace it with a
# local image when running elsewhere. inference() raises FileNotFoundError if
# the image cannot be read.
sample_img = "/home/jupyter-1016468/Git-Repo-Tour/Project_5/Data/Processed/Cursive/S1/f.png"
pred_letter, conf = inference(sample_img)
print(f"Predicted Letter: {pred_letter}, Confidence: {conf:.3f}")
