In [17]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random as rd
from pathlib import Path
from torchvision import datasets, transforms, models
from torchvision.models import ResNet50_Weights, ViT_B_16_Weights, ResNet18_Weights
from torch.utils.data import DataLoader, Subset, Dataset
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, classification_report
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import GroupShuffleSplit
import json

# Enable inline plotting for Jupyter
%matplotlib inline

# Select the compute device once for the whole notebook:
# the CUDA GPU when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print(f"Using device: {DEVICE}")

Using device: cuda


# utils.py

In [18]:
def plot_curves(results, model_name):
    """
    Plot training/validation loss and accuracy curves and save them as a PNG.

    Args:
        results: Mapping with the keys 'train_loss', 'val_loss', 'train_acc'
            and 'val_acc', each holding one value per epoch.
        model_name: Used in the subplot titles and in the output file name.

    The figure is written to ``plots/{model_name}_curves.png`` (the ``plots``
    directory is created if needed) and then closed; nothing is shown inline.
    """
    train_loss = results['train_loss']
    val_loss = results['val_loss']

    train_acc = results['train_acc']
    val_acc = results['val_acc']

    # 1-based so the x axis reads "epoch 1, 2, ..." rather than starting at 0.
    epochs = range(1, len(train_loss) + 1)

    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(15, 7))

    # Loss Plot
    ax_loss.plot(epochs, train_loss, label='Train Loss')
    ax_loss.plot(epochs, val_loss, label='Val Loss')
    ax_loss.set_title(f'{model_name} - Loss')
    ax_loss.set_xlabel('Epochs')
    ax_loss.set_ylabel('Loss')
    ax_loss.legend()

    # Accuracy Plot
    ax_acc.plot(epochs, train_acc, label='Train Accuracy')
    ax_acc.plot(epochs, val_acc, label='Val Accuracy')
    ax_acc.set_title(f'{model_name} - Accuracy')
    ax_acc.set_xlabel('Epochs')
    ax_acc.set_ylabel('Accuracy')
    ax_acc.legend()

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs("plots", exist_ok=True)

    save_path = f"plots/{model_name}_curves.png"
    try:
        fig.savefig(save_path)
    finally:
        # Always release the figure, even if saving fails, so repeated calls
        # do not accumulate open figures in the kernel.
        plt.close(fig)

def plot_mlp_loss_curve(mlp_model, model_name="MLP"):
    """
    Save the training-loss curve of a fitted Scikit-Learn MLPClassifier.

    If ``mlp_model`` has no ``loss_curve_`` attribute a warning is printed and
    nothing is drawn. Otherwise the curve is written to
    ``plots/{model_name}_loss_curve.png`` and the figure is closed.
    """
    if not hasattr(mlp_model, 'loss_curve_'):
        print(f"Warning: {model_name} does not have a loss curve.")
        return

    # Explicit Figure/Axes handles instead of the implicit pyplot state.
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(mlp_model.loss_curve_, label='Training Loss')
    ax.set_title(f'{model_name} - Training Loss Curve')
    ax.set_xlabel('Iterations')
    ax.set_ylabel('Loss')
    ax.legend()
    ax.grid(True)

    # Create the output directory on first use.
    out_dir = "plots"
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    save_path = f"plots/{model_name}_loss_curve.png"
    fig.savefig(save_path)
    plt.close(fig)

def plot_confusion_matrix(y_true, y_pred, class_names, model_name):
    """
    Plot a confusion-matrix heatmap and save it as a PNG.

    Args:
        y_true: True labels.
        y_pred: Predicted labels, same length as ``y_true``.
        class_names: Display names used as tick labels on both axes.
        model_name: Used in the title and in the output file name.

    When the labels are integer class indices in ``[0, len(class_names))`` the
    matrix is computed over every class index, so a class that is absent from
    both ``y_true`` and ``y_pred`` still gets its own (all-zero) row/column and
    the tick labels stay aligned with ``class_names``. For any other label
    type the classes are inferred from the data, as before.

    The figure is written to ``plots/{model_name}_conf_matrix.png`` and closed.
    """
    observed = np.concatenate([np.asarray(y_true).ravel(), np.asarray(y_pred).ravel()])
    labels = None
    if (
        observed.size
        and np.issubdtype(observed.dtype, np.integer)
        and observed.min() >= 0
        and observed.max() < len(class_names)
    ):
        # Pin the matrix to one row/column per known class.
        labels = np.arange(len(class_names))

    cm = confusion_matrix(y_true, y_pred, labels=labels)

    fig, ax = plt.subplots(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=ax)

    ax.set_title(f'{model_name} Confusion Matrix')
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted')

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs("plots", exist_ok=True)

    save_path = f"plots/{model_name}_conf_matrix.png"
    try:
        fig.savefig(save_path)
    finally:
        # Release the figure even if saving fails.
        plt.close(fig)



