In [1]:
import os
import argparse
from datetime import datetime
import cv2
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
from torchvision.transforms import functional as F
import random
from PIL import Image
import torch.optim as optim
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
import torchvision.models as models

import time
from sklearn.metrics import roc_curve, roc_auc_score, auc

In [2]:
##################################################################################
############### Data Loader Img (or Depth / 3channel) for RY ###############
##################################################################################
class VideoDataset3ch(Dataset):
    """Video dataset yielding fixed-length clips of RGB frames.

    Videos are discovered recursively under ``<orig_root_dir>/attack`` and
    ``<orig_root_dir>/real``. The label of a sample is the index of its class
    folder in ``self.classes`` (0 = attack, 1 = real).
    """

    def __init__(self, orig_root_dir, transform=None, num_frames=16, is_train=False):
        """
        Args:
            orig_root_dir (str): Path to the root directory containing the video files.
            transform (callable, optional): Optional transform applied to every frame tensor.
            num_frames (int, optional): Number of frames to be sampled from each video.
            is_train (bool, optional): If True, the clip start is random and
                rotation/scale augmentation is applied.
        """
        self.orig_root_dir = orig_root_dir
        self.transform = transform
        self.num_frames = num_frames
        self.is_train = is_train
        self.classes = ['attack', 'real']  # Label mapping: 0 = attack, 1 = real
        self.samples = self._load_samples()

    def _load_samples(self):
        """Return a list of ``(video_path, label)`` for every video under the class folders."""
        samples = []
        for label, cls in enumerate(self.classes):
            orig_cls_dir = os.path.join(self.orig_root_dir, cls)
            for root, _, files in os.walk(orig_cls_dir):
                for fname in files:
                    if fname.endswith(('.mp4', '.mov', '.avi')):
                        samples.append((os.path.join(root, fname), label))
        return samples

    def __len__(self):
        return len(self.samples)

    def _log_missing_frames(self, video_path, frame_indices):
        """Report a video that produced no frames on stdout and, if possible, in the log file.

        The log directory comes from the notebook-level ``args.log_dir``. That
        global may not exist (e.g. when the dataset is used before argument
        parsing), in which case only the console message is emitted instead of
        failing with a NameError.
        """
        message = f"No frames loaded for video: {video_path}, indices: {frame_indices}"
        print(message)

        log_dir = getattr(globals().get('args'), 'log_dir', None)
        if log_dir is None:
            return
        with open(os.path.join(log_dir, 'training_log.txt'), 'a') as log_file:
            # Terminate the entry so consecutive warnings do not run together.
            log_file.write(message + "\n")

    def _load_frames(self, video_path, frame_indices, is_depth=False):
        """
        Load frames from the specified video file at the given frame indices.

        Args:
            video_path (str): Video file to read.
            frame_indices (sequence of int): Frame positions to decode, in order.
            is_depth (bool): If True, frames are converted to single-channel
                grayscale; otherwise they are converted from BGR to RGB.

        Returns:
            list[torch.Tensor]: Exactly ``len(frame_indices)`` frame tensors
            (for a non-empty ``frame_indices``). Reading stops at the first
            index past the end of the video or the first failed read; the last
            decoded frame is repeated to fill the remainder. If nothing could
            be decoded, a black 224x224 placeholder frame is used instead.

        Raises:
            ValueError: If the video file cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            if not cap.isOpened():
                raise ValueError(f"Error opening video file: {video_path}")

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            for idx in frame_indices:
                if idx >= total_frames:  # Ensure indices do not exceed total frames
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if not ret:
                    break

                if is_depth:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
                    frame = np.expand_dims(frame, axis=-1)  # Add channel dimension
                else:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(F.to_tensor(frame))
        finally:
            # Release the capture even if decoding/conversion raised.
            cap.release()

        # Handle the case where no frames were loaded
        if not frames:
            self._log_missing_frames(video_path, frame_indices)
            # Fall back to a black frame of the expected size
            placeholder_frame = np.zeros((224, 224, 1 if is_depth else 3), dtype=np.uint8)
            frames.append(F.to_tensor(placeholder_frame))

        # Pad if fewer frames are available
        while len(frames) < len(frame_indices):
            frames.append(frames[-1])

        return frames

    def __getitem__(self, idx):
        """
        Return ``(frames, label)`` for sample ``idx``.

        ``frames`` is the stack of ``num_frames`` consecutive frames starting at
        a random position when ``is_train`` is set and at frame 0 otherwise.
        """
        orig_video_path, label = self.samples[idx]

        cap = cv2.VideoCapture(orig_video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()

        # Determine frame indices; max(1, ...) keeps randint valid for short videos
        if self.is_train:
            start_frame = np.random.randint(0, max(1, total_frames - self.num_frames + 1))
        else:
            start_frame = 0

        frame_indices = np.linspace(start_frame, start_frame + self.num_frames - 1, self.num_frames, dtype=int)

        orig_frames = self._load_frames(orig_video_path, frame_indices, is_depth=False)

        # Apply the per-frame transform
        if self.transform:
            orig_frames = [self.transform(frame) for frame in orig_frames]

        # Apply augmentation if in training mode; one (angle, scale) pair is shared by the whole clip
        if self.is_train:
            angle, scale = self._random_augmentation_params()
            orig_frames = [self.apply_augmentation(frame, angle, scale) for frame in orig_frames]

        combined_frames = torch.stack(orig_frames)  # Stacked along a new leading frame dimension

        return combined_frames, label

    def _random_augmentation_params(self):
        """
        Generate random augmentation parameters (angle and scale) for training.

        Each of rotation and scaling is enabled independently with probability
        0.5; a disabled one is returned as its identity value (0 / 1).
        """
        angle = random.uniform(-180, 180) if random.random() > 0.5 else 0
        scale = random.uniform(0.7, 1.3) if random.random() > 0.5 else 1
        return angle, scale

    def apply_augmentation(self, image, angle, scale):
        """Apply rotation and scaling augmentation; identity values (0 / 1) are skipped."""
        if angle != 0:
            image = F.rotate(image, angle)
        if scale != 1:
            image = F.affine(image, angle=0, translate=(0, 0), scale=scale, shear=0)
        return image

######################################################

class AdaptiveCenterCropAndResize:
    """Center-crop an image to its largest square, then resize it to ``output_size``."""

    def __init__(self, output_size):
        """
        Args:
            output_size (tuple or int): The desired output size after resizing
                (e.g., (32, 32)). An int ``n`` is treated as ``(n, n)``.
        """
        self.output_size = output_size
        self.to_pil = transforms.ToPILImage()
        self.to_tensor = transforms.ToTensor()

    @staticmethod
    def _normalize_size(output_size):
        """Return ``output_size`` as the 2-tuple that ``PIL.Image.resize`` requires."""
        if isinstance(output_size, int):
            return (output_size, output_size)
        return tuple(output_size)

    def __call__(self, img):
        """
        Args:
            img (torch.Tensor or PIL.Image.Image): Input image; tensors are
                converted to PIL first.

        Returns:
            torch.Tensor: The cropped and resized image converted back with ``ToTensor``.
        """
        # Convert tensor to PIL image if necessary
        if isinstance(img, torch.Tensor):
            img = self.to_pil(img)

        # Any non-RGB image is handled as single-channel grayscale
        if img.mode != 'RGB':
            img = img.convert('L')  # Convert to grayscale mode

        # Get image size (width, height)
        width, height = img.size

        # Find the minimum dimension to create the largest possible square
        crop_size = min(width, height)

        # Calculate the coordinates to center-crop the square
        left = (width - crop_size) // 2
        top = (height - crop_size) // 2
        right = (width + crop_size) // 2
        bottom = (height + crop_size) // 2

        # Crop the image to the largest square
        img = img.crop((left, top, right, bottom))

        # Resize the cropped square to the desired output size
        img = img.resize(self._normalize_size(self.output_size), Image.Resampling.LANCZOS)

        # Convert the resized image back to a tensor
        return self.to_tensor(img)

def collate_fn(batch):
    """Collate ``(frames, label)`` samples into one batch, zero-padding short clips.

    Args:
        batch (list): Samples of the form ``(frames, label)`` where ``frames``
            is a tensor whose first dimension is the number of frames.

    Returns:
        tuple: ``(padded_frames, labels)`` where ``padded_frames`` stacks all
        clips after padding each one at the end, along the frame dimension, to
        the longest clip in the batch, and ``labels`` is a 1-D tensor of labels.
    """
    max_length = max(frames.size(0) for frames, _ in batch)  # Longest clip in the batch
    padded_frames = []
    labels = []

    for frames, label in batch:
        missing = max_length - frames.size(0)
        if missing > 0:
            # new_zeros keeps the padding on the clip's own dtype and device,
            # so padded clips are not silently promoted or moved.
            padding = frames.new_zeros((missing, *frames.shape[1:]))
            frames = torch.cat((frames, padding), dim=0)  # Pad at the end
        padded_frames.append(frames)
        labels.append(label)

    # Stack all sequences and labels
    padded_frames = torch.stack(padded_frames)  # Shape: (batch_size, max_length, ...)
    labels = torch.tensor(labels)  # Shape: (batch_size,)

    return padded_frames, labels


In [3]:
#####################################################################################
############### Data Loader Hybrid (Img+Depth, 4channel) for OULU_NPU ###############
#####################################################################################

class VideoDataset(Dataset):
    """Video dataset yielding fixed-length clips of RGB frames plus a depth channel.

    Videos are discovered recursively under ``<orig_root_dir>/attack`` and
    ``<orig_root_dir>/real``. Each one is paired with the ``.mp4`` file at the
    same relative location under ``depth_root_dir``; videos without such a
    depth file are skipped with a warning. Labels: 0 = attack, 1 = real.
    """

    def __init__(self, orig_root_dir, depth_root_dir, transform=None, num_frames=16, is_train=False):
        """
        Args:
            orig_root_dir (str): Path to the root directory containing the original video files.
            depth_root_dir (str): Path to the root directory containing the corresponding depth video files.
            transform (callable, optional): Optional transform applied to every frame tensor.
            num_frames (int, optional): Number of frames to be sampled from each video.
            is_train (bool, optional): If True, the clip start is random and
                rotation/scale augmentation is applied.
        """
        self.orig_root_dir = orig_root_dir
        self.depth_root_dir = depth_root_dir
        self.transform = transform
        self.classes = ['attack', 'real']
        self.samples = self._load_samples()
        self.num_frames = num_frames
        self.is_train = is_train

    def _load_samples(self):
        """
        Load paths to original videos and their corresponding depth videos, along with labels.

        Returns:
            list: ``(orig_video_path, depth_video_path, label)`` tuples.
        """
        samples = []
        for label, cls in enumerate(self.classes):
            orig_cls_dir = os.path.join(self.orig_root_dir, cls)
            for root, _, files in os.walk(orig_cls_dir):
                for fname in files:
                    if not fname.endswith(('.mp4', '.mov', '.avi')):
                        continue
                    orig_video_path = os.path.join(root, fname)

                    # Mirror the path relative to the root instead of using
                    # str.replace, which would also rewrite later occurrences
                    # of the root string inside the path (and mishandles a
                    # trailing separator on the root).
                    rel_path = os.path.relpath(orig_video_path, self.orig_root_dir)
                    depth_video_path = os.path.join(self.depth_root_dir, rel_path)
                    depth_video_path = os.path.splitext(depth_video_path)[0] + '.mp4'

                    if os.path.exists(depth_video_path):
                        samples.append((orig_video_path, depth_video_path, label))
                    else:
                        print(f"Warning: Depth map video not found for {orig_video_path}")
        return samples

    def __len__(self):
        return len(self.samples)

    def _log_missing_frames(self, video_path, frame_indices):
        """Report a video that produced no frames on stdout and, if possible, in the log file.

        The log directory comes from the notebook-level ``args.log_dir``. That
        global may not exist (e.g. when the dataset is used before argument
        parsing), in which case only the console message is emitted instead of
        failing with a NameError.
        """
        message = f"No frames loaded for video: {video_path}, indices: {frame_indices}"
        print(message)

        log_dir = getattr(globals().get('args'), 'log_dir', None)
        if log_dir is None:
            return
        with open(os.path.join(log_dir, 'training_log.txt'), 'a') as log_file:
            # Terminate the entry so consecutive warnings do not run together.
            log_file.write(message + "\n")

    def _load_frames(self, video_path, frame_indices, is_depth=False):
        """
        Load frames from the specified video file at the given frame indices.

        Args:
            video_path (str): Video file to read.
            frame_indices (sequence of int): Frame positions to decode, in order.
            is_depth (bool): If True, frames are converted to single-channel
                grayscale; otherwise they are converted from BGR to RGB.

        Returns:
            list[torch.Tensor]: Exactly ``len(frame_indices)`` frame tensors
            (for a non-empty ``frame_indices``). Reading stops at the first
            index past the end of the video or the first failed read; the last
            decoded frame is repeated to fill the remainder. If nothing could
            be decoded, a black 224x224 placeholder frame is used instead.

        Raises:
            ValueError: If the video file cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            if not cap.isOpened():
                raise ValueError(f"Error opening video file: {video_path}")

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            for idx in frame_indices:
                if idx >= total_frames:  # Ensure indices do not exceed total frames
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if not ret:
                    break

                if is_depth:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
                    frame = np.expand_dims(frame, axis=-1)  # Add channel dimension
                else:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(F.to_tensor(frame))
        finally:
            # Release the capture even if decoding/conversion raised.
            cap.release()

        # Handle the case where no frames were loaded
        if not frames:
            self._log_missing_frames(video_path, frame_indices)
            # Fall back to a black frame of the expected size
            placeholder_frame = np.zeros((224, 224, 1 if is_depth else 3), dtype=np.uint8)
            frames.append(F.to_tensor(placeholder_frame))

        # Pad if fewer frames are available
        while len(frames) < len(frame_indices):
            frames.append(frames[-1])

        return frames

    def __getitem__(self, idx):
        """
        Return ``(frames, label)`` for sample ``idx``.

        Frames are read from the original and the depth video at the same
        frame indices and concatenated along the channel dimension
        (RGB first, depth last).
        """
        orig_video_path, depth_video_path, label = self.samples[idx]

        cap = cv2.VideoCapture(orig_video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()

        # Determine frame indices; max(1, ...) keeps randint valid for short videos
        if self.is_train:
            start_frame = np.random.randint(0, max(1, total_frames - self.num_frames + 1))
        else:
            start_frame = 0

        frame_indices = np.linspace(start_frame, start_frame + self.num_frames - 1, self.num_frames, dtype=int)

        # Load frames from both videos using the same frame indices
        orig_frames = self._load_frames(orig_video_path, frame_indices, is_depth=False)
        depth_frames = self._load_frames(depth_video_path, frame_indices, is_depth=True)

        # Apply transformations to both original and depth frames
        if self.transform:
            orig_frames = [self.transform(frame) for frame in orig_frames]
            depth_frames = [self.transform(frame) for frame in depth_frames]

        # Apply augmentation if in training mode; the same (angle, scale) is
        # used for both streams so RGB and depth stay aligned
        if self.is_train:
            angle, scale = self._random_augmentation_params()
            orig_frames = [self.apply_augmentation(frame, angle, scale) for frame in orig_frames]
            depth_frames = [self.apply_augmentation(frame, angle, scale) for frame in depth_frames]

        orig_frames = torch.stack(orig_frames)  # Shape: (num_frames, 3, H, W)
        depth_frames = torch.stack(depth_frames)  # Shape: (num_frames, 1, H, W)

        # Combine into a 4-channel tensor
        combined_frames = torch.cat([orig_frames, depth_frames], dim=1)  # Shape: (num_frames, 4, H, W)

        return combined_frames, label

    def _random_augmentation_params(self):
        """
        Generate random augmentation parameters (angle and scale) for training.

        Each of rotation and scaling is enabled independently with probability
        0.5; a disabled one is returned as its identity value (0 / 1).
        """
        angle = random.uniform(-180, 180) if random.random() > 0.5 else 0
        scale = random.uniform(0.7, 1.3) if random.random() > 0.5 else 1
        return angle, scale

    def apply_augmentation(self, image, angle, scale):
        """Apply rotation and scaling augmentation; identity values (0 / 1) are skipped."""
        if angle != 0:
            image = F.rotate(image, angle)
        if scale != 1:
            image = F.affine(image, angle=0, translate=(0, 0), scale=scale, shear=0)
        return image

######################################################

# NOTE(review): a class with this name is also defined in the earlier 3-channel
# data-loader cell; on a top-to-bottom run this later definition is the one in effect.
class AdaptiveCenterCropAndResize:
    """Center-crop an image to its largest square, then resize it to ``output_size``."""

    def __init__(self, output_size):
        """
        Args:
            output_size (tuple or int): The desired output size after resizing
                (e.g., (32, 32)). An int ``n`` is treated as ``(n, n)``.
        """
        self.output_size = output_size
        self.to_pil = transforms.ToPILImage()
        self.to_tensor = transforms.ToTensor()

    @staticmethod
    def _normalize_size(output_size):
        """Return ``output_size`` as the 2-tuple that ``PIL.Image.resize`` requires."""
        if isinstance(output_size, int):
            return (output_size, output_size)
        return tuple(output_size)

    def __call__(self, img):
        """
        Args:
            img (torch.Tensor or PIL.Image.Image): Input image; tensors are
                converted to PIL first.

        Returns:
            torch.Tensor: The cropped and resized image converted back with ``ToTensor``.
        """
        # Convert tensor to PIL image if necessary
        if isinstance(img, torch.Tensor):
            img = self.to_pil(img)

        # Any non-RGB image is handled as single-channel grayscale
        if img.mode != 'RGB':
            img = img.convert('L')  # Convert to grayscale mode

        # Get image size (width, height)
        width, height = img.size

        # Find the minimum dimension to create the largest possible square
        crop_size = min(width, height)

        # Calculate the coordinates to center-crop the square
        left = (width - crop_size) // 2
        top = (height - crop_size) // 2
        right = (width + crop_size) // 2
        bottom = (height + crop_size) // 2

        # Crop the image to the largest square
        img = img.crop((left, top, right, bottom))

        # Resize the cropped square to the desired output size
        img = img.resize(self._normalize_size(self.output_size), Image.Resampling.LANCZOS)

        # Convert the resized image back to a tensor
        return self.to_tensor(img)

# NOTE(review): a function with this name is also defined in the earlier 3-channel
# data-loader cell; on a top-to-bottom run this later definition is the one in effect.
def collate_fn(batch):
    """Collate ``(frames, label)`` samples into one batch, zero-padding short clips.

    Args:
        batch (list): Samples of the form ``(frames, label)`` where ``frames``
            is a tensor whose first dimension is the number of frames.

    Returns:
        tuple: ``(padded_frames, labels)`` where ``padded_frames`` stacks all
        clips after padding each one at the end, along the frame dimension, to
        the longest clip in the batch, and ``labels`` is a 1-D tensor of labels.
    """
    max_length = max(frames.size(0) for frames, _ in batch)  # Longest clip in the batch
    padded_frames = []
    labels = []

    for frames, label in batch:
        missing = max_length - frames.size(0)
        if missing > 0:
            # new_zeros keeps the padding on the clip's own dtype and device,
            # so padded clips are not silently promoted or moved.
            padding = frames.new_zeros((missing, *frames.shape[1:]))
            frames = torch.cat((frames, padding), dim=0)  # Pad at the end
        padded_frames.append(frames)
        labels.append(label)

    # Stack all sequences and labels
    padded_frames = torch.stack(padded_frames)  # Shape: (batch_size, max_length, C, H, W)
    labels = torch.tensor(labels)  # Shape: (batch_size,)

    return padded_frames, labels


In [4]:
################################################
############### Models #########################
################################################

In [5]:
### model_mobilenetv3l ###

class CNNTemporalAvgPoolingMBNet(nn.Module):
    """Per-frame MobileNetV3-Large features, averaged over time, then a linear classifier."""

    def __init__(self, num_classes=2):
        """
        Args:
            num_classes (int): Size of the classifier output.
        """
        super(CNNTemporalAvgPoolingMBNet, self).__init__()

        # Use MobileNetV3 as the CNN backbone (constructed without pretrained weights)
        self.cnn = models.mobilenet_v3_large()
        self.cnn.classifier = nn.Identity()  # Remove the classifier to use the feature extractor
        # Final fully connected layer sized to MobileNetV3's feature size (960 instead of 1280)
        self.fc = nn.Linear(960, num_classes)

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input as [batch_size, seq_len, channels, height, width].

        Returns:
            torch.Tensor: Class scores of shape [batch_size, num_classes].
        """
        # The backbone runs frame by frame (not on a flattened batch*seq tensor)
        # so each call sees exactly one frame per sample.
        _, temporal_avg_features = self.extract_intermediate_features(x)

        # Pass through the final fully connected layer
        return self.fc(temporal_avg_features)  # Shape: [batch_size, num_classes]

    def extract_intermediate_features(self, x):
        """Extract per-frame CNN features and their temporal average.

        Args:
            x (torch.Tensor): Input as [batch_size, seq_len, channels, height, width].

        Returns:
            tuple: ``(cnn_features, temporal_avg_features)`` with shapes
            [batch_size, seq_len, feature_dim] and [batch_size, feature_dim].
        """
        _, seq_len, _, _, _ = x.size()  # Also rejects inputs that are not 5-D
        cnn_features = torch.stack(
            [self.cnn(x[:, t, :, :, :]) for t in range(seq_len)], dim=1
        )  # Shape: [batch_size, seq_len, feature_dim]
        temporal_avg_features = cnn_features.mean(dim=1)  # Temporal average pooling
        return cnn_features, temporal_avg_features


In [6]:
### model_resnet101 ###

class CNNTemporalAvgPoolingR101(nn.Module):
    """Per-frame ResNet-101 features, averaged over time, then a linear classifier."""

    def __init__(self, num_classes=2):
        """
        Args:
            num_classes (int): Size of the classifier output.
        """
        super(CNNTemporalAvgPoolingR101, self).__init__()

        # Use ResNet-101 as the CNN backbone (constructed without pretrained weights)
        self.cnn = models.resnet101()
        self.cnn.fc = nn.Identity()  # Remove the classifier to use the feature extractor
        self.fc = nn.Linear(2048, num_classes)

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input as [batch_size, seq_len, channels, height, width].

        Returns:
            torch.Tensor: Class scores of shape [batch_size, num_classes].
        """
        # The backbone runs frame by frame (not on a flattened batch*seq tensor)
        # so each call sees exactly one frame per sample.
        _, temporal_avg_features = self.extract_intermediate_features(x)

        # Pass through the final fully connected layer
        return self.fc(temporal_avg_features)  # Shape: [batch_size, num_classes]

    def extract_intermediate_features(self, x):
        """Extract per-frame CNN features and their temporal average.

        Args:
            x (torch.Tensor): Input as [batch_size, seq_len, channels, height, width].

        Returns:
            tuple: ``(cnn_features, temporal_avg_features)`` with shapes
            [batch_size, seq_len, feature_dim] and [batch_size, feature_dim]
            (``self.fc`` expects feature_dim == 2048).
        """
        _, seq_len, _, _, _ = x.size()  # Also rejects inputs that are not 5-D
        cnn_features = torch.stack(
            [self.cnn(x[:, t, :, :, :]) for t in range(seq_len)], dim=1
        )  # Shape: [batch_size, seq_len, feature_dim]
        temporal_avg_features = cnn_features.mean(dim=1)  # Temporal average pooling
        return cnn_features, temporal_avg_features


In [7]:
############### Hybrid Models ##################

In [8]:
# 1. Multi-Channel Input with Channel Expansion
# Input: 224*224*4, image + depth_map
# Modified first Conv2d layer to take 4 input channels

class MultiChannelCNN(nn.Module):
    """MobileNetV3-Large whose first convolution takes 4 channels (RGB + depth).

    Frames are encoded one at a time, averaged over time and classified by a
    linear layer.
    """

    def __init__(self, num_classes=2):
        """
        Args:
            num_classes (int): Size of the classifier output.
        """
        super(MultiChannelCNN, self).__init__()

        # Load MobileNetV3 as the CNN backbone
        mobilenet_v3 = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.DEFAULT)

        # Modify the first convolutional layer to accept 4 input channels (RGB + Depth)
        original_conv1 = mobilenet_v3.features[0][0]  # Access the first Conv2d layer
        has_bias = original_conv1.bias is not None
        self.custom_conv1 = nn.Conv2d(
            in_channels=4,  # Change input channels to 4
            out_channels=original_conv1.out_channels,
            kernel_size=original_conv1.kernel_size,
            stride=original_conv1.stride,
            padding=original_conv1.padding,
            # Conv2d expects a bool here; passing the bias tensor itself only
            # worked while the original layer happened to have no bias.
            bias=has_bias,
        )

        with torch.no_grad():
            # Reuse the pretrained RGB filters for the first 3 channels ...
            self.custom_conv1.weight[:, :3, :, :] = original_conv1.weight
            # ... and initialise the depth channel with their per-filter mean.
            self.custom_conv1.weight[:, 3:, :, :] = original_conv1.weight.mean(dim=1, keepdim=True)
            if has_bias:
                self.custom_conv1.bias.copy_(original_conv1.bias)

        # Replace the original first layer in MobileNetV3
        mobilenet_v3.features[0][0] = self.custom_conv1

        # Remove the classifier to use MobileNetV3 as a feature extractor
        self.cnn = mobilenet_v3
        self.cnn.classifier = nn.Identity()  # No classification head

        # Fully connected layer for the final classification
        self.fc = nn.Linear(960, num_classes)  # 960 is the output feature size of MobileNetV3

    def forward(self, x):
        """
        Forward pass for multi-channel input.

        Args:
            x (torch.Tensor): Frames of shape [batch_size, seq_len, channels, H, W];
                the first convolution expects 4 channels (RGB + depth).

        Returns:
            torch.Tensor: Class scores of shape [batch_size, num_classes].
        """
        _, seq_len, _, _, _ = x.size()  # Also rejects inputs that are not 5-D

        # Extract features for each frame
        cnn_features = [self.cnn(x[:, t, :, :, :]) for t in range(seq_len)]

        # Temporal average pooling
        cnn_features = torch.stack(cnn_features, dim=1)  # Shape: [batch_size, seq_len, 960]
        temporal_avg_features = cnn_features.mean(dim=1)  # Shape: [batch_size, 960]

        # Pass through the final fully connected layer
        return self.fc(temporal_avg_features)  # Shape: [batch_size, num_classes]


