## QDL-CMFD
### a Quality-independent and Deep Learning-based Copy-Move image Forgery Detection method.
#### _ Mehrad Aria

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Layer, Input, Lambda
from tensorflow.keras.layers import BatchNormalization, Activation, Concatenate
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras import backend as K

import numpy as np
import collections
from itertools import chain
import pickle

from libsvm import svmutil
import scipy.signal as signal
import scipy.special as special
import scipy.optimize as optimize

from PIL import Image
import skimage.io
import skimage.transform
import cv2

import matplotlib.pyplot as plt
from __future__ import print_function
import warnings
warnings.filterwarnings("ignore")
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [None]:
def normalize_kernel(kernel):
    return kernel / np.sum(kernel)

def gaussian_kernel2d(n, sigma):
    Y, X = np.indices((n, n)) - int(n/2)
    gaussian_kernel = 1 / (2 * np.pi * sigma ** 2) * np.exp(-(X ** 2 + Y ** 2) / (2 * sigma ** 2))
    return normalize_kernel(gaussian_kernel)

def local_mean(image, kernel):
    return signal.convolve2d(image, kernel, 'same')

In [None]:
def local_deviation(image, local_mean, kernel):
    sigma = image ** 2
    sigma = signal.convolve2d(sigma, kernel, 'same')
    return np.sqrt(np.abs(local_mean ** 2 - sigma))

In [None]:
def calculate_mscn_coefficients(image, kernel_size=6, sigma=7/6):
    C = 1/255
    kernel = gaussian_kernel2d(kernel_size, sigma=sigma)
    local_mean = signal.convolve2d(image, kernel, 'same')
    local_var = local_deviation(image, local_mean, kernel)

    return (image - local_mean) / (local_var + C)

In [None]:
def generalized_gaussian_dist(x, alpha, sigma):
    beta = sigma * np.sqrt(special.gamma(1 / alpha) / special.gamma(3 / alpha))

    coefficient = alpha / (2 * beta() * special.gamma(1 / alpha))
    return coefficient * np.exp(-(np.abs(x) / beta) ** alpha)

In [None]:
def calculate_pair_product_coefficients(mscn_coefficients):
    return collections.OrderedDict({
        'mscn': mscn_coefficients,
        'horizontal': mscn_coefficients[:, :-1] * mscn_coefficients[:, 1:],
        'vertical': mscn_coefficients[:-1, :] * mscn_coefficients[1:, :],
        'main_diagonal': mscn_coefficients[:-1, :-1] * mscn_coefficients[1:, 1:],
        'secondary_diagonal': mscn_coefficients[1:, :-1] * mscn_coefficients[:-1, 1:]
    })

In [None]:
def asymmetric_generalized_gaussian(x, nu, sigma_l, sigma_r):
    def beta(sigma):
        return sigma * np.sqrt(special.gamma(1 / nu) / special.gamma(3 / nu))

    coefficient = nu / ((beta(sigma_l) + beta(sigma_r)) * special.gamma(1 / nu))
    f = lambda x, sigma: coefficient * np.exp(-(x / beta(sigma)) ** nu)

    return np.where(x < 0, f(-x, sigma_l), f(x, sigma_r))

In [None]:
def asymmetric_generalized_gaussian_fit(x):
    def estimate_phi(alpha):
        numerator = special.gamma(2 / alpha) ** 2
        denominator = special.gamma(1 / alpha) * special.gamma(3 / alpha)
        return numerator / denominator

    def estimate_r_hat(x):
        size = np.prod(x.shape)
        return (np.sum(np.abs(x)) / size) ** 2 / (np.sum(x ** 2) / size)

    def estimate_R_hat(r_hat, gamma):
        numerator = (gamma ** 3 + 1) * (gamma + 1)
        denominator = (gamma ** 2 + 1) ** 2
        return r_hat * numerator / denominator

    def mean_squares_sum(x, filter = lambda z: z == z):
        filtered_values = x[filter(x)]
        squares_sum = np.sum(filtered_values ** 2)
        return squares_sum / ((filtered_values.shape))

    def estimate_gamma(x):
        left_squares = mean_squares_sum(x, lambda z: z < 0)
        right_squares = mean_squares_sum(x, lambda z: z >= 0)

        return np.sqrt(left_squares) / np.sqrt(right_squares)

    def estimate_alpha(x):
        r_hat = estimate_r_hat(x)
        gamma = estimate_gamma(x)
        R_hat = estimate_R_hat(r_hat, gamma)

        solution = optimize.root(lambda z: estimate_phi(z) - R_hat, [0.2]).x

        return solution[0]

    def estimate_sigma(x, alpha, filter = lambda z: z < 0):
        return np.sqrt(mean_squares_sum(x, filter))

    def estimate_mean(alpha, sigma_l, sigma_r):
        return (sigma_r - sigma_l) * constant * (special.gamma(2 / alpha) / special.gamma(1 / alpha))

    alpha = estimate_alpha(x)
    sigma_l = estimate_sigma(x, alpha, lambda z: z < 0)
    sigma_r = estimate_sigma(x, alpha, lambda z: z >= 0)

    constant = np.sqrt(special.gamma(1 / alpha) / special.gamma(3 / alpha))
    mean = estimate_mean(alpha, sigma_l, sigma_r)

    return alpha, mean, sigma_l, sigma_r

