

---


# Face Anti-Spoofing using Central Difference Convolution Networks


---


> **CS584 - Research Project - Fall 2021**
By *Ismail Elomari Alaoui* and *Reda Chaguer*

---




In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import math

from tensorflow.keras import Model
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.initializers import VarianceScaling
from tensorflow.keras.layers import StringLookup, Conv2D, Layer, MaxPooling2D, Input, Dense, Flatten, BatchNormalization, Activation, UpSampling2D, concatenate, Resizing, Flatten, Dropout

from sklearn.metrics import confusion_matrix

In [None]:
# Kaggle: report how many GPUs TensorFlow can see, then reset Keras' global state
# so that re-running the notebook starts from a clean session.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
tf.keras.backend.clear_session()
# Colab alternative (disabled): open a TF1-style session that logs device placement.
# sess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))


In [None]:
# Batch size; in this notebook it is only referenced by the commented-out NUAA cells.
batch_size = 8
# Central-difference weight.
# NOTE(review): no visible call passes this global on; trainModel and CDCN each
# declare their own theta=0.7 default - confirm whether it should be forwarded.
theta = 0.7
# Not read anywhere in the visible code.
use_nn = False
# "Global" dropout switch: when True, getBlock adds a Dropout after its pooling
# layer and getConvLayer adds one after its activation.
g_dropout = False
# "Local" dropout switch: when True, getBlock adds a Dropout after each of its
# three conv/batch-norm/ReLU stages.
l_dropout = False
# Rate handed to every Dropout layer created by the two switches above.
dropout_val = 0.2

## Data Loading


In [None]:
# from google.colab import drive
# drive.mount('/content/gdrive/')

In [None]:
# !unzip '/content/gdrive/MyDrive/MSU-MSFD/MSU-MSFD.zip'

In [None]:
import pandas as pd
import os
# Root of the MSU-MSFD dataset on Kaggle, plus the scene01 folders that hold the
# genuine ("real") and spoofed ("attack") videos. These paths are combined with the
# subject lists below to build the training and testing file lists.
data_dir = "/kaggle/input/msumsfd/MSU-MSFD/"
 
real_vid_loc=data_dir+"scene01/real/" 
attack_vid_loc=data_dir+"scene01/attack/"

In [None]:
# Build the training file list: one "video_path,label" line per existing video,
# for every subject id listed in train_sub_list.txt. Label 0 = attack, 1 = real.
train_txt_file = data_dir+"train_sub_list.txt"

# Each recording device is paired with the file extension used for its clips.
# Keeping the pair together replaces the old set-then-reset `ext` variable.
train_devices = [('android_SD_', '.mp4'), ('laptop_SD_', '.mov')]
train_attack_media = ['ipad_video_', 'iphone_video_', 'printed_photo_']

with open("train_combined.txt", "w") as out, open(train_txt_file, "r") as sub_list:
  for raw_line in sub_list:
    client = raw_line.rstrip()
    for device, video_ext in train_devices:
      for medium in train_attack_media:
        # `label` (not `id`) so the builtin id() is not overwritten for the
        # rest of the notebook.
        for label, rf in enumerate(['attack', 'real']):
          if rf == 'attack':
            video_path = attack_vid_loc+rf+"_client0"+client+"_"+device+medium+"scene01"+video_ext
          else:
            # Real file names carry no attack medium, so the same real path is
            # written once per medium (three times per device). This matches
            # the previous behaviour and is kept on purpose.
            video_path = real_vid_loc+rf+"_client0"+client+"_"+device+"scene01"+video_ext
          # Combinations that do not exist on disk are silently skipped.
          if os.path.exists(video_path):
            out.write(video_path+","+str(label)+"\n")  # format: video_path,label

# Reload the list as a DataFrame and keep a CSV copy with a header row.
train_df = pd.read_csv(r'train_combined.txt', header=None, names=["video", "label"])
train_df.to_csv(r"train_combined.csv", index=None)

