# **This notebook is still being edited**

# Import Libraries

In [None]:
import numpy as np 
import pandas as pd
import os
import shutil
import random
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout, GlobalAveragePooling2D, Input
from tensorflow.keras.optimizers import Adam, Adamax, RMSprop
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.applications.xception import preprocess_input
from tensorflow.keras.applications.xception import decode_predictions
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing import image
from sklearn.metrics import f1_score
from cleanlab.filter import find_label_issues
from cleanlab.rank import get_label_quality_scores
import keras_cv
from keras_cv.layers import RandAugment
from IPython.display import Image, display
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import tensorflow_probability as tfp
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.losses    import CategoricalFocalCrossentropy
import keras
import warnings
import math
from tensorflow.keras.callbacks import LearningRateScheduler
import cv2

warnings.filterwarnings('ignore')

In [None]:
# Side length in pixels used when loading sample images for display and as
# the input size of the baseline CNN.
IMAGE_SIZE = 224

# Load Dataset

In [None]:
# Number of entries in the compressed-images folder.
len(os.listdir('/kaggle/input/clothing-dataset-full/images_compressed'))

In [None]:
# Number of entries in the original-images folder.
len(os.listdir('/kaggle/input/clothing-dataset-full/images_original'))

In [None]:
# Load the image -> label table and preview its first rows.
labels = pd.read_csv('/kaggle/input/clothing-dataset-full/images.csv')
labels.head()

In [None]:
# (rows, columns) of the label table.
labels.shape

In [None]:
# Number of rows per raw label value.
labels.label.value_counts()

In [None]:
# Bar chart of how many images carry each label.
label_counts = labels["label"].value_counts()

fig = plt.figure(figsize=(12, 6))
ax = label_counts.plot(kind="bar")
ax.set_xlabel("Garment Class")
ax.set_ylabel("Number of Images")
plt.xticks(rotation=45, ha='right')
fig.tight_layout()

# Persist the figure first, then render it in the notebook.
fig.savefig("label_distribution.png", dpi=300)
plt.show()

In [None]:
# `labels` is a DataFrame with two columns:
#   - 'image': the image file name (the ".jpg" extension is added in step 4)
#   - 'label': the original class name (e.g. "T-Shirt", "Skip", "Blouse")

# 1) Drop the rows whose label is exactly "Skip", "Not sure", or "Other".
drop_these = ["Skip", "Not sure", "Other"]
labels = labels[~labels["label"].isin(drop_these)].copy()

# 2) Merge fine-grained classes into a parent class:
#    a) {"Top", "Undershirt", "Body"} -> "Basic Top"
#    b) {"Polo", "Blouse"}            -> "Shirt"
#    c) {"Blazer", "Hoodie"}          -> "Outwear"
#    Every other label is left unchanged.
merge_map = {
    "Top":        "Basic Top",
    "Undershirt": "Basic Top",
    "Body":       "Basic Top",
    "Polo":       "Shirt",
    "Blouse":     "Shirt",
    "Blazer":     "Outwear",
    "Hoodie":     "Outwear",
}

# 3) Apply the mapping: labels found in merge_map are replaced, others kept.
labels["label"] = labels["label"].replace(merge_map)

# 4) Add the ".jpg" extension so the names match the files on disk.
#    Names that already end in ".jpg" are left alone, so re-running this cell
#    does not produce "name.jpg.jpg".
already_has_ext = labels["image"].str.endswith(".jpg", na=False)
labels["image"] = labels["image"].where(already_has_ext, labels["image"] + ".jpg")

# 5) Show the class counts after cleaning to inspect the class imbalance.
new_counts = labels["label"].value_counts()
print(new_counts)

# Keep a copy of the cleaned table next to the other notebook outputs.
labels.to_csv("/kaggle/working/cleaned_labels.csv", index=False)


In [None]:
# Bar chart of the per-class image counts in the current `labels` table.
label_counts = labels["label"].value_counts()

plt.figure(figsize=(12, 6))
label_counts.plot.bar()
plt.gca().set(xlabel="Garment Class", ylabel="Number of Images")
plt.xticks(rotation=45, ha='right')
plt.tight_layout()

# NOTE: savefig replaces any existing file named "label_distribution.png".
plt.savefig("label_distribution.png", dpi=300)
plt.show()

In [None]:
# Distinct class names, in order of first appearance in the table.
labels_lst = labels.label.unique().tolist()
labels_lst

# Splitting the dataset

In [None]:


# Hold out 10% of the rows for testing, then 0.11111111 (about 1/9) of the
# remaining rows for validation, i.e. roughly an 80/10/10 split.
# Both splits are stratified by label and use a fixed random_state.
train_df, test_df = train_test_split(labels, test_size=0.1, random_state=42, stratify=labels['label'])
train_df, val_df = train_test_split(train_df, test_size=0.11111111, random_state=42, stratify=train_df['label'])

In [None]:
# Create /kaggle/working/clothes/<split>/<label>/ for every split and class.
# os.makedirs(..., exist_ok=True) creates missing parents and tolerates
# folders that already exist, so the cell can be re-run safely and a
# partially created tree is always completed.
parent_dir = '/kaggle/working/clothes'

for split in ['training', 'validation', 'testing']:
    for label in labels_lst:
        os.makedirs(os.path.join(parent_dir, split, label), exist_ok=True)

In [None]:
# Source paths that could not be copied; filled by copy_files().
not_found_images = []


def copy_files(file_df, split, source_dir, destination_dir):
    """Copy the images listed in ``file_df`` into ``destination_dir/split/<label>/``.

    Args:
        file_df: DataFrame with an 'image' column (file name) and a 'label'
            column (class folder name).
        split: Name of the split sub-folder, e.g. 'training'.
        source_dir: Folder the images are read from.
        destination_dir: Root folder that contains the split sub-folders.

    A copy that raises FileNotFoundError is not fatal: the source path is
    appended to the module-level ``not_found_images`` list instead.
    """
    global not_found_images
    for img_name, label in zip(file_df['image'], file_df['label']):
        origin = os.path.join(source_dir, img_name)
        target = os.path.join(destination_dir, split, label, img_name)
        try:
            shutil.copy2(origin, target)
        except FileNotFoundError:
            not_found_images.append(origin)

In [None]:
# Folder the images are copied from, and root of the per-split folder tree.
source_dir = '/kaggle/input/clothing-dataset-full/images_compressed'
destination_dir = '/kaggle/working/clothes'

In [None]:


# Fill each split folder from the DataFrame that belongs to it.
for split_frame, split_name in (
    (train_df, 'training'),
    (val_df, 'validation'),
    (test_df, 'testing'),
):
    copy_files(split_frame, split_name, source_dir, destination_dir)

In [None]:
len(not_found_images) # number of source files that could not be copied (0 means none were missing)

# Display Sample images

In [None]:
# Display Sample image

def display_sample_image(split, label):
    """Load one randomly chosen image of ``label`` from ``split``.

    The image is read from ``destination_dir/split/label`` and resized to
    IMAGE_SIZE x IMAGE_SIZE by ``load_img``.
    """
    class_dir = os.path.join(destination_dir, split, label)
    picked = random.choice(os.listdir(class_dir))
    return load_img(os.path.join(class_dir, picked), target_size=(IMAGE_SIZE, IMAGE_SIZE))

In [None]:
# Preview one random 'Outwear' image from the training split.
display_sample_image('training', 'Outwear')

In [None]:
# Display Sample images

def display_sample_images(split, label):
    """Show five randomly drawn images of ``label`` from ``split`` in one row."""
    n_samples = 5
    for position in range(1, n_samples + 1):
        plt.subplot(1, n_samples, position)
        plt.imshow(display_sample_image(split, label))
        plt.axis('off')
    plt.show()

In [None]:
# Print each class name followed by a row of random training samples.
for label in labels_lst:
    print(label)
    display_sample_images('training', label)

In [None]:
# Check the number of images in each directory
def check_no_images(label):
    """Print the number of images of ``label`` overall and in each split folder."""
    print(f'{label} Total Images:', len(labels[labels.label == label]))
    for title, split in (
        ('Training', 'training'),
        ('Validation', 'validation'),
        ('Testing', 'testing'),
    ):
        n_files = len(os.listdir(os.path.join(destination_dir, split, label)))
        print(f'{label} {title} Images:', n_files)

In [None]:
# Image counts for the 'T-Shirt' class.
check_no_images('T-Shirt')

In [None]:
# Image counts for the 'Shoes' class.
check_no_images('Shoes')

# Data Preparation

In [None]:
# Root of the per-split folder tree and the three split folders inside it.
DATA_ROOT = "/kaggle/working/clothes"
TRAIN_DIR = DATA_ROOT + "/training"
VAL_DIR   = DATA_ROOT + "/validation"
TEST_DIR  = DATA_ROOT + "/testing"

# Input resolution and batch size shared by all three generators.
IMG_HEIGHT, IMG_WIDTH = 224, 224
BATCH_SIZE = 32

# Pixel values are scaled from [0, 255] down to [0, 1].
RESCALE_FACTOR = 1.0 / 255.0

# Training images get light geometric augmentation on top of the rescaling.
TRAIN_AUG_PARAMS = {
    "rescale": RESCALE_FACTOR,
    "rotation_range": 15,         # random rotation of up to +/-15 degrees
    "width_shift_range": 0.10,    # random horizontal shift of up to 10%
    "height_shift_range": 0.10,   # random vertical shift of up to 10%
    "shear_range": 0.10,          # random shear
    "zoom_range": 0.10,           # random zoom in/out of up to 10%
    "horizontal_flip": True,      # random left-right flip
    "fill_mode": "nearest",       # how pixels uncovered by a transform are filled
}

# Validation and test images are only rescaled, never augmented.
VAL_TEST_PARAMS = {"rescale": RESCALE_FACTOR}

train_datagen = ImageDataGenerator(**TRAIN_AUG_PARAMS)
val_datagen   = ImageDataGenerator(**VAL_TEST_PARAMS)
test_datagen  = ImageDataGenerator(**VAL_TEST_PARAMS)

