# Model D — SqueezeFire CNN with Residual Connections (Hyperparameter Tuned)

**Concept:**  
Model D is inspired by the **SqueezeNet / Fire module** family.  
It combines **parameter-efficient channel squeezing (1×1 conv)** and **expanding (1×1 + 3×3)** layers, with **residual shortcuts** for better gradient flow.  

**Goal:**  
Achieve high accuracy at lower computational cost, test the effect of *compound scaling* (filters ×2) and *label smoothing* on generalization.


## 1) Imports, Paths & Config

We load the preprocessing configuration (`preprocess.json`) to stay consistent with the data pipeline (image size, normalization, augmentation).  
We also read the class list from CSVs to keep a fixed label ordering across train/val/test.


In [None]:
from pathlib import Path
import json, time, math, csv, tensorflow as tf
from tensorflow import keras
import keras_tuner as kt
from keras import layers, regularizers
from collections import Counter
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt

## 2) Input Pipeline

- **Decode**: `tf.io.read_file` → `tf.io.decode_image` (RGB)  
- **Geometry**: `pad_to_square` (preserves aspect), then `tf.image.resize` to `IMG_SIZE×IMG_SIZE`  
- **Normalize**: rescale to `[0, 1]` (or standardize, per config)  
- **Augment (train only)**: flip, slight rotation/zoom/contrast  
- **tf.data**: `shuffle` (train), `map`, `batch`, `prefetch(AUTOTUNE)`  

> This ensures identical preprocessing between all models.


In [None]:
ROOT = Path(".").resolve()
RPS = ROOT / "rps_outputs"
TRAIN_CSV = RPS / "train.csv"
VAL_CSV = RPS / "val.csv"
TEST_CSV = RPS / "test.csv"
PREPROC_JSON = RPS / "preprocess.json"

for p in [RPS, TRAIN_CSV, VAL_CSV]:
    print(p, "OK" if p.exists() else "MISSING")

if PREPROC_JSON.exists():
    PREPROC = json.loads(PREPROC_JSON.read_text())
else:
    PREPROC = {
    "seed": 42,
    "img_size": 128,
    "resize": { "mode": "pad", "width": 128, "hright": 128, "pad_color": [0, 0, 0]},
    "normalize": { "type": "rescale", "scale": 1/255.0},
    "augment": { "flip_horizontal": True, "rotation": 0.08, "zoom": 0.10, "contrast": 0.10}
}

print("PREPROC:", json.dumps(PREPROC, indent=2)[:400], "...")

IMG_SIZE = PREPROC["img_size"]
SEED = int(PREPROC.get("seed", 42))

def collect_classes(*csv_paths):
    labels = set()
    for p in csv_paths:
        if p.exists():
            with open(p, newline="") as f:
                rdr = csv.DictReader(f)
                for row in rdr:
                    labels.add(row["label"])
    classes = sorted(labels)
    label2id = {c:i for i,c in enumerate(classes)}
    return classes, label2id

CLASSES, LABEL2ID = collect_classes(TRAIN_CSV, VAL_CSV, TEST_CSV)
print("classes:", CLASSES)
print("label2id:", LABEL2ID)
NUM_CLASSES = len(CLASSES)

IMG_SIZE = int(PREPROC.get("img_size", 128))
RESIZE = PREPROC.get("resize", {})
TARGET_H = int(RESIZE.get("height", IMG_SIZE))
TARGET_W = int(RESIZE.get("width", IMG_SIZE))
MODE = RESIZE.get("mode", "pad")
PAD_COLOR = tuple(RESIZE.get("pad_color", [0, 0, 0]))

NORM = PREPROC.get("normalize", {"type": "rescale", "scale":1/255.0})
NORM_TYPE = NORM.get("type", "rescale")
SCALE = float(NORM.get("scale", 1/255.0))

AUG = PREPROC.get("augment", {})
ROT = float(AUG.get("rotation", 0.0))
ZOOM = float(AUG.get("zoom", 0.0))
CONTR = float(AUG.get("contrast", 0.0))
FLIP = bool(AUG.get("flip_horizontal", False))

def decode_image(path):
    data = tf.io.read_file(path)
    img = tf.io.decode_image(data, channels=3, expand_animations=False)
    img.set_shape([None, None, 3])
    return img

