In [1]:
# For Google Colab use
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    %cd '/content/drive/MyDrive/Colab Notebooks/MInf-DeepfakeDetection-FrameDifferencing'    
except ModuleNotFoundError:
    pass

Mounted at /content/drive
/content/drive/MyDrive/Colab Notebooks/MInf-DeepfakeDetection-FrameDifferencing


In [2]:
# Imports
from __future__ import division

import os
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from numpy.random import seed
from time import time
from pathlib import Path

import tensorflow as tf

import keras
from keras import preprocessing
from keras.preprocessing.image import ImageDataGenerator
from keras import layers, Model
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.optimizers import *
from keras.applications import *
from keras import metrics
from keras.losses import binary_crossentropy
from keras import backend as K

try:
    import face_recognition
except ModuleNotFoundError:
    !pip install face_recognition

from Util import pipeline
from Baseline.classifiers import *
# from Util import experiment_tuner_models

In [3]:
def _plot_training_history(HISTORY, EXPERIMENT_NAME, EXPERIMENT_FOLDER_NAME):
    """Save a 2x2 grid of train-vs-validation curves (accuracy, loss, AUC, false positives).

    Args:
        HISTORY: object whose ``.history`` dict holds the keys 'acc', 'loss', 'auc',
            'fp' and their 'val_'-prefixed counterparts.
        EXPERIMENT_NAME: underscore-separated experiment name; used for the figure
            title and the output file name.
        EXPERIMENT_FOLDER_NAME: directory the PDF is written to.

    Raises:
        KeyError: if any of the eight expected metric series is missing.
    """
    print('Plotting training history x4 plots...')
    # (history key, panel title / legend suffix, y-axis label), in row-major panel order.
    panels = (('acc', 'Binary Accuracy', 'Accuracy'),
              ('loss', 'Loss', 'Loss'),
              ('auc', 'AUC', 'AUC'),
              ('fp', 'False Positives', 'FP'))

    # Look up every series before creating the figure so a missing key fails early.
    recorded = HISTORY.history
    curves = {key: (recorded[key], recorded[f'val_{key}']) for key, _, _ in panels}
    epochs = range(len(curves['acc'][0]))

    fig, axs = plt.subplots(2, 2, figsize=(10,10))
    fig.suptitle(f'{EXPERIMENT_NAME.replace("_", " ")} Train & Validation Metrics')

    for ax, (key, title, ylabel) in zip(axs.flat, panels):
        train_values, val_values = curves[key]
        ax.plot(epochs, train_values, 'r', label=f'Train {title}')
        ax.plot(epochs, val_values, 'b', label=f'Validation {title}')
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.set_xlabel('Epochs')
        ax.legend()

    print(f'Saving history plots to {EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_history.pdf')
    fig.savefig(f'{EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_history.pdf')


