In [None]:

# Import the required libraries.
import os
import cv2
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt

from moviepy.editor import *
%matplotlib inline

# For Xception 
from tensorflow.keras.applications import Xception

import kerastuner as kt
from tensorflow.keras.optimizers import RMSprop, Adam, SGD, Nadam, Adadelta
from kerastuner.tuners import RandomSearch
from kerastuner.engine.hyperparameters import HyperParameters

from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Dropout, LSTM, Flatten, Dense


For GPU availability in Google Co-Laboratory(CoLab)

In [None]:
# Incase of Google Co-Laboratory(colab)

# # Check if GPU is available
# if tf.test.gpu_device_name():
#     print('GPU available:')
#     !nvidia-smi
# else:
#     print('GPU not available.')

# # Configure TensorFlow to use GPU if available
# config = tf.compat.v1.ConfigProto()
# config.gpu_options.allow_growth = True
# session = tf.compat.v1.Session(config=config)

In [None]:
seed_constant = 27
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

In [None]:
# Incase of Google Co-Laboratory(Colab)

# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Create a Matplotlib figure and specify the size of the figure
plt.figure(figsize=(20, 20))

# Get the names of all classes/categories in Celeb-DF-v2.
dir_path = r'/content/drive/MyDrive/dataset2/Train'
all_classes_names = os.listdir(dir_path)

# Ensure there are classes available
if not all_classes_names:
    print("No classes found in the specified directory.")
else:
    # Determine the sample size based on the number of classes
    sample_size = min(20, len(all_classes_names))

    # Check if sample size is 0
    if sample_size == 0:
        print("Sample size is 0. Cannot sample from an empty population.")
    else:
        # Generate a list of random indices
        random_range = random.sample(range(len(all_classes_names)), sample_size)

        # Iterating through all the generated random values.
        for counter, random_index in enumerate(random_range, 1):
            # Retrieve a Class Name using the Random Index.
            selected_class_Name = all_classes_names[random_index]

            # Retrieve the list of all the video files present in the randomly selected Class Directory.
            video_files_names_list = os.listdir(os.path.join(dir_path, selected_class_Name))

            # Randomly select a video file from the list retrieved from the randomly selected Class Directory.
            selected_video_file_name = random.choice(video_files_names_list)

            # Initialize a VideoCapture object to read from the video file.
            video_reader = cv2.VideoCapture(os.path.join(dir_path, selected_class_Name, selected_video_file_name))

            # Read the first frame of the video file.
            _, bgr_frame = video_reader.read()

            # Release the VideoCapture object.
            video_reader.release()

            # Convert the frame from BGR into RGB format.
            rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)

            # Write the class name on the video frame.
            cv2.putText(rgb_frame, selected_class_Name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            # Display the frame.
            plt.subplot(5, 4, counter)
            plt.imshow(rgb_frame)
            plt.axis('off')

        # Show the Matplotlib figure with the frames.
        plt.show()

In [None]:
# Specify the height and weight to which each video frame will be resized in our dataset.
IMAGE_HEIGHT, IMAGE_WIDTH = 71, 71

# Specify the numbers of frames of a video that will be fed to the model as one sequence.
SEQUENCE_LENGTH = 20

# Specify the directory containing the  dataset.
DATASET_DIR = r'/content/drive/MyDrive/dataset2/Train'


# Specify the list containing the names of the classes used for training.
CLASSES_LIST = ["Fake", "Real"]

In [None]:
def frames_extraction(video_path):
    '''
    This function will extract the required frames from a video after resizing and normalizing them.
    Args:
        video_path: The path of the video in the disk, whose frames are to be extracted.
    Returns:
        frames_list: A list containing the resized and normalized frames of the video.
    '''

    # Declare a list to store video frames.
    frames_list = []

    # Read the Video File using the VideoCapture object.
    video_reader = cv2.VideoCapture(video_path)

    # Get the total number of frames in the video.
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the the interval after which frames will be added to the list.
    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1)

    # Iterate through the Video Frames.
    for frame_counter in range(SEQUENCE_LENGTH):

        # Set the current frame position of the video.
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

        # Reading the frame from the video.
        success, frame = video_reader.read()

        # Check if Video frame is not successfully read then break the loop
        if not success:
            break

        # Resize the Frame to fixed height and width.
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        # Normalize the resized frame by dividing it with 255 so that each pixel value then lies between 0 and 1
        normalized_frame = resized_frame / 255

        # Append the normalized frame into the frames list
        frames_list.append(normalized_frame)

    # Release the VideoCapture object.
    video_reader.release()

    # Return the frames list.
    return frames_list

