In [None]:
#Mount & install
from google.colab import drive
drive.mount('/content/drive')

!pip -q install efficientnet tensorflow-addons

# Reproducibility
import os, random, numpy as np, tensorflow as tf
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

#Common imports
import matplotlib.pyplot as plt
from collections import Counter
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

from tensorflow.keras import layers, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from efficientnet.tfkeras import EfficientNetB0



In [None]:
# --- Project locations on the mounted Drive ---
BASE = '/content/drive/MyDrive/Capstone2025_DeepfakeDetection'
DATA = BASE + '/data/frames_cropped_split'      # read below via its train/, val/, test/ sub-folders
MODEL_DIR = BASE + '/models_fast'               # checkpoints, ensemble summary, Grad-CAM images

# Create the output folder up front so checkpoint callbacks never fail on a missing path
os.makedirs(MODEL_DIR, exist_ok=True)

# --- Training hyper-parameters ---
IMG_SIZE = (224, 224)   # (height, width) every frame is resized to
BATCH_SIZE = 32
EPOCHS = 15             # upper bound; early stopping may end training sooner


In [None]:
# One shared generator config for all splits: pixel values are only rescaled
# to [0, 1]; no augmentation is applied.
# NOTE(review): ImageNet-pretrained backbones usually expect their own
# `preprocess_input` scaling rather than plain 1/255 -- confirm this is intended.
datagen = ImageDataGenerator(rescale=1./255)

def _flow_split(split, **overrides):
    """Return a binary-label directory iterator over DATA/<split>."""
    return datagen.flow_from_directory(
        os.path.join(DATA, split),
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode='binary',
        **overrides
    )

# Training data is shuffled (seeded); val/test keep file order so that
# predictions line up with `generator.classes`.
train_generator = _flow_split('train', shuffle=True, seed=SEED)
val_generator = _flow_split('val', shuffle=False)
test_generator = _flow_split('test', shuffle=False)

# class weights ("balanced" heuristic: total / (n_classes * count_of_class))
from collections import Counter

# NOTE(review): flow_from_directory assigns label indices from the class
# sub-folder names (alphanumeric order per the Keras docs), so which index means
# "real" vs "fake" depends on those names. Print the mapping and confirm it
# matches the 0=real / 1=fake assumption used later in this notebook.
print("class_indices:", train_generator.class_indices)

counts = Counter(int(c) for c in train_generator.classes)
total = sum(counts.values())

# Fail with a clear message instead of a bare ZeroDivisionError when a class
# folder turned out to be empty.
empty_classes = [c for c in (0, 1) if counts[c] == 0]
if empty_classes:
    raise ValueError(
        f"No training images found for class index/indices {empty_classes}; "
        "cannot compute class weights."
    )

class_weight = {c: total / (2.0 * counts[c]) for c in (0, 1)}
class_weight


In [None]:
def add_head(base, dropout=0.3):
    """Attach a binary-classification head to a convolutional backbone.

    The head is: global average pooling -> batch norm -> Dense(128, relu)
    -> Dropout(dropout) -> Dense(1, sigmoid).

    Args:
        base: Keras model whose `.output` is the feature map to pool and whose
            `.input` becomes the input of the returned model.
        dropout: Dropout rate applied after the hidden dense layer.

    Returns:
        An uncompiled Keras `Model` mapping `base.input` to a single sigmoid score.
    """
    features = layers.GlobalAveragePooling2D()(base.output)
    features = layers.BatchNormalization()(features)
    hidden = layers.Dense(128, activation='relu')(features)
    hidden = layers.Dropout(dropout)(hidden)
    score = layers.Dense(1, activation='sigmoid')(hidden)
    return Model(base.input, score)

def build_mobilenetv2(input_shape=(224,224,3)):
    """Build and compile a frozen ImageNet MobileNetV2 with the binary head.

    Args:
        input_shape: (height, width, channels) of the input images.

    Returns:
        A compiled Keras model (Adam, lr 1e-3, binary cross-entropy, accuracy).

    NOTE(review): the stock MobileNetV2 weights are normally used with
    `mobilenet_v2.preprocess_input`; the generators in this notebook only
    rescale by 1/255 -- confirm this is intended.
    """
    backbone = MobileNetV2(include_top=False, weights='imagenet', input_shape=input_shape)
    backbone.trainable = False   # keeping frozen: only the new head is trained
    classifier = add_head(backbone, dropout=0.3)
    classifier.compile(
        loss='binary_crossentropy',
        optimizer=Adam(1e-3),
        metrics=['accuracy'],
    )
    return classifier

def build_efficientnetb0(input_shape=(224,224,3)):
    """Build and compile a frozen ImageNet EfficientNetB0 with the binary head.

    Args:
        input_shape: (height, width, channels) of the input images.

    Returns:
        A compiled Keras model (Adam, lr 1e-3, binary cross-entropy, accuracy).

    NOTE(review): the `efficientnet` package ships its own `preprocess_input`;
    the generators in this notebook only rescale by 1/255 -- confirm this is
    intended.
    """
    backbone = EfficientNetB0(include_top=False, weights='imagenet', input_shape=input_shape)
    backbone.trainable = False   # keeping frozen: only the new head is trained
    classifier = add_head(backbone, dropout=0.3)
    classifier.compile(
        loss='binary_crossentropy',
        optimizer=Adam(1e-3),
        metrics=['accuracy'],
    )
    return classifier

# Derive the model input shape from IMG_SIZE so the networks always match the
# frames the generators produce (previously the builders' hard-coded default
# was used, which would silently diverge if IMG_SIZE were changed).
INPUT_SHAPE = tuple(IMG_SIZE) + (3,)

mnv2 = build_mobilenetv2(input_shape=INPUT_SHAPE)
enb0 = build_efficientnetb0(input_shape=INPUT_SHAPE)


In [None]:
def get_callbacks(name):
    """Create the callback list used for one training run.

    Args:
        name: Run identifier; the checkpoint is written to
            MODEL_DIR/<name>_best.keras.

    Returns:
        [EarlyStopping, ReduceLROnPlateau, ModelCheckpoint] in that order.

    NOTE(review): early stopping / LR decay watch `val_loss` while the
    checkpoint keeps the best `val_accuracy`, so the file on disk and the
    weights restored in memory may come from different epochs -- confirm
    this is intended.
    """
    checkpoint_path = os.path.join(MODEL_DIR, f'{name}_best.keras')

    stopper = EarlyStopping(monitor='val_loss', patience=3,
                            restore_best_weights=True, verbose=1)
    lr_decay = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=2,
                                 min_lr=1e-6, verbose=1)
    saver = ModelCheckpoint(checkpoint_path, monitor='val_accuracy',
                            save_best_only=True, verbose=1)
    return [stopper, lr_decay, saver]

def plot_history(h, title):
    """Plot train/val accuracy and loss curves side by side.

    Args:
        h: History object returned by `model.fit`; its `.history` dict must
            contain 'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
        title: Prefix for the two panel titles ("<title> Acc", "<title> Loss").
    """
    # Pull every series first so a missing key fails before any figure is created
    curves = {key: h.history[key] for key in ('accuracy', 'val_accuracy', 'loss', 'val_loss')}
    epoch_axis = range(len(curves['accuracy']))

    panels = (
        ('accuracy', 'val_accuracy', 'Acc'),
        ('loss', 'val_loss', 'Loss'),
    )
    plt.figure(figsize=(12,4))
    for position, (train_key, val_key, suffix) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epoch_axis, curves[train_key], label='train')
        plt.plot(epoch_axis, curves[val_key], label='val')
        plt.title(f'{title} {suffix}')
        plt.legend()
    plt.show()

def train_one(model, name, epochs=EPOCHS):
    """Fit `model` on the training generator and plot its learning curves.

    Uses the notebook-level `train_generator`, `val_generator` and
    `class_weight`, plus the callbacks from `get_callbacks(name)`.

    Args:
        model: Compiled Keras model to train (trained in place).
        name: Run identifier used for the checkpoint file and plot titles.
        epochs: Maximum number of epochs.

    Returns:
        The same `model` object after training.
    """
    fit_options = dict(
        validation_data=val_generator,
        epochs=epochs,
        callbacks=get_callbacks(name),
        class_weight=class_weight,
        verbose=1,
    )
    history = model.fit(train_generator, **fit_options)
    plot_history(history, name)
    return model


In [None]:
# Train each frozen-backbone model once; checkpoints are written by the
# callbacks under the run name passed here.
print("Training MobileNetV2 (single-stage)...")
mnv2 = train_one(model=mnv2, name='mobilenetv2_fast', epochs=EPOCHS)

print("Training EfficientNetB0 (single-stage)...")
enb0 = train_one(model=enb0, name='efficientnetb0_fast', epochs=EPOCHS)


In [None]:
#weight sweep on VAL, evaluate on TEST
from tensorflow.keras.models import load_model
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix

# 1) Load models
def _load_best(name_fast, name_ft):
    """Load a checkpoint from MODEL_DIR, preferring the "fast" run.

    Tries MODEL_DIR/<name_fast>.keras first. If that fails and a
    MODEL_DIR/<name_ft>.keras file exists, the failure is reported and the
    fallback is loaded instead. If there is no fallback file, the original
    error is re-raised so the real cause is not hidden.

    Args:
        name_fast: Checkpoint base name (without extension) to try first.
        name_ft: Fallback checkpoint base name (without extension).

    Returns:
        The loaded Keras model.
    """
    fast_path = f"{MODEL_DIR}/{name_fast}.keras"
    ft_path = f"{MODEL_DIR}/{name_ft}.keras"
    try:
        return load_model(fast_path)
    except Exception as err:
        if not os.path.exists(ft_path):
            # Nothing to fall back to: surface the actual loading failure.
            raise
        print(f"[warn] Could not load {fast_path} ({type(err).__name__}: {err}); "
              f"falling back to {ft_path}")
        return load_model(ft_path)

mnv2_best = _load_best("mobilenetv2_fast_best", "mobilenetv2_ft_best")
enb0_best = _load_best("efficientnetb0_fast_best", "efficientnetb0_ft_best")

# 2) Get VAL predictions
# The models emit one sigmoid score per image with shape (N, 1); flatten to (N,)
# so scores and labels share a shape in every later comparison.
val_y      = val_generator.classes
val_mnv2_p = mnv2_best.predict(val_generator, verbose=1).ravel()
val_enb0_p = enb0_best.predict(val_generator, verbose=1).ravel()

# Predictions are only meaningful if there is exactly one per labelled image.
assert len(val_mnv2_p) == len(val_enb0_p) == len(val_y), (
    f"prediction/label length mismatch: {len(val_mnv2_p)}, {len(val_enb0_p)}, {len(val_y)}"
)

# 3) Sweep α on VAL (EfficientNetB0 weight = α, MobileNetV2 weight = 1-α)
alphas = np.linspace(0.0, 1.0, 21)  # 0.00, 0.05, ..., 1.00

def pick_best_alpha(metric="f1"):
    """Find the ensemble weight α that maximises a validation metric.

    For each α in `alphas` the blended probability
    α * EfficientNetB0 + (1 - α) * MobileNetV2 is thresholded at 0.5 and scored
    against `val_y`.

    Args:
        metric: "f1" scores with F1 for label 1; any other value scores with
            accuracy.
            NOTE(review): label 1 is assumed to be the "fake" class -- verify
            against the generator's class_indices.

    Returns:
        (best_alpha, best_score) as floats; on ties the smallest α wins.
    """
    def _score(weight):
        blended = weight*val_enb0_p + (1-weight)*val_mnv2_p
        hard_labels = (blended >= 0.5).astype(int)
        if metric == "f1":
            return f1_score(val_y, hard_labels, pos_label=1)
        return accuracy_score(val_y, hard_labels)

    scored = [(float(weight), float(_score(weight))) for weight in alphas]
    # max() returns the first of several equal maxima, i.e. the smallest α
    return max(scored, key=lambda pair: pair[1], default=(None, -1))

# Run the sweep once per criterion and report both winners
alpha_f1, f1_best = pick_best_alpha("f1")
alpha_acc, acc_best = pick_best_alpha("acc")

print(f"\nBest α (by F1-Fake): {alpha_f1:.2f}  | F1={f1_best:.4f}")
print(f"Best α (by Accuracy): {alpha_acc:.2f} | Acc={acc_best:.4f}")

# The accuracy-optimal weight is the one carried forward to the test set
ALPHA = alpha_acc
print(f"\n>> Using α={ALPHA:.2f} for TEST (EfficientNetB0 weight).")

# 4) Optional threshold tuning on VAL for the chosen α (keeps default 0.5 if no gain)
def best_threshold(val_probs, y, metric_fn, sweep=np.linspace(0.3, 0.7, 17)):
    """Pick the decision threshold that maximises `metric_fn` on validation data.

    The default threshold 0.5 is scored first and is only replaced by a
    candidate from `sweep` that is strictly better. Candidates are visited
    nearest-to-0.5 first, so among equally good improvements the one closest
    to the default wins. (Previously the lowest threshold in the sweep won all
    ties, so 0.5 was not kept when there was no gain.)

    Args:
        val_probs: NumPy array of predicted probabilities.
        y: Ground-truth labels, passed through to `metric_fn`.
        metric_fn: Callable `(y_true, y_pred) -> score`, higher is better.
        sweep: Iterable of candidate thresholds.

    Returns:
        (threshold, score) as floats.
    """
    default_t = 0.5
    best_t = default_t
    best_m = float(metric_fn(y, (val_probs >= default_t).astype(int)))

    for t in sorted(sweep, key=lambda cand: abs(float(cand) - default_t)):
        preds = (val_probs >= t).astype(int)
        m = metric_fn(y, preds)
        if m > best_m:
            best_t, best_m = float(t), float(m)
    return best_t, best_m

# Blend the validation scores with the chosen α, then tune the cut-off for accuracy
val_probs_alpha = (1-ALPHA)*val_mnv2_p + ALPHA*val_enb0_p
t_opt, m_opt = best_threshold(val_probs_alpha, val_y, accuracy_score)
print(f"Val-tuned threshold (for accuracy): {t_opt:.2f} (val-acc={m_opt:.4f})")

# 5) TEST evaluation for (a) equal-weight 0.50 and (b) α-chosen + t_opt
test_y      = test_generator.classes
# Flatten the (N, 1) sigmoid outputs so scores and labels share a shape
test_mnv2_p = mnv2_best.predict(test_generator, verbose=1).ravel()
test_enb0_p = enb0_best.predict(test_generator, verbose=1).ravel()
assert len(test_mnv2_p) == len(test_enb0_p) == len(test_y), (
    f"prediction/label length mismatch: {len(test_mnv2_p)}, {len(test_enb0_p)}, {len(test_y)}"
)

# Report labels come from the generator's own index mapping rather than a
# hard-coded ['Real', 'Fake'] list, so the report cannot mislabel the classes
# if the folder names sort in a different order.
_test_label_names = [
    label for label, _ in sorted(test_generator.class_indices.items(), key=lambda kv: kv[1])
]

def _report_test(header, probs, threshold):
    """Threshold `probs`, print accuracy/report/confusion matrix, return (preds, accuracy)."""
    preds = (probs >= threshold).astype(int)
    acc = accuracy_score(test_y, preds)
    print(header)
    print("Accuracy:", acc)
    print(classification_report(test_y, preds, target_names=_test_label_names))
    print("Confusion Matrix:\n", confusion_matrix(test_y, preds))
    return preds, acc

# (i) Equal-weight baseline
test_probs_50 = 0.5*test_enb0_p + 0.5*test_mnv2_p
test_pred_50, acc_50 = _report_test(
    "\n=== Equal-weight (0.50/0.50), threshold=0.50 ===", test_probs_50, 0.5
)

# (ii) Chosen α + tuned threshold
test_probs_a = ALPHA*test_enb0_p + (1-ALPHA)*test_mnv2_p
test_pred_a, acc_a = _report_test(
    f"\n=== Weighted (α={ALPHA:.2f}), threshold={t_opt:.2f} ===", test_probs_a, t_opt
)

# 6) Saving the settings
import json, time

# Everything needed to reproduce the ensemble decision rule, plus headline scores
summary = {
    "alpha_used": ALPHA,
    "threshold_used": t_opt,
    "val_best_alpha_acc": dict(alpha=alpha_acc, val_acc=acc_best),
    "val_best_alpha_f1": dict(alpha=alpha_f1, val_f1_fake=f1_best),
    "test_equal_weight_acc": acc_50,
    "test_weighted_acc": acc_a,
    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
}

summary_path = f"{MODEL_DIR}/ensemble_summary.json"
with open(summary_path, "w") as fh:
    json.dump(summary, fh, indent=2)
print(f"\nSaved summary → {summary_path}")


In [None]:
# Load best checkpoints
from tensorflow.keras.models import load_model
import numpy as np, os, cv2, matplotlib.pyplot as plt
import tensorflow as tf

# Re-bind the model names to the best checkpoints written during training;
# the Grad-CAM helpers below read these two globals.
mnv2 = load_model(os.path.join(MODEL_DIR, 'mobilenetv2_fast_best.keras'))
enb0 = load_model(os.path.join(MODEL_DIR, 'efficientnetb0_fast_best.keras'))


# Label names: invert the generator's {folder_name: index} mapping
idx_to_label = dict((index, folder) for folder, index in test_generator.class_indices.items())

# Output folder for the saved Grad-CAM figures
GC_DIR = os.path.join(MODEL_DIR, 'gradcam')
os.makedirs(GC_DIR, exist_ok=True)


In [None]:
def get_last_conv_layer(model):
    """Return the name of the layer to use as the Grad-CAM target.

    Walks `model.layers` from the end and returns the first Conv2D /
    DepthwiseConv2D found. If none matches by type, falls back to the known
    layer names 'Conv_1' and 'block7a_project_conv'.

    Raises:
        ValueError: if neither search finds a layer.
    """
    conv_types = (tf.keras.layers.Conv2D, tf.keras.layers.DepthwiseConv2D)
    by_type = next(
        (lyr.name for lyr in reversed(model.layers) if isinstance(lyr, conv_types)),
        None,
    )
    if by_type is not None:
        return by_type

    # Fallback: look the layer up by well-known name
    available = [lyr.name for lyr in model.layers]
    for candidate in ('Conv_1', 'block7a_project_conv'):
        if candidate in available:
            return candidate
    raise ValueError("No convolutional layer found for Grad-CAM.")

def compute_gradcam(model, img_array, layer_name=None, eps=1e-8):
    """
    Compute a Grad-CAM heatmap for the model's single sigmoid output.

    img_array: (H,W,3) in [0,1] (already rescaled). Will add batch dim inside.
    layer_name: conv layer to explain; defaults to get_last_conv_layer(model).
    eps: small constant guarding the max-normalisation against division by zero.

    Returns heatmap in [0,1], same HxW.
    Raises ValueError if no gradient reaches the chosen layer.
    """
    if layer_name is None:
        layer_name = get_last_conv_layer(model)
    # Pass `model.inputs` directly (not wrapped in another list) so the
    # sub-model's input structure matches the tensor it is called with.
    grad_model = tf.keras.models.Model(
        model.inputs,
        [model.get_layer(layer_name).output, model.output]
    )

    inputs = tf.cast(img_array[None, ...], tf.float32)
    with tf.GradientTape() as tape:
        conv_outputs, preds = grad_model(inputs)
        # Binary classifier - sigmoid output
        loss = preds[:, 0]

    grads = tape.gradient(loss, conv_outputs)
    if grads is None:
        raise ValueError(
            f"No gradient flows from the model output to layer '{layer_name}'."
        )
    # Global-average-pool the gradients over batch and spatial dims -> one weight per channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Channel-weighted sum of the feature maps of the (single) image
    heatmap = tf.reduce_sum(tf.multiply(conv_outputs[0], pooled_grads), axis=-1)

    # Keep only positive evidence and scale to [0, 1]
    heatmap = tf.nn.relu(heatmap)
    heatmap = heatmap / (tf.reduce_max(heatmap) + eps)
    heatmap = heatmap.numpy()
    # Resize to input size
    h, w = img_array.shape[:2]
    heatmap = cv2.resize(heatmap, (w, h), interpolation=cv2.INTER_CUBIC)
    # Cubic interpolation can overshoot slightly below 0 / above 1; clip so the
    # documented [0, 1] range actually holds for callers.
    return np.clip(heatmap, 0.0, 1.0)

def overlay_heatmap(img, heatmap, alpha=0.35):
    """
    Blend a JET-coloured heatmap over an image.

    img: float [0,1], heatmap: [0,1], returns RGB [0,1]
    alpha: weight of the heatmap colour in the blend (image gets 1 - alpha).

    Heatmap values outside [0,1] are clipped before the uint8 conversion;
    without this, a slightly negative value wraps around to 255 and shows up
    as a false "hot" pixel.
    """
    heat_u8 = (np.clip(heatmap, 0.0, 1.0) * 255).astype(np.uint8)
    cmap = cv2.applyColorMap(heat_u8, cv2.COLORMAP_JET)
    # OpenCV colour maps are BGR; convert to RGB and back to the [0, 1] float range
    cmap = cv2.cvtColor(cmap, cv2.COLOR_BGR2RGB) / 255.0
    overlay = (1 - alpha) * img + alpha * cmap
    overlay = np.clip(overlay, 0, 1)
    return overlay


In [None]:
def predict_prob(model, img):
    """Return the model's scalar score for one image.

    Args:
        model: Object exposing `predict(batch, verbose=...)` that returns an
            array indexable as [0, 0].
        img: Single image array; a leading batch axis is added here.

    Returns:
        The score at position [0, 0] of the prediction, as a Python float.
    """
    single_batch = img[None, ...]
    scores = model.predict(single_batch, verbose=0)
    return float(scores[0, 0])

def show_and_save_gradcam(img, label, fname_prefix="sample"):
    """Render a 2x2 Grad-CAM comparison for one image and save it as a PNG.

    Panels: original image, MobileNetV2 overlay, EfficientNetB0 overlay and the
    overlay of the two heatmaps averaged. Uses the notebook-level `mnv2`,
    `enb0`, `idx_to_label` and `GC_DIR`.

    Args:
        img: Image array passed to the models and shown as-is.
        label: Ground-truth class index (converted with int()).
        fname_prefix: Output file is GC_DIR/<fname_prefix>.png.

    NOTE(review): the titles call the model score "p(fake)", which assumes
    class index 1 is the fake class, and the ensemble here is a plain 50/50
    average rather than the tuned α -- confirm both are intended.
    """
    # Individual model CAMs, then the average of the normalized heatmaps
    heat_mnv2 = compute_gradcam(mnv2, img)
    heat_enb0 = compute_gradcam(enb0, img)
    heat_mean = (heat_mnv2 + heat_enb0) / 2.0

    # Overlays in panel order
    blended = [overlay_heatmap(img, heat) for heat in (heat_mnv2, heat_enb0, heat_mean)]

    # Predictions
    prob_mnv2 = predict_prob(mnv2, img)
    prob_enb0 = predict_prob(enb0, img)
    prob_mean = (prob_mnv2 + prob_enb0) / 2.0

    # (picture, title) for each panel
    truth = idx_to_label[int(label)]
    panels = [
        (img, f"Original (GT: {truth})"),
        (blended[0], f"MobileNetV2: p(fake)={prob_mnv2:.3f}"),
        (blended[1], f"EfficientNetB0: p(fake)={prob_enb0:.3f}"),
        (blended[2], f"Ensemble: p(fake)={prob_mean:.3f}"),
    ]

    # Plot
    plt.figure(figsize=(12,9))
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(2, 2, slot)
        plt.imshow(picture)
        plt.axis('off')
        plt.title(caption)
    plt.tight_layout()

    # Save before show(), while the figure is still the current one
    out_path = os.path.join(GC_DIR, f"{fname_prefix}.png")
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"Saved → {out_path}")


In [None]:
# Reset generator so the batch below is the first one of the test set
test_generator.reset()

# select one batch from test set
batch_imgs, batch_labels = next(test_generator)  # shapes: (B,224,224,3), (B,)
# NOTE(review): the test generator is created with shuffle=False, so this first
# batch presumably contains a single class only -- confirm that is acceptable.

# Visualize first K samples in that batch (fewer if the batch is smaller than K,
# which previously raised an IndexError)
K = 5
n_show = min(K, len(batch_imgs))
for i in range(n_show):
    img = batch_imgs[i]
    lbl = batch_labels[i]
    show_and_save_gradcam(img, lbl, fname_prefix=f"gradcam_test_{i}")
