In [1]:
import tensorflow as tf

# Enable memory growth on every GPU TensorFlow can see (the loop covers all
# detected devices, not only a single one) and report which device is in use.
# This runs right after importing TensorFlow, before any model or strategy is built.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        # Memory growth is configured per physical device.
        tf.config.experimental.set_memory_growth(gpu, True)
    print("Running on GPU:", gpus)
else:
    # No GPU detected: only report it, nothing else is configured here.
    print("No GPU found, using CPU")

from tensorflow import keras
from tensorflow.keras import layers, callbacks
from tensorflow.keras.applications import MobileNetV2,MobileNetV3Small,EfficientNetV2B0
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
import pandas as pd
import numpy as np
import datetime
import os
import tensorflow.keras.backend as K
import datetime
from sklearn.metrics import roc_curve, auc
import cv2
import random
from tensorflow.keras.mixed_precision import set_global_policy, Policy



# Build the distribution strategy used later by create_combined_model().
# No device list is passed, so the strategy uses whatever GPUs TensorFlow detects;
# the recorded run below reports a single replica.
strategy = tf.distribute.MirroredStrategy()  # Auto-detects available GPUs
print("Number of GPUs in strategy:", strategy.num_replicas_in_sync)

2025-07-29 01:44:17.590134: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1753753457.765297      36 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1753753457.828274      36 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Running on GPU: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Number of GPUs in strategy: 1


I0000 00:00:1753753470.031693      36 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0


In [2]:
# --- Notebook configuration -------------------------------------------------
BATCH_SIZE = 32  # default batch size for the pair dataset and the spoof generators
IMG_SIZE = (112,112)  # default image size for the loaders and all three model inputs
SPOOF_IMG_SIZE = (64, 64)  # NOTE(review): not referenced in the cells visible here - confirm it is still needed
VERIFY_IMG_SIZE = (112,112)  # NOTE(review): not referenced in the cells visible here - confirm it is still needed
DATA_DIR = "/kaggle/input/11-785-fall-20-homework-2-part-2"  # root of the face verification data
TRAIN_DIR = f"{DATA_DIR}/classification_data/train_data"  # one sub-folder per person (see create_pairs_from_classification_data)
VERIFICATION_FILE = os.path.join(DATA_DIR, "verification_pairs_val.txt")  # lines of "<img1> <img2> <label>"
CASIA_DIR = "/kaggle/input/casia-fasd/casia-fasd"  # Path to CASIA-FASD dataset
NUM_PAIRS_PER_PERSON = 18  # positive pairs sampled per person folder
EMBEDDING_DIM = 128  # width of the dense layer in the verification head
LEARNING_RATE = 0.0001  # Adam learning rate used when compiling the model
VAL_SPLIT = 0.2  # fraction of the CASIA 'train' folder held out for validation



# Face Verification Preprocessing

In [3]:

def load_and_preprocess_image(image_path, img_size=IMG_SIZE):
    """Load an image (or accept an already-decoded array) and resize it.

    Args:
        image_path: Either a filesystem path (``str``), which is read with
            OpenCV and converted from BGR to RGB, or an image array, which is
            used as-is (no colour conversion is applied to arrays).
        img_size: Target size as ``(height, width)``, the same convention used
            for the model input shape ``(*IMG_SIZE, 3)`` and for Keras
            ``target_size``.

    Returns:
        A float32 NumPy array of spatial size ``img_size`` with values clipped
        to the range [0, 255] (no further normalisation is done here).

    Raises:
        ValueError: If ``image_path`` is a string and OpenCV cannot read it.
    """
    if isinstance(image_path, str):
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not load image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    else:
        image = image_path

    # cv2.resize takes dsize as (width, height), whereas img_size follows the
    # (height, width) convention used everywhere else in this notebook.
    # Passing img_size straight through only worked because the default is square.
    target_height, target_width = img_size
    image = cv2.resize(image, (target_width, target_height))

    image = image.astype(np.float32)
    return np.clip(image, 0, 255)

