In [None]:
import os
import random
import numpy as np
from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers, losses, callbacks
from tensorflow.keras.applications.xception import preprocess_input

from sklearn.model_selection import train_test_split
import json
from tensorflow.keras.mixed_precision import set_global_policy
# Set the global Keras dtype policy to 'mixed_float16'. This runs at import
# time, i.e. before any layer or model in the cells below is created.
set_global_policy('mixed_float16')

In [None]:
# Parameters
SEED = 42  # Seed shared by the Python, NumPy and TensorFlow random generators
TIME_STEPS = 30  # Number of frames per video clip
SKIP_FRAMES = 20  # Number of leading frames dropped before the clip starts
HEIGHT, WIDTH = 299, 299  # Spatial size of the model input
LSTM_UNITS = 256
NUM_CLASSES = 2  # Real and Fake
DROPOUT_RATE = 0.5
TOTAL_EPOCHS = 25  # NOTE(review): not referenced by the training cells visible here
BATCH_SIZE = 4

# Seed every RNG up front so sample shuffling, weight initialisation and
# dropout are repeatable after "Restart Kernel -> Run All".
keras.utils.set_random_seed(SEED)

In [None]:
# Read the dataset index (one JSON document describing all three splits)
with open('/teamspace/studios/this_studio/FF++_CElebDF_combined.json', 'r') as index_file:
    data = json.load(index_file)

# Pull the three splits out of the index in train / validation / test order
train_data, val_data, test_data = (
    data[split_key]
    for split_key in ('training_set', 'validation_set', 'testing_set')
)


In [None]:
# Create a dictionary to store labels and paths
def create_labels_dict(dataset):
    """Map every dataset entry to its ``(label, path)`` tuple.

    Args:
        dataset: Iterable of dicts providing the keys ``'folder'``,
            ``'label'`` and ``'path'``.

    Returns:
        dict: Sample ID -> ``(label, path)``. ``label`` is 0 when the entry's
        label reads ``'real'`` (case and surrounding whitespace ignored) and 1
        otherwise. The sample ID is the entry's folder name; when a folder
        name was already used for a *different* path, the later entry is keyed
        by its path instead so that neither sample is silently dropped.
    """
    import warnings

    labels = {}
    for entry in dataset:
        folder = entry['folder']
        # Normalise the label text so 'Real' / ' real ' are not counted as fake
        label = 0 if str(entry['label']).strip().lower() == 'real' else 1
        path = entry['path']       # Full path to the video folder

        key = folder
        if key in labels and labels[key][1] != path:
            # Same folder name, different video: keep both instead of overwriting
            warnings.warn(
                f"Duplicate folder name {folder!r}; using its path {path!r} as the sample ID."
            )
            key = path

        labels[key] = (label, path)
    return labels

# Build the (label, path) lookup for every split
train_labels = create_labels_dict(train_data)
val_labels = create_labels_dict(val_data)
test_labels = create_labels_dict(test_data)

# The lookup keys double as the sample IDs handed to the data generators
train_IDs = [*train_labels]
val_IDs = [*val_labels]
test_IDs = [*test_labels]

In [None]:
# Data preprocessing function
def preprocess_image(image):
    """Convert ``image`` to a NumPy array and run Xception's ``preprocess_input`` on it.

    Args:
        image: Array-like image (the callers in this file pass a PIL image).

    Returns:
        The array returned by ``preprocess_input``.
    """
    # np.array copies its input before preprocess_input is applied
    return preprocess_input(np.array(image))