print(f"Total number of examples for training: {len(train_df)}")

In [None]:
# Build the testing file list: one "video_path,label" line per existing video,
# for every subject id listed in test_sub_list.txt. Label 0 = attack, 1 = real.
test_txt_file = data_dir+"test_sub_list.txt"

# Each recording device is paired with the file extension used for its clips.
# Keeping the pair together replaces the old set-then-reset `ext` variable.
test_devices = [('android_SD_', '.mp4'), ('laptop_SD_', '.mov')]
test_attack_media = ['ipad_video_', 'iphone_video_', 'printed_photo_']

with open("test_combined.txt", "w") as out, open(test_txt_file, "r") as sub_list:
  for raw_line in sub_list:
    client = raw_line.rstrip()
    for device, video_ext in test_devices:
      for medium in test_attack_media:
        # `label` (not `id`) so the builtin id() is not overwritten for the
        # rest of the notebook.
        for label, rf in enumerate(['attack', 'real']):
          if rf == 'attack':
            video_path = attack_vid_loc+rf+"_client0"+client+"_"+device+medium+"scene01"+video_ext
          else:
            # Real file names carry no attack medium, so the same real path is
            # written once per medium (three times per device). This matches
            # the previous behaviour and is kept on purpose.
            video_path = real_vid_loc+rf+"_client0"+client+"_"+device+"scene01"+video_ext
          # Combinations that do not exist on disk are silently skipped.
          if os.path.exists(video_path):
            out.write(video_path+","+str(label)+"\n")  # format: video_path,label

# Reload the list as a DataFrame and keep a CSV copy with a header row.
test_df = pd.read_csv(r'test_combined.txt', header=None, names=["video", "label"])
test_df.to_csv(r"test_combined.csv", index=None)

print(f"Total number of examples for testing: {len(test_df)}")

## Data processing

In [None]:
## Video preprocessing

# Side length (pixels) of the square frames produced by load_video's default `resize`.
IMG_SIZE = 256
# Default cap on the number of frames load_video reads from one video.
MAX_FRAMES = 20
# ESCAPE_FRAMES = 30
# TRAIN_FRAMES = 5

# The two functions below (crop_center_square and load_video) are taken from this tutorial:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub

import cv2
def crop_center_square(frame):
    """Return the largest centred square region of `frame`.

    The first two axes of `frame` are treated as (rows, columns); any further
    axes are carried through unchanged.
    """
    height, width = frame.shape[0:2]
    side = min(height, width)
    # Offsets are derived from the integer half-sizes, so the crop may sit one
    # pixel off the exact centre when the sizes are odd.
    top = (height // 2) - (side // 2)
    left = (width // 2) - (side // 2)
    return frame[top:top + side, left:left + side]


def load_video(path, max_frames=MAX_FRAMES, resize=(IMG_SIZE, IMG_SIZE)):
    """Read frames from the video at `path` and return them as one array.

    Every frame is centre-cropped to a square, resized to `resize` and has its
    channel axis reordered with indices [2, 1, 0]. Reading stops when the
    capture reports no more frames or once `max_frames` frames have been
    collected. The capture handle is always released.

    Returns:
        np.ndarray built from the list of processed frames (empty when no
        frame could be read).
    """
    channel_order = [2, 1, 0]
    collected = []
    capture = cv2.VideoCapture(path)
    try:
        while True:
            ok, raw = capture.read()
            if not ok:
                break
            squared = cv2.resize(crop_center_square(raw), resize)
            collected.append(squared[:, :, channel_order])

            # The count is checked after appending, so a max_frames of 0 never
            # matches and every available frame is read.
            if len(collected) == max_frames:
                break
    finally:
        capture.release()
    return np.array(collected)

In [None]:
# path = test_df['video'].iloc[0]
# L = load_video(path)
# print(np.shape(L))
# plt.imshow(L[0].astype("uint8"))

In [None]:
from random import randint
def prepare_all_videos(df, max_frames=MAX_FRAMES):
    """Load every video listed in `df`, keeping one frame stack per video.

    Args:
        df: DataFrame with a 'video' column (paths) and a 'label' column.
        max_frames: forwarded to load_video for each video.

    Returns:
        (X, Y): a list of per-video frame arrays and the list of matching labels.
    """
    videos = [load_video(path, max_frames=max_frames) for path in df['video']]
    labels = list(df['label'])
    return videos, labels

def prepare_all_videos_v2(df, max_frames=MAX_FRAMES):
    """Load every video listed in `df` and flatten them into per-frame samples.

    Each frame becomes its own sample and inherits the label of the video it
    came from.

    Args:
        df: DataFrame with a 'video' column (paths) and a 'label' column.
        max_frames: forwarded to load_video for each video.

    Returns:
        (X, Y): an array of frames and an array with one label per frame.
    """
    images, labels = [], []
    for path, label in zip(df['video'], df['label']):
        clip = load_video(path, max_frames=max_frames)
        images.extend(clip)                    # one entry per frame
        labels.extend([label] * len(clip))     # the video's label, once per frame
    return np.array(images), np.array(labels)

def pick_input_images(X, Y, nb_images_per_video=3):
    """Randomly sample frames from each video into a flat image/label table.

    Args:
        X: sequence of per-video frame arrays, each indexed as
           frames[frame, :, :, :].
        Y: sequence of labels, Y[i] being the label of X[i].
        nb_images_per_video: number of frames drawn (with replacement) from
           each video.

    Returns:
        pandas.DataFrame with columns ['image', 'label'], one row per drawn
        frame. Videos that contain no frames contribute no rows.
    """
    # Rows are collected in a list and turned into a DataFrame once:
    # DataFrame.append was removed in pandas 2.0 and was quadratic in a loop.
    records = []
    for i, frames in enumerate(X):
        n = frames.shape[0]
        if n == 0:
            # An unreadable video yields an empty array; randint(0, -1) would raise.
            continue
        for _ in range(nb_images_per_video):
            records.append({'image': frames[randint(0, n - 1), :, :, :], 'label': Y[i]})
    return pd.DataFrame(records, columns=['image', 'label'])

In [None]:
    # train_data = new_train_df['image'].to_numpy()
    # train_labels = new_train_df['label'].to_numpy(dtype=int)
    # test_data = new_test_df['image'].to_numpy()
    # test_labels = new_test_df['label'].to_numpy(dtype=int)

    # # print(X)
    # print(X.shape)
    # print(np.shape(X))
    # print(X[0].shape)
    # print(Y.shape)

In [None]:
# X, Y = prepare_all_videos(train_df.sample(2))
# new_train_df = pick_input_images(X, Y)
# print(new_train_df.shape)
# # new_test_df = prepare_all_videos(test_df.sample(5))

## Model Design


In [None]:
class Conv2d_cd(Layer):
    """Central difference 2-D convolution layer.

    Computes ``conv(x, K) - theta * conv(x, K_diff)``, where ``K`` is the
    learned kernel and ``K_diff`` is the spatial sum of ``K`` for each
    (input channel, filter) pair, tiled back to ``kernel_size x kernel_size``.
    With ``theta == 0`` the layer is a plain convolution.

    Args:
        filters: number of output channels.
        kernel_size: side length of the square kernel.
        strides: stride applied along both spatial axes.
        padding: padding mode passed to ``tf.nn.conv2d``.
        use_bias: stored on the layer; ``call`` does not apply a bias.
        theta: weight of the difference term.
        **kwargs: standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, filters, kernel_size=3, strides=1,
                 padding='SAME', use_bias=False, theta=0.7, **kwargs):
        # Forward **kwargs so Keras can pass `name`/`dtype`/`trainable`, which
        # it does when a layer is re-created from its config.
        super(Conv2d_cd, self).__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.padding = padding
        self.theta = theta
        self.use_bias = use_bias
        self.kernel_initializer = VarianceScaling(scale=2.0)

    def build(self, input_shape):
        """Create the kernel of shape (k, k, in_channels, filters)."""
        # add_weight is the Keras-native way to create a tracked, trainable
        # variable; it replaces the legacy TF1 tf.compat.v1.get_variable call.
        self._filter = self.add_weight(
            name='conv2d_cd',
            shape=(self.kernel_size, self.kernel_size, input_shape[-1], self.filters),
            initializer=self.kernel_initializer,
            trainable=True)

    def call(self, inputs):
        """Apply the central difference convolution to `inputs`."""
        conv_strides = [1, self.strides, self.strides, 1]
        out_normal = tf.nn.conv2d(inputs, self._filter, strides=conv_strides, padding=self.padding, name='conv2d_cd/normal')
        # theta == 0 disables the difference term, so skip the second convolution.
        if math.fabs(self.theta - 0.0) < 1e-8:
            return out_normal
        # Sum the kernel over both spatial axes, then tile the sum back to k x k.
        # NOTE(review): the PyTorch reference implementation of CDC appears to use
        # a 1x1 difference kernel instead of a tiled one - confirm which is intended.
        kernel_diff = tf.math.reduce_sum(self._filter, axis=0, keepdims=True)
        kernel_diff = tf.math.reduce_sum(kernel_diff, axis=1, keepdims=True)
        kernel_diff = tf.tile(kernel_diff, [self.kernel_size, self.kernel_size, 1, 1])
        out_diff = tf.nn.conv2d(inputs, kernel_diff, strides=conv_strides, padding=self.padding, name='conv2d_cd/diff')

        return out_normal - self.theta * out_diff

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super(Conv2d_cd, self).get_config()
        config.update({
            'filters': self.filters,
            'kernel_size': self.kernel_size,
            'strides': self.strides,
            'padding': self.padding,
            'use_bias': self.use_bias,
            'theta': self.theta,
        })
        return config


In [None]:
def getBlock(theta):
    """Build one feature block as a Sequential model.

    The block stacks three Conv2d_cd -> BatchNormalization -> ReLU stages with
    128, 196 and 128 filters, followed by a 3x3 max-pool with stride 2.
    When the global `l_dropout` flag is set a Dropout follows every stage;
    when `g_dropout` is set a Dropout follows the pooling layer.
    """
    block = Sequential()

    for nb_filters in (128, 196, 128):
        block.add(Conv2d_cd(nb_filters, padding='SAME', theta=theta))
        block.add(BatchNormalization())
        block.add(Activation('relu'))
        if l_dropout:
            block.add(Dropout(dropout_val))

    block.add(MaxPooling2D((3, 3), strides=2, padding='SAME'))

    if g_dropout:
        block.add(Dropout(dropout_val))

    return block

def getConvLayer(nb_filters, theta, input_shape=None):
    """Build a Conv2d_cd -> BatchNormalization -> ReLU unit as a Sequential model.

    Args:
        nb_filters: number of filters of the central difference convolution.
        theta: difference weight forwarded to Conv2d_cd.
        input_shape: when truthy, an Input layer of this shape is added first.

    A Dropout layer is appended when the global `g_dropout` flag is set.
    """
    unit = Sequential()
    if input_shape:
        unit.add(Input(input_shape))
    stack = [
        Conv2d_cd(nb_filters, padding='SAME', theta=theta),
        BatchNormalization(),
        Activation('relu'),
    ]
    if g_dropout:
        stack.append(Dropout(dropout_val))
    for layer in stack:
        unit.add(layer)
    return unit
    

In [None]:
class CDCN(Model):
    """Central Difference Convolutional Network ending in a binary classifier.

    A stem convolution feeds three stacked feature blocks. The output of every
    block is resized to 32x32, the three maps are concatenated along the channel
    axis, reduced to a single channel by three more convolution units, and the
    resulting map is flattened into one sigmoid unit.

    Shapes in the comments below are channels-last (batch, height, width,
    channels) and assume the default 256x256x3 input.
    """
    def __init__(self, input_shape=(256, 256, 3), theta=0.7):
        super(CDCN, self).__init__()  

        # self.pick_input_images = pick_input_images

        # Stem: the only unit that is given an explicit Input layer.
        self.Conv1 = getConvLayer(64, theta, input_shape=input_shape)

        # Three feature blocks; each one ends with a stride-2 max-pool.
        self.Block1 = getBlock(theta)

        self.Block2 = getBlock(theta)   

        self.Block3 = getBlock(theta)

        # Head: reduce the concatenated features to 128, 64 and finally 1 channel.
        self.Conv2 = getConvLayer(128, theta)

        self.Conv3 = getConvLayer(64, theta)

        self.Conv4 = getConvLayer(1, theta)

        # Shared bilinear resize that brings every block output to 32x32.
        self.downsample32x32 = Resizing(32, 32, interpolation='bilinear')

        # Classifier: flatten the single-channel map into one sigmoid output.
        self.nn = Sequential()
        self.nn.add(Flatten())
        self.nn.add(Dense(1, activation='sigmoid'))



    def call(self, input_tensor):	    	# input: [B, 256, 256, 3]
        """Run the forward pass and return the sigmoid output of the classifier."""
        
        # X, Y = input_tensor

        # x = self.pick_input_images(X, Y)
        # train_data = new_train_df['image'].to_numpy()
        # train_labels = new_train_df['label'].to_numpy(dtype=int)

        x = self.Conv1(input_tensor)		   # [B, 256, 256, 64]
        
        x_Block1 = self.Block1(x)	    	    	# [B, 128, 128, 128]
        x_Block1_32x32 = self.downsample32x32(x_Block1)   # [B, 32, 32, 128]
        
        x_Block2 = self.Block2(x_Block1)	    # [B, 64, 64, 128]
        x_Block2_32x32 = self.downsample32x32(x_Block2)   # [B, 32, 32, 128]
        
        x_Block3 = self.Block3(x_Block2)	    # [B, 32, 32, 128]
        x_Block3_32x32 = self.downsample32x32(x_Block3)   # [B, 32, 32, 128]
        
        # Concatenate on the last (channel) axis because the tensors are channels-last.
        x = concatenate((x_Block1_32x32,x_Block2_32x32, x_Block3_32x32), axis=-1)    # [B, 32, 32, 128*3]
        
        x = self.Conv2(x)    # [B, 32, 32, 128]
        x = self.Conv3(x)    # [B, 32, 32, 64]
        x = self.Conv4(x)    # [B, 32, 32, 1]
        
        # Drop the single channel axis: [B, 32, 32]
        x = tf.squeeze(x, axis=-1)

        # Flatten + Dense(1, sigmoid): [B, 1]
        y_pred = self.nn(x)
        return y_pred 

In [None]:
def Normalization(image, label):
    """Divide `image` by 255 and cast it to float16; `label` is returned unchanged.

    The (image, label) signature matches what `tf.data.Dataset.map` passes
    for a dataset of (image, label) pairs.
    """
    scaled = image / 255.
    return tf.cast(scaled, tf.float16), label

def Normalization_MSU(x):
    """Return `x` divided by 255 (e.g. to rescale 8-bit pixel values)."""
    max_pixel = 255.
    return x / max_pixel

In [None]:
# # ## Kaggle NUAA
# # # All Dataset Directories
# data_dirs = {'raw': '/kaggle/input/nuaadataset/NUAA/raw', 'normalized':'/kaggle/input/nuaadataset/NUAA/normalized', 'detected':'/kaggle/input/nuaadataset/NUAA/detected'}

# img_height = 256
# img_width = 256

# train, valid = {}, {}

# for ds, ds_path in data_dirs.items():
#     train[ds]  = tf.keras.utils.image_dataset_from_directory(
#       ds_path,
#       validation_split=0.2,
#       subset="training",
#       labels="inferred",
#       label_mode = "binary",
#       image_size=(img_height, img_width),
#       seed=123,
#       batch_size=batch_size)

#     valid[ds]  = tf.keras.utils.image_dataset_from_directory(
#       ds_path,
#       validation_split=0.2,
#       subset="validation",
#       labels="inferred",
#       label_mode = "binary",
#       image_size=(img_height, img_width),
#       seed=123,
#       batch_size=batch_size)
    
#     train[ds].map(Normalization, num_parallel_calls=tf.data.AUTOTUNE)
#     valid[ds].map(Normalization, num_parallel_calls=tf.data.AUTOTUNE)


In [None]:
# # Plotting some data
# for ds, _ in data_dirs.items():
#     plt.figure(figsize=(10, 10))
#     class_names = train[ds].class_names
#     for images, labels in train[ds].take(1):
#         for i in range(batch_size):
#             ax = plt.subplot(6, 6, i + 1)
#             plt.imshow(images[i].numpy().astype("uint8"))
#             plt.title(class_names[int(labels[i])])
#             plt.axis("off")

In [None]:
def trainModel(train, valid, dataset, y_train=None, input_shape=(256, 256, 3), theta=0.7, epochs=3):
    """Create, compile, fit and save a CDCN model.

    Args:
        train: training inputs. For "MSU-MSFD" this is passed to `fit` together
            with `y_train`; for any other `dataset` it is indexed as
            `train[dataset]`.
        valid: validation data, used as-is for "MSU-MSFD" and as
            `valid[dataset]` otherwise.
        dataset: dataset key; also the suffix of the saved model directory.
        y_train: training labels, only used for "MSU-MSFD".
        input_shape, theta: forwarded to the CDCN constructor.
        epochs: number of training epochs.

    Returns:
        (model, history): the trained model and the object returned by `fit`.
    """
    model = CDCN(input_shape=input_shape, theta=theta)

    tracked_metrics = [
        tf.keras.metrics.BinaryAccuracy(),
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
    ]
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=tracked_metrics)

    # MSU-MSFD arrives as in-memory arrays; the other datasets are looked up by key.
    if dataset != "MSU-MSFD":
        history = model.fit(train[dataset], epochs=epochs, validation_data=valid[dataset])
    else:
        history = model.fit(train, y_train, epochs=epochs, validation_data=valid)

    save_path = '/kaggle/working/models/model_' + dataset
    model.save(save_path)
    return model, history

def benchmarkModel(model):
    """Evaluate `model` on the validation split of every dataset in `data_dirs`.

    Relies on the notebook-level `data_dirs` and `valid` dictionaries.

    Returns:
        dict mapping each dataset key to the result of `model.evaluate`.
    """
    # Evaluation using the validation split of each dataset
    return {ds: model.evaluate(valid[ds], verbose=1) for ds in data_dirs}

def prob_to_labels(probs, threshold=0.5):
    """Turn probabilities into 0/1 integer labels (1 where probs >= threshold)."""
    above = probs >= threshold
    return above.astype('int')

def getConfusionMatrix(model, dataset):
    """Compute the confusion matrix of `model` on `valid[dataset]`.

    Relies on the notebook-level `valid` dictionary of (images, labels) batches.

    Args:
        model: object exposing `predict(x)` that returns probabilities.
        dataset: key into `valid`.

    Returns:
        The matrix produced by sklearn's `confusion_matrix(y_true, y_pred)`,
        with predictions thresholded by `prob_to_labels`.
    """
    # Materialise the batches in a single pass so images and labels stay aligned:
    # a dataset that reshuffles on every iteration would hand them back in
    # different orders if iterated twice.
    batches = list(valid[dataset])
    x = np.concatenate([images for images, _ in batches], axis=0)
    y = np.concatenate([labels for _, labels in batches], axis=0)
    probs_pred = model.predict(x)
    y_pred = prob_to_labels(probs_pred)
    # Return the matrix; previously it was computed and then discarded.
    return confusion_matrix(y, y_pred)

In [None]:
# Flatten up to 30 frames from every training video into per-frame samples and labels.
X_train, Y_train = prepare_all_videos_v2(train_df, max_frames=30)

In [None]:
# X_train = list(map(Normalization_MSU, X_train))

In [None]:
# Sanity check: prepare_all_videos_v2 already returns NumPy arrays, so only the
# shapes of the stacked frames and of their labels are printed here.
print(X_train.shape)
print(Y_train.shape)

In [None]:
# For testing, keep at most one frame (max_frames=1) from each video.
X_test, Y_test = prepare_all_videos_v2(test_df, max_frames=1)

In [None]:
# X_test = list(map(Normalization_MSU, X_test))

In [None]:
# Sanity check: prepare_all_videos_v2 already returns NumPy arrays, so only the
# shapes of the stacked frames and of their labels are printed here.
print(X_test.shape)
print(Y_test.shape)

In [None]:
# Train on the MSU-MSFD frames for 5 epochs; trainModel also saves the model to disk.
# NOTE(review): the test split doubles as validation data here, so the validation
# metrics are not an independent estimate - confirm this is intended.
model_msu, h_msu = trainModel(X_train, (X_test, Y_test), 'MSU-MSFD', y_train=Y_train, epochs=5)

In [None]:
!zip -r '/kaggle/working/models.zip' '/kaggle/working/models'

In [None]:
# Reload the model from the path trainModel saved it to; this replaces the
# in-memory `model_msu` returned by the training cell.
model_msu = load_model('/kaggle/working/models/model_MSU-MSFD')

In [None]:
# Print the summary of the reloaded model.
model_msu.summary()

In [None]:
# y_pred = model_msu.predict(X_test)

In [None]:
# print(Y_test)
# y_pred = y_pred.T[0]
# y_pred = list(map(lambda x: 0 if x<0.5 else 1, y_pred))
# print(y_pred)

In [None]:
# # # Plotting some data MSU-MSFD
# class_names = ['real', 'fake']
# # Plotting some data
# plt.figure(figsize=(10, 10))

# for image in X_test:
#     for i in range(8):
#         ax = plt.subplot(6, 6, i + 1)
#         plt.imshow(image.astype("uint8"))
#         plt.title("tag:"+class_names[Y_test[i]]+"\npred:"+class_names[y_pred[i]])
#         plt.axis("off")
# plt.tight_layout()
     


In [None]:
# model_raw, h_raw = trainModel(train, valid, 'raw')

In [None]:
# model_raw = load_model('/kaggle/working/models/model_raw')
# s_raw = benchmarkModel(model_raw)

In [None]:
# getConfusionMatrix(model_raw, 'raw') 

In [None]:
# model_normalized, h_normalized = trainModel(train, valid, 'normalized', epochs=2)

In [None]:
# model_normalized = load_model('/kaggle/working/models/model_normalized')
# s_normalized = benchmarkModel(model_normalized)

In [None]:
# getConfusionMatrix(model_normalized, 'normalized')

In [None]:
# model_detected, h_detected = trainModel(train, valid, 'detected')

In [None]:
# model_detected = load_model('/kaggle/working/models/model_detected')
# s_detected = benchmarkModel(model_detected)

In [None]:
# getConfusionMatrix(model_detected, 'detected')

In [None]:
# !cp -r  '/kaggle/input/model-raw/kaggle/working/model_raw' '/kaggle/working/model_raw'

In [None]:
# !zip -r '/kaggle/working/models.zip' '/kaggle/working/models'