In [None]:
# from tensorflow_docs.vis import embed
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

In [None]:
DATA_FOLDER = '/content/drive/MyDrive/Colab Notebooks/Deepfake Detection'
TRAIN_SAMPLE_FOLDER = '/content/drive/MyDrive/Colab Notebooks/Deepfake Detection/Train_Sample_Videos'
TEST_FOLDER = '/content/drive/MyDrive/Colab Notebooks/Deepfake Detection/Test_Videos'

print(f"Train samples: {len(os.listdir(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER)))}")
print(f"Test samples: {len(os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER)))}")

In [None]:
train_sample_metadata = pd.read_json('/content/drive/MyDrive/Colab Notebooks/Deepfake Detection/Train_Sample_Videos/metadata.json').T
train_sample_metadata.head()

In [None]:
train_sample_metadata.groupby('label')['label'].count().plot(figsize=(15, 5), kind='bar', title='Distribution of Labels in the Training Set')
plt.show()

In [None]:
train_sample_metadata.shape

In [None]:
fake_train_sample_video = list(train_sample_metadata.loc[train_sample_metadata.label=='FAKE'].sample(10).index)
fake_train_sample_video

In [None]:
import cv2
import matplotlib.pyplot as plt

def show_first_frame(video_file_path):
    """
    Fetches and displays the first frame of a given video.

    Parameters:
        video_file_path (str): The full path to the video file.

    Raises:
        FileNotFoundError: Raised if the video file cannot be found or opened.
        RuntimeError: Raised if the video file contains no readable frames.
    """
    # Set up the video capture object to read from the video file
    video_capture = cv2.VideoCapture(video_file_path)

    # Verify that the video file was opened successfully
    if not video_capture.isOpened():
        video_capture.release()  # Ensure resources are released
        raise FileNotFoundError(f"Failed to access the video at {video_file_path}")

    # Attempt to capture the first frame
    successful, frame = video_capture.read()

    # Ensure that a frame was successfully captured
    if not successful:
        video_capture.release()  # Ensure resources are released before raising an error
        raise RuntimeError("No frames could be read from the video file")

    # Adjust the frame's color format for displaying
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Display the frame using matplotlib
    plt.figure(figsize=(10, 10))
    plt.imshow(frame_rgb)
    plt.axis('off')  # Hide axes for better visualization
    plt.show()

    # Close the video capture object to free resources
    video_capture.release()

In [None]:
for video_file in fake_train_sample_video:
    show_first_frame(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER, video_file))

In [None]:
real_train_sample_video = list(train_sample_metadata.loc[train_sample_metadata.label=='REAL'].sample(10).index)
real_train_sample_video

In [None]:
for video_file in real_train_sample_video:
    show_first_frame(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER, video_file))

In [None]:
train_sample_metadata['original'].value_counts()[0:10]

In [None]:
import os
import cv2
import matplotlib.pyplot as plt