In [None]:
def create_dataset():
    '''
    This function will extract the data of the selected classes and create the required dataset.
    Returns:
        features:          A list containing the extracted frames of the videos.
        labels:            A list containing the indexes of the classes associated with the videos.
        video_files_paths: A list containing the paths of the videos in the disk.
    '''

    # Declared Empty Lists to store the features, labels and video file path values.
    features = []
    labels = []
    video_files_paths = []

    # Iterating through all the classes mentioned in the classes list
    for class_index, class_name in enumerate(CLASSES_LIST):

        # Display the name of the class whose data is being extracted.
        print(f'Extracting Data of Class: {class_name}')

        # Get the list of video files present in the specific class name directory.
        class_path = os.path.join(DATASET_DIR, class_name)
        files_list = os.listdir(class_path)

        # Iterate through all the files present in the files list.
        for file_name in files_list:

            # Get the complete video path.
            video_file_path = os.path.join(class_path, file_name)

            # Extract the frames of the video file.
            frames = frames_extraction(video_file_path)

            # Check if the extracted frames are equal to the SEQUENCE_LENGTH specified above.
            # So ignore the vides having frames less than the SEQUENCE_LENGTH.
            if len(frames) == SEQUENCE_LENGTH:

                # Append the data to their repective lists.
                features.append(frames)
                labels.append(class_index)
                video_files_paths.append(video_file_path)

    # Converting the list to numpy arrays
    features = np.asarray(features)
    labels = np.array(labels)

    # Return the frames, class index, and video file path.
    return features, labels, video_files_paths

In [None]:
# Create the dataset.
features, labels, video_files_paths = create_dataset()

In [None]:
# Using Keras's to_categorical method to convert labels into one-hot-encoded vectors
one_hot_encoded_labels = to_categorical(labels)

In [None]:
# Split the Data into Train ( 70% ) and Test Set ( 30% ).
features_train, features_test, labels_train, labels_test = train_test_split(features, one_hot_encoded_labels, test_size = 0.30, shuffle = True, random_state = seed_constant)

In [1]:
def create_Xception_LRCN_model():
    '''
    This function will construct the required Xception + LRCN model for deep fake video detection.
    Returns:
        model: It is the required constructed Xception + LRCN model.
    '''
    # Load the pre-trained Xception model (excluding the top layers).
    base_model = Xception(weights='imagenet', include_top=False, input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3))

    # Freeze the weights of the Xception layers.
    for layer in base_model.layers:
        layer.trainable = False

    # We will use a Sequential model for model construction.
    model = Sequential()

    # Add the Xception base model to the sequential model.
    model.add(TimeDistributed(base_model, input_shape=(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)))

    # Add LRCN layers on top of the Xception base.
    model.add(TimeDistributed(Flatten()))

    model.add(LSTM(176))

    model.add(Dense(len(CLASSES_LIST), activation='softmax'))

    # Display the model's summary.
    model.summary()

    # Return the constructed Xception + LRCN model.
    return model

# Define your SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, and CLASSES_LIST before calling the function.


In [None]:
# Construct the required LRCN model.
LRCN_model = create_Xception_LRCN_model()

# Display the success message.
print("Model Created Successfully!")

In [2]:
# Plot the structure of the constructed LRCN model.
plot_model(LRCN_model, to_file = 'LRCN_model_structure_plot.png', show_shapes = True, show_layer_names = True)


In [None]:
# Create an Instance of Early Stopping Callback.
early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience = 25, mode ='min', restore_best_weights = True)

# Compile the model and specify loss function, optimizer and metrics to the model.
LRCN_model.compile(loss = 'categorical_crossentropy', optimizer = 'Adam', metrics =["accuracy"])

#start training the model.
LRCN_model_training_history = LRCN_model.fit(x = features_train, y = labels_train, epochs = 5, batch_size = 4, shuffle = True, validation_split = 0.25, callbacks = [early_stopping_callback])