In [9]:
# 2. Separate Branches with Feature Concatenation
# Input: 224*224*4, image + depth_map
# RGB branch MobileNet with 3-channel input
# Depth map branch MobileNet with 1-channel input
# Output features from both branches are concatenated, and passed to classifier

class DualBranchMobileNet(nn.Module):
    """Two-stream single-frame classifier.

    One MobileNetV3-Large encodes the RGB channels, a second one (with a
    1-channel first convolution) encodes the depth channel; both feature
    vectors are concatenated and fed to a small MLP head.
    """

    def __init__(self, num_classes=2):
        """
        Args:
            num_classes (int): Size of the classifier output.
        """
        super().__init__()

        pretrained = models.MobileNet_V3_Large_Weights.DEFAULT

        # --- RGB stream: pretrained backbone, classification head stripped ---
        self.rgb_branch = models.mobilenet_v3_large(weights=pretrained)
        feature_dim = self.rgb_branch.classifier[0].in_features
        self.rgb_branch.classifier = nn.Identity()

        # --- Depth stream: same backbone, first conv swapped for a 1-channel one ---
        self.depth_branch = models.mobilenet_v3_large(weights=pretrained)
        self.depth_branch.features[0][0] = nn.Conv2d(
            1, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False
        )
        self.depth_branch.classifier = nn.Identity()

        # --- Head operating on the concatenated features of both streams ---
        self.classifier = nn.Sequential(
            nn.Linear(feature_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input of shape [batch_size, 1, channels, height, width];
                the first 3 channels go to the RGB stream, the rest to the depth stream.

        Returns:
            torch.Tensor: Class scores of shape [batch_size, num_classes].
        """
        _, num_frames, _, _, _ = x.shape
        assert num_frames == 1, "This model supports single-frame input only."

        # Drop the singleton frame axis -> [batch_size, channels, height, width]
        frame = x.squeeze(1)

        # Encode each modality with its own backbone
        rgb_features = self.rgb_branch(frame[:, :3, :, :])
        depth_features = self.depth_branch(frame[:, 3:, :, :])

        # Fuse by concatenation along the feature axis and classify
        fused = torch.cat((rgb_features, depth_features), dim=1)
        return self.classifier(fused)


In [10]:
# 3. Depth as Auxiliary Input
# Input: 224*224*4, image + depth_map
# MobileNet with 3-channel RGB input
# Simple CNN for depth features
# RGB features before MobileNet final classification layer are modulated, by depth features

class DepthAuxiliaryMobileNet(nn.Module):
    """Single-frame RGB classifier whose features are gated by a depth branch.

    A MobileNetV3-Large encodes the RGB channels; a small CNN summarises the
    depth channel into a 32-d vector that is mapped to per-feature weights in
    [0, 1] and multiplied onto the RGB features before classification.
    """

    def __init__(self, num_classes=2):
        """
        Args:
            num_classes (int): Size of the classifier output.
        """
        super().__init__()

        # RGB stream: pretrained backbone with its classification head removed
        self.rgb_branch = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.DEFAULT)
        feature_dim = self.rgb_branch.classifier[0].in_features
        self.rgb_branch.classifier = nn.Identity()

        # Depth stream: two conv stages followed by global average pooling
        self.depth_branch = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
        )
        self.depth_feature_dim = 32  # Channel count produced by the depth stream

        # Maps depth features to one gate per RGB feature
        self.modulation_layer = nn.Sequential(
            nn.Linear(self.depth_feature_dim, feature_dim),
            nn.Sigmoid(),  # Gates lie in [0, 1]
        )

        # Classification head on the gated RGB features
        self.classifier = nn.Sequential(
            nn.Linear(feature_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input of shape [batch_size, 1, channels, height, width];
                the first 3 channels go to the RGB stream, the rest to the depth stream.

        Returns:
            torch.Tensor: Class scores of shape [batch_size, num_classes].
        """
        batch_size, num_frames, _, _, _ = x.shape
        assert num_frames == 1, "This model supports single-frame input only."

        # Drop the singleton frame axis -> [batch_size, channels, height, width]
        frame = x.squeeze(1)

        rgb_features = self.rgb_branch(frame[:, :3, :, :])

        # Depth summary, flattened to [batch_size, depth_feature_dim]
        depth_summary = self.depth_branch(frame[:, 3:, :, :]).view(batch_size, -1)

        # Element-wise gating of the RGB features by the depth-derived weights
        gates = self.modulation_layer(depth_summary)
        gated_rgb = rgb_features * gates

        return self.classifier(gated_rgb)


In [11]:
################################################
############### Eval utils #####################
################################################

In [12]:
def evaluate_all(model, loader, criterion):
    """Run ``model`` over ``loader`` and collect loss, accuracy and ROC-based metrics.

    Batches are moved to the module-level ``device`` before inference.

    Args:
        model: network returning per-class logits of shape [batch, num_classes];
            column 1 of the softmax is used as the positive-class score.
        loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor for the batch.

    Returns:
        dict with:
            'test_loss': batch losses weighted by batch size, divided by the
                number of samples actually processed.
            'test_acc': accuracy in percent (argmax over the logits).
            'auc_roc': area under the ROC curve.
            'eer': FPR at the ROC point where |FNR - FPR| is smallest.
            'far', 'frr', 'hter': FPR, FNR and their mean at the threshold
                maximising Youden's index (TPR - FPR).
            'youdens_index', 'optimal_threshold': that index and threshold.
            'avg_inference_time': forward-pass seconds per sample.
            'fpr', 'tpr': ROC curve arrays.
            'labels', 'probs': per-sample ground truth and class-1 scores.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    total_inference_time = 0.0
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # CUDA kernels are launched asynchronously: without synchronising
            # before and after the forward pass the timer would mostly measure
            # launch overhead rather than the actual computation.
            if inputs.is_cuda:
                torch.cuda.synchronize(inputs.device)
            start_time = time.perf_counter()
            outputs = model(inputs)
            if outputs.is_cuda:
                torch.cuda.synchronize(outputs.device)
            total_inference_time += time.perf_counter() - start_time

            # Weight by batch size so a smaller final batch does not skew the mean.
            loss = criterion(outputs, labels)
            running_loss += loss.item() * inputs.size(0)

            # Class-1 probability is the score used for the ROC-based metrics.
            probs = torch.softmax(outputs, dim=1)[:, 1]
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("evaluate_all received an empty loader; nothing to evaluate.")

    # ROC curve and the metrics derived from it.
    fpr, tpr, thresholds = roc_curve(all_labels, all_probs)
    auc_roc = auc(fpr, tpr)
    fnr = 1 - tpr
    eer = fpr[np.nanargmin(np.absolute(fnr - fpr))]

    # Operating point chosen by Youden's index.
    optimal_idx = np.argmax(tpr - fpr)
    optimal_threshold = thresholds[optimal_idx]
    youdens_index = tpr[optimal_idx] - fpr[optimal_idx]
    far = fpr[optimal_idx]
    frr = fnr[optimal_idx]
    hter = (far + frr) / 2

    # Normalise by the samples actually seen, which stays correct even if the
    # collate function drops items from a batch.
    avg_inference_time = total_inference_time / total
    test_loss = running_loss / total
    test_acc = 100. * correct / total

    return {
        'test_loss': test_loss,
        'test_acc': test_acc,
        'auc_roc': auc_roc,
        'eer': eer,
        'hter': hter,
        'far': far,
        'frr': frr,
        'youdens_index': youdens_index,
        'optimal_threshold': optimal_threshold,
        'avg_inference_time': avg_inference_time,
        'fpr': fpr,
        'tpr': tpr,
        'labels': all_labels,
        'probs': all_probs
    }


In [13]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
import numpy as np

def generate_evaluation_summary(results, verbose=False):
    """Print an evaluation summary and return the headline metrics.

    Args:
        results: mapping holding at least 'hter', 'auc_roc' and 'test_acc'.
            With ``verbose=True`` it must also hold 'test_loss', 'eer', 'far',
            'frr', 'youdens_index', 'optimal_threshold' and
            'avg_inference_time'.
        verbose: when True, print every metric on its own line before the
            compact comma-separated line. Defaults to False, which prints only
            the compact form.

    Returns:
        Tuple ``(hter, auc_roc, test_acc)`` exactly as stored in ``results``.
    """
    hter = results['hter']
    auc_roc = results['auc_roc']
    test_acc = results['test_acc']

    print("\n--- Evaluation Summary ---")

    if verbose:
        print(f"Test Loss: {results['test_loss']:.4f}")
        print(f"Test Accuracy: {test_acc:.2f}%")
        print(f"AUC-ROC: {auc_roc:.4f}")
        print(f"Equal Error Rate (EER): {results['eer']:.4f}")
        print(f"Half Total Error Rate (HTER): {hter:.4f}")
        print(f"False Acceptance Rate (FAR): {results['far']:.4f}")
        print(f"False Rejection Rate (FRR): {results['frr']:.4f}")
        print(f"Youden's Index (Max): {results['youdens_index']:.4f}")
        print(f"Optimal Threshold (Youden's Index): {results['optimal_threshold']:.4f}")
        print(f"Average inference time: {results['avg_inference_time']:.6f} seconds")

    # Compact, copy-paste friendly line (HTER is shown as a percentage).
    print("HTER (%), AUC-ROC, Test Accuracy (%)")
    print(f"{hter*100:.4f}, {auc_roc:.4f}, {test_acc:.4f}")
    print()

    return hter, auc_roc, test_acc

def plot_roc_curve(results):
    """Draw the ROC curve held in a results dictionary.

    Reads the 'fpr' and 'tpr' arrays for the curve and 'auc_roc' for the
    legend label, adds the dashed diagonal as a reference line, and shows the
    figure.
    """
    false_pos_rate = results['fpr']
    true_pos_rate = results['tpr']

    fig, ax = plt.subplots(figsize=(8, 6))
    curve_label = f'AUC = {results["auc_roc"]:.4f}'
    ax.plot(false_pos_rate, true_pos_rate, color='blue', label=curve_label)
    ax.plot([0, 1], [0, 1], color='red', linestyle='--')  # diagonal reference
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curve')
    ax.legend(loc="lower right")
    plt.show()

def plot_error_histogram(far, frr, eer):
    """Show FAR, FRR and EER as three single-bar panels laid out side by side."""
    # (tick label, value, bar colour, panel title) for each panel, left to right.
    panels = (
        ('FAR', far, 'red', 'False Acceptance Rate (FAR)'),
        ('FRR', frr, 'blue', 'False Rejection Rate (FRR)'),
        ('EER', eer, 'green', 'Equal Error Rate (EER)'),
    )

    plt.figure(figsize=(10, 6))
    for position, (tick, value, colour, heading) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.bar([tick], [value], color=colour)
        plt.ylabel('Rate')
        plt.title(heading)

    plt.tight_layout()
    plt.show()

def plot_inference_time(avg_inference_time):
    """Render the given average inference time (in seconds) as a single bar."""
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.bar(['Average Inference Time'], [avg_inference_time], color='purple')
    ax.set_ylabel('Time (seconds)')
    ax.set_title('Average Inference Time per Sample')
    plt.show()



In [14]:
################################################
############### Eval results ###################
################################################

In [15]:
############### Eval results Img Only Models ###################

In [17]:
# Hyperparameters and setup for evaluating the single-modality models.
# The evaluation cells below read `criterion`, `device`, `transform`, the
# dataset path variables, `chkpt_paths`, `modelz` and `model_names` from here.
criterion = nn.CrossEntropyLoss()  # For final classification
# optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Prefer GPU 1 (the originally hard-coded choice), but degrade to GPU 0 or the
# CPU instead of failing with an invalid-device error on hosts that do not
# have a second GPU.
if torch.cuda.is_available() and torch.cuda.device_count() > 1:
    device = torch.device('cuda:1')
elif torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
if str(device) != 'cuda:1':
    print(f'cuda:1 is not available; using {device} instead')

transform = transforms.Compose([
    AdaptiveCenterCropAndResize((224, 224)),  # Adaptive crop, resize, and convert to tensor
    # transforms.ToPILImage(),
    # transforms.ToTensor(),
])

###################################################
# Dataset roots; each entry of `img_datasets` / `depth_datasets` is a
# sub-directory suffix appended to the corresponding root.
img_dataset_path = '/home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu'
img_datasets = ['/test']

depth_dataset_path = '/home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu_depth_mp4/DepthAnythingV2_Base'
depth_datasets = ['/test']
###################################################

# mobilenet_v3_large checkpoints
checkpoints_m = [
    # '/home/muhammad_jabbar/face_PAD/datasets/Replay_Attack_mp4/mobilenet_v3_large/checkpoints_1frame/checkpoint_epoch_25.pth',
    # '/home/muhammad_jabbar/face_PAD/datasets/Replay-Mobile/mobilenet_v3_large/checkpoints_1frame/checkpoint_epoch_32.pth',
    # '/home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu/mobilenet_v3_large/checkpoints_1frame/checkpoint_epoch_22.pth',
    # '/home/muhammad_jabbar/face_PAD/logs/log_014_20241216_155805/checkpoints/best_model.pth',

    # Image Models (Mobilenet_v3_large, no pretrained weights)
    # NOTE(review): the trailing label on the next entry says "Depth" although it
    # sits under "Image Models" - confirm which modality log_001 was trained on.
    # '/home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_001_20250227_143008/checkpoints/best_model.pth', # Depth, MobileNetV3Large (No pretrained weights)

    # # Depth Models (Mobilenet_v3_large, no pretrained weights)
    '/home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_002_20250227_143134/checkpoints/best_model.pth', # Depth, MobileNetV3Large (No pretrained weights)
]

# resnet101 checkpoints
checkpoints_r = [
    # '/home/muhammad_jabbar/face_PAD/datasets/Replay_Attack_mp4/resnet101/checkpoints_1frame/checkpoint_epoch_9.pth',
    # '/home/muhammad_jabbar/face_PAD/datasets/Replay-Mobile/resnet101/checkpoints_1frame/checkpoint_epoch_9.pth',
    # '/home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu/resnet101/checkpoints_1frame/checkpoint_epoch_20.pth'
]

# One checkpoint list per model, in the same order as `modelz` below.
# chkpt_paths = [checkpoints_m, checkpoints_r]
chkpt_paths = [checkpoints_m]

###################################################

# Instantiate the model
model_mobilenetv3l = CNNTemporalAvgPoolingMBNet(num_classes=2).to(device)
# model_resnet101 = CNNTemporalAvgPoolingR101(num_classes=2).to(device)

# modelz = [model_mobilenetv3l, model_resnet101]
# model_names = ['model_mobilenetv3l', 'model_resnet101']

modelz = [model_mobilenetv3l]
model_names = ['model_mobilenetv3l']


In [18]:
# Evaluate every (model, checkpoint) pair on each test split and collect the
# headline metrics. Relies on the setup cell for `modelz`, `model_names`,
# `chkpt_paths`, the dataset paths, `transform`, `criterion` and `device`.
hters = []
auc_rocs = []
test_accs = []

num_frames = 1
batch_size=256

n_split = 1  # not used by this cell

assert len(modelz) == len(chkpt_paths) == len(model_names), \
    "modelz, chkpt_paths and model_names must be index-aligned"

print('-----------------------------------------------------')

for i, model in enumerate(modelz):
    for chkpt_path in chkpt_paths[i]:

        # Load the weights once per checkpoint (not once per dataset).
        # map_location places the tensors on the evaluation device regardless
        # of which device the checkpoint was saved from.
        checkpoint = torch.load(chkpt_path, map_location=device, weights_only=True)
        model.load_state_dict(checkpoint['model_state_dict'])

        for img_dataset in img_datasets:

            # Single source of truth for the root that is both loaded and printed.
            # test_root = img_dataset_path + img_dataset
            test_root = depth_dataset_path + img_dataset

            test_dataset = VideoDataset3ch(
                orig_root_dir=test_root,
                transform=transform,
                num_frames=num_frames,
                is_train=False,
            )

            test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, pin_memory=True)

            print()
            print(f'Model: {model_names[i]}')
            print(f'Trained model checkpoint path : {chkpt_path}')
            print(f'Test img dataet path # : {test_root}')
            print(f"Test samples: {len(test_dataset)}")
            print(f'Test dataet batches # : {len(test_loader)}')
            print()

            # evaluate_all returns the results dictionary
            results = evaluate_all(model, test_loader, criterion)

            # Print the summary and keep the headline metrics
            hter, auc_roc, test_acc = generate_evaluation_summary(results)
            hters.append(hter)
            auc_rocs.append(auc_roc)
            test_accs.append(test_acc)

            print()
            print('-----------------------------------------------------')


-----------------------------------------------------

Model: model_mobilenetv3l
Trained model checkpoint path : /home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_002_20250227_143134/checkpoints/best_model.pth
Test img dataet path # : /home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu_depth_mp4/DepthAnythingV2_Base/test
Test samples: 1748
Test dataet batches # : 7


--- Evaluation Summary ---
HTER (%), AUC-ROC, Test Accuracy (%)
16.7691, 0.9067, 85.9268


-----------------------------------------------------


In [23]:
### OULU APCER, BPCER, and ACER Calculation ###
# For every (model, checkpoint) pair, run inference on the protocol's test list
# and report APCER / BPCER / ACER via `oulumetrics`.
import oulumetrics

num_frames = 1
batch_size=256

n_split = 1  # only used to pick the Test_<n>.txt list for protocols 3 and 4

assert len(modelz) == len(chkpt_paths) == len(model_names), \
    "modelz, chkpt_paths and model_names must be index-aligned"

print('-----------------------------------------------------')

for i, model in enumerate(modelz):
    for j, chkpt_path in enumerate(chkpt_paths[i]):

        # The protocol is derived from the checkpoint's position in its list,
        # so the ordering inside chkpt_paths[i] matters.
        # protocol = 'all' if j==0 else str(j)
        protocol = str(j+1)

        if protocol in ('3', '4'):
            protocol_flist = f'/data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_{protocol}/Test_{n_split}.txt'
        else:
            protocol_flist = f'/data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_{protocol}/Test.txt'

        # Load the weights once per checkpoint; map_location places them on the
        # evaluation device regardless of where the checkpoint was saved.
        checkpoint = torch.load(chkpt_path, map_location=device, weights_only=True)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()

        for k, img_dataset in enumerate(img_datasets):

            # NOTE(review): the VideoDataset3ch defined at the top of the notebook
            # takes no `file_list_path` / `protocol` arguments; this cell presumably
            # needs an OULU-specific variant of the class - confirm before running.
            test_dataset = VideoDataset3ch(
                # orig_root_dir = img_dataset_path + img_dataset,
                orig_root_dir = depth_dataset_path + img_dataset,
                # depth_root_dir = depth_dataset_path + depth_datasets[k],
                file_list_path = protocol_flist,
                transform=transform,
                num_frames=num_frames,
                is_train=False,
                protocol = protocol
            )

            test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, pin_memory=True)

            print()
            print(f'Model: {model_names[i]}')
            print(f'Trained model checkpoint path : {chkpt_path}')
            print(f'Test img dataet path # : {img_dataset_path + img_dataset}')
            print(f'Test depth dataet path # : {depth_dataset_path + depth_datasets[k]}')
            print(f'OULU-NPU Protocol File List # : {protocol_flist}')
            print(f'OULU-NPU Protocol # : {protocol}')
            print(f"Test samples: {len(test_dataset)}")
            print(f'Test dataet batches # : {len(test_loader)}')
            print()

            y_attack_types = []
            y_pred = []

            with torch.no_grad():
                for inputs, labels in tqdm(test_loader):
                    inputs, labels = inputs.to(device), labels.to(device)

                    # Softmax is monotonic, so the predicted class is simply the
                    # argmax of the logits (same rule evaluate_all uses).
                    outputs = model(inputs)
                    predictions = torch.argmax(outputs, dim=1)

                    # Append ground truth and predictions to lists
                    y_attack_types.extend(labels.cpu().numpy())
                    y_pred.extend(predictions.cpu().numpy())

            # Convert lists to numpy arrays
            y_attack_types = np.array(y_attack_types)
            y_pred = np.array(y_pred)

            # NOTE(review): the dataset labels are passed as "attack types"; confirm
            # they follow the encoding oulumetrics expects.
            apcer, bpcer, acer = oulumetrics.calculate_metrics(y_attack_types, y_pred)

            # Print the results
            # print(f"APCER: {apcer*100:.8f}, BPCER: {bpcer*100:.8f}, ACER: {acer*100:.8f}")
            print(f"APCER, BPCER, ACER")
            print(f"{apcer*100:.8f}, {bpcer*100:.8f}, {acer*100:.8f}")

            print()
            print('-----------------------------------------------------')


-----------------------------------------------------

Model: model_mobilenetv3l
Trained model checkpoint path : /home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_009_20250202_090718/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_1/Test.txt
OULU-NPU Protocol # : 1
Test samples: 600
Test dataet batches # : 3



100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [01:01<00:00, 20.36s/it]


APCER, BPCER, ACER
32.08333333, 30.00000000, 31.04166667

-----------------------------------------------------

Model: model_mobilenetv3l
Trained model checkpoint path : /home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_010_20250202_090746/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_2/Test.txt
OULU-NPU Protocol # : 2
Test samples: 1080
Test dataet batches # : 5



100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [01:49<00:00, 21.84s/it]

APCER, BPCER, ACER
15.00000000, 12.77777778, 13.88888889

-----------------------------------------------------





In [40]:
### OULU APCER, BPCER, and ACER Calculation ###
# Single-run variant of the protocol loop: one model, one checkpoint, one protocol.
import oulumetrics

protocol = '2'
n_split = 1  # only used to pick the Test_<n>.txt list for protocols 3 and 4

num_frames = 1
batch_size=256

model = modelz[0]
# Alternative checkpoints:
# chkpt_path = '/home/muhammad_jabbar/face_PAD/logs/log_014_20241216_155805/checkpoints/best_model.pth'
# chkpt_path = '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_001_20250129_160959/checkpoints/best_model.pth' # Img, protocol 1
# chkpt_path = '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_002_20250129_161355/checkpoints/best_model.pth' # Img, protocol 2
# chkpt_path = '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_009_20250202_090718/checkpoints/best_model.pth' # Depth, protocol 1
# chkpt_path = '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_010_20250202_090746/checkpoints/best_model.pth' # Depth, protocol 2

# Set explicitly so the cell never reuses whatever `chkpt_path` an earlier loop
# happened to leave behind (and does not fail with NameError on a fresh kernel).
# This is the checkpoint shown in the cell's recorded output.
# NOTE(review): it is labelled "Depth" while the test root below is the image
# dataset - confirm the intended pairing.
chkpt_path = '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_010_20250202_090746/checkpoints/best_model.pth' # Depth, protocol 2

if protocol in ('3', '4'):
    protocol_flist = f'/data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_{protocol}/Test_{n_split}.txt'
else:
    protocol_flist = f'/data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_{protocol}/Test.txt'

# Single source of truth for the root that is both loaded and printed.
test_root = img_dataset_path + img_datasets[0]
# test_root = depth_dataset_path + img_datasets[0]

# NOTE(review): the VideoDataset3ch defined at the top of the notebook takes no
# `file_list_path` / `protocol` arguments; this cell presumably needs an
# OULU-specific variant of the class - confirm before running.
test_dataset = VideoDataset3ch(
    orig_root_dir = test_root,
    file_list_path = protocol_flist,
    transform=transform,
    num_frames=num_frames,
    is_train=False,
    protocol = protocol
)

test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, pin_memory=True)

# map_location places the weights on the evaluation device regardless of the
# device the checkpoint was saved from.
checkpoint = torch.load(chkpt_path, map_location=device, weights_only=True)
model.load_state_dict(checkpoint['model_state_dict'])

y_attack_types = []
y_pred = []

model.eval()

print(f'Trained model checkpoint path : {chkpt_path}')
print(f'Test dataet path # : {test_root}')
print(f"Test samples: {len(test_dataset)}")
print(f'OULU-NPU protocol # : {protocol}')
print(f'OULU-NPU protocol file list # : {protocol_flist}')

with torch.no_grad():
    for inputs, labels in tqdm(test_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # Softmax is monotonic, so the predicted class is simply the argmax of
        # the logits (same rule evaluate_all uses).
        outputs = model(inputs)
        predictions = torch.argmax(outputs, dim=1)

        # Append ground truth and predictions to lists
        y_attack_types.extend(labels.cpu().numpy())
        y_pred.extend(predictions.cpu().numpy())

# Convert lists to numpy arrays
y_attack_types = np.array(y_attack_types)
y_pred = np.array(y_pred)

# NOTE(review): the dataset labels are passed as "attack types"; confirm they
# follow the encoding oulumetrics expects.
apcer, bpcer, acer = oulumetrics.calculate_metrics(y_attack_types, y_pred)

# Print the results
# print(f"APCER: {apcer*100:.8f}, BPCER: {bpcer*100:.8f}, ACER: {acer*100:.8f}")
print(f"APCER, BPCER, ACER")
print(f"{apcer*100:.8f}, {bpcer*100:.8f}, {acer*100:.8f}")



Trained model checkpoint path : /home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_010_20250202_090746/checkpoints/best_model.pth
Test dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test samples: 1080
OULU-NPU protocol # : 2
OULU-NPU protocol file list # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_2/Test.txt


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [01:47<00:00, 21.45s/it]

APCER: 0.00000000, BPCER: 12.77777778, ACER: 6.38888889





In [None]:
############### Eval results Hybrid (Img+Depth) Models ###################

In [23]:
# Hyperparameters and setup for evaluating the hybrid (image + depth) models.
# This cell rebinds `criterion`, `device`, `transform`, the dataset paths,
# `chkpt_paths`, `modelz` and `model_names`, so the evaluation cells that follow
# operate on the hybrid models once it has been run.
criterion = nn.CrossEntropyLoss()  # For final classification
# optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Prefer GPU 1 (the originally hard-coded choice), but degrade to GPU 0 or the
# CPU instead of failing with an invalid-device error on hosts that do not
# have a second GPU.
if torch.cuda.is_available() and torch.cuda.device_count() > 1:
    device = torch.device('cuda:1')
elif torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
if str(device) != 'cuda:1':
    print(f'cuda:1 is not available; using {device} instead')

transform = transforms.Compose([
    AdaptiveCenterCropAndResize((224, 224)),  # Adaptive crop, resize, and convert to tensor
    # transforms.ToPILImage(),
    # transforms.ToTensor(),
])

###################################################
# Dataset roots; each entry of `img_datasets` / `depth_datasets` is a
# sub-directory suffix appended to the corresponding root.
img_dataset_path = '/home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu'
img_datasets = ['/test']

depth_dataset_path = '/home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu_depth_mp4/DepthAnythingV2_Base'
depth_datasets = ['/test']
###################################################

# mobilenet_v3_large hybrid models checkpoints
chan4_chkpt_paths = [
    # '/home/muhammad_jabbar/face_PAD/logs/log_003_20241212_090445/checkpoints/best_model.pth', # RA
    # '/home/muhammad_jabbar/face_PAD/logs/log_002_20241211_172816/checkpoints/best_model.pth', # RM
    # '/home/muhammad_jabbar/face_PAD/logs/log_001_20241211_172747/checkpoints/best_model.pth', # RY
    # '/home/muhammad_jabbar/face_PAD/logs/log_010_20241215_124956/checkpoints/best_model.pth', # OULU_NPU
    # '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_003_20250130_085327/checkpoints/best_model.pth', # OULU-NPU Protocol 1
    # '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_004_20250130_085405/checkpoints/best_model.pth', # OULU-NPU Protocol 2

    ## Mobilenet_v3_large, no pretrained weights
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/4ch_model/log_001_20250211_083818/checkpoints/best_model.pth', # OULU-NPU Protocol-All
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/4ch_model/log_002_20250211_083914/checkpoints/best_model.pth', # OULU-NPU Protocol-1
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/4ch_model/log_003_20250211_083939/checkpoints/best_model.pth', # OULU-NPU Protocol-2
    '/home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_003_20250227_143850/checkpoints/best_model.pth', # RY
]

featcon_chkpt_paths = [
    # '/home/muhammad_jabbar/face_PAD/logs/log_004_20241212_103930/checkpoints/best_model.pth', # RA
    # '/home/muhammad_jabbar/face_PAD/logs/log_006_20241212_105604/checkpoints/best_model.pth', # RM
    # '/home/muhammad_jabbar/face_PAD/logs/log_005_20241212_105409/checkpoints/best_model.pth', # RY
    # '/home/muhammad_jabbar/face_PAD/logs/log_011_20241215_125132/checkpoints/best_model.pth', # OULU_NPU
    # '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_005_20250130_144609/checkpoints/best_model.pth',# OULU-NPU Protocol 1
    # '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_006_20250130_144636/checkpoints/best_model.pth', # OULU-NPU Protocol 2

    ## Mobilenet_v3_large, no pretrained weights
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/sepbr_model/log_001_20250211_084223/checkpoints/best_model.pth', # OULU-NPU Protocol-All
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/sepbr_model/log_002_20250211_084319/checkpoints/best_model.pth', # OULU-NPU Protocol-1
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/sepbr_model/log_003_20250211_084432/checkpoints/best_model.pth', # OULU-NPU Protocol-2
    '/home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_004_20250227_144117/checkpoints/best_model.pth', # RY
]

aux_chkpt_paths = [
    # '/home/muhammad_jabbar/face_PAD/logs/log_008_20241212_132803/checkpoints/best_model.pth', # RA
    # '/home/muhammad_jabbar/face_PAD/logs/log_007_20241212_122848/checkpoints/best_model.pth', # RM
    # '/home/muhammad_jabbar/face_PAD/logs/log_009_20241212_142559/checkpoints/best_model.pth', # RY
    # '/home/muhammad_jabbar/face_PAD/logs/log_012_20241216_085001/checkpoints/best_model.pth', # OULU_NPU
    # '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_007_20250130_144824/checkpoints/best_model.pth',# OULU-NPU Protocol 1
    # '/home/muhammad_jabbar/face_PAD/logs/ouluNPU_ProtocolWise/log_008_20250130_144840/checkpoints/best_model.pth', # OULU-NPU Protocol 2

    ## Mobilenet_v3_large, no pretrained weights
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/aux_model/log_001_20250211_154651/checkpoints/best_model.pth', # OULU-NPU Protocol-All
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/aux_model/log_002_20250211_154756/checkpoints/best_model.pth', # OULU-NPU Protocol-1
    # '/data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/aux_model/log_003_20250211_154819/checkpoints/best_model.pth', # OULU-NPU Protocol-2
    '/home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_005_20250227_144306/checkpoints/best_model.pth', # RY
]


# One checkpoint list per model, in the same order as `modelz` below.
chkpt_paths = [chan4_chkpt_paths, featcon_chkpt_paths, aux_chkpt_paths]

###################################################

# Instantiate the model
model_hybridmobil_4chan = MultiChannelCNN(num_classes=2).to(device)
model_hybridmobil_featcon = DualBranchMobileNet(num_classes=2).to(device)
model_hybridmobil_aux = DepthAuxiliaryMobileNet(num_classes=2).to(device)

modelz = [model_hybridmobil_4chan, model_hybridmobil_featcon, model_hybridmobil_aux]
model_names = ['model_hybridmobil_4chan', 'model_hybridmobil_featcon', 'model_hybridmobil_aux']


In [24]:
# Evaluate every hybrid (model, checkpoint) pair on each paired image/depth test
# split and collect the headline metrics. Relies on the setup cell for `modelz`,
# `model_names`, `chkpt_paths`, the dataset paths, `transform`, `criterion` and
# `device`.
hters = []
auc_rocs = []
test_accs = []

num_frames = 1
batch_size=256

n_split = 1  # not used by this cell

assert len(modelz) == len(chkpt_paths) == len(model_names), \
    "modelz, chkpt_paths and model_names must be index-aligned"

print('-----------------------------------------------------')

for i, model in enumerate(modelz):
    for chkpt_path in chkpt_paths[i]:

        # Load the weights once per checkpoint (not once per dataset).
        # map_location places the tensors on the evaluation device regardless
        # of which device the checkpoint was saved from.
        checkpoint = torch.load(chkpt_path, map_location=device, weights_only=True)
        model.load_state_dict(checkpoint['model_state_dict'])

        for k, img_dataset in enumerate(img_datasets):

            # Image and depth splits are paired by position in their lists.
            img_root = img_dataset_path + img_dataset
            depth_root = depth_dataset_path + depth_datasets[k]

            test_dataset = VideoDataset(
                orig_root_dir=img_root,
                depth_root_dir=depth_root,
                transform=transform,
                num_frames=num_frames,
                is_train=False,
            )

            test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, pin_memory=True)

            print()
            print(f'Model: {model_names[i]}')
            print(f'Trained model checkpoint path : {chkpt_path}')
            print(f'Test img dataet path # : {img_root}')
            print(f'Test depth dataet path # : {depth_root}')
            print(f"Test samples: {len(test_dataset)}")
            print(f'Test dataet batches # : {len(test_loader)}')
            print()

            # evaluate_all returns the results dictionary
            results = evaluate_all(model, test_loader, criterion)

            # Print the summary and keep the headline metrics
            hter, auc_roc, test_acc = generate_evaluation_summary(results)
            hters.append(hter)
            auc_rocs.append(auc_roc)
            test_accs.append(test_acc)

            print()
            print('-----------------------------------------------------')


-----------------------------------------------------

Model: model_hybridmobil_4chan
Trained model checkpoint path : /home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_003_20250227_143850/checkpoints/best_model.pth
Test img dataet path # : /home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu/test
Test depth dataet path # : /home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu_depth_mp4/DepthAnythingV2_Base/test
Test samples: 1748
Test dataet batches # : 7


--- Evaluation Summary ---
HTER (%), AUC-ROC, Test Accuracy (%)
11.9616, 0.9384, 88.4439


-----------------------------------------------------

Model: model_hybridmobil_featcon
Trained model checkpoint path : /home/muhammad_jabbar/face_PAD/RY_20250227/CNN_FacePAD_RY_MobileNetv3Large_NoWeights_train/logs/log_004_20250227_144117/checkpoints/best_model.pth
Test img dataet path # : /home/muhammad_jabbar/face_PAD/datasets/Rose_Youtu/test
Test depth dataet path # : /home/muhammad_jabbar/face_

In [12]:
### OULU APCER, BPCER, and ACER Calculation ###
# For every model in `modelz`, loads each of its per-protocol checkpoints, runs
# inference over the test set(s) and prints APCER / BPCER / ACER as percentages.
# Relies on names created by earlier cells: modelz, model_names, chkpt_paths,
# img_datasets, depth_datasets, img_dataset_path, depth_dataset_path,
# VideoDataset, collate_fn, transform and device.
import oulumetrics  # NOTE(review): only this cell uses it; consider moving it to the import cell.

num_frames = 1
batch_size = 256

# Index of the Test_<n>.txt list used for protocols 3 and 4.
n_split = 1

# Single place to change where the OULU-NPU protocol file lists live.
oulu_protocol_root = '/data/muhammad_jabbar/datasets/Oulu_NPU/Baseline'

# One record per (model, protocol, dataset) run so the numbers remain available
# to later cells instead of existing only in the printed output.
oulu_results = []

print('-----------------------------------------------------')

for i, model in enumerate(modelz):
    for j, chkpt_path in enumerate(chkpt_paths[i]):

        # The position of a checkpoint in chkpt_paths[i] selects the protocol:
        # index 0 -> 'all', index j -> protocol j.
        protocol = 'all' if j == 0 else str(j)

        # Protocols 3 and 4 use numbered test lists; the others use a single Test.txt.
        test_list_name = f'Test_{n_split}.txt' if protocol in ('3', '4') else 'Test.txt'
        protocol_flist = f'{oulu_protocol_root}/Protocol_{protocol}/{test_list_name}'

        # The weights depend only on (model, checkpoint), so load them once here
        # rather than once per dataset. map_location lets a checkpoint saved on
        # another device be restored onto the device used for inference.
        checkpoint = torch.load(chkpt_path, map_location=device, weights_only=True)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()

        for k, img_dataset in enumerate(img_datasets):

            test_dataset = VideoDataset(
                orig_root_dir=img_dataset_path + img_dataset,
                depth_root_dir=depth_dataset_path + depth_datasets[k],
                file_list_path=protocol_flist,
                transform=transform,
                num_frames=num_frames,
                is_train=False,
                protocol=protocol,
            )

            test_loader = DataLoader(
                test_dataset,
                batch_size=batch_size,
                shuffle=False,
                collate_fn=collate_fn,
                pin_memory=True,
            )

            print()
            print(f'Model: {model_names[i]}')
            print(f'Trained model checkpoint path : {chkpt_path}')
            print(f'Test img dataet path # : {img_dataset_path + img_dataset}')
            print(f'Test depth dataet path # : {depth_dataset_path + depth_datasets[k]}')
            print(f'OULU-NPU Protocol File List # : {protocol_flist}')
            print(f'OULU-NPU Protocol # : {protocol}')
            print(f"Test samples: {len(test_dataset)}")
            print(f'Test dataet batches # : {len(test_loader)}')
            print()

            y_attack_types = []
            y_pred = []

            with torch.no_grad():
                for inputs, labels in tqdm(test_loader):
                    inputs = inputs.to(device)

                    # Softmax does not change which class scores highest, so the
                    # predicted class is taken straight from the model outputs.
                    outputs = model(inputs)
                    predictions = outputs.argmax(dim=1)

                    # Labels are only accumulated, never fed to the model, so they
                    # are not moved to the inference device.
                    y_attack_types.extend(labels.cpu().numpy())
                    y_pred.extend(predictions.cpu().numpy())

            # Convert lists to numpy arrays
            y_attack_types = np.array(y_attack_types)
            y_pred = np.array(y_pred)

            # NOTE(review): the label and prediction encodings expected by
            # oulumetrics.calculate_metrics and those produced by VideoDataset are
            # not visible in this cell. Confirm they agree; the recorded Protocol 1
            # runs report BPCER close to 100 %.
            apcer, bpcer, acer = oulumetrics.calculate_metrics(y_attack_types, y_pred)

            oulu_results.append({
                'model': model_names[i],
                'protocol': protocol,
                'img_dataset': img_dataset,
                'apcer': apcer,
                'bpcer': bpcer,
                'acer': acer,
            })

            # Print the results as percentages
            print("APCER, BPCER, ACER")
            print(f"{apcer*100:.8f}, {bpcer*100:.8f}, {acer*100:.8f}")

            print()
            print('-----------------------------------------------------')


-----------------------------------------------------

Model: model_hybridmobil_4chan
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/4ch_model/log_001_20250211_083818/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_all/Test.txt
OULU-NPU Protocol # : all
Test samples: 1800
Test dataet batches # : 8



100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8/8 [05:50<00:00, 43.86s/it]


APCER, BPCER, ACER
4.86111111, 20.83333333, 12.84722222

-----------------------------------------------------

Model: model_hybridmobil_4chan
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/4ch_model/log_002_20250211_083914/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_1/Test.txt
OULU-NPU Protocol # : 1
Test samples: 600
Test dataet batches # : 3



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [02:33<00:00, 51.03s/it]


APCER, BPCER, ACER
0.00000000, 100.00000000, 50.00000000

-----------------------------------------------------

Model: model_hybridmobil_4chan
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/4ch_model/log_003_20250211_083939/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_2/Test.txt
OULU-NPU Protocol # : 2
Test samples: 1080
Test dataet batches # : 5



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [04:31<00:00, 54.21s/it]


APCER, BPCER, ACER
9.16666667, 15.00000000, 12.08333333

-----------------------------------------------------

Model: model_hybridmobil_featcon
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/sepbr_model/log_001_20250211_084223/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_all/Test.txt
OULU-NPU Protocol # : all
Test samples: 1800
Test dataet batches # : 8



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8/8 [05:44<00:00, 43.05s/it]


APCER, BPCER, ACER
1.52777778, 19.16666667, 10.34722222

-----------------------------------------------------

Model: model_hybridmobil_featcon
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/sepbr_model/log_002_20250211_084319/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_1/Test.txt
OULU-NPU Protocol # : 1
Test samples: 600
Test dataet batches # : 3



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [01:22<00:00, 27.48s/it]


APCER, BPCER, ACER
0.00000000, 96.66666667, 48.33333333

-----------------------------------------------------

Model: model_hybridmobil_featcon
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/sepbr_model/log_003_20250211_084432/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_2/Test.txt
OULU-NPU Protocol # : 2
Test samples: 1080
Test dataet batches # : 5



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [03:50<00:00, 46.04s/it]


APCER, BPCER, ACER
15.83333333, 6.38888889, 11.11111111

-----------------------------------------------------

Model: model_hybridmobil_aux
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/aux_model/log_001_20250211_154651/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_all/Test.txt
OULU-NPU Protocol # : all
Test samples: 1800
Test dataet batches # : 8



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8/8 [06:14<00:00, 46.78s/it]


APCER, BPCER, ACER
5.83333333, 27.77777778, 16.80555556

-----------------------------------------------------

Model: model_hybridmobil_aux
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/aux_model/log_002_20250211_154756/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_1/Test.txt
OULU-NPU Protocol # : 1
Test samples: 600
Test dataet batches # : 3



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [01:22<00:00, 27.50s/it]


APCER, BPCER, ACER
0.41666667, 99.16666667, 49.79166667

-----------------------------------------------------

Model: model_hybridmobil_aux
Trained model checkpoint path : /data/muhammad_jabbar/datasets/CNN_FacePAD_Oulu_NPU_MobileNetv3Large_NoWeights_train/logs/aux_model/log_003_20250211_154819/checkpoints/best_model.pth
Test img dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU/Test_files
Test depth dataet path # : /data/muhammad_jabbar/datasets/Oulu_NPU_depth_mp4/Test_files
OULU-NPU Protocol File List # : /data/muhammad_jabbar/datasets/Oulu_NPU/Baseline/Protocol_2/Test.txt
OULU-NPU Protocol # : 2
Test samples: 1080
Test dataet batches # : 5



100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [02:18<00:00, 27.71s/it]

APCER, BPCER, ACER
15.83333333, 8.61111111, 12.22222222

-----------------------------------------------------





In [13]:
# NOTE(review): bare literal - this cell only echoes 5 and defines nothing.
# Looks like leftover scratch; confirm it is unused before removing.
5

5