# DeepFake Detection

## Configuration


In [2]:
!pip install tqdm

Collecting tqdm
  Downloading tqdm-4.66.5-py3-none-any.whl.metadata (57 kB)
Downloading tqdm-4.66.5-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.66.5


In [None]:
!pip install numpy ultralytics matplotlib opencv-python tensorflow scikit-learn seaborn keras-tuner

In [None]:
import os
import numpy as np
from ultralytics import YOLO
import logging
import matplotlib.pyplot as plt
import cv2
import tensorflow as tf
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.keras.layers import RandomRotation, RandomFlip, RandomZoom, GaussianNoise , RandomContrast, RandomBrightness, Resizing, Rescaling, Conv2D, BatchNormalization, MaxPool2D, SeparableConv2D, ReLU, Add, GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.layers import Layer
from tensorflow.keras import Model, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy, Recall, AUC
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.train import BytesList, Int64List
from tensorflow.train import Example, Features, Feature
from sklearn.metrics import confusion_matrix, roc_curve
import seaborn as sns
import kerastuner as kt

In [None]:
CONFIGURATION = {
    "BATCH_SIZE" : 32,
    "IMAGE_SIZE" : 299,
    "BETA_1": 0.9,
    "BETA_2": 0.999,
    "EPSILON": 1e-08,
    "LEARNING_RATE": 0.0002,
    "EPOCHS": 10,
    "SEED": 42,
    "CLASS_NAMES": ["original", "manipulated"],
    "STEPS_PER_EPOCH": 29197,
    "VALIDATION_STEPS": 3649
}

original_sequences_directory = "videos/original_sequences/youtube/c23/videos"
deepFakes_directory = "videos/manipulated_sequences/Deepfakes/c23/videos"
face2Face_directory = "videos/manipulated_sequences/Face2Face/c23/videos"
faceSwap_directory = "videos/manipulated_sequences/FaceSwap/c23/videos"
neuralTexture_directory = "videos/manipulated_sequences/NeuralTextures/c23/videos"
output_original_frame_directory = "frames/original"
output_face2face_directory = "frames/manipulated/face2face"
output_deepfakes_directory = "frames/manipulated/deepfakes"
output_faceswap_directory = "frames/manipulated/faceswap"
output_neuraltexture_directory = "frames/manipulated/neuraltexture"

logging.getLogger('ultralytics').setLevel(logging.ERROR)

## Download del dataset FaceForensics++

In [None]:
%run "download-FaceForensics.py" videos/original --server EU2 -t videos -c c23 -d original

In [None]:
%run "download-FaceForensics.py" videos/original --server EU2 -t videos -c c40 -d original

In [None]:
%run "download-FaceForensics.py" videos/Face2Face --server EU2 -t videos -c c23 -d Face2Face

In [None]:
%run "download-FaceForensics.py" videos/Face2Face --server EU2 -t videos -c c40 -d Face2Face

In [None]:
%run "download-FaceForensics.py" videos/DeepFake  --server EU2 -t videos -c c23 -d Deepfakes

In [None]:
%run "download-FaceForensics.py" videos/DeepFake  --server EU2 -t videos -c c40 -d Deepfakes

In [None]:
%run "download-FaceForensics.py" videos/FaceSwap --server EU2 -t videos -c c23 -d FaceSwap

In [None]:
%run "download-FaceForensics.py" videos/FaceSwap --server EU2 -t videos -c c40 -d FaceSwap

In [None]:
%run "download-FaceForensics.py" videos/NeuralTexture  --server EU2 -t videos -c c23 -d NeuralTextures

In [None]:
%run "download-FaceForensics.py" videos/NeuralTexture  --server EU2 -t videos -c c40 -d NeuralTextures

In [None]:
import os
import shutil

base_path = "downloaded_videos"

original_path = os.path.join(base_path, "original_sequences")
manipulated_path = os.path.join(base_path, "manipulated_sequences")

dest_dir = "datasets"
subdirs = ["training_set", "validation_set", "test_set"]
categories = ["original", "manipulated"]

for subdir in subdirs:
    for category in categories:
        os.makedirs(os.path.join(dest_dir, subdir, category), exist_ok=True)

def copy_videos(src_path, dest_base, count):
    """Copia i video dalla sorgente alla destinazione basandosi su intervalli di conteggio."""
    videos = sorted([f for f in os.listdir(src_path) if f.endswith(".mp4")])  # Filtra i file video
    splits = {"training_set": videos[:800], 
              "validation_set": videos[800:900], 
              "test_set": videos[900:]}
    
    for split, video_list in splits.items():
        dest_path = os.path.join(dest_base, split)
        for video in video_list:
            shutil.copy(os.path.join(src_path, video), dest_path)

original_videos_path = os.path.join(original_path, "c23/videos")
for subdir in subdirs:
    copy_videos(original_videos_path, os.path.join(dest_dir, subdir, "original"), 1000)

manipulated_dirs = [d for d in os.listdir(manipulated_path) if os.path.isdir(os.path.join(manipulated_path, d))]
for manipulation in manipulated_dirs:
    manipulated_videos_path = os.path.join(manipulated_path, manipulation, "c23/videos")
    for subdir in subdirs:
        copy_videos(manipulated_videos_path, os.path.join(dest_dir, subdir, "manipulated"), 1000)



## Extracting frames from video

In [None]:
class Track_Face():
  def __init__(self):
    self.model = YOLO('yolov8n.pt', verbose=False)
    self.total_frames_extracted = 0

  def track(self, frame):
    self.img_height, self.img_width, _ = frame.shape
    results = self.model(frame)
    face_box = self._get_face(results)
    cropped_frame = self._crop_face(frame, face_box)
    return cropped_frame

  def _get_face(self, results):
    main_face = None
    for res in results[0].boxes:
      if res.cls == 0: 
        face_box = res.xyxy[0].cpu().numpy()
        main_face = self._get_main_face(main_face, face_box)
    if main_face is None:
      return [0, 0, self.img_width, self.img_height]  
    return main_face 

  def _get_main_face(self, main_face, face_box):
    if main_face is None:
      return face_box
    if self._get_area(face_box) > self._get_area(main_face):
      return face_box
    return main_face

  def _get_area(self, box):
    x1, y1, x2, y2 = box
    area = (x2 - x1) * (y2 - y1)
    return area

  def _crop_face(self, frame, face_box):
    extended_box = self._extend_box(face_box)
    x1, y1, x2, y2 = extended_box
    cropped_face = frame[y1:y2, x1:x2]
    return cropped_face

  def _extend_box(self, box, factor=1.3):
    x1, y1, x2, y2 = box
    box_width, box_height = x2 - x1, y2 - y1
    center_x, center_y = self._calculate_center(box)
    extended_box_width, extended_box_height = self._calculate_new_dimensions(box_width, box_height, factor)
    return self._calculate_new_coordinates(center_x, center_y, extended_box_width, extended_box_height )

  def _calculate_center(self, box):
    x1, y1, x2, y2 = box
    center_x = x1 + (x2 - x1) / 2
    center_y = y1 + (y2 - y1) / 2
    return center_x, center_y

  def _calculate_new_dimensions(self, width, height, factor):
    extended_box_width = width * factor
    extended_box_height = height * factor
    return extended_box_width, extended_box_height

  def _calculate_new_coordinates(self, center_x, center_y, extended_box_width, extended_box_height):
    extended_x1 = max(0, int(center_x - extended_box_width / 2))
    extended_y1 = max(0, int(center_y - extended_box_height / 2))
    extended_x2 = min(self.img_width, int(center_x + extended_box_width / 2))
    extended_y2 = min(self.img_height, int(center_y + extended_box_height / 2))
    return extended_x1, extended_y1, extended_x2, extended_y2

In [None]:
class Extract_frames:
    def __init__(self, input_directory, output_directory, frames_name, save_frame_interval=4, offset=0, quality="c23"):
        self.input_directory = input_directory
        self.output_directory = output_directory
        self.save_frame_interval = save_frame_interval
        self.total_frames_extracted = 0
        self.frames_name = frames_name
        self.offset = offset
        self.quality = quality
        self.track_face = Track_Face()

    def extract(self):
        video_number = 0
        self._create_output_folder()
        for filename in os.listdir(self.input_directory):
            video_path = os.path.join(self.input_directory, filename)
            self._capture_frames(video_path, video_number)
            video_number += 1
        print(f"Estrazione completata, {self.total_frames_extracted} frames estratti.")
        return

    def _create_output_folder(self):
        if not os.path.exists(self.output_directory):
            os.makedirs(self.output_directory)
        return

    def _capture_frames(self, video_path, video_number):
        capture = cv2.VideoCapture(video_path)
        if not capture.isOpened():
            print("Errore: impossibile aprire il file video")
            return
        self._save_frame(capture, video_number)
        capture.release()
        return

    def _save_frame(self, capture, video_number):
        frame_number = 0
        extracted_frame = 0
        while True:
            is_a_frame, frame = capture.read()
            if not is_a_frame:
                break
            if (frame_number - self.offset) % self.save_frame_interval == 0:
                cropped_frame = self.track_face.track(frame)
                resized_frame = cv2.resize(cropped_frame, (299, 299))
                frame_filename = os.path.join(self.output_directory, f"{self.frames_name}_{video_number}_{extracted_frame}_{self.quality}.jpg")
                cv2.imwrite(frame_filename, resized_frame)
                extracted_frame += 1
            frame_number += 1
        print(f"Estrazione completata per il video numero {video_number}. {extracted_frame} frame estratti e salvati in '{self.output_directory}'.")
        self.total_frames_extracted += extracted_frame
        return

### Extraction of frames from original videos

In [None]:
extract_frames = Extract_frames("videos/training_set/original/c23", "dataset/training_set/original", "original")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/training_set/original/c40", "dataset/training_set/original", "original", offset=2, quality="c40")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/original/c23", "dataset/validation_set/original", "original")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/original/c40", "dataset/validation_set/original", "original", offset=2, quality="c40")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/original/c23", "dataset/test_set/original", "original")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/original/c40", "dataset/test_set/original", "original", offset=2, quality="c40")
extract_frames.extract()

### Extraction of frames from manipulated videos

#### Extraction of frames from videos manipulated with deepfakes

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c23", "dataset/training_set/manipulated/deepfake", "deepfake")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c40", "dataset/training_set/manipulated/deepfake", "deepfake", offset=2, quality="c40")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c23", "dataset/validation_set/manipulated/deepfake", "deepfake")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c40", "dataset/validation_set/manipulated/deepfake", "deepfake")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c23", "dataset/test_set/manipulated/deepfake", "deepfake")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c40", "dataset/test_set/manipulated/deepfake", "deepfake")
extract_frames.extract()

#### Extraction of frames from videos manipulated with NeuralTexture

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c23", "dataset/training_set/manipulated/neuraltexture", "neuraltexture")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c40", "dataset/training_set/manipulated/neuraltexture", "neuraltexture", offset=2, quality="c40")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c23", "dataset/validation_set/manipulated/neuraltexture", "neuraltexture")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c40", "dataset/validation_set/manipulated/neuraltexture", "neuraltexture")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c23", "dataset/test_set/manipulated/neuraltexture", "neuraltexture")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c40", "dataset/test_set/manipulated/neuraltexture", "neuraltexture")
extract_frames.extract()

#### Extraction of frames from videos manipulated with Face2Face

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c23", "dataset/training_set/manipulated/face2face", "face2face")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c40", "dataset/training_set/manipulated/face2face", "face2face", offset=2, quality="c40")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c23", "dataset/validation_set/manipulated/face2face", "face2face")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c40", "dataset/validation_set/manipulated/face2face", "face2face")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c23", "dataset/test_set/manipulated/face2face", "face2face")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c40", "dataset/test_set/manipulated/face2face", "face2face")
extract_frames.extract()

#### Extraction of frames from videos manipulated with FaceSwap

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c23", "dataset/training_set/manipulated/faceswap", "faceswap")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/training_set/manipulated/c40", "dataset/training_set/manipulated/faceswap", "faceswap", offset=2, quality="c40")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c23", "dataset/validation_set/manipulated/faceswap", "faceswap")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/validation_set/manipulated/c40", "dataset/validation_set/manipulated/faceswap", "faceswap")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c23", "dataset/test_set/manipulated/faceswap", "faceswap")
extract_frames.extract()

In [None]:
extract_frames = Extract_frames("videos/test_set/manipulated/c40", "dataset/test_set/manipulated/faceswap", "faceswap")
extract_frames.extract()

## Dataset Creation

In [4]:
train_dataset = image_dataset_from_directory(
    "dataset/training_set",
    labels="inferred",
    label_mode = "binary",
    class_names=CONFIGURATION["CLASS_NAMES"],
    image_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
    shuffle=True,
    seed=CONFIGURATION["SEED"],
    batch_size=CONFIGURATION["BATCH_SIZE"]
)

Found 944585 files belonging to 2 classes.


In [None]:
validation_dataset = image_dataset_from_directory(
    "dataset/validation_set",
    labels="inferred",
    label_mode = "binary",
    class_names=CONFIGURATION["CLASS_NAMES"],
    image_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
    shuffle=True,
    seed=CONFIGURATION["SEED"],
    batch_size=CONFIGURATION["BATCH_SIZE"]
)

Found 112837 files belonging to 2 classes.


In [4]:
test_dataset = image_dataset_from_directory(
    "dataset/test_set",
    labels="inferred",
    label_mode = "binary",
    class_names=CONFIGURATION["CLASS_NAMES"],
    image_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
    shuffle=True,
    seed=CONFIGURATION["SEED"],
    batch_size=CONFIGURATION["BATCH_SIZE"]
)

Found 110462 files belonging to 2 classes.


## Data Augmentation

In [7]:
augmentation_layer  = tf.keras.Sequential([
  RandomFlip("horizontal"),
  RandomRotation(0.005),
  RandomZoom(0.1),
  RandomContrast(0.2),
  RandomBrightness(0.2)
  ])

## Dataset Preparation

In [6]:
resize_rescale_layers = tf.keras.Sequential([
       Rescaling(1./255),
])

In [8]:
def preprocess_train(image, label):
    image = augmentation_layer(image)
    image = resize_rescale_layers(image)
    return image, label

In [9]:
def preprocess_validation_test(image, label):
    image = resize_rescale_layers(image)
    return image, label

In [9]:
training_dataset = (
    train_dataset
    .map(preprocess_train, num_parallel_calls = tf.data.AUTOTUNE)    
    .prefetch(tf.data.AUTOTUNE)
)

In [10]:
validation_dataset = (
    validation_dataset.
    map(preprocess_validation_test, num_parallel_calls = tf.data.AUTOTUNE)
    .prefetch(tf.data.AUTOTUNE)
)

In [11]:
test_dataset = (
    test_dataset
    .map(preprocess_validation_test, num_parallel_calls = tf.data.AUTOTUNE)
    .prefetch(tf.data.AUTOTUNE)
)

### Dataset Visualization

In [None]:
plt.figure(figsize = (12,12))

for images, labels in validation_dataset.take(1):
  for i in range(16):
    tensor = labels[i].numpy()
    ax = plt.subplot(4,4, i+1)
    plt.imshow(images[i])
    plt.title(CONFIGURATION["CLASS_NAMES"][int(tensor.item())])
    plt.axis("off")

## TFRecords

### Convert TFDataset in TFRecord

In [10]:
train_dataset = (
    training_dataset.
    unbatch()
)

In [12]:
validation_dataset = (
    validation_dataset.
    unbatch()
)

In [13]:
test_dataset = (
    test_dataset.
    unbatch()
)

In [16]:
train_dataset

<_UnbatchDataset element_spec=(TensorSpec(shape=(299, 299, 3), dtype=tf.float32, name=None), TensorSpec(shape=(1,), dtype=tf.float32, name=None))>

In [14]:
validation_dataset

<_UnbatchDataset element_spec=(TensorSpec(shape=(299, 299, 3), dtype=tf.float32, name=None), TensorSpec(shape=(1,), dtype=tf.float32, name=None))>

In [15]:
test_dataset

<_UnbatchDataset element_spec=(TensorSpec(shape=(299, 299, 3), dtype=tf.float32, name=None), TensorSpec(shape=(1,), dtype=tf.float32, name=None))>

In [16]:
def create_example(image, label):
    label = tf.cast(label.item(), tf.int64)
    
    bytes_feature = Feature(
        bytes_list=BytesList(value=[image])
    )

    int_feature = Feature(
        int64_list=Int64List(value=[label])
    )

    example = Example(
        features=Features(feature={
            'images': bytes_feature,
            'labels': int_feature,
        })
    )

    return example.SerializeToString()

In [17]:
def encode_image(image, label):
    image = tf.image.convert_image_dtype(image, tf.uint8)
    image = tf.io.encode_jpeg(image)
    return image,label

In [18]:
def write_tfrecord(dataset, num_shards, path):
  encoded_dataset = (
    dataset.
    map(encode_image)
  )

  for shard_number in range(num_shards):
    sharded_dataset = (
        encoded_dataset
        .shard(num_shards, shard_number)
        .as_numpy_iterator()
    )
    with tf.io.TFRecordWriter(path.format(shard_number)) as file_writer:
      for encoded_image, encoded_label in sharded_dataset:
        example = create_example(encoded_image, encoded_label)
        file_writer.write(example)

In [5]:
NUM_TRAIN_SHARDS = 60
NUM_VALIDATION_SHARDS = 15
NUM_TEST_SHARDS = 15
train_dataset_path = os.path.join(os.getcwd(), 'tfrecords', 'train_dataset', 'shard_{:01d}.tfrecord')
validation_dataset_path = os.path.join(os.getcwd(), 'tfrecords', 'validation_dataset', 'shard_{:01d}.tfrecord')
test_dataset_path = os.path.join(os.getcwd(), 'tfrecords', 'test_dataset', 'shard_{:01d}.tfrecord')

In [15]:
write_tfrecord(train_dataset, NUM_TRAIN_SHARDS, train_dataset_path)

In [20]:
write_tfrecord(validation_dataset, NUM_VALIDATION_SHARDS, validation_dataset_path)

In [21]:
write_tfrecord(test_dataset, NUM_TEST_SHARDS, test_dataset_path)

### Reconvert TFRecord in TFDataset 

In [6]:
reconstructed_train_dataset = tf.data.TFRecordDataset(
    filenames =[train_dataset_path.format(p) for p in range(NUM_TRAIN_SHARDS)] )

In [7]:
print([train_dataset_path.format(p) for p in range(NUM_TRAIN_SHARDS)])

['c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_0.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_1.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_2.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_3.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_4.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_5.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_6.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_7.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_8.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_dataset\\shard_9.tfrecord', 'c:\\Users\\nesea\\Desktop\\DeepFake Detection\\tfrecords\\train_data

In [7]:
reconstructed_validation_dataset = tf.data.TFRecordDataset(
    filenames =[validation_dataset_path.format(p) for p in range(NUM_VALIDATION_SHARDS)] )

In [8]:
def parse_tfrecords(example):

    feature_description = {
          "images": tf.io.FixedLenFeature([], tf.string),
          "labels": tf.io.FixedLenFeature([], tf.int64),
      }

    example = tf.io.parse_single_example(example, feature_description)
    example["images"] = tf.image.convert_image_dtype(
        tf.io.decode_jpeg(
        example["images"], channels = 3), dtype = tf.float32)

    return example["images"], example["labels"]


In [9]:
training_dataset = (
    reconstructed_train_dataset
    .map(parse_tfrecords)
    .batch(CONFIGURATION["BATCH_SIZE"])
    .prefetch(tf.data.AUTOTUNE)
)

In [10]:
validation_dataset = (
    reconstructed_validation_dataset
    .map(parse_tfrecords)
    .batch(CONFIGURATION["BATCH_SIZE"])
    .prefetch(tf.data.AUTOTUNE)
)

## Xception Net

### Convolution

In [12]:
class Convolution(Layer):
  def __init__(self, n_filters, kernel_size, n_strides=1):
    super(Convolution, self).__init__(name = "convolution")
    self.convolution = Conv2D(filters = n_filters, kernel_size = kernel_size, strides = n_strides, padding = "same",  activation = "relu")
    self.batch_normalization = BatchNormalization()

  def call(self, x, training = True):
    x = self.convolution(x)
    x = self.batch_normalization(x, training = training)
    return x


### Separable Convolution

In [13]:
class SeparableConvolution(Layer):
  def __init__(self, n_filters, kernel_size, n_strides=1):
    super(SeparableConvolution, self).__init__(name="separable_convolution")
    self.separable_convolution = SeparableConv2D(filters=n_filters, kernel_size=kernel_size, strides=n_strides, padding="same", activation=None)
    self.batch_normalization = BatchNormalization()
    self.dropout = Dropout(0.2)

  def call(self, x, training=True):
    x = self.separable_convolution(x)
    x = self.batch_normalization(x, training=training)
    if training:
      x = self.dropout(x, training=training)
    return x

### Sum Convolution

In [14]:
class SumConvolution(Layer):
  def __init__(self, n_filters, kernel_size, n_strides=1):
    super(SumConvolution, self).__init__(name = "sum_convolution")
    self.sum_convolution = Conv2D(filters = n_filters, kernel_size = kernel_size, strides = n_strides, padding = "same",  activation = None)
    self.batch_normalization = BatchNormalization()

  def call(self, x, training = True):
    x = self.sum_convolution(x)
    x = self.batch_normalization(x, training = training)
    return x

### Entry Flow

In [15]:
class EntryFlow(Layer):
    def __init__(self, filters):
        super(EntryFlow, self).__init__(name="entry_flow")
        self.convolution1 = Convolution(filters[0], 3, 2)
        self.convolution2 = Convolution(filters[1], 3) 

        self.sum_convolution1 = Convolution(filters[2], 1, 2)

        self.separable_convolution1 = SeparableConvolution(filters[2], 3)
        self.activation = ReLU()
        self.separable_convolution2 = SeparableConvolution(filters[2], 3)
        self.max_pooling = MaxPool2D(pool_size=(3, 3), strides=2, padding="same")

        self.sum_convolution2 = Convolution(filters[3], 1, 2) 

        self.separable_convolution3 = SeparableConvolution(filters[3], 3)
        self.separable_convolution4 = SeparableConvolution(filters[3], 3)
                                                                
        self.sum_convolution3 = Convolution(filters[4], 1, 2)

        self.separable_convolution5 = SeparableConvolution(filters[4], 3)
        self.separable_convolution6 = SeparableConvolution(filters[4], 3)

    def call(self, x, training=True):
        x = self.convolution1(x)        
        x = self.convolution2(x)
        tensor = self.sum_convolution1(x)

        x = self.separable_convolution1(x)
        x = self.activation(x)
        x = self.separable_convolution2(x)
        x = self.max_pooling(x)

        x = Add()([tensor, x])
        tensor = self.sum_convolution2(x)

        x = self.activation(x)
        x = self.separable_convolution3(x)
        x = self.activation(x)
        x = self.separable_convolution4(x)
        x = self.max_pooling(x)

        x = Add()([tensor, x])
        tensor = self.sum_convolution3(x)

        x = self.activation(x)
        x = self.separable_convolution5(x)
        x = self.activation(x)
        x = self.separable_convolution6(x)
        x = self.max_pooling(x)

        x = Add()([tensor, x])

        return x

### Middle Flow

In [16]:
class MiddleFlow(Layer):
    def __init__(self, filters):
        super(MiddleFlow, self).__init__(name="middle_flow")
        self.activation = ReLU()
        self.separable_convolution1 = SeparableConvolution(filters[4], 3)
        self.separable_convolution2 = SeparableConvolution(filters[4], 3)
        self.separable_convolution3 = SeparableConvolution(filters[4], 3)

    def call(self, x, tensor, training=True):
        for i in range(8):
            x = self.activation(x)
            x = self.separable_convolution1(x, training = training)
            x = self.activation(x)
            x = self.separable_convolution2(x, training = training)
            x = self.activation(x)
            x = self.separable_convolution3(x, training = training)
            x = Add()([tensor, x])
        return x

### Exit Flow

In [17]:
class ExitFlow(Layer):
  def __init__(self, filters):
    super(ExitFlow, self).__init__(name = "exit_flow")
    self.sum_convolution = SumConvolution(filters[5], 1, 2)

    self.activation = ReLU()
    self.separable_convolution1 = SeparableConvolution(filters[4], 3)
    self.separable_convolution2 = SeparableConvolution(filters[5], 3)
    self.max_pooling = MaxPool2D(pool_size = (3,3), strides = 2, padding = "same")

    self.separable_convolution3 = SeparableConvolution(filters[6], 3)
    self.separable_convolution4 = SeparableConvolution(filters[7], 3)
    self.global_average_pooling = GlobalAveragePooling2D()

    #Optionally one may insert fully-connected layers beffore the logistic regression layer, which is explored in the experimental evaluation section (in particular, see figures 7 and 8)

  def call(self, x, tensor, training = True):
    tensor = self.sum_convolution(tensor)

    x = self.activation(x)
    x = self.separable_convolution1(x)
    x = self.activation(x)
    x = self.separable_convolution2(x)
    x = self.max_pooling(x)

    x = Add()([tensor, x])

    x = self.separable_convolution3(x)
    x = self.activation(x)
    x = self.separable_convolution4(x)
    x = self.activation(x)
    x = self.global_average_pooling(x)

    return x



### Xception

In [70]:
from tensorflow.keras.utils import register_keras_serializable

@register_keras_serializable()
class XceptionNet(Model):
    def __init__(self, filters, trainable=True, name="xception_net", **kwargs):
        super(XceptionNet, self).__init__(name=name, **kwargs)
        self.entry_flow = EntryFlow(filters)
        self.middle_flow = MiddleFlow(filters)
        self.exit_flow = ExitFlow(filters)
        self.dense = Dense(1, activation="sigmoid")
        self.trainable = trainable
        self.filters = filters  

    def call(self, x, training=True):
        x = self.entry_flow(x, training=training)
        tensor = x
        x = self.middle_flow(x, tensor, training=training)
        tensor = x
        x = self.exit_flow(x, tensor, training=training)
        x = self.dense(x)
        return x

    def summary(self):
        x = Input(shape=(299, 299, 3))
        model = Model(inputs=[x], outputs=self.call(x))
        return model.summary()

    def get_config(self):
        config = super(XceptionNet, self).get_config()
        config.update({
            "filters": self.filters,  
            "trainable": self.trainable
        })
        return config

    @classmethod
    def from_config(cls, config):
        filters = config.pop('filters') 
        return cls(filters=filters, **config)



In [71]:
model = XceptionNet([32,64,128,256,364,512,1024,2048])

In [None]:
model.summary()

## Train

### Adam Optimizer

In [65]:
adam_optimizer = Adam(
    learning_rate = CONFIGURATION["LEARNING_RATE"],
    beta_1 = CONFIGURATION["BETA_1"],
    beta_2 = CONFIGURATION["BETA_2"],
    epsilon = CONFIGURATION["EPSILON"],
    )

### Early Stopping

In [64]:
es_callback = EarlyStopping(
    monitor = "val_loss",
    mode = "auto",
    patience = 3,
    restore_best_weights = True,
    baseline = None
)

### Tensorboard Callback

In [62]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="logs")

### Metrics

In [66]:
loss_fn = BinaryCrossentropy()
optimizer = adam_optimizer

In [59]:
metrics = [BinaryAccuracy(name = "accuracy"),
           Recall(name = "recall"),
           AUC(name = "auc")
           ]

In [67]:
model.compile(loss = loss_fn, optimizer = optimizer, metrics = metrics)

### Class Weighting

In [None]:
total_samples = 1167889
original_samples = 254812
manipulated_samples = 913077

In [None]:
class_weight = {
    0: total_samples / original_samples,
    1: total_samples /  manipulated_samples
}

### Fit Model

In [None]:
history = model.fit (
    training_dataset,
    validation_data = validation_dataset,
    epochs = 10,
    steps_per_epoch = 29190,
    validation_steps = 3640,
    callbacks = [es_callback, tensorboard_callback],
    class_weight = class_weight
)


## Evaluation

In [209]:
def predict(video_url):
  capture = cv2.VideoCapture(video_url)
  if not capture.isOpened():
    print("Errore: impossibile aprire il file video")
    return
  tensors = create_frame_tensor(capture)
  capture.release()
  predictions = []
  for tensor in tensors:
    frame_prediction = model.predict(tensor)
    print(frame_prediction)
    if frame_prediction < 0.5:
      frame_prediction = "original"
    else:
      frame_prediction = "manipulated"
    predictions.append(frame_prediction)
  result = max(set(predictions), key=predictions.count)
  return result

def create_frame_tensor(capture):
  track_face = TrackFace()
  frame_count = 0
  frame_tensors = []
  while True:
    is_a_frame, frame = capture.read()
    if not is_a_frame:
      break
    cropped_frame = track_face.track(frame)
    tensor = tf.convert_to_tensor(cropped_frame, dtype=tf.float32)
    resize_tensor = tf.keras.Sequential([
       Resizing(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
       Rescaling(1./255),
    ])
    tensor = tf.expand_dims(tensor, axis=0)
    frame_tensors.append(resize_tensor(tensor))
  return frame_tensors
