# Libraries Used

* ffmpeg-python
* av
* cmake
* dlib  (based on the python version)
* face-recognition

In [None]:
# from google.colab import drive
# drive.mount('/content/drive', force_remount = True)

# !unzip drive/MyDrive/Deepfake-Detection.zip
# %cd Deepfake-Detection

!pip install ffmpeg-python
!pip install av
!pip install cmake
!pip install dlib
!pip install face-recognition

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ffmpeg-python
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl (25 kB)
Installing collected packages: ffmpeg-python
Successfully installed ffmpeg-python-0.2.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting av
  Downloading av-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (31.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m50.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: av
Successfully installed av-10.0.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting face-recognition
  Down

# Imports & Globals

In [None]:
import av
import face_recognition

import numpy as np
import os
import random

In [None]:
# Root folder of each pipeline stage. Every value ends with '/' because the
# notebook builds paths by plain string concatenation (e.g. DS_ORG + CELEB_REAL).
# Original, unprocessed videos
DS_ORG = './dataset_original/'
# Key-frame-only copies written by the I-frame extraction step
DS_IFRAME = './dataset_IFrames/'
# Face-cropped videos written by the face extraction step
DS_FACE = './dataset_face/'
# Not referenced anywhere in the visible part of the notebook
DS_FINAL = './dataset_final/'

# Per-class sub-folders used underneath the stage folders above
CELEB_REAL = 'Celeb-real/'
CELEB_FAKE = 'Celeb-synthesis/'
YT_REAL = 'YouTube-real/'

# I-Frame Extraction

## Testing Logic

In [None]:
# Print the picture type and key-frame flag of every frame of one sample video.
# The container is used as a context manager so the file is closed even when
# decoding fails part-way through, and only the first video stream is demuxed
# because the printed attributes only exist on video frames.
with av.open('dataset_original/Celeb-real/id0_0000.mp4') as test_vid:
    for packet in test_vid.demux(video=0):
        for frame in packet.decode():
            print(f'{frame.pict_type} - {frame.key_frame}')

In [None]:
# Remux only the key-frame packets of one sample video into a new file.
# Packets are copied as-is (no decode / re-encode). Both containers are opened
# as context managers so they are closed even if muxing raises.
with av.open('dataset_original/Celeb-real/id0_0000.mp4') as test_input, \
        av.open('dataset_IFrames/id0_0000.mp4', 'w') as test_output:
    in_stream = test_input.streams.video[0]
    # NOTE(review): skip_frame is a decoder option and nothing is decoded in
    # this cell, so it presumably has no effect here -- confirm before removing.
    in_stream.codec_context.skip_frame = "NONKEY"

    # The output stream copies its codec parameters from the input stream.
    out_stream = test_output.add_stream(template=in_stream)

    for packet in test_input.demux(in_stream):
        # Packets without a dts (the flushing packet emitted at the end of
        # demuxing) cannot be written to the output.
        if packet.dts is None:
            continue

        if packet.is_keyframe:
            # Re-target the packet at the output stream before muxing it.
            packet.stream = out_stream
            test_output.mux(packet)

In [None]:
# Smoke test of the key-frame extraction on a handful of real celebrity videos.
# The counter is incremented before it is checked, so the loop stops as soon as
# it reaches 10: only the first 9 directory entries are processed.
count = 0
for video in os.listdir(DS_ORG + CELEB_REAL):
    count += 1
    if (count == 10):
        break

    # Context managers guarantee that both files are closed for every video,
    # even when one of them fails to remux.
    with av.open(DS_ORG + CELEB_REAL + video) as input_vid, \
            av.open(DS_IFRAME + CELEB_REAL + video, 'w') as output_vid:
        in_stream = input_vid.streams.video[0]
        in_stream.codec_context.skip_frame = "NONKEY"

        out_stream = output_vid.add_stream(template=in_stream)

        for packet in input_vid.demux(in_stream):
            # Skip packets without a dts (flushing packets); they cannot be muxed.
            if packet.dts is None:
                continue

            if packet.is_keyframe:
                packet.stream = out_stream
                output_vid.mux(packet)


## Implementation

In [None]:
def extract_frames(src_dir, dest_dir, vid_class, filename):
    """Copy only the key-frame packets of a video into a new video file.

    The packets are remuxed unchanged (no decoding or re-encoding).

    Args:
        src_dir: Folder prefix of the source dataset (concatenated as-is, so it
            is expected to end with a path separator).
        dest_dir: Folder prefix of the destination dataset (same convention).
        vid_class: Class sub-folder, e.g. ``'Celeb-real/'``.
        filename: Name of the video file inside the class sub-folder.

    The destination folder is created when it does not exist yet. Both
    containers are closed on exit, including when an exception is raised.
    """
    src_path = src_dir + vid_class + filename
    dest_path = dest_dir + vid_class + filename

    # av.open(..., 'w') cannot create missing parent folders by itself.
    dest_folder = os.path.dirname(dest_path)
    if dest_folder:
        os.makedirs(dest_folder, exist_ok=True)

    with av.open(src_path) as input_vid, av.open(dest_path, 'w') as output_vid:
        in_stream = input_vid.streams.video[0]
        in_stream.codec_context.skip_frame = "NONKEY"

        # The output stream copies its codec parameters from the input stream.
        out_stream = output_vid.add_stream(template=in_stream)

        for packet in input_vid.demux(in_stream):
            # Packets without a dts (flushing packets) cannot be muxed.
            if packet.dts is None:
                continue

            if packet.is_keyframe:
                packet.stream = out_stream
                output_vid.mux(packet)

In [None]:
# Extracting I-Frames from real celebrity videos
# Every entry of the original Celeb-real folder is passed to extract_frames,
# which writes its output under the same class folder of DS_IFRAME.

for video in os.listdir(DS_ORG + CELEB_REAL):
    extract_frames(DS_ORG, DS_IFRAME, CELEB_REAL, video)

In [None]:
# Extracting I-Frames from real youtube videos
# Every entry of the original YouTube-real folder is passed to extract_frames,
# which writes its output under the same class folder of DS_IFRAME.

for video in os.listdir(DS_ORG + YT_REAL):
    extract_frames(DS_ORG, DS_IFRAME, YT_REAL, video)

In [None]:
# Extracting I-Frames from deepfake celebrity videos
# 408 videos are chosen at random so that the number of fake videos matches the
# number of real videos used for training (158 real celeb videos + 250 real
# youtube videos).
#
# The listing is sorted (os.listdir returns entries in arbitrary order) and a
# dedicated, seeded generator is used, so re-running this cell selects the same
# 408 videos instead of adding a different random subset to the output folder.
# random.sample raises ValueError if fewer than 408 videos are available.

fake_candidates = sorted(os.listdir(DS_ORG + CELEB_FAKE))
video_list = random.Random(0).sample(fake_candidates, 408)
for video in video_list:
    extract_frames(DS_ORG, DS_IFRAME, CELEB_FAKE, video)

# Face Extraction

## Functions

In [None]:
# MesoNet works best with images having 256x256 dimension
# If face location borders span a smaller distance, extend the borders
# on either side equally to ensure 256x256 image

def normalize_face_borders(low, high):
    """Widen the interval ``[low, high]`` so that it spans 256 units.

    The missing width is split between both sides. When there is not enough
    room below ``low`` (it would become negative), ``low`` is set to 0 and the
    remainder is added on the ``high`` side instead. No upper limit is applied.

    Args:
        low: Lower border of the face box along one axis.
        high: Upper border of the face box along the same axis.

    Returns:
        A ``(low, high)`` tuple. Intervals that already span 256 or more are
        returned unchanged (always as a tuple, so callers can unpack it).
    """
    span = high - low
    if span >= 256:
        return low, high

    missing = 256 - span
    # Never move the lower border below 0; whatever cannot be added there
    # is added on the upper side so the total span still reaches 256.
    grow_low = min(low, missing // 2)

    low -= grow_low
    high += missing - grow_low

    return low, high

In [None]:
# New normalize function to always make the cropped face image 256x256 dimension
# which will be fed as input to the MesoNet

def modified_normalize_face_borders(low, high, boundary, target=256):
    """Widen ``[low, high]`` to ``target`` units while staying in ``[0, boundary]``.

    The missing width is split between both sides. If one side would leave the
    valid range, the window is shifted back inside, so the result still spans
    ``target`` whenever ``boundary`` is at least ``target``. When the image is
    smaller than ``target`` the whole range ``[0, boundary]`` is returned.

    Args:
        low: Lower border of the face box along one axis.
        high: Upper border of the face box along the same axis.
        boundary: Size of the image along that axis (largest valid coordinate).
        target: Desired span of the result; defaults to 256.

    Returns:
        A ``(low, high)`` tuple. Intervals that already span ``target`` or
        more are returned unchanged.
    """
    span = high - low
    if span >= target:
        return low, high

    missing = target - span
    low -= missing // 2
    high += missing - missing // 2

    if low < 0:
        # Overshoot on the lower side: move the window up by that amount.
        high -= low
        low = 0
    if high > boundary:
        # Overshoot on the upper side: move the window down, but never below 0.
        low = max(0, low - (high - boundary))
        high = boundary

    return low, high

In [None]:
def get_crop_window(face_location, height, width):
    """Build the crop box for the first detected face of a frame.

    Args:
        face_location: List of face boxes in ``(top, right, bottom, left)``
            order; only the first entry is used.
        height: Height of the frame.
        width: Width of the frame.

    Returns:
        A ``(left, upper, right, lower)`` tuple whose borders were passed
        through ``modified_normalize_face_borders`` along both axes.
    """
    first_face = face_location[0]

    x_min = first_face[3]
    x_max = first_face[1]
    # The vertical borders are mirrored (height - y) before normalisation,
    # so the original bottom edge becomes the smaller coordinate.
    y_max = height - first_face[0]
    y_min = height - first_face[2]

    left, right = modified_normalize_face_borders(x_min, x_max, width)
    bot, top = modified_normalize_face_borders(y_min, y_max, height)

    # Mirror the vertical borders back into image coordinates.
    return (left, height - top, right, height - bot)

## Testing Logic

In [None]:
# Crop the first detected face out of every frame of one key-frame video and
# save each crop as a JPEG. The container is closed automatically on exit.
count = 0

with av.open('dataset_IFrames/Celeb-real/id0_0000.mp4') as test_input:
    for frame in test_input.decode(video=0):
        # Ask for an RGB array explicitly: without a format, to_ndarray()
        # returns the frame in its native pixel format, which is not
        # necessarily the RGB layout the face detector works on.
        nd_frame = frame.to_ndarray(format='rgb24')
        img_frame = frame.to_image()

        height, width = img_frame.height, img_frame.width

        # Face location returned by face_recognition api: [(top, right, bottom, left)] in css terms
        # Face location required by PIL.Image: (left, top, right, bottom)
        face_location = face_recognition.api.face_locations(nd_frame)

        # get_crop_window indexes the first face, so frames without any
        # detected face have to be skipped.
        if len(face_location) == 0:
            continue

        face_location = get_crop_window(face_location, height, width)

        img_frame = img_frame.crop(face_location)
        img_frame.save(f'dataset_face/Celeb-real/id0_0000_{count}.jpg')

        count += 1

### Turning Cropped Faces into a Video

In [None]:
# Re-encode one key-frame video so that every output frame is the crop around
# the first detected face. Both containers are closed automatically on exit.
with av.open('dataset_IFrames/Celeb-real/id10_0001.mp4') as test_input, \
        av.open('dataset_face/Celeb-real/id10_0001.mp4', 'w') as test_output:
    in_stream = test_input.streams.video[0]
    codec_name = in_stream.codec_context.name

    # Same codec, size and pixel format as the input; the second argument of
    # add_stream is the frame rate of the new stream.
    out_stream = test_output.add_stream(codec_name, 2)
    out_stream.width = in_stream.codec_context.width
    out_stream.height = in_stream.codec_context.height
    out_stream.pix_fmt = in_stream.codec_context.pix_fmt

    for frame in test_input.decode(in_stream):
        img_frame = frame.to_image()
        # Ask for an RGB array explicitly: without a format, to_ndarray()
        # returns the frame in its native pixel format.
        nd_frame = frame.to_ndarray(format='rgb24')

        height, width = img_frame.height, img_frame.width

        # Face location returned by face_recognition api: [(top, right, bottom, left)]
        # Face location required by PIL.Image: (left, top, right, bottom)
        face_location = face_recognition.api.face_locations(nd_frame)

        # Frames without a detected face are dropped from the output.
        if len(face_location) == 0:
            continue

        face_location = get_crop_window(face_location, height, width)
        img_frame = img_frame.crop(face_location)

        out_frame = av.VideoFrame.from_image(img_frame)
        out_packet = out_stream.encode(out_frame)
        test_output.mux(out_packet)

    # Encoding None flushes the frames still buffered inside the encoder.
    out_packet = out_stream.encode(None)
    test_output.mux(out_packet)



## Implementation

In [None]:
def gpu_simple_save_cropped_faces_to_video(src_dir, dest_dir, vid_class, filename):
    """Write a 256x256 video made of the first detected face of every frame.

    All frames of the source video are decoded first, faces are located for
    the whole batch in a single ``batch_face_locations`` call, and every frame
    with at least one face is cropped to the first face box and encoded into
    the destination video. Frames without a face are dropped.

    Args:
        src_dir: Folder prefix of the source dataset (concatenated as-is, so it
            is expected to end with a path separator).
        dest_dir: Folder prefix of the destination dataset (same convention).
        vid_class: Class sub-folder, e.g. ``'Celeb-real/'``.
        filename: Name of the video file inside the class sub-folder.

    NOTE(review): the crops are handed to the encoder at their own size; the
    256x256 output presumably relies on the encoder rescaling them -- confirm.
    """
    src_path = src_dir + vid_class + filename
    dest_path = dest_dir + vid_class + filename

    # av.open(..., 'w') cannot create missing parent folders by itself.
    dest_folder = os.path.dirname(dest_path)
    if dest_folder:
        os.makedirs(dest_folder, exist_ok=True)

    # Context managers close both files even if detection or encoding fails.
    # Local names avoid shadowing the builtin `input`.
    with av.open(src_path) as in_container, av.open(dest_path, 'w') as out_container:
        in_stream = in_container.streams.video[0]
        codec_name = in_stream.codec_context.name

        # output video dimension should be 256x256
        out_stream = out_container.add_stream(codec_name, 2)
        out_stream.width = 256
        out_stream.height = 256
        out_stream.pix_fmt = in_stream.codec_context.pix_fmt

        frame_list = []
        image_list = []
        for frame in in_container.decode(in_stream):
            # Ask for an RGB array explicitly: without a format, to_ndarray()
            # returns the frame in its native pixel format.
            frame_list.append(frame.to_ndarray(format='rgb24'))
            image_list.append(frame.to_image())

        # Do not call the batched detector with an empty batch.
        if frame_list:
            face_locations = face_recognition.api.batch_face_locations(frame_list, 0)
        else:
            face_locations = []

        for img_frame, face_location in zip(image_list, face_locations):
            if len(face_location) == 0:
                continue

            # Detector order is (top, right, bottom, left);
            # PIL.Image.crop expects (left, top, right, bottom).
            first_face = face_location[0]
            crop_box = (first_face[3], first_face[0], first_face[1], first_face[2])
            img_frame = img_frame.crop(crop_box)

            out_frame = av.VideoFrame.from_image(img_frame)
            out_container.mux(out_stream.encode(out_frame))

        # Encoding None flushes the frames still buffered inside the encoder.
        out_container.mux(out_stream.encode(None))

In [None]:
# Face-crop every key-frame video of the real celebrity class
# (reads from DS_IFRAME, writes to DS_FACE).
for video in os.listdir(DS_IFRAME + CELEB_REAL):
    gpu_simple_save_cropped_faces_to_video(DS_IFRAME, DS_FACE, CELEB_REAL, video)

In [None]:
# Face-crop every key-frame video of the deepfake celebrity class
# (reads from DS_IFRAME, writes to DS_FACE).
for video in os.listdir(DS_IFRAME + CELEB_FAKE):
    gpu_simple_save_cropped_faces_to_video(DS_IFRAME, DS_FACE, CELEB_FAKE, video)

In [None]:
# Face-crop every key-frame video of the real YouTube class
# (reads from DS_IFRAME, writes to DS_FACE).
for video in os.listdir(DS_IFRAME + YT_REAL):
    gpu_simple_save_cropped_faces_to_video(DS_IFRAME, DS_FACE, YT_REAL, video)

# **MESONET**

In [None]:
import keras
from keras import layers
from keras.layers import Conv2D, MaxPool2D, Flatten, Dense, BatchNormalization, Dropout
from keras.optimizers import Adam

In [None]:
def create_model(input_size, include_top=True):
  """Build the convolutional classifier used for real/fake face images.

  The network is four blocks of Conv2D -> BatchNormalization -> MaxPool2D
  (filters 8, 8, 16, 16; kernels 3, 5, 5, 5; pooling 2, 2, 4, 4). With a
  256x256 input the last block produces a 4x4x16 feature map.

  Args:
    input_size: Shape of one input image, e.g. ``(256, 256, 3)``.
    include_top: When True (default) a classification head is appended:
      Flatten -> Dropout(0.5) -> Dense(16, relu) -> Dropout(0.5) ->
      Dense(1, sigmoid), so the model outputs one probability per image.
      Pass False to get only the convolutional feature extractor.

  Returns:
    An uncompiled ``keras.Sequential`` model.
  """
  # (filters, kernel_size, pool_size) of each convolutional block.
  conv_blocks = [(8, 3, 2), (8, 5, 2), (16, 5, 4), (16, 5, 4)]

  model = keras.Sequential()

  for block_index, (filters, kernel_size, pool_size) in enumerate(conv_blocks):
    conv_kwargs = dict(filters=filters, kernel_size=kernel_size, activation='relu', padding="same")
    # Only the first layer of a Sequential model needs the input shape;
    # the shapes of later layers are inferred from the previous layer.
    if block_index == 0:
      conv_kwargs['input_shape'] = input_size

    model.add(Conv2D(**conv_kwargs))
    model.add(BatchNormalization())
    model.add(MaxPool2D(pool_size, pool_size, padding="same"))

  if include_top:
    # Without this head the model ends in a 4-D feature map and cannot be
    # trained against one label per image.
    model.add(Flatten())
    model.add(Dropout(0.5))
    model.add(Dense(16, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))

  return model
  

In [None]:
# Build and compile the network for 256x256 RGB inputs.
input_size = (256, 256, 3)
model = create_model(input_size)
# The datasets in this notebook are created with label_mode='binary', i.e. one
# 0/1 label per image. binary_crossentropy is the loss for that label format;
# categorical_crossentropy expects one-hot encoded labels instead.
model.compile(optimizer=Adam(learning_rate=0.0001), loss='binary_crossentropy', metrics = ['accuracy'])
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 256, 256, 8)       224       
                                                                 
 batch_normalization (BatchN  (None, 256, 256, 8)      32        
 ormalization)                                                   
                                                                 
 max_pooling2d (MaxPooling2D  (None, 128, 128, 8)      0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 128, 128, 8)       1608      
                                                                 
 batch_normalization_1 (Batc  (None, 128, 128, 8)      32        
 hNormalization)                                                 
                                                        

**DATASET CREATION**

In [None]:
import shutil
import imghdr
from PIL import Image

# Folders that receive the extracted frames of each class
dir_real = 'dataset_REAL/'
dir_fake = 'dataset_FAKE/'
# Roots of the training and testing splits
# NOTE(review): later cells read frames from dir_train + dir_real / dir_fake,
# while the extraction cells write to dir_real / dir_fake directly -- confirm
# how the frames are expected to get into the training folder.
dir_train = './training_data/'
dir_test = './testing_data/'

# Start from empty output folders. Each folder is handled on its own, so a
# folder that is missing (e.g. after an interrupted run) no longer makes
# rmtree fail just because the other one exists.
for frames_dir in (dir_real, dir_fake):
  if os.path.exists(frames_dir):
    shutil.rmtree(frames_dir)
  os.makedirs(frames_dir)

In [None]:
def dataset_extract_frames(source_path, dir_name, vid, count):
  """Save every frame of one video as a JPEG named ``vid_<count>_fr_<n>.jpg``.

  If the file is itself a JPEG image it is saved once as frame 0 and the
  function returns; otherwise it is opened as a video and each decoded frame
  of the first video stream is saved.

  Args:
    source_path: Folder that contains the file.
    dir_name: Output folder, used as ``./<dir_name>/``; it must already exist.
    vid: File name inside ``source_path``.
    count: Number of the video, used in the output file names.
  """
  # One path for every access, so detection and opening always agree.
  vid_path = os.path.join(source_path, vid)
  frame_count = 0

  # NOTE(review): imghdr is deprecated in recent Python versions -- confirm
  # the target runtime before upgrading.
  if imghdr.what(vid_path) == 'jpeg':
    with Image.open(vid_path) as image:
      image.save(f'./{dir_name}/vid_{count}_fr_{frame_count}.jpg')
    return

  # The container is closed on exit, even if decoding or saving fails.
  with av.open(vid_path) as container:
    for frame in container.decode(video=0):
      image = frame.to_image()
      image.save(f'./{dir_name}/vid_{count}_fr_{frame_count}.jpg')
      frame_count += 1

In [None]:
#extracting frames from Celeb-real face-cropped data
# vid_count numbers the videos of the real class; it starts at 1 here and keeps
# counting in the YouTube-real cell that follows, so run the two cells in order.
vid_count = 1;
source_path = DS_FACE + CELEB_REAL
for video in os.listdir(source_path):
  # progress log: file name and the number it was given
  print(video, vid_count)
  dataset_extract_frames(source_path, dir_real, video, vid_count)
  vid_count += 1

In [None]:
#extracting frames from YouTube-real face-cropped data
# vid_count is NOT reset here: it continues from the value left by the
# Celeb-real cell, so both real sources get distinct video numbers in dir_real.
source_path = DS_FACE + YT_REAL
for video in os.listdir(source_path):
  # progress log: file name and the number it was given
  print(video, vid_count)
  dataset_extract_frames(source_path, dir_real, video, vid_count)
  vid_count += 1

In [None]:
#extracting frames from Celeb-synthesis face-cropped data
# The counter restarts at 1 for the fake class, so file names in dir_fake
# overlap with those in dir_real (the two folders keep them apart).
vid_count = 1
source_path = DS_FACE + CELEB_FAKE
for video in os.listdir(source_path):
  # progress log: file name and the number it was given
  print(video, vid_count)
  dataset_extract_frames(source_path, dir_fake, video, vid_count)
  vid_count += 1

In [None]:
#extracting test data

def extract_test_data(source_path, fraction=20/100):
  """Pick a random subset of the files in a folder.

  Args:
    source_path: Folder whose entries are sampled.
    fraction: Share of the entries to pick, between 0 and 1. Defaults to 20%.
      The number of picked entries is ``int(fraction * number_of_entries)``.

  Returns:
    A list of paths (``source_path`` joined with the entry name), sampled
    without replacement using the module-level ``random`` generator.

  Raises:
    ValueError: If ``fraction`` is outside the range [0, 1].
  """
  if not 0 <= fraction <= 1:
    raise ValueError(f'fraction must be between 0 and 1, got {fraction!r}')

  frame_list = [os.path.join(source_path, frame) for frame in os.listdir(source_path)]

  size = int(fraction * len(frame_list))
  return random.sample(frame_list, size)

In [None]:
#extracting test data from real dataset
# Real and fake frames use the same file names (vid_<n>_fr_<m>.jpg, with the
# video counter starting at 1 for each class). Moving both classes into one
# flat folder would overwrite files and lose the class label, so the real
# frames go into their own sub-folder of the test directory.
# NOTE: every run moves another 20% of what is left in the training folder.

source_path = dir_train + dir_real
test_real_dir = os.path.join(dir_test, dir_real)
os.makedirs(test_real_dir, exist_ok=True)

sampled_list = extract_test_data(source_path)
for frame_path in sampled_list:
  shutil.move(frame_path, os.path.join(test_real_dir, os.path.basename(frame_path)))

In [None]:
#extracting test data from fake dataset
# Real and fake frames use the same file names (vid_<n>_fr_<m>.jpg, with the
# video counter starting at 1 for each class). Moving both classes into one
# flat folder would overwrite files and lose the class label, so the fake
# frames go into their own sub-folder of the test directory.
# NOTE: every run moves another 20% of what is left in the training folder.

source_path = dir_train + dir_fake
test_fake_dir = os.path.join(dir_test, dir_fake)
os.makedirs(test_fake_dir, exist_ok=True)

sampled_list = extract_test_data(source_path)
for frame_path in sampled_list:
  shutil.move(frame_path, os.path.join(test_fake_dir, os.path.basename(frame_path)))

In [None]:
#creating dataset from folders
def create_dataset(dir_path, subset='training', validation_split=0.2):
  """Load a labelled image dataset from a folder with one sub-folder per class.

  Args:
    dir_path: Folder whose sub-folders are the classes (labels are inferred
      from them).
    subset: Which part of the split to return, ``'training'`` (default) or
      ``'validation'``. Pass None together with ``validation_split=None`` to
      load every file.
    validation_split: Fraction of the files reserved for validation.

  Returns:
    A dataset yielding shuffled batches of 32 RGB images with binary labels.
    The fixed seed keeps the training/validation split identical between the
    calls for both subsets.
  """
  ds = keras.utils.image_dataset_from_directory(
      directory = dir_path,
      labels = 'inferred',
      label_mode = 'binary',
      batch_size = 32,
      color_mode = 'rgb',
      shuffle = True,
      validation_split = validation_split,
      subset = subset,
      seed = 1
  )
  return ds

In [None]:
# Load the dataset from the training folder and print the shape of the first
# batch as a quick sanity check.
train_ds = create_dataset(dir_train)
for data, labels in train_ds.take(1):
  print(data.shape)

Found 29535 files belonging to 2 classes.
Using 5907 files for validation.


KeyboardInterrupt: ignored