<a href="https://colab.research.google.com/github/asheta66/Machine-Learning-2024/blob/main/Egypt/Egyptian_Hieroglyph_Classification_via_Lightweight_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================================
# Lightweight CNN for Egyptian Hieroglyph Classification
# - Loads annotations from Google Drive: train/valid/test/_annotations.csv
# - Builds train/val/test tf.data pipelines
# - Trains a lightweight CNN
# - Reports Accuracy, Precision, Recall, F1 (macro) for train/val/test
# - Plots convergence curves (train/val + test per-epoch)
# - Plots ROC curves (one-vs-rest) for train/val/test
# ------------------------------------------------------------
# One-cell, robust to common CSV schemas and image layouts
# ============================================================

import os, re, glob, sys, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ------------------ ENV / PATHS ------------------
# USE_DRIVE toggles the Google Drive mount attempted just below.
# BASE_PATH is the dataset root handed to read_annotations_for_split for the
# "train", "valid" and "test" splits.
USE_DRIVE = True  # Set to False if running on Kaggle with local dataset
BASE_PATH = "/content/drive/MyDrive/egyptian-hieroglyphs"  # update if needed

if USE_DRIVE:
    try:
        # Imported lazily so the script still starts where google.colab
        # is not installed.
        from google.colab import drive
        drive.mount('/content/drive')
    except Exception as e:
        # Deliberate best-effort: any import/mount failure is reported and
        # execution continues; a bad BASE_PATH surfaces later when the
        # annotation CSVs are looked up.
        print("Colab drive mount not available; continuing anyway.", e)

# If not using Drive (e.g., Kaggle), set something like:
# BASE_PATH = "/kaggle/input/your-dataset-folder"

# ------------------ CONFIG ------------------
IMG_SIZE   = 128  # images are resized to IMG_SIZE x IMG_SIZE; also the model input size
BATCH_SIZE = 32   # default batch size of the tf.data pipelines
EPOCHS     = 15   # number of epochs passed to model.fit
SEED       = 42   # seeds TensorFlow, NumPy and the dataset shuffle

# TensorFlow is imported here (not at the top of the file) and seeded
# immediately, before any dataset or model is created.
import tensorflow as tf
tf.random.set_seed(SEED)
np.random.seed(SEED)

from sklearn.preprocessing import LabelEncoder, label_binarize
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay,
    accuracy_score, precision_score, recall_score, f1_score,
    roc_curve, auc
)

from tensorflow.keras import layers, models

# ------------------ HELPERS: LOADING CSVs ------------------
def _first_existing(path_list):
    """Return the first entry of ``path_list`` that is an existing regular file.

    Entries are checked in order; ``None`` is returned when none of them
    exists (or when ``path_list`` is empty).
    """
    existing = (candidate for candidate in path_list if os.path.isfile(candidate))
    return next(existing, None)

