
# EmotionRecognitionCNN.ipynb

This notebook:
1. Load `data/cls_pool/`, create a **seeded stratified** split into `data/cls_data/` (one time).
2. Define **BASELINE CONFIG** and train/evaluate (frozen).
3. Define a reusable **run_experiment(config, tag)** that:
  - builds model
  - trains with callbacks
  - evaluates (val/test, F1, confusion)
  - logs to `artifacts/trials/<run_id>/` and updates `artifacts/outputs/leaderboard.csv`
4. Define optimization framework for experimentation: **optimizations only** (override config keys), compare in leaderboard/plots.



## Setup Imports, fixed seed, paths


In [None]:
import sys, os, platform, json, math, time, uuid, csv, shutil, random, sklearn
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
import tqdm as tq
import matplotlib.pyplot as plt
from tqdm.auto import tqdm as pb
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix, f1_score
import itertools
import io

print("Python:", sys.version)
print("OS:", platform.platform())
print("NumPy:", np.__version__)
print("TensorFlow:", tf.__version__)
print("scikit-learn:", sklearn.__version__)
print("tqdm:", tq.__version__)

# Reproducibility
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

# Paths
DATA = Path("../data")
POOL = DATA / "cls_pool"       # prepeared dataset pool
CLS  = DATA / "cls_data"       # split written here
ART  = Path("../artifacts")
MODELS = ART / "models"
TRIALS = ART / "trials"
OUTS   = ART / "outputs"
for p in [CLS, MODELS, TRIALS, OUTS]: p.mkdir(parents=True, exist_ok=True)

# Class names
with open(OUTS / "class_names.json", "r") as f:
    CLASS_NAMES = json.load(f)
NUM_CLASSES = len(CLASS_NAMES)
print("Classes:", CLASS_NAMES)


## Create a seeded, stratified split (70/15/15) **once**, save manifest


In [None]:
# Collect pool paths & labels
X, y = [], []
for i, cname in enumerate(sorted(CLASS_NAMES)):
    for p in (POOL / cname).glob("*"):
        if p.suffix.lower() in {".jpg",".jpeg",".png"}:
            X.append(p); y.append(cname)
X = np.array(X); y = np.array(y)

# If a previous split manifest exists, reuse it (keeps trials comparable)
split_manifest_path = OUTS / "split_manifest.json"
if split_manifest_path.exists():
    with open(split_manifest_path, "r") as f:
        split_manifest = json.load(f)
    print("Loaded existing split manifest.")
else:
    # 70/30, then split the 30 into 15/15
    sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.30, random_state=SEED)
    (train_idx, temp_idx) = next(sss1.split(X, y))
    X_train, y_train = X[train_idx], y[train_idx]
    X_temp,  y_temp  = X[temp_idx],  y[temp_idx]

    sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.50, random_state=SEED)
    (val_idx, test_idx) = next(sss2.split(X_temp, y_temp))
    X_val, y_val   = X_temp[val_idx],  y_temp[val_idx]
    X_test, y_test = X_temp[test_idx], y_temp[test_idx]

    # Write files into cls_data/<split>/<class> (idempotent)
    def place(paths, labels, split):
        print(f"Copying {len(paths)} files to {split}/ ...")
        for p, lbl in pb(zip(paths, labels), total=len(paths), desc=f"{split:>5s} split"):
            dst = CLS / split / lbl
            dst.mkdir(parents=True, exist_ok=True)
            shutil.copy2(p, dst / p.name)
    place(X_train, y_train, "train")
    place(X_val,   y_val,   "val")
    place(X_test,  y_test,  "test")

    split_manifest = {
        "seed": SEED,
        "splits": {
            "train": [str((CLS / "train" / y_train[i] / X_train[i].name).resolve()) for i in range(len(X_train))],
            "val":   [str((CLS / "val"   / y_val[i]   / X_val[i].name).resolve())   for i in range(len(X_val))],
            "test":  [str((CLS / "test"  / y_test[i]  / X_test[i].name).resolve())  for i in range(len(X_test))]
        }
    }
    with open(split_manifest_path, "w") as f:
        json.dump(split_manifest, f, indent=2)
    print("Created split and saved manifest:", split_manifest_path)