In [None]:
def calculate_brisque_features(image, kernel_size=7, sigma=7/6):
    def calculate_features(coefficients_name, coefficients, accum=np.array([])):
        alpha, mean, sigma_l, sigma_r = asymmetric_generalized_gaussian_fit(coefficients)

        if coefficients_name == 'mscn':
            var = (sigma_l ** 2 + sigma_r ** 2) / 2
            return [alpha, var]

        return [alpha, mean, sigma_l ** 2, sigma_r ** 2]

    mscn_coefficients = calculate_mscn_coefficients(image, kernel_size, sigma)
    coefficients = calculate_pair_product_coefficients(mscn_coefficients)

    features = [calculate_features(name, coeff) for name, coeff in coefficients.items()]
    flatten_features = list(chain.from_iterable(features))
    return np.array(flatten_features)

In [None]:
def plot_histogram(x, label):
    n, bins = np.histogram(x.ravel(), bins=50)
    n = n / np.max(n)
    plt.plot(bins[:-1], n, label=label, marker='o')

In [None]:
gray_image = skimage.color.rgb2gray(skimage.io.imread('./Images/LR.jpg'))
_ = skimage.io.imshow('./Images/LR.jpg')

In [None]:
mscn_coefficients = calculate_mscn_coefficients(gray_image, 7, 7/6)
coefficients = calculate_pair_product_coefficients(mscn_coefficients)

%matplotlib inline
plt.rcParams["figure.figsize"] = 12, 11

for name, coeff in coefficients.items():
    plot_histogram(coeff.ravel(), name)

plt.axis([-2.5, 2.5, 0, 1.05])
plt.legend(prop={'size': 16})
plt.show()

In [None]:
brisque_features = calculate_brisque_features(gray_image, kernel_size=7, sigma=7/6)

downscaled_image = cv2.resize(gray_image, None, fx=1/2, fy=1/2, interpolation = cv2.INTER_CUBIC)
downscale_brisque_features = calculate_brisque_features(downscaled_image, kernel_size=7, sigma=7/6)

brisque_features = np.concatenate((brisque_features, downscale_brisque_features))

def scale_features(features):
    with open('./BRISQUE/normalize.pickle', 'rb') as handle:
        scale_params = pickle.load(handle)

    min_ = np.array(scale_params['min_'])
    max_ = np.array(scale_params['max_'])

    return -1 + (2.0 / (max_ - min_) * (features - min_))

def calculate_image_quality_score(brisque_features):
    model = svmutil.svm_load_model('./BRISQUE/brisque_svm.txt')
    scaled_brisque_features = scale_features(brisque_features)

    x, idx = svmutil.gen_svm_nodearray(
        scaled_brisque_features,
        isKernel=(model.param.kernel_type == svmutil.PRECOMPUTED))

    nr_classifier = 1
    prob_estimates = (svmutil.c_double * nr_classifier)()
    result = svmutil.libsvm.svm_predict_probability(model, x, prob_estimates)
    Nresult = 100 - result
    return Nresult

calculate_image_quality_score(brisque_features)

In [None]:
! python ./OSRGAN/OSRGAN.py --model_path OSRGAN/Model/OSRGAN.pth --input Images/LR.jpg

In [None]:
gray_image = skimage.color.rgb2gray(skimage.io.imread('./Images/SR.jpg'))
_ = skimage.io.imshow('./Images/SR.jpg')

mscn_coefficients = calculate_mscn_coefficients(gray_image, 7, 7/6)
coefficients = calculate_pair_product_coefficients(mscn_coefficients)

%matplotlib inline
plt.rcParams["figure.figsize"] = 12, 11

for name, coeff in coefficients.items():
    plot_histogram(coeff.ravel(), name)

plt.axis([-2.5, 2.5, 0, 1.05])
plt.legend(prop={'size': 16})
plt.show()