def read_annotations_for_split(base, split):
    """
    Read the annotations CSV for one split and return resolved image/label pairs.

    Several common CSV locations/filenames are tried under ``base``. The image
    and label columns are guessed from well-known column names, with a
    content/name-based fallback. Detection-style CSVs (several rows per image)
    are reduced to the first label of each image.

    Args:
        base: Dataset root directory.
        split: Split name used for the sub-folder / filename prefix
            (e.g. "train", "valid", "test").

    Returns:
        DataFrame with columns ['image', 'label'] and a fresh 0..n-1 index.
        'image' holds paths of files that exist on disk; rows whose image
        could not be resolved, or whose label is missing, are dropped.

    Raises:
        FileNotFoundError: If none of the candidate CSV files exists.
        ValueError: If no image-path or label column can be identified.
    """
    candidates = [
        os.path.join(base, split, "_annotations.csv"),
        os.path.join(base, f"{split}_annotations.csv"),
        os.path.join(base, split, "annotations.csv"),
        os.path.join(base, split, "labels.csv"),
    ]
    csv_path = _first_existing(candidates)
    if csv_path is None:
        raise FileNotFoundError(f"No annotations CSV found for split '{split}' in {base}. "
                                f"Tried: {candidates}")

    df_raw = pd.read_csv(csv_path)

    # Guess image column: exact well-known names first ...
    known_img_names = {"file", "filename", "image", "image_path", "path", "img_path", "imagefile"}
    img_col = next((c for c in df_raw.columns if str(c).lower() in known_img_names), None)
    if img_col is None:
        # ... then the first column whose values look like image filenames.
        # Non-capturing group: a capturing group makes pandas emit a
        # "pattern has match groups" UserWarning in str.contains.
        ext_pattern = r"\.(?:jpg|jpeg|png|bmp|gif)$"
        for c in df_raw.columns:
            if df_raw[c].astype(str).str.contains(ext_pattern, case=False, na=False).any():
                img_col = c
                break
    if img_col is None:
        raise ValueError(f"Could not find image path column in {csv_path}")

    # Guess label column. The image column is excluded so that e.g. "filename"
    # (which ends with "name") can never be mistaken for the label.
    other_cols = [c for c in df_raw.columns if c != img_col]
    known_lbl_names = {"label", "class", "category", "name"}
    lbl_col = next((c for c in other_cols if str(c).lower() in known_lbl_names), None)
    if lbl_col is None:
        for c in other_cols:
            lc = str(c).lower()
            if "class" in lc or "label" in lc or "category" in lc or lc.endswith("name"):
                lbl_col = c
                break
    if lbl_col is None:
        raise ValueError(f"Could not find label/class column in {csv_path}")

    df = df_raw[[img_col, lbl_col]].copy()
    df.columns = ["image", "label"]
    # Rows without a label would otherwise turn into a bogus "nan" class
    # once labels are cast to str by the caller.
    df = df.dropna(subset=["image", "label"])
    # If detection-style duplicates, reduce to one label/image (first)
    df = df.groupby("image", as_index=False).first()
    # Resolve image paths relative to the *requested* base (not a global).
    df["image"] = df["image"].astype(str).apply(lambda p: resolve_img_path(base, split, p))
    # resolve_img_path returns either an existing file or None, so a null
    # check is sufficient; calling os.path.isfile(None) would raise TypeError.
    df = df[df["image"].notna()].copy()
    df = df.drop_duplicates(subset=["image"]).reset_index(drop=True)
    return df

def resolve_img_path(base, split, p):
    """
    Resolve an image path string from an annotations file to an actual file.

    Tries, in order:
      - ``p`` itself when it is an absolute path to an existing file
      - base/split/<p>
      - base/split/images/<p>
      - base/<p>
      - base/<p without leading slashes>
      - a recursive search for the basename of ``p`` anywhere under ``base``

    Args:
        base: Dataset root directory.
        split: Split sub-folder name (e.g. "train").
        p: Path string as found in the annotations CSV.

    Returns:
        The path of the first existing file found, or ``None`` if nothing
        matches.
    """
    if os.path.isabs(p) and os.path.isfile(p):
        return p
    trials = [
        os.path.join(base, split, p),
        os.path.join(base, split, "images", p),
        os.path.join(base, p),
        os.path.join(base, p.lstrip("/")),
    ]
    for t in trials:
        if os.path.isfile(t):
            return t

    # basename search
    base_name = os.path.basename(p)
    if not base_name:
        # e.g. "" or a path ending in a separator: nothing to search for.
        return None
    # Escape glob metacharacters ("[", "]", "*", "?") so that file or folder
    # names containing them are matched literally instead of as patterns.
    pattern = os.path.join(glob.escape(base), "**", glob.escape(base_name))
    # Sorted so the chosen file is reproducible when several files share the
    # same basename (glob order depends on the filesystem).
    for h in sorted(glob.glob(pattern, recursive=True)):
        if os.path.isfile(h):
            return h
    return None

# ------------------ HELPERS: TF DATA ------------------
def load_image_tf(path, img_size=IMG_SIZE):
    """Read an image file and return it as a square float32 RGB tensor.

    Args:
        path: Path of the image file to read.
        img_size: Side length (pixels) of the resized output.

    Returns:
        Tensor of shape (img_size, img_size, 3), dtype float32.
    """
    raw_bytes = tf.io.read_file(path)
    # expand_animations=False: animated files decode to a single 3-D frame.
    decoded = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)  # [0,1]
    target_hw = (img_size, img_size)
    return tf.image.resize(as_float, target_hw, antialias=True)