def show_frames_from_videos(video_files, base_folder=TRAIN_SAMPLE_FOLDER):
    """
    Displays the first frame from each video in the given list of video files.

    Parameters:
        video_files (list): A list of video file names.
        base_folder (str): The directory containing the video files, default is TRAIN_SAMPLE_FOLDER.

    Description:
        This function plots the first frame of the first six videos from the provided list. Each frame is displayed
        in a grid format using matplotlib.
    """
    # Create a figure with subplots arranged in 2 rows and 3 columns
    fig, axes = plt.subplots(2, 3, figsize=(16, 8))

    # Loop through the first six videos in the list
    for index, video_name in enumerate(video_files[:6]):
        video_full_path = os.path.join(DATA_FOLDER, base_folder, video_name)
        video_capture = cv2.VideoCapture(video_full_path)

        success, frame = video_capture.read()
        if not success:
            print(f"Failed to read from {video_name}")
            axes[index // 3, index % 3].set_title("Failed to load video")
            axes[index // 3, index % 3].axis('off')
            continue

        # Convert the color from BGR to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Display the image in the respective subplot
        axes[index // 3, index % 3].imshow(frame_rgb)
        axes[index // 3, index % 3].set_title(video_name)
        axes[index // 3, index % 3].axis('on')  # Keep the axis on for clarity

        # Release the video capture object
        video_capture.release()

    plt.tight_layout()
    plt.show()

In [None]:
same_original_fake_train_sample_video = list(train_sample_metadata.loc[train_sample_metadata.original=='meawmsgiti.mp4'].index)
show_frames_from_videos(same_original_fake_train_sample_video)

In [None]:
test_videos = pd.DataFrame(list(os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER))), columns=['video'])

In [None]:
test_videos.head(10)

In [None]:
show_first_frame(os.path.join(DATA_FOLDER, TEST_FOLDER, test_videos.iloc[3].video))

In [None]:
fake_videos = list(train_sample_metadata.loc[train_sample_metadata.label=='FAKE'].index)

In [None]:
from IPython.display import HTML
from base64 import b64encode
import os

def embed_video_in_notebook(video_filename, directory=TRAIN_SAMPLE_FOLDER):
    """
    Embeds a video file into an IPython notebook for playback.

    Args:
        video_filename (str): The filename of the video to embed.
        directory (str): The directory where the video file is located. Default is TRAIN_SAMPLE_FOLDER.

    Returns:
        HTML: An HTML object that embeds a video player within the notebook.

    Raises:
        FileNotFoundError: If the video file cannot be found in the specified directory.
    """
    try:
        # Construct the full path to the video file
        video_path = os.path.join(DATA_FOLDER, directory, video_filename)
        # Read the video file as binary data
        with open(video_path, 'rb') as video_file:
            video_data = video_file.read()

        # Encode the video data in base64 and create the data URL
        video_base64 = b64encode(video_data).decode('utf-8')
        data_url = f"data:video/mp4;base64,{video_base64}"

        # Return an HTML object that contains the video element
        return HTML(f'<video width="500" controls><source src="{data_url}" type="video/mp4"></video>')

    except FileNotFoundError:
        raise FileNotFoundError(f"The video file {video_filename} could not be found in {directory}.")

# Example usage:
# Assuming 'fake_videos[10]' contains the filename of the video to play
video_to_play = fake_videos[14]
embed_video_in_notebook(video_to_play)

In [None]:
IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 20

MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048

In [None]:
import cv2
import numpy as np

def square_crop_frame(image):
    """
    Crops the given image to a square format by reducing the largest dimension to match the smallest one.

    Parameters:
        image (numpy.ndarray): The frame to be cropped, assumed to be in height x width x channels format.

    Returns:
        numpy.ndarray: A square-cropped version of the input image.
    """
    height, width = image.shape[:2]
    min_dimension = min(height, width)
    start_x = (width - min_dimension) // 2
    start_y = (height - min_dimension) // 2
    return image[start_y:start_y + min_dimension, start_x:start_x + min_dimension]

def process_video_frames(video_path, max_frames=0, resize_dims=(IMG_SIZE, IMG_SIZE)):
    """
    Extracts and processes frames from a video file, resizing them and converting color format.

    Parameters:
     video_path (str): Full path to the video file.
        max_frames (int): Maximum number of frames to extract. Extracts all if set to zero.
        resize_dims (tuple): Dimensions to resize frames to, in (width, height) format.

    Returns:
        numpy.ndarray: An array of processed video frames.
    """
    capture = cv2.VideoCapture(video_path)
    processed_frames = []
    try:
        while True:
            read_success, frame = capture.read()
            if not read_success:
                break
            frame = square_crop_frame(frame)
            frame = cv2.resize(frame, resize_dims)
            # Convert BGR to RGB for standard color format
            frame = frame[..., ::-1]
            processed_frames.append(frame)

            if max_frames > 0 and len(processed_frames) >= max_frames:
                break
    finally:
        capture.release()
        return np.array(processed_frames)

In [None]:
import tensorflow as tf
from tensorflow import keras

def build_feature_extractor(model_name='ResNet50'):
    """
    Builds a feature extraction model using a specified base model from Keras Applications.

    Args:
        model_name (str): Name of the model to use ('ResNet50', 'VGG16', 'MobileNetV2', etc.).

    Returns:
        keras.Model: A Keras model that takes an image as input and outputs the extracted features.
    """
    # Dynamically retrieving the model and preprocessing function based on the model_name
    base_model_class = getattr(keras.applications, model_name)
    base_model = base_model_class(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3)
    )
    preprocess_input = getattr(keras.applications, model_name.lower()).preprocess_input

    # Define the input layer
    inputs = keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
    # Preprocess input
    x = preprocess_input(inputs)
    # Get features
    outputs = base_model(x)

    # Create the model
    model = keras.Model(inputs=inputs, outputs=outputs, name=f"{model_name}_feature_extractor")

    return model

# Example usage:
feature_extractor = build_feature_extractor('ResNet50')  # I also tried with 'ResNet50', 'VGG16', etc.

In [None]:
import numpy as np
import os

def extract_video_features(dataframe, directory):
    """
    Processes all videos specified in a dataframe to extract features for model input, including masks.

    Args:
        dataframe (pd.DataFrame): DataFrame containing indices as video file paths and a 'label' column.
        directory (str): Base directory where video files are stored.

    Returns:
        tuple: A tuple (features_and_masks, labels) where features_and_masks contains the video
               features and masks indicating valid data points, and labels are the binary labels.
    """
    total_videos = len(dataframe)
    video_file_paths = dataframe.index.tolist()
    binary_labels = np.array(dataframe["label"].values == 'FAKE', dtype=int)

    # Initialize arrays to hold data for all videos
    video_masks = np.zeros((total_videos, MAX_SEQ_LENGTH), dtype=bool)
    video_features = np.zeros((total_videos, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    # Process each video individually
    for video_idx, video_file in enumerate(video_file_paths):
        full_video_path = os.path.join(directory, video_file)
        video_data = process_video_frames(full_video_path)
        video_data = np.expand_dims(video_data, axis=0)  # Add a batch dimension

        # Temporary storage for this video's data
        current_video_mask = np.zeros((1, MAX_SEQ_LENGTH), dtype=bool)
        current_video_features = np.zeros((1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

        # Frame-by-frame feature extraction
        frames_to_process = min(MAX_SEQ_LENGTH, video_data.shape[1])
        for frame_idx in range(frames_to_process):
            frame = video_data[:, frame_idx, :]
            extracted_features = feature_extractor.predict(frame[None, :])
            current_video_features[0, frame_idx, :] = extracted_features

        current_video_mask[0, :frames_to_process] = True  # Mark frames as valid

        # Store the extracted data in the corresponding arrays
        video_features[video_idx] = current_video_features.squeeze()
        video_masks[video_idx] = current_video_mask.squeeze()

    return (video_features, video_masks), binary_labels

In [None]:
from sklearn.model_selection import train_test_split

Train_set, Test_set = train_test_split(train_sample_metadata,test_size=0.1,random_state=42,stratify=train_sample_metadata['label'])

print(Train_set.shape, Test_set.shape )

In [None]:
train_data, train_labels = extract_video_features(Train_set, "train")
test_data, test_labels = extract_video_features(Test_set, "test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

In [None]:
from tensorflow.keras import layers, models, regularizers

frame_features_input = layers.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = layers.Input((MAX_SEQ_LENGTH,), dtype="bool")

x = layers.Bidirectional(layers.GRU(16, return_sequences=True, kernel_regularizer=regularizers.l2(0.01)))(
    frame_features_input, mask=mask_input
)
x = layers.Bidirectional(layers.GRU(8, kernel_regularizer=regularizers.l2(0.01)))(x)
x = layers.Dropout(0.5)(x)  # Increased dropout
x = layers.Dense(8, activation="relu")(x)
output = layers.Dense(1, activation="sigmoid")(x)

model = models.Model([frame_features_input, mask_input], output)

model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()

In [None]:
import os
from tensorflow.keras import callbacks, models

# Define the directory for storing model checkpoints and the final model
checkpoint_dir = './model_checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)  # Ensure the directory exists

# Setup the model checkpoint callback to save only the best model during training
checkpoint_filepath = os.path.join(checkpoint_dir, 'model-{epoch:02d}-{val_loss:.2f}.h5')
checkpoint_callback = callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=True,
    mode='min'
)

# EarlyStopping callback to stop training early if no improvement
early_stopping_callback = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
     verbose=1,
    mode='min',
    restore_best_weights=True
)

# Model training
history = model.fit(
    [train_data[0], train_data[1]],
    train_labels,
    validation_data=([test_data[0], test_data[1]], test_labels),
    epochs=10,
    batch_size=8,
    callbacks=[checkpoint_callback, early_stopping_callback],
    verbose=1
)

# Save the final model after training
final_model_path = os.path.join(checkpoint_dir, 'final_model6.h5')
model.save(final_model_path)
print(f"Model saved to {final_model_path}")

# Optionally, print the history of training
print("Training history:", history.history)

In [None]:
# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(
    [test_data[0], test_data[1]],  # Test features and masks
    test_labels,                  # Test labels
    batch_size=8                  # Using the same batch size as during training
)

print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

In [None]:
def prepare_single_video(frames):
    frames = frames[None, ...]  # Add batch dimension if not present
    frame_mask = np.zeros((1, MAX_SEQ_LENGTH), dtype=bool)
    frame_features = np.zeros((1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype='float32')

    video_length = frames.shape[1]
    length = min(MAX_SEQ_LENGTH, video_length)

    for j in range(length):
        frame = np.expand_dims(frames[0, j], axis=0)  # Add batch dimension
        features = feature_extractor.predict(frame)
        frame_features[0, j, :] = features

    frame_mask[0, :length] = True  # Mark the frames that have data
    return frame_features, frame_mask


def sequence_prediction(video_path):
    frames = process_video_frames(video_path)
    frame_features, frame_mask = prepare_single_video(frames)
    prediction = model.predict([frame_features, frame_mask])[0]
    return prediction


import imageio
import IPython.display as display

def to_gif(images):
    imageio.mimsave('animation.gif', images, fps=10)
    return display.Image(filename='animation.gif')


test_video_path = os.path.join(DATA_FOLDER, TEST_FOLDER, np.random.choice(test_videos["video"].values.tolist()))
print(f"Test video path: {test_video_path}")

prediction = sequence_prediction(test_video_path)
if prediction >= 0.5:
    print('The predicted class of the video is FAKE')
else:
    print('The predicted class of the video is REAL')

# Optional: Load frames again to create a GIF for visualization
frames = process_video_frames(test_video_path)
to_gif(frames)