# data_setup.py

In [19]:
class TransformedDatasetWrapper(Dataset):
    """Dataset view that applies an optional transform to each item's image.

    ``subset`` is any indexable, sized collection whose items are
    ``(image, label)`` pairs. The transform (if truthy) is applied to the
    image only; the label is passed through unchanged.
    """

    def __init__(self, subset, transform=None):
        self.transform = transform
        self.subset = subset

    def __len__(self):
        """Number of items in the wrapped collection."""
        return len(self.subset)

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index`` with the transform applied."""
        sample, target = self.subset[index]
        return (self.transform(sample) if self.transform else sample), target

def _writer_id_from_path(path):
    """Return the writer ID encoded in an image file name, or None if malformed.

    The file stem is split on '_'; when it has at least four parts the
    second-to-last part is taken as the writer ID.
    """
    parts = Path(path).stem.split('_')
    return parts[-2] if len(parts) >= 4 else None


def create_dataloaders(data_dir, batch_size=32, train_ratio=0.7, val_ratio=0.15, seed=42):
    """
    Build writer-independent train/val/test DataLoaders from an image folder.

    Images are grouped by the writer ID parsed from their file names, the
    writers are shuffled deterministically, and whole writers are assigned to
    train / validation / test so that no writer appears in two splits.

    Args:
        data_dir: Root directory in ``ImageFolder`` layout (one sub-folder per class).
        batch_size: Batch size used by all three loaders.
        train_ratio: Fraction of writers assigned to training.
        val_ratio: Fraction of writers assigned to validation; the remaining
            writers form the test split.
        seed: Seed for the writer shuffle. The default reproduces the split
            obtained previously with ``random.seed(42)``.

    Returns:
        ``(train_loader, val_loader, test_loader, class_names)``.

    Raises:
        FileNotFoundError: If ``data_dir`` does not exist.
        ValueError: If a ratio is negative or the two ratios sum to more than 1.
    """
    if not os.path.exists(data_dir):
        raise FileNotFoundError(f"Data directory not found: {data_dir}")

    # Small tolerance so float sums such as 0.7 + 0.3 are accepted.
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            f"Invalid split ratios: train_ratio={train_ratio}, val_ratio={val_ratio}. "
            "Both must be non-negative and sum to at most 1."
        )

    # Augmentation for training
    train_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Standard transform for val/test (No augmentation)
    test_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Load dataset without transforms; each split gets its own transform below.
    full_dataset = datasets.ImageFolder(root=data_dir, transform=None)
    class_names = full_dataset.classes
    print(f"Classes found: {class_names}")

    # Group sample indices by writer ID
    indices_by_writer = {}
    for idx, (path, _label) in enumerate(full_dataset.samples):
        writer_id = _writer_id_from_path(path)
        if writer_id is None:
            # Malformed names are kept, pooled under one pseudo-writer.
            print(f"Wrong file format: {Path(path).name}")
            writer_id = "unknown"
        indices_by_writer.setdefault(writer_id, []).append(idx)

    # Sort first so the shuffle depends only on the seed, not on file order.
    writers_list = sorted(indices_by_writer)
    # A private generator gives the same permutation as seeding the global
    # `random` module, without resetting the process-wide RNG state.
    rd.Random(seed).shuffle(writers_list)

    total_writers = len(writers_list)
    n_train = int(total_writers * train_ratio)
    n_val = int(total_writers * val_ratio)

    train_writers = writers_list[:n_train]
    val_writers = writers_list[n_train : n_train + n_val]
    test_writers = writers_list[n_train + n_val:]

    print(f"Total participants: {total_writers}")
    print(f"Training participants ({len(train_writers)}): {train_writers[:5]}...")
    print(f"Validation participants ({len(val_writers)}): {val_writers}...")
    print(f"Testing participants ({len(test_writers)}): {test_writers}...")

    train_indices = [i for w in train_writers for i in indices_by_writer[w]]
    val_indices = [i for w in val_writers for i in indices_by_writer[w]]
    test_indices = [i for w in test_writers for i in indices_by_writer[w]]

    for split_name, split_indices in (("train", train_indices),
                                      ("val", val_indices),
                                      ("test", test_indices)):
        if not split_indices:
            print(f"Warning: the {split_name} split is empty; "
                  "check the split ratios and the number of writers.")

    # Apply transforms using Wrapper
    train_data = TransformedDatasetWrapper(Subset(full_dataset, train_indices), transform=train_transform)
    val_data = TransformedDatasetWrapper(Subset(full_dataset, val_indices), transform=test_transform)
    test_data = TransformedDatasetWrapper(Subset(full_dataset, test_indices), transform=test_transform)

    print(f"Sample Sizes -> Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}")

    # Create Loaders (only the training loader is shuffled)
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=0)

    return train_loader, val_loader, test_loader, class_names

# model_setup.py

In [20]:
class SimpleCNN(nn.Module):
    """Small three-stage convolutional classifier.

    Each stage is Conv(3x3, padding 1) -> BatchNorm -> ReLU -> MaxPool(2x2),
    with 32, 64 and 128 output channels. The classifier flattens the feature
    map and expects ``128 * 28 * 28`` values per sample, i.e. a 28x28 map after
    the three 2x poolings (224x224 input).
    """

    def __init__(self, num_classes):
        super().__init__()

        # (in_channels, out_channels) of the three conv stages, in order.
        stage_channels = [(3, 32), (32, 64), (64, 128)]

        feature_layers = []
        for c_in, c_out in stage_channels:
            feature_layers += [
                nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
        self.features = nn.Sequential(*feature_layers)

        flat_features = 128 * 28 * 28
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.classifier(self.features(x))

def get_model(model_name, num_classes, device):
    """
    Build the requested network with a ``num_classes``-way output layer.

    Supported names: "resnet50", "resnet18", "vit_b_16" (torchvision models
    loaded with their DEFAULT weights, final layer replaced) and "simple_cnn"
    (the SimpleCNN class). The model is moved to ``device`` before being
    returned. For any other name an error is printed and None is returned.
    """
    if model_name not in ("resnet50", "resnet18", "vit_b_16", "simple_cnn"):
        print("Error. Wrong model name")
        return None

    if model_name == "simple_cnn":
        net = SimpleCNN(num_classes)

    elif model_name == "vit_b_16":
        net = models.vit_b_16(weights=ViT_B_16_Weights.DEFAULT)
        # Swap the classification head for one sized to this task.
        net.heads.head = nn.Linear(net.heads.head.in_features, num_classes)

    else:
        # Both ResNets expose their classifier as `.fc`.
        if model_name == "resnet50":
            net = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        else:
            net = models.resnet18(weights=ResNet18_Weights.DEFAULT)
        net.fc = nn.Linear(net.fc.in_features, num_classes)

    return net.to(device)

# engine.py

In [21]:
def train(model, dataloader, criterion, optimizer, device):
    """Train the model for one epoch.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
            Assumed to return the mean loss over the batch (the default
            reduction of ``nn.CrossEntropyLoss``).
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the per-sample mean loss as a float and the
        accuracy as a 0-dim float64 tensor.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct_preds = 0
    total_preds = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a short final batch does not skew
        # the epoch average (same convention as validate()).
        running_loss += loss.item() * images.size(0)
        _, preds = torch.max(outputs, 1)
        correct_preds += torch.sum(preds == labels)
        total_preds += labels.size(0)

    if total_preds == 0:
        raise ValueError("train() received no samples: the dataloader is empty.")

    epoch_loss = running_loss / total_preds
    epoch_acc = correct_preds.double() / total_preds

    return epoch_loss, epoch_acc


