<a href="https://colab.research.google.com/github/LoreJob/DeepFake-Dct/blob/main/CNN_for_DeepFake_Detection_in_Tensorflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CNN+LSTM for DeepFake Detection in videos


## Enabling and testing the GPU

First, you'll need to enable GPUs for the notebook:

- Navigate to RAM/Disk on the upper-left part of the Colab window → Additional Connection Options
- Select GPU T4 from the Hardware Accelerator drop-down (you have just 1 hour of usage)

Next, we'll confirm that we can connect to the GPU with tensorflow:

In [None]:
import tensorflow as tf
# Ask TensorFlow for the GPU device name and stop right away when the
# expected first GPU is not the one reported.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
  print(f'Found GPU at: {device_name}')
else:
  raise SystemError('GPU device not found')

Found GPU at: /device:GPU:0


## Observe TensorFlow speedup on GPU relative to CPU

This example constructs a typical convolutional neural network layer over a
random image and manually places the resulting ops on either the CPU or the GPU
to compare execution speed.

In [None]:
import tensorflow as tf
import timeit

# The benchmark below needs a GPU: print a hint on how to enable one and
# abort when TensorFlow does not report the expected device name.
device_name = tf.test.gpu_device_name()
if not device_name == '/device:GPU:0':
  print('\n\nThis error most likely means that this notebook is not '
        'configured to use a GPU.  Change this in Notebook Settings via the '
        'command palette (cmd/ctrl-shift-P) or the Edit menu.\n\n')
  raise SystemError('GPU device not found')

def cpu():
  """Convolve a random (100, 100, 100, 3) tensor on the CPU and return the sum of the result."""
  with tf.device('/cpu:0'):
    batch = tf.random.normal((100, 100, 100, 3))
    # 32 filters with a 7x7 kernel; a fresh layer is created on every call.
    conv = tf.keras.layers.Conv2D(32, 7)
    return tf.math.reduce_sum(conv(batch))

def gpu():
  """Convolve a random (100, 100, 100, 3) tensor on the first GPU and return the sum of the result."""
  with tf.device('/device:GPU:0'):
    batch = tf.random.normal((100, 100, 100, 3))
    # 32 filters with a 7x7 kernel; a fresh layer is created on every call.
    conv = tf.keras.layers.Conv2D(32, 7)
    return tf.math.reduce_sum(conv(batch))

# Warm-up: the first call of each op is executed outside the timed section;
# see: https://stackoverflow.com/a/45067900
cpu()
gpu()

# Time ten calls of each function and report the totals.
print('Time (s) to convolve 32x7x7x3 filter over random 100x100x100x3 images '
      '(batch x height x width x channel). Sum of ten runs.')

print('CPU (s):')
cpu_time = timeit.timeit(stmt='cpu()', setup="from __main__ import cpu", number=10)
print(cpu_time)

print('GPU (s):')
gpu_time = timeit.timeit(stmt='gpu()', setup="from __main__ import gpu", number=10)
print(gpu_time)

# The ratio is truncated to an integer for display.
print(f'GPU speedup over CPU: {int(cpu_time / gpu_time)}x')

Time (s) to convolve 32x7x7x3 filter over random 100x100x100x3 images (batch x height x width x channel). Sum of ten runs.
CPU (s):
7.214328854999998
GPU (s):
0.10292108700001279
GPU speedup over CPU: 70x


In [None]:
# Count the GPUs visible to TensorFlow. Uses the stable
# tf.config.list_physical_devices API rather than its `experimental` alias.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


## Packages
As the title says, we are using the TensorFlow package, together with OpenCV (`cv2`), NumPy and Matplotlib.

In [1]:
import os
import tensorflow as tf
from matplotlib import pyplot as plt
import cv2
import numpy as np
from tensorflow.keras import models, layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout
from tensorflow.keras.metrics import Precision, Recall, BinaryAccuracy

## Importing the dataset




### Variables

In [2]:
# Size (in pixels) every extracted frame is resized to
HEIGHT = 256
WIDTH = 256

# Number of frames of a video that will be fed to the model as one sequence
N_FRAMES = 20

# Root folder of the video dataset
DATASET_DIR = "Video Dataset Small"

# Class names; the position of a name in this list is its integer label
CLASSES = ["fake", "real"]

# Number of videos per batch
BATCH_SIZE = 32

### Creating the frame extractor

In [3]:
def frames_extraction(video_path):
    """Sample up to N_FRAMES evenly spaced frames from a video file.

    Every sampled frame is resized to WIDTH x HEIGHT pixels and divided by
    255 (so 8-bit pixel values end up between 0 and 1).

    Args:
        video_path: Path of the video file, handed to cv2.VideoCapture.

    Returns:
        A list of frames as NumPy arrays. The list holds fewer than N_FRAMES
        entries (possibly none) when a frame read fails; callers should check
        its length.
    """
    frames_list = []

    # Reading the video
    video_reader = cv2.VideoCapture(video_path)
    try:
        # Getting total number of frames
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Distance between two sampled frames; at least 1 so that short
        # videos are still read frame by frame
        skip_frames_window = max(video_frames_count // N_FRAMES, 1)

        # Looping over the selected frames
        for frame_counter in range(N_FRAMES):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

            # Reading the frame; stop at the first unsuccessful read
            success, frame = video_reader.read()
            if not success:
                break

            # cv2.resize expects the target size as (width, height)
            resized_frame = cv2.resize(frame, (WIDTH, HEIGHT))
            normalized_frame = resized_frame / 255

            frames_list.append(normalized_frame)
    finally:
        # Always free the capture handle, even if resizing raises
        video_reader.release()

    return frames_list


### Creating the dataset creator

In [7]:
# Loading videos from folder
def create_dataset(split):
    """Load all videos of one dataset split into NumPy arrays.

    For every class in CLASSES, the files found in
    DATASET_DIR/<split>/<class_name> are passed to frames_extraction. Videos
    that yield a number of frames different from N_FRAMES are skipped.

    Args:
        split: Name of the split sub-folder inside DATASET_DIR (e.g. "Train").

    Returns:
        A (features, labels) tuple of NumPy arrays: features holds the frame
        sequences of the kept videos, labels the index in CLASSES of each
        kept video, in the same order.
    """
    features = []
    labels = []

    for class_idx, class_name in enumerate(CLASSES):
        print(f'Extracting data of: {split}\\{class_name}')

        class_dir = os.path.join(DATASET_DIR, split, class_name)

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order reproducible between runs and machines
        for file_name in sorted(os.listdir(class_dir)):
            video_file_path = os.path.join(class_dir, file_name)
            frames = frames_extraction(video_file_path)

            # Keep only videos that delivered the full frame sequence
            if len(frames) == N_FRAMES:
                # Append the data to their respective lists.
                features.append(frames)
                labels.append(class_idx)

    features = np.asarray(features)
    labels = np.array(labels)

    return features, labels

### Loading Datasets

In [8]:
# Create train, validation, and test datasets.
# create_dataset only takes the split name; it reads DATASET_DIR and CLASSES
# from the module-level constants, so they must not be passed as arguments.
X_train, y_train = create_dataset(split="Train")
X_val, y_val = create_dataset(split="Val")
X_test, y_test = create_dataset(split="Test")

# One-hot encode the integer class labels
y_train = tf.keras.utils.to_categorical(y_train, num_classes=len(CLASSES))
y_val = tf.keras.utils.to_categorical(y_val, num_classes=len(CLASSES))
y_test = tf.keras.utils.to_categorical(y_test, num_classes=len(CLASSES))

Extracting data of: Train\fake


In [None]:
# Build one tf.data pipeline per split: (shuffle ->) batch -> prefetch.
# NOTE(review): drop_remainder=True discards the last incomplete batch, so
# validation/test metrics may ignore some videos - confirm this is intended.

# Training data: shuffled over the whole split
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train))
    .shuffle(len(X_train))
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

# Validation data: shuffled over the whole split
val_dataset = (
    tf.data.Dataset.from_tensor_slices((X_val, y_val))
    .shuffle(len(X_val))
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

# Test data: kept in its original order (no shuffle)
test_dataset = (
    tf.data.Dataset.from_tensor_slices((X_test, y_test))
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)