# Keras datasets
IMG = 96; BATCH = 64
def make_ds(split):
    return tf.keras.preprocessing.image_dataset_from_directory(
        CLS / split,
        labels="inferred", label_mode="int",
        color_mode="grayscale", image_size=(IMG, IMG),
        batch_size=BATCH, shuffle=True, seed=SEED)
train_ds = make_ds("train"); val_ds = make_ds("val"); test_ds = make_ds("test")
class_names = train_ds.class_names
assert class_names == CLASS_NAMES, (class_names, CLASS_NAMES)

AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.cache().prefetch(AUTOTUNE)
val_ds   = val_ds.cache().prefetch(AUTOTUNE)
test_ds  = test_ds.cache().prefetch(AUTOTUNE)


## Define **Baseline configuration**

This cell will remain unchanged. All future optimizations will override keys in later defined cells.


In [None]:
BASELINE = {
    "img_size": IMG,
    "batch_size": BATCH,
    "epochs": 60,
    "optimizer": "adam",
    "learning_rate": 1e-3,
    "l2": 1e-4,
    "dropout": 0.4,
    "augment": {"flip": True, "rotation": 0.08, "zoom": 0.10},
    "filters": [32, 64, 128, 256],
    "use_residual": True,
    "seed": SEED,
}
print(json.dumps(BASELINE, indent=2))


## Define model builder utility and **run_experiment(config, tag)**

This utility will provide the following functionality:
- builds the CNN from `config`
- trains with EarlyStopping + ReduceLROnPlateau
- evaluates on **val** and **test**
- logs everything to `artifacts/trials/<run_id>/` and updates `artifacts/outputs/leaderboard.csv`


