In [1]:
import tensorflow as tf
from tensorflow import keras
import math
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import Dropout
# Print available physical devices
print(tf.config.experimental.list_physical_devices())

def get_model():
    # Load VGG16 with pre-trained weights and without the top layer
    vgg16 = tf.keras.applications.VGG16(
        include_top=False,
        weights="imagenet",
        input_shape=(224, 224, 3)
    )
    for layer in vgg16.layers:
        layer.trainable = False
    # Extract the output from the last layer of VGG16
    last_output = vgg16.output
    x = keras.layers.Flatten()(last_output)
    share_weight_net = keras.Model(inputs=vgg16.input, outputs=x)
    print(share_weight_net.summary())

    # Define inputs
    as_input = keras.Input(shape=(224, 224, 3), name="as_input")
    lpc_input_1 = keras.Input(shape=(224, 224, 3), name="lpc_input_1")
    lpc_input_2 = keras.Input(shape=(224, 224, 3), name="lpc_input_2")
    
    # Anti-spoofing branch
    as_flatten_1 = share_weight_net(as_input)
    as_fc1 = keras.layers.Dense(1024, activation="relu",kernel_regularizer=l2(0.001), name="as_fc1")(as_flatten_1)
    as_fc1 = Dropout(0.5)(as_fc1)
    as_fc2 = keras.layers.Dense(1024, activation="relu",kernel_regularizer=l2(0.001), name="as_fc2")(as_fc1)
    as_output = keras.layers.Dense(2, activation='sigmoid', name="as_output")(as_fc2)

    # Uncomment and complete if needed
    #Local patch comparison branch
    lpc_flatten_1 = share_weight_net(lpc_input_1)
    lpc_fc_o1 = keras.layers.Dense(1024, activation="relu", name="lpc_fc1")(lpc_flatten_1)

    lpc_flatten_2 = share_weight_net(lpc_input_2)
    lpc_fc_o2 = keras.layers.Dense(1024, activation="relu", name="lpc_fc2")(lpc_flatten_2)

    lpc = keras.layers.Lambda(lambda x: tf.math.square(x[0] - x[1]), name='lpc')([lpc_fc_o1, lpc_fc_o2])

    # Define the model
    model = keras.Model(
        inputs=[as_input,lpc_input_1,lpc_input_2], 
        outputs=[as_output,lpc] 
    )

    # Compile the model with appropriate losses and metrics
    model.compile(
        optimizer='adam',
        loss={
            "as_output": keras.losses.SparseCategoricalCrossentropy(),
            "lpc": tpc_loss},
        loss_weights={
            "as_output": 1.0,
            "lpc": 2.5*math.exp(-5)
        },
        metrics={"as_output": 'accuracy'}
    )

    return model

def tpc_loss(y_true, y_pred):
    return tf.reduce_sum(y_pred, axis=1, keepdims=True)

if __name__ == '__main__':
    model = get_model()
    model.summary()


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     14758

In [2]:
import os
import random
import yaml
import tensorflow as tf
from sklearn import preprocessing
from sklearn.utils import shuffle
from tqdm import tqdm