brisque_features = calculate_brisque_features(gray_image, kernel_size=7, sigma=7/6)

downscaled_image = cv2.resize(gray_image, None, fx=1/2, fy=1/2, interpolation = cv2.INTER_CUBIC)
downscale_brisque_features = calculate_brisque_features(downscaled_image, kernel_size=7, sigma=7/6)

brisque_features = np.concatenate((brisque_features, downscale_brisque_features))

calculate_image_quality_score(brisque_features)

In [None]:
def std_norm_along_chs(x) :
    avg = K.mean(x, axis=-1, keepdims=True)
    std = K.maximum(1e-4, K.std(x, axis=-1, keepdims=True))
    return (x - avg) / std

def BnInception(x, nb_inc=16, inc_filt_list=[(1,1), (3,3), (5,5)], name='uinc') :
    uc_list = []
    for idx, ftuple in enumerate( inc_filt_list ) :
        uc = Conv2D( nb_inc, ftuple, activation='linear', padding='same', name=name+'_c%d' % idx)(x)
        uc_list.append(uc)
    if ( len( uc_list ) > 1 ) :
        uc_merge = Concatenate( axis=-1, name=name+'_merge')(uc_list)
    else :
        uc_merge = uc_list[0]
    uc_norm = BatchNormalization(name=name+'_bn')(uc_merge)
    xn = Activation('relu', name=name+'_re')(uc_norm)
    return xn

class SelfCorrelationPercPooling( Layer ) :
    def __init__( self, nb_pools=256, **kwargs ) :
        self.nb_pools = nb_pools
        super( SelfCorrelationPercPooling, self ).__init__( **kwargs )
    def build( self, input_shape ) :
        self.built = True
    def call( self, x, mask=None ) :
        bsize, nb_rows, nb_cols, nb_feats = K.int_shape( x )
        nb_maps = nb_rows * nb_cols
        x_3d = K.reshape( x, tf.stack( [ -1, nb_maps, nb_feats ] ) )
        x_corr_3d = tf.matmul( x_3d, x_3d, transpose_a = False, transpose_b = True ) / nb_feats
        x_corr = K.reshape( x_corr_3d, tf.stack( [ -1, nb_rows, nb_cols, nb_maps ] ) )
        if ( self.nb_pools is not None ) :
            ranks = K.cast( K.round( tf.linspace( 1., nb_maps - 1, self.nb_pools ) ), 'int32' )
        else :
            ranks = tf.range( 1, nb_maps, dtype = 'int32' )
        x_sort, _ = tf.nn.top_k( x_corr, k = nb_maps, sorted = True )
        x_f1st_sort = K.permute_dimensions( x_sort, ( 3, 0, 1, 2 ) )
        x_f1st_pool = tf.gather( x_f1st_sort, ranks )
        x_pool = K.permute_dimensions( x_f1st_pool, ( 1, 2, 3, 0 ) )
        return x_pool
    def compute_output_shape( self, input_shape ) :
        bsize, nb_rows, nb_cols, nb_feats = input_shape
        nb_pools = self.nb_pools if ( self.nb_pools is not None ) else ( nb_rows * nb_cols - 1 )
        return tuple( [ bsize, nb_rows, nb_cols, nb_pools ] )

class BilinearUpSampling2D( Layer ) :
    def call( self, x, mask=None ) :
        bsize, nb_rows, nb_cols, nb_filts = K.int_shape(x)
        new_size = tf.constant( [ nb_rows * 2, nb_cols * 2 ], dtype = tf.int32 )
        return tf.image.resize( x, new_size)
    def compute_output_shape( self, input_shape ) :
        bsize, nb_rows, nb_cols, nb_filts = input_shape
        return tuple( [ bsize, nb_rows * 2, nb_cols * 2, nb_filts ] )

class ResizeBack( Layer ) :
    def call( self, x ) :
        t, r = x
        new_size = [ tf.shape(input=r)[1], tf.shape(input=r)[2] ]
        return tf.image.resize( t, new_size)
    def compute_output_shape( self, input_shapes ) :
        tshape, rshape = input_shapes
        return ( tshape[0], ) + rshape[1:3] + ( tshape[-1], )
    
class Preprocess( Layer ) :

    def call( self, x, mask=None ) :
        bsize, nb_rows, nb_cols, nb_colors = K.int_shape(x)
        if (nb_rows != 256) or (nb_cols !=256) :
            x256 = tf.image.resize( x,
                                             [256, 256],
                                             method=tf.image.ResizeMethod.BILINEAR,
                                             name='resize' )
        else :
            x256 = x
        if K.dtype(x) == 'float32' :
            xout = x256
        else :
            xout = preprocess_input( x256 )
        return xout
    def compute_output_shape( self, input_shape ) :
        return (input_shape[0], 256, 256, 3)