In [None]:
def conv_block(x, f, k=3, pool=True, l2=1e-4):
    x = layers.Conv2D(f, k, padding="same", use_bias=False,
                      kernel_regularizer=regularizers.l2(l2))(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    if pool: x = layers.MaxPool2D()(x)
    return x

def residual_stack(x, f, l2=1e-4):
    sc = layers.Conv2D(f, 1, padding="same", use_bias=False,
                       kernel_regularizer=regularizers.l2(l2))(x)
    sc = layers.BatchNormalization()(sc)
    y = layers.Conv2D(f, 3, padding="same", use_bias=False,
                      kernel_regularizer=regularizers.l2(l2))(x)
    y = layers.BatchNormalization()(y); y = layers.ReLU()(y)
    y = layers.Conv2D(f, 3, padding="same", use_bias=False,
                      kernel_regularizer=regularizers.l2(l2))(y)
    y = layers.BatchNormalization()(y)
    y = layers.Add()([y, sc]); y = layers.ReLU()(y)
    y = layers.MaxPool2D()(y)
    return y

def build_model(cfg):
    tf.random.set_seed(cfg["seed"])
    augment_layers = []
    if cfg["augment"].get("flip"):     augment_layers.append(layers.RandomFlip("horizontal"))
    if cfg["augment"].get("rotation"): augment_layers.append(layers.RandomRotation(cfg["augment"]["rotation"]))
    if cfg["augment"].get("zoom"):     augment_layers.append(layers.RandomZoom(cfg["augment"]["zoom"]))
    augment = tf.keras.Sequential(augment_layers)

    inp = layers.Input((cfg["img_size"], cfg["img_size"], 1))
    x = augment(inp)
    x = layers.Rescaling(1./255)(x)

    f1, f2, f3, f4 = cfg["filters"]
    x = conv_block(x, f1, l2=cfg["l2"])
    x = conv_block(x, f2, l2=cfg["l2"])
    if cfg["use_residual"]:
        x = residual_stack(x, f3, l2=cfg["l2"])
    else:
        x = conv_block(x, f3, l2=cfg["l2"])
        x = layers.MaxPool2D()(x)

    x = conv_block(x, f4, pool=False, l2=cfg["l2"])
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(cfg["dropout"])(x)
    out = layers.Dense(NUM_CLASSES, activation="softmax")(x)
    model = models.Model(inp, out)

    opt = tf.keras.optimizers.Adam(cfg["learning_rate"]) if cfg["optimizer"] == "adam" else tf.keras.optimizers.SGD(cfg["learning_rate"], momentum=0.9, nesterov=True)
    model.compile(optimizer=opt, loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

def compute_class_weights(ds):
    # count labels from train_ds
    counts = np.zeros(NUM_CLASSES, dtype=np.int64)
    for _, yb in tf.data.Dataset.unbatch(ds.take(999999)):
        counts[int(yb.numpy())] += 1
    total = counts.sum()
    return {i: float(total/(NUM_CLASSES*counts[i])) for i in range(NUM_CLASSES)}

LEADERBOARD = OUTS / "leaderboard.csv"
if not LEADERBOARD.exists():
    with open(LEADERBOARD, "w", newline="") as f:
        csv.writer(f).writerow(["run_id","tag","val_acc","val_f1_macro","test_acc","test_f1_macro","epochs","params_json"])

def plot_accuracy(history, out_png_path, show=True):
    h = history.history
    epochs = range(1, len(h["loss"]) + 1)

    plt.figure(figsize=(6,4))
    if "accuracy" in h:      plt.plot(epochs, h["accuracy"], label="train")
    if "val_accuracy" in h:  plt.plot(epochs, h["val_accuracy"], label="val")
    plt.xlabel("Epoch"); plt.ylabel("Accuracy"); plt.title("Accuracy")
    plt.legend(); plt.tight_layout()
    plt.savefig(out_png_path, dpi=160)
    if show: plt.show()
    plt.close()

def plot_loss(history, out_png_path, show=True):
    h = history.history
    epochs = range(1, len(h["loss"]) + 1)

    plt.figure(figsize=(6,4))
    plt.plot(epochs, h["loss"], label="train")
    if "val_loss" in h: plt.plot(epochs, h["val_loss"], label="val")
    plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Loss")
    plt.legend(); plt.tight_layout()
    plt.savefig(out_png_path, dpi=160)
    if show: plt.show()
    plt.close()

def plot_confusion(cm, class_names, out_png_path, normalize=True, cmap="Blues", show=True):
    """Friendly confusion matrix; saves to file and (optionally) shows inline."""
    if normalize:
        with np.errstate(all="ignore"):
            cmn = cm.astype("float") / cm.sum(axis=1, keepdims=True)
            cmn = np.nan_to_num(cmn)
    else:
        cmn = cm

    plt.figure(figsize=(8,6))
    plt.imshow(cmn, interpolation="nearest", cmap=cmap)
    plt.title("Confusion Matrix" + (" (normalized)" if normalize else ""))
    plt.colorbar()
    ticks = np.arange(len(class_names))
    plt.xticks(ticks, class_names, rotation=45, ha="right")
    plt.yticks(ticks, class_names)

    thresh = cmn.max() / 2.0
    for i, j in itertools.product(range(cmn.shape[0]), range(cmn.shape[1])):
        val = cmn[i, j] if normalize else cm[i, j]
        txt_color = "white" if cmn[i, j] > thresh else "black"
        plt.text(j, i, f"{val:.2f}" if normalize else f"{val}",
                 ha="center", va="center", color=txt_color, fontsize=9)

    plt.ylabel("True label"); plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.savefig(out_png_path, dpi=160)
    if show: plt.show()
    plt.close()

def save_model_summary(model, out_txt_path):
    buff = io.StringIO()
    model.summary(print_fn=lambda s: buff.write(s + "\n"))  # capture
    summary_txt = buff.getvalue()
    with open(out_txt_path, "w", encoding="utf-8") as f:
        f.write(summary_txt)

def run_experiment(cfg, tag="baseline"):
    run_id = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + "_" + tag
    run_dir = TRIALS / run_id
    run_dir.mkdir(parents=True, exist_ok=True)

    # freeze params used
    with open(run_dir / "params.json", "w") as f:
        json.dump(cfg, f, indent=2)

    model = build_model(cfg)

    # save model summary
    save_model_summary(model, run_dir / "model_summary.txt")

    cbs = [
        tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=8, restore_best_weights=True),
        tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", patience=3, factor=0.5),
        tf.keras.callbacks.ModelCheckpoint(str(run_dir / "model.keras"), monitor="val_accuracy", save_best_only=True),
    ]
    class_weight = compute_class_weights(train_ds)

    hist = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=cfg["epochs"],
        class_weight=class_weight,
        callbacks=cbs,
        verbose=1
    )

    # show + save accuracy & loss plots
    plot_accuracy(hist, run_dir / "accuracy.png", show=True)
    plot_loss(hist,     run_dir / "loss.png",     show=True)

    # Save history json
    with open(run_dir / "history.json", "w") as f:
        json.dump({k: [float(x) for x in v] for k, v in hist.history.items()}, f, indent=2)

    # Evaluate
    val_metrics = model.evaluate(val_ds, verbose=0)
    test_metrics = model.evaluate(test_ds, verbose=0)

    # Detailed predictions for test set
    y_true, y_pred = [], []
    for xb, yb in tf.data.Dataset.unbatch(test_ds):
        probs = model.predict(tf.expand_dims(xb, 0), verbose=0)[0]
        y_true.append(int(yb.numpy()))
        y_pred.append(int(np.argmax(probs)))

    test_f1_macro = float(f1_score(y_true, y_pred, average="macro"))

    # Classification report (save file; also print short summary)
    report = classification_report(y_true, y_pred, target_names=CLASS_NAMES)
    print("\nClassification report (test):\n", report)
    with open(run_dir / "classification_report.txt", "w", encoding="utf-8") as f:
        f.write(report)

    # save + show Confusion matrices: raw & normalized
    cm = confusion_matrix(y_true, y_pred)
    plot_confusion(cm, CLASS_NAMES, run_dir / "confusion_matrix_raw.png", normalize=False, cmap="Blues", show=True)
    plot_confusion(cm, CLASS_NAMES, run_dir / "confusion_matrix_norm.png", normalize=True,  cmap="Blues", show=True)

    # Summaries
    val_acc  = float(val_metrics[1])
    test_acc = float(test_metrics[1])
    metrics = {
        "val_accuracy": val_acc,
        "test_accuracy": test_acc,
        "test_f1_macro": test_f1_macro,
        "epochs_trained": int(len(hist.history["loss"]))
    }
    with open(run_dir / "metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    # Leaderboard row
    with open(LEADERBOARD, "a", newline="") as f:
        csv.writer(f).writerow([
            run_id, tag,
            f"{val_acc:.4f}",
            f"{float(f1_score(y_true, y_pred, average='macro')):.4f}",
            f"{test_acc:.4f}",
            f"{test_f1_macro:.4f}",
            metrics["epochs_trained"],
            json.dumps(cfg)
        ])

    print("Saved trial to:", run_dir)
    return run_id, metrics



## Run baseline


In [None]:
# Make a copy so later optimizations won't recast BASELINE
baseline_cfg = json.loads(json.dumps(BASELINE))
run_id, metrics = run_experiment(baseline_cfg, tag="baseline")
metrics


## Run optimization: Learning Rate Reduction

In [None]:
# Learning Rate Reduction
lr_slow_cfg = json.loads(json.dumps(BASELINE))
lr_slow_cfg["learning_rate"] = 2.5e-4
run_id, metrics = run_experiment(lr_slow_cfg, tag="lr_2p5e-4")
metrics

## Run optimization: Higher Dropout

In [None]:
# Higher Dropout (stronger regularization)
dropout_high_cfg = json.loads(json.dumps(BASELINE))
dropout_high_cfg["dropout"] = 0.5
run_id, metrics = run_experiment(dropout_high_cfg, tag="dropout_0p5")
metrics

## Run optimization: Lower Dropout

In [None]:
# Lower Dropout (weaker regularization)
dropout_low_cfg = json.loads(json.dumps(BASELINE))
dropout_low_cfg["dropout"] = 0.3
run_id, metrics = run_experiment(dropout_low_cfg, tag="dropout_0p3")
metrics

## Run optimization: Reduced Filter Depth

In [None]:
# Reduced Filter Depth (lighter model)
filters_light_cfg = json.loads(json.dumps(BASELINE))
filters_light_cfg["filters"] = [32, 64, 96, 128]
run_id, metrics = run_experiment(filters_light_cfg, tag="filters_light")
metrics

## Run optimization: Increased Early Filter Depth

In [None]:
# Increased Early Filter Depth (richer low-level features)
filters_richfront_cfg = json.loads(json.dumps(BASELINE))
filters_richfront_cfg["filters"] = [48, 64, 128, 256]
run_id, metrics = run_experiment(filters_richfront_cfg, tag="filters_richfront")
metrics

## Run optimization: Smaller Batch Size

In [None]:
# Smaller Batch Size (adds gradient noise regularization)
batch32_cfg = json.loads(json.dumps(BASELINE))
batch32_cfg["batch_size"] = 32
run_id, metrics = run_experiment(batch32_cfg, tag="batch32")
metrics

## Run optimization: Larger Batch Size with Adjusted LR

In [None]:

# Larger Batch with Adjusted LR (stability focus)
batch128_cfg = json.loads(json.dumps(BASELINE))
batch128_cfg["batch_size"] = 128
batch128_cfg["learning_rate"] = 5e-4
run_id, metrics = run_experiment(batch128_cfg, tag="batch128_lr5e-4")
metrics

## Run optimization: Stronger L2 Regularization

In [None]:
# Stronger L2 Regularization (weight decay)
l2_stronger_cfg = json.loads(json.dumps(BASELINE))
l2_stronger_cfg["l2"] = 5e-4
run_id, metrics = run_experiment(l2_stronger_cfg, tag="l2_5e-4")
metrics

## Run optimization: Enhanced Data Augmentation

In [None]:
# Enhanced Data Augmentation (data-driven regularization)
aug_stronger_cfg = json.loads(json.dumps(BASELINE))
aug_stronger_cfg["augment"]["rotation"] = 0.15   # baseline was 0.08
aug_stronger_cfg["augment"]["zoom"] = 0.20       # baseline was 0.10
run_id, metrics = run_experiment(aug_stronger_cfg, tag="aug_stronger")
metrics

## Run optimization: Lower Dropout with Stronger L2 Regularization

In [None]:
# Lower Dropout with Stronger L2 Regularization
dropout03_l2_cfg = json.loads(json.dumps(BASELINE))
dropout03_l2_cfg["dropout"] = 0.3
dropout03_l2_cfg["l2"] = 5e-4
run_id, metrics = run_experiment(dropout03_l2_cfg, tag="dropout_0p3_l2_5e-4")
metrics

## Run optimization: Higher Dropout with Stronger L2 Regularization

In [None]:
# Higher Dropout with Stronger L2 Regularization
dropout05_l2_cfg = json.loads(json.dumps(BASELINE))
dropout05_l2_cfg["dropout"] = 0.5
dropout05_l2_cfg["l2"] = 5e-4
run_id, metrics = run_experiment(dropout05_l2_cfg, tag="dropout_0p5_l2_5e-4")
metrics


## Run optimization: Lower Dropout with Smaller Batch Size

In [None]:
# Lower Dropout with Smaller Batch Size
dropout03_b32_cfg = json.loads(json.dumps(BASELINE))
dropout03_b32_cfg["dropout"] = 0.3
dropout03_b32_cfg["batch_size"] = 32
run_id, metrics = run_experiment(dropout03_b32_cfg, tag="dropout_0p3_batch32")
metrics


## Run optimization: Increased Early Filter Depth with Higher Dropout

In [None]:
# Increased Early Filter Depth with Higher Dropout
richfront_dropout05_cfg = json.loads(json.dumps(BASELINE))
richfront_dropout05_cfg["filters"] = [48, 64, 128, 256]
richfront_dropout05_cfg["dropout"] = 0.5
run_id, metrics = run_experiment(richfront_dropout05_cfg, tag="filters_richfront_dropout_0p5")
metrics


## Promote best-performing model to /artifacts/models directory

In [None]:
# Promote best-performing model

lb_path = OUTS / "leaderboard.csv"
lb = pd.read_csv(lb_path)

# Choose metric to rank by
best_row = lb.sort_values("test_f1_macro", ascending=False).iloc[0]
best_run_id = best_row["run_id"]

src_model = TRIALS / best_run_id / "model.keras"
dst_model = MODELS / "best_model.keras"

shutil.copy2(src_model, dst_model)
print(f"Promoted best model:\n  From: {src_model}\n  To:   {dst_model}")

# Save its metrics for quick reference
with open(TRIALS / best_run_id / "metrics.json") as f:
    best_metrics = json.load(f)

with open(MODELS / "best_model_metrics.json", "w") as f:
    json.dump(best_metrics, f, indent=2)

print("Saved metrics snapshot:", (MODELS / "best_model_metrics.json").resolve())