# Options common to every directory iterator.
flow_kwargs = {
    "target_size": (IMG_HEIGHT, IMG_WIDTH),
    "color_mode": "rgb",
    "batch_size": BATCH_SIZE,
    "class_mode": "categorical",   # one-hot labels
    "seed": 42,
}

# Only the training iterator shuffles; validation and test keep a fixed
# order so that predictions stay aligned with file names and labels.
train_generator = train_datagen.flow_from_directory(TRAIN_DIR, shuffle=True, **flow_kwargs)
val_generator = val_datagen.flow_from_directory(VAL_DIR, shuffle=False, **flow_kwargs)
test_generator = test_datagen.flow_from_directory(TEST_DIR, shuffle=False, **flow_kwargs)

# Sanity check: draw one augmented training batch and show its first nine
# images in a 3x3 grid, titled with their class names.
images_batch, labels_batch = next(train_generator)
index_to_name = list(train_generator.class_indices)

plt.figure(figsize=(8, 8))
for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(images_batch[i])
    label_index = labels_batch[i].argmax()   # position of the 1 in the one-hot row
    label_name  = index_to_name[label_index]
    plt.title(label_name, fontsize=8)
    plt.axis("off")
plt.tight_layout()
plt.show()

In [None]:
# Share of images that ended up in each split.
# The validation iterator is named `val_generator`.
total_no_images = train_generator.n + val_generator.n + test_generator.n
print('Training Images:', round(train_generator.n / total_no_images *100, 2), '%')
print('Validation Images:', round(val_generator.n / total_no_images *100, 2), '%')
print('Testing Images:', round(test_generator.n / total_no_images *100, 2), '%')

In [None]:
# Check the total number of images: the three generators together should see
# exactly one image per row of `labels`.
assert len(labels) == total_no_images, 'Total number of images does not match'

# Baseline

In [None]:
# Baseline CNN: three Conv(3x3, ReLU) + MaxPool(2x2) stages with 32/64/128
# filters, followed by Flatten -> Dense(128, ReLU) -> softmax over the classes.
baseline_layers = []
for stage, n_filters in enumerate((32, 64, 128)):
    # Only the first layer declares the input shape.
    first_layer_kwargs = {"input_shape": (IMAGE_SIZE, IMAGE_SIZE, 3)} if stage == 0 else {}
    baseline_layers.append(Conv2D(n_filters, (3, 3), activation='relu', **first_layer_kwargs))
    baseline_layers.append(MaxPooling2D(2, 2))

baseline_layers.append(Flatten())
baseline_layers.append(Dense(128, activation='relu'))
baseline_layers.append(Dense(train_generator.num_classes, activation='softmax'))

model = Sequential(baseline_layers)
model.summary()

In [None]:
# File that receives the weights of the best baseline epoch.
CHECKPOINT_PATH = "/kaggle/working/base_model.weights.h5"

# Save weights only when the validation loss improves.
checkpoint_cb = ModelCheckpoint(
    filepath=CHECKPOINT_PATH,
    monitor="val_loss",   # quantity that decides which epoch is "best"
    save_best_only=True,
    mode="min",               # lower val_loss is better
    verbose=1,
    save_weights_only=True
)

# Stop once val_loss has not improved for 5 consecutive epochs.
earlystop_cb = EarlyStopping(
    monitor="val_loss",   # same quantity as the checkpoint callback
    mode="min",               # lower val_loss is better
    patience=5,
    restore_best_weights=True,
    verbose=1
)

model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# 30 is an upper bound on the epoch count; EarlyStopping may end training sooner.
history = model.fit(
    train_generator,
    epochs=30,
    validation_data=val_generator,
    callbacks=[checkpoint_cb, earlystop_cb]
)

In [None]:
# Aggregate loss and accuracy on the held-out test split.
test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the iterator so predictions line up with test_generator.classes,
# and use enough steps to cover the whole split (the last batch may be partial).
test_generator.reset()
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Per-image softmax outputs and the class index with the highest score.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[label_id] for label_id in pred_labels]
true_names = [idx_to_class[label_id] for label_id in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows = true class and columns = predicted class.
class_ids = list(idx_to_class)
tick_names = [idx_to_class[class_id] for class_id in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=tick_names, yticklabels=tick_names)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Test-Set Confusion Matrix")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 1

In [None]:


#  E.1) Build the model:
# Frozen ImageNet EfficientNetB0 backbone; only the new head is trained.
base_model = EfficientNetB0(
    input_shape=(IMG_HEIGHT, IMG_WIDTH, 3),
    include_top=False,
    weights="imagenet"
)
base_model.trainable = False   # freeze the backbone for Phase 1

# Keras' EfficientNet models contain their own input normalisation and expect
# raw pixel values in [0, 255]. The image generators in this notebook are
# built with rescale=RESCALE_FACTOR (1/255), so the inputs are scaled back up
# before they reach the backbone.
inputs = Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3))
x = tf.keras.layers.Rescaling(1.0 / RESCALE_FACTOR)(inputs)
x = base_model(x, training=False)   # run the frozen backbone in inference mode

# Classification head: Flatten -> Dense(512, ReLU) -> Dropout -> softmax.
x = Flatten()(x)
x = Dense(512, activation='relu')(x)
x = Dropout(0.3)(x)
outputs = Dense(train_generator.num_classes, activation="softmax")(x)

model = Model(inputs=inputs, outputs=outputs)

model.compile(
    optimizer=Adam(learning_rate=1e-3),
    loss="categorical_crossentropy",
    metrics=["accuracy"]
)

In [None]:
# File that receives the weights of the best epoch of this model.
CHECKPOINT_PATH = "/kaggle/working/best_base_model.weights.h5"

# Save weights only when the validation loss improves.
checkpoint_cb = ModelCheckpoint(
    filepath=CHECKPOINT_PATH,
    monitor="val_loss",   # quantity that decides which epoch is "best"
    save_best_only=True,
    mode="min",               # lower val_loss is better
    verbose=1,
    save_weights_only=True
)

# Stop once val_loss has not improved for 5 consecutive epochs.
earlystop_cb = EarlyStopping(
    monitor="val_loss",   # same quantity as the checkpoint callback
    mode="min",               # lower val_loss is better
    patience=5,
    restore_best_weights=True,
    verbose=1
)

#  E.2) Train the head on the augmented training generator:
history = model.fit(
    train_generator,
    epochs=30,              # upper limit; EarlyStopping may end training sooner
    validation_data=val_generator,
    callbacks=[checkpoint_cb, earlystop_cb],
    verbose=1
)

