In [None]:
import os
import sys

import tensorflow as tf
# Run tf.function-wrapped code eagerly (step-by-step Python execution).
# NOTE(review): this is the `experimental_` spelling of `tf.config.run_functions_eagerly`;
# confirm which one the installed TensorFlow version expects.
tf.config.experimental_run_functions_eagerly(True)

import h5py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd


from sklearn.preprocessing import MinMaxScaler

# Last expression of the cell, so its value (the GPU device name, or an empty
# string when no GPU is visible) is displayed as the cell output.
tf.test.gpu_device_name()

In [None]:
# add src to path
# Both entries are relative, so they are resolved against the current working
# directory of the kernel (presumably the notebook's own folder — TODO confirm).
sys.path.insert(0,'../../utils')
sys.path.insert(0,'../../../src')
# NOTE(review): `get_xception` imported here is shadowed later by the notebook's
# own `def get_xception()`; the notebook also defines `get_scalarsNN_model`, while
# this import uses the spelling `get_scalaraNN_model` — confirm which is intended.
from utils.modelling_functions import getYCbCr, get_xception, getAdditionalScalars, get_scalaraNN_model, load_dataset_h5
from utils.db_helper import get_files_paths_recursive


# Loading datasets

In [None]:
import joblib
import skimage
import sklearn

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import xception
from tensorflow.keras.preprocessing import image as keras_img
from sklearn.utils import shuffle
from PIL import Image

In [None]:
# specify categories: class directory name -> integer label
categories = {"fake": 0,
              "real": 1}

# specify learning process
img_input_shape = (299,299,3)
scalars_input_shape = (18,)
batch_size = 8
use_scalars=False

# specify paths
# Forward slashes are used throughout (as in exp_path): they work on Windows and
# POSIX alike and avoid the invalid "\." / "\c" escape sequences of backslash literals.
exp_path = "../../../exp/"
train_path = "../../../classification_db/train"
val_path = "../../../classification_db/val"

# Fail early with a real exception; raising a plain string is a TypeError in Python 3.
for required_path in (train_path, val_path):
    if not os.path.exists(required_path):
        raise FileNotFoundError(f"Paths not exists: {required_path}")

# Models and evaluation artefacts of this run are written to / read from here.
saving_dir = os.path.join(exp_path, "models", f"final_model_{use_scalars}")

# directory for saving
# Only a warning is printed: creating the directory is left to the user on purpose
# (un-comment the line below to create it automatically).
if not os.path.exists(saving_dir):
    print("Path not exists")
    # os.makedirs(saving_dir)



In [None]:
# Load the previously fitted scaler that preprocess_scalars() applies to the scalar features.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code —
# only load scaler files from a trusted source.
scaler = joblib.load(os.path.join(exp_path, "models", "Scaler", "scaler.save"))
# Explicitly (re)set the `clip` attribute on the loaded object.
# presumably needed because the scaler was pickled by a library version that did not
# have this attribute yet — TODO confirm
scaler.clip = False


In [None]:
def preprocess_scalars(scalars):
    """Scale a single sample of scalar features with the module-level ``scaler``.

    A leading axis of size one is added before calling ``scaler.transform`` and
    every singleton axis is removed from the result again.

    Args:
        scalars: array-like holding the features of one sample.

    Returns:
        The transformed features as a squeezed NumPy array.
    """
    single_sample_batch = np.expand_dims(scalars, axis=0)
    return np.squeeze(scaler.transform(single_sample_batch))

In [None]:
def preprocess_image(img_path):
    """Load an image file and return it as an Xception-preprocessed YCbCr array.

    Args:
        img_path: path of the image file to read.

    Returns:
        The array produced by ``skimage.color.rgb2ycbcr`` applied to the output of
        ``xception.preprocess_input``; the spatial size of the file is kept (no
        resizing is done here).
    """
    # The context manager closes the underlying file deterministically instead of
    # leaving the handle to the garbage collector (this runs once per sample).
    with Image.open(img_path) as pil_image:
        # Force three channels: RGBA, palette and greyscale PNGs would otherwise
        # reach rgb2ycbcr with the wrong shape and fail there.
        np_image = np.array(pil_image.convert("RGB")).astype(np.uint8)

    # NOTE(review): preprocess_input rescales the pixels before the colour
    # conversion; the order is kept as-is so inputs stay compatible with already
    # trained weights — confirm this is the intended pipeline.
    np_image = xception.preprocess_input(np_image)
    np_ycbcr = skimage.color.rgb2ycbcr(np_image)

    return np_ycbcr