# Define VideoDataset class to handle paths and labels from JSON
class VideoDataset(tf.keras.utils.Sequence):
    """Batch generator that turns per-video frame folders into fixed-length clips.

    Every sample ID is looked up in ``labels`` to obtain ``(label, video_dir)``.
    The file names found in ``video_dir`` are sorted, the first ``skip_frames``
    names are dropped and the next ``num_frames`` files are loaded. Clips that
    come up short are padded by repeating their last frame.

    Args:
        list_IDs: Sequence of sample IDs (keys of ``labels``).
        labels: Mapping from sample ID to a ``(label, video_dir)`` tuple.
        batch_size: Number of videos per batch. Stored as a plain attribute and
            re-read on every ``__len__`` / ``__getitem__`` call.
        num_frames: Number of frames per clip.
        skip_frames: Number of leading frames to skip before the clip starts.
        shuffle: Whether to shuffle the sample order on every ``on_epoch_end``.
        **kwargs: Forwarded to ``tf.keras.utils.Sequence.__init__``.
    """

    def __init__(self, list_IDs, labels, batch_size, num_frames=30, skip_frames=20, shuffle=True, **kwargs):
        super().__init__(**kwargs)
        self.list_IDs = list_IDs      # Sample IDs (for referencing)
        self.labels = labels          # Dictionary with paths and labels
        self.batch_size = batch_size
        self.num_frames = num_frames
        self.skip_frames = skip_frames
        self.shuffle = shuffle
        self.on_epoch_end()           # Build the initial index order

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Return the ``(X, y)`` pair for batch number ``index``."""
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_IDs = [self.list_IDs[k] for k in indexes]
        X, y = self.__data_generation(batch_IDs)
        return X, y

    def on_epoch_end(self):
        """Reset the sample order and reshuffle it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, batch_IDs):
        """Load the clips and labels for ``batch_IDs``.

        Returns:
            tuple: ``(X, y)`` NumPy arrays built from the loaded clips and
            their labels. Videos whose directory is empty are skipped (with a
            printed notice), so the arrays can hold fewer samples than
            ``len(batch_IDs)``.
        """
        X = []
        y = []

        for ID in batch_IDs:
            label, video_dir = self.labels[ID]  # Get label and path for this sample

            # Sorted listing so the frame order is deterministic
            frame_names = sorted(os.listdir(video_dir))

            # If folder is empty, skip this video
            if len(frame_names) == 0:
                print(f"No frames found in directory {video_dir}, skipping.")
                continue  # Skip if no frames found

            # Apply skip_frames and num_frames logic
            clip_names = frame_names[self.skip_frames:self.skip_frames + self.num_frames]

            # A video with no more than skip_frames frames leaves nothing after
            # the skip; fall back to its trailing frames instead of failing on
            # the padding step below.
            if not clip_names and self.num_frames > 0:
                clip_names = frame_names[-self.num_frames:]

            frames_array = []
            for frame_name in clip_names:
                frame_path = os.path.join(video_dir, frame_name)
                # Context manager closes the underlying file handle right away
                with Image.open(frame_path) as img:
                    frames_array.append(preprocess_image(img.convert('RGB')))

            # Pad with the last decoded frame; reusing the array avoids
            # decoding the same file once per padded position.
            missing = self.num_frames - len(frames_array)
            if missing > 0:
                frames_array.extend([frames_array[-1]] * missing)

            X.append(frames_array)
            y.append(label)

        # assumes all frames share one resolution, giving
        # (videos, num_frames, H, W, C) — TODO confirm for the dataset in use
        X = np.array(X)
        y = np.array(y)  # Integer class labels (sparse, not one-hot)

        return X, y

In [None]:
# Instantiate one generator per split; only the training split is shuffled
_clip_kwargs = {'num_frames': TIME_STEPS, 'skip_frames': SKIP_FRAMES}
train_dataset = VideoDataset(train_IDs, train_labels, BATCH_SIZE, shuffle=True, **_clip_kwargs)
val_dataset = VideoDataset(val_IDs, val_labels, BATCH_SIZE, shuffle=False, **_clip_kwargs)
test_dataset = VideoDataset(test_IDs, test_labels, BATCH_SIZE, shuffle=False, **_clip_kwargs)

In [None]:
# Build the model
def build_model(lstm_hidden_size=256, num_classes=2, dropout_rate=0.5):
    """Build the frame-wise Xception + LSTM video classifier.

    The input has shape ``(TIME_STEPS, HEIGHT, WIDTH, 3)`` (module-level
    constants). An ImageNet-initialised Xception with global average pooling
    is applied to every frame through ``TimeDistributed``; the resulting
    sequence goes through an LSTM, dropout and a softmax classification layer.

    Args:
        lstm_hidden_size: Number of LSTM units.
        num_classes: Number of output classes.
        dropout_rate: Dropout rate applied to the LSTM output.

    Returns:
        keras.Model: The uncompiled model, with all Xception layers frozen.
    """
    inputs = layers.Input(shape=(TIME_STEPS, HEIGHT, WIDTH, 3))

    base_model = keras.applications.Xception(weights='imagenet', include_top=False, pooling='avg')
    for layer in base_model.layers:
        layer.trainable = False  # Freeze base model layers initially

    x = layers.TimeDistributed(base_model)(inputs)
    x = layers.LSTM(lstm_hidden_size)(x)
    x = layers.Dropout(dropout_rate)(x)
    # The notebook enables the global 'mixed_float16' policy; force the output
    # layer to float32 so the softmax probabilities (and the loss computed
    # from them) are not produced in half precision.
    outputs = layers.Dense(num_classes, activation='softmax', dtype='float32')(x)

    model = keras.Model(inputs, outputs)
    return model

