<a href="https://colab.research.google.com/github/Devashish-Mishra-2003/dog_classifier_notebook/blob/main/efficientnetv2b2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
vikaschauhan734_120_dog_breed_image_classification_path = kagglehub.dataset_download('vikaschauhan734/120-dog-breed-image-classification')

print('Data source import complete.')


This notebook solves dog breed classification problem by using EfficientNetV2B2

Max Val_Accuracy : 89.978
Best Predict on Test : 89.72

Author : Devashish Mishra

In [None]:
# Cell 1: Kaggle-only config and strategy
import os
from pathlib import Path
import random
import numpy as np
import tensorflow as tf

# reproducibility
SEED = 777
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# ---------- Kaggle dataset detection ----------
KAGGLE_INPUT_ROOT = "/kaggle/input"
CANDIDATE_DS = ["120-dog-breed-image-classification"]

dataset_root = None
for d in CANDIDATE_DS:
    path = os.path.join(KAGGLE_INPUT_ROOT, d)
    if os.path.exists(path):
        dataset_root = path
        break
if dataset_root is None:
    entries = sorted(os.listdir(KAGGLE_INPUT_ROOT))
    if len(entries) == 0:
        raise FileNotFoundError("No datasets under /kaggle/input.")
    dataset_root = os.path.join(KAGGLE_INPUT_ROOT, entries[0])

# prefer an 'images' subfolder if present
if os.path.isdir(os.path.join(dataset_root, "images")):
    SRC_ROOT = os.path.join(dataset_root, "images")
else:
    SRC_ROOT = dataset_root

# writable processed output
PROCESSED_ROOT = "/kaggle/working/processed_dogs"

# outputs
MODEL_DIR = Path("/kaggle/working/models"); MODEL_DIR.mkdir(parents=True, exist_ok=True)
BEST_PHASEA = MODEL_DIR / "best_phaseA.keras"
BEST_PHASEB = MODEL_DIR / "best_phaseB.keras"
COMPACT_FINAL = MODEL_DIR / "final_compact.keras"
SAVEDMODEL_DIR = MODEL_DIR / "savedmodel_backup"
TFLITE_OUT = MODEL_DIR / "final_fp16.tflite"

# image / training config
IMG_SIZE = (300, 300)
ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".bmp"}
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.70, 0.20, 0.10

# single GPU strategy
gpus = tf.config.list_physical_devices("GPU")
for g in gpus:
    try:
        tf.config.experimental.set_memory_growth(g, True)
    except Exception:
        pass

strategy = tf.distribute.get_strategy()

# batch size safe for P100
GLOBAL_BATCH_SIZE = 32
AUTOTUNE = tf.data.AUTOTUNE

# two-phase hyperparams
EPOCHS_HEAD = 10
LR_HEAD = 1e-5    # lower learning rate for head
EPOCHS_FINE = 20
LR_FINE = 1e-6    # safer fine-tuning
UNFREEZE_TOP_N = 100


# print summary
print("Kaggle dataset root:", dataset_root)
print("SRC_ROOT:", SRC_ROOT)
print("PROCESSED_ROOT:", PROCESSED_ROOT)
print("Strategy:", type(strategy).__name__)
print("GLOBAL_BATCH_SIZE:", GLOBAL_BATCH_SIZE)
print("IMG_SIZE:", IMG_SIZE)


In [None]:
# Cell 2: Preprocess (resizes + splits per class into train/val/test)
import os, shutil, json
from pathlib import Path
import random
import cv2
import pandas as pd
from tqdm import tqdm

random.seed(SEED)

def ensure_dir(p): os.makedirs(p, exist_ok=True)

def collect_classes(src_root):
    src = Path(src_root)
    if not src.exists():
        raise FileNotFoundError(f"SRC_ROOT not found: {src_root}")
    classes = {}
    for d in sorted(os.listdir(src_root)):
        p = src / d
        if p.is_dir():
            files = [str(p / f) for f in sorted(os.listdir(p))
                     if os.path.splitext(f.lower())[1] in ALLOWED_EXT]
            if files:
                classes[d] = files
    return classes

def resize_image_cv2(path, size):
    img = cv2.imread(path)
    if img is None:
        raise IOError(f"Failed to read image: {path}")
    return cv2.resize(img, (size[0], size[1]), interpolation=cv2.INTER_AREA)

def save_jpg(img_bgr, out_path, quality=95):
    ensure_dir(os.path.dirname(out_path))
    cv2.imwrite(out_path, img_bgr, [int(cv2.IMWRITE_JPEG_QUALITY), quality])