def validate(model, dataloader, criterion, device):
    """Evaluate the model on one pass over ``dataloader`` without gradients.

    Returns ``(epoch_loss, epoch_acc)``: the loss averaged per sample (each
    batch loss is weighted by its batch size) as a float, and the accuracy as
    a 0-dim float64 tensor.
    """
    model.eval()

    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():
        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            batch_loss = criterion(logits, batch_labels)

            # Predicted class = index of the largest logit.
            predicted = torch.max(logits, 1).indices

            loss_sum += batch_loss.item() * batch_images.size(0)
            n_correct += (predicted == batch_labels).sum()
            n_seen += batch_labels.size(0)

    return loss_sum / n_seen, n_correct.double() / n_seen


def test(model, dataloader, device, class_names):
    """Test the model and print classification report."""
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    print(classification_report(
            all_labels,
            all_preds,
            target_names=class_names,
            labels=np.arange(len(class_names)),
            zero_division=0
        ))

    return all_labels, all_preds

# ML Features

In [22]:
class IconMLManager:
    """Converts extracted icon-feature records into arrays for the ML models."""

    # Scalar shape descriptors read, in this order, from every subdivision and
    # from the global block.
    _SHAPE_KEYS = ('perimeter', 'area', 'compactness', 'corners_count', 'sharp_corners_count')
    # Scalar descriptors that exist only in the global block.
    _GLOBAL_ONLY_KEYS = ('ellipse_count', 'diagonal_length', 'diagonal_angle')
    _DIRECTION_KEYS = ('horizontal', 'vertical', 'diag1', 'diag2')

    @staticmethod
    def _region_features(region, extras=()):
        """Flatten one region: shape scalars, ``extras``, Hu moments, line directions."""
        values = [region[key] for key in IconMLManager._SHAPE_KEYS]
        values.extend(extras)
        values.extend(region['hu_moments'])
        values.extend(region['line_directions'][key] for key in IconMLManager._DIRECTION_KEYS)
        return values

    @staticmethod
    def _writer_id(filename):
        """Return the writer ID encoded in a file name, or 'unknown'.

        Only the base name without its extension is parsed, e.g.
        ``Fire_00000_1_05.png`` -> ``"1"``, so directory components or other
        extensions cannot corrupt the ID. The ID is the second-to-last
        '_'-separated part when there are at least four parts.
        """
        parts = Path(filename or '').stem.split('_')
        return parts[-2] if len(parts) >= 4 else 'unknown'

    @staticmethod
    def features_to_vectors(dataset):
        """Flatten the JSON records into arrays.

        Args:
            dataset: Iterable of dicts with the keys 'subdivisions' (list of
                region dicts), 'global' (region dict with extra global
                descriptors), 'label' and optionally 'filename'.

        Returns:
            ``(X, y, groups)`` as NumPy arrays: the feature matrix (one row per
            sample: all subdivision features followed by the global features),
            the labels, and the writer ID of every sample (for group-aware
            splitting).

        Raises:
            ValueError: If the samples do not all yield the same number of
                features (e.g. a differing number of subdivisions).
        """
        _x = []
        _y = []
        _groups = []  # writer IDs, one per sample

        for sample in dataset:
            feature_vector = []

            # Process subdivisions
            for sub in sample['subdivisions']:
                feature_vector.extend(IconMLManager._region_features(sub))

            # Process global features
            g = sample['global']
            global_extras = [g[key] for key in IconMLManager._GLOBAL_ONLY_KEYS]
            global_extras.extend([
                g['convex_area']['convex_area'],
                g['convex_area']['solidity'],
                g['avg_centroidal_radius'],
            ])
            feature_vector.extend(IconMLManager._region_features(g, global_extras))

            _x.append(feature_vector)
            _y.append(sample['label'])
            _groups.append(IconMLManager._writer_id(sample.get('filename', '')))

        # A ragged matrix cannot be scaled or fed to the classifiers; fail
        # here with a clear message instead of deep inside NumPy/sklearn.
        vector_lengths = sorted({len(vec) for vec in _x})
        if len(vector_lengths) > 1:
            raise ValueError(
                f"Inconsistent feature vector lengths across samples: {vector_lengths}"
            )

        # Return groups along with X and y
        return np.array(_x), np.array(_y), np.array(_groups)