def make_tf_dataset(paths, labels, batch_size=BATCH_SIZE, shuffle=False, augment=False):
    """Build a batched, prefetched ``tf.data`` pipeline of (image, label) pairs.

    Args:
        paths: Sequence of image file paths.
        labels: Sequence of labels, one per path (same length as ``paths``).
        batch_size: Number of examples per batch.
        shuffle: If True, reshuffle the examples on every iteration (seeded
            with SEED). Note that a shuffled dataset yields a different order
            each time it is iterated.
        augment: If True, apply random flip/rotation/zoom to every image.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    # Paths and labels travel together in one dataset: a length mismatch is
    # reported here instead of being silently truncated by a later zip.
    # dtype=str keeps an empty ``paths`` list a string tensor.
    ds = tf.data.Dataset.from_tensor_slices(
        (np.asarray(paths, dtype=str), np.asarray(labels))
    )
    if shuffle:
        # Shuffle *before* decoding so the buffer holds short path strings
        # rather than every decoded image of the split.
        ds = ds.shuffle(buffer_size=max(len(paths), 1), seed=SEED,
                        reshuffle_each_iteration=True)
    ds = ds.map(lambda p, y: (load_image_tf(p), y),
                num_parallel_calls=tf.data.AUTOTUNE)
    if augment:
        aug = tf.keras.Sequential([
            layers.RandomFlip("horizontal"),
            layers.RandomRotation(0.05),
            layers.RandomZoom(0.1),
        ])
        # training=True is passed explicitly: outside of model.fit the random
        # layers must not fall back to inference (identity) behaviour.
        ds = ds.map(lambda x, y: (aug(x, training=True), y),
                    num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

# ------------------ MODEL ------------------
def build_light_cnn(num_classes, img_size=IMG_SIZE):
    """Build and compile the lightweight CNN classifier.

    Architecture: three Conv2D(3x3, relu, same-padding) + MaxPooling stages
    with 16/32/64 filters, global average pooling, dropout (0.25) and a
    softmax output layer.

    Args:
        num_classes: Number of output classes (size of the softmax layer).
        img_size: Side length of the square RGB input images.

    Returns:
        A compiled Keras ``Sequential`` model (Adam optimizer, sparse
        categorical cross-entropy loss, accuracy metric), expecting integer
        class ids as labels.
    """
    model = models.Sequential([
        # Explicit Input layer instead of ``input_shape=`` on the first Conv2D:
        # the latter is deprecated for Sequential models and triggers a
        # UserWarning in recent Keras versions.
        layers.Input(shape=(img_size, img_size, 3)),
        layers.Conv2D(16, (3, 3), padding="same", activation='relu'),
        layers.MaxPooling2D(2),

        layers.Conv2D(32, (3, 3), padding="same", activation='relu'),
        layers.MaxPooling2D(2),

        layers.Conv2D(64, (3, 3), padding="same", activation='relu'),
        layers.MaxPooling2D(2),

        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.25),
        layers.Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# ------------------ CALLBACK: TEST METRICS PER EPOCH ------------------
class TestEvalCallback(tf.keras.callbacks.Callback):
    """Keras callback that evaluates the model on a fixed dataset after each epoch.

    The per-epoch results are appended to ``test_loss`` and ``test_acc`` so
    they can be plotted next to the training/validation curves.
    """

    def __init__(self, test_ds):
        """Store the dataset to evaluate on and start with empty histories."""
        super().__init__()
        self.test_ds = test_ds
        self.test_loss, self.test_acc = [], []

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate on ``test_ds``, record loss/accuracy and print them."""
        # evaluate() is unpacked into exactly two values: loss and accuracy.
        results = self.model.evaluate(self.test_ds, verbose=0)
        loss, acc = results
        for store, value in ((self.test_loss, loss), (self.test_acc, acc)):
            store.append(value)
        print(f" — TEST: loss={loss:.4f} acc={acc:.4f}")