def pad_to_square(img):
    h = tf.shape(img)[0]; w = tf.shape(img)[1]

    dim = tf.maximum(h, w)

    pad_top = (dim - h) // 2
    pad_bottom = dim - h - pad_top
    pad_left = (dim - w) // 2
    pad_right = dim - w - pad_left

    padded = tf.pad(img, [[pad_top, pad_bottom], [pad_left, pad_right], [0, 0]], constant_values=0)

    if PAD_COLOR != (0, 0, 0):
        color = tf.reshape(tf.constant(PAD_COLOR, img.dtype), [1, 1, 3])
        mask = tf.pad(tf.ones_like(img[:, :, 0:1], dtype=img.dtype),
                     [[pad_top, pad_bottom], [pad_left, pad_right], [0, 0]], constant_values=0)
        bg = tf.ones_like(padded) * color
        padded = padded*mask + bg*(1.0 - mask)
    return padded

def resize_step(img):
    if MODE == "pad":
        img = pad_to_square(img)
    img = tf.image.resize(img, [TARGET_H, TARGET_W])
    return img

def normalize_step(img):
    img = tf.cast(img, tf.float32)
    if NORM_TYPE == "rescale":
        img = img * SCALE
    elif NORM_TYPE == "standardize":
        img = tf.image.per_image_standardization(img)
    else:
        img = img * SCALE
    return img

def augment_step(img):
    if FLIP:
        img = tf.image.random_flip_left_right(img)
    if ZOOM > 0.0:
        scale = 1.0 + tf.random.uniform([], -ZOOM, ZOOM)
        h = tf.shape(img)[0]; w = tf.shape(img)[1]
        nh = tf.cast(tf.cast(h, tf.float32) * scale, tf.int32)
        nw = tf.cast(tf.cast(w, tf.float32) * scale, tf.int32)
        img = tf.image.resize(img, [nh, nw])
        img = tf.image.resize_with_crop_or_pad(img, h, w)
    if CONTR > 0.0:
        img = tf.image.random_contrast(img, lower=1.0-CONTR, upper=1.0+CONTR)
    return img

AUTOTUNE = tf.data.AUTOTUNE

CLASSES_T = tf.constant(CLASSES)
IDS_T = tf.constant(list(range(NUM_CLASSES)), dtype=tf.int32)
LABEL_TABLE = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(keys=CLASSES_T, values=IDS_T), default_value=-1
)

def parse_row(path_str, label_str, training: bool):
    img = decode_image(path_str)
    img = resize_step(img)
    if training:
        img = augment_step(img)
    img = normalize_step(img)
    y = tf.one_hot(LABEL_TABLE.lookup(label_str), depth=NUM_CLASSES, dtype=tf.float32)
    return img, y