class Dataset:
    def __init__(self, name, batch_size, attack_dir, real_dir):  # Fixed __init__
        self.name = name
        self.batch_size = batch_size
        self.attack_dir = attack_dir
        self.real_dir = real_dir
        self.dataset = MsuMsfdDataset(batch_size, attack_dir, real_dir)

        file_path, label_truth = self.dataset.load_idx()

        # Encode the labels
        self.encoding_truth = preprocessing.LabelEncoder()
        self.encoding_truth.fit(label_truth)
        self.list_label_truth = self.encoding_truth.transform(label_truth)

        self.list_file_path_truth = file_path.copy()

        self.shuffle_dataset()
        self.len_dataset = len(self.list_file_path_truth)
    

        with open("config.yaml", "r") as ymlfile:
            cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
        self.standard_img_size = cfg['net']['input_img_size']

        # Build dataset using tf.data for better performance
        self.train_dataset = self.build_tf_dataset()

    def shuffle_dataset(self):
        self.list_file_path_truth, self.list_label_truth = shuffle(self.list_file_path_truth, self.list_label_truth,
                                                                   random_state=10)

    def load_and_preprocess_image(self, file_path, label):
        img = tf.io.read_file(file_path)
        img = tf.image.decode_png(img, channels=3)
        img = tf.image.resize(img, (self.standard_img_size, self.standard_img_size))
        img = img / 255.0  # Normalize to [0, 1]
        return img, label

    def augment_image(self, img, label):
        # Apply random flipping, brightness changes, etc.
        img = tf.image.random_flip_left_right(img)
        img = tf.image.random_brightness(img, 0.1)
        return img, label

    def build_tf_dataset(self):
        # Create dataset from file paths and labels
        dataset = tf.data.Dataset.from_tensor_slices((self.list_file_path_truth, self.list_label_truth))
        dataset = dataset.shuffle(buffer_size=self.len_dataset)  # Shuffle dataset
        dataset = dataset.map(self.load_and_preprocess_image, num_parallel_calls=2)  # Load images
        dataset = dataset.map(self.augment_image, num_parallel_calls=2)  # Data augmentation
        dataset = dataset.batch(self.batch_size)  # Batch the data
        dataset = dataset.prefetch(buffer_size=2)  # Prefetch for optimal performance
        return dataset

    def generate_minibatch(self):
        start_idx = 0
        total_batches = (self.len_dataset + self.batch_size - 1) // self.batch_size
        print("Using tf.data pipeline for minibatch generation")

        with tqdm(total=total_batches, desc="Generating Minibatches", unit="batch") as pbar:
            for batch in self.train_dataset:
                batch_img_4_truth, batch_label_truth = batch

                # Select random images for LPC loss
                batch_random_1 = []
                batch_random_2 = []
                list_random_images_path1 = random.sample(self.list_file_path_truth, k=len(batch_img_4_truth))
                list_random_images_path2 = random.sample(self.list_file_path_truth, k=len(batch_img_4_truth))

                for file_path_1 in list_random_images_path1:
                    img1 = self.load_and_preprocess_image(file_path_1, None)[0]
                    batch_random_1.append(img1)

                for file_path_2 in list_random_images_path2:
                    img2 = self.load_and_preprocess_image(file_path_2, None)[0]
                    batch_random_2.append(img2)

                batch_img_random_1 = tf.stack(batch_random_1)
                batch_img_random_2 = tf.stack(batch_random_2)
                batch_label_random = tf.zeros(len(batch_img_random_1), dtype=tf.float32)

                start_idx += self.batch_size
                pbar.update(1)
                yield batch_img_4_truth, batch_label_truth, batch_img_random_1, batch_img_random_2, batch_label_random


class MsuMsfdDataset:
    def __init__(self, batch_size, attack_dir, real_dir):  # Fixed __init__
        self.batch_size = batch_size
        self.attack_dir = attack_dir
        self.real_dir = real_dir

    def load_idx(self):
        list_file_path = []
        list_label_truth = []
        attack_dir = self.attack_dir
        real_dir = self.real_dir
        i = 0
        j= 0
        print('Loading index for attack samples...')
        for pic in os.listdir(attack_dir):
            list_file_path.append(os.path.join(attack_dir, pic))
            list_label_truth.append('attack')
            i=i+1

        print('Loading index for real samples...')
        for pic in os.listdir(real_dir):
            list_file_path.append(os.path.join(real_dir, pic))
            list_label_truth.append('real')
            j = j+1
        
        print(f"[total:{i+j},real:{j},attack:{i}]")
        return list_file_path, list_label_truth

if __name__ == '__main__':
    msumsfd_dataset = Dataset('replayattack', 32,attack_dir= r"E:\akash singh\GFA-CNN-new\GFA-CNN-master\GFA-CNN-master\dataset_small1\msumsfd_gfacnn\attack",
                              real_dir=r"E:\akash singh\GFA-CNN-new\GFA-CNN-master\GFA-CNN-master\dataset_small1\msumsfd_gfacnn\real")
#     sum = 0
#     for img_4_truth, label_truth, img_random_1, img_random_2, label_random in msumsfd_dataset.generate_minibatch():
#         print(label_truth)
#         tensor_sum = tf.reduce_sum(label_truth)
#         sum = sum+tensor_sum.numpy()

# # Print the result
# print("Sum of tensor elements:", sum)



Loading index for attack samples...
Loading index for real samples...
[total:4292,real:1568,attack:2724]


In [3]:
import model
import yaml
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from tqdm import tqdm

# Configure GPU memory growth (if needed)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

