In [1]:
# IMPORTS
from tensorflow import keras
from tensorflow_docs.vis import embed

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

In [2]:
# Dataset layout: DATA_FOLDER contains one sub-folder per subset.
DATA_FOLDER = 'data/input/deepfake_videos'
TRAIN_SAMPLE_FOLDER = 'train_sample_videos'
TEST_FOLDER = 'test_videos'

# Count the directory entries of each subset folder. Every entry is counted,
# so non-video files stored alongside the videos (e.g. the metadata.json read
# from the train folder further down) are included in the totals.
train_entries = os.listdir(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER))
print(f"Train samples: {len(train_entries)}")
test_entries = os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER))
print(f"Test samples: {len(test_entries)}")

Train samples: 403
Test samples: 403


In [3]:
# Load the per-video metadata and transpose it so that each row is one video
# (indexed by file name) with `label`, `split` and `original` columns.
metadata_path = f'{DATA_FOLDER}/{TRAIN_SAMPLE_FOLDER}/metadata.json'
train_sample_metadata = pd.read_json(metadata_path).T

# Preview the first and last rows of the table.
for preview in (train_sample_metadata.head(), train_sample_metadata.tail()):
    print(preview)

               label  split        original
aagfhgtpmv.mp4  FAKE  train  vudstovrck.mp4
aapnvogymq.mp4  FAKE  train  jdubbvfswz.mp4
abarnvbtwb.mp4  REAL  train            None
abofeumbvv.mp4  FAKE  train  atvmxvwyns.mp4
abqwwspghj.mp4  FAKE  train  qzimuostzz.mp4
               label  split        original
eudeqjhdfd.mp4  REAL  train            None
eukvucdetx.mp4  FAKE  train  gjypopglvi.mp4
lw_fake.mp4     FAKE  train     lw_real.mp4
lw_real.mp4     REAL  train            None
lw_fake2.mp4    FAKE  train     lw_real.mp4


In [4]:
# File names of the last three metadata rows labelled FAKE.
fake_rows = train_sample_metadata[train_sample_metadata["label"] == 'FAKE']
fake_train_sample_video = fake_rows.tail(3).index.tolist()
fake_train_sample_video

In [5]:
# def display_image_from_video(video_path: str):
#     '''
#     input: video_path - path for video
#     process:
#     1. perform a video capture from the video
#     2. read the image
#     3. display the image
#     '''
#     capture_image = cv2.VideoCapture(video_path) 
#     _, frame = capture_image.read()
#     fig = plt.figure(figsize=(10,10))
#     ax = fig.add_subplot(111)
#     frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#     ax.imshow(frame)

In [6]:
# for video_file in fake_train_sample_video:
#     display_image_from_video(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER, video_file))

In [7]:
# File names of the last three metadata rows labelled REAL.
real_rows = train_sample_metadata[train_sample_metadata["label"] == 'REAL']
real_train_sample_video = real_rows.tail(3).index.tolist()
real_train_sample_video

In [8]:
# for video_file in real_train_sample_video:
#     display_image_from_video(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER, video_file))

In [9]:
# train_sample_metadata['original'].value_counts()[0:5]

In [None]:
# Pick the last three REAL and the last three FAKE videos listed in the metadata.
_label_column = train_sample_metadata["label"]
real_train_sample_video = list(train_sample_metadata[_label_column == 'REAL'].index[-3:])
fake_train_sample_video = list(train_sample_metadata[_label_column == 'FAKE'].index[-3:])

# Only the last expression of a cell is rendered, so show both lists together
# instead of leaving a bare (silently discarded) expression in the middle.
real_train_sample_video, fake_train_sample_video