In [None]:
# Aggregate loss and accuracy on the held-out test split.
test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the iterator so predictions line up with test_generator.classes,
# and use enough steps to cover the whole split (the last batch may be partial).
test_generator.reset()
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Per-image softmax outputs and the class index with the highest score.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[label_id] for label_id in pred_labels]
true_names = [idx_to_class[label_id] for label_id in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows = true class and columns = predicted class.
class_ids = list(idx_to_class)
tick_names = [idx_to_class[class_id] for class_id in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=tick_names, yticklabels=tick_names)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Test-Set Confusion Matrix")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

In [None]:
#  E.1) Build the model:
# Frozen ImageNet Xception backbone; only the new head is trained in Phase 1.
base_model = Xception(
    weights="imagenet",
    include_top=False,
    input_shape=(IMG_HEIGHT, IMG_WIDTH, 3),
)
base_model.trainable = False

# Head: global average pooling -> dropout -> softmax over the classes.
pooled = GlobalAveragePooling2D()(base_model.output)
x = Dropout(0.3)(pooled)
outputs = Dense(train_generator.num_classes, activation="softmax")(x)

model = Model(base_model.input, outputs)
model.compile(
    loss="categorical_crossentropy",
    optimizer=Adam(learning_rate=1e-3),
    metrics=["accuracy"],
)

model.summary()

In [None]:
# File that receives the weights of the best epoch of the Xception model.
CHECKPOINT_PATH = "/kaggle/working/best_base_X_model.weights.h5"

# Save weights only when the validation loss improves.
checkpoint_cb = ModelCheckpoint(
    filepath=CHECKPOINT_PATH,
    monitor="val_loss",   # quantity that decides which epoch is "best"
    save_best_only=True,
    mode="min",               # lower val_loss is better
    verbose=1,
    save_weights_only=True
)

# Stop once val_loss has not improved for 5 consecutive epochs.
earlystop_cb = EarlyStopping(
    monitor="val_loss",   # same quantity as the checkpoint callback
    mode="min",               # lower val_loss is better
    patience=5,
    restore_best_weights=True,
    verbose=1
)

#  E.2) Train the head on the augmented training generator:
history = model.fit(
    train_generator,
    epochs=50,              # upper limit; EarlyStopping may end training sooner
    validation_data=val_generator,
    callbacks=[checkpoint_cb, earlystop_cb],
    verbose=1
)

In [None]:
# Aggregate loss and accuracy on the held-out test split.
test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the iterator so predictions line up with test_generator.classes,
# and use enough steps to cover the whole split (the last batch may be partial).
test_generator.reset()
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Per-image softmax outputs and the class index with the highest score.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[label_id] for label_id in pred_labels]
true_names = [idx_to_class[label_id] for label_id in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows = true class and columns = predicted class.
class_ids = list(idx_to_class)
tick_names = [idx_to_class[class_id] for class_id in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=tick_names, yticklabels=tick_names)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Test-Set Confusion Matrix")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

In [None]:
def set_random_seeds(seed: int):
    """Seed the Python, NumPy and TensorFlow random number generators with ``seed``."""
    for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
        seed_fn(seed)


def build_frozen_xception_model(num_classes: int, dropout_rate: float = 0.3,
                                input_shape: tuple = (224, 224, 3)):
    """
    Build an Xception classifier whose ImageNet-pretrained backbone is frozen.

    The head is GlobalAveragePooling2D -> Dropout -> Dense(softmax).
    The returned model is NOT compiled; callers must call ``model.compile``
    before training or evaluating it.

    Args:
        num_classes: Number of output classes (units of the softmax layer).
        dropout_rate: Dropout rate applied before the softmax layer.
        input_shape: (height, width, channels) of the input images.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    base = Xception(
        weights="imagenet",
        include_top=False,
        input_shape=input_shape,
    )
    base.trainable = False  # only the new head receives gradient updates

    x = base.output
    x = GlobalAveragePooling2D()(x)
    x = Dropout(dropout_rate)(x)
    outputs = Dense(num_classes, activation="softmax")(x)

    return Model(inputs=base.input, outputs=outputs)


In [None]:


def compute_macro_f1_on_generator(model, generator):
    """
    Return the macro-averaged F1 score of ``model`` over all of ``generator``.

    The generator is reset first so that the predictions come out in the same
    order as ``generator.classes``, which supplies the true labels.
    """
    generator.reset()
    n_batches = math.ceil(generator.samples / generator.batch_size)

    # Class with the highest predicted probability for every sample.
    predicted = model.predict(generator, steps=n_batches, verbose=0).argmax(axis=1)

    return f1_score(generator.classes, predicted, average="macro")

def train_frozen_xception(seed: int):
    """
    Train a frozen-backbone Xception classifier for one random seed.

    Uses the notebook-level ``train_generator`` / ``val_generator``, saves the
    weights of the epoch with the lowest validation loss to
    ``/kaggle/working/xception_seed{seed}_best.weights.h5`` and returns the
    validation macro-F1 of those weights.

    Args:
        seed: Random seed applied to Python, NumPy, TensorFlow and the generators.

    Returns:
        Validation macro-F1 as a float.
    """
    # 1) Seed every random number generator involved.
    set_random_seeds(seed)

    # 2) Point the existing generators at the new seed.
    #    NOTE(review): presumably the iterators pick this attribute up on their
    #    next shuffle; confirm for the installed Keras version.
    train_generator.seed = seed
    val_generator.seed = seed

    # 3) Build and compile a fresh model.
    model = build_frozen_xception_model(num_classes=train_generator.num_classes)
    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # 4) Checkpoint the best epoch and stop after 5 epochs without improvement.
    ckpt_path = f"/kaggle/working/xception_seed{seed}_best.weights.h5"
    checkpoint_cb = ModelCheckpoint(
        filepath=ckpt_path,
        monitor="val_loss",
        save_best_only=True,
        save_weights_only=True,
        mode="min",
        verbose=1
    )
    earlystop_cb = EarlyStopping(
        monitor="val_loss",
        mode="min",
        patience=5,
        restore_best_weights=True,
        verbose=1
    )

    model.fit(
        train_generator,
        epochs=50,                    # upper bound on epochs
        validation_data=val_generator,
        callbacks=[checkpoint_cb, earlystop_cb],
        verbose=1
    )

    # 5) Score the checkpointed (lowest val_loss) weights explicitly, as
    #    train_with_randaug does, instead of relying on EarlyStopping having
    #    restored them when training runs through all epochs.
    #    The guard keeps the function usable if no checkpoint was written.
    if os.path.exists(ckpt_path):
        model.load_weights(ckpt_path)

    return compute_macro_f1_on_generator(model, val_generator)

In [None]:
# Train the frozen-Xception model once per seed and collect the validation
# macro-F1 of each run.
seeds = [42, 2022, 999]
f1_scores = []

for seed in seeds:
    print(f"\n=== Running seed {seed} ===")
    f1 = train_frozen_xception(seed)
    print(f"Seed {seed} → val macro-F1 = {f1:.4f}\n")
    f1_scores.append(f1)

# Mean and standard deviation across seeds (NumPy's default std, ddof=0).
f1_scores = np.array(f1_scores)
mean_f1 = f1_scores.mean()
std_f1  = f1_scores.std()

print(f"\nFinal frozen-Xception val macro-F1 over seeds {seeds}: "
      f"{mean_f1:.4f} ± {std_f1:.4f}")

# Phase 2

In [None]:

# Folder with one sub-folder per class, filled with every image listed in `labels`.
CLEAN_TRAIN_ROOT = "/kaggle/working/clean_train"

os.makedirs(CLEAN_TRAIN_ROOT, exist_ok=True)
for cls in labels_lst:
    os.makedirs(os.path.join(CLEAN_TRAIN_ROOT, cls), exist_ok=True)

# Source paths that could not be copied because the file does not exist.
not_found = []

for filename, label in zip(labels["image"], labels["label"]):
    src = os.path.join(source_dir, filename)
    dst = os.path.join(CLEAN_TRAIN_ROOT, label, filename)
    try:
        shutil.copy2(src, dst)
    except FileNotFoundError:
        not_found.append(src)

In [None]:


# 1a) Generator without augmentation: pixel values are only rescaled to [0, 1].
train_datagen_cl = ImageDataGenerator(rescale=1.0/255.0)

In [None]:
# Iterator over every image under CLEAN_TRAIN_ROOT, in a fixed order.
train_generator_cl = train_datagen_cl.flow_from_directory(
    CLEAN_TRAIN_ROOT,
    target_size=(224, 224),       # resize every image to 224x224
    color_mode="rgb",             # 3-channel RGB
    batch_size=32,                # adjustable
    class_mode="categorical",     # one-hot labels, one class per sub-folder (expected: 10; TODO confirm)
    shuffle=False                 # IMPORTANT: fixed order so predictions align with .classes / .filepaths
)

In [None]:
# Rebuild the frozen-Xception architecture and load the seed-42 checkpoint.
NUM_CLASSES = train_generator_cl.num_classes  # expected to be 10; TODO confirm
frozen_xcep = build_frozen_xception_model(num_classes=NUM_CLASSES)
frozen_xcep.load_weights("/kaggle/working/xception_seed42_best.weights.h5")

# Start from the first batch and cover every image (the last batch may be partial).
train_generator_cl.reset()
steps_train = int(np.ceil(train_generator_cl.samples / train_generator_cl.batch_size))

In [None]:
# Predicted class probabilities for every image under CLEAN_TRAIN_ROOT.
# NOTE(review): the loaded weights were fitted on the training split, whose
# images are also in CLEAN_TRAIN_ROOT, so these probabilities are not
# out-of-sample for those images. cleanlab recommends out-of-sample
# (e.g. cross-validated) probabilities; confirm this is acceptable here.
probabilities = frozen_xcep.predict(train_generator_cl, steps=steps_train, verbose=1)

true_labels = train_generator_cl.classes  # integer class index per image, in generator order
filepaths = train_generator_cl.filepaths   # file path per image, in the same order

In [None]:


# 5a) Find the examples whose given label disagrees with the predicted
#     probabilities. Because `return_indices_ranked_by` is set,
#     find_label_issues returns an array of example INDICES ordered from most
#     to least severe (not a boolean mask), so the result is used directly.
problem_indices = find_label_issues(
    labels=true_labels,
    pred_probs=probabilities,
    return_indices_ranked_by="normalized_margin"
)

# Boolean mask over all examples, derived from the ranked indices.
label_issues_mask = np.zeros(len(true_labels), dtype=bool)
label_issues_mask[problem_indices] = True
print("Number of flagged images:", len(problem_indices))

# 5b) Keep only the K most severe examples for manual inspection.
K = 50
topk_problem_indices = problem_indices[:K]
print("Top‐K flagged indices:", topk_problem_indices[:10])

In [None]:
true_labels = train_generator_cl.classes  # integer class index per image
pred_probs = probabilities                # predicted class probabilities per image

# 5.1) Find likely mislabeled examples. Because `return_indices_ranked_by`
#      is set, find_label_issues returns example INDICES ordered from most to
#      least severe (not a boolean mask), so the result is used directly.
problem_indices = find_label_issues(
    labels=true_labels,
    pred_probs=pred_probs,
    return_indices_ranked_by="normalized_margin"
)

# 5.2) Boolean mask over all examples, derived from the ranked indices.
label_issues_mask = np.zeros(len(true_labels), dtype=bool)
label_issues_mask[problem_indices] = True
print(len(problem_indices), "images flagged as potential label issues")

# 5.3) Keep only the K most severe examples for manual inspection.
K = 50
topk_problem_indices = problem_indices[:K]

In [None]:
# Reverse map from class index to class name.
idx_to_class = {v: k for k, v in train_generator_cl.class_indices.items()}

# For each selected index, print the file name, its current label and the
# model's top prediction, then show the image.
for idx in topk_problem_indices:
    img_path = train_generator_cl.filepaths[idx]
    actual_class_idx = true_labels[idx]
    predicted_class_idx = np.argmax(pred_probs[idx])
    
    actual_name    = idx_to_class[actual_class_idx]
    predicted_name = idx_to_class[predicted_class_idx]

    print(f"Index {idx}  |  File: {os.path.basename(img_path)}")
    print(f"  → Current label = '{actual_name}'")
    print(f"  → Model predicts = '{predicted_name}'")
    display(Image(filename=img_path, width=200))
    print("──" * 20)

In [None]:
# Length of `problem_indices`.
len(problem_indices)

# Phase 2B

In [None]:
# keras_cv RandAugment operating on pixel values in [0, 255].
rand_augment_layer = RandAugment(
    value_range=(0, 255),
    augmentations_per_image=2,  # number of random ops chained per image
    magnitude=0.3,              # mean augmentation strength
    magnitude_stddev=0.1,       # spread of the strength around the mean
    rate=0.7                    # presumably the probability of applying each op; confirm in keras_cv docs
)

# Full augmentation pipeline: flip -> rotate -> RandAugment -> clip -> rescale
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.10),                 # factor is a fraction of 2*pi: up to +/-36 degrees
    rand_augment_layer,                                   # RandAugment configured above
    tf.keras.layers.Lambda(lambda x: tf.clip_by_value(x, 0, 255)),  # keep values inside [0, 255]
    tf.keras.layers.Rescaling(1.0 / 255.0)                # scale to [0, 1]
])

In [None]:
# ──────────────────────────────────────────────────────────────────────────────
# 3.A) Training dataset with RandAugment-Lite
# ──────────────────────────────────────────────────────────────────────────────
# Batched tf.data training set read from the training folder.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "/kaggle/working/clothes/training",
    labels="inferred",
    label_mode="categorical",
    image_size=(224, 224),
    batch_size=32,
    shuffle=True  # train_with_randaug() additionally shuffles per seed
)

# Must be read before .map(), which returns a dataset without class_names.
class_names = train_ds.class_names
num_classes = len(class_names)

def apply_randaug(image, label):
    """Run a batch of images through `data_augmentation`; labels pass through unchanged."""
    # `image` is a batch of pixel values in [0, 255]
    # (presumably float32, as image_dataset_from_directory yields; confirm).
    image = data_augmentation(image)  # flip, rotation, RandAugment, clip, rescale to [0, 1]
    return image, label

train_ds = train_ds.map(apply_randaug, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

# ──────────────────────────────────────────────────────────────────────────────
# 3.B) Validation dataset (only rescale)
# ──────────────────────────────────────────────────────────────────────────────
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "/kaggle/working/clothes/validation",
    labels="inferred",
    label_mode="categorical",
    image_size=(224, 224),
    batch_size=32,
    shuffle=False
)

# Scale validation pixels to [0, 1]; no augmentation.
val_ds = val_ds.map(
    lambda x, y: (tf.cast(x, tf.float32) / 255.0, y),
    num_parallel_calls=tf.data.AUTOTUNE
)
val_ds = val_ds.prefetch(buffer_size=tf.data.AUTOTUNE)


In [None]:
def train_with_randaug(seed: int):
    """
    Train a frozen-backbone Xception on the RandAugment training set for one seed.

    Saves the weights of the epoch with the lowest validation loss to
    ``/kaggle/working/randaug_xcep_seed{seed}.weights.h5``, reloads them and
    returns the validation macro-F1 computed on ``val_ds``.

    Args:
        seed: Random seed for Python, NumPy, TensorFlow and the batch shuffle.

    Returns:
        Validation macro-F1 as a float.
    """
    set_random_seeds(seed)

    # 1) Seed-dependent ordering of the training data.
    #    NOTE: train_ds is already batched, so this shuffles whole batches.
    train_shuffled = train_ds.shuffle(buffer_size=2000, seed=seed)

    # 2) Build and compile a fresh frozen-Xception model.
    model = build_frozen_xception_model(num_classes)

    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # 3) Callbacks: checkpoint the best val_loss epoch, stop after 7 epochs
    #    without improvement.
    ckpt_path = f"/kaggle/working/randaug_xcep_seed{seed}.weights.h5"
    checkpoint_cb = ModelCheckpoint(
        filepath=ckpt_path,
        monitor="val_loss",
        mode="min",
        save_best_only=True,
        save_weights_only=True,
        verbose=1
    )
    earlystop_cb = EarlyStopping(
        monitor="val_loss",
        mode="min",
        patience=7,
        restore_best_weights=True,
        verbose=1
    )

    # 4) Fit for up to 50 epochs.
    model.fit(
        train_shuffled,
        validation_data=val_ds,
        epochs=50,
        callbacks=[checkpoint_cb, earlystop_cb],
        verbose=1
    )

    # 5) Load the best saved weights and compute the validation macro-F1.
    #    A single predict() pass over the dataset avoids the per-call overhead
    #    of invoking predict() once per batch. val_ds is built with
    #    shuffle=False, so the labels gathered in a second pass are in the
    #    same order as the predictions.
    model.load_weights(ckpt_path)
    y_pred = np.argmax(model.predict(val_ds, verbose=0), axis=1)
    y_true = np.concatenate(
        [np.argmax(labs.numpy(), axis=1) for _, labs in val_ds]
    )

    val_macro_f1 = f1_score(y_true, y_pred, average="macro")
    print(f"Seed {seed} → val macro-F₁ (RandAugment) = {val_macro_f1:.4f}")
    return val_macro_f1



In [None]:
# Train the RandAugment variant once per seed and collect the validation
# macro-F1 of each run.
seeds = [42, 2022, 999]
randaug_scores = []

for seed in seeds:
    print(f"\n=== Training with RandAugment-Lite (seed {seed}) ===")
    f1 = train_with_randaug(seed)
    randaug_scores.append(f1)

# Mean and standard deviation across seeds (NumPy's default std, ddof=0).
mean_f1 = np.mean(randaug_scores)
std_f1  = np.std(randaug_scores)
print(f"\n→ RandAugment frozen-Xception val macro-F₁ over seeds {seeds}: "
      f"{mean_f1:.4f} ± {std_f1:.4f}")

In [None]:
# Rebuild the frozen-Xception architecture and load the seed-42 RandAugment
# checkpoint; compiling is required before evaluate() can report loss/accuracy.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/randaug_xcep_seed42.weights.h5")
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)

# Aggregate loss and accuracy on the held-out test split.
test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the iterator so predictions line up with test_generator.classes,
# and use enough steps to cover the whole split (the last batch may be partial).
test_generator.reset()
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Per-image softmax outputs and the class index with the highest score.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[label_id] for label_id in pred_labels]
true_names = [idx_to_class[label_id] for label_id in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows = true class and columns = predicted class.
class_ids = list(idx_to_class)
tick_names = [idx_to_class[class_id] for class_id in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=tick_names, yticklabels=tick_names)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Test-Set Confusion Matrix")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 2C

In [None]:
def mixup_batch(images, labels, alpha=0.2):
    """
    Blend every sample of a batch with a randomly chosen partner from the
    same batch (MixUp).

    Expected inputs (not validated here):
      images : float32 in [0,1], shape (B,H,W,C)
      labels : one-hot float32,   shape (B,num_classes)
      alpha  : concentration of the symmetric Beta(alpha, alpha) distribution
               the per-sample mixing coefficients are drawn from.

    Returns the pair (mixed_images, mixed_labels).
    """
    n = tf.shape(images)[0]

    # One mixing coefficient per sample, shaped (B,1,1,1) so it broadcasts
    # over the image axes; a (B,1) view of the same values is used for labels.
    mixing_dist = tfp.distributions.Beta(alpha, alpha)
    lam = mixing_dist.sample([n, 1, 1, 1])
    lam_lbl = tf.reshape(lam, [n, 1])

    # Partner of sample i is sample perm[i] of the same batch.
    perm = tf.random.shuffle(tf.range(n))
    partner_images = tf.gather(images, perm)
    partner_labels = tf.gather(labels, perm)

    # Convex combination of each sample with its partner.
    mixed_images = lam * images + (1.0 - lam) * partner_images
    mixed_labels = lam_lbl * labels + (1.0 - lam_lbl) * partner_labels
    return mixed_images, mixed_labels

In [None]:
def apply_mixup(image_batch, label_batch):
    """Apply MixUp (alpha=0.2) to one batch by delegating to mixup_batch.

    NOTE(review): the original comment states image_batch is (B,224,224,3)
    in [0,1] and label_batch is (B,10) one-hot — confirm against train_ds.
    """
    mixed = mixup_batch(image_batch, label_batch, alpha=0.2)
    return mixed

# MixUp-augmented view of train_ds, with prefetching.
train_ds_mix = (
    train_ds
    .map(apply_mixup, num_parallel_calls=tf.data.AUTOTUNE)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
def train_with_randaug_mixup(seed: int):
    """Train one model on the MixUp training stream and score it on validation.

    Seeds the RNGs via set_random_seeds, builds a model with
    build_frozen_xception_model, trains it on `train_generator_with_mixup`
    for up to 50 epochs (early stopping on val_loss, patience 5), reloads the
    best checkpoint and computes the macro-averaged F1 over `val_ds`.

    Args:
        seed: random seed; also embedded in the checkpoint file name.

    Returns:
        The validation macro-F1 of the best checkpoint.
    """
    set_random_seeds(seed)

    # 1) Build & compile model
    model = build_frozen_xception_model(num_classes)

    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # 2) Callbacks (monitor val_loss)
    ckpt_path = f"/kaggle/working/randaug_mixup_xcep_seed{seed}.weights.h5"
    checkpoint_cb = ModelCheckpoint(
        filepath=ckpt_path,
        monitor="val_loss",
        mode="min",
        save_best_only=True,
        save_weights_only=True,
        verbose=1
    )
    earlystop_cb = EarlyStopping(
        monitor="val_loss",
        mode="min",
        patience=5,
        restore_best_weights=True,
        verbose=1
    )

    # 3) Fit (50 epochs max)
    model.fit(
        train_generator_with_mixup,
        # One epoch is as many MixUp batches as train_generator has batches.
        steps_per_epoch=len(train_generator),
        validation_data=val_generator,
        # Round up: floor division would skip the final partial batch (and
        # yield 0 steps for a validation set smaller than one batch), so
        # val_loss would be computed on only part of the validation data.
        validation_steps=math.ceil(val_generator.samples / val_generator.batch_size),
        epochs=50,
        callbacks=[checkpoint_cb, earlystop_cb],
        verbose=1
    )

    # 4) Load best weights & compute val macro-F1
    model.load_weights(ckpt_path)
    y_true = []
    y_pred = []
    for imgs, labs in val_ds:
        probs = model.predict(imgs, verbose=0)
        y_pred.extend(np.argmax(probs, axis=1))
        y_true.extend(np.argmax(labs.numpy(), axis=1))
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    val_macro_f1 = f1_score(y_true, y_pred, average="macro")
    print(f"Seed {seed} → val macro-F₁ (RandAugment + MixUp) = {val_macro_f1:.4f}")
    return val_macro_f1

In [None]:
# Run the RandAugment + MixUp training once per seed and report the mean and
# (population) standard deviation of the validation macro-F1 scores.
seeds = [42, 2022, 999]
mixup_scores = []

for seed in seeds:
    banner = f"\n=== Training with RandAugment + MixUp (seed {seed}) ==="
    print(banner)
    score = train_with_randaug_mixup(seed)
    mixup_scores += [score]

score_array = np.asarray(mixup_scores)
mean_score, std_score = score_array.mean(), score_array.std()
summary = (
    f"\n→ RandAugment + MixUp frozen‐Xception val macro-F₁ over seeds {seeds}: "
    f"{mean_score:.4f} ± {std_score:.4f}"
)
print(summary)

In [None]:
# Build the model via build_frozen_xception_model and load the seed-42
# RandAugment + MixUp checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/randaug_mixup_xcep_seed42.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 2D

In [None]:
# Derive "balanced" per-class loss weights from the training labels.
# Classes are indexed by their position in the alphabetically sorted label list.
# NOTE(review): this index order must match train_generator.class_indices — confirm.
class_names = sorted(train_df["label"].unique())
class_to_idx = dict(zip(class_names, range(len(class_names))))

# Integer class index of every training row.
label_indices = train_df["label"].map(class_to_idx).to_numpy()

# One weight per class index, computed by sklearn's "balanced" heuristic.
weights = compute_class_weight(
    class_weight="balanced",
    classes=np.arange(len(class_names)),
    y=label_indices,
)

# {class index: weight}, the format Model.fit(class_weight=...) expects.
class_weights = {class_id: weight for class_id, weight in enumerate(weights)}

In [None]:
def train_with_randaug(seed: int):
    """Train one class-weighted model and return its validation macro-F1.

    Seeds the RNGs via set_random_seeds, builds a model with
    build_frozen_xception_model, trains it on `train_generator` with
    `class_weight=class_weights` for up to 50 epochs (early stopping on
    val_loss, patience 7), reloads the best checkpoint and computes the
    macro-averaged F1 over `val_ds`.

    NOTE(review): an earlier cell loads a checkpoint with the same
    `randaug_xcep_seed{seed}` name pattern, so this run presumably overwrites
    the file produced by the earlier definition of this function — confirm.

    Args:
        seed: random seed; also embedded in the checkpoint file name.

    Returns:
        The validation macro-F1 of the best checkpoint.
    """
    set_random_seeds(seed)

    # 1) Build a fresh model
    model = build_frozen_xception_model(num_classes)

    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # 2) Set up callbacks: monitor val_loss, save best weights only
    ckpt_path = f"/kaggle/working/randaug_xcep_seed{seed}.weights.h5"
    checkpoint_cb = ModelCheckpoint(
        filepath=ckpt_path,
        monitor="val_loss",
        mode="min",
        save_best_only=True,
        save_weights_only=True,
        verbose=1
    )
    earlystop_cb = EarlyStopping(
        monitor="val_loss",
        mode="min",
        patience=7,
        restore_best_weights=True,
        verbose=1
    )

    # 3) Fit for up to 50 epochs. Training data comes from train_generator;
    #    the previously built (and never used) shuffled copy of train_ds was
    #    dead code and has been removed.
    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=50,
        callbacks=[checkpoint_cb, earlystop_cb],
        verbose=1,
        class_weight=class_weights
    )

    # 4) Load the best-saved weights and compute validation macro-F1
    model.load_weights(ckpt_path)
    y_true = []
    y_pred = []
    for imgs, labs in val_ds:
        probs = model.predict(imgs, verbose=0)
        y_pred.extend(np.argmax(probs, axis=1))
        y_true.extend(np.argmax(labs.numpy(), axis=1))
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    val_macro_f1 = f1_score(y_true, y_pred, average="macro")
    print(f"Seed {seed} → val macro-F₁ (RandAugment) = {val_macro_f1:.4f}")
    return val_macro_f1

In [None]:
# Run train_with_randaug once per seed and report the mean and (population)
# standard deviation of the validation macro-F1 scores.
seeds = [42, 2022, 999]
randaug_scores = []

for seed in seeds:
    banner = f"\n=== Training with RandAugment-Lite (seed {seed}) ==="
    print(banner)
    f1 = train_with_randaug(seed)
    randaug_scores += [f1]

score_array = np.asarray(randaug_scores)
mean_f1, std_f1 = score_array.mean(), score_array.std()
summary = (
    f"\n→ RandAugment frozen-Xception val macro-F₁ over seeds {seeds}: "
    f"{mean_f1:.4f} ± {std_f1:.4f}"
)
print(summary)

In [None]:
# Build the model via build_frozen_xception_model and load the seed-2022
# RandAugment checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/randaug_xcep_seed2022.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

In [None]:
# Build the model via build_frozen_xception_model and load the seed-42
# RandAugment + MixUp checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/randaug_mixup_xcep_seed42.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 3

In [None]:

def build_and_unfreeze_xception(
    n_unfreeze: int = 20,
    weights_path: str = "/kaggle/working/randaug_xcep_seed2022.weights.h5",
):
    """
    Build an Xception classifier, restore saved weights and unfreeze the top
    of the backbone for fine-tuning.

    1. Build Xception (ImageNet weights, no top) + GAP / Dropout(0.3) /
       softmax head with `num_classes` outputs.
    2. Load the weights stored at `weights_path`.
    3. Mark only the last `n_unfreeze` layers of the Xception base as
       trainable; every earlier base layer is frozen.

    The model is returned UNcompiled; the caller must compile it.

    Args:
        n_unfreeze:   number of trailing base layers to make trainable.
                      Values <= 0 freeze the whole base; values larger than
                      the number of base layers unfreeze all of it.
        weights_path: weights file to load before unfreezing.

    Returns:
        The uncompiled keras Model.
    """
    # 1) Build fresh model with pre-trained ImageNet base
    base = Xception(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
    x = GlobalAveragePooling2D()(base.output)
    x = Dropout(0.3)(x)
    output = Dense(num_classes, activation="softmax")(x)
    model = Model(inputs=base.input, outputs=output)

    # 2) Restore previously trained weights
    model.load_weights(weights_path)

    # 3) Selectively unfreeze: only the last `n_unfreeze` layers of the base.
    #    An explicit boundary index is used instead of negative slicing,
    #    because `layers[:-0]` is empty and `layers[-0:]` is the full list,
    #    which made n_unfreeze=0 unfreeze the ENTIRE backbone.
    base.trainable = True
    n_layers = len(base.layers)
    n_trainable = min(max(n_unfreeze, 0), n_layers)
    first_trainable = n_layers - n_trainable
    for position, layer in enumerate(base.layers):
        layer.trainable = position >= first_trainable

    return model

In [None]:
def train_phase3a(seed: int):
    """Fine-tune the partially unfrozen Xception and return validation macro-F1.

    Seeds the RNGs via set_random_seeds, builds the model with
    build_and_unfreeze_xception(n_unfreeze=20), trains it on `train_generator`
    with Adam (lr 1e-4), categorical cross-entropy and
    `class_weight=class_weights` for up to 50 epochs (early stopping on
    val_loss, patience 7), reloads the best checkpoint and computes the
    macro-averaged F1 over `val_ds`.

    Args:
        seed: random seed; also embedded in the checkpoint file name.

    Returns:
        The validation macro-F1 of the best checkpoint.
    """
    set_random_seeds(seed)

    # 1) Build & unfreeze the top-20 layers (loads the saved weights)
    model = build_and_unfreeze_xception(
        n_unfreeze=20
    )

    optimizer = Adam(learning_rate=1e-4)
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # 2) Set up callbacks: monitor val_loss, save best weights only
    ckpt_path = f"/kaggle/working/fine_tuned_xcep_seed{seed}.weights.h5"
    checkpoint_cb = ModelCheckpoint(
        filepath=ckpt_path,
        monitor="val_loss",
        mode="min",
        save_best_only=True,
        save_weights_only=True,
        verbose=1
    )
    earlystop_cb = EarlyStopping(
        monitor="val_loss",
        mode="min",
        patience=7,
        restore_best_weights=True,
        verbose=1
    )

    # 3) Fit for up to 50 epochs, passing class_weights. Training data comes
    #    from train_generator; the shuffled copy of train_ds that used to be
    #    built here was never consumed and has been removed.
    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=50,
        class_weight=class_weights,
        callbacks=[checkpoint_cb, earlystop_cb],
        verbose=1
    )

    # 4) Load best-saved weights and compute validation macro-F1
    model.load_weights(ckpt_path)

    y_true = []
    y_pred = []
    for imgs, labs in val_ds:
        probs = model.predict(imgs, verbose=0)
        y_pred.extend(np.argmax(probs, axis=1))
        y_true.extend(np.argmax(labs.numpy(), axis=1))
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    val_macro_f1 = f1_score(y_true, y_pred, average="macro")
    print(f"\nSeed {seed} → Phase 3-A val macro-F₁ = {val_macro_f1:.4f}\n")
    return val_macro_f1


In [None]:
# Run Phase 3-A once per seed and report the mean and (population) standard
# deviation of the validation macro-F1 scores.
seeds = [42, 2022, 999]
phase3a_scores = []

for seed in seeds:
    banner = f"\n=== Phase 3-A, training with seed {seed} ==="
    print(banner)
    f1 = train_phase3a(seed)
    phase3a_scores += [f1]

score_array = np.asarray(phase3a_scores)
mean_f1, std_f1 = score_array.mean(), score_array.std()
summary = (
    f"\n→ Phase 3-A (Unfreeze top 20) val macro-F₁ over seeds {seeds}: "
    f"{mean_f1:.4f} ± {std_f1:.4f}"
)
print(summary)


In [None]:
# Build the model via build_frozen_xception_model and load the seed-2022
# fine-tuned checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/fine_tuned_xcep_seed2022.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 3B

In [None]:
def train_phase3c(seed: int):
    """Fine-tune with categorical focal loss and return validation macro-F1.

    Seeds the RNGs via set_random_seeds, builds the model with
    build_and_unfreeze_xception(n_unfreeze=20), trains it on `train_generator`
    with Adam (lr 1e-4) and CategoricalFocalCrossentropy (gamma 2.0,
    alpha 0.2, no label smoothing, no class weights) for up to 50 epochs
    (early stopping on val_loss, patience 4), reloads the best checkpoint and
    computes the macro-averaged F1 over `val_ds`.

    Args:
        seed: random seed; also embedded in the checkpoint file name.

    Returns:
        The validation macro-F1 of the best checkpoint.
    """
    set_random_seeds(seed)

    # Partially unfrozen Xception, compiled with the focal loss.
    model = build_and_unfreeze_xception(n_unfreeze=20)
    model.compile(
        optimizer=Adam(learning_rate=1e-4),
        loss=CategoricalFocalCrossentropy(
            gamma=2.0,
            alpha=0.2,
            label_smoothing=0.0,
            from_logits=False,
        ),
        metrics=["accuracy"],
    )

    # Keep only the weights with the lowest val_loss; stop after 4 stale epochs.
    ckpt_path = f"/kaggle/working/fine_tuned_xcep_focal_seed{seed}.weights.h5"
    callbacks = [
        ModelCheckpoint(
            filepath=ckpt_path,
            monitor="val_loss",
            mode="min",
            save_best_only=True,
            save_weights_only=True,
            verbose=1,
        ),
        EarlyStopping(
            monitor="val_loss",
            mode="min",
            patience=4,
            restore_best_weights=True,
            verbose=1,
        ),
    ]

    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=50,
        callbacks=callbacks,
        verbose=1,
    )

    # Score the best checkpoint on val_ds, batch by batch.
    model.load_weights(ckpt_path)

    y_true, y_pred = [], []
    for batch_images, batch_labels in val_ds:
        batch_probs = model.predict(batch_images, verbose=0)
        y_pred.extend(batch_probs.argmax(axis=1))
        y_true.extend(batch_labels.numpy().argmax(axis=1))

    val_macro_f1 = f1_score(np.array(y_true), np.array(y_pred), average="macro")
    print(f"\nSeed {seed} → Phase 3-C val macro-F₁ = {val_macro_f1:.4f}\n")
    return val_macro_f1

In [None]:
# Run Phase 3-C once per seed and report the mean and (population) standard
# deviation of the validation macro-F1 scores.
seeds = [42, 2022, 999]
phase3c_scores = []

for seed in seeds:
    banner = f"\n=== Phase 3-C (Focal Loss), training with seed {seed} ==="
    print(banner)
    score = train_phase3c(seed)
    phase3c_scores += [score]

score_array = np.asarray(phase3c_scores)
mean_f1, std_f1 = score_array.mean(), score_array.std()
summary = (
    f"\n→ Phase 3-C (Focal Loss) val macro-F₁ over seeds {seeds}: "
    f"{mean_f1:.4f} ± {std_f1:.4f}"
)
print(summary)

In [None]:
# Build the model via build_frozen_xception_model and load the seed-2022
# focal-loss fine-tuned checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/fine_tuned_xcep_focal_seed2022.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 3D

In [None]:
# Count the training samples of every class, indexed by the class's position
# in the alphabetically sorted list of labels.
unique_labels = sorted(train_df["label"].unique())

# String label -> integer index (0, 1, 2, …) in `unique_labels` order.
label_to_index = dict(zip(unique_labels, range(len(unique_labels))))

num_classes = len(unique_labels)

# Per-label row counts, re-ordered to follow `unique_labels`.
label_counts = train_df["label"].value_counts()
samples_per_class = np.array(
    [label_counts.loc[lbl] for lbl in unique_labels],
    dtype=np.int32,
)

samples_per_class

In [None]:
from tensorflow.keras import backend as K

def make_class_balanced_focal_loss(
    samples_per_class: np.ndarray,
    beta: float = 0.99,
    gamma: float = 1.5
):
    """
    Returns a function `cb_focal_loss(y_true, y_pred)` implementing
    Class-Balanced Focal Loss for categorical (one-hot) labels.

    Args:
      samples_per_class: 1D array of length = num_classes, where
                         samples_per_class[c] is the number of training samples in class c.
      beta:             float in [0, 1). Typical values: 0.9 – 0.9999.
      gamma:            focal-loss focusing parameter. Typical: 1.0 – 2.0.

    Raises:
      ValueError: if `beta` is outside [0, 1) (the weights would otherwise
                  silently become zero, NaN or negative).

    The per-class weight is computed as:
        w_c = (1 − beta) / (1 − beta^n_c),
    then normalized so that sum(w_c) = num_classes.
    Classes with n_c = 0 receive weight 0 and do not take part in the
    normalization; if every class is empty, all weights fall back to 1.
    The loss for each sample of true class c is:
        w_c * (1 − p_t)^gamma * (−log p_t),
    where p_t = predicted probability for the true class.
    """
    if not 0.0 <= beta < 1.0:
        raise ValueError(f"beta must be in [0, 1), got {beta!r}")

    counts = np.asarray(samples_per_class, dtype=np.float32)
    num_classes = int(counts.shape[0])

    # 1) Class-balanced weights: w_c = (1 - beta) / (1 - beta^n_c).
    #    Empty classes are skipped: dividing by (0 + epsilon) would give them
    #    an enormous weight and, after normalization, squash the weight of
    #    every class that actually has data to ~0.
    present = counts > 0
    effective_num = 1.0 - np.power(beta, counts)                       # shape=(num_classes,)
    class_weights = np.zeros_like(effective_num)
    class_weights[present] = (1.0 - beta) / (effective_num[present] + K.epsilon())

    # 2) Renormalize so that sum(class_weights) = num_classes
    total_weight = np.sum(class_weights)
    if total_weight > 0:
        class_weights = class_weights / total_weight * num_classes
    else:
        # No class has any sample: fall back to uniform weighting.
        class_weights = np.ones_like(class_weights)

    # 3) Convert to a TF tensor so it can be used inside the loss
    class_weights_tf = tf.constant(class_weights, dtype=tf.float32)  # shape=(num_classes,)

    def cb_focal_loss(y_true: tf.Tensor, y_pred: tf.Tensor) -> tf.Tensor:
        """
        y_true: one-hot tensor of shape (batch_size, num_classes)
        y_pred: probability tensor (after softmax) of shape (batch_size, num_classes)

        Returns the scalar mean loss over the batch.
        """
        # a) Clip y_pred to avoid log(0)
        y_pred_clipped = tf.clip_by_value(y_pred, K.epsilon(), 1.0 - K.epsilon())

        # b) Make sure integer / differently-typed labels can be multiplied
        #    with the predictions.
        y_true = tf.cast(y_true, y_pred_clipped.dtype)

        # c) p_t = sum_over_c( y_true[c] * y_pred_clipped[c] ) for each sample.
        #    For one-hot y_true this is the probability assigned to the true class.
        p_t = tf.reduce_sum(y_true * y_pred_clipped, axis=-1)  # shape=(batch_size,)

        # d) Focal scaling factor: (1 - p_t)^gamma
        focal_factor = tf.pow(1.0 - p_t, gamma)                # shape=(batch_size,)

        # e) -log(p_t)
        neg_log_p_t = -tf.math.log(p_t)                        # shape=(batch_size,)

        # f) Class-balanced weight of each sample's true class
        weights_for_sample = tf.reduce_sum(class_weights_tf * y_true, axis=-1)  # shape=(batch_size,)

        # g) loss_i = w_c * (1 - p_t)^gamma * (-log p_t)
        loss_per_sample = weights_for_sample * focal_factor * neg_log_p_t      # shape=(batch_size,)

        # h) Return the mean over the batch
        return tf.reduce_mean(loss_per_sample)

    return cb_focal_loss

In [None]:
# Hyper-parameters of the class-balanced focal loss
# (tuning hints: beta 0.99 or 0.999; gamma between 1.0 and 2.0).
beta, gamma = 0.99, 1.5

# Loss function bound to the training-set class counts.
cb_focal_loss_fn = make_class_balanced_focal_loss(
    samples_per_class,
    beta=beta,
    gamma=gamma,
)

In [None]:
def train_phase3d(seed: int):
    """Fine-tune with the class-balanced focal loss and return validation macro-F1.

    Seeds the RNGs via set_random_seeds, builds the model with
    build_and_unfreeze_xception(n_unfreeze=20), trains it on `train_generator`
    with Adam (lr 1e-4) and `cb_focal_loss_fn` (no class_weight argument) for
    up to 50 epochs (early stopping on val_loss, patience 7), reloads the best
    checkpoint and computes the macro-averaged F1 over `val_ds`.

    Args:
        seed: random seed; also embedded in the checkpoint file name.

    Returns:
        The validation macro-F1 of the best checkpoint.
    """
    set_random_seeds(seed)

    # Partially unfrozen Xception, compiled with the custom loss.
    model = build_and_unfreeze_xception(n_unfreeze=20)
    model.compile(
        optimizer=Adam(learning_rate=1e-4),
        loss=cb_focal_loss_fn,
        metrics=["accuracy"],
    )

    # Keep only the weights with the lowest val_loss; stop after 7 stale epochs.
    ckpt_path = f"/kaggle/working/fine_tuned_xcep_cb_loss_seed{seed}.weights.h5"
    callbacks = [
        ModelCheckpoint(
            filepath=ckpt_path,
            monitor="val_loss",
            mode="min",
            save_best_only=True,
            save_weights_only=True,
            verbose=1,
        ),
        EarlyStopping(
            monitor="val_loss",
            mode="min",
            patience=7,
            restore_best_weights=True,
            verbose=1,
        ),
    ]

    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=50,
        callbacks=callbacks,
        verbose=1,
    )

    # Score the best checkpoint on val_ds, batch by batch
    # (val_ds is iterated as (image_batch, one_hot_labels) pairs).
    model.load_weights(ckpt_path)

    y_true, y_pred = [], []
    for batch_images, batch_labels in val_ds:
        batch_probs = model.predict(batch_images, verbose=0)
        y_pred.extend(batch_probs.argmax(axis=1))
        y_true.extend(batch_labels.numpy().argmax(axis=1))

    val_macro_f1 = f1_score(np.array(y_true), np.array(y_pred), average="macro")
    print(f"\nSeed {seed} → Phase 3-D val macro-F₁ = {val_macro_f1:.4f}\n")

    return val_macro_f1

In [None]:
# Run Phase 3-D once per seed and report the mean and (population) standard
# deviation of the validation macro-F1 scores.
# The log labels previously said "LLRD", but train_phase3d trains with the
# class-balanced focal loss and applies no layer-wise learning-rate decay,
# so the labels now name the technique actually used.
seeds = [42, 2022, 999]
phase3d_scores = []

for seed in seeds:
    print(f"\n=== Phase 3-D (CB-Focal Loss) training with seed {seed} ===")
    score = train_phase3d(seed)
    phase3d_scores.append(score)

mean_f1 = np.mean(phase3d_scores)
std_f1  = np.std(phase3d_scores)
print(f"\n→ Phase 3-D (CB-Focal Loss) val macro-F₁ over seeds {seeds}: "
      f"{mean_f1:.4f} ± {std_f1:.4f}")

In [None]:
# Build the model via build_frozen_xception_model and load the seed-999
# class-balanced-loss fine-tuned checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/fine_tuned_xcep_cb_loss_seed999.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# Phase 3E

In [None]:
def make_cosine_warmup_scheduler(base_lr: float, total_epochs: int, warmup_epochs: int):
    """Build a LearningRateScheduler with linear warm-up followed by cosine decay.

    For epoch < warmup_epochs the rate grows linearly, reaching `base_lr` on
    the last warm-up epoch. Afterwards it follows half a cosine wave from
    `base_lr` towards 0 over the remaining (total_epochs - warmup_epochs)
    epochs. Epochs at or beyond `total_epochs` get a rate of 0.

    Args:
        base_lr:       peak learning rate.
        total_epochs:  number of epochs the schedule is laid out for.
        warmup_epochs: number of linear warm-up epochs (0 disables warm-up).

    Returns:
        A LearningRateScheduler callback (verbose=1) wrapping the schedule.
    """
    # Guard against a zero/negative decay span when warmup >= total.
    decay_span = max(1, total_epochs - warmup_epochs)

    def lr_schedule(epoch):
        if epoch < warmup_epochs:
            return base_lr * (epoch + 1) / warmup_epochs
        # Clamp to 1.0: without it, epochs past `total_epochs` push the
        # cosine beyond pi and the learning rate starts rising again.
        progress = min(1.0, float(epoch - warmup_epochs) / float(decay_span))
        return 0.5 * base_lr * (1.0 + math.cos(math.pi * progress))

    return LearningRateScheduler(lr_schedule, verbose=1)


In [None]:
def train_phase3e(seed: int):
    """Fine-tune with a warm-up + cosine learning-rate schedule; return val macro-F1.

    Seeds the RNGs via set_random_seeds, builds the model with
    build_and_unfreeze_xception(n_unfreeze=20), trains it on `train_generator`
    with Adam, categorical cross-entropy and `class_weight=class_weights`
    for up to 50 epochs. The learning rate is set each epoch by
    make_cosine_warmup_scheduler (base 1e-4, 3 warm-up epochs). Early
    stopping watches val_loss with patience 7. Afterwards the best checkpoint
    is reloaded and the macro-averaged F1 over `val_ds` is computed.

    Args:
        seed: random seed; also embedded in the checkpoint file name.

    Returns:
        The validation macro-F1 of the best checkpoint.
    """
    set_random_seeds(seed)

    # 1) Build & unfreeze top-20 layers of Xception
    model = build_and_unfreeze_xception(n_unfreeze=20)

    # 2) LR schedule parameters. (The unused CB-focal beta/gamma locals were
    #    removed: this phase trains with plain categorical cross-entropy.)
    base_lr = 1e-4
    total_epochs = 50
    warmup_epochs = 3
    cosine_warmup_cb = make_cosine_warmup_scheduler(base_lr, total_epochs, warmup_epochs)

    # 3) Compile with base_lr; the scheduler callback sets the rate each epoch
    optimizer = Adam(learning_rate=base_lr)
    model.compile(
        optimizer = optimizer,
        loss      = "categorical_crossentropy",
        metrics   = ["accuracy"]
    )

    # 4) Callbacks: checkpoint, early stop, plus the Cosine+Warm-Up schedule
    ckpt_path = f"/kaggle/working/fine_tuned_xcep_cosine_seed{seed}.weights.h5"
    checkpoint_cb = ModelCheckpoint(
        filepath          = ckpt_path,
        monitor           = "val_loss",
        mode              = "min",
        save_best_only    = True,
        save_weights_only = True,
        verbose           = 1
    )
    earlystop_cb = EarlyStopping(
        monitor             = "val_loss",
        mode                = "min",
        patience            = 7,
        restore_best_weights= True,
        verbose             = 1
    )

    # 5) Fit for up to `total_epochs` epochs (with Cosine+Warm-Up)
    model.fit(
        train_generator,
        validation_data = val_generator,
        epochs          = total_epochs,
        callbacks       = [cosine_warmup_cb, checkpoint_cb, earlystop_cb],
        verbose         = 1,
        class_weight    = class_weights,
    )

    # 6) Load best weights & compute val macro-F1
    model.load_weights(ckpt_path)
    y_true, y_pred = [], []
    for imgs, labs in val_ds:
        probs = model.predict(imgs, verbose=0)
        y_pred.extend(np.argmax(probs, axis=1))
        y_true.extend(np.argmax(labs.numpy(), axis=1))
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    val_macro_f1 = f1_score(y_true, y_pred, average="macro")
    print(f"\nSeed {seed} → Phase 3-A (with 3E) val macro-F1 = {val_macro_f1:.4f}\n")
    return val_macro_f1

In [None]:
# Run Phase 3-E once per seed and report the mean and (population) standard
# deviation of the validation macro-F1 scores.
seeds = [42, 2022, 999]
phase3e_scores = []

for seed in seeds:
    banner = f"\n=== Phase 3-E Cosine training with seed {seed} ==="
    print(banner)
    score = train_phase3e(seed)
    phase3e_scores += [score]

score_array = np.asarray(phase3e_scores)
mean_f1, std_f1 = score_array.mean(), score_array.std()
summary = (
    f"\n→ Phase 3-E Cosine val macro-F₁ over seeds {seeds}: "
    f"{mean_f1:.4f} ± {std_f1:.4f}"
)
print(summary)

In [None]:
# Build the model via build_frozen_xception_model and load the seed-42
# cosine-schedule fine-tuned checkpoint into it.
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/fine_tuned_xcep_cosine_seed42.weights.h5")

# Compile so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Rewind the generator before predicting.
test_generator.reset()

# Batches needed to cover every test sample, including a final partial batch.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

# Class probabilities for the test images, reduced to the arg-max class index.
pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): assumes test_generator does not shuffle, so .classes is in
# prediction order — confirm where the generator is created.
true_labels = test_generator.classes

# Invert class_indices (name -> index) to translate indices back to names.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[i] for i in pred_labels]
true_names = [idx_to_class[i] for i in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# Confusion matrix with rows/columns in class_indices order.
class_ids = list(idx_to_class)
class_labels = [idx_to_class[i] for i in class_ids]
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.title("Test-Set Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

In [None]:
class SWA(tf.keras.callbacks.Callback):
    """Stochastic Weight Averaging (SWA) as a plain Keras callback.

    From ``start_epoch`` onward the callback keeps an equal-weight running
    mean of ``model.get_weights()`` sampled at the end of every epoch. When
    training finishes, the model's weights are overwritten with that mean.

    Args:
        start_epoch: 0-based epoch index at which averaging begins (inclusive).
        verbose: if True, print a message when averaging starts and when the
            averaged weights are assigned at the end of training.

    Attributes:
        swa_n: number of snapshots folded into the average so far.
        swa_weights: list of float32 arrays holding the running mean, or
            ``None`` while no snapshot has been taken.

    Notes:
        * The averaged weights only enter the model in ``on_train_end``, so
          any weights file written during training (e.g. by a checkpoint
          callback) does not contain them; save the model after ``fit()``
          returns if the average should be persisted.
        * Every array returned by ``get_weights()`` is averaged.
          NOTE(review): for models with BatchNormalization this presumably
          includes the moving statistics, which are averaged rather than
          recomputed — confirm this is acceptable.
    """

    def __init__(self, start_epoch=10, verbose=False):
        super().__init__()
        self.start_epoch = start_epoch
        self.verbose = verbose
        self.swa_n = 0
        self.swa_weights = None

    def on_train_begin(self, logs=None):
        # Start every fit() call from a clean slate so that reusing one
        # callback instance never mixes snapshots from different trainings.
        self.swa_n = 0
        self.swa_weights = None

    def on_epoch_end(self, epoch, logs=None):
        # Averaging starts once start_epoch is reached (inclusive).
        if epoch < self.start_epoch:
            return

        weights = self.model.get_weights()  # list of numpy arrays

        if self.swa_weights is None:
            # First snapshot: the average is simply the current weights.
            self.swa_weights = [w.astype(np.float32) for w in weights]
            self.swa_n = 1
            if self.verbose:
                print(f"\n[SWA] Initialized averaging at epoch {epoch}.")
            return

        # Running mean: new_avg = (prev_avg * n + current) / (n + 1)
        n = self.swa_n
        self.swa_weights = [
            (avg * n + current) / (n + 1)
            for avg, current in zip(self.swa_weights, weights)
        ]
        self.swa_n += 1

    def on_train_end(self, logs=None):
        # Replace the model's weights with the average, if one was built.
        if self.swa_weights is None:
            if self.verbose:
                print("\n[SWA] No averaging was performed (start_epoch too large).")
            return

        if self.verbose:
            print(f"\n[SWA] Assigning averaged weights (over {self.swa_n} snapshots).")
        self.model.set_weights(self.swa_weights)

In [None]:
# Fine-tune Xception (n_unfreeze=20) with cross-entropy and Stochastic Weight
# Averaging, then persist the averaged weights for the evaluation step.
model = build_and_unfreeze_xception(n_unfreeze=20)

base_lr = 1e-5
optimizer = Adam(learning_rate=base_lr)
model.compile(
    optimizer = optimizer,
    loss      = "categorical_crossentropy",
    metrics   = ["accuracy"]
)


# ckpt_path is the file the evaluation cell loads. ModelCheckpoint only ever
# writes single-epoch "best val_loss" weights, whereas the SWA callback puts
# the averaged weights into the model in on_train_end (after the last
# checkpoint). To make the evaluated file actually contain the SWA result,
# the best-epoch checkpoint goes to a sibling file and the averaged weights
# are written to ckpt_path once fit() has returned.
ckpt_path      = "/kaggle/working/fine_tuned_xcep_swa_ce.weights.h5"
best_ckpt_path = "/kaggle/working/fine_tuned_xcep_swa_ce_best.weights.h5"
checkpoint_cb = ModelCheckpoint(
    filepath          = best_ckpt_path,
    monitor           = "val_loss",
    mode              = "min",
    save_best_only    = True,
    save_weights_only = True,
    verbose           = 1
)
earlystop_cb = EarlyStopping(
    monitor             = "val_loss",
    mode                = "min",
    patience            = 7,
    restore_best_weights= True,
    verbose             = 1
)
swa_cb = SWA(start_epoch=5, verbose=True)


# swa_cb is listed last so its on_train_end runs after EarlyStopping's and
# the model finishes training holding the averaged weights.
history = model.fit(
    train_generator,            # your training generator / tf.data.Dataset
    validation_data = val_generator,
    epochs          = 50,
    callbacks       = [checkpoint_cb, earlystop_cb, swa_cb],
    verbose         = 1,
    class_weight    = class_weights
)

if swa_cb.swa_weights is not None:
    # The model now holds the SWA average; save it where evaluation looks.
    model.save_weights(ckpt_path)
else:
    # No snapshot was averaged (training ended before start_epoch): keep the
    # previous behaviour by exposing the best checkpoint under ckpt_path.
    shutil.copyfile(best_ckpt_path, ckpt_path)


In [None]:
# --- Restore the weights stored under the "swa_ce" file name ----------------
model = build_frozen_xception_model(num_classes)
model.load_weights("/kaggle/working/fine_tuned_xcep_swa_ce.weights.h5")

# Compiling is only needed so that evaluate() can report loss and accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# --- Aggregate test metrics --------------------------------------------------
test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# --- Per-image predictions ---------------------------------------------------
# Rewind the generator so predictions start from the first file again.
test_generator.reset()

# Enough batches to see every test image exactly once.
steps = math.ceil(test_generator.samples / test_generator.batch_size)

pred_probs = model.predict(test_generator, steps=steps, verbose=1)
pred_labels = pred_probs.argmax(axis=1)
# NOTE(review): pairing these labels with the predictions assumes the
# generator was built with shuffle=False — confirm where it is created.
true_labels = test_generator.classes

# Integer index -> class name, for human-readable reports.
idx_to_class = {index: name for name, index in test_generator.class_indices.items()}
pred_names = [idx_to_class[label] for label in pred_labels]
true_names = [idx_to_class[label] for label in true_labels]

print(classification_report(true_names, pred_names, digits=4))

# --- Confusion matrix --------------------------------------------------------
class_ids = list(idx_to_class)
class_names = list(idx_to_class.values())
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

plt.figure(figsize=(8, 6))
ax = sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    xticklabels=class_names,
    yticklabels=class_names,
    cmap="Blues",
)
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
ax.set_title("Test-Set Confusion Matrix")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# GradCAM

In [None]:
# Model whose predictions the Grad-CAM cell below explains.
# NOTE(review): no weights are loaded in this cell; unless
# build_and_unfreeze_xception restores trained weights itself, the heatmaps
# will describe an un-fine-tuned model — confirm, and load a checkpoint here
# if needed.
model = build_and_unfreeze_xception(n_unfreeze=20)


In [None]:
import numpy as np
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing import image

def compute_gradcam(model, img_tensor, class_index=None, layer_name=None):
    """Compute a Grad-CAM heatmap for one image.

    Args:
        model: Keras model to explain.
        img_tensor: preprocessed input batch; only the first sample is used.
            The heatmap is resized to ``(img_tensor.shape[1],
            img_tensor.shape[2])``, i.e. a ``(1, H, W, C)`` layout is assumed.
        class_index: index of the output score to explain. Defaults to the
            arg-max of the model's prediction for the first sample.
        layer_name: name of the layer whose output feature maps are weighted.
            When ``None``, the last layer in ``model.layers`` that is a
            ``Conv2D``, ``SeparableConv2D`` or ``DepthwiseConv2D`` is used.

    Returns:
        ``(heatmap, preds)`` where ``heatmap`` is a 2-D array of size
        ``(H, W)`` with values in ``[0, 1]`` (all zeros if no positive
        evidence was found) and ``preds`` is the model output for the first
        sample as a numpy array.

    Raises:
        ValueError: if ``layer_name`` is ``None`` and no convolutional layer
            is found among ``model.layers``, or if the target score has no
            gradient with respect to the chosen layer's output.
    """
    if layer_name is None:
        # SeparableConv2D does not subclass Conv2D, so it has to be listed
        # explicitly; otherwise architectures such as Xception, whose final
        # convolutions are separable, resolve to an earlier layer.
        conv_types = (
            tf.keras.layers.Conv2D,
            tf.keras.layers.SeparableConv2D,
            tf.keras.layers.DepthwiseConv2D,
        )
        layer_name = next(
            (
                layer.name
                for layer in reversed(model.layers)
                if isinstance(layer, conv_types)
            ),
            None,
        )
        if layer_name is None:
            raise ValueError(
                "compute_gradcam: no convolutional layer found in model.layers; "
                "pass layer_name explicitly."
            )

    # Auxiliary model returning both the feature maps and the predictions.
    grad_model = tf.keras.models.Model(
        inputs=model.inputs,
        outputs=[model.get_layer(layer_name).output, model.output]
    )

    with tf.GradientTape() as tape:
        conv_outputs, preds = grad_model(img_tensor)   # conv_outputs: (1, Hc, Wc, C)
        if class_index is None:
            class_index = tf.argmax(preds[0])
        loss = preds[:, class_index]                  # target class score

    # Gradient of the target class score w.r.t. the conv feature maps.
    grads = tape.gradient(loss, conv_outputs)         # (1, Hc, Wc, C)
    if grads is None:
        raise ValueError(
            f"compute_gradcam: the model output does not depend on layer "
            f"'{layer_name}', so no gradients are available."
        )

    # Channel importance = spatial mean of the gradients.
    pooled_grads = tf.reduce_mean(grads, axis=(1, 2)) # (1, C)

    feature_maps = conv_outputs[0]                    # (Hc, Wc, C)
    channel_weights = pooled_grads[0]                 # (C,)

    # Weighted sum over channels, then ReLU to keep positive evidence only.
    heatmap = tf.reduce_sum(feature_maps * channel_weights, axis=-1).numpy()
    heatmap = np.maximum(heatmap, 0)

    # Normalise to [0, 1]; an all-zero map is left untouched.
    peak = heatmap.max()
    if peak > 0:
        heatmap /= peak

    # cv2.resize expects (width, height).
    heatmap = cv2.resize(heatmap, (img_tensor.shape[2], img_tensor.shape[1]))
    return heatmap, preds[0].numpy()


# ─────────────────────────────────────────────────────────────────────────────
# Grad-CAM on random test images.
#
# Relies on names defined earlier in the notebook:
#   • model          → the Keras model to explain
#   • test_generator → iterator exposing .filepaths and .class_indices
#
# The layer to visualise is detected automatically below; to force a specific
# one (e.g. the activation layer "block14_sepconv2_act" in Xception), assign
# its name to `last_conv` instead.
# ─────────────────────────────────────────────────────────────────────────────
# SeparableConv2D does not subclass Conv2D, so it is listed explicitly;
# otherwise Xception's final (separable) convolutions would be skipped.
conv_layer_types = (
    tf.keras.layers.Conv2D,
    tf.keras.layers.SeparableConv2D,
    tf.keras.layers.DepthwiseConv2D,
)
last_conv = None
for layer in reversed(model.layers):
    if isinstance(layer, conv_layer_types):
        last_conv = layer.name
        break

# Build inverse mapping index → label
class_indices_inv = {v: k for k, v in test_generator.class_indices.items()}

# Pick up to 10 distinct random images from the test set; sampling without
# replacement raises if more items are requested than exist.
num_samples = len(test_generator.filepaths)
random_idxs = np.random.choice(num_samples, size=min(10, num_samples), replace=False)

for idx in random_idxs:
    # Load and resize raw image to (224, 224)
    path = test_generator.filepaths[idx]
    raw_pil = image.load_img(path, target_size=(224, 224))
    raw_np  = image.img_to_array(raw_pil).astype("uint8")  # (224,224,3)

    # Prepare model input: scale to [0,1], add batch dim.
    # NOTE(review): assumes the model was trained on inputs rescaled to
    # [0, 1]; if the generators use a different preprocessing (e.g.
    # Xception's preprocess_input), apply the same one here — confirm.
    x = raw_np.astype("float32") / 255.0
    x = np.expand_dims(x, axis=0)  # (1,224,224,3)

    # Compute Grad-CAM heatmap and predictions
    heatmap, preds = compute_gradcam(model, x, layer_name=last_conv)

    # Colourise the heatmap and blend it over the raw image. OpenCV works in
    # BGR, so convert there for blending and back to RGB for matplotlib.
    heatmap_uint8 = np.uint8(255 * heatmap)
    heatmap_color = cv2.applyColorMap(heatmap_uint8, cv2.COLORMAP_JET)
    raw_bgr = cv2.cvtColor(raw_np, cv2.COLOR_RGB2BGR)
    overlay_bgr = cv2.addWeighted(raw_bgr, 0.6, heatmap_color, 0.4, 0)
    overlay_rgb = cv2.cvtColor(overlay_bgr, cv2.COLOR_BGR2RGB)

    # Determine predicted label and confidence
    pred_idx   = np.argmax(preds)
    pred_label = class_indices_inv[pred_idx]
    pred_conf  = preds[pred_idx]

    # Plot original and Grad-CAM overlay side by side
    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    axes[0].imshow(raw_np)
    axes[0].axis("off")
    axes[0].set_title("Original")
    axes[1].imshow(overlay_rgb)
    axes[1].axis("off")
    axes[1].set_title(f"{pred_label} ({pred_conf:.3f})")
    plt.tight_layout()
    plt.show()
