In [2]:
import warnings
warnings.filterwarnings('ignore')
import os
import cv2
import keras
import numpy as np
import pandas as pd
from google.colab import drive
import matplotlib.pyplot as plt
import tensorflow as tf
from keras import layers
from keras.applications.inception_v3 import InceptionV3
from keras.models import Model
from keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

## Define Parameters

In [2]:
# parameters
IMG_SIZE = 256
BATCH_SIZE = 128
EPOCHS = 20
LEARNING_RATE = 0.001

## Localization Network of Spatial Transform Layer

In [3]:
# Spatial transformer localization network
def get_localization_network():
    localization = keras.Sequential([
        layers.Conv2D(8, kernel_size=7, input_shape=(IMG_SIZE, IMG_SIZE, 3), \
                        activation="relu", kernel_initializer="he_normal"),
        layers.MaxPool2D(strides=2),
        layers.Conv2D(10, kernel_size=5, activation="relu", kernel_initializer="he_normal"),
        layers.MaxPool2D(strides=2),
    ])
    return localization

# Regressor for the 3 * 2 affine matrix
def get_affine_params():
    output_bias = keras.initializers.Constant([1, 0, 0, 0, 1, 0])
    fc_loc = keras.Sequential([
        layers.Dense(32, activation="relu", kernel_initializer="he_normal"),
        layers.Dense(3 * 2, kernel_initializer="zeros", bias_initializer=output_bias)
    ])

    return fc_loc

## Get Pixel Value of Transformed Image in (x, y)

In [4]:
def get_pixel_value(img, x, y):
    """
    Utility function to get pixel value for coordinate
    vectors x and y from a  4D tensor image.
    Input
    -----
    - img: tensor of shape (B, H, W, C)
    - x: flattened tensor of shape (B*H*W,)
    - y: flattened tensor of shape (B*H*W,)
    Returns
    -------
    - output: tensor of shape (B, H, W, C)
    """
    shape = tf.shape(x)
    batch_size = shape[0]
    height = shape[1]
    width = shape[2]

    batch_idx = tf.range(0, batch_size)
    batch_idx = tf.reshape(batch_idx, (batch_size, 1, 1))
    b = tf.tile(batch_idx, (1, height, width))

    indices = tf.stack([b, y, x], 3)

    return tf.gather_nd(img, indices)

## Get Affine Transform Grid

In [5]:
def affine_grid_generator(height, width, theta):
    """
    This function returns a sampling grid, which when
    used with the bilinear sampler on the input feature
    map, will create an output feature map that is an
    affine transformation [1] of the input feature map.
    Input
    -----
    - height: desired height of grid/output. Used
      to downsample or upsample.
    - width: desired width of grid/output. Used
      to downsample or upsample.
    - theta: affine transform matrices of shape (num_batch, 2, 3).
      For each image in the batch, we have 6 theta parameters of
      the form (2x3) that define the affine transformation T.
    Returns
    -------
    - normalized grid (-1, 1) of shape (num_batch, 2, H, W).
      The 2nd dimension has 2 components: (x, y) which are the
      sampling points of the original image for each point in the
      target image.
    Note
    ----
    [1]: the affine transformation allows cropping, translation,
         and isotropic scaling.
    """
    num_batch = tf.shape(theta)[0]

    # create normalized 2D grid
    x = tf.linspace(-1.0, 1.0, width)
    y = tf.linspace(-1.0, 1.0, height)
    x_t, y_t = tf.meshgrid(x, y)

    # flatten
    x_t_flat = tf.reshape(x_t, [-1])
    y_t_flat = tf.reshape(y_t, [-1])
    # reshape to [x_t, y_t , 1] - (homogeneous form)
    ones = tf.ones_like(x_t_flat)
    sampling_grid = tf.stack([x_t_flat, y_t_flat, ones])

    # repeat grid num_batch times
    sampling_grid = tf.expand_dims(sampling_grid, axis=0)
    sampling_grid = tf.tile(sampling_grid, tf.stack([num_batch, 1, 1]))

    # cast to float32 (required for matmul)
    theta = tf.cast(theta, 'float32')
    sampling_grid = tf.cast(sampling_grid, 'float32')

    # transform the sampling grid - batch multiply
    batch_grids = tf.matmul(theta, sampling_grid)
    # batch grid has shape (num_batch, 2, H*W)

    # reshape to (num_batch, H, W, 2)
    batch_grids = tf.reshape(batch_grids, [num_batch, 2, height, width])

    return batch_grids