For the Hyper Parameter Turing (Optimizer, Node, Filters, LSTM Layers )

Below 6 Cells are for Hyper Parameter Turing



In [None]:
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Dropout, Flatten, LSTM, Dense
# from kerastuner.tuners import RandomSearch
# from kerastuner.engine.hyperparameters import HyperParameters

# def build_lrcn_model(hp):
#     model = Sequential()

#     model.add(TimeDistributed(Conv2D(hp.Int('conv1_units', min_value=8, max_value=128, step=8), (3, 3), padding='same', activation='relu'),
#                               input_shape=(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)))
#     model.add(TimeDistributed(MaxPooling2D((4, 4))))
#     model.add(TimeDistributed(Dropout(hp.Float('dropout1', min_value=0.2, max_value=0.5, step=0.05))))

#     model.add(TimeDistributed(Conv2D(hp.Int('conv2_units', min_value=8, max_value=128, step=8), (3, 3), padding='same', activation='relu')))
#     model.add(TimeDistributed(MaxPooling2D((4, 4))))
#     model.add(TimeDistributed(Dropout(hp.Float('dropout2', min_value=0.2, max_value=0.5, step=0.05))))

#     model.add(TimeDistributed(Conv2D(hp.Int('conv3_units', min_value=8, max_value=128, step=8), (3, 3), padding='same', activation='relu')))
#     model.add(TimeDistributed(MaxPooling2D((2, 2))))
#     model.add(TimeDistributed(Dropout(hp.Float('dropout3', min_value=0.2, max_value=0.5, step=0.05))))

#     model.add(TimeDistributed(Conv2D(hp.Int('conv4_units', min_value=8, max_value=128, step=8), (3, 3), padding='same', activation='relu')))
#     model.add(TimeDistributed(MaxPooling2D((2, 2))))

#     model.add(TimeDistributed(Flatten()))

#     model.add(LSTM(hp.Int('lstm_units', min_value=16, max_value=256, step=16)))

#     model.add(Dense(len(CLASSES_LIST), activation='softmax'))

#     model.compile(optimizer=hp.Choice('optimizer', values=['adam', 'rmsprop', 'sgd', 'nadam', 'adadelta']),
#                   loss='categorical_crossentropy',
#                   metrics=['accuracy'])

#     return model

In [None]:
# # Instantiate the Keras Tuner RandomSearch tuner
# tuner = kt.RandomSearch(
#                         build_lrcn_model,
#                         objective = 'val_accuracy',
#                         max_trials=3,
#                         directory="minorProject1",
#                         project_name='perfinal'
#                         )

In [None]:
# # Define the search space and start the hyperparameter tuning
# tuner.search(features_train, labels_train, epochs=5, validation_data=(features_test, labels_test))

In [None]:
# tuner.get_best_hyperparameters()[0].values

In [None]:
# model = tuner.get_best_models(num_models=1)[0]

In [None]:
# model.fit(features_train, labels_train, epochs=20, initial_epoch=5, validation_data=(features_test, labels_test))

In [None]:
# Evaluate the trained model.
model_evaluation_history = LRCN_model.evaluate(features_test, labels_test)

In [None]:
# Get the loss and accuracy from model_evaluation_history.
model_evaluation_loss, model_evaluation_accuracy = model_evaluation_history

# Define the string date format.
# Get the current Date and Time in a DateTime Object.
# Convert the DateTime object to string according to the style mentioned in date_time_format string.
date_time_format = '%Y_%m_%d__%H_%M_%S'
current_date_time_dt = dt.datetime.now()
current_date_time_string = dt.datetime.strftime(current_date_time_dt, date_time_format)

# Define a useful name for our model to make it easy for us while navigating through multiple saved models.
model_file_name = f'LRCN_model___Date_Time_{current_date_time_string}___Loss_{model_evaluation_loss}___Accuracy_{model_evaluation_accuracy}.h5'

# Save the Model.
LRCN_model.save(model_file_name)


In [None]:
# Visualize the training and validation loss metrices.

def plot_metric(history, train_metric, val_metric, metric_name):
    # Plot training & validation metric values
    plt.plot(history[train_metric], label=f'Training {metric_name}')
    plt.plot(history[val_metric], label=f'Validation {metric_name}')
    plt.title(f'Model {metric_name}')
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    plt.legend()
    plt.show()

