In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Dropout, Conv2D, Concatenate, Input, GlobalAveragePooling2D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications import EfficientNetB0
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import StratifiedKFold, train_test_split
from pathlib import Path
import json

# Step 1: Define paths and hyperparameters
DATA_ROOT = "/content/drive/MyDrive/NTU-Roselab-Dataset"
IMG_SIZE = 224
BATCH_SIZE = 16
EPOCHS = 8
LEARNING_RATE = 1e-4
N_FOLDS = 5
MODEL_NAME = "RGB_Laplacian_EfficientNetB0"
OUTPUT_DIR = f"/content/drive/MyDrive/Recapture_Photo_Detection/{MODEL_NAME}/results"
SPLIT_DIR = "/content/drive/MyDrive/Recapture_Photo_Detection"
CHECKPOINT_FILE = f"{OUTPUT_DIR}/checkpoint.json"
PREPROCESSING = "RGB_Laplacian"
HYPERPARAMETERS = {
    "learning_rate": LEARNING_RATE,
    "batch_size": BATCH_SIZE,
    "optimizer": "Adam",
    "epochs": EPOCHS,
    "n_folds": N_FOLDS,
    "dropout_rate": 0.4
}

# Step 2: Mount Google Drive and verify dataset path
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
except ImportError:
    raise ImportError("This script must be run in Google Colab with Google Drive mounted.")
if not os.path.exists(DATA_ROOT):
    raise FileNotFoundError(f"Dataset directory {DATA_ROOT} does not exist. Please check the path.")

# Step 3: Check dataset balance
def check_dataset_balance(data_root):
    originals_path = os.path.join(data_root, 'originals')
    recaptures_path = os.path.join(data_root, 'recaptures')
    originals_count = sum(len(files) for _, _, files in os.walk(originals_path))
    recaptures_count = sum(len(files) for _, _, files in os.walk(recaptures_path))
    print(f"Dataset Balance: {originals_count} originals, {recaptures_count} recaptures")
    return originals_count, recaptures_count

originals_count, recaptures_count = check_dataset_balance(DATA_ROOT)

# Step 4: Define preprocessing functions
@tf.function
def rgb_preprocess(img):
    img = tf.image.resize(img, (IMG_SIZE, IMG_SIZE))
    tf.debugging.assert_rank(img, 4)
    img = tf.keras.applications.efficientnet.preprocess_input(img)
    tf.debugging.assert_rank(img, 4)
    return img

@tf.function
def laplacian_preprocess(img):
    img = tf.image.resize(img, (IMG_SIZE, IMG_SIZE))
    tf.debugging.assert_rank(img, 4)
    img = tf.image.rgb_to_grayscale(img)
    tf.debugging.assert_rank(img, 4)
    laplacian_kernel = tf.constant([[0, -1, 0],
                                    [-1, 4, -1],
                                    [0, -1, 0]], dtype=tf.float32)
    laplacian_kernel = tf.reshape(laplacian_kernel, [3, 3, 1, 1])
    filtered = tf.nn.conv2d(img, laplacian_kernel, strides=[1, 1, 1, 1], padding='SAME')
    filtered = tf.abs(filtered)
    f_min = tf.reduce_min(filtered)
    f_max = tf.reduce_max(filtered)
    filtered = (filtered - f_min) / (f_max - f_min + 1e-10)
    filtered = tf.tile(filtered, [1, 1, 1, 3])
    tf.debugging.assert_rank(filtered, 4)
    return filtered

@tf.function
def preprocess(img, label):
    rgb_img = rgb_preprocess(img)
    laplacian_img = laplacian_preprocess(img)
    tf.debugging.assert_rank(rgb_img, 4)
    tf.debugging.assert_rank(laplacian_img, 4)
    return (rgb_img, laplacian_img), label

# Step 5: Define random rotation
@tf.function
def random_rotation(img, max_angle=0.1):
    angles = [0, np.pi/2, np.pi, 3*np.pi/2]
    k = tf.random.uniform(shape=(), minval=0, maxval=len(angles), dtype=tf.int32)
    img = tf.image.rot90(img, k)
    return img

