In [None]:
pip install matplotlib tensorflow_addons

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout, BatchNormalization, Lambda, LeakyReLU
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.layers import Input
# import tf_slim as slim

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageChops, ImageEnhance
import os
import itertools
import random
%matplotlib inline
np.random.seed(2)

In [None]:
def initializer():
    filter1 = [[0, 0, 0, 0, 0],
               [0, -1, 2, -1, 0],
               [0, 2, -4, 2, 0],
               [0, -1, 2, -1, 0],
               [0, 0, 0, 0, 0]]
    filter2 = [[-1, 2, -2, 2, -1],
               [2, -6, 8, -6, 2],
               [-2, 8, -12, 8, -2],
               [2, -6, 8, -6, 2],
               [-1, 2, -2, 2, -1]]
    filter3 = [[0, 0, 0, 0, 0],
               [0, 0, 0, 0, 0],
               [0, 1, -2, 1, 0],
               [0, 0, 0, 0, 0],
               [0, 0, 0, 0, 0]]
    q = [4.0, 12.0, 2.0]
    filter1 = np.asarray(filter1, dtype=float) / 4
    filter2 = np.asarray(filter2, dtype=float) / 12
    filter3 = np.asarray(filter3, dtype=float) / 2
    filters = [[filter1, filter1, filter1], [filter2, filter2, filter2], [filter3, filter3, filter3]]
    filters = np.einsum('klij->ijlk', filters)
    filters = tf.Variable(filters, dtype=tf.float32)
    return filters

In [None]:
def convert_to_ela_image(path, quality):
    temp_filename = 'temp_file_name.jpg'
    ela_filename = 'temp_ela.png'
    
    image = Image.open(path).convert('RGB')
    image.save(temp_filename, 'JPEG', quality = quality)
    temp_image = Image.open(temp_filename)
    
    ela_image = ImageChops.difference(image, temp_image)
    
    extrema = ela_image.getextrema()
    max_diff = max([ex[1] for ex in extrema])
    if max_diff == 0:
        max_diff = 1
    scale = 255.0 / max_diff
    
    ela_image = ImageEnhance.Brightness(ela_image).enhance(scale)
    
    return ela_image

In [None]:
h = 224
w = 224
image_size = (h, w)

# np_arr = lambda img: img.resize(image_size).flatten() / 255.0
np_arr = lambda img: np.array(img.resize(image_size)).flatten() / 255.0

def prepare_image(image_path, is_ela):
    return np_arr(convert_to_ela_image(image_path, 95)) if is_ela == 1 else np_arr(Image.open(image_path))

In [None]:
x_srm = [] # SRM converted images
x_ela = [] # ELA converted images
labels = [] # 0 for fake, 1 for real

In [None]:
def prepare_data(path, cls, srm, ela, targets):
    for dirname, _, filenames in os.walk(path):
        for filename in filenames:
            if filename.endswith('jpg') or filename.endswith('png'):
                try:
                    full_path = os.path.join(dirname, filename)
                    srm.append(prepare_image(full_path, 0))
                    ela.append(prepare_image(full_path, 1))
                    targets.append(cls)
                except:
                    pass
                if len(targets) % 500 == 0:
                    print('Processing {} images'.format(len(targets)))
    print(len(srm), len(ela), len(labels))

In [None]:
#place authentic
Au_path = '../synthetic/Au'
prepare_data(Au_path, 1, x_srm, x_ela, labels)

In [None]:
#place tampered
Tp_path = '../synthetic/Tp'
prepare_data(Tp_path, 0, x_srm, x_ela, labels)