# ------------------ PLOTTING HELPERS ------------------
def plot_convergence(history, test_cb):
    """Show two figures: loss per epoch and accuracy per epoch.

    Args:
        history: Object whose ``history`` dict holds the per-epoch 'loss' and
            'accuracy' series and, optionally, 'val_loss' / 'val_accuracy'.
        test_cb: Object with ``test_loss`` / ``test_acc`` lists, or ``None``.
            Non-empty lists are drawn as an extra "Test" curve.
    """
    logs = history.history

    def _draw(train_key, val_key, test_attr, labels, title, y_label):
        # One figure: train curve, optional validation and test curves.
        train_label, val_label, test_label = labels
        plt.figure(figsize=(6.4, 4))
        plt.plot(logs[train_key], label=train_label)
        if val_key in logs:
            plt.plot(logs[val_key], label=val_label)
        if test_cb is not None:
            test_series = getattr(test_cb, test_attr)
            if test_series:
                plt.plot(test_series, label=test_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    # Loss
    _draw('loss', 'val_loss', 'test_loss',
          ('Train Loss', 'Val Loss', 'Test Loss'),
          'Convergence (Loss)', 'Loss')
    # Accuracy
    _draw('accuracy', 'val_accuracy', 'test_acc',
          ('Train Acc', 'Val Acc', 'Test Acc'),
          'Convergence (Accuracy)', 'Accuracy')

def compute_and_show_metrics(name, y_true, y_prob, class_names, max_classes_in_plot=10):
    """Print classification metrics and plot a confusion matrix and ROC curves.

    Args:
        name: Split name used in the printed header and plot titles.
        y_true: Integer class ids, one per sample (indices into ``class_names``).
        y_prob: Array of shape (n_samples, len(class_names)) with per-class
            scores/probabilities, rows aligned with ``y_true``.
        class_names: Class names; position ``i`` names class id ``i``.
        max_classes_in_plot: Maximum number of per-class ROC curves drawn.

    Returns:
        Dict with keys "Accuracy", "Precision", "Recall", "F1" (macro
        averages) and "MacroAUC" (mean one-vs-rest AUC over the classes for
        which an AUC is defined; NaN if there is none).

    Raises:
        ValueError: If ``y_prob`` is not 2-D with one row per label and one
            column per class.
    """
    y_true = np.asarray(y_true).astype(int).ravel()
    y_prob = np.asarray(y_prob)
    n_classes = len(class_names)
    if y_prob.ndim != 2 or y_prob.shape != (y_true.shape[0], n_classes):
        raise ValueError(
            f"y_prob has shape {y_prob.shape}; expected ({y_true.shape[0]}, {n_classes})"
        )
    class_ids = np.arange(n_classes)

    y_pred = np.argmax(y_prob, axis=1)
    acc  = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, average='macro', zero_division=0)
    rec  = recall_score(y_true, y_pred, average='macro', zero_division=0)
    f1   = f1_score(y_true, y_pred, average='macro', zero_division=0)
    print(f"\n=== {name} Metrics ===")
    print(f"Accuracy : {acc:.4f}")
    print(f"Precision: {prec:.4f} (macro)")
    print(f"Recall   : {rec:.4f} (macro)")
    print(f"F1       : {f1:.4f} (macro)")

    # Confusion Matrix
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    fig, ax = plt.subplots(figsize=(5.8,4.8))
    ConfusionMatrixDisplay(cm, display_labels=class_names).plot(ax=ax, colorbar=False, cmap="Greens")
    ax.set_title(f"{name} Confusion Matrix")
    plt.tight_layout(); plt.show()

    # ROC (one-vs-rest)
    # One-hot built by comparison so it always has one column per class;
    # label_binarize collapses the two-class case to a single column, which
    # would break the per-class indexing below.
    y_true_bin = (y_true[:, None] == class_ids[None, :]).astype(int)
    # Some splits may miss classes; handle gracefully
    fpr, tpr, roc_auc = {}, {}, {}
    present = []
    for i in range(n_classes):
        positives = int(y_true_bin[:, i].sum())
        # ROC needs both positives and negatives; otherwise the AUC is
        # undefined (NaN) and would poison the macro average.
        if positives == 0 or positives == y_true_bin.shape[0]:
            continue
        present.append(i)
        fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    valid_aucs = [roc_auc[i] for i in present]
    macro_auc = np.mean(valid_aucs) if valid_aucs else np.nan

    # micro-average (needs at least one positive and one negative entry)
    if 0 < y_true_bin.sum() < y_true_bin.size:
        fpr["micro"], tpr["micro"], _ = roc_curve(y_true_bin.ravel(), y_prob.ravel())
        roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    plt.figure(figsize=(6.4,5.2))
    # Plot up to max_classes_in_plot class-curves to avoid clutter
    to_plot = present[:max_classes_in_plot]
    for i in to_plot:
        plt.plot(fpr[i], tpr[i], label=f"{class_names[i]} (AUC={roc_auc[i]:.3f})")
    if len(present) > max_classes_in_plot:
        # Empty artist: only contributes a legend entry.
        plt.plot([], [], ' ', label=f"... (+{len(present)-max_classes_in_plot} more classes)")
    if "micro" in roc_auc:
        plt.plot(fpr["micro"], tpr["micro"], linestyle="--", label=f"Micro (AUC={roc_auc['micro']:.3f})")
    plt.plot([0,1],[0,1], linestyle=':', label='Chance')
    plt.title(f"{name} ROC (macro AUC={macro_auc:.3f})")
    plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate")
    plt.legend(fontsize=8, loc="lower right"); plt.grid(True, linestyle=":")
    plt.tight_layout(); plt.show()

    return {"Accuracy": acc, "Precision": prec, "Recall": rec, "F1": f1, "MacroAUC": macro_auc}