def split_per_class(files, train_ratio=TRAIN_RATIO, val_ratio=VAL_RATIO, test_ratio=TEST_RATIO):
    f = files.copy()
    random.shuffle(f)
    n = len(f)
    if n == 0:
        return [], [], []
    train_n = int(round(n * train_ratio))
    val_n = int(round(n * val_ratio))
    if train_n == 0 and n >= 1:
        train_n = 1
    test_n = n - train_n - val_n
    if test_n < 0:
        val_n += test_n
        test_n = 0
    if val_n == 0 and n - train_n > 0:
        val_n = 1
        test_n = n - train_n - val_n
    if test_n == 0 and n - train_n - val_n > 0:
        test_n = n - train_n - val_n
    if train_n + val_n + test_n != n:
        remainder = n - (train_n + val_n + test_n)
        train_n += remainder
    assert train_n + val_n + test_n == n
    return f[:train_n], f[train_n:train_n+val_n], f[train_n+val_n:]

def preprocess_dataset(src_root, out_root, img_size=IMG_SIZE):
    classes = collect_classes(src_root)
    class_names = sorted(classes.keys())
    print(f"Found {len(class_names)} classes.")
    if os.path.exists(out_root):
        print("Removing existing processed folder:", out_root)
        shutil.rmtree(out_root)
    for s in ("train","val","test"):
        ensure_dir(os.path.join(out_root, s))

    class_indices = {i: name for i,name in enumerate(class_names)}
    ensure_dir(out_root)
    with open(os.path.join(out_root, "class_indices.json"), "w") as f:
        json.dump(class_indices, f, indent=2)

    train_recs, val_recs, test_recs = [], [], []
    for cls in tqdm(class_names, desc="Classes"):
        files = classes[cls]
        tr, va, te = split_per_class(files)
        def save_list(lst, split_name, recs):
            out_dir_label = os.path.join(out_root, split_name, cls)
            ensure_dir(out_dir_label)
            for src in lst:
                try:
                    img = resize_image_cv2(src, img_size)
                except Exception as e:
                    print("SKIP read error:", src, e)
                    continue
                base = Path(src).stem
                out_jpg = os.path.join(out_dir_label, base + ".jpg")
                save_jpg(img, out_jpg)
                recs.append({"orig_path": src, "out_jpg": out_jpg, "label": cls})
        save_list(tr, "train", train_recs)
        save_list(va, "val", val_recs)
        save_list(te, "test", test_recs)

    pd.DataFrame(train_recs).to_csv(os.path.join(out_root, "manifest_train.csv"), index=False)
    pd.DataFrame(val_recs).to_csv(os.path.join(out_root, "manifest_val.csv"), index=False)
    pd.DataFrame(test_recs).to_csv(os.path.join(out_root, "manifest_test.csv"), index=False)

    # summary counts
    def counts_from_dir(split_dir):
        counts = {}
        for c in class_names:
            cls_dir = os.path.join(split_dir, c)
            n = len([f for f in os.listdir(cls_dir) if os.path.splitext(f.lower())[1] in ALLOWED_EXT]) if os.path.isdir(cls_dir) else 0
            counts[c] = n
        return counts

    counts = {"train": counts_from_dir(os.path.join(out_root,"train")),
              "val": counts_from_dir(os.path.join(out_root,"val")),
              "test": counts_from_dir(os.path.join(out_root,"test"))}
    totals = {k: sum(v.values()) for k,v in counts.items()}
    with open(os.path.join(out_root, "counts_summary.json"), "w") as f:
        json.dump({"counts":counts,"totals":totals}, f, indent=2)

    print("Done. Totals:", totals)
    return out_root

# Run preprocessing if necessary
if not Path(PROCESSED_ROOT).exists():
    print("Creating processed dataset at:", PROCESSED_ROOT)
    preprocess_dataset(SRC_ROOT, PROCESSED_ROOT, img_size=IMG_SIZE)
else:
    print("Found existing processed dataset at:", PROCESSED_ROOT)


In [None]:
# Cell 3: build tf.data datasets
import tensorflow as tf
from pathlib import Path

train_dir = str(Path(PROCESSED_ROOT) / "train")
val_dir   = str(Path(PROCESSED_ROOT) / "val")
test_dir  = str(Path(PROCESSED_ROOT) / "test")

print("train_dir:", train_dir)
print("val_dir:", val_dir)
print("test_dir:", test_dir)