def create_qdlcmfd_similarity_subnetwork( img_shape=(256,256,3),
                                   nb_pools=100,
                                   name='QDL-CMFD-S' ) :
    img_input = Input( shape=img_shape, name=name+'_in' )
    bname = name + '_cnn'

    x = Conv2D(64, (3, 3), activation=tf.keras.layers.LeakyReLU(), kernel_regularizer=regularizers.L1L2(l2=1e-4), padding='same', name=bname+'_c1')(img_input)
    x = BatchNormalization(axis = -1, name=bname+'_b1')(x)
    x = Conv2D(256, (3, 3), activation=tf.keras.layers.LeakyReLU(), kernel_regularizer=regularizers.L1L2(l2=1e-4), padding='same', name=bname+'_c2')(x)
    x = BatchNormalization(axis = -1, name=bname+'_b2')(x)
    x = Conv2D(512, (3, 3), activation=tf.keras.layers.LeakyReLU(), kernel_regularizer=regularizers.L1L2(l2=1e-4), padding='same', name=bname+'_c3')(x)
    x = BatchNormalization(axis = -1, name=bname+'_b3')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name=bname+'_mp')(x)
    x = Activation(std_norm_along_chs, name=bname+'_sn')(x)

    bname = name + '_corr'
    xcorr = SelfCorrelationPercPooling(name=bname+'_corr')(x)
    xn = BatchNormalization(name=bname+'_bn')(xcorr)
    patch_list = [(1,1),(3,3),(5,5)]

    bname = name + '_dconv'
    f16  = BnInception( xn, 8, patch_list, name =bname+'_mpf')
    f32  = BilinearUpSampling2D( name=bname+'_bx2')( f16 )
    dx32 = BnInception( f32, 6, patch_list, name=bname+'_dx2')
    f64a = BilinearUpSampling2D( name=bname+'_bx4a')( f32 )
    f64b = BilinearUpSampling2D( name=bname+'_bx4b')( dx32 )
    f64  = Concatenate(axis=-1, name=name+'_dx4_m')([f64a, f64b])
    dx64 = BnInception( f64, 4, patch_list, name=bname+'_dx4')
    f128a = BilinearUpSampling2D( name=bname+'_bx8a')( f64a )
    f128b = BilinearUpSampling2D( name=bname+'_bx8b')( dx64 )
    f128  = Concatenate(axis=-1, name=name+'_dx8_m')([f128a, f128b])
    dx128 = BnInception( f128, 2, patch_list, name=bname+'_dx8')
    f256a = BilinearUpSampling2D( name=bname+'_bx16a')( f128a )
    f256b = BilinearUpSampling2D( name=bname+'_bx16b')( dx128 )
    f256  = Concatenate(axis=-1, name=name+'_dx16_m')([f256a,f256b])
    dx256 = BnInception( f256, 2, patch_list, name=bname+'_dx16')
    fm256 = Concatenate(axis=-1,name=name+'_mfeat')([f256a,dx256])
    masks = BnInception( fm256, 2, [(5,5),(7,7),(11,11)], name=bname+'_dxF')

    pred_mask = Conv2D(1, (3,3), activation='sigmoid', name=name+'_pred_mask', padding='same')(masks)

    model = Model(inputs=img_input, outputs=pred_mask, name=name)
    return model