# ML Train

In [23]:
def train_ml(json_path_str):
    """Train and evaluate an SVM and an MLP on the extracted icon features.

    Loads the feature records from ``json_path_str``, splits them by writer
    into train (70%) / validation / test with ``GroupShuffleSplit``, scales the
    features with statistics from the training part only, fits both models and
    prints their accuracies. Confusion matrices (and the MLP loss curve) are
    saved through the plotting helpers. If the file is missing, an error is
    printed and the function returns None.
    """
    print(f"\n{'='*30}")
    print("STARTING TRADITIONAL ML PIPELINE (SVM & MLP)")
    print(f"{'='*30}")

    json_path = Path(json_path_str)

    try:
        with json_path.open('r', encoding='utf-8') as handle:
            records = json.load(handle)
    except FileNotFoundError:
        print(f"Error: {json_path} not found. Ensure extracted features exist.")
        return

    X, y_raw, groups = IconMLManager.features_to_vectors(records)

    # Map the string class labels to integer codes.
    le = LabelEncoder()
    y = le.fit_transform(y_raw)

    print(f"Dataset: {X.shape[0]} samples, {X.shape[1]} features per sample.")
    print(f"Total Unique Writers: {len(np.unique(groups))}")

    # First split: 70% of the writers for training, the rest held out.
    outer_split = GroupShuffleSplit(n_splits=1, train_size=0.70, random_state=42)
    train_idx, holdout_idx = next(outer_split.split(X, y, groups))

    X_train, y_train = X[train_idx], y[train_idx]
    X_holdout, y_holdout = X[holdout_idx], y[holdout_idx]
    groups_holdout = groups[holdout_idx]

    # Second split: halve the held-out writers into validation and test.
    inner_split = GroupShuffleSplit(n_splits=1, test_size=0.50, random_state=42)
    val_idx, test_idx = next(inner_split.split(X_holdout, y_holdout, groups_holdout))

    X_val, y_val = X_holdout[val_idx], y_holdout[val_idx]
    X_test, y_test = X_holdout[test_idx], y_holdout[test_idx]

    print(f"Split: Train={len(X_train)}, Val={len(X_val)}, Test={len(X_test)}")

    # Fit the scaler on the training part only, then reuse it for val/test.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # SVM
    print("\n--- Training SVM ---")
    svm_model = SVC(kernel='rbf', C=10.0, gamma='scale', random_state=42)
    svm_model.fit(X_train_scaled, y_train)

    val_pred_svm = svm_model.predict(X_val_scaled)
    test_pred_svm = svm_model.predict(X_test_scaled)
    print(f"SVM Val Accuracy: {accuracy_score(y_val, val_pred_svm):.4f}")
    print(f"SVM Test Accuracy: {accuracy_score(y_test, test_pred_svm):.4f}")

    plot_confusion_matrix(y_test, test_pred_svm, le.classes_, "SVM")

    # MLP
    print("\n --- Training MLP (Sklearn) ---")
    mlp_settings = dict(
        hidden_layer_sizes=(128, 64),
        activation='relu',
        solver='adam',
        max_iter=1000,
        early_stopping=True,
        validation_fraction=0.1,
        random_state=42,
    )
    mlp_model = MLPClassifier(**mlp_settings)
    mlp_model.fit(X_train_scaled, y_train)

    val_pred_mlp = mlp_model.predict(X_val_scaled)
    print(f"MLP Val Accuracy: {accuracy_score(y_val, val_pred_mlp):.4f}")

    test_pred_mlp = mlp_model.predict(X_test_scaled)
    print(f"MLP Test Accuracy: {accuracy_score(y_test, test_pred_mlp):.4f}")

    print("\n --- MLP Detailed Report ----")
    print(classification_report(y_test, test_pred_mlp, target_names=le.classes_))

    plot_confusion_matrix(y_test, test_pred_mlp, le.classes_, "MLP")
    plot_mlp_loss_curve(mlp_model, "MLP")