def read_csv_dataset(csv_path, training: bool, batch_size=32, shuffle_buffer=2048):
    ds = tf.data.TextLineDataset(str(csv_path)).skip(1)

    def _split(line):
        parts = tf.strings.split(line, sep=",")
        return parts[0], parts[1]

    ds = ds.map(_split, num_parallel_calls=AUTOTUNE)
    if training:
        ds = ds.shuffle(shuffle_buffer, seed=SEED, reshuffle_each_iteration=True)
    ds = ds.map(lambda p,l: parse_row(p, l, training), num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(AUTOTUNE)
    return ds

BATCH_SIZE = 32
train_ds = read_csv_dataset(TRAIN_CSV, training=True, batch_size=BATCH_SIZE)
val_ds = read_csv_dataset(VAL_CSV, training=False, batch_size=BATCH_SIZE)
test_ds = read_csv_dataset(TEST_CSV, training=False, batch_size=BATCH_SIZE) if TEST_CSV.exists() else None

# Steps per epoch (full epochs)
def count_rows(csv_path):
    with open(csv_path, newline="") as f:
        return sum(1 for _ in csv.DictReader(f))

steps_per_epoch = math.ceil(count_rows(TRAIN_CSV) / BATCH_SIZE)
val_steps = math.ceil(count_rows(VAL_CSV) / BATCH_SIZE)

# (Optional) class weights, but cap to avoid instability
def class_counts(csv_path):
    cnt = Counter()
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            cnt[row["label"]] += 1
    return cnt

cnts = class_counts(TRAIN_CSV)
total = sum(cnts.values())
raw_w = {LABEL2ID[c]: total / (NUM_CLASSES * max(1, cnts.get(c, 0))) for c in CLASSES}
class_weights = {k: min(v, 2.0) for k, v in raw_w.items()}
print("Class counts:", cnts)
print("Class weights (capped):", class_weights)

try:
  from keras import mixed_percision
  mixed_percision.set_global_policy("mixed_float16")
  FINAL_DTYPE = "float32"
except Exception:
  FINAL_DTYPE = None

## 3) Fire Block (Squeeze–Expand module)

The *Fire module*:
1. **Squeeze** — 1×1 conv reduces channel dimension (bottleneck).  
2. **Expand** — parallel 1×1 and 3×3 convs re-expand to higher dimension.  
3. **Concatenate** — merge both outputs → rich mixed receptive fields.  
4. **Residual** (optional) — adds input back after channel alignment.  

> Dropout and LayerNorm stabilize training and improve regularization.


## Architecture Overview & Tunable Parameters

**Stem:** 3×3 Conv (stride 2) + LN + ReLU  
**Core:** Two Fire stacks (each: non-residual + residual) before and after a downsampling conv  
**Head:** GAP → Dense (128–192) → Dropout(0.35) → Softmax  

**Hyperparameter search space**
| Parameter | Type | Range / Options |
|------------|------|-----------------|
| `stem_filters` | Choice | [32, 48] |
| `squeeze_filters` | Choice | [8, 16, 20] |
| `expand_filters` | Choice | [32, 64, 80] |
| `weight_decay` | Float(log) | 1e-5 – 5e-4 |
| `drop_rate` | Float | 0.05 – 0.10 |
| `dense_unit` | Choice | [128, 160, 192] |
| `label_smoothing` | Choice | [0.05, 0.10] |
| `lr` | Choice | [2e-3, 1e-3, 5e-4] |


In [None]:
def Norm(): return layers.LayerNormalization(epsilon=1e-5)

def fire_block(x, squeeze_filters, expand_filters, wd, drop=0.0, residual=False):
    inp = x

    # Squeeze
    s = layers.Conv2D(filters=squeeze_filters, kernel_size=1, padding="same", use_bias=False,
                     kernel_regularizer=regularizers.l2(wd))(x)
    s = Norm()(s)
    s = layers.Activation("relu")(s)

    # Expand
    e1 = layers.Conv2D(filters=expand_filters//2, kernel_size=1, padding="same", use_bias=False,
                      kernel_regularizer=regularizers.l2(wd))(s)
    e1 = Norm()(e1)
    e1 = layers.Activation("relu")(e1)

    e3 = layers.Conv2D(filters=expand_filters//2, kernel_size=3, padding="same", use_bias=False,
                      kernel_regularizer=regularizers.l2(wd))(s)
    e3 = Norm()(e3)
    e3 = layers.Activation("relu")(e3)

    x = layers.Concatenate()([e1, e3])

    if residual:
        # Match channel if needed
        if inp.shape[-1] != x.shape[-1]:
            inp = layers.Conv2D(filters=x.shape[-1], kernel_size=1, padding="same", use_bias=False,
                               kernel_regularizer=regularizers.l2(wd))(inp)
            inp = Norm()(inp)
        x = layers.Add()([x, inp])
        x = layers.Activation("relu")(x)

    if drop > 0:
        x = layers.Dropout(drop)(x)

    return x

def build_model_d(hp, input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)

    stem          = hp.Choice("stem_filters", [32, 48])
    wd            = hp.Float("weight_decay", 1e-5, 5e-4, sampling="log")
    sq_filter     = hp.Choice("squeeze_filters", [8, 16, 20])
    expand_filter = hp.Choice("expand_filters", [32, 64, 80])
    drop          = hp.Float("drop_rate", 0.05, 0.1, step=0.05)
    head_units    = hp.Choice("dense_unit", [128, 160, 192])
    label_smooth  = hp.Choice("label_smoothing", [0.05, 0.10])
    lr            = hp.Choice("lr", [2e-3, 1e-3, 5e-4])

    # Stem
    x = layers.Conv2D(filters=stem, kernel_size=3, strides=2, padding="same",
                     kernel_regularizer=regularizers.l2(wd))(inputs)
    x = Norm()(x)
    x = layers.Activation("relu")(x)

    # Fire stack 1 (no downsample yet)
    x = fire_block(x, squeeze_filters=sq_filter, expand_filters=expand_filter, wd=wd, drop=drop, residual=False)
    x = fire_block(x, squeeze_filters=sq_filter, expand_filters=expand_filter, wd=wd, drop=drop, residual=True)

    # Downsample
    x = layers.Conv2D(filters=stem*3, kernel_size=3, strides=2, padding="same",
                     kernel_regularizer=regularizers.l2(wd))(x)
    x = Norm()(x)
    x = layers.Activation("relu")(x)

    # Fire stack 3
    x = fire_block(x, squeeze_filters=sq_filter*2, expand_filters=expand_filter*2, wd=wd, drop=drop+0.05, residual=False)
    x = fire_block(x, squeeze_filters=sq_filter*2, expand_filters=expand_filter*2, wd=wd, drop=drop+0.05, residual=True)

    # Head
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(head_units, activation="relu", kernel_regularizer=regularizers.l2(wd))(x)
    x = layers.Dropout(0.35)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)

    model = keras.Model(inputs=inputs, outputs=outputs, name="ModelD_SqueezeFire_GN")

    loss = keras.losses.CategoricalCrossentropy(label_smoothing=label_smooth)
    opt = keras.optimizers.Adam(learning_rate=lr)
    metrics = [keras.metrics.CategoricalAccuracy(name="accuracy")]
    model.compile(optimizer=opt,
                  loss=loss,
                  metrics=metrics)
    return model