# Step 6: Define data augmentation
@tf.function
def augment_image(inputs, label):
    rgb_img, laplacian_img = inputs
    rgb_img = tf.image.random_flip_left_right(rgb_img)
    rgb_img = random_rotation(rgb_img)
    laplacian_img = tf.image.random_flip_left_right(laplacian_img)
    laplacian_img = random_rotation(laplacian_img)
    return (rgb_img, laplacian_img), label

# Step 7: Load or create train-test split
def load_or_create_split():
    split_files = ['X_train.npy', 'X_test.npy', 'y_train.npy', 'y_test.npy']
    if all(os.path.exists(os.path.join(SPLIT_DIR, f)) for f in split_files):
        print("Loading existing train-test split...")
        X_train = np.load(os.path.join(SPLIT_DIR, 'X_train.npy'))
        X_test = np.load(os.path.join(SPLIT_DIR, 'X_test.npy'))
        y_train = np.load(os.path.join(SPLIT_DIR, 'y_train.npy'))
        y_test = np.load(os.path.join(SPLIT_DIR, 'y_test.npy'))
    else:
        print("Creating new train-test split...")
        dataset = image_dataset_from_directory(
            DATA_ROOT,
            labels='inferred',
            label_mode='binary',
            image_size=(IMG_SIZE, IMG_SIZE),
            batch_size=BATCH_SIZE,
            shuffle=True,
            seed=42
        )
        images, labels = [], []
        for img_batch, label_batch in dataset:
            images.append(img_batch.numpy())
            labels.append(label_batch.numpy())
        images = np.concatenate(images, axis=0)
        labels = np.concatenate(labels, axis=0).flatten()

        X_train, X_test, y_train, y_test = train_test_split(
            images, labels, test_size=0.2, stratify=labels, random_state=42
        )

        Path(SPLIT_DIR).mkdir(parents=True, exist_ok=True)
        np.save(os.path.join(SPLIT_DIR, 'X_train.npy'), X_train)
        np.save(os.path.join(SPLIT_DIR, 'X_test.npy'), X_test)
        np.save(os.path.join(SPLIT_DIR, 'y_train.npy'), y_train)
        np.save(os.path.join(SPLIT_DIR, 'y_test.npy'), y_test)
    return X_train, X_test, y_train, y_test

X_train, X_test, y_train, y_test = load_or_create_split()
test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(BATCH_SIZE).map(preprocess).prefetch(tf.data.AUTOTUNE)

# Step 8: Define the dual EfficientNetB0 model
def create_model():
    # Inputs
    rgb_input = Input(shape=(IMG_SIZE, IMG_SIZE, 3), name='rgb_input')
    laplacian_input = Input(shape=(IMG_SIZE, IMG_SIZE, 3), name='laplacian_input')

    # RGB branch
    rgb_base_model = EfficientNetB0(include_top=False, weights=None, input_shape=(IMG_SIZE, IMG_SIZE, 3), name='efficientnetb0_rgb')
    rgb_base_model.load_weights(tf.keras.utils.get_file(
        'efficientnetb0_notop.h5',
        'https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5'
    ))
    rgb_base = rgb_base_model(rgb_input)
    rgb_base = GlobalAveragePooling2D(name='gap_rgb')(rgb_base)

    # Laplacian branch
    laplacian_base_model = EfficientNetB0(include_top=False, weights=None, input_shape=(IMG_SIZE, IMG_SIZE, 3), name='efficientnetb0_laplacian')
    laplacian_base_model.load_weights(tf.keras.utils.get_file(
        'efficientnetb0_notop.h5',
        'https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5'
    ))
    laplacian_base = laplacian_base_model(laplacian_input)
    laplacian_base = GlobalAveragePooling2D(name='gap_laplacian')(laplacian_base)

    # Combine features
    x = Concatenate(name='concat_features')([rgb_base, laplacian_base])
    x = Dense(128, activation='relu', name='dense_128')(x)
    x = Dropout(HYPERPARAMETERS['dropout_rate'], name='dropout')(x)
    out = Dense(1, activation='sigmoid', name='output')(x)

    model = Model(inputs=[rgb_input, laplacian_input], outputs=out, name='RGB_Laplacian_EfficientNetB0')
    model.compile(
        optimizer=tf.keras.optimizers.Adam(LEARNING_RATE),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    return model

# Step 9: Convert NumPy types to JSON-serializable types and round floats
def convert_to_serializable(obj):
    if isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return round(float(obj), 2)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, np.int64):
        return int(obj)
    return obj