# Instantiate the network from the notebook-level hyperparameters and print its layers
model = build_model(
    lstm_hidden_size=LSTM_UNITS,
    num_classes=NUM_CLASSES,
    dropout_rate=DROPOUT_RATE,
)
model.summary()

In [None]:
import tensorflow as tf
from keras.saving import register_keras_serializable

@register_keras_serializable()
def recall_m(y_true, y_pred):
    """Batch recall for the positive class (label 1).

    Args:
        y_true: Integer-valued class labels, shape ``(batch,)`` or ``(batch, 1)``.
        y_pred: Per-class scores; the class index is taken with ``argmax``
            over the last axis.

    Returns:
        Scalar float32 tensor: true positives / (actual positives + epsilon).
    """
    # Predicted class index per sample, flattened to 1-D
    pred_labels = tf.reshape(tf.argmax(y_pred, axis=-1), [-1])

    # Flatten the labels as well: a (batch, 1) y_true would otherwise broadcast
    # against the (batch,) predictions into a (batch, batch) mask and inflate
    # the true-positive count.
    true_labels = tf.cast(tf.reshape(y_true, [-1]), pred_labels.dtype)

    is_true_pos = tf.equal(true_labels, 1)
    is_pred_pos = tf.equal(pred_labels, 1)

    # True positives (predicted 1 and true 1)
    true_positives = tf.reduce_sum(tf.cast(tf.logical_and(is_true_pos, is_pred_pos), tf.float32))

    # Actual positives (true 1)
    actual_positives = tf.reduce_sum(tf.cast(is_true_pos, tf.float32))

    # Epsilon keeps the division defined when the batch has no positives
    recall = true_positives / (actual_positives + tf.keras.backend.epsilon())
    return recall

@register_keras_serializable()
def precision_m(y_true, y_pred):
    """Batch precision for the positive class (label 1).

    Args:
        y_true: Integer-valued class labels, shape ``(batch,)`` or ``(batch, 1)``.
        y_pred: Per-class scores; the class index is taken with ``argmax``
            over the last axis.

    Returns:
        Scalar float32 tensor: true positives / (predicted positives + epsilon).
    """
    # Predicted class index per sample, flattened to 1-D
    pred_labels = tf.reshape(tf.argmax(y_pred, axis=-1), [-1])

    # Flatten the labels as well: a (batch, 1) y_true would otherwise broadcast
    # against the (batch,) predictions into a (batch, batch) mask and inflate
    # the true-positive count.
    true_labels = tf.cast(tf.reshape(y_true, [-1]), pred_labels.dtype)

    is_true_pos = tf.equal(true_labels, 1)
    is_pred_pos = tf.equal(pred_labels, 1)

    # True positives (predicted 1 and true 1)
    true_positives = tf.reduce_sum(tf.cast(tf.logical_and(is_true_pos, is_pred_pos), tf.float32))

    # Predicted positives (predicted 1)
    predicted_positives = tf.reduce_sum(tf.cast(is_pred_pos, tf.float32))

    # Epsilon keeps the division defined when nothing is predicted positive
    precision = true_positives / (predicted_positives + tf.keras.backend.epsilon())
    return precision