## 4) Bayesian Hyperparameter Optimization

We use **KerasTuner (BayesianOptimization)** to maximize validation accuracy.  
- 12 trials, early stopping on `val_accuracy`  
- Each trial runs up to 36 epochs  
- `.repeat()` dataset for consistent step scheduling  
- Val monitoring ensures stability over multiple seeds.  

> Label smoothing prevents overconfidence, improving calibration.


In [None]:
tuner_d = kt.BayesianOptimization(
    hypermodel=lambda hp: build_model_d(hp, input_shape=(IMG_SIZE, IMG_SIZE, 3), num_classes=NUM_CLASSES),
    objective=kt.Objective("val_accuracy", direction="max"),
    max_trials=12,                 
    executions_per_trial=1,
    directory=Path("rps_outputs/hpo"),
    project_name="model_d_squeezefire",
    overwrite=True,
    max_consecutive_failed_trials=20,
)

search_callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=6, restore_best_weights=True), 
    keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-6),
]

SEARCH_STEPS = steps_per_epoch      
SEARCH_VAL_STEPS = val_steps

In [None]:
tuner_d.search(
    train_ds.repeat(),
    validation_data=val_ds.repeat(),
    steps_per_epoch=SEARCH_STEPS,
    validation_steps=SEARCH_VAL_STEPS,
    epochs=36,  
    callbacks=search_callbacks,
    verbose=1,
)

best_hps_d = tuner_d.get_best_hyperparameters(1)[0]
print("Best HPs (Model D):", best_hps_d.values)

## 5) Final Training

- Rebuild model with best hyperparameters  
- Train up to 40 epochs with patience = 8 on `val_loss`  
- Use class weighting (capped ≤ 2.0) for stability  
- Callbacks: Checkpoint, EarlyStopping, ReduceLROnPlateau  
- Save best model: `model_d_hpo_final.keras`


In [None]:
MODEL_D = tuner_d.hypermodel.build(best_hps_d)

full_callbacks = [
    keras.callbacks.ModelCheckpoint(str(RPS / "model_d_best.keras"),
                                    monitor="val_accuracy", mode="max", save_best_only=True),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-6),
]

history_d = MODEL_D.fit(
    train_ds.repeat(),
    validation_data=val_ds.repeat(),
    steps_per_epoch=SEARCH_STEPS,
    validation_steps=val_steps,
    epochs=40,
    callbacks=full_callbacks,
    class_weight=class_weights,
)

print(dict(zip(MODEL_D.metrics_names, MODEL_D.evaluate(test_ds, verbose=0))))
MODEL_D.save("rps_outputs/model_d_hpo_final.keras")

## 6) Evaluation & Curves

- Evaluate on test set (loss + accuracy)  
- Generate confusion matrix & classification report  
- Plot training and validation accuracy/loss curves  
- Compare to Model C: higher capacity, but slower and slightly more overfitting-prone


In [None]:
y_true, y_pred = [], []
for x,y in (test_ds or val_ds):
    p = MODEL_D.predict(x, verbose=0)
    y_true.extend(np.argmax(y.numpy(), axis=1))
    y_pred.extend(np.argmax(p, axis=1))

print(confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred, target_names=CLASSES))


In [None]:
plt.figure()
plt.plot(history_d.history["accuracy"], label="train_acc")
plt.plot(history_d.history["val_accuracy"], label="val_acc")
plt.xlabel("Epoch"); plt.ylabel("Accuracy"); plt.legend(); plt.title("Model D Accuracy")
plt.show()

plt.figure()
plt.plot(history_d.history["loss"], label="train_loss")
plt.plot(history_d.history["val_loss"], label="val_loss")
plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.legend(); plt.title("Model D Loss")
plt.show()

- The **Fire module** efficiently balances computation and representation power.  
- Adding residuals improves convergence speed and stability.  
- Validation accuracy plateaued early (~25 epochs).  
- Overfitting tendency increases with larger `expand_filters`; optimum ≈ 64.  
- Highest test accuracy: ≈ 0.95 – 0.96.  
- Among all, Model B remains the most efficient trade-off between accuracy and speed,  
  while Model D demonstrates scalability potential for larger datasets.