## Transform Image Using Affine Grid

In [6]:
def bilinear_sampler(img, x, y):
    """
    Performs bilinear sampling of the input images according to the
    normalized coordinates provided by the sampling grid. Note that
    the sampling is done identically for each channel of the input.
    To test if the function works properly, output image should be
    identical to input image when theta is initialized to identity
    transform.
    Input
    -----
    - img: batch of images in (B, H, W, C) layout.
    - grid: x, y which is the output of affine_grid_generator.
    Returns
    -------
    - out: interpolated images according to grids. Same size as grid.
    """
    H = tf.shape(img)[1]
    W = tf.shape(img)[2]
    max_y = tf.cast(H - 1, 'int32')
    max_x = tf.cast(W - 1, 'int32')
    zero = tf.zeros([], dtype='int32')

    # rescale x and y to [0, W-1/H-1]
    x = tf.cast(x, 'float32')
    y = tf.cast(y, 'float32')
    x = 0.5 * ((x + 1.0) * tf.cast(max_x-1, 'float32'))
    y = 0.5 * ((y + 1.0) * tf.cast(max_y-1, 'float32'))

    # grab 4 nearest corner points for each (x_i, y_i)
    x0 = tf.cast(tf.floor(x), 'int32')
    x1 = x0 + 1
    y0 = tf.cast(tf.floor(y), 'int32')
    y1 = y0 + 1

    # clip to range [0, H-1/W-1] to not violate img boundaries
    x0 = tf.clip_by_value(x0, zero, max_x)
    x1 = tf.clip_by_value(x1, zero, max_x)
    y0 = tf.clip_by_value(y0, zero, max_y)
    y1 = tf.clip_by_value(y1, zero, max_y)
    # get pixel value at corner coords
    Ia = get_pixel_value(img, x0, y0)
    Ib = get_pixel_value(img, x0, y1)
    Ic = get_pixel_value(img, x1, y0)
    Id = get_pixel_value(img, x1, y1)

    # recast as float for delta calculation
    x0 = tf.cast(x0, 'float32')
    x1 = tf.cast(x1, 'float32')
    y0 = tf.cast(y0, 'float32')
    y1 = tf.cast(y1, 'float32')

    # calculate deltas
    wa = (x1-x) * (y1-y)
    wb = (x1-x) * (y-y0)
    wc = (x-x0) * (y1-y)
    wd = (x-x0) * (y-y0)

    # add dimension for addition
    wa = tf.expand_dims(wa, axis=3)
    wb = tf.expand_dims(wb, axis=3)
    wc = tf.expand_dims(wc, axis=3)
    wd = tf.expand_dims(wd, axis=3)

    # compute output
    out = tf.add_n([wa*Ia, wb*Ib, wc*Ic, wd*Id])

    return out

## Spatial Transform Layer

In [7]:
# Spatial transformer network forward function
def stn(x):
    localization = get_localization_network()
    fc_loc = get_affine_params()

    xs = localization(x)
    xs = tf.reshape(xs, (-1, 10 * 60 * 60 ))
    theta = fc_loc(xs)
    theta = tf.reshape(theta, (-1, 2, 3))

    grid = affine_grid_generator(IMG_SIZE, IMG_SIZE, theta)
    x_s = grid[:, 0, :, :]
    y_s = grid[:, 1, :, :]
    x = bilinear_sampler(x, x_s, y_s)

    return x

## Load Data from Google Drive

In [32]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Load Train and Test Data as Numpy Array