def create_pairs_from_verification_file(verification_file_path=VERIFICATION_FILE, base_dir=DATA_DIR):
    """Parse a verification pairs file into ``(img1_path, img2_path, label)`` tuples.

    Each usable line holds at least three whitespace-separated fields: two
    image paths relative to ``base_dir`` and an integer label. Lines with
    fewer than three fields are skipped.

    Args:
        verification_file_path: Path of the text file to read.
        base_dir: Directory the relative image paths are joined onto.

    Returns:
        A list of ``(absolute_img1_path, absolute_img2_path, int_label)`` tuples
        in file order.
    """
    pairs = []
    with open(verification_file_path, "r") as handle:
        for raw_line in handle:
            fields = raw_line.strip().split()
            if len(fields) < 3:
                continue  # blank or malformed line
            first_rel, second_rel, raw_label = fields[0], fields[1], fields[2]
            first_abs = os.path.join(base_dir, first_rel)
            second_abs = os.path.join(base_dir, second_rel)
            pairs.append((first_abs, second_abs, int(raw_label)))
    return pairs

def _list_person_images(person_path):
    """Return the image file names (.jpg/.jpeg/.png, case-insensitive) found in ``person_path``."""
    return [f for f in os.listdir(person_path)
            if f.lower().endswith((".jpg", ".jpeg", ".png"))]


def create_pairs_from_classification_data(classification_dir=TRAIN_DIR, num_pairs_per_person=NUM_PAIRS_PER_PERSON):
    """Sample positive and negative image pairs from a folder-per-person dataset.

    Positive pairs (label 1) are two distinct images from the same person
    folder; negative pairs (label 0) are one image from each of two different
    person folders. As many negatives are generated as positives.

    Args:
        classification_dir: Directory containing one sub-directory per person.
        num_pairs_per_person: Number of positive pairs drawn for each person
            that has at least two images.

    Returns:
        A shuffled list of ``(img1_path, img2_path, label)`` tuples. Uses the
        global ``random`` module, so seed it beforehand for reproducible pairs.
    """
    person_dirs = [d for d in os.listdir(classification_dir)
                   if os.path.isdir(os.path.join(classification_dir, d))]

    # List every person folder exactly once. Previously two folders were
    # re-listed from disk for every single negative pair.
    images_by_person = {
        person: _list_person_images(os.path.join(classification_dir, person))
        for person in person_dirs
    }

    pairs = []
    for person in person_dirs:
        images = images_by_person[person]
        if len(images) < 2:
            continue  # cannot form a same-person pair
        person_path = os.path.join(classification_dir, person)
        for _ in range(num_pairs_per_person):
            img1, img2 = random.sample(images, 2)
            pairs.append((os.path.join(person_path, img1),
                          os.path.join(person_path, img2), 1))

    # Draw negatives only from people that actually have images, so an empty
    # folder can no longer silently reduce the number of negative pairs.
    eligible_people = [person for person in person_dirs if images_by_person[person]]
    num_negative_pairs = len(pairs)
    if len(eligible_people) >= 2:  # random.sample would raise with fewer than two
        for _ in range(num_negative_pairs):
            person1, person2 = random.sample(eligible_people, 2)
            img1 = random.choice(images_by_person[person1])
            img2 = random.choice(images_by_person[person2])
            pairs.append((os.path.join(classification_dir, person1, img1),
                          os.path.join(classification_dir, person2, img2), 0))

    random.shuffle(pairs)
    return pairs

def create_data_augmentation_layer():
    """Build the augmentation pipeline applied to training image batches.

    Returns:
        A ``keras.Sequential`` applying, in order, a random horizontal flip,
        a random rotation (factor 0.1) and a random zoom (factor 0.1).
    """
    augmentation_steps = [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.1),
    ]
    return keras.Sequential(augmentation_steps)