# ------------------ LOAD DATA ------------------
print("Loading annotations from Google Drive paths...")
df_train = read_annotations_for_split(BASE_PATH, "train")
df_val   = read_annotations_for_split(BASE_PATH, "valid")  # uses 'valid' split name
df_test  = read_annotations_for_split(BASE_PATH, "test")

print(f"df_train: {len(df_train)} | df_val: {len(df_val)} | df_test: {len(df_test)}")

# Fail early with a clear message instead of an obscure error inside
# training/evaluation when a split resolved to zero images.
for _split_name, _split_df in (("train", df_train), ("valid", df_val), ("test", df_test)):
    if _split_df.empty:
        raise ValueError(f"Split '{_split_name}' contains no usable images under {BASE_PATH}")

# Encode labels jointly (ensures consistent class ids across splits)
le = LabelEncoder()
all_labels = pd.concat([df_train["label"], df_val["label"], df_test["label"]], axis=0).astype(str).values
le.fit(all_labels)
class_names = list(le.classes_)
num_classes = len(class_names)
print(f"Classes ({num_classes}): {class_names[:10]}{'...' if num_classes>10 else ''}")

df_train["y"] = le.transform(df_train["label"].astype(str))
df_val["y"]   = le.transform(df_val["label"].astype(str))
df_test["y"]  = le.transform(df_test["label"].astype(str))

train_paths, train_labels = df_train["image"].tolist(), df_train["y"].values
val_paths,   val_labels   = df_val["image"].tolist(),   df_val["y"].values
test_paths,  test_labels  = df_test["image"].tolist(),  df_test["y"].values

# ------------------ TF.DATA ------------------
train_ds = make_tf_dataset(train_paths, train_labels, shuffle=True,  augment=True)
val_ds   = make_tf_dataset(val_paths,   val_labels,   shuffle=False, augment=False)
test_ds  = make_tf_dataset(test_paths,  test_labels,  shuffle=False, augment=False)
# Separate, deterministic view of the training data for the final metrics:
# train_ds is reshuffled (and augmented) on every pass, so its predictions
# cannot be matched to labels gathered in another pass.
train_eval_ds = make_tf_dataset(train_paths, train_labels, shuffle=False, augment=False)

# ------------------ MODEL + TRAIN ------------------
model = build_light_cnn(num_classes)
model.summary()

test_cb = TestEvalCallback(test_ds)
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[test_cb],
    verbose=1
)

# Convergence curves (train/val + test per-epoch)
plot_convergence(history, test_cb)

# ------------------ FINAL EVALUATIONS ------------------
# Collect probabilities (all three datasets are unshuffled, so row i of each
# prediction array corresponds to element i of the matching label array).
train_probs = model.predict(train_eval_ds, verbose=0)
val_probs   = model.predict(val_ds,        verbose=0)
test_probs  = model.predict(test_ds,       verbose=0)

# True labels come straight from the encoded label arrays; re-reading them
# through the datasets would decode every image again, and concatenating the
# resulting 0-d scalars with np.concatenate raises ValueError.
y_train_true = np.asarray(train_labels)
y_val_true   = np.asarray(val_labels)
y_test_true  = np.asarray(test_labels)

# Metrics + Confusion + ROC
metrics_train = compute_and_show_metrics("Train", y_train_true, train_probs, class_names)
metrics_val   = compute_and_show_metrics("Validation", y_val_true,   val_probs,   class_names)
metrics_test  = compute_and_show_metrics("Test",  y_test_true,  test_probs,  class_names)

# Summary table
summary = pd.DataFrame([
    {"Split":"Train",      **metrics_train},
    {"Split":"Validation", **metrics_val},
    {"Split":"Test",       **metrics_test},
]).set_index("Split")
print("\n=== Summary (macro metrics) ===")
try:
    display(summary)  # rich table when running inside IPython/Jupyter
except NameError:
    # ``display`` is only injected by IPython; fall back to plain text.
    print(summary)


Mounted at /content/drive
Loading annotations from Google Drive paths...
df_train: 2723 | df_val: 778 | df_test: 389
Classes (95): ['100', 'Among', 'Angry', 'Ankh', 'Aroura', 'At', 'Bad_Thinking', 'Bandage', 'Bee', 'Belongs']...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/15