In [None]:
def get_image_shape(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    return image.shape

google_colab_base_path = '/content/drive/My Drive/anti-spoofing/'
train_data_path = google_colab_base_path + 'train_img/train_img/color/'
test_data_path = google_colab_base_path + 'test_img/test_img/color/'

# load data
train_file_names = os.listdir(train_data_path)
test_file_names = os.listdir(test_data_path)
train_size = len(train_file_names)
test_size = len(test_file_names)

train_image_shape = get_image_shape(train_data_path + train_file_names[0])
test_image_shape = get_image_shape(train_data_path + train_file_names[0])

x_train = np.ndarray(shape=(train_size, train_image_shape[0], train_image_shape[1], train_image_shape[2]), dtype=np.uint8)
y_train = np.ndarray(shape=train_size, dtype=np.int8)
x_test = np.ndarray(shape=(test_size, test_image_shape[0], test_image_shape[1], test_image_shape[2]), dtype=np.uint8)
y_test = np.ndarray(shape=test_size, dtype=np.int8)

for i in range(train_size):
    image = cv2.imread(train_data_path + train_file_names[i], cv2.IMREAD_COLOR)
    x_train[i] = image

    if 'real' in train_file_names[i]:
        label = 0
    elif 'fake' in train_file_names[i]:
        label = 1

    y_train[i] = label

for i in range(test_size):
    image = cv2.imread(test_data_path + test_file_names[i], cv2.IMREAD_COLOR)
    x_test[i] = image

    label = 1
    if 'real' in test_file_names[i]:
        label = 0
    y_test[i] = label

x_train = x_train.astype("float32") / 255.0
x_train = np.reshape(x_train, (-1, IMG_SIZE, IMG_SIZE, 3))

x_test = x_test.astype("float32") / 255.0
x_test = np.reshape(x_test, (-1, IMG_SIZE, IMG_SIZE, 3))

y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)

print('x_train shape: ', x_train.shape)
print('y_train shape', y_train.shape)
print('x_test shape: ', x_test.shape)
print('y_test shape: ', y_test.shape)

x_train shape:  (1655, 256, 256, 3)
y_train shape (1655, 2)
x_test shape:  (2408, 256, 256, 3)
y_test shape:  (2408, 2)


## Create the Model

In [9]:
# load the pre trained InceptionV3 model
base_model = InceptionV3(weights='imagenet', include_top=False)

# give the input image to spatial transform layer before inceptionv3
input = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
x = stn(input)
x = base_model(x)

# global average pooling layer
x = layers.Flatten()(x)

# fully connected layer for classification
x = layers.Dense(512, activation='relu')(x)
x = layers.Dense(256, activation='relu')(x)

# output layer
output = layers.Dense(2, activation='softmax')(x)

# freeze all layers of InceptionV3 model
model = Model(inputs=input, outputs=output)
for layer in base_model.layers:
    layer.trainable = False

optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


## Model Summery

In [10]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 sequential (Sequential)     (None, 60, 60, 10)           3194      ['input_2[0][0]']             
                                                                                                  
 tf.reshape (TFOpLambda)     (None, 36000)                0         ['sequential[0][0]']          
                                                                                                  
 sequential_1 (Sequential)   (None, 6)                    1152230   ['tf.reshape[0][0]']          
                                                                                              

## Data Augmentation

In [6]:
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

datagen.fit(x_train)

## Define Callbacks

In [7]:
# save the best model in all epochs
checkpoint = ModelCheckpoint('deep_clf.keras', verbose=1, monitor='val_loss', save_best_only=True, mode='auto')
# stop if model is overfitting
early_stopping = EarlyStopping(monitor='val_loss', patience=3)
# reduce learning rate when vallidation loss is not improving
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.8, patience=1, min_lr=0.0001)

callbacks = [checkpoint, early_stopping, reduce_lr]

## Train the Model

In [None]:
model.fit(datagen.flow(x_train, y_train, batch_size=32), callbacks=callbacks, validation_data=(x_test, y_test), epochs=EPOCHS)

Epoch 1/20
Epoch 1: val_loss did not improve from 0.34152
Epoch 2/20
Epoch 2: val_loss did not improve from 0.34152
Epoch 3/20
Epoch 3: val_loss did not improve from 0.34152
Epoch 4/20
Epoch 4: val_loss did not improve from 0.34152
Epoch 5/20
Epoch 5: val_loss did not improve from 0.34152
Epoch 6/20
Epoch 6: val_loss improved from 0.34152 to 0.32675, saving model to deep_model.keras
Epoch 7/20
Epoch 7: val_loss improved from 0.32675 to 0.32048, saving model to deep_model.keras
Epoch 8/20
Epoch 8: val_loss did not improve from 0.32048
Epoch 9/20
Epoch 9: val_loss did not improve from 0.32048
Epoch 10/20
Epoch 10: val_loss improved from 0.32048 to 0.27805, saving model to deep_model.keras
Epoch 11/20
Epoch 11: val_loss did not improve from 0.27805
Epoch 12/20
Epoch 12: val_loss did not improve from 0.27805
Epoch 13/20
Epoch 13: val_loss did not improve from 0.27805


<keras.src.callbacks.History at 0x7d2f143dba30>