def prepare_dataset(pairs, batch_size=BATCH_SIZE, shuffle=True, augment=True):
    """Turn ``(img1_path, img2_path, label)`` tuples into a batched tf.data pipeline.

    Pipeline order: slice paths/labels -> optional shuffle -> load images via
    ``tf.py_function`` -> restore static shapes -> batch -> optional
    augmentation -> prefetch.

    Args:
        pairs: Sequence of ``(img1_path, img2_path, label)`` entries.
        batch_size: Number of pairs per batch.
        shuffle: If true, shuffle with a buffer covering all pairs.
        augment: If true, apply the augmentation layer to each image batch
            (each side of the pair is augmented by a separate call).

    Returns:
        A ``tf.data.Dataset`` yielding ``((img1_batch, img2_batch), label_batch)``.
    """
    def load_pair(img1_path, img2_path, label):
        # Runs eagerly inside tf.py_function, so the path tensors expose .numpy().
        first_image = load_and_preprocess_image(img1_path.numpy().decode("utf-8"))
        second_image = load_and_preprocess_image(img2_path.numpy().decode("utf-8"))
        return (
            tf.convert_to_tensor(first_image, dtype=tf.float32),
            tf.convert_to_tensor(second_image, dtype=tf.float32),
            label,
        )

    def read_pair(first_path, second_path, pair_label):
        # Bridge from graph mode into the Python/OpenCV loader.
        return tf.py_function(
            load_pair,
            [first_path, second_path, pair_label],
            [tf.float32, tf.float32, tf.int32],
        )

    def restore_shapes(first_image, second_image, pair_label):
        # py_function outputs have unknown static shape; pin them down again.
        image_shape = [*IMG_SIZE, 3]
        images = (
            tf.ensure_shape(first_image, image_shape),
            tf.ensure_shape(second_image, image_shape),
        )
        return images, tf.ensure_shape(pair_label, [])

    first_paths = [entry[0] for entry in pairs]
    second_paths = [entry[1] for entry in pairs]
    pair_labels = [entry[2] for entry in pairs]

    dataset = tf.data.Dataset.from_tensor_slices((first_paths, second_paths, pair_labels))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(pairs))

    dataset = dataset.map(read_pair, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.map(restore_shapes)
    dataset = dataset.batch(batch_size)

    # Augmentation is applied AFTER batching, on whole batches.
    if augment:
        augmenter = create_data_augmentation_layer()

        def augment_batch(batch_images, batch_labels):
            first_batch, second_batch = batch_images
            first_batch = augmenter(first_batch)
            second_batch = augmenter(second_batch)
            return (first_batch, second_batch), batch_labels

        dataset = dataset.map(augment_batch, num_parallel_calls=tf.data.AUTOTUNE)

    return dataset.prefetch(tf.data.AUTOTUNE)

def load_verification_data(verification_file_path=VERIFICATION_FILE, base_dir=DATA_DIR):
    """Eagerly load every verification pair into memory as NumPy arrays.

    Pairs whose images cannot be loaded are reported on stdout and skipped
    (best-effort loading), so the result may hold fewer entries than the file.

    Args:
        verification_file_path: Path of the verification pairs file.
        base_dir: Directory the relative image paths are joined onto.

    Returns:
        A tuple ``(images1, images2, labels)`` of NumPy arrays, aligned by index.
    """
    first_images, second_images, pair_labels = [], [], []

    all_pairs = create_pairs_from_verification_file(verification_file_path, base_dir)
    for img1_path, img2_path, label in all_pairs:
        try:
            first = load_and_preprocess_image(img1_path)
            second = load_and_preprocess_image(img2_path)
        except Exception as e:
            print(f"Error loading pair {img1_path}, {img2_path}: {e}")
            continue
        first_images.append(first)
        second_images.append(second)
        pair_labels.append(label)

    return np.array(first_images), np.array(second_images), np.array(pair_labels)

# Anti-spoofing preprocessing

In [4]:
def create_antispoofing_generators(data_dir, img_size=IMG_SIZE, batch_size=BATCH_SIZE, val_split=VAL_SPLIT):
    """Create train/validation/test generators for the anti-spoofing task.

    Training and validation both read ``<data_dir>/train``, separated by
    ``val_split``; only the training subset is augmented. The test generator
    reads ``<data_dir>/test`` without augmentation.

    Args:
        data_dir: Dataset root containing ``train`` and ``test`` sub-folders.
        img_size: Target image size passed to ``flow_from_directory``.
        batch_size: Batch size for all three generators.
        val_split: Fraction of the ``train`` folder reserved for validation.

    Returns:
        ``(train_generator, val_generator, test_generator)``; all use binary
        class mode, and only the training generator shuffles.
    """
    # Augmenting generator for the training subset.
    # (brightness_range augmentation is currently disabled.)
    augmenting_datagen = ImageDataGenerator(
        rotation_range=10,
        horizontal_flip=True,
        zoom_range=0.2,
        shear_range=0.2,
        validation_split=val_split,
    )
    # Same split, no augmentation, for the validation subset.
    split_only_datagen = ImageDataGenerator(validation_split=val_split)
    plain_datagen = ImageDataGenerator()

    train_root = os.path.join(data_dir, 'train')
    test_root = os.path.join(data_dir, 'test')
    shared_flow_options = dict(
        target_size=img_size,
        batch_size=batch_size,
        class_mode='binary',
    )

    train_generator = augmenting_datagen.flow_from_directory(
        train_root, shuffle=True, subset='training', **shared_flow_options
    )
    val_generator = split_only_datagen.flow_from_directory(
        train_root, shuffle=False, subset='validation', **shared_flow_options
    )
    test_generator = plain_datagen.flow_from_directory(
        test_root, shuffle=False, **shared_flow_options
    )

    return train_generator, val_generator, test_generator

# Combined Dataset

In [5]:
print("Preparing face verification datasets...")
# Training pairs are sampled from the classification folders and augmented.
train_pairs = create_pairs_from_classification_data()
train_verify_dataset = prepare_dataset(train_pairs, batch_size=BATCH_SIZE, augment=True)

# Validation pairs come from the fixed verification file. prepare_dataset
# shuffles by default, so shuffling is switched off explicitly here to keep
# the validation pass deterministic.
val_pairs = create_pairs_from_verification_file()
val_verify_dataset = prepare_dataset(val_pairs, batch_size=BATCH_SIZE, shuffle=False, augment=False)

print("Creating anti-spoofing data generators...")
spoof_train_generator, spoof_val_generator, spoof_test_generator = create_antispoofing_generators(CASIA_DIR)

def _paired_batches(verify_source, spoof_source):
    """Yield multi-task batches by pairing verification and spoof batches one-to-one.

    Iteration stops as soon as either source is exhausted. Both halves of each
    pair are truncated to the smaller of the two batch sizes so that all three
    model inputs share the same leading dimension.
    """
    for ((img1, img2), verify_label), (spoof_img, spoof_label) in zip(verify_source, spoof_source):
        batch_size = min(img1.shape[0], spoof_img.shape[0])
        yield (
            (img1[:batch_size], img2[:batch_size], spoof_img[:batch_size]),
            {
                # Verification labels arrive as integers; the loss expects float32.
                'verify': tf.cast(verify_label[:batch_size], tf.float32),
                'spoof': spoof_label[:batch_size],
            },
        )


def combined_generator():
    """Yield training batches: ``((img1, img2, spoof_img), {'verify': ..., 'spoof': ...})``."""
    yield from _paired_batches(train_verify_dataset, spoof_train_generator)


def val_generator():
    """Yield validation batches with the same structure as ``combined_generator``.

    NOTE(review): spoof_val_generator is created with shuffle=False and is only
    consumed for as many batches as the verification dataset provides, so it
    looks like only the leading spoof batches are evaluated when the spoof
    validation set is the larger one - confirm this is intended.
    """
    yield from _paired_batches(val_verify_dataset, spoof_val_generator)

# Create datasets
# Both generators yield ((img1, img2, spoof_img), {'verify': labels, 'spoof': labels}).
# The structure is described once with `output_signature`, which replaces the
# deprecated `output_types` / `output_shapes` argument pair.
_image_batch_spec = tf.TensorSpec(shape=(None, *IMG_SIZE, 3), dtype=tf.float32)
_label_batch_spec = tf.TensorSpec(shape=(None,), dtype=tf.float32)
_multitask_signature = (
    (_image_batch_spec, _image_batch_spec, _image_batch_spec),
    {'verify': _label_batch_spec, 'spoof': _label_batch_spec},
)

train_dataset = tf.data.Dataset.from_generator(
    combined_generator,
    output_signature=_multitask_signature,
).prefetch(tf.data.AUTOTUNE)

val_dataset = tf.data.Dataset.from_generator(
    val_generator,
    output_signature=_multitask_signature,
).prefetch(tf.data.AUTOTUNE)

Preparing face verification datasets...
Creating anti-spoofing data generators...
Found 46198 images belonging to 2 classes.
Found 11549 images belonging to 2 classes.
Found 65786 images belonging to 2 classes.


In [6]:
def create_embedding_network(embedding_dim=EMBEDDING_DIM):
    """Build the shared feature extractor used by both tasks.

    An ImageNet-initialised, fully trainable EfficientNetV2B0 backbone is
    followed by global average pooling and a Dense(512) -> BatchNorm -> ReLU
    -> Dropout(0.4) head.

    Args:
        embedding_dim: NOTE(review): currently unused - the output width is
            fixed at 512 by the dense layer below. Confirm whether it should
            control that layer.

    Returns:
        A ``keras.Model`` named ``"embedding_network"`` mapping an
        ``(*IMG_SIZE, 3)`` image to a 512-dimensional feature vector.
    """
    backbone = EfficientNetV2B0(
        weights="imagenet",
        include_top=False,
        input_shape=(*IMG_SIZE, 3),
    )
    backbone.trainable = True  # fine-tune the whole backbone

    image_input = keras.Input(shape=(*IMG_SIZE, 3))
    features = keras.applications.efficientnet_v2.preprocess_input(image_input)
    features = backbone(features)
    features = layers.GlobalAveragePooling2D()(features)

    features = layers.Dense(512, name="embedding_dense1")(features)
    features = layers.BatchNormalization(name="embedding_bn1")(features)
    features = layers.Activation('relu', name="activation_bn1")(features)
    embeddings = layers.Dropout(0.4, name="embedding_dropout1")(features)

    return keras.Model(image_input, embeddings, name="embedding_network")

def create_combined_model(verify_image_size = IMG_SIZE, spoof_image_size = IMG_SIZE, embedding_dim=EMBEDDING_DIM, learning_rate=LEARNING_RATE):
    """Build and compile the multi-task model (face verification + anti-spoofing).

    One shared embedding network processes three inputs: the two images of a
    verification pair and one spoof image. The pair embeddings are
    concatenated and passed through a small dense head ending in the sigmoid
    ``verify`` output; the spoof embedding feeds the sigmoid ``spoof`` output.

    Args:
        verify_image_size: Spatial size of the two verification inputs.
        spoof_image_size: Spatial size of the spoof input.
            NOTE(review): create_embedding_network builds its input from the
            module-level IMG_SIZE, so sizes other than IMG_SIZE are presumably
            unsupported - confirm.
        embedding_dim: Width of the dense layer in the verification head.
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        A compiled ``keras.Model`` with inputs ``[image1, image2, spoof_input]``
        and outputs ``[verify, spoof]``, created under ``strategy.scope()``.
    """
    with strategy.scope():
        first_image = keras.Input(shape=(*verify_image_size, 3), name="image1")
        second_image = keras.Input(shape=(*verify_image_size, 3), name="image2")
        spoof_image = keras.Input(shape=(*spoof_image_size, 3), name="spoof_input")

        # A single network instance, so all three branches share weights.
        shared_encoder = create_embedding_network(embedding_dim)
        first_features = shared_encoder(first_image)
        second_features = shared_encoder(second_image)
        spoof_features = shared_encoder(spoof_image)

        # Verification head: concatenated pair features -> dense -> dropout -> sigmoid.
        pair_features = layers.Concatenate(name="similarity_dense1")([first_features, second_features])
        pair_hidden = layers.Dense(embedding_dim, activation='relu', name='concatenation_layer')(pair_features)
        pair_hidden = layers.Dropout(0.3, name="similarity_dropout")(pair_hidden)
        verify_output = layers.Dense(1, activation='sigmoid', name="verify")(pair_hidden)

        # Anti-spoofing head: a single sigmoid unit on the spoof features.
        spoof_output = layers.Dense(1, activation='sigmoid', name="spoof")(spoof_features)

        model = keras.Model(
            inputs=[first_image, second_image, spoof_image],
            outputs=[verify_output, spoof_output],
        )

        task_names = ('verify', 'spoof')
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss={task: 'binary_crossentropy' for task in task_names},
            # Verification is weighted slightly higher than anti-spoofing.
            loss_weights={'verify': 0.6, 'spoof': 0.4},
            metrics={task: ['accuracy'] for task in task_names},
        )

    return model