train_ds = tf.keras.utils.image_dataset_from_directory(
    train_dir,
    labels='inferred',
    label_mode='categorical',
    image_size=IMG_SIZE,
    batch_size=GLOBAL_BATCH_SIZE,
    shuffle=True,
    seed=SEED,
)

val_ds = tf.keras.utils.image_dataset_from_directory(
    val_dir,
    labels='inferred',
    label_mode='categorical',
    image_size=IMG_SIZE,
    batch_size=GLOBAL_BATCH_SIZE,
    shuffle=False,
    seed=SEED,
)

test_ds = tf.keras.utils.image_dataset_from_directory(
    test_dir,
    labels='inferred',
    label_mode='categorical',
    image_size=IMG_SIZE,
    batch_size=GLOBAL_BATCH_SIZE,
    shuffle=False,
)

NUM_CLASSES = len(train_ds.class_names)
print("NUM_CLASSES:", NUM_CLASSES)

# performance tuning
train_ds = train_ds.prefetch(AUTOTUNE)
val_ds   = val_ds.prefetch(AUTOTUNE)
test_ds  = test_ds.prefetch(AUTOTUNE)


In [None]:
# Cell 4: build model (EfficientNetV2B2 + stronger head + preprocess_input)
from tensorflow.keras import layers, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications.efficientnet_v2 import EfficientNetV2B2, preprocess_input

with strategy.scope():
    data_augmentation = tf.keras.Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.2),
        layers.RandomZoom(0.2),
        layers.RandomContrast(0.1),
    ], name="augmentation")

    base = EfficientNetV2B2(
        include_top=False,
        weights='imagenet',
        input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3)
    )
    base.trainable = False

    inputs = layers.Input(shape=(IMG_SIZE[0], IMG_SIZE[1], 3))
    x = data_augmentation(inputs)
    x = preprocess_input(x)         # ✅ official preprocessing
    x = base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(1024, activation="relu", dtype="float32")(x)
    x = layers.Dropout(0.5)(x)      # stronger regularization
    outputs = layers.Dense(NUM_CLASSES, activation="softmax", dtype="float32")(x)

    model = Model(inputs, outputs)

    model.compile(optimizer=Adam(learning_rate=LR_HEAD),
                  loss="categorical_crossentropy",
                  metrics=["accuracy"])

model.summary()


In [None]:
# Cell 5: callbacks
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping

checkpoint_a = ModelCheckpoint(str(BEST_PHASEA), monitor="val_accuracy", save_best_only=True, verbose=1)
reducelr_a = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1)
early_a = EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True, verbose=1)

checkpoint_b = ModelCheckpoint(str(BEST_PHASEB), monitor="val_accuracy", save_best_only=True, verbose=1)
reducelr_b = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1)
early_b = EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True, verbose=1)


In [None]:
# Cell 6: train Phase A (head-only)
print("Phase A: training head only. LR =", LR_HEAD)
history_a = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS_HEAD,
    callbacks=[checkpoint_a, reducelr_a, early_a],
    verbose=1
)
print("Phase A done. Best saved to:", BEST_PHASEA)


In [None]:
# Cell 7: prepare Phase B (unfreeze top layers)
# quick hyperparam tweaks for Phase B
UNFREEZE_TOP_N = 150
LR_FINE = 1e-5
EPOCHS_FINE = 30

# load best Phase A
if Path(BEST_PHASEA).exists():
    print("Loading best Phase A from:", BEST_PHASEA)
    model = tf.keras.models.load_model(str(BEST_PHASEA))
else:
    print("BEST_PHASEA not found; using current model in memory")

# unfreeze last UNFREEZE_TOP_N layers
total_layers = len(model.layers)
start_unfreeze = max(0, total_layers - UNFREEZE_TOP_N)
for i, layer in enumerate(model.layers):
    layer.trainable = True if i >= start_unfreeze else False
print(f"Unfreezing layers from {start_unfreeze} / {total_layers-1}")

# recompile with slightly higher LR and label smoothing
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy

with strategy.scope():
    opt = Adam(learning_rate=LR_FINE)
    loss = CategoricalCrossentropy(label_smoothing=0.05)   # stabilizes loss
    model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])

# callbacks: you can keep the existing ones but increase patience a bit
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
checkpoint_b = ModelCheckpoint(str(BEST_PHASEB), monitor="val_accuracy", save_best_only=True, verbose=1)
reducelr_b = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1)
early_b = EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True, verbose=1)



