In [1]:
# For Google Colab use
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    %cd '/content/drive/MyDrive/Colab Notebooks/MLP-DeepfakeDetection-VariationalAutoencoder'    
except ModuleNotFoundError:
    pass

Mounted at /content/drive
/content/drive/MyDrive/Colab Notebooks/MLP-DeepfakeDetection-VariationalAutoencoder


In [2]:
# Imports
from __future__ import division

import os
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from numpy.random import seed

import tensorflow as tf

import keras
from keras import preprocessing
from keras.preprocessing.image import ImageDataGenerator
from keras import layers, Model
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.optimizers import *
from keras.applications import *
from keras import metrics
from keras.losses import BinaryCrossentropy
from keras import backend as K

# !pip install -U keras-tuner
# from kerastuner.tuners import RandomSearch, Hyperband
# from kerastuner.engine.hypermodel import HyperModel
# from kerastuner.engine.hyperparameters import HyperParameters
# from kerastuner import Objective

import Models.OCFakeDectVAE as OriginalOCFakeDectVAE

In [3]:
# Check GPU available
%tensorflow_version 2.x
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Found GPU at: /device:GPU:0
Num GPUs Available:  1


In [4]:
# General model settings
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 100, 100, 3
EPOCHS = 10
DATA_GENERATOR_SEED = 1337
BATCH_SIZE = 192
VALIDATION_SPLIT = 0.2
tf.random.set_seed(DATA_GENERATOR_SEED)
seed(DATA_GENERATOR_SEED)

# Pick dataset; DF_TYPE={'rnd', 'avg'}
DF_TYPE = 'avg'

In [5]:
# We are only using one class (OC), reals... But we test on reals and fakes
TRAIN_VAL_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-OC' 

TRAIN_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, horizontal_flip = True, fill_mode='nearest', validation_split = VALIDATION_SPLIT)
TRAIN_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
                                                    batch_size = BATCH_SIZE,
                                                    class_mode = 'input', 
                                                    target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                    subset = 'training',
                                                    seed = DATA_GENERATOR_SEED,
                                                    follow_links = True)

VAL_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, validation_split = VALIDATION_SPLIT)
VALIDATION_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
                                                         batch_size = BATCH_SIZE,
                                                         class_mode = 'input', 
                                                         target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                         subset = 'validation',
                                                         seed = DATA_GENERATOR_SEED)

Found 4502 images belonging to 1 classes.
Found 1125 images belonging to 1 classes.


In [6]:
# Define callbacks e.g. Early Stopping

EARLY_STOP = EarlyStopping(monitor='reconstruction_loss',
                           patience=1,
                           mode='min',
                           verbose=1,
                           restore_best_weights=True)

# Define Model OCFakeDect1
vae = OriginalOCFakeDectVAE.OCFakeDect1()
vae.compile(optimizer=Adam())

In [None]:
# Trains for full epochs, also very slow (30 mins per epoch on OC)
STEPS = TRAIN_GENERATOR.n//BATCH_SIZE+1
for _ in tqdm(range(EPOCHS)):
    for _ in tqdm(range(STEPS)):
        vae.fit(np.concatenate([TRAIN_GENERATOR.next()[0], VALIDATION_GENERATOR.next()[0]], axis=0),
                epochs=STEPS,
                batch_size=2*BATCH_SIZE,
                verbose=1,
                callbacks=[EARLY_STOP])

  0%|          | 0/10 [00:00<?, ?it/s]
  0%|          | 0/563 [00:00<?, ?it/s][A

Epoch 1/563


In [None]:
# Plot 5 images from test set
# for (x_train, _) in TRAIN_GENERATOR.next()[:5]:
#     X = np.expand_dims(x_train, axis=0)
#     Z_mean, Z_log_var, Z = vae.encoder.predict(X)
#     X_prime = vae.decoder.predict(Z)
#     face = np.array(X_prime.reshape(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)*255, dtype=np.uint8)
#     fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(5, 5))
#     axes[0].imshow(X.squeeze())
#     axes[1].imshow(face)
#     fig.tight_layout()

for (x_test, _) in VALIDATION_GENERATOR.next()[:5]:
    X = np.expand_dims(x_test, axis=0)
    Z_mean, Z_log_var, Z = vae.encoder.predict(X)
    X_prime = vae.decoder.predict(Z)
    face = np.array(X_prime.reshape(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)*255, dtype=np.uint8)
    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(5, 5))
    axes[0].imshow(X.squeeze())
    axes[1].imshow(face)
    fig.tight_layout()

In [None]:
# Re-define test generators for training the nerual net (since we are considering both classes now)
TRAIN_VAL_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-30' 
TEST_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-30-test' 

TRAIN_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, horizontal_flip = True, fill_mode='nearest', validation_split = VALIDATION_SPLIT)
TRAIN_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
                                                    batch_size = BATCH_SIZE,
                                                    class_mode = 'binary', 
                                                    target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                    subset = 'training',
                                                    seed = DATA_GENERATOR_SEED,
                                                    follow_links = True)

VAL_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, validation_split = VALIDATION_SPLIT)
VALIDATION_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
                                                         batch_size = BATCH_SIZE,
                                                         class_mode = 'binary', 
                                                         target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                         subset = 'validation',
                                                         seed = DATA_GENERATOR_SEED)

TEST_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0)
TEST_GENERATOR = TEST_DATAGEN.flow_from_directory(directory = TEST_DIR,
                                                  batch_size = BATCH_SIZE,
                                                  class_mode = 'binary', 
                                                  target_size = (IMG_HEIGHT, IMG_WIDTH),                                
                                                  seed = DATA_GENERATOR_SEED)

In [None]:
# Freeze the layers for the encoder, since now we will only train the dence layers at the end
for layer in vae.encoder.layers:
    layer.trainable = False

# Add simple neural network at end of encoder (AFTER encoder(and decoder) has been trained on OC)
_, _, x = vae.encoder.output
x = layers.Flatten()(x)
x = layers.Dense(1, activation = 'sigmoid')(x)

FC = Model(vae.encoder.input, x, name="FC")
FC.compile(optimizer = SGD(),
           loss = BinaryCrossentropy(),
           metrics = [metrics.BinaryAccuracy(name = 'acc'),
                      metrics.AUC(name = 'auc'),
                      metrics.FalsePositives(name = 'fp')])

In [None]:
FC.fit(TRAIN_GENERATOR, 
       steps_per_epoch = TRAIN_GENERATOR.n//TRAIN_GENERATOR.batch_size
       validation_data = VALIDATION_GENERATOR,
       validation_steps = TEST_GENERATOR.n//TEST_GENERATOR.batch_size,
       epochs=STEPS,
       batch_size=BATCH_SIZE, 
       verbose=1,
       callbacks=[ModelCheckpoint(f'./Checkpoints/FC+OCFakeDectVAE/best_model',
                                  monitor='val_auc', 
                                  mode='max'
                                  verbose=1, 
                                  save_best_only=True)])

In [None]:
# # Load all training data, takes a very long time (3hrs)
# training_data = []
# for i in tqdm(range(TRAIN_GENERATOR.n//BATCH_SIZE+1)):
#     (x_train, _), (x_test, _) = TRAIN_GENERATOR.next(), VALIDATION_GENERATOR.next()
#     training_data.extend(np.concatenate([x_train, x_test], axis=0)) 

# vae.fit(training_data,
#         epochs=100,
#         batch_size=2*BATCH_SIZE,
#         verbose=1)

In [None]:
# # VERY hacky test to see if the model is actually working
# # Purposefully overfit to a single batch of data and train for 1000 epochs on just that one batch
# (x_train, _), (x_test, _) = TRAIN_GENERATOR.next(), VALIDATION_GENERATOR.next()
# training_data = np.concatenate([x_train, x_test], axis=0)
# vae.fit(training_data,
#         epochs=1000,
#         batch_size=2*BATCH_SIZE,
#         verbose=1)

# for x in x_test[:10]:
#     X = np.expand_dims(x, axis=0)
#     Z_mean, Z_log_var, Z = vae.encoder.predict(X)
#     X_prime = vae.decoder.predict(Z)
#     face = np.array(X_prime.reshape(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)*255, dtype=np.uint8)
#     fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(5, 5))
#     axes[0].imshow(X.squeeze())
#     axes[1].imshow(face)
#     fig.tight_layout()