# Assuming LRCN_model_training_history is your History object
history = LRCN_model_training_history.history

# Use the defined function to plot the metrics
plot_metric(history, 'loss', 'val_loss', 'Total Loss vs Total Validation Loss')

In [None]:
# Visualize the training and validation accuracy metrices.

def plot_metric(history, train_metric, val_metric, metric_name):
    # Plot training & validation metric values
    plt.plot(history[train_metric], label=f'Training {metric_name}')
    plt.plot(history[val_metric], label=f'Validation {metric_name}')
    plt.title(f'Model {metric_name}')
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    plt.legend()
    plt.show()

# Assuming LRCN_model_training_history is your History object
history = LRCN_model_training_history.history

# Use the defined function to plot the loss
plot_metric(history, 'loss', 'val_loss', 'Total Loss vs Total Validation Loss')

# Use the defined function to plot the accuracy
plot_metric(history, 'accuracy', 'val_accuracy', 'Total Accuracy vs Total Validation Accuracy')

In [None]:

def predict_on_video(video_file_path, output_file_path, SEQUENCE_LENGTH):
    '''
    This function will perform action recognition on a video using the LRCN model.
    Args:
        video_file_path: The path of the video on which the action recognition is to be performed.
        output_file_path: The path where the output video with the predicted action overlayed will be stored.
        SEQUENCE_LENGTH: The fixed number of frames of a video that can be passed to the model as one sequence.
    '''
    # Initialize the VideoCapture object to read from the video file.
    video_reader = cv2.VideoCapture(video_file_path)

    # Create a list to store video frames
    frames_list = []

    # Iterate through the video and store frames in the frames list
    while True:
        # Read the frame
        ok, frame = video_reader.read()

        # Break the loop if the frame is not read properly
        if not ok:
            break

        # Resize the frame to fixed dimensions
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        # Normalize the resized frame
        normalized_frame = resized_frame / 255.0

        # Append the normalized frame to the frames list
        frames_list.append(normalized_frame)

        # Break the loop if the frames list reaches the desired sequence length
        if len(frames_list) == SEQUENCE_LENGTH:
            break

    # Release the videocapture object
    video_reader.release()

    # Convert the frames list to a numpy array
    frames_array = np.array(frames_list)

    # Reshape the frames array to match the input shape expected by the model
    frames_array = frames_array.reshape(1, SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)

    # Perform prediction using the LRCN model
    predicted_labels_probabilities = LRCN_model.predict(frames_array)[0]

    # Get the index of the class with the highest probability
    predicted_label = np.argmax(predicted_labels_probabilities)

    # Get the class name using the retrieved index
    predicted_class_name = CLASSES_LIST[predicted_label]

    # Print the predicted class and confidence
    print(f'Predicted Action: {predicted_class_name}\nConfidence: {predicted_labels_probabilities[predicted_label]}')

    # Overlay the predicted action on the original video
    overlay_on_video(video_file_path, output_file_path, predicted_class_name)

def overlay_on_video(input_video_path, output_video_path, text):
    '''
    Overlay the predicted action text on the original video.
    Args:
        input_video_path: Path of the original video.
        output_video_path: Path where the output video with overlay will be stored.
        text: Text to overlay on the video.
    '''
    video_reader = cv2.VideoCapture(input_video_path)
    frame_width = int(video_reader.get(3))
    frame_height = int(video_reader.get(4))

    # Define the codec and create a video writer object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, 20.0, (frame_width, frame_height))

    while True:
        ret, frame = video_reader.read()
        if not ret:
            break

        # Add text overlay on the frame
        cv2.putText(frame, f"Action: {text}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

        # Write the frame to the output video
        video_writer.write(frame)

    # Release video reader and writer
    video_reader.release()
    video_writer.release()

# Prompt user to enter the test video path

# original_path =""
# converted_path = original_path.replace("\\", "/")

test_video_path = r'/content/drive/MyDrive/dataset2/Train/Fake/70.mp4'



# Specify the output path for the overlayed video
output_video_path = '/content/drive/MyDrive/Output'

# Set your SEQUENCE_LENGTH
SEQUENCE_LENGTH = 20

# Call the function
predict_on_video(test_video_path, output_video_path, SEQUENCE_LENGTH)