In [2]:
import os

import keras
from imutils import paths

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import imageio
import cv2
from IPython.display import Image
from tensorflow.keras.applications import EfficientNetB4
from albumentations import Compose, ImageCompression, GaussNoise, GaussianBlur, HorizontalFlip, PadIfNeeded, OneOf, RandomBrightnessContrast, FancyPCA, HueSaturationValue, ToGray, ShiftScaleRotate, Resize
import albumentations.augmentations.functional as F
import cv2

In [3]:
IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 20

MAX_SEQ_LENGTH = 200
NUM_FEATURES = 1792 #for efficientnetb4
NUM_FEATURES = 2048 #for inceptionv3

In [4]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

train_data = []
val_data = []
test_data = []

### DATASET 1 (FaceForensics + Celeb)
import os
for dirname, _, filenames in os.walk('deep_fake'):
    for filename in filenames:
        if 'validation' in dirname:
            val_data.append(os.path.join(dirname, filename))
        elif 'train' in dirname:
            train_data.append(os.path.join(dirname, filename))
        else:
            test_data.append(os.path.join(dirname, filename))

In [5]:
train_data[:10]

['deep_fake\\train\\fake\\000_M101.mp4',
 'deep_fake\\train\\fake\\001_W101.mp4',
 'deep_fake\\train\\fake\\002_M101.mp4',
 'deep_fake\\train\\fake\\003_M101.mp4',
 'deep_fake\\train\\fake\\004_M101.mp4',
 'deep_fake\\train\\fake\\005_W101.mp4',
 'deep_fake\\train\\fake\\006_M131.mp4',
 'deep_fake\\train\\fake\\007_W101.mp4',
 'deep_fake\\train\\fake\\008_W101.mp4',
 'deep_fake\\train\\fake\\009_M131.mp4']

In [6]:
train_df = pd.DataFrame(train_data)
val_df = pd.DataFrame(val_data)
test_df = pd.DataFrame(test_data)

In [7]:
train_df

Unnamed: 0,0
0,deep_fake\train\fake\000_M101.mp4
1,deep_fake\train\fake\001_W101.mp4
2,deep_fake\train\fake\002_M101.mp4
3,deep_fake\train\fake\003_M101.mp4
4,deep_fake\train\fake\004_M101.mp4
...,...
595,deep_fake\train\real\M011_light_left_disgust_c...
596,deep_fake\train\real\M011_light_left_disgust_c...
597,deep_fake\train\real\M011_light_left_fear_came...
598,deep_fake\train\real\M011_light_left_fear_came...


In [8]:
def label_file(row):
    if 'real' in row:
        return "real"
    elif 'fake' in row:
        return "fake"
    else:
        return 'undefined'

In [9]:
train_df['label'] = train_df[0].apply(label_file)
val_df['label'] = val_df[0].apply(label_file)
test_df['label'] = test_df[0].apply(label_file)

In [10]:
train_df

Unnamed: 0,0,label
0,deep_fake\train\fake\000_M101.mp4,fake
1,deep_fake\train\fake\001_W101.mp4,fake
2,deep_fake\train\fake\002_M101.mp4,fake
3,deep_fake\train\fake\003_M101.mp4,fake
4,deep_fake\train\fake\004_M101.mp4,fake
...,...,...
595,deep_fake\train\real\M011_light_left_disgust_c...,real
596,deep_fake\train\real\M011_light_left_disgust_c...,real
597,deep_fake\train\real\M011_light_left_fear_came...,real
598,deep_fake\train\real\M011_light_left_fear_came...,real


In [11]:
train_df

Unnamed: 0,0,label
0,deep_fake\train\fake\000_M101.mp4,fake
1,deep_fake\train\fake\001_W101.mp4,fake
2,deep_fake\train\fake\002_M101.mp4,fake
3,deep_fake\train\fake\003_M101.mp4,fake
4,deep_fake\train\fake\004_M101.mp4,fake
...,...,...
595,deep_fake\train\real\M011_light_left_disgust_c...,real
596,deep_fake\train\real\M011_light_left_disgust_c...,real
597,deep_fake\train\real\M011_light_left_fear_came...,real
598,deep_fake\train\real\M011_light_left_fear_came...,real


In [12]:
print(train_df['label'].value_counts())
print(val_df['label'].value_counts())
print(test_df['label'].value_counts())

label
fake    300
real    300
Name: count, dtype: int64
label
fake    100
real    100
Name: count, dtype: int64
label
fake    125
real    125
Name: count, dtype: int64


In [13]:
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)

In [14]:
from tensorflow import keras
from tensorflow.keras.applications import EfficientNetB4

IMG_SIZE = 224  # Adjust as necessary for EfficientNetB4

def build_feature_extractor():
    # Create the base EfficientNetB4 model with pre-trained weights from ImageNet
    feature_extractor = EfficientNetB4(
        weights="imagenet",
        include_top=False,  # Exclude the classification head
        pooling="avg",  # Use global average pooling to get a feature vector
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )

    # Define the model's input
    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))

    # The EfficientNet model will automatically handle the input preprocessing
    outputs = feature_extractor(inputs)

    # Create a Keras Model with the specified input and output
    return keras.Model(inputs, outputs, name="feature_extractor")

# Instantiate the feature extractor
feature_extractor = build_feature_extractor()



# def build_feature_extractor():
#     feature_extractor = keras.applications.InceptionV3(
#         weights="imagenet",
#         include_top=False,
#         pooling="avg",
#         input_shape=(IMG_SIZE, IMG_SIZE, 3),
#     )
#     preprocess_input = keras.applications.inception_v3.preprocess_input

#     inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
#     preprocessed = preprocess_input(inputs)

#     outputs = feature_extractor(preprocessed)
#     return keras.Model(inputs, outputs, name="feature_extractor")


# feature_extractor = build_feature_extractor()

In [15]:
train_df

Unnamed: 0,0,label
0,deep_fake\train\fake\000_M101.mp4,fake
1,deep_fake\train\fake\001_W101.mp4,fake
2,deep_fake\train\fake\002_M101.mp4,fake
3,deep_fake\train\fake\003_M101.mp4,fake
4,deep_fake\train\fake\004_M101.mp4,fake
...,...,...
595,deep_fake\train\real\M011_light_left_disgust_c...,real
596,deep_fake\train\real\M011_light_left_disgust_c...,real
597,deep_fake\train\real\M011_light_left_fear_came...,real
598,deep_fake\train\real\M011_light_left_fear_came...,real


In [16]:
test_df

Unnamed: 0,0,label
0,deep_fake\test\fake\116_W132.mp4,fake
1,deep_fake\test\fake\117_W132.mp4,fake
2,deep_fake\test\fake\118_W132.mp4,fake
3,deep_fake\test\fake\119_W132.mp4,fake
4,deep_fake\test\fake\120_W021.mp4,fake
...,...,...
245,deep_fake\test\real\M011_light_uniform_surpris...,real
246,deep_fake\test\real\M011_light_uniform_surpris...,real
247,deep_fake\test\real\M011_light_uniform_surpris...,real
248,deep_fake\test\real\M011_light_uniform_surpris...,real


In [17]:
train_df.rename(columns={0:'filepath'}, inplace=True)
test_df.rename(columns={0:'filepath'}, inplace=True)
val_df.rename(columns={0:'filepath'}, inplace=True)

In [18]:
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["label"])
)
print(label_processor.get_vocabulary())

['fake', 'real']


In [19]:
train_df

Unnamed: 0,filepath,label
0,deep_fake\train\fake\000_M101.mp4,fake
1,deep_fake\train\fake\001_W101.mp4,fake
2,deep_fake\train\fake\002_M101.mp4,fake
3,deep_fake\train\fake\003_M101.mp4,fake
4,deep_fake\train\fake\004_M101.mp4,fake
...,...,...
595,deep_fake\train\real\M011_light_left_disgust_c...,real
596,deep_fake\train\real\M011_light_left_disgust_c...,real
597,deep_fake\train\real\M011_light_left_fear_came...,real
598,deep_fake\train\real\M011_light_left_fear_came...,real


In [20]:
test_df

Unnamed: 0,filepath,label
0,deep_fake\test\fake\116_W132.mp4,fake
1,deep_fake\test\fake\117_W132.mp4,fake
2,deep_fake\test\fake\118_W132.mp4,fake
3,deep_fake\test\fake\119_W132.mp4,fake
4,deep_fake\test\fake\120_W021.mp4,fake
...,...,...
245,deep_fake\test\real\M011_light_uniform_surpris...,real
246,deep_fake\test\real\M011_light_uniform_surpris...,real
247,deep_fake\test\real\M011_light_uniform_surpris...,real
248,deep_fake\test\real\M011_light_uniform_surpris...,real


In [21]:
def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = df["filepath"].values.tolist()
    labels = df["label"].values
    labels = keras.ops.convert_to_numpy(label_processor(labels[..., None]))

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.
    # `frame_masks` will contain a bunch of booleans denoting if a timestep is
    # masked with padding or not.
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # For each video.
    for idx, path in enumerate(video_paths):
        # Gather all its frames and add a batch dimension.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # Initialize placeholders to store the masks and features of the current video.
        temp_frame_mask = np.zeros(
            shape=(
                1,
                MAX_SEQ_LENGTH,
            ),
            dtype="bool",
        )
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        # Extract features from the frames of the current video.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :], verbose=0,
                )
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels


train_data, train_labels = prepare_all_videos(train_df, "train")
test_data, test_labels = prepare_all_videos(test_df, "test")
val_data = prepare_all_videos(val_df, "val")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

Frame features in train set: (600, 200, 2048)
Frame masks in train set: (600, 200)


In [21]:
vocab_len = label_processor.get_vocabulary()
vocab_len

['fake', 'real']

In [22]:
# Utility for our sequence model.
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="sigmoid")(x)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model


# Utility for running experiments.
def run_experiment():
    filepath = "/tmp/video_classifier/ckpt.weights.h5"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_data = val_data,
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()

Epoch 1/20
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 169ms/step - accuracy: 0.4906 - loss: 0.6932
Epoch 1: val_loss improved from inf to 0.69315, saving model to /tmp/video_classifier/ckpt.weights.h5
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 277ms/step - accuracy: 0.4903 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6932
Epoch 2/20
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 174ms/step - accuracy: 0.4953 - loss: 0.6932
Epoch 2: val_loss improved from 0.69315 to 0.69315, saving model to /tmp/video_classifier/ckpt.weights.h5
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 210ms/step - accuracy: 0.4957 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 3/20
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 164ms/step - accuracy: 0.4968 - loss: 0.6932
Epoch 3: val_loss did not improve from 0.69315
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 194ms/step - accura