In [None]:
def generator(path_dir, batch_size, categories, use_scalars=True):
    """Endlessly yield shuffled batches of samples found under ``path_dir``.

    Every ``.png`` file found recursively under ``path_dir`` is one sample. Its
    label is ``categories[<name of the directory that contains the file>]``.
    The file lists are reshuffled each time all images have been used.

    Args:
        path_dir: root directory of the dataset.
        batch_size: number of samples in every yielded batch.
        categories: mapping from class directory name to integer label.
        use_scalars: when True, one ``.csv`` file of scalar features is loaded
            per image and yielded as a second model input.

    Yields:
        ``(images, labels)`` when ``use_scalars`` is False, otherwise
        ``([images, scalars], labels)``; all parts are NumPy arrays.

    Raises:
        ValueError: if ``path_dir`` does not exist, contains no ``.png`` file,
            contains images in a directory that is not a key of ``categories``,
            or (with ``use_scalars``) the number of ``.csv`` files differs from
            the number of images.
    """
    # get all file paths into proper lists
    if not os.path.exists(path_dir):
        raise ValueError("Bad path specified")

    # get all file paths and list with classes
    img_file_list = []
    csv_file_list = []
    class_list = []
    for root, _dirs, files in os.walk(path_dir):
        # os.walk gives no ordering guarantee; sorting makes the index-based pairing
        # of images and CSV files deterministic.
        # presumably each image has a CSV with a matching name in the same
        # directory — TODO confirm
        for file in sorted(files):
            if file.endswith(".png"):
                img_file_list.append(os.path.join(root, file))
                class_list.append(os.path.basename(root))
            elif file.endswith(".csv"):
                csv_file_list.append(os.path.join(root, file))

    if not img_file_list:
        raise ValueError(f"No .png images found under {path_dir!r}")

    # An unknown directory name used to become a silent None label.
    unknown_classes = sorted(set(class_list) - set(categories))
    if unknown_classes:
        raise ValueError(f"Images found in directories without a category: {unknown_classes}")

    if not use_scalars:
        # placeholders so the three lists can still be shuffled together
        csv_file_list = [0] * len(class_list)
    elif len(csv_file_list) != len(img_file_list):
        raise ValueError(
            f"Found {len(img_file_list)} images but {len(csv_file_list)} csv files under {path_dir!r}"
        )

    # shuffle all lists together
    img_file_list, csv_file_list, class_list = shuffle(img_file_list, csv_file_list, class_list)

    i = 0
    while True:
        batch = {'xception_input': [], 'dense_input': [], "labels": []}  # use a dict for multiple inputs
        for _ in range(batch_size):
            # reset the position and reshuffle once all images have been used
            if i == len(img_file_list):
                i = 0
                img_file_list, csv_file_list, class_list = shuffle(img_file_list, csv_file_list, class_list)

            # load and preprocess image
            batch['xception_input'].append(preprocess_image(img_file_list[i]))

            # if use scalars then load and preprocess them
            if use_scalars:
                # DataFrame.squeeze("columns") replaces read_csv(squeeze=True),
                # which was removed in pandas 2.0
                scalars_frame = pd.read_csv(csv_file_list[i], index_col=0, header=None)
                scalars = np.array(scalars_frame.squeeze("columns"))
                batch['dense_input'].append(preprocess_scalars(scalars))

            batch['labels'].append(categories[class_list[i]])

            i += 1

        # convert each list to numpy array
        images = np.array(batch['xception_input'])
        scalar_rows = np.array(batch['dense_input'])
        labels = np.array(batch['labels'])

        # return value with scalars or without
        if use_scalars:
            yield [images, scalar_rows], labels
        else:
            yield images, labels