In [None]:
# Cell 8: train Phase B (fine-tune)
# resume training Phase B
history_b = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS_FINE,
    callbacks=[checkpoint_b, reducelr_b, early_b],
    verbose=1
)
print("Phase B done. Best saved to:", BEST_PHASEB)


In [None]:
# Option A: Continue fine-tuning, save final_model.keras, evaluate on test_ds
from pathlib import Path
import numpy as np
from sklearn.metrics import classification_report, accuracy_score
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping

# --- hyperparams (tweakable) ---
UNFREEZE_TOP_N = 250        # last N layers to unfreeze
LR_CONT = 3e-5              # small LR for continued fine-tuning
EPOCHS_CONT = 20            # run a few to many epochs
PATIENCE = 6                # early stopping patience
FINAL_OUT = MODEL_DIR / "final_model.keras"   # where final model will be saved

# --- load best Phase B checkpoint ---
if not Path(BEST_PHASEB).exists():
    raise FileNotFoundError(f"BEST_PHASEB not found at {BEST_PHASEB}")
model = tf.keras.models.load_model(str(BEST_PHASEB))
print("Loaded BEST_PHASEB:", BEST_PHASEB)

# --- unfreeze last UNFREEZE_TOP_N layers ---
total_layers = len(model.layers)
start_unfreeze = max(0, total_layers - UNFREEZE_TOP_N)
for i, layer in enumerate(model.layers):
    layer.trainable = True if i >= start_unfreeze else False
print(f"Unfrozen layers from index {start_unfreeze} / {total_layers-1}")

# --- recompile with small LR and label smoothing ---
opt = Adam(learning_rate=LR_CONT)
loss = CategoricalCrossentropy(label_smoothing=0.05)
model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])
model.summary()

# --- callbacks (save best during this continuation) ---
ckpt = ModelCheckpoint(str(MODEL_DIR / "cont_finetune_best.keras"),
                       monitor="val_accuracy", save_best_only=True, verbose=1)
reducelr = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1)
early = EarlyStopping(monitor="val_loss", patience=PATIENCE, restore_best_weights=True, verbose=1)

# --- train ---
history_cont = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS_CONT,
    callbacks=[ckpt, reducelr, early],
    verbose=1
)

# --- after training: pick the best checkpoint (if exists) and save as final_model.keras ---
best_cont_path = MODEL_DIR / "cont_finetune_best.keras"
if best_cont_path.exists():
    final = tf.keras.models.load_model(str(best_cont_path))
    print("Loaded continuation best from:", best_cont_path)
else:
    final = model
    print("Using last-in-memory model as final.")

# save final model in native Keras format
final.save(str(FINAL_OUT), include_optimizer=False)
print("Saved final model ->", FINAL_OUT)

# --- evaluation on test_ds ---
y_true, y_pred = [], []
for imgs, labs in test_ds:
    probs = final.predict(imgs, verbose=0)
    y_true.extend(np.argmax(labs.numpy(), axis=1).tolist())
    y_pred.extend(np.argmax(probs, axis=1).tolist())

acc = accuracy_score(y_true, y_pred)
print(f"\nFinal test accuracy: {acc:.4f}")
print("\nClassification report:")
print(classification_report(y_true, y_pred, digits=4))


In [None]:
# Cell 9: evaluation
import numpy as np
from sklearn.metrics import classification_report, accuracy_score

if Path(BEST_PHASEB).exists():
    final_model = tf.keras.models.load_model(str(BEST_PHASEB))
    print("Loaded BEST_PHASEB for evaluation")
else:
    final_model = model
    print("Evaluating current model in memory")

y_true, y_pred = [], []
for imgs, labs in test_ds:
    probs = final_model.predict(imgs, verbose=0)
    y_true.extend(np.argmax(labs.numpy(), axis=1).tolist())
    y_pred.extend(np.argmax(probs, axis=1).tolist())

acc = accuracy_score(y_true, y_pred)
print(f"Test accuracy: {acc:.4f}")
print("Classification report:")
print(classification_report(y_true, y_pred, digits=4))


In [None]:
# Progressive-resize (320x320) starting from best_phaseB.keras
# Run this cell to fine-tune at 320x320 using BEST Phase B weights as the source.

# Progressive-resize (320x320) starting from best_phaseB.keras
# Corrected: keep raw datasets to read class_names before prefetch.
from pathlib import Path
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.applications.efficientnet_v2 import EfficientNetV2B2, preprocess_input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
import numpy as np
from sklearn.metrics import accuracy_score, classification_report