if __name__ == '__main__':
    with open("config.yaml", "r") as ymlfile:
        cfg = yaml.safe_load(ymlfile)  # Use safe_load for security

    epoch = cfg['training']['epoch']
    batch = cfg['training']['batch']

    attack_dir = r"dataset_small1\msumsfd_gfacnn\attack"
    real_dir = r"dataset_small1\msumsfd_gfacnn\real"
    dataset = Dataset('MSU-MSFD', batch_size=batch, attack_dir=attack_dir, real_dir=real_dir)
    GFA_CNN = model.get_model()

    checkpoint_filepath = cfg['training']['checkpoint']
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        save_weights_only=True,
        monitor='loss',
        mode='min',
        save_best_only=True)

    print('Preprocessing data ...')

    print('Training ...')
    j = 1
    for e in tqdm(range(epoch), desc='Epochs'):
        print(f"[epoch number: {j}]")
        dataset.shuffle_dataset()
        i = 1
        for batch_img_4_truth, batch_label_truth, batch_img_random_1, batch_img_random_2, batch_label_random in tqdm(
                 dataset.generate_minibatch(), desc='Batches', total=len(dataset.list_file_path_truth) // batch):
            GFA_CNN.fit(
                {"as_input": batch_img_4_truth, "lpc_input_1": batch_img_random_1, "lpc_input_2": batch_img_random_2},
                {"as_output": batch_label_truth, 'lpc': batch_label_random},
                epochs=1,
                batch_size=4,
                callbacks=[model_checkpoint_callback],
                verbose=1
            )
            print(f"counting iterations {i}")
            i += 1
        j += 1

    print("Saving the model...")
    GFA_CNN.save('gfamodel.keras') 
    print("model saved")

    # Load the testing dataset
    attack_dir_test = r"dataset_small1\msumsfd_gfacnn_test1\attack"
    real_dir_test = r"dataset_small1\msumsfd_gfacnn_test1\real"
    test_dataset = Dataset('MSU-MSFD', batch_size=batch, attack_dir=attack_dir_test, real_dir=real_dir_test)

    y_true = []
    y_pred = []

    for batch_img_4_truth, batch_label_truth, batch_img_random_1, batch_img_random_2 in test_dataset.generate_minibatch():
        predictions = GFA_CNN.predict({
            "as_input": batch_img_4_truth,
            "lpc_input_1": batch_img_random_1,
            "lpc_input_2": batch_img_random_2
        })

        y_true.extend(batch_label_truth)
        y_pred.extend(predictions[0])  # Adjust according to your output names

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    y_pred = y_pred[:, 1]  # Assuming first column represents positive class probability

    def calculate_eer(y_true, y_pred):
        fpr, tpr, thresholds = roc_curve(y_true, y_pred)
        fnr = 1 - tpr
        eer_threshold = thresholds[np.nanargmin(np.abs(fnr - fpr))]
        eer = fpr[np.nanargmin(np.abs(fnr - fpr))]
        return eer

    def calculate_hter(eer, y_true, y_pred):
        fpr, tpr, _ = roc_curve(y_true, y_pred)
        fnr = 1 - tpr
        threshold = np.interp(eer, fpr, tpr)
        hter = (fpr[np.argmin(np.abs(threshold - tpr))] + fnr[np.argmin(np.abs(threshold - fnr))]) / 2
        return hter

    eer = calculate_eer(y_true, y_pred)
    print(f"Equal Error Rate (EER): {eer:.4f}")

    hter = calculate_hter(eer, y_true, y_pred)
    print(f"Half Total Error Rate (HTER): {hter:.4f}")

    fpr, tpr, thresholds = roc_curve(y_true, y_pred)
    plt.figure()
    plt.plot(fpr, tpr, color='blue', lw=2, label='ROC curve (area = %0.2f)' % auc(fpr, tpr))
    plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate (FPR)')
    plt.ylabel('True Positive Rate (TPR)')
    plt.title('Receiver Operating Characteristic (ROC)')
    plt.legend(loc='lower right')
    plt.grid(True)
    plt.show()


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Physical devices cannot be modified after being initialized
Loading index for attack samples...
Loading index for real samples...
[total:4292,real:1568,attack:2724]
Model: "model_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)    

Epochs:   0%|          | 0/1 [00:00<?, ?it/s]

[epoch number: 1]




Using tf.data pipeline for minibatch generation



[A
[A

Train on 16 samples




counting iterations 1



[A

Train on 16 samples




counting iterations 2



[A

Train on 16 samples




counting iterations 3



[A

Train on 16 samples




counting iterations 4



[A

Train on 16 samples




counting iterations 5



[A

Train on 16 samples




counting iterations 6


Generating Minibatches:   2%|▏         | 6/269 [01:51<1:21:09, 18.52s/batch]
Batches:   2%|▏         | 6/268 [01:51<1:20:51, 18.52s/it]
Epochs:   0%|          | 0/1 [01:51<?, ?it/s]


KeyboardInterrupt: 

In [None]:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
import tensorflow as tf
import yaml
from datasets import Dataset
import tensorflow
from tensorflow.keras.models import load_model
from model import tpc_loss


with open("config.yaml", "r") as ymlfile:
        cfg = yaml.safe_load(ymlfile)  # Use safe_load for security

# Define the custom loss function again
def tpc_loss(y_true, y_pred):
    return tf.reduce_sum(y_pred, axis=1, keepdims=True)

# Load the model and specify custom loss function
GFA_CNN = tf.keras.models.load_model("gfamodel.keras",custom_objects={'tpc_loss': tpc_loss})
print(f"Model loaded")

 # Load the testing dataset
attack_dir_test = r"dataset_small1\msumsfd_gfacnn_test\attack"
real_dir_test = r"dataset_small1\msumsfd_gfacnn_test\real"
test_dataset = Dataset('replayattack_test', batch_size=16, attack_dir=attack_dir_test, real_dir=real_dir_test)  # Adjust as necessary

# Collect predictions and true labels
y_true = []
y_pred = []

for batch_img_4_truth, batch_label_truth, batch_img_random_1, batch_img_random_2, batch_label_random in test_dataset.generate_minibatch():
    # Make predictions
    predictions = GFA_CNN.predict({
        "as_input": batch_img_4_truth,
        "lpc_input_1": batch_img_random_1,
        "lpc_input_2": batch_img_random_2,
    })

    # Collect true labels and predictions
    y_true.extend(batch_label_truth)
    y_pred.extend(predictions[0])  # Adjust according to your output names


# Convert lists to numpy arrays for evaluation
y_true = np.array(y_true)
y_pred = np.array(y_pred)
y_pred = y_pred[:, 1]  # Assuming second column represents positive class probability

print(y_true.shape)
print(y_pred.shape)

# Calculate EER and HTER
def calculate_eer(y_true, y_pred):
    fpr, tpr, thresholds = roc_curve(y_true, y_pred)
    fnr = 1 - tpr
    eer_threshold = thresholds[np.nanargmin(np.abs(fnr - fpr))]
    eer = fpr[np.nanargmin(np.abs(fnr - fpr))]
    return eer

def calculate_hter(eer, y_true, y_pred):
    fpr, tpr, _ = roc_curve(y_true, y_pred)
    fnr = 1 - tpr
    threshold = np.interp(eer, fpr, tpr)
    hter = (fpr[np.argmin(np.abs(threshold - tpr))] + fnr[np.argmin(np.abs(threshold - fnr))]) / 2
    return hter

# Calculate EER
eer = calculate_eer(y_true, y_pred)
print(f"Equal Error Rate (EER): {eer*100:.4f}")

# Calculate HTER
hter = calculate_hter(eer, y_true, y_pred)
print(f"Half Total Error Rate (HTER): {hter*100:.4f}")

# Calculate ROC curve
fpr, tpr, thresholds = roc_curve(y_true, y_pred)

# Plot ROC curve
plt.figure()
plt.plot(fpr, tpr, color='blue', lw=2, label='ROC curve (area = %0.2f)' % auc(fpr, tpr))
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Positive Rate (TPR)')
plt.title('Receiver Operating Characteristic (ROC)')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


def plot_metrics(y_true, y_pred):
    # Calculate the metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='binary')  # Use 'binary' for binary classification
    recall = recall_score(y_true, y_pred, average='binary')
    f1 = f1_score(y_true, y_pred, average='binary')

    # Store the metrics in a dictionary for easy plotting
    metrics = {'Accuracy': accuracy, 'Precision': precision, 'Recall': recall, 'F1 Score': f1}
    print(metrics)
    
    # Plot the metrics
    fig, ax = plt.subplots()
    ax.bar(metrics.keys(), metrics.values(), color=['blue', 'green', 'red', 'purple'])

    # Set plot labels and title
    ax.set_ylabel('Score')
    ax.set_ylim([0, 1])  # The metrics are in the range [0, 1]
    ax.set_title('Classification Metrics')

    # Display the values on top of each bar
    for i, v in enumerate(metrics.values()):
        ax.text(i, v + 0.01, f'{v:.2f}', ha='center', fontweight='bold')

    plt.show()
y_pred1 = (y_pred>0.5).astype(int)
plot_metrics(y_true,y_pred1)