# Step 10: Define function to save results with rounded values
def save_model_results(model, dataset, history, model_name, output_dir, fold=None, preprocessing='None', hyperparameters=None):
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    fold_str = f"_fold_{fold}" if fold is not None else ""

    y_true, y_pred = [], []
    for (rgb_imgs, laplacian_imgs), labels in dataset:
        preds = (model.predict([rgb_imgs, laplacian_imgs], verbose=0) > 0.5).astype(int)
        y_true.extend(labels.numpy().astype(int))
        y_pred.extend(preds.flatten())

    originals_pred = sum(1 for p in y_pred if p == 0)
    recaptures_pred = sum(1 for p in y_pred if p == 1)
    print(f"Predictions {fold_str}: {originals_pred} originals, {recaptures_pred} recaptures")

    class_report = classification_report(y_true, y_pred, target_names=['originals', 'recaptured'], output_dict=True)
    class_report_df = pd.DataFrame(class_report).transpose().round(2)
    class_report_df.to_csv(f'{output_dir}/{model_name}_classification_report{fold_str}.csv')

    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['originals', 'recaptured'], yticklabels=['originals', 'recaptured'])
    plt.title(f'Confusion Matrix - {model_name}{fold_str}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.savefig(f'{output_dir}/{model_name}_confusion_matrix{fold_str}.png')
    plt.close()

    cm_df = pd.DataFrame(cm, index=['True_originals', 'True_recaptured'], columns=['Pred_originals', 'Pred_recaptured'])
    cm_df.to_csv(f'{output_dir}/{model_name}_confusion_matrix{fold_str}.csv')

    if fold is None:
        summary_file = f'{output_dir}/{model_name}_summary.txt'
        with open(summary_file, 'w') as f:
            model.summary(print_fn=lambda x: f.write(x + '\n'))

    total_params = int(model.count_params())
    trainable_params = int(sum([tf.keras.backend.count_params(w) for w in model.trainable_weights]))

    results = {
        'Model': model_name,
        'Preprocessing': preprocessing,
        'Accuracy': round(class_report['accuracy'], 2),
        'Total_Parameters': total_params,
        'Trainable_Parameters': trainable_params,
        'Fold': fold if fold is not None else 'Final'
    }
    if hyperparameters:
        results.update({k: convert_to_serializable(v) for k, v in hyperparameters.items()})
    for label, metrics in class_report.items():
        if isinstance(metrics, dict):
            results.update({
                f'Precision_{label}': round(metrics['precision'], 2),
                f'Recall_{label}': round(metrics['recall'], 2),
                f'F1-Score_{label}': round(metrics['f1-score'], 2),
                f'Support_{label}': convert_to_serializable(metrics['support'])
            })

    with open(f'{output_dir}/{model_name}_results{fold_str}.json', 'w') as f:
        json.dump(results, f, indent=4, default=convert_to_serializable)

    if history is not None:
        plt.figure(figsize=(10, 4))
        plt.subplot(1, 2, 1)
        plt.plot(history.history['accuracy'], label='Train Accuracy')
        if 'val_accuracy' in history.history:
            plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'Accuracy Curve - {model_name}{fold_str}')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'], label='Train Loss')
        if 'val_loss' in history.history:
            plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title(f'Loss Curve - {model_name}{fold_str}')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(f'{output_dir}/{model_name}_accuracy_loss_curve{fold_str}.png')
        plt.close()

    if fold is None:
        model.save(f'{output_dir}/{model_name}_model.keras', overwrite=True)

    return results

# Step 11: Checkpointing and resumption logic
def load_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            checkpoint = json.load(f)
        last_completed_fold = checkpoint.get('last_completed_fold', 0)
        print(f"Resuming from checkpoint: Last completed fold = {last_completed_fold}")
        return last_completed_fold
    return 0