In [10]:
# def display_image_from_video_list(video_path_list, video_folder=TRAIN_SAMPLE_FOLDER):
#     '''
#     input: video_path_list - list of video file names (each is joined with DATA_FOLDER/video_folder)
#     process:
#     0. for each video in the video path list
#         1. perform a video capture from the video
#         2. read the image
#         3. display the image
#     '''
#     plt.figure()
#     fig, ax = plt.subplots(2,3,figsize=(16,8))
#     # we only show images extracted from the first 6 videos
#     for i, video_file in enumerate(video_path_list[0:6]):
#         video_path = os.path.join(DATA_FOLDER, video_folder,video_file)
#         capture_image = cv2.VideoCapture(video_path) 
#         ret, frame = capture_image.read()
#         frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#         ax[i//3, i%3].imshow(frame)
#         ax[i//3, i%3].set_title(f"Video: {video_file}")
#         ax[i//3, i%3].axis('on')

In [None]:
# Side length (pixels) frames are resized to; also the feature extractor's input height/width.
IMG_SIZE = 224
# Mini-batch size passed to model.fit.
BATCH_SIZE = 64
# Number of training epochs passed to model.fit.
EPOCHS = 200

# Maximum number of frames per video that are turned into features; shorter
# videos are zero-padded and flagged through the frame mask.
MAX_SEQ_LENGTH = 20
# Width of the per-frame feature vector stored for each frame (must match the
# output size of the feature extractor).
NUM_FEATURES = 2048

In [12]:
def crop_center_square(frame):
    """Return the largest centred square region of `frame`.

    Args:
        frame: array whose first two axes are (rows, columns); any trailing
            axes (e.g. colour channels) are kept untouched.

    Returns:
        A slice (view) of `frame` whose first two axes both have length
        min(rows, columns).
    """
    height, width = frame.shape[:2]
    side = min(height, width)
    top = height // 2 - side // 2
    left = width // 2 - side // 2
    return frame[top : top + side, left : left + side]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    """Decode a video into an array of centre-cropped, resized RGB frames.

    Args:
        path: location of the video file, handed to cv2.VideoCapture.
        max_frames: stop once this many frames have been collected. The
            default 0 never matches the frame count after an append, so the
            whole video is read.
        resize: target size passed to cv2.resize for every frame.

    Returns:
        np.ndarray built from the list of processed frames; it is an empty
        array when no frame could be read.
    """
    capture = cv2.VideoCapture(path)
    collected = []
    try:
        reached_limit = False
        while not reached_limit:
            ok, bgr_frame = capture.read()
            if not ok:
                # End of stream, or the file could not be decoded.
                break
            square = crop_center_square(bgr_frame)
            resized = cv2.resize(square, resize)
            # OpenCV yields BGR channel order; reorder the channels to RGB.
            collected.append(resized[..., [2, 1, 0]])
            reached_limit = len(collected) == max_frames
    finally:
        # Always free the capture handle, even if processing a frame raised.
        capture.release()
    return np.array(collected)

In [13]:
def build_feature_extractor():
    """Build the per-frame feature extractor model.

    The model wraps an InceptionV3 backbone (ImageNet weights, no
    classification head, global average pooling) behind InceptionV3's own
    `preprocess_input`, so callers can feed frames of shape
    (IMG_SIZE, IMG_SIZE, 3) directly.

    Returns:
        keras.Model named "feature_extractor" mapping a batch of frames to
        one pooled feature vector per frame.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)
    backbone = keras.applications.InceptionV3(
        include_top=False,
        weights="imagenet",
        pooling="avg",
        input_shape=frame_shape,
    )

    raw_frames = keras.Input(frame_shape)
    scaled_frames = keras.applications.inception_v3.preprocess_input(raw_frames)
    pooled_features = backbone(scaled_frames)
    return keras.Model(raw_frames, pooled_features, name="feature_extractor")


# Single shared instance used by the feature-preparation helpers below.
feature_extractor = build_feature_extractor()

In [14]:
def prepare_all_videos(df, root_dir):
    """Extract padded per-frame features, masks and labels for every video in `df`.

    Args:
        df: DataFrame whose index holds the video file names and whose
            "label" column holds the class of each video.
        root_dir: directory that contains the video files.

    Returns:
        ((frame_features, frame_masks), labels) where
        - frame_features is float32 of shape
          (len(df), MAX_SEQ_LENGTH, NUM_FEATURES), zero-padded after the last
          available frame,
        - frame_masks is bool of shape (len(df), MAX_SEQ_LENGTH); True marks a
          real frame, False marks padding,
        - labels is an int array with 1 where the label equals 'FAKE' and 0
          otherwise.
    """
    import warnings

    num_samples = len(df)
    video_paths = list(df.index)
    labels = np.array(df["label"].values == 'FAKE').astype(int)

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.
    # `frame_masks` will contain a bunch of booleans denoting if a timestep is
    # masked with padding or not.
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    for idx, path in enumerate(video_paths):
        # Only the first MAX_SEQ_LENGTH frames are ever used, so stop decoding
        # there instead of reading (and resizing) the whole video.
        frames = load_video(os.path.join(root_dir, path), max_frames=MAX_SEQ_LENGTH)
        length = min(MAX_SEQ_LENGTH, frames.shape[0])

        if length == 0:
            # Unreadable/empty video: the row stays all-zero and fully masked.
            warnings.warn(
                f"No frames could be read from {path!r}; "
                "its features are left zeroed and fully masked."
            )
            continue

        # One batched forward pass per video instead of one predict() call
        # per frame.
        frame_features[idx, :length] = feature_extractor.predict(
            frames[:length], verbose=0
        )
        frame_masks[idx, :length] = True  # True = not masked, False = masked

    return (frame_features, frame_masks), labels

In [15]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the labelled videos, keeping the REAL/FAKE ratio the same in
# both parts (stratified on the label) and fixing the seed for repeatability.
Train_set, Test_set = train_test_split(
    train_sample_metadata,
    stratify=train_sample_metadata['label'],
    test_size=0.1,
    random_state=42,
)

print(*(subset.shape for subset in (Train_set, Test_set)))

(362, 3) (41, 3)


In [26]:
# Number of training videos per class: REAL first, then FAKE.
for class_name in ("REAL", "FAKE"):
    print(int((Train_set["label"] == class_name).sum()))

70
292


In [70]:
# Features, masks and labels for the held-out split. Its videos live in the
# train sample folder because the split was cut from the train metadata.
held_out_videos_dir = f"{DATA_FOLDER}/{TRAIN_SAMPLE_FOLDER}"
test_data, test_labels = prepare_all_videos(Test_set, held_out_videos_dir)



OpenCV: Couldn't read video stream from file "data/input/deepfake_videos/train_sample_videos/lw_fake2.mp4"




In [69]:
# PLS DON'T RUN THIS CELL AGAIN
# Feature extraction runs the CNN over every video and is slow.
# NOTE: `test_data` / `test_labels` are also computed by the cell just above
# this one, so one of the two computations is redundant.

videos_root = f"{DATA_FOLDER}/{TRAIN_SAMPLE_FOLDER}"
train_data, train_labels = prepare_all_videos(Train_set, videos_root)
test_data, test_labels = prepare_all_videos(Test_set, videos_root)

# prepare_all_videos always returns (frame_features, frame_masks), so the
# shapes can be read directly without guarding against a missing element.
frame_features_shape = train_data[0].shape
frame_masks_shape = train_data[1].shape

# Sanity checks: one row per video, and features/masks agree on (videos, frames).
assert frame_features_shape[:2] == frame_masks_shape
assert len(train_labels) == len(Train_set) == frame_features_shape[0]

print(f"Frame features in train set: {frame_features_shape}")
print(f"Frame masks in train set: {frame_masks_shape}")

Frame features in train set: (362, 20, 2048)
Frame masks in train set: (362, 20)


In [71]:
# Sequence classifier inputs: the per-frame feature vectors of a video and the
# boolean mask telling which timesteps are real frames (True) or padding.
frame_features_input = keras.Input(shape=(MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input(shape=(MAX_SEQ_LENGTH,), dtype="bool")

# Refer to the following tutorial to understand the significance of using `mask`:
# https://keras.io/api/layers/recurrent_layers/gru/
x = keras.layers.GRU(16, return_sequences=True)(
    frame_features_input, mask=mask_input
)

# Second GRU collapses the sequence to one vector, followed by dropout and a
# small dense layer.
for hidden_layer in (
    keras.layers.GRU(8),
    keras.layers.Dropout(0.45),
    keras.layers.Dense(8, activation="relu"),
):
    x = hidden_layer(x)

# Single sigmoid unit; labels are encoded as 1 for FAKE when the features are
# prepared, so the output is the predicted probability of FAKE.
output = keras.layers.Dense(1, activation="sigmoid")(x)

model = keras.Model(inputs=[frame_features_input, mask_input], outputs=output)

model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_5 (InputLayer)        [(None, 20, 2048)]           0         []                            
                                                                                                  
 input_6 (InputLayer)        [(None, 20)]                 0         []                            
                                                                                                  
 gru_2 (GRU)                 (None, 20, 16)               99168     ['input_5[0][0]',             
                                                                     'input_6[0][0]']             
                                                                                                  
 gru_3 (GRU)                 (None, 8)                    624       ['gru_2[0][0]']         

In [85]:
# Train the sequence model on (features, masks) of the training split and
# validate on the held-out split after every epoch.
# Optional checkpointing, currently disabled:
# checkpoint = keras.callbacks.ModelCheckpoint('./', save_weights_only=True, save_best_only=True)
history = model.fit(
    x=list(train_data),
    y=train_labels,
    validation_data=(list(test_data), test_labels),
    # callbacks=[checkpoint],
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
)

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

In [90]:
from IPython.display import HTML
from base64 import b64encode


def play_video(video_file, subset=TRAIN_SAMPLE_FOLDER):
    """Return an inline HTML5 player for a video of the dataset.

    Args:
        video_file: file name of the video inside the subset folder.
        subset: sub-folder of DATA_FOLDER that holds the video.

    Returns:
        IPython.display.HTML embedding the whole file as a base64 data URL
        (the notebook grows by roughly 4/3 of the video size).
    """
    video_path = os.path.join(DATA_FOLDER, subset, video_file)
    print(video_path)
    # Read through a context manager so the file handle is closed right away.
    with open(video_path, 'rb') as video_handle:
        video_bytes = video_handle.read()
    data_url = "data:video/mp4;base64," + b64encode(video_bytes).decode()
    return HTML("""<video width=500 controls><source src="%s" type="video/mp4"></video>""" % data_url)

def prepare_single_video(frames):
    """Turn the frames of one video into model-ready features and mask.

    Args:
        frames: array of frames as returned by `load_video`; only the first
            MAX_SEQ_LENGTH frames are used.

    Returns:
        (frame_features, frame_mask) where frame_features is float32 of shape
        (1, MAX_SEQ_LENGTH, NUM_FEATURES), zero-padded after the last
        available frame, and frame_mask is bool of shape (1, MAX_SEQ_LENGTH)
        with True for real frames and False for padding.
    """
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    length = min(MAX_SEQ_LENGTH, frames.shape[0])
    if length:
        # One batched forward pass for all used frames instead of one
        # predict() call per frame.
        frame_features[0, :length] = feature_extractor.predict(
            frames[:length], verbose=0
        )
        frame_mask[0, :length] = True  # True = not masked, False = masked

    return frame_features, frame_mask

def sequence_prediction(path, subset=TRAIN_SAMPLE_FOLDER):
    """Run the trained `model` on one video of the dataset.

    Args:
        path: file name of the video inside the subset folder.
        subset: sub-folder of DATA_FOLDER that holds the video.

    Returns:
        The first row of `model.predict` for this video, i.e. an array
        holding the sigmoid output (labels are encoded as 1 = FAKE).
    """
    realpath = os.path.join(DATA_FOLDER, subset, path)
    print(f"Realpath: {realpath}")
    # Only the first MAX_SEQ_LENGTH frames are consumed downstream, so there
    # is no need to decode the whole video.
    frames = load_video(realpath, max_frames=MAX_SEQ_LENGTH)
    frame_features, frame_mask = prepare_single_video(frames)
    return model.predict([frame_features, frame_mask], verbose=0)[0]


# Spot-check a few labelled videos (1 = FAKE, 0 = REAL).
# A dedicated name is used here on purpose: rebinding `test_data` would
# overwrite the validation feature arrays that `model.fit` reads.
labelled_sample_videos = [
    ("lw_real.mp4", 0),
    ("lw_fake.mp4", 1),
    ("cyxlcuyznd.mp4", 0),
]

reals = []      # names of videos predicted REAL
preds = set()   # every predicted FAKE-probability seen so far

for test_video, label in labelled_sample_videos:
    pred = sequence_prediction(test_video)[0]
    preds.add(pred)
    if pred >= 0.5:
        print(f'The predicted class of the video is FAKE with probability {pred}')
        assert label == 1
    else:
        reals.append(test_video)
        print(f'The predicted class of the video is REAL with probability {1-pred}')
        assert label == 0



Realpath: data/input/deepfake_videos/train_sample_videos/lw_real.mp4
The predicted class of the video is REAL with probability 0.8881828859448433
Realpath: data/input/deepfake_videos/train_sample_videos/lw_fake.mp4
The predicted class of the video is FAKE with probability 0.9853692054748535
Realpath: data/input/deepfake_videos/train_sample_videos/cyxlcuyznd.mp4
The predicted class of the video is REAL with probability 0.8992258459329605


In [94]:
# Classify a video from the test folder, i.e. one the model was not trained on.
test_video = "007_non_training.mp4"
pred = sequence_prediction(test_video, subset=TEST_FOLDER)[0]
preds.add(pred)

predicted_fake = pred >= 0.5
if predicted_fake:
    print(f'The predicted class of the video is FAKE with probability {pred}')
else:
    reals.append(test_video)
    print(f'The predicted class of the video is REAL with probability {1-pred}')

Realpath: data/input/deepfake_videos/test_videos/007.mp4
The predicted class of the video is REAL with probability 0.7436708211898804


In [95]:
# Save the trained sequence model to "final.keras" in the working directory;
# a later cell loads it back with keras.models.load_model.
model.save("final.keras")

In [97]:
# load model
reconstructed_model = keras.models.load_model("final.keras")

# predict

def sequence_prediction(path, subset=TRAIN_SAMPLE_FOLDER):
    """Run the reloaded model (`reconstructed_model`) on one dataset video.

    NOTE: this rebinds `sequence_prediction`; from this cell on the name
    predicts with the model loaded from disk instead of the in-memory `model`.

    Args:
        path: file name of the video inside the subset folder.
        subset: sub-folder of DATA_FOLDER that holds the video.

    Returns:
        The first row of `reconstructed_model.predict` for this video.
    """
    realpath = os.path.join(DATA_FOLDER, subset, path)
    print(f"Realpath: {realpath}")
    # Only the first MAX_SEQ_LENGTH frames are consumed downstream, so there
    # is no need to decode the whole video.
    frames = load_video(realpath, max_frames=MAX_SEQ_LENGTH)
    frame_features, frame_mask = prepare_single_video(frames)
    return reconstructed_model.predict([frame_features, frame_mask], verbose=0)[0]

# Re-run the same test-folder video to confirm the reloaded model behaves like
# the one that was saved.
test_video = "007_non_training.mp4"
pred = sequence_prediction(test_video, subset=TEST_FOLDER)[0]
preds.add(pred)
if pred >= 0.5:
    print(f'The predicted class of the video is FAKE with probability {pred}')
else:
    reals.append(test_video)
    print(f'The predicted class of the video is REAL with probability {1-pred}')

Realpath: data/input/deepfake_videos/test_videos/007_non_training.mp4
The predicted class of the video is REAL with probability 0.7436708211898804