# Execute

In [24]:
# Input locations
DATA_PATH = "../data/extracted"                 # image root, one sub-folder per class
JSON_FEATURES_PATH = "../features_dataset.json"  # extracted features for the SVM/MLP

# Deep-learning run settings
MODELS_DL = ["simple_cnn", "resnet50", "vit_b_16", "resnet18"]  # trained in this order
EPOCHS = 10
BATCH_SIZE = 32
LR = 1e-4  # Adam learning rate

def main_pipeline():
    """Run the full experiment: classical ML first, then every deep model.

    1. If the extracted-features JSON exists, train the SVM/MLP on it;
       otherwise print a warning and continue.
    2. Build the dataloaders from ``DATA_PATH`` and, for each name in
       ``MODELS_DL``, train for ``EPOCHS`` epochs, evaluate on the test
       loader, save the curves and confusion matrix, and write the weights to
       ``models/{model_name}_final.pth``.

    Model names that ``get_model`` does not recognise (it returns None) are
    skipped instead of aborting the remaining runs.
    """
    import gc  # local import: only needed for the per-model memory cleanup

    print(f"Working on {DEVICE}")

    #  TRADITIONAL ML PART

    if Path(JSON_FEATURES_PATH).exists():
        train_ml(JSON_FEATURES_PATH)
    else:
        print(f"Warning: {JSON_FEATURES_PATH} not found. Skipping ML training.")
        print("Please run extraction.ipynb first to generate the JSON file.")

    #  DEEP LEARNING PART

    if not Path(DATA_PATH).exists():
        print(f"Error: {DATA_PATH} not found.")
        return

    # Using create_dataloaders function defined in previous cells
    train_loader, val_loader, test_loader, class_names = create_dataloaders(DATA_PATH, BATCH_SIZE)

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs("models", exist_ok=True)

    for model_name in MODELS_DL:
        print(f"\n{'-'*30}")
        print(f"NOW TRAINING {model_name}")
        print(f"\n{'-'*30}")

        model = get_model(model_name, len(class_names), DEVICE)
        if model is None:
            # get_model already reported the unknown name; keep going with
            # the remaining models rather than crashing on model.parameters().
            print(f"Skipping {model_name}: no model was created.")
            continue

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=LR)

        results = {
            "train_loss": [],
            "train_acc": [],
            "val_loss": [],
            "val_acc": []
        }

        for epoch in range(EPOCHS):
            # train and validate functions
            train_loss, train_acc = train(model, train_loader, criterion, optimizer, DEVICE)
            val_loss, val_acc = validate(model, val_loader, criterion, DEVICE)

            # Accuracies may come back as 0-dim tensors; store plain floats.
            t_acc = train_acc.item() if isinstance(train_acc, torch.Tensor) else train_acc
            v_acc = val_acc.item() if isinstance(val_acc, torch.Tensor) else val_acc

            results["train_loss"].append(train_loss)
            results["train_acc"].append(t_acc)
            results["val_loss"].append(val_loss)
            results["val_acc"].append(v_acc)

            print(
                f"Epoch {epoch+1}/{EPOCHS} | "
                f"Train Loss: {train_loss:.4f} | "
                f"Train Acc: {t_acc:.4f} | "
                f"Val Loss: {val_loss:.4f} | "
                f"Val Acc: {v_acc:.4f}"
            )

        print(f"\n--- {model_name} TEST RESULTS ---")

        # test
        y_true, y_pred = test(model, test_loader, DEVICE, class_names)

        # Visualization
        plot_curves(results, model_name)
        plot_confusion_matrix(y_true, y_pred, class_names, model_name)

        # Saving the Model
        torch.save(model.state_dict(), f"models/{model_name}_final.pth")
        print("Model Saved")

        # Memory cleanup: the optimizer also references the parameters (and
        # holds its own per-parameter state), so it must be dropped together
        # with the model before the cached GPU memory can be released.
        del model, optimizer
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


# Entry point: run the full pipeline only when this code is executed as the
# main program (not when it is imported as a module).
if __name__ == "__main__":
    main_pipeline()

Working on cuda

STARTING TRADITIONAL ML PIPELINE (SVM & MLP)
Dataset: 2095 samples, 118 features per sample.
Total Unique Writers: 30
Split: Train=1465, Val=280, Test=350

--- Training SVM ---
SVM Val Accuracy: 0.9321
SVM Test Accuracy: 0.9400

 --- Training MLP (Sklearn) ---
MLP Val Accuracy: 0.8893
MLP Test Accuracy: 0.9000

 --- MLP Detailed Report ----
              precision    recall  f1-score   support

        Bomb       0.96      0.96      0.96        25
         Car       0.86      1.00      0.93        25
    Casualty       0.85      0.92      0.88        25
 Electricity       1.00      0.92      0.96        25
        Fire       0.73      0.96      0.83        25
Fire_brigade       1.00      0.96      0.98        25
       Flood       0.89      0.64      0.74        25
         Gas       0.85      0.92      0.88        25
      Injury       0.91      0.84      0.88        25
  Paramedics       0.92      0.92      0.92        25
      Person       0.96      0.96      0.96  