In [None]:
def reshaping(srm, ela, targets):
    srm = np.array(srm)
    ela = np.array(ela)

    targets = to_categorical(targets, 2)

    srm = srm.reshape(-1, h, w, 3)
    ela = ela.reshape(-1, h, w, 3)

    print(srm.shape, ela.shape, targets.shape)
    
    # stack so we can split on the same pair of images
    x_combined = np.stack((srm, ela), axis=4)

    x_train, x_val, y_train, y_val = train_test_split(x_combined, targets, test_size = 0.2, random_state=5)

    # take them apart
    x_train_srm = x_train[:,:,:,:,0]
    x_val_srm = x_val[:,:,:,:,0]

    x_train_ela = x_train[:,:,:,:,1]
    x_val_ela = x_val[:,:,:,:,1]
    
    return x_train_srm, x_val_srm, x_train_ela, x_val_ela, y_train, y_val


In [None]:
x_train_srm, x_val_srm, x_train_ela, x_val_ela, y_train, y_val = reshaping(x_srm, x_ela, labels)

In [None]:
initializer_srm = initializer()

In [None]:
def outer_product(x):
    #Einstein Notation  [batch,1,1,depth] x [batch,1,1,depth] -> [batch,depth,depth]
    phi_I = tf.einsum('ijkm,ijkn->imn',x[0],x[1])
    
    # Reshape from [batch_size,depth,depth] to [batch_size, depth*depth]
    phi_I = tf.reshape(phi_I,[-1,x[0].shape[3]*x[1].shape[3]])
    
    # Divide by feature map size [sizexsize]
    size1 = int(x[1].shape[1])
    size2 = int(x[1].shape[2])
    phi_I = tf.divide(phi_I, size1*size2)
    
    # Take signed square root of phi_I
    y_ssqrt = tf.multiply(tf.sign(phi_I),tf.sqrt(tf.abs(phi_I)+1e-12))
    
    # Apply l2 normalization
    z_l2 = tf.math.l2_normalize(y_ssqrt, axis=1)
    return z_l2