# ---------- params ----------
NEW_IMG_SIZE = (320, 320)
BATCH_SIZE_320 = 24           # try 24 on P100; reduce to 16 if OOM
UNFREEZE_TOP_N = 250
LR_RESIZE = 3e-6
EPOCHS_RESIZE = 18
PATIENCE = 5
FINAL_RESIZED = MODEL_DIR / "best_phaseB.keras"

SRC_TRAIN = str(Path(PROCESSED_ROOT) / "train")
SRC_VAL   = str(Path(PROCESSED_ROOT) / "val")
SRC_TEST  = str(Path(PROCESSED_ROOT) / "test")

# ---------- sanity checks ----------
assert Path(SRC_TRAIN).exists(), f"Train folder not found: {SRC_TRAIN}"
assert Path(SRC_VAL).exists(), f"Val folder not found: {SRC_VAL}"
assert Path(SRC_TEST).exists(), f"Test folder not found: {SRC_TEST}"
best_phaseb_path = MODEL_DIR / "best_phaseB.keras"
assert best_phaseb_path.exists(), f"best_phaseB.keras not found at {best_phaseb_path}"

# ---------- datasets at 320 (keep raw dataset to access class_names/cardinality) ----------
train_ds_320_raw = tf.keras.utils.image_dataset_from_directory(
    SRC_TRAIN, labels='inferred', label_mode='categorical',
    image_size=NEW_IMG_SIZE, batch_size=BATCH_SIZE_320, shuffle=True, seed=SEED
)
val_ds_320_raw = tf.keras.utils.image_dataset_from_directory(
    SRC_VAL, labels='inferred', label_mode='categorical',
    image_size=NEW_IMG_SIZE, batch_size=BATCH_SIZE_320, shuffle=False, seed=SEED
)
test_ds_320_raw = tf.keras.utils.image_dataset_from_directory(
    SRC_TEST, labels='inferred', label_mode='categorical',
    image_size=NEW_IMG_SIZE, batch_size=BATCH_SIZE_320, shuffle=False, seed=SEED
)

# read class names and batch counts from the raw datasets
NUM_CLASSES = len(train_ds_320_raw.class_names)
train_batches = tf.data.experimental.cardinality(train_ds_320_raw).numpy()
print("NUM_CLASSES:", NUM_CLASSES, "Train batches:", train_batches)

# now safely add performance transforms
AUTOTUNE = tf.data.AUTOTUNE
train_ds_320 = train_ds_320_raw.prefetch(AUTOTUNE)
val_ds_320   = val_ds_320_raw.prefetch(AUTOTUNE)
test_ds_320  = test_ds_320_raw.prefetch(AUTOTUNE)

# ---------- build model (320) ----------
inputs = layers.Input(shape=(NEW_IMG_SIZE[0], NEW_IMG_SIZE[1], 3))
# mild augmentation
x = layers.RandomFlip("horizontal")(inputs)
x = layers.RandomRotation(0.15)(x)
x = layers.RandomZoom(0.12)(x)
x = layers.RandomContrast(0.08)(x)
x = preprocess_input(x)