def create_qdlcmfd_manipulation_subnetwork( img_shape=(256,256,3),
                                     name='QDL-CMFD-M' ) :
    img_input = Input( shape = img_shape, name = name+'_in' )
    bname = name + '_cnn'

    x = Conv2D(64, (3, 3), activation=tf.keras.layers.LeakyReLU(), kernel_regularizer=regularizers.L1L2(l2=1e-4), padding='same', name=bname+'_c1')(img_input)
    x = BatchNormalization(axis = -1, name=bname+'_b1')(x)
    x = Conv2D(256, (3, 3), activation=tf.keras.layers.LeakyReLU(), kernel_regularizer=regularizers.L1L2(l2=1e-4), padding='same', name=bname+'_c2')(x)
    x = BatchNormalization(axis = -1, name=bname+'_b2')(x)
    x = Conv2D(512, (3, 3), activation=tf.keras.layers.LeakyReLU(), kernel_regularizer=regularizers.L1L2(l2=1e-4), padding='same', name=bname+'_c3')(x)
    x = BatchNormalization(axis = -1, name=bname+'_b3')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name=bname+'_mp')(x)

    patch_list = [(1,1),(3,3),(5,5)]
    bname = name + '_dconv'
    f16 = BnInception( x, 8, patch_list, name =bname+'_mpf')
    f32  = BilinearUpSampling2D(name=bname+'_bx2')( f16 )
    dx32 = BnInception( f32, 6, patch_list, name=bname+'_dx2')
    f64  = BilinearUpSampling2D(name=bname+'_bx4')( dx32 )
    dx64 = BnInception( f64, 4, patch_list, name=bname+'_dx4')
    f128  = BilinearUpSampling2D(name=bname+'_bx8')( dx64 )
    dx128 = BnInception( f128, 2, patch_list, name=bname+'_dx8')
    f256  = BilinearUpSampling2D(name=bname+'_bx16')( dx128 )
    dx256 = BnInception( f256, 2, [(5,5),(7,7),(11,11)], name=bname+'_dx16')

    pred_mask = Conv2D(1, (3,3), activation='sigmoid', name=bname+'_pred_mask', padding='same')(dx256)

    model = Model(inputs=img_input, outputs=pred_mask, name = bname)
    return model

def create_QDLCMFD_test_model( weight_file=None ) :
    similarity_subnetwork = create_qdlcmfd_similarity_subnetwork()
    manipulation_subnetwork = create_qdlcmfd_manipulation_subnetwork()
    QDLCMFD_S = Model( inputs=similarity_subnetwork.inputs, outputs=similarity_subnetwork.layers[-2].output)
    QDLCMFD_M = Model( inputs=manipulation_subnetwork.inputs, outputs=manipulation_subnetwork.layers[-2].output)

    img_raw = Input( shape=(None,None,3), name='image_in')
    img_in = Preprocess( name='preprocess')( img_raw )
    subnetwork1 = QDLCMFD_S( img_in )
    subnetwork2 = QDLCMFD_M( img_in )
    merged_network = Concatenate(axis=-1, name='merge')([subnetwork1, subnetwork2])
    f = BnInception( merged_network, 3, name='fusion' )
    mask_out = Conv2D( 3, (3,3), padding='same', activation='softmax', name='pred_mask')(f)
    mask_out = ResizeBack(name='restore')([mask_out, img_raw] )

    model = Model( inputs = img_raw, outputs = mask_out, name = 'QDL-CMFD')
    if weight_file is not None :
        try :
            model.load_weights(weight_file)
            print("Model successfully loaded from {}".format(weight_file))
        except Exception as e :
            print("Failed to load from {} / Error: {}".format(weight_file,e))
    return model

In [None]:
def QDLCMFD_decoder( QDLCMFDmodel, rgb ) :
    single_sample_batch = np.expand_dims( rgb, axis=0 )
    pred = QDLCMFDmodel.predict( single_sample_batch )[0]
    return pred

def visualize_result( rgb, gt, pred, figsize=(12,4), title=None ) :
    plt.figure( figsize=figsize )
    plt.subplot(131)
    plt.imshow( rgb )
    plt.title('input image')
    plt.subplot(132)
    plt.title('ground truth')
    plt.imshow(gt)
    plt.subplot(133)
    plt.imshow(pred)
    plt.title('DCMFD pred')
    if title is not None :
        plt.suptitle( title )

def visualize_1result( rgb, pred, figsize=(12,4), title=None ) :
    plt.figure( figsize=figsize )
    plt.subplot(121)
    plt.imshow( rgb )
    plt.title('input image')
    plt.subplot(122)
    plt.imshow(pred)
    plt.title('DCMFD pred')
    if title is not None :
        plt.suptitle( title )

In [None]:
QDLCMFDmodel = create_QDLCMFD_test_model('./Model/QDL-CMFD-Model.hd5')

In [None]:
image = Image.open('./Images/SR.jpg')
pred = QDLCMFD_decoder(QDLCMFDmodel, image)
visualize_1result(image, pred)

----

**QDL-CMFD** v.1.2.16 | a Quality-independent and Deep Learning-based Copy-Move image Forgery Detection method.
<br>{Image Forgery Detection, Copy-Move Forgery, Deep Learning, Generative Adversarial Networks, Convolutional Neural Networks, Super-Resolution Enhancement}

© Proposed method implementation by **Mehrad Aria** for Paper [QDL-CMFD](https://doi.org/10.1016/j.neucom.2022.09.017).
<br>September 2021 / Shiraz, Iran.

----