# Create model

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications import Xception, xception
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Input, Dense, Conv2D, BatchNormalization, GlobalAveragePooling2D, Add, Flatten, concatenate
from sklearn import svm
from sklearn.metrics import confusion_matrix, classification_report


In [None]:
def get_xception():
    """Build and compile the image-only binary classifier.

    The network is an Xception backbone without its top and without pretrained
    weights, followed by global average pooling and a single sigmoid unit.

    Returns:
        A compiled ``Sequential`` model (Adam, binary cross-entropy, accuracy).
    """
    backbone = Xception(include_top=False, weights=None, classes=2)
    classifier = Sequential([
        backbone,
        GlobalAveragePooling2D(),
        Dense(1, activation='sigmoid'),
    ])
    classifier.compile(optimizer='Adam', loss='binary_crossentropy', metrics=['accuracy'])
    return classifier

In [None]:
def get_xception_for_double_model():
    """Build the (uncompiled) image branch used by the two-input model.

    Returns:
        A functional ``Model`` mapping a 299x299x3 image input to one sigmoid
        output, via an Xception backbone (no top, no pretrained weights) and
        global average pooling.
    """
    image_input = Input(shape=(299, 299,3))

    features = Xception(include_top=False, weights=None, classes=2)(image_input)
    pooled = GlobalAveragePooling2D()(features)
    probability = Dense(1, activation='sigmoid')(pooled)

    return Model(inputs=image_input, outputs=probability)

In [None]:
def get_scalarsNN_model():
    """Build the (uncompiled) dense network operating on the 18 scalar features.

    Layout: three ReLU layers of width 18, batch normalisation, three ReLU
    layers of width 10, batch normalisation, three ReLU layers of width 5 and a
    final single sigmoid unit.

    Returns:
        A functional ``Model`` mapping an input of shape ``(18,)`` to one output.
    """
    # one shared initializer instance for all ReLU layers
    initializer =  tf.keras.initializers.GlorotNormal()
    input_scalar = Input(shape=(18,))

    # (layer width, whether the stage ends with batch normalisation)
    stages = ((18, True), (10, True), (5, False))

    hidden = input_scalar
    for width, normalise in stages:
        for _ in range(3):
            hidden = Dense(width, activation='relu', kernel_initializer=initializer)(hidden)
        if normalise:
            hidden = BatchNormalization()(hidden)

    # the output layer keeps the default kernel initializer
    output = Dense(1, activation='sigmoid')(hidden)

    return Model(inputs=input_scalar, outputs=output)

In [None]:
# Assemble the model to train: the two-branch (image + scalars) network when
# use_scalars is set, otherwise the plain Xception classifier.
if use_scalars:
    # get two models
    # model_scalars = get_scalarsNN_model()
    # scalar branch: loaded from a previously saved model file; compile=False skips
    # restoring its training configuration
    model_scalars = load_model (os.path.join(exp_path, "models", "ScalarNN_scalars", "model.h5"),
                    compile=False)

    # image branch: built from scratch (the weight-loading line below is disabled)
    model_cnn = get_xception_for_double_model()
    # model_cnn.load_weights(os.path.join(exp_path, "models", "final_model_False", "model.h5"))
    # combine them
    combined_model = concatenate([model_scalars.output, model_cnn.output])

    # create output layers
    model_out = Dense(2, activation='relu')(combined_model)
    # model_out = Dense(3, activation='relu')(model_out)
    model_out = Dense(1, activation='sigmoid')(model_out)

    # define final model
    # input order is [image, scalars], the same order generator() yields its inputs in
    model = Model(inputs=[model_cnn.input, model_scalars.input], outputs=model_out)

    model.compile(optimizer='Adam', loss='binary_crossentropy', metrics=['accuracy'])
else:
    # get_xception() returns an already compiled model
    model = get_xception()

In [None]:
# Build a fresh image-branch model just to print its layer summary.
# NOTE(review): this rebinds `model_cnn`; when use_scalars is True the name no longer
# refers to the branch that is part of `model` (the branch itself is unaffected).
model_cnn = get_xception_for_double_model()
model_cnn.summary()