base = EfficientNetV2B2(include_top=False, weights=None, input_shape=(NEW_IMG_SIZE[0], NEW_IMG_SIZE[1], 3))
x = base(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(1024, activation="relu", dtype="float32")(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(NUM_CLASSES, activation="softmax", dtype="float32")(x)
model_320 = Model(inputs, outputs)

# ---------- load weights from best_phaseB.keras ----------
src_path = best_phaseb_path
print("Loading weights from:", src_path)
src_model = tf.keras.models.load_model(str(src_path))

# Try direct set_weights; fall back to per-layer matching if shapes differ
try:
    model_320.set_weights(src_model.get_weights())
    print("Directly loaded weights from best_phaseB.keras")
except Exception as e:
    print("Direct load failed (likely shape mismatch). Attempting best-effort layer-wise copy. Error:", e)
    copied = 0
    src_by_name = {l.name: l for l in src_model.layers}
    for tgt in model_320.layers:
        if tgt.name in src_by_name:
            src_w = src_by_name[tgt.name].get_weights()
            if not src_w:
                continue
            try:
                tgt.set_weights(src_w)
                copied += 1
            except Exception:
                # shape mismatch for this layer — skip
                continue
    print(f"Copied weights for {copied} layers (best-effort).")

# ---------- unfreeze top N ----------
total_layers = len(model_320.layers)
start_unfreeze = max(0, total_layers - UNFREEZE_TOP_N)
for i, layer in enumerate(model_320.layers):
    layer.trainable = True if i >= start_unfreeze else False
print(f"Unfrozen layers from {start_unfreeze}/{total_layers-1} (total layers: {total_layers})")

# ---------- compile ----------
opt = Adam(learning_rate=LR_RESIZE)
loss = CategoricalCrossentropy(label_smoothing=0.05)
model_320.compile(optimizer=opt, loss=loss, metrics=["accuracy"])
model_320.summary()

# ---------- callbacks ----------
ckpt = ModelCheckpoint(str(MODEL_DIR / "best_resize.keras"), monitor="val_accuracy", save_best_only=True, verbose=1)
reducelr = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1)
early = EarlyStopping(monitor="val_loss", patience=PATIENCE, restore_best_weights=True, verbose=1)

# ---------- train ----------
history_resize = model_320.fit(
    train_ds_320,
    validation_data=val_ds_320,
    epochs=EPOCHS_RESIZE,
    callbacks=[ckpt, reducelr, early],
    verbose=1
)

# ---------- save best & evaluate ----------
best_resize = MODEL_DIR / "best_resize.keras"
if best_resize.exists():
    final_model = tf.keras.models.load_model(str(best_resize))
    print("Loaded best resize model:", best_resize)
else:
    final_model = model_320

final_model.save(str(FINAL_RESIZED), include_optimizer=False)
print("Saved final resized model ->", FINAL_RESIZED)

# evaluate on test set
y_true, y_pred = [], []
for imgs, labs in test_ds_320:
    probs = final_model.predict(imgs, verbose=0)
    y_true.extend(np.argmax(labs.numpy(), axis=1).tolist())
    y_pred.extend(np.argmax(probs, axis=1).tolist())

acc = accuracy_score(y_true, y_pred)
print(f"\nFinal test accuracy (320×320): {acc:.4f}")
print(classification_report(y_true, y_pred, digits=4))

In [None]:
# Corrected: TTA + simple ensemble of best_phaseB + final_model_320
import tensorflow as tf
import numpy as np
from pathlib import Path
from sklearn.metrics import accuracy_score, classification_report

# config
IMG = (320, 320)
BATCH = 24
SRC_TEST = str(Path(PROCESSED_ROOT) / "test")
m1_path = MODEL_DIR / "best_phaseB.keras"
m2_path = MODEL_DIR / "final_model_320.keras"   # may exist after resize
models_to_use = []
if m1_path.exists():
    models_to_use.append(tf.keras.models.load_model(str(m1_path)))
if m2_path.exists():
    models_to_use.append(tf.keras.models.load_model(str(m2_path)))
if not models_to_use:
    raise FileNotFoundError("No candidate models found (best_phaseB/final_model_320).")

print("Using models:", [getattr(m, "name", "model") for m in models_to_use])

# build test dataset (320)
test_ds = tf.keras.utils.image_dataset_from_directory(
    SRC_TEST, labels='inferred', label_mode='categorical',
    image_size=IMG, batch_size=BATCH, shuffle=False, seed=777
).prefetch(tf.data.AUTOTUNE)

def tta_variants(batch_images):
    orig = tf.cast(batch_images, tf.float32)
    flip = tf.image.flip_left_right(orig)
    crop = tf.image.central_crop(orig, 0.92)
    crop = tf.image.resize(crop, IMG)
    cont = tf.image.adjust_contrast(orig, 1.08)
    variants = tf.concat([orig, flip, crop, cont], axis=0)  # (4*B, H, W, 3)
    return variants, 4

y_true_all = []
y_pred_all = []
for imgs, labs in test_ds:
    batch_size = imgs.shape[0]
    variants, nvar = tta_variants(imgs)  # shape (nvar*batch, H, W, 3)

    model_preds = []
    for m in models_to_use:
        preds = m.predict(variants, verbose=0)           # (nvar*batch, C)
        preds = preds.reshape(nvar, batch_size, -1)      # (nvar, batch, C)
        preds = preds.mean(axis=0)                       # (batch, C)
        model_preds.append(preds)

    ensemble_preds = np.mean(np.stack(model_preds, axis=0), axis=0)  # (batch, C)
    y_true_all.extend(np.argmax(labs.numpy(), axis=1).tolist())
    y_pred_all.extend(np.argmax(ensemble_preds, axis=1).tolist())

acc = accuracy_score(y_true_all, y_pred_all)
print(f"\nEnsembled TTA Test accuracy: {acc:.4f}")
print("\nClassification report:")
print(classification_report(y_true_all, y_pred_all, digits=4))