@register_keras_serializable()
def f1_m(y_true, y_pred):
    """Batch F1 score combining ``precision_m`` and ``recall_m``.

    Returns ``2 * (p * r) / (p + r + epsilon)`` where ``p`` and ``r`` are the
    values of ``precision_m`` and ``recall_m`` for the same inputs.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    eps = tf.keras.backend.epsilon()  # Avoids 0/0 when both p and r are zero
    ratio = (p * r) / (p + r + eps)
    return 2 * ratio

# Load the checkpoint that training continues from.
#
# The model is loaded exactly once. Previously a first checkpoint was loaded
# and then immediately replaced by this one, which cost a full extra model
# load and made the cell fail whenever that unused file was missing.
# Checkpoint that was loaded and discarded before, kept for reference:
#   '/teamspace/studios/this_studio/MODELS/Trial_4.69 Phase2.keras'
#
# The custom metrics are passed explicitly so that loading works by name.
custom_objects = {
    'f1_m': f1_m,
    'precision_m': precision_m,
    'recall_m': recall_m
}
model = keras.models.load_model(
    '/teamspace/studios/this_studio/XL_best_models/BEST_CELEB_DF_Phase2.keras',
    custom_objects=custom_objects,
)
model.summary()

In [None]:
# Training Phase 1: train with every layer of the wrapped backbone frozen
# NOTE(review): assumes model.layers[1] is the TimeDistributed wrapper around
# the backbone, as in build_model — confirm for the loaded checkpoint.
for layer in model.layers[1].layer.layers[:]:
    layer.trainable = False

# Compile with the Phase 1 learning rate (1e-4)
model.compile(optimizer=optimizers.Adam(learning_rate=1e-4),
              loss=losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy', f1_m, precision_m, recall_m])
model_save_dir = '/teamspace/studios/this_studio/XL_best_models'

# Callbacks for Phase 1: keep the checkpoint with the highest validation F1
checkpoint_phase1 = callbacks.ModelCheckpoint(
    filepath=os.path.join(model_save_dir, 'COMBINED_best_Phase1.keras'),
    monitor='val_f1_m',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
    mode='max'
)

# Stop once validation F1 has not improved for 5 consecutive epochs
early_stopping_phase1 = callbacks.EarlyStopping(
    monitor='val_f1_m',
    patience=5,
    verbose=1,
    mode='max'
)

# Phase 1 batch size; every batch holds BATCH_SIZE * TIME_STEPS decoded frames
BATCH_SIZE = 64
# Named for this phase (was `num_epochs_phase2`, which belongs to the next cell)
num_epochs_phase1 = 50

# Update train_dataset and val_dataset with the new batch size
train_dataset.batch_size = BATCH_SIZE
val_dataset.batch_size = BATCH_SIZE


history_phase1 = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=num_epochs_phase1,
    callbacks=[checkpoint_phase1, early_stopping_phase1],
    verbose=1,
)

In [None]:
# Training Phase 2: unfreeze the wrapped backbone and fine-tune end to end
# NOTE(review): assumes model.layers[1] is the TimeDistributed wrapper around
# the backbone, as in build_model — confirm for the loaded checkpoint.
for layer in model.layers[1].layer.layers[:]:
    layer.trainable = True

# Re-compile the model with a lower learning rate (1e-5) for fine-tuning
model.compile(optimizer=optimizers.Adam(learning_rate=1e-5),
              loss=losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy', f1_m, precision_m, recall_m])
model_save_dir = '/teamspace/studios/this_studio/XL_best_models'
# Single source of truth for the Phase 2 checkpoint, used for saving AND reloading
best_phase2_path = os.path.join(model_save_dir, 'COMBINED_best_Phase2.keras')

# Callbacks for Phase 2: keep the checkpoint with the highest validation F1
checkpoint_phase2 = callbacks.ModelCheckpoint(
    filepath=best_phase2_path,
    monitor='val_f1_m',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
    mode='max'
)

# Stop once validation F1 has not improved for 5 consecutive epochs
early_stopping_phase2 = callbacks.EarlyStopping(
    monitor='val_f1_m',
    patience=5,
    verbose=1,
    mode='max'
)

BATCH_SIZE = 16
num_epochs_phase2 = 50

# Update train_dataset and val_dataset with the new batch size
train_dataset.batch_size = BATCH_SIZE
val_dataset.batch_size = BATCH_SIZE


history_phase2 = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=num_epochs_phase2,
    callbacks=[checkpoint_phase2, early_stopping_phase2],
    verbose=1,
)

# Load the best model from Phase 2. This must be the file written by
# checkpoint_phase2; the previous code loaded 'CELEB_DF_Phase2.keras', a
# different checkpoint that this run never produces.
model.load_weights(best_phase2_path)

# Evaluate on test set
# Update test_dataset with the new batch size
test_dataset.batch_size = BATCH_SIZE
# The model is compiled with four metrics, so evaluate() yields five values;
# request a dict instead of unpacking into two names (which raised ValueError).
test_metrics = model.evaluate(test_dataset, verbose=1, return_dict=True)
test_loss = test_metrics['loss']
test_acc = test_metrics['accuracy']
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')
print(
    f"Test F1: {test_metrics['f1_m']:.4f}, "
    f"Test Precision: {test_metrics['precision_m']:.4f}, "
    f"Test Recall: {test_metrics['recall_m']:.4f}"
)