In [None]:
# Stop training once val_loss has not improved for 5 epochs and roll back to the best weights.
early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
# Write weights only (not the full model) to saving_dir/model.h5, and only on improvement.
# NOTE(review): saving_dir is not created by this notebook (the makedirs call in the
# config cell is commented out), so it must already exist — confirm before training.
checkopoint = ModelCheckpoint(os.path.join(saving_dir, "model.h5"),
                              save_best_only=True,
                              save_weights_only=True)

In [None]:
# Endless batch generators for training and validation; nothing is read from disk
# until the first batch is requested.
train_generator = generator(train_path, batch_size, categories, use_scalars=use_scalars)
val_generator = generator(val_path, batch_size, categories, use_scalars=use_scalars)

In [None]:
# The generators never terminate, so the epoch length is fixed explicitly:
# 500 training batches and 125 validation batches per epoch, at most 50 epochs
# (early_stop may end training sooner).
results = model.fit(train_generator, validation_data = val_generator,
                    callbacks = [early_stop, checkopoint], epochs=50, 
                    steps_per_epoch=500, validation_steps = 125)

## Evaluation

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sn
import joblib

In [None]:
# Number of samples drawn from the test generator by the evaluation loop.
# NOTE(review): hard-coded; presumably the number of images under val_path — confirm,
# since the generator wraps around (or stops short) if the real count differs.
len_of_test_database = 7000

In [None]:
# Reload the weights from the same file the ModelCheckpoint callback writes to.
model.load_weights(os.path.join(saving_dir, "model.h5"))
# Batch size 1 so every next() call returns exactly one sample.
# NOTE(review): evaluation reads val_path, i.e. the same data used for validation
# during training — confirm this is intended.
test_generator = generator(val_path, 1, categories, use_scalars=use_scalars)

In [None]:
# Collect one prediction and one ground-truth label per drawn sample.
y_pred = np.empty((len_of_test_database,1))
y_true = np.empty((len_of_test_database,1))

for i in range(len_of_test_database):
    # progress message every 100 samples
    if i%100 == 0:
        print(f"{i} images predicted")
        
    # test_generator was created with batch size 1: one sample per call.
    # NOTE(review): generator() shuffles and wraps around, so samples are repeated or
    # left out when len_of_test_database differs from the number of images.
    tmp_input, tmp_true = next(test_generator)
    tmp_pred = model.predict(tmp_input)
    
    y_true[i] = tmp_true
    y_pred[i] = tmp_pred

# Threshold the sigmoid outputs in place: <= 0.5 becomes label 0, > 0.5 becomes label 1.
y_pred[y_pred <= 0.5] = 0.
y_pred[y_pred > 0.5] = 1.

In [None]:
# version to print
# Text report built from the true labels and the thresholded predictions.
report = classification_report(y_true, y_pred)
print(report)
# Also persist the report next to the model; saving_dir must already exist.
with open (os.path.join(saving_dir, 'classification_report.txt'), 'w+') as f:
    f.write(report)

In [None]:
# Class names ordered by their integer label. They are derived from the `categories`
# mapping instead of rebinding `categories` to a list, which would break re-running
# the generator cells (they call categories as a mapping).
category_names = sorted(categories, key=categories.get)
category_labels = [categories[name] for name in category_names]

# get confusion matrix
# Explicit labels keep the matrix at one row/column per class even when a class
# is missing from y_true and y_pred, so the DataFrame construction cannot fail.
conf_matrix = confusion_matrix(y_true, y_pred, labels=category_labels)
pd_conf_matrix = pd.DataFrame(conf_matrix, columns=category_names, index=category_names)
sn.set(font_scale=1.4) # for label size

# Explicit figure/axes instead of the implicit pyplot state.
fig, ax = plt.subplots()
sn.heatmap(pd_conf_matrix, ax = ax, annot=True, cmap='binary', fmt='g', cbar = False)
ax.set_title('Confusion Matrix')
# confusion_matrix puts true classes on rows and predicted classes on columns
ax.set_xlabel('Predicted')
ax.set_ylabel('True')

fig.savefig(os.path.join(saving_dir, 'conf_matrix.png'))