### Cell 1: Check Data Directories
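List the first few files in the `fake` and `real` training folders to confirm that the dataset is mounted and organised as expected.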

In [11]:
# Peek at the first five entries of each class folder.
# `head` closes the pipe after five lines, so the "ls: write error: Broken pipe"
# message printed afterwards is expected and harmless.
!ls /kaggle/input/1000-videos-split/1000_videos/train/fake | head -n 5
!ls /kaggle/input/1000-videos-split/1000_videos/train/real | head -n 5

128_896_10.png
128_896_11.png
128_896_12.png
128_896_13.png
128_896_14.png
ls: write error: Broken pipe
129_10.png
129_2.png
129_3.png
129_4.png
129_5.png
ls: write error: Broken pipe


### Hardware Detection

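This cell checks which hardware is available (TPU, multiple GPUs, a single GPU, or CPU only) and selects the matching TensorFlow distribution strategy. Note that training below is launched as a separate process (`!python main.py`), so the `strategy` object created here is informational only and is not passed to the training script.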

In [12]:
import tensorflow as tf
import os

def get_distribution_strategy():
    """
    Pick a TensorFlow distribution strategy for the hardware that is present.

    A TPU is tried first. If connecting to it (or building the TPU strategy)
    raises ValueError or tf.errors.NotFoundError, the visible GPUs are counted:
    more than one GPU selects MirroredStrategy, otherwise (one GPU or CPU only)
    the default strategy is used.

    Returns:
        The selected tf.distribute strategy object.
    """
    try:
        resolver = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
        strategy = tf.distribute.TPUStrategy(resolver)
        banner = "✅ Running on TPU"
    except (ValueError, tf.errors.NotFoundError):
        # No usable TPU: fall back to whatever GPUs are visible.
        gpu_count = len(tf.config.list_physical_devices('GPU'))
        if gpu_count > 1:
            strategy = tf.distribute.MirroredStrategy()
            banner = f"✅ Running on {gpu_count} GPUs"
        else:
            # A single GPU and CPU-only both use the default strategy;
            # only the message differs.
            strategy = tf.distribute.get_strategy()
            banner = "✅ Running on a single GPU" if gpu_count == 1 else "✅ Running on CPU"

    print(banner)
    print(f"Number of accelerator replicas: {strategy.num_replicas_in_sync}")
    return strategy

# Report the hardware available to this notebook kernel.
strategy = get_distribution_strategy()

✅ Running on a single GPU
Number of accelerator replicas: 1


In [13]:
%%writefile model.py
import tensorflow as tf
# tf.config.run_functions_eagerly(True)

def backbone():
    '''
    BUILDS THE BACKBONE FEATURE ENCODER
    AN IMAGENET-PRETRAINED XCEPTION, CUT AT ITS 13TH LAYER FROM THE END
    '''
    xception = tf.keras.applications.Xception(weights='imagenet')
    feature_layer = xception.layers[-13]
    return tf.keras.Model(xception.input, feature_layer.output)
    
class ModifiedBranch(tf.keras.layers.Layer):
    """
    Branch that averages its input over axis 2 and passes the result through
    a tanh-activated Dense layer with `a_vec_size` units.
    """

    def __init__(self, a_vec_size, **kwargs):
        # Extra keyword arguments (name, dtype, ...) are forwarded to the base
        # Layer so the layer can be rebuilt from a saved config.
        super().__init__(**kwargs)
        self.a_vec_size = a_vec_size
        self.dense_layer = tf.keras.layers.Dense(self.a_vec_size, activation='tanh')

    def build(self, input_shape):
        # Explicit build that only defers to the base class; kept to silence
        # the Keras warning about a missing build().
        super().build(input_shape)

    def call(self, input):
        pooled = tf.keras.backend.mean(input, axis=2)
        return self.dense_layer(pooled)

    def get_config(self):
        # Add the constructor argument to the base config for serialization.
        return {
            **super().get_config(),
            "a_vec_size": self.a_vec_size,
        }


class MainBranch(tf.keras.layers.Layer):
    """
    Branch that applies ReLU and dropout (rate 0.5) to its input and returns
    it in the same axis order it arrived in.
    """

    def __init__(self, a_vec_size, dim, **kwargs):
        # Extra keyword arguments (name, dtype, ...) are forwarded to the base
        # Layer so the layer can be rebuilt from a saved config.
        super().__init__(**kwargs)
        self.a_vec_size = a_vec_size
        self.dim = dim
        self.dropout_layer = tf.keras.layers.Dropout(0.5)

    def call(self, input):
        swap_last_two = [0, 2, 1]
        feats = tf.transpose(input, perm=swap_last_two)
        feats = tf.keras.layers.Reshape((-1, self.a_vec_size))(feats)
        feats = self.dropout_layer(tf.keras.activations.relu(feats))
        feats = tf.keras.layers.Reshape((self.dim**2, self.a_vec_size))(feats)
        # Undo the initial transpose before returning.
        return tf.transpose(feats, perm=swap_last_two)

    def get_config(self):
        # Add the constructor arguments to the base config for serialization.
        return {
            **super().get_config(),
            "a_vec_size": self.a_vec_size,
            "dim": self.dim,
        }


class Attention(tf.keras.layers.Layer):
    """
    Combines the two branch outputs into one non-negative score per position.

    `call` expects a two-element sequence: element 0 is projected to dim**2
    values and added to element 1, and the sum is reduced to a single value
    per position by a bias-free Dense(1).
    """

    def __init__(self, dim, a_vec_size, **kwargs):
        # Extra keyword arguments (name, dtype, ...) are forwarded to the base
        # Layer so the layer can be rebuilt from a saved config.
        super().__init__(**kwargs)
        self.dim = dim
        self.a_vec_size = a_vec_size
        self.dense_eh = tf.keras.layers.Dense(self.dim**2)
        self.dense_final = tf.keras.layers.Dense(1, use_bias=False)
        self.add_layer = tf.keras.layers.Add()
        self.dropout_layer = tf.keras.layers.Dropout(0.5)

    def call(self, input):
        n_positions = self.dim**2

        # Project element 0 and give it a singleton axis so it can be added
        # to element 1.
        projected = self.dense_eh(input[0])
        projected = tf.keras.layers.Reshape((1, n_positions))(projected)

        scores = self.add_layer([input[1], projected])
        scores = self.dropout_layer(tf.keras.activations.relu(scores))

        # Move the a_vec_size axis last so Dense(1) collapses it.
        scores = tf.transpose(scores, perm=[0, 2, 1])
        scores = tf.keras.layers.Reshape((-1, self.a_vec_size))(scores)
        scores = self.dense_final(scores)
        scores = tf.keras.layers.Reshape((-1, n_positions))(scores)
        return tf.keras.activations.relu(scores)

    def get_config(self):
        # Add the constructor arguments to the base config for serialization.
        return {
            **super().get_config(),
            "dim": self.dim,
            "a_vec_size": self.a_vec_size,
        }


def model(a_vec_size, dim):
    '''
    BUILDS THE COMPLETE SOFT-ATTENTION CLASSIFIER
    INPUT : a_vec_size, dim
    a_vec_size = number of hidden nodes used in the attention mechanism
    dim = side length of the backbone's output feature map
    OUTPUT : the assembled tf.keras.Model (two-way softmax output)
    '''
    feature_extractor = backbone()  # XCEPTION FEATURE ENCODER

    # 1X1 CONVOLUTION -> BATCH NORM -> RELU -> DROPOUT ON THE BACKBONE FEATURES
    x = tf.keras.layers.Conv2D(
        filters=a_vec_size,
        kernel_size=(1, 1),
        strides=(1, 1),
        padding='valid',
        use_bias=True,
    )(feature_extractor.output)
    x = tf.keras.layers.BatchNormalization(axis=-1)(x)
    x = tf.keras.activations.relu(x)
    x = tf.keras.layers.Dropout(0.8)(x)

    # RESHAPED TO (a_vec_size, dim**2), I.E. (1024, 361) FOR a_vec_size=1024, dim=19
    # NOTE(review): this is a plain reshape of a channels-last tensor, with no
    # transpose - confirm that the resulting element layout is intended.
    x = tf.keras.layers.Reshape((a_vec_size, dim**2))(x)

    # THE RESHAPED FEATURES FEED TWO BRANCHES WHICH ARE COMBINED BY ATTENTION
    transformed = ModifiedBranch(a_vec_size)(x)
    spatial = MainBranch(a_vec_size, dim)(x)
    attended = Attention(dim, a_vec_size)([transformed, spatial])

    # CLASSIFICATION HEAD
    probs = tf.keras.layers.Dense(2, activation='softmax')(attended)
    probs = tf.keras.layers.Flatten()(probs)
    return tf.keras.Model(inputs=feature_extractor.input, outputs=probs)

Overwriting model.py


In [14]:
%%writefile utils.py
import os
import cv2
import glob
import numpy as np

def get_input(path):
    """Read the image at `path` with OpenCV and return whatever cv2.imread gives back."""
    return cv2.imread(path)

def get_files(path, ext):
    """
    Collect every `*.<ext>` file found in `path` or any of its subdirectories.

    Returns:
        (files, label_files): the list of matching paths and the fixed class
        name list ['fake', 'real'].
    """
    pattern = '*.{}'.format(ext)
    files = [
        match
        for folder, _subdirs, _names in os.walk(path)
        for match in glob.glob(os.path.join(folder, pattern))
    ]
    return files, ['fake', 'real']


# def get_output(path, label_file):
#     img_id = path.split('/')[-1].split('_')[0]
#     laba = []
#     for label in label_file:
#       if label == img_id:
#         laba.append(1)
#       else:
#         laba.append(0)
#     return laba
# Corrected logic for utils.py
def get_output(path, label_file):
    """
    Return the one-hot label for an image, taken from its parent folder name.

    The parent folder is found without assuming a particular path separator:
    both '/' and '\\' are accepted and empty components (from doubled
    separators) are ignored. The folder name is compared case-insensitively,
    as evaluate_folder in predict.py does.

    Args:
        path: path of the image file, e.g. '.../train/fake/128_896_10.png'.
        label_file: list of class names; accepted for interface
            compatibility but not used, the encoding is fixed.

    Returns:
        [1, 0] when the parent folder is 'fake', otherwise [0, 1] ('real').
        Any folder name other than 'fake' is treated as 'real'.
    """
    # Normalise separators, then drop empty components so 'fake//x.png'
    # still resolves to the 'fake' folder.
    parts = [part for part in path.replace('\\', '/').split('/') if part]
    true_label = parts[-2].lower() if len(parts) >= 2 else ''

    if true_label == 'fake':
        return [1, 0]
    # Assumes the only other option is 'real'.
    return [0, 1]




def image_generator(files, label_files, batch_size, resize=None, normalize=True):
    """
    Endlessly yield random (images, labels) batches for Keras training.

    Each batch is drawn independently with np.random.choice (sampling with
    replacement), so an "epoch" is simply the number of batches the caller
    asks for.

    Args:
        files: list of image paths to sample from.
        label_files: class name list, forwarded to get_output.
        batch_size: number of images per batch.
        resize: optional (width, height) passed to cv2.resize; None keeps the
            original image size.
        normalize: when True (default) pixel values are scaled from [0, 255]
            to [-1, 1] as float32. This is the same scaling that Xception's
            preprocess_input applies in predict.py, so training and inference
            see identically scaled inputs. Pass False to get raw pixel values.

    Yields:
        (batch_x, batch_y): image array and one-hot label array.

    Raises:
        ValueError: if `files` is empty or an image cannot be read.
    """
    if len(files) == 0:
        raise ValueError("image_generator received an empty list of files")

    while True:
        batch_paths = np.random.choice(a=files, size=batch_size)
        batch_x = []
        batch_y = []

        for input_path in batch_paths:
            image = get_input(input_path)
            if image is None:
                # cv2.imread signals an unreadable or missing file with None;
                # fail here with the offending path rather than later.
                raise ValueError("Could not read image: {}".format(input_path))
            if resize is not None:
                image = cv2.resize(image, resize)
            batch_x.append(image)
            batch_y.append(get_output(input_path, label_files))

        batch_x = np.array(batch_x)
        if normalize:
            # Same scaling as Xception preprocess_input: x / 127.5 - 1.
            batch_x = batch_x.astype(np.float32) / 127.5 - 1.0
        batch_y = np.array(batch_y)
        yield batch_x, batch_y

Overwriting utils.py


In [15]:
%%writefile train.py
import os
import utils
import model
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping # Add EarlyStopping here
# tf.config.run_functions_eagerly(True)

class train():
    '''
    Trains the soft-attention deep fake image detection model.
    INPUT : train_path, val_path
    train_path = path of the training image set
    val_path = path of the validation image set
    Epochs, batch size and steps per epoch are passed to run().
    '''
    def __init__(self, train_path, val_path):
        self.train_path = train_path
        self.val_path = val_path
        # Checkpoints go to a "models" folder next to this script.
        here = os.path.dirname(os.path.abspath(__file__))
        self.path = os.path.join(here, "models")

    def get_files(self):
        '''
        Collects the .png files of both splits and the class name list.
        Raises ValueError when a split contains no images, so the problem is
        reported before training starts instead of inside the data generator.
        '''
        self.train_files, self.label_files = utils.get_files(self.train_path, 'png')
        self.val_files, self.label_files = utils.get_files(self.val_path, 'png')

        splits = (
            ("training", self.train_path, self.train_files),
            ("validation", self.val_path, self.val_files),
        )
        for split_name, split_path, split_files in splits:
            if not split_files:
                raise ValueError(
                    "No .png {} images found under: {}".format(split_name, split_path)
                )

    # @tf.function
    def train(self, model, path, epochs, batch_size, steps, dim, validation_steps=150):
        """
        Compiles and trains the deep learning model.

        model = Keras model to train
        path = directory in which "best_model.keras" is written
        epochs = maximum number of epochs
        batch_size = images per batch
        steps = training batches per epoch
        dim = (width, height) every image is resized to before batching
        validation_steps = validation batches evaluated after each epoch
        """
        model.compile('Adam', loss=tf.keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])

        # Make sure the checkpoint directory exists before training starts.
        os.makedirs(path, exist_ok=True)
        checkpoint_filepath = os.path.join(path, "best_model.keras")

        # Saves the full model whenever validation accuracy reaches a new best.
        model_checkpoint_callback = ModelCheckpoint(
            filepath=checkpoint_filepath,
            save_best_only=True,
            save_weights_only=False,
            monitor='val_accuracy',
            mode='max'
        )

        # Stops training if 'val_loss' does not improve for 5 epochs.
        # Note that this monitors a different metric than the checkpoint above.
        early_stopping_callback = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )

        # Both generators are infinite, so the step counts define an epoch.
        model.fit(
            utils.image_generator(self.train_files, self.label_files, batch_size, dim),
            epochs=epochs,
            steps_per_epoch=steps,
            validation_data=utils.image_generator(self.val_files, self.label_files, batch_size, dim),
            validation_steps=validation_steps,
            callbacks=[model_checkpoint_callback, early_stopping_callback]
        )

    def run(self, epochs, batch_size, steps, dim=(299, 299)):
        '''
        DRIVER FUNCTION
        Collects the files, builds the model (a_vec_size=1024, dim=19) and trains it.
        '''
        self.get_files()
        print("************TRAINING SOFT ATTENTION BASED DEEP FAKE DETECTION MODEL************")
        mod = model.model(1024, 19)
        self.train(mod, self.path, epochs, batch_size, steps, dim)

Overwriting train.py


In [16]:
%%writefile main.py
import argparse
from train import train

def main():
    """
    Command-line entry point: parse the arguments, print the configuration
    and start training.

    --train and --val are required. They use nargs='+' so that a path
    containing spaces can be given unquoted; the pieces are joined back with
    single spaces.
    """
    parser = argparse.ArgumentParser(description='Visual Attention based Deepfake Video Forgery Detection')

    # required=True: without it a missing path reaches ' '.join(None) below
    # and fails with a TypeError instead of a usage message.
    parser.add_argument('--train', type=str, nargs = '+', required=True, help = 'What is the path of the training image data?')
    parser.add_argument('--val', type=str, nargs = '+', required=True, help = 'What is the path of the Validation image data?')
    parser.add_argument('--epochs', type=int, default=50, help = 'What is the training epoch for model?')
    parser.add_argument('--batch', type=int, default=32, help = 'What is the training batch size?')
    parser.add_argument('--steps', type=int, default=40, help = 'What is the training steps per epoch?')

    args = parser.parse_args()
    args.train = ' '.join(args.train)
    args.val = ' '.join(args.val)

    print("Configuration")
    print("----------------------------------------------------------------------")
    print("Training Path : {}".format(args.train))
    print("Validation Path : {}".format(args.val))
    print("Epochs while training the model : {}".format(args.epochs))
    print("Batch Size : {}".format(args.batch))
    print("Steps per epochs : {}".format(args.steps))
    print("----------------------------------------------------------------------")

    train(args.train, args.val).run(args.epochs, args.batch, args.steps)

if __name__=='__main__':
    main()

Overwriting main.py


In [None]:
# Example usage. The data generator samples each batch at random, so an "epoch"
# is simply `--steps` batches. Choose steps ≈ (number of training files) / (batch size)
# so that one epoch draws about as many images as the training set contains.

!python main.py \
    --train "/kaggle/input/1000-videos-split/1000_videos/train" \
    --val "/kaggle/input/1000-videos-split/1000_videos/validation" \
    --epochs 50 \
    --batch 16 \
    --steps 727

2025-09-21 18:32:54.126950: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1758479574.147634   29230 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1758479574.153695   29230 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
Configuration
----------------------------------------------------------------------
Training Path : /kaggle/input/1000-videos-split/1000_videos/train
Validation Path : /kaggle/input/1000-videos-split/1000_videos/validation
Epochs while training the model : 50
Batch Size : 16
Steps per epochs : 727
----------------------------------------------------------------------
************TRAINING SOFT ATTENTION BASED DEEP FAKE DETECTION MODE

In [None]:
# Archive the models/ folder (train.py writes best_model.keras there) into a single zip file.
!zip -r my_model.zip models/

In [None]:
%%writefile predict.py
import tensorflow as tf
import numpy as np
import cv2
import argparse
import os

# --- Import your custom layers and preprocessing function ---
from model import ModifiedBranch, MainBranch, Attention
from tensorflow.keras.applications.xception import preprocess_input

# Define the labels list globally
LABELS = ['fake', 'real']

def load_and_prep_image(image_path, target_size=(299, 299)):
    """
    Read one image and turn it into a model-ready batch of size 1.

    The image is resized to `target_size`, passed through Xception's
    preprocess_input and given a leading batch axis.

    Returns:
        The preprocessed array, or None (after printing a message) when the
        file cannot be read or any processing step raises.
    """
    try:
        raw = cv2.imread(image_path)
        if raw is not None:
            resized = cv2.resize(raw, target_size)
            return np.expand_dims(preprocess_input(resized), axis=0)

        # cv2.imread returns None instead of raising for unreadable files.
        print(f"Warning: Could not read image {image_path}. Skipping.")
        return None
    except Exception as e:
        print(f"Error processing image {image_path}: {e}")
        return None

def predict_single_image(model, image_path):
    """
    Classify a single image file and print the predicted label with its
    confidence. Does nothing further when the image cannot be loaded.
    """
    image_batch = load_and_prep_image(image_path)
    if image_batch is None:
        return

    print("Predicting...")
    scores = model.predict(image_batch)[0]

    # The highest-scoring class is the prediction; its score, as a
    # percentage, is reported as the confidence.
    best = np.argmax(scores)
    label = LABELS[best]
    confidence = scores[best] * 100

    print("\n--- Prediction Result ---")
    print(f"       File: {os.path.basename(image_path)}")
    print(f"Prediction is: {label.upper()}")
    print(f"  Confidence: {confidence:.2f}%")
    print("-------------------------")

def evaluate_folder(model, folder_path):
    """
    Walk `folder_path` recursively, classify every image and report accuracy.

    Only .png/.jpg/.jpeg files whose immediate parent folder is named after
    one of LABELS (case-insensitive) are evaluated; that folder name is the
    ground-truth label. One line is printed per image, followed by a summary.
    """
    print(f"Scanning folder: {folder_path}\nThis may take a while...")
    image_suffixes = ('.png', '.jpg', '.jpeg')
    n_seen = 0
    n_right = 0

    for root, dirs, files in os.walk(folder_path):
        # The ground truth is the name of the directory holding the file;
        # directories with any other name are skipped.
        true_label = os.path.basename(root).lower()
        if true_label not in LABELS:
            continue

        for filename in files:
            if not filename.lower().endswith(image_suffixes):
                continue

            image_batch = load_and_prep_image(os.path.join(root, filename))
            if image_batch is None:
                continue

            n_seen += 1
            scores = model.predict(image_batch, verbose=0)[0]
            best = np.argmax(scores)
            predicted_label = LABELS[best]
            confidence = scores[best] * 100

            is_hit = predicted_label == true_label
            if is_hit:
                n_right += 1
            result = "CORRECT" if is_hit else "WRONG"

            print(f"  > File: {filename} | True: {true_label} | Predicted: {predicted_label} ({confidence:.2f}%)  [{result}]")

    if n_seen == 0:
        print("\nNo valid image files found in 'fake' or 'real' subdirectories.")
        return

    accuracy = (n_right / n_seen) * 100
    print("\n--- Evaluation Summary ---")
    print(f"Total Images: {n_seen}")
    print(f"Correct Predictions: {n_right}")
    print(f"OVERALL ACCURACY: {accuracy:.2f}%")
    print("--------------------------")

def main():
    """
    Command-line entry point: load the saved model, then classify a single
    image (when --input_path is a file) or evaluate a folder (when it is a
    directory).
    """
    parser = argparse.ArgumentParser(description='Predict if an image is real or fake.')
    parser.add_argument('--input_path', type=str, required=True, help='Path to an image file OR a folder.')
    parser.add_argument('--model_path', type=str, default='models/best_model.keras', help='Path to the saved model file.')
    args = parser.parse_args()

    model_path = args.model_path
    input_path = args.input_path

    if not os.path.exists(model_path):
        print(f"Error: Model file not found at {model_path}")
        return

    # The custom layers must be supplied by name so Keras can rebuild them.
    custom_objects = {
        layer_cls.__name__: layer_cls
        for layer_cls in (ModifiedBranch, MainBranch, Attention)
    }
    print("Loading model...")
    model = tf.keras.models.load_model(model_path, custom_objects=custom_objects)
    print("Model loaded.")

    if os.path.isfile(input_path):
        predict_single_image(model, input_path)
        return
    if os.path.isdir(input_path):
        evaluate_folder(model, input_path)
        return
    print(f"Error: Input path is not a valid file or directory: {input_path}")

if __name__ == "__main__":
    main()

In [None]:
# Run prediction with the trained model. `--input_path` accepts either a single
# image file or a folder; a folder is scanned recursively and images inside
# `fake`/`real` subfolders are scored against their folder name.

# Single image (uncomment and point it at an existing file):
# !python predict.py --input_path "/kaggle/input/1000-videos-split/1000_videos/validation/fake/000_003_0.png"
!python predict.py --input_path "/kaggle/input/1000-videos-split/1000_videos/test"