def evaluate_model(TEST_MODEL, EXPERIMENT_NAME:str, TEST_DIR:str, BACTH_SIZE:int=64, \
                   IMG_HEIGHT:int=256, IMG_WIDTH:int=256, DATA_GENERATOR_SEED:int=1337, \
                   HISTORY=None):
    """Evaluate a binary classifier on an image directory and save plots and a text summary.

    Everything is written to ``./Results/<EXPERIMENT_NAME>/`` (spaces in the name are
    replaced by underscores): a ROC plot (``*_AUC.pdf``), a summary file
    (``*_summary.txt``) containing the evaluation metrics, the chosen threshold, the
    confusion matrix, the classification report and every (label, score) pair, and,
    when ``HISTORY`` is given, the training curves (``*_history.pdf``).

    Args:
        TEST_MODEL: compiled Keras model producing one score per image.
        EXPERIMENT_NAME: human-readable experiment name.
        TEST_DIR: directory with one sub-folder per class, read by
            ``flow_from_directory``.
        BACTH_SIZE: batch size of the test generator.
        IMG_HEIGHT: height images are resized to.
        IMG_WIDTH: width images are resized to.
        DATA_GENERATOR_SEED: seed forwarded to the test generator.
        HISTORY: optional training history (object with a ``.history`` dict holding
            'acc', 'auc', 'loss', 'fp' and their 'val_' counterparts).

    Returns:
        Tuple ``(Y_true, Y_pred)``: ground-truth class indices and the model's raw
        scores, in matching order.
    """
    # Imported locally so the function does not depend on notebook-global names
    # that a later cell could rebind (e.g. a variable called `auc`).
    from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc

    EXPERIMENT_NAME = EXPERIMENT_NAME.replace(" ", "_")
    EXPERIMENT_FOLDER_NAME = f'./Results/{EXPERIMENT_NAME}'
    Path(EXPERIMENT_FOLDER_NAME).mkdir(parents=True, exist_ok=True)

    # Define testing datagen
    print('Defining testing datagen...')
    TEST_DATAGEN = ImageDataGenerator(rescale=1./255)
    # shuffle=False is essential: `TEST_GENERATOR.classes` is in directory order, so
    # predictions must be produced in that same order to line up with the labels.
    TEST_GENERATOR = TEST_DATAGEN.flow_from_directory(directory = TEST_DIR,
                                                      batch_size = BACTH_SIZE,
                                                      class_mode = 'binary',
                                                      target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                      shuffle = False,
                                                      seed = DATA_GENERATOR_SEED,
                                                      follow_links = True)

    # Evaluate test set and get metrics, e.g. loss.
    # The generator already yields batches, so no batch_size is passed here.
    print('Evaluating test set and getting metrics...')
    TEST_HISTORY = TEST_MODEL.evaluate(TEST_GENERATOR, return_dict = True)
    TEST_GENERATOR.reset()

    # Get predictions and ground truths
    print('Predicting on test set...')
    Y_pred = (TEST_MODEL.predict(TEST_GENERATOR)).ravel()
    Y_true = (TEST_GENERATOR.classes).ravel()

    # Plot ROC curve and save plot
    print('Plotting AUC...')
    fpr, tpr, _ = roc_curve(Y_true, Y_pred)
    roc_auc = auc(fpr, tpr)

    plt.figure()
    plt.plot(fpr, tpr, color='magenta', lw=2, label='ROC curve (area = %0.3f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='black', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.01])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{EXPERIMENT_NAME.replace("_", " ")} ROC')
    plt.legend(loc="lower right")
    print(f'Saving AUC to {EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_AUC.pdf')
    plt.savefig(f'{EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_AUC.pdf')

    # Find the classification threshold (0.000 .. 1.000 in steps of 0.001) whose hard
    # labels give the highest AUC; ties resolve to the largest threshold because the
    # (auc, threshold) tuples are compared element-wise.
    roc_aucs = []
    for step in range(0, 1000 + 1):
        thrshld = step / 1000.0
        fpr_at, tpr_at, _ = roc_curve(Y_true, (Y_pred >= thrshld).astype(int))
        roc_aucs.append((auc(fpr_at, tpr_at), thrshld))
    max_auc_thrshld = max(roc_aucs)[1]

    # Use that threshold to get the confusion matrix and classification report.
    # NOTE(review): target names assume class index 0 is REAL and 1 is FAKE; confirm
    # against the sub-folder names in TEST_DIR.
    y_pred = (Y_pred >= max_auc_thrshld).astype(int)
    conf_matrix = confusion_matrix(Y_true, y_pred)
    report = classification_report(Y_true, y_pred, target_names = ["REAL", "FAKE"])
    summary_output_text = f'''\n\nTHRESHOLD = {max_auc_thrshld}
    \nCONFUSION MATRIX\n{conf_matrix}
    \nCLASSIFICATION REPORT\n{report}
    _____________________________________________________\n'''

    # Create file, write metrics; the context manager closes it even if a write fails.
    print(f'Saving summary file to {EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_summary.txt')
    with open(f'{EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_summary.txt', 'w') as summary_file:
        summary_file.write(str(TEST_HISTORY).replace(', \'', ':\n\n'))
        print(str(TEST_HISTORY).replace(', \'', ':\n'))
        summary_file.write(summary_output_text)
        summary_file.write(f'''\n{list(zip(Y_true, Y_pred))}''')
        print(summary_output_text)

    # Plot history (of training), if provided
    if HISTORY is not None:
        _plot_training_history(HISTORY, EXPERIMENT_NAME, EXPERIMENT_FOLDER_NAME)

    return Y_true, Y_pred