In [7]:
def train_model(model, train_dataset, val_dataset, epochs=100):
    """Train the multi-task model with checkpointing, early stopping and LR decay.

    All three callbacks monitor ``val_verify_accuracy`` and treat higher as
    better (now stated explicitly with ``mode="max"`` on each).

    Args:
        model: Compiled Keras model exposing a ``verify`` accuracy metric.
        train_dataset: Training data passed to ``model.fit``.
        val_dataset: Validation data passed to ``model.fit``.
        epochs: Maximum number of epochs. This value used to be ignored
            (training always ran for a hard-coded 10 epochs); it is now
            honoured, so pass ``epochs=10`` to reproduce earlier runs.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    monitored_metric = "val_verify_accuracy"

    # The timestamp keeps checkpoints from different runs from overwriting each other.
    checkpoint = callbacks.ModelCheckpoint(
        f"best_anti_spoofing_model_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.keras",
        monitor=monitored_metric,
        save_best_only=True,
        mode="max",
        verbose=1
    )
    early_stopping = callbacks.EarlyStopping(
        monitor=monitored_metric,
        patience=5,
        restore_best_weights=True,
        verbose=1,
        mode="max"
    )
    reduce_lr = callbacks.ReduceLROnPlateau(
        monitor=monitored_metric,
        factor=0.1,
        patience=2,
        min_lr=1e-6,
        verbose=1,
        mode="max"
    )

    history = model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=epochs,  # was hard-coded to 10, silently ignoring the parameter
        verbose=1,
        callbacks=[checkpoint, early_stopping, reduce_lr]
    )

    return history

In [8]:
print("Building combined model...")
model = create_combined_model()
model.summary()
print("Training model...")
# State the epoch budget at the call site so the length of the run is visible
# here; the recorded output below corresponds to a 10-epoch run.
history = train_model(model, train_dataset, val_dataset, epochs=10)
# Persist the per-epoch metrics so they outlive the kernel session.
pd.DataFrame(history.history).to_csv('model_metrics.csv')

Building combined model...
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/efficientnet_v2/efficientnetv2-b0_notop.h5
[1m24274472/24274472[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


Training model...
Epoch 1/10


E0000 00:00:1753753784.814037      36 meta_optimizer.cc:966] layout failed: INVALID_ARGUMENT: Size of values 0 does not match size of permutation 4 @ fanin shape inStatefulPartitionedCall/functional_1_1/embedding_network_1/efficientnetv2-b0_1/block2b_drop_1/stateless_dropout/SelectV2-2-TransposeNHWCToNCHW-LayoutOptimizer
I0000 00:00:1753753796.973334      94 cuda_dnn.cc:529] Loaded cuDNN version 90300


   4500/Unknown [1m1556s[0m 332ms/step - loss: 0.3880 - spoof_accuracy: 0.9738 - spoof_loss: 0.0609 - verify_accuracy: 0.5378 - verify_loss: 0.7151




Epoch 1: val_verify_accuracy improved from -inf to 0.71880, saving model to best_anti_spoofing_model_20250729_014849.keras
[1m4500/4500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1633s[0m 349ms/step - loss: 0.3880 - spoof_accuracy: 0.9738 - spoof_loss: 0.0609 - verify_accuracy: 0.5378 - verify_loss: 0.7151 - val_loss: 0.2900 - val_spoof_accuracy: 0.9906 - val_spoof_loss: 0.0511 - val_verify_accuracy: 0.7188 - val_verify_loss: 0.5278 - learning_rate: 1.0000e-04
Epoch 2/10
[1m4500/4500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 311ms/step - loss: 0.2544 - spoof_accuracy: 0.9999 - spoof_loss: 0.0012 - verify_accuracy: 0.7489 - verify_loss: 0.5076
Epoch 2: val_verify_accuracy improved from 0.71880 to 0.75983, saving model to best_anti_spoofing_model_20250729_014849.keras
[1m4500/4500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1458s[0m 324ms/step - loss: 0.2544 - spoof_accuracy: 0.9999 - spoof_loss: 0.0012 - verify_accuracy: 0.7489 - verify_loss: 0.5076 - val_loss: 