In [None]:
def conv_layers(input):
    x = Conv2D(32, 3, padding='valid', activation='relu')(input)
    x = Conv2D(32, 3, padding='valid', activation='relu')(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(64, 3, padding='valid', activation='relu')(x)
    x = BatchNormalization()(x)
    x = Conv2D(64, 3, padding='valid', activation='relu')(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(128, 5, padding='valid', activation='relu')(x)
    x = BatchNormalization()(x)
    x = Conv2D(128, 5, padding='valid', activation='relu')(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

#     x = Conv2D(256, 5, padding='valid', activation='relu')(x)
#     x = BatchNormalization()(x)
#     x = Conv2D(256, 5, padding='valid', activation='relu')(x)
#     x = MaxPool2D(pool_size=2)(x)
#     x = BatchNormalization()(x)

#   x = Conv2D(512, 3, padding='valid', activation='relu')(x)
#   x = BatchNormalization()(x)
#   x = Conv2D(512, 3, padding='valid', activation='relu')(x)
#   x = MaxPool2D(pool_size=2)(x)
#   model = BatchNormalization()(x)

    return x

In [None]:
def conv_layers_2(input):
    x = Conv2D(32, 3, padding='same')(input)
    x = LeakyReLU(alpha=0.2)(x)
    x = Conv2D(32, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(64, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)
    x = Conv2D(64, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(128, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)
    x = Conv2D(128, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(256, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)
    x = Conv2D(256, 3, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPool2D(pool_size=2)(x)
    x = BatchNormalization()(x)

    return x

In [None]:
# SRM stream
srm_input = Input(shape=[h, w, 3], name='srm_input')
op = tf.nn.conv2d(srm_input, initializer_srm, strides=[1, 1, 1, 1], padding='SAME', name='srm-layer')
srm_model = conv_layers(op)

# ELA stream
ela_input = Input(shape=[h, w, 3], name='ela_input')
ela_model = conv_layers(ela_input)

# Concatenate streams
# concat = tf.keras.layers.Concatenate()([srm_model, ela_model])

# Bilinear fusion
x = Lambda(outer_product, name='outer_product')([srm_model,ela_model])

x = Flatten()(x)
x = Dense(256, activation='relu')(x)
# x = Dense(256)(x)
# x = LeakyReLU(alpha=0.2)(x)
x = BatchNormalization()(x)

output = Dense(2, activation='softmax')(x)

model = tf.keras.Model(inputs=[srm_input, ela_input], outputs=[output])
model.summary()

In [None]:
epochs = 50
batch_size = 32
init_lr = 1e-4
# optimizer = Adam(lr = init_lr)
optimizer = Adam(learning_rate = init_lr, amsgrad=True)
# decay = init_lr/epochs
# optimizer = RMSprop(learning_rate = init_lr, centered = True)

reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.001)

early_stopping = EarlyStopping(monitor='val_accuracy', min_delta=0,patience=5, verbose=0, mode='auto')

checkpoint_filepath = 'two_stream/checkpoint'
model_checkpoint_callback = ModelCheckpoint(filepath=checkpoint_filepath, save_weights_only=True, monitor='val_accuracy', mode='max', save_best_only=True)

# model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics=['accuracy'])

In [None]:
history = model.fit([x_train_srm, x_train_ela], y_train, batch_size=batch_size, epochs=epochs, validation_data=([x_val_srm, x_val_ela], y_val), callbacks=[early_stopping, model_checkpoint_callback, reduce_lr])

In [None]:
model.save('two_stream.h5')

# Testing

In [None]:
x_srm_test = [] # SRM converted images
x_ela_test = [] # ELA converted images
test_labels = [] # 0 for fake, 1 for real

In [None]:
#place authentic
Au_path_test = '../synthetic_test/Au'
prepare_data(Au_path_test, 1, x_srm_test, x_ela_test, test_labels)
# random.shuffle(X)
print(len(x_srm_test), len(x_ela_test), len(test_labels))

In [None]:
#place tampered
Tp_path_test = '../synthetic_test/Tp'
prepare_data(Tp_path_test, 0, x_srm_test, x_ela_test, test_labels)
print(len(x_srm_test), len(x_ela_test), len(test_labels))

In [None]:
x_test_srm, x_test2_srm, x_test_ela, x_test2_ela, y_test, y_test2 = reshaping(x_srm_test, x_ela_test, test_labels)

In [None]:
from sklearn.metrics import average_precision_score

predictions = model.predict([x_test_srm, x_test_ela])
average_precision = average_precision_score(y_test, predictions)

In [None]:
average_precision

In [None]:
model.evaluate([x_val_srm, x_val_ela], y_val)

# CASIA

In [None]:
casia_srm = [] # SRM converted images
casia_ela = [] # ELA converted images
casia_labels = [] # 0 for fake, 1 for real

In [None]:
#place authentic
Au_path_casia = '../forgery/data/Au'
prepare_data(Au_path_casia, 1, casia_srm, casia_ela, casia_labels)
# random.shuffle(X)
print(len(casia_srm), len(casia_ela), len(casia_labels))

In [None]:
#place tampered
Tp_path_casia = '../forgery/data/Tp'
prepare_data(Tp_path_casia, 0, casia_srm, casia_ela, casia_labels)
print(len(casia_srm), len(casia_ela), len(casia_labels))

In [None]:
x_train_casia_srm, x_val_casia_srm, x_train_casia_ela, x_val_casia_ela, y_casia_train, y_casia_val = reshaping(casia_srm, casia_ela, casia_labels)

In [None]:
casia_model = tf.keras.models.load_model('two_stream.h5')

# for layer in casia_model.layers[:-1]:
#     layer.trainable = False

In [None]:
casia_model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics=['accuracy', tf.keras.metrics.AUC(), tfa.metrics.F1Score(num_classes=2, average="micro")])

In [None]:
history = casia_model.fit([x_train_casia_srm, x_train_casia_ela], y_casia_train, batch_size=batch_size, epochs=epochs, validation_data=([x_val_casia_srm, x_val_casia_ela], y_casia_val), 
                    callbacks=[early_stopping, model_checkpoint_callback, reduce_lr])

In [None]:
casia_model.save('casia_model.h5')