def save_checkpoint(fold):
    Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
    checkpoint = {'last_completed_fold': fold}
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f, indent=4)

# Step 12: Perform 5-fold cross-validation with checkpointing
skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=42)
fold_results = []
last_completed_fold = load_checkpoint()

for fold, (train_idx, val_idx) in enumerate(skf.split(X_train, y_train), 1):
    if fold <= last_completed_fold:
        print(f"Skipping fold {fold} (already completed)")
        fold_str = f"_fold_{fold}"
        result_file = f'{OUTPUT_DIR}/{MODEL_NAME}_results{fold_str}.json'
        if os.path.exists(result_file):
            with open(result_file, 'r') as f:
                fold_results.append(json.load(f))
        continue

    print(f"\nTraining Fold {fold}/{N_FOLDS}")

    X_fold_train, X_fold_val = X_train[train_idx], X_train[val_idx]
    y_fold_train, y_fold_val = y_train[train_idx], y_train[val_idx]
    train_ds = (
        tf.data.Dataset.from_tensor_slices((X_fold_train, y_fold_train))
        .batch(BATCH_SIZE)
        .map(preprocess)
        .map(augment_image)
        .prefetch(tf.data.AUTOTUNE)
    )

    val_ds = (
        tf.data.Dataset.from_tensor_slices((X_fold_val, y_fold_val))
        .batch(BATCH_SIZE)
        .map(preprocess)
        .prefetch(tf.data.AUTOTUNE)
    )

    model = create_model()
    fold_str = f"_fold_{fold}"
    checkpoint_path = f'{OUTPUT_DIR}/{MODEL_NAME}_model{fold_str}.weights.h5'
    if os.path.exists(checkpoint_path):
        print(f"Loading weights for fold {fold} from {checkpoint_path}")
        model.load_weights(checkpoint_path)

    checkpoint_callback = ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, save_weights_only=True)
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    history = model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS, class_weight={0: 1.0, 1: 1.5}, verbose=1, callbacks=[early_stopping, checkpoint_callback])

    results = save_model_results(
        model, val_ds, history, MODEL_NAME, OUTPUT_DIR,
        fold=fold, preprocessing=PREPROCESSING, hyperparameters=HYPERPARAMETERS
    )
    fold_results.append(results)

    save_checkpoint(fold)

# Step 13: Train final model on full training set
if last_completed_fold < N_FOLDS + 1:
    print("\nTraining final model on full training set")
    final_model_path = f'{OUTPUT_DIR}/{MODEL_NAME}_model.keras'
    if os.path.exists(final_model_path) and last_completed_fold == 'final':
        print(f"Final model already saved at {final_model_path}, skipping training")
        result_file = f'{OUTPUT_DIR}/{MODEL_NAME}_results.json'
        if os.path.exists(result_file):
            with open(result_file, 'r') as f:
                results = json.load(f)
            print(f"Final Results for {MODEL_NAME}:", results)
    else:
        train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(BATCH_SIZE).map(preprocess).map(augment_image).prefetch(tf.data.AUTOTUNE)
        model = create_model()
        checkpoint_path = f'{OUTPUT_DIR}/{MODEL_NAME}_model.weights.h5'
        if os.path.exists(checkpoint_path):
            print(f"Loading weights for final model from {checkpoint_path}")
            model.load_weights(checkpoint_path)

        checkpoint_callback = ModelCheckpoint(checkpoint_path, monitor='loss', save_best_only=True, save_weights_only=True)
        history = model.fit(train_ds, epochs=EPOCHS, class_weight={0: 1.0, 1: 1.5}, verbose=1, callbacks=[checkpoint_callback])

        results = save_model_results(
            model, test_ds, history, MODEL_NAME, OUTPUT_DIR,
            preprocessing=PREPROCESSING, hyperparameters=HYPERPARAMETERS
        )
        print(f"Final Results for {MODEL_NAME}:", results)

        save_checkpoint('final')

# Step 14: Aggregate cross-validation results
if fold_results:
    fold_df = pd.DataFrame(fold_results)
    mean_results = {
        'Model': MODEL_NAME,
        'Preprocessing': PREPROCESSING,
        'Mean_Accuracy': round(fold_df['Accuracy'].mean(), 2),
        'Std_Accuracy': round(fold_df['Accuracy'].std(), 2),
        'Mean_Precision_recaptured': round(fold_df['Precision_recaptured'].mean(), 2),
        'Mean_Recall_recaptured': round(fold_df['Recall_recaptured'].mean(), 2),
        'Mean_F1-Score_recaptured': round(fold_df['F1-Score_recaptured'].mean(), 2),
        'Mean_Total_Parameters': int(fold_df['Total_Parameters'].mean()),
        'Mean_Trainable_Parameters': int(fold_df['Trainable_Parameters'].mean())
    }
    with open(f'{OUTPUT_DIR}/{MODEL_NAME}_cv_summary.json', 'w') as f:
        json.dump(mean_results, f, indent=4, default=convert_to_serializable)
    fold_df.to_csv(f'{OUTPUT_DIR}/{MODEL_NAME}_cv_results.csv', index=False)
    print("\nCross-Validation Summary:", mean_results)

Mounted at /content/drive
Dataset Balance: 1202 originals, 1199 recaptures
Loading existing train-test split...
Resuming from checkpoint: Last completed fold = 5
Skipping fold 1 (already completed)
Skipping fold 2 (already completed)
Skipping fold 3 (already completed)
Skipping fold 4 (already completed)
Skipping fold 5 (already completed)

Training final model on full training set
Loading weights for final model from /content/drive/MyDrive/Recapture_Photo_Detection/RGB_Laplacian_EfficientNetB0/results/RGB_Laplacian_EfficientNetB0_model.weights.h5
Epoch 1/8


  saveable.load_own_variables(weights_store.get(inner_path))


[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m189s[0m 221ms/step - accuracy: 0.8449 - loss: 0.4623
Epoch 2/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 234ms/step - accuracy: 0.9091 - loss: 0.2831
Epoch 3/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 219ms/step - accuracy: 0.9329 - loss: 0.2073
Epoch 4/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 229ms/step - accuracy: 0.9605 - loss: 0.1228
Epoch 5/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 243ms/step - accuracy: 0.9709 - loss: 0.0900
Epoch 6/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 235ms/step - accuracy: 0.9803 - loss: 0.0671
Epoch 7/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 254ms/step - accuracy: 0.9877 - loss: 0.0447
Epoch 8/8
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 213ms/step - accuracy: 0.9890 - loss: 0.0361
Predictions : 232 originals, 249 

Final Results for RGB_Laplacian_EfficientNetB0: {'Model': 'RGB_Laplacian_EfficientNetB0', 'Preprocessing': 'RGB_Laplacian', 'Accuracy': 0.9, 'Total_Parameters': 8427079, 'Trainable_Parameters': 8343033, 'Fold': 'Final', 'learning_rate': 0.0001, 'batch_size': 16, 'optimizer': 'Adam', 'epochs': 8, 'n_folds': 5, 'dropout_rate': 0.4, 'Precision_originals': 0.91, 'Recall_originals': 0.88, 'F1-Score_originals': 0.9, 'Support_originals': 241.0, 'Precision_recaptured': 0.88, 'Recall_recaptured': 0.92, 'F1-Score_recaptured': 0.9, 'Support_recaptured': 240.0, 'Precision_macro avg': 0.9, 'Recall_macro avg': 0.9, 'F1-Score_macro avg': 0.9, 'Support_macro avg': 481.0, 'Precision_weighted avg': 0.9, 'Recall_weighted avg': 0.9, 'F1-Score_weighted avg': 0.9, 'Support_weighted avg': 481.0}

Cross-Validation Summary: {'Model': 'RGB_Laplacian_EfficientNetB0', 'Preprocessing': 'RGB_Laplacian', 'Mean_Accuracy': np.float64(0.89), 'Std_Accuracy': 0.03, 'Mean_Precision_recaptured': np.float64(0.87), 'Mean_Rec