In [4]:
# for df_name, df_type in tqdm([('Random_Frame', 'rnd'), ('Frame_Averaging', 'avg'), ('Frame_Difference', 'diff')]):
#         for weight_path in [f'./Baseline/weights/MesoInception_F2F.h5', f'./Baseline/weights/MesoInception_DF.h5']:
#             weight_name = weight_path.split('_')[-1].replace('.h5', '')
#             print(df_type.upper(), weight_name.upper())

#             TEST_MODEL = None
#             TEST_MODEL = MesoInception4().model
#             TEST_MODEL.load_weights(weight_path)

#             evaluate_model(TEST_MODEL = TEST_MODEL,
#                            EXPERIMENT_NAME = f'Untrained_MesoInception4_({weight_name})_on_{df_name}_Dataset',
#                            TEST_DIR = f'./Celeb-DF-v2/Celeb-{df_type}-30-test')

In [None]:
# General model settings
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 256, 256, 3
EPOCHS = 100
DATA_GENERATOR_SEED = 1337
BATCH_SIZE = 64
VALIDATION_SPLIT = 0.2

# Seed TensorFlow and NumPy once, before any model or generator is created.
tf.random.set_seed(DATA_GENERATOR_SEED)
seed(DATA_GENERATOR_SEED)

# Train and evaluate one model per dataset variant; DF_TYPE is one of {'rnd', 'avg', 'diff'}.
# The list is reversed, so 'diff' is processed first.
for df_name, DF_TYPE in tqdm([('Random_Frame', 'rnd'), ('Frame_Averaging', 'avg'), ('Frame_Difference', 'diff')][::-1]):
    # A fresh model is built on every pass; drop the Keras global state left behind
    # by the previous one so it does not accumulate across iterations.
    K.clear_session()

    TRAIN_VAL_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-30'
    print(f'{DF_TYPE}!!!')

    print('Setting up datagens...')
    # Training images are rescaled and randomly flipped horizontally.
    TRAIN_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, horizontal_flip = True, fill_mode='nearest', validation_split = VALIDATION_SPLIT)
    TRAIN_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
                                                        batch_size = BATCH_SIZE,
                                                        class_mode = 'binary',
                                                        target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                        subset = 'training',
                                                        seed = DATA_GENERATOR_SEED,
                                                        follow_links = True)

    # Validation images are only rescaled. The generator must come from VAL_DATAGEN
    # (not TRAIN_DATAGEN) so that no augmentation is applied to validation data; the
    # identical validation_split keeps the train/validation partition unchanged.
    VAL_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, validation_split = VALIDATION_SPLIT)
    VALIDATION_GENERATOR = VAL_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
                                                           batch_size = BATCH_SIZE,
                                                           class_mode = 'binary',
                                                           target_size = (IMG_HEIGHT, IMG_WIDTH),
                                                           subset = 'validation',
                                                           seed = DATA_GENERATOR_SEED,
                                                           follow_links = True)

    # Frozen ImageNet-pretrained Xception backbone with global average pooling.
    PRE_TRAINED_MODEL = Xception(input_shape = (IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS),
                                 include_top = False,
                                 pooling = 'avg',
                                 weights = 'imagenet')
    for layer in PRE_TRAINED_MODEL.layers:
        layer.trainable = False

    # Trainable classification head: four dense layers with decreasing dropout,
    # ending in a single sigmoid unit.
    x = layers.Flatten()(PRE_TRAINED_MODEL.output)
    x = layers.Dense(2034, activation = 'relu')(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Dense(3328, activation = 'relu')(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(1280, activation = 'relu')(x)
    x = layers.Dropout(0.15)(x)
    x = layers.Dense(1280, activation = 'relu')(x)
#     x = layers.Dropout(0)(x)
    x = layers.Dense(1, activation = 'sigmoid')(x)

    TEST_MODEL = Model(PRE_TRAINED_MODEL.input, x)

    # Metric names ('acc', 'auc', 'fp') are the history keys that evaluate_model and
    # the early-stopping monitor ('val_auc') rely on.
    TEST_MODEL.compile(optimizer = 'sgd',
              loss = 'binary_crossentropy',
              metrics = [metrics.BinaryAccuracy(name = 'acc'),
                         metrics.AUC(name = 'auc'),
                         metrics.FalsePositives(name = 'fp')])

    # Class weights: total / (2 * class_count), counted over train + validation,
    # so the rarer class contributes proportionally more to the loss.
    train_gen_list = list(TRAIN_GENERATOR.classes)
    val_gen_list = list(VALIDATION_GENERATOR.classes)

    train_neg, train_pos = train_gen_list.count(0), train_gen_list.count(1)
    val_neg, val_pos = val_gen_list.count(0), val_gen_list.count(1)

    pos = train_pos + val_pos
    neg = train_neg + val_neg
    total = pos + neg

    weight_for_0 = (1.0 / neg)*(total)/2.0
    weight_for_1 = (1.0 / pos)*(total)/2.0

    CLASS_WEIGHT = {0: weight_for_0, 1: weight_for_1}
    print(f'Class weights = {CLASS_WEIGHT}')

    # Stop when validation AUC has not improved for EPOCHS//10 epochs and roll the
    # model back to its best weights.
    EARLY_STOP = EarlyStopping(monitor='val_auc',
                            patience=EPOCHS//10,
                            mode='max',
                            verbose=1,
                            restore_best_weights=True)

    # MODEL_CHECKPOINT = ModelCheckpoint(filepath = f'./Weights/OM-{DF_TYPE.upper()}',
    #                                    monitor='val_auc',
    #                                    verbose=0,
    #                                    save_best_only=True,
    #                                    save_weights_only=True,
    #                                    mode='max',
    #                                    save_freq='epoch')

    print(f'Early stopping in {EARLY_STOP.patience}')

    # Integer division drops the final partial batch of each epoch.
    HISTORY = TEST_MODEL.fit(TRAIN_GENERATOR,
                            steps_per_epoch = TRAIN_GENERATOR.n//TRAIN_GENERATOR.batch_size,
                            validation_data = VALIDATION_GENERATOR,
                            validation_steps = VALIDATION_GENERATOR.n//VALIDATION_GENERATOR.batch_size,
                            epochs = EPOCHS,
                            verbose = 1,
                            class_weight = CLASS_WEIGHT,
                            callbacks = [EARLY_STOP
                                        #  MODEL_CHECKPOINT
                                         ])


#     weight_name = WEIGHTS_PATH.split('_')[-1].replace('.h5', '')
    # Score the trained model on the held-out test split of the same dataset variant.
    evaluate_model(TEST_MODEL = TEST_MODEL,
                   EXPERIMENT_NAME = f'Our_Model_on_{df_name}_Dataset',
                   TEST_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-30-test',
                   HISTORY = HISTORY)


  0%|          | 0/3 [00:00<?, ?it/s][A

diff!!!
Setting up datagens...
Found 44393 images belonging to 2 classes.
Found 11097 images belonging to 2 classes.
Class weights = {0: 4.930691309756531, 1: 0.5564246034133526}
Early stopping in 10
Epoch 1/100

In [None]:
# # General model settings
# IMG_HEIGHT, IMG_WIDTH = 71, 71
# EPOCHS = 5
# DATA_GENERATOR_SEED = 1337
# BATCH_SIZE = 64
# VALIDATION_SPLIT = 0.2

# tf.random.set_seed(DATA_GENERATOR_SEED)
# seed(DATA_GENERATOR_SEED)

# for EXPERIMENT_TYPE in tqdm(['optimiser', 'structure']):
#     for df_name, DF_TYPE in tqdm([('Random_Frame', 'rnd'), ('Frame_Averaging', 'avg'), ('Frame_Difference', 'diff')]):
        
#         EXPERIMENT_NAME = f'Hyperband({EXPERIMENT_TYPE.title()})_{df_name}_Dataset'
#         EXPERIMENT_FOLDER_NAME = f'./Results/{EXPERIMENT_NAME}'
#         Path(EXPERIMENT_FOLDER_NAME).mkdir(parents=True, exist_ok=True)
#         print(EXPERIMENT_NAME)

#         TRAIN_VAL_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-30'

#         print('Defining train and validation datagens...')
#         TRAIN_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, horizontal_flip = True, fill_mode='nearest', validation_split = VALIDATION_SPLIT)
#         TRAIN_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
#                                                             batch_size = BATCH_SIZE,
#                                                             class_mode = 'binary', 
#                                                             target_size = (IMG_HEIGHT, IMG_WIDTH),
#                                                             subset = 'training',
#                                                             seed = DATA_GENERATOR_SEED,
#                                                             follow_links = True)

#         VAL_DATAGEN = ImageDataGenerator(rescale = 1.0/255.0, validation_split = VALIDATION_SPLIT)
#         VALIDATION_GENERATOR = TRAIN_DATAGEN.flow_from_directory(directory = TRAIN_VAL_DIR,
#                                                                  batch_size = BATCH_SIZE,
#                                                                  class_mode = 'binary', 
#                                                                  target_size = (IMG_HEIGHT, IMG_WIDTH),
#                                                                  subset = 'validation',
#                                                                  seed = DATA_GENERATOR_SEED,
#                                                                  follow_links = True)
        
#         print('Calculating class weights')
#         train_gen_list = list(TRAIN_GENERATOR.classes)
#         val_gen_list = list(VALIDATION_GENERATOR.classes)

#         train_neg, train_pos = train_gen_list.count(0), train_gen_list.count(1)
#         val_neg, val_pos = val_gen_list.count(0), val_gen_list.count(1)

#         pos = train_pos + val_pos
#         neg = train_neg + val_neg
#         total = pos + neg

#         weight_for_0 = (1.0 / neg) * (total) / 2.0 
#         weight_for_1 = (1.0 / pos) * (total) / 2.0

#         CLASS_WEIGHT = {0: weight_for_0, 1: weight_for_1}
#         print(f'Class weights = {CLASS_WEIGHT}')

#         print(f'Setting up Hyperband with {EXPERIMENT_TYPE}')
#         tuner = Hyperband(eval(f'experiment_tuner_models.{EXPERIMENT_TYPE}_experiment_model'),
#                           objective = Objective('val_auc', direction = 'max'),
#                           max_epochs = EPOCHS,
#                           directory = './Results/',
#                           project_name = f'{EXPERIMENT_NAME}/',
#                           overwrite = True)

#         tuner.search_space_summary()
#         tuner.search(TRAIN_GENERATOR,
#                      validation_data = VALIDATION_GENERATOR,
#                      epochs = EPOCHS,
#                      steps_per_epoch = TRAIN_GENERATOR.n // TRAIN_GENERATOR.batch_size,
#                      validation_steps = VALIDATION_GENERATOR.n // VALIDATION_GENERATOR.batch_size,
#                      class_weight = CLASS_WEIGHT,
#                      verbose = 1,
#                      callbacks = [EarlyStopping(monitor = 'val_auc',
#                                                 patience = EPOCHS//5+1,
#                                                 mode = 'max',
#                                                 verbose = 1)])
#         print(tuner.results_summary())
#         print(tuner.get_best_models())

#         print(f'Saving tuner summary file to {EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_tuner.txt')
#         tuner_file = open(f'{EXPERIMENT_FOLDER_NAME}/{EXPERIMENT_NAME}_tuner.txt', 'w')
#         tuner_file.write(tuner.results_summary())
#         tuner_file.write(tuner.get_best_models())
#     #     summary_file.write(str(tuner.results_summary()).replace(', \'', ':\n\n'))
#     #     summary_file.write(str(tuner.get_best_models()).replace(', \'', ':\n\n'))
#         tuner_file.close()

#         evaluate_model(TEST_MODEL = tuner.get_best_models()[0],
#                        EXPERIMENT_NAME = EXPERIMENT_NAME,
#                        TEST_DIR = f'./Celeb-DF-v2/Celeb-{DF_TYPE}-30-test',
#                        HISTORY = tuner.get_best_models()[0].history)