# Sys check

In [None]:
# !pip install torch torchvision torchaudio 

In [None]:
# !pip install timm
# !pip install matplotlib 
# !pip install opencv-python
# !pip install scikit-learn
# !pip install Pillow

In [1]:
import os 
root = os.getcwd()
root 

'd:\\IT\\GITHUB\\TakeHomeTest'

In [2]:
!nvidia-smi

Sat Apr 12 10:07:30 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.94                 Driver Version: 560.94         CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce GTX 1650 Ti   WDDM  |   00000000:01:00.0  On |                  N/A |
| N/A   56C    P8              5W /   50W |     806MiB /   4096MiB |     23%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [3]:
import torch 
print(torch.__version__)
print(torch.cuda.is_available())

2.6.0+cu118
True


In [None]:
def training(model, optimizer, criterion, train_loader, val_loader, num_epochs=10, device='cuda'):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"[Epoch {epoch+1}/{num_epochs}] Loss: {running_loss / len(train_loader):.4f}")
        
        # Evaluate on validation set
        print("Validation performance:")
        evaluate(model, val_loader)

In [7]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet mean/std
                         std=[0.229, 0.224, 0.225])
])

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_classes = 2
batch_size = 8
num_epochs = 10
learning_rate = 1e-4

In [11]:
# Hàm đánh giá
def evaluate(model, dataloader):
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    report = classification_report(y_true, y_pred, target_names=["normal", "spoof"])
    print(report)

# Single-Image

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report

import os
import matplotlib.pyplot as plt

In [8]:
# Tải dữ liệu
data_dir = "dataset"
train_set = datasets.ImageFolder(os.path.join(data_dir, "train"), transform=transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)

val_set = datasets.ImageFolder(os.path.join(data_dir, "dev"), transform=transform)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

In [9]:
model = models.resnet50(pretrained=True)
for param in model.parameters():
    param.requires_grad = False

# Thay thế lớp fully-connected cuối cùng
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, num_classes)
)

model = model.to(device)



In [10]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=learning_rate)

In [None]:
training(model, optimizer, criterion, train_loader, val_loader, num_epochs=num_epochs, device=device)

[Epoch 1/10] Loss: 0.5858
Validation performance:
              precision    recall  f1-score   support

      normal       0.78      0.84      0.81       602
       spoof       0.83      0.77      0.79       602

    accuracy                           0.80      1204
   macro avg       0.80      0.80      0.80      1204
weighted avg       0.80      0.80      0.80      1204

[Epoch 2/10] Loss: 0.4750
Validation performance:
              precision    recall  f1-score   support

      normal       0.77      0.88      0.82       602
       spoof       0.86      0.74      0.80       602

    accuracy                           0.81      1204
   macro avg       0.81      0.81      0.81      1204
weighted avg       0.81      0.81      0.81      1204

[Epoch 3/10] Loss: 0.4388
Validation performance:
              precision    recall  f1-score   support

      normal       0.77      0.89      0.83       602
       spoof       0.87      0.74      0.80       602

    accuracy                    

In [15]:
# Lưu mô hình
torch.save(model.state_dict(), "resnet50_liveness_single.pth")

In [None]:
# Load the model's state dictionary
# model.load_state_dict(torch.load("resnet50_liveness_single.pth"))

In [13]:
evaluate(model, val_loader)

              precision    recall  f1-score   support

      normal       0.88      0.78      0.83       602
       spoof       0.80      0.90      0.85       602

    accuracy                           0.84      1204
   macro avg       0.84      0.84      0.84      1204
weighted avg       0.84      0.84      0.84      1204



# Multi-image

In [None]:
from torch.utils.data import Dataset
from PIL import Image
import glob
import os
import torch

class MultiImageLivenessDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Custom dataset for multi-image liveness detection.

        Args:
            root_dir (str): Root directory containing subdirectories for each class ('normal' and 'spoof').
            transform (callable, optional): Transformations to apply to the images.
        """
        self.samples = []  # List to store image paths and labels
        self.transform = transform  # Transformations to apply to images

        # Iterate over the two classes: 'normal' and 'spoof'
        for label_dir in ["normal", "spoof"]:
            full_path = os.path.join(root_dir, label_dir)  # Full path to the class directory
            persons = {}  # Dictionary to group images by person ID

            # Group images by person ID
            for img_path in glob.glob(os.path.join(full_path, "*.jpg")):
                filename = os.path.basename(img_path)  # Extract the filename
                person_id = filename.split("_")[0]  # Extract the person ID from the filename
                if person_id not in persons:
                    persons[person_id] = []  # Initialize a list for the person ID
                persons[person_id].append(img_path)  # Add the image path to the person's list

            # Process each person's images
            for person_id, images in persons.items():
                images = sorted(images)  # Sort images for consistency
                if len(images) >= 4:
                    selected = images[:4]  # Select the first 4 images if there are enough
                else:
                    # If fewer than 4 images, duplicate the first image to make up the difference
                    selected = (images + [images[0]] * 4)[:4]
                # Append the selected images and label (0 for 'normal', 1 for 'spoof') to the samples list
                self.samples.append((selected, 0 if label_dir == "normal" else 1))

    def __len__(self):
        """
        Returns the total number of samples in the dataset.
        """
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Retrieves a sample from the dataset.

        Args:
            idx (int): Index of the sample to retrieve.

        Returns:
            tuple: A tuple containing a tensor of stacked images and the corresponding label.
        """
        img_paths, label = self.samples[idx]  # Get the image paths and label for the given index
        imgs = []  # List to store the processed images

        # Load and process each image
        for path in img_paths:
            image = Image.open(path).convert("RGB")  # Open the image and convert it to RGB
            if self.transform:
                image = self.transform(image)  # Apply transformations if specified
            imgs.append(image)  # Add the processed image to the list

        # Stack the images into a single tensor and return it along with the label
        return torch.stack(imgs), torch.tensor(label)


In [17]:
train_set = MultiImageLivenessDataset("dataset/train", transform=transform)
val_set = MultiImageLivenessDataset("dataset/dev", transform=transform)
train_loader = DataLoader(train_set, batch_size=8, shuffle=True)
val_loader = DataLoader(val_set, batch_size=8, shuffle=False)

In [18]:
# Hàm đánh giá
def evaluate(model, dataloader):
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    report = classification_report(y_true, y_pred, target_names=["normal", "spoof"])
    print(report)

## Resnet


In [None]:
import torch.nn as nn
import torchvision.models as models

class MultiImageResNetLSTM(nn.Module):
    def __init__(self, base_model, lstm_hidden=512, num_classes=2):
        """
        A model combining ResNet for feature extraction and LSTM for temporal modeling.

        Args:
            base_model: Pretrained ResNet model.
            lstm_hidden: Number of hidden units in the LSTM layer.
            num_classes: Number of output classes for classification.
        """
        super(MultiImageResNetLSTM, self).__init__()
        # Use ResNet as a feature extractor, removing the fully connected (FC) layer
        self.feature_extractor = nn.Sequential(*list(base_model.children())[:-1])  # ResNet50 without FC
        
        # LSTM layer for temporal modeling of features
        self.lstm = nn.LSTM(input_size=2048, hidden_size=lstm_hidden, batch_first=True)
        
        # Fully connected classifier
        self.classifier = nn.Sequential(
            nn.Linear(lstm_hidden, 256),  # Linear layer to reduce dimensions
            nn.ReLU(),                   # Activation function
            nn.Dropout(0.4),             # Dropout for regularization
            nn.Linear(256, num_classes)  # Final layer for classification
        )

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x: Input tensor of shape [B, 4, C, H, W], where:
               B = Batch size, 4 = Number of images per sample, 
               C = Channels, H = Height, W = Width.

        Returns:
            Output tensor of shape [B, num_classes].
        """
        # Reshape input to process all images in the batch
        B, N, C, H, W = x.shape
        x = x.view(B * N, C, H, W)  # [B * 4, C, H, W]

        # Extract features using ResNet
        features = self.feature_extractor(x)  # [B * 4, 2048, 1, 1]
        features = features.view(B, N, -1)    # Reshape to [B, 4, 2048]

        # Pass features through LSTM
        _, (hn, _) = self.lstm(features)      # hn: [1, B, lstm_hidden]
        hn = hn.squeeze(0)                    # Remove the first dimension: [B, lstm_hidden]

        # Classify using the fully connected layers
        out = self.classifier(hn)             # [B, num_classes]
        return out


In [21]:
resnet = models.resnet50(pretrained=True)
model = MultiImageResNetLSTM(resnet).to(device)



In [22]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
training(model, optimizer, criterion, train_loader, val_loader, num_epochs=5, device='cuda')

[Epoch 1/5] Loss: 0.5239
Validation performance:
              precision    recall  f1-score   support

      normal       0.91      0.90      0.90       201
       spoof       0.90      0.91      0.91       204

    accuracy                           0.90       405
   macro avg       0.90      0.90      0.90       405
weighted avg       0.90      0.90      0.90       405

[Epoch 2/5] Loss: 0.3656
Validation performance:
              precision    recall  f1-score   support

      normal       0.95      0.88      0.91       201
       spoof       0.89      0.96      0.92       204

    accuracy                           0.92       405
   macro avg       0.92      0.92      0.92       405
weighted avg       0.92      0.92      0.92       405

[Epoch 3/5] Loss: 0.2872
Validation performance:
              precision    recall  f1-score   support

      normal       0.83      0.96      0.89       201
       spoof       0.95      0.81      0.88       204

    accuracy                       

In [24]:
evaluate(model, val_loader)

              precision    recall  f1-score   support

      normal       0.95      0.94      0.94       201
       spoof       0.94      0.95      0.94       204

    accuracy                           0.94       405
   macro avg       0.94      0.94      0.94       405
weighted avg       0.94      0.94      0.94       405



In [64]:
from sklearn.metrics import roc_auc_score

def calculate_roc_auc(model, dataloader, device):
    model.eval()
    y_true, y_scores = [], []

    with torch.no_grad():
        for image_group, labels in dataloader:
            image_group = image_group.to(device)
            labels = labels.to(device)

            outputs = model(image_group)
            probabilities = torch.softmax(outputs, dim=1)[:, 1]  # Get probabilities for the positive class

            y_true.extend(labels.cpu().numpy())
            y_scores.extend(probabilities.cpu().numpy())

    roc_auc = roc_auc_score(y_true, y_scores)
    print(f"ROC-AUC Score: {roc_auc:.4f}")
    return roc_auc

# Calculate ROC-AUC for the validation set
roc_auc = calculate_roc_auc(model, val_loader, device)

ROC-AUC Score: 0.8573


In [26]:
# Save the model's state dictionary
torch.save(model.state_dict(), "resnet50_liveness_multi.pth")

In [53]:
# Ensure the model is initialized correctly
resnet = models.resnet50(pretrained=True)
model = MultiImageResNetLSTM(resnet).to(device)

# Load the state dictionary
model.load_state_dict(torch.load("resnet50_liveness_multi.pth"))



<All keys matched successfully>

Test

In [None]:
from PIL import Image
import torch

def predict_spoof_probability(input_path, model, transform, device):
    """
    Predict if an image or group of images is normal or spoof using the multi-image model.
    
    Args:
        input_path: Can be either:
                   - Single image path (string)
                   - List of image paths for the same person
                   - Directory containing images of the same person
        model: Trained MultiImageResNetLSTM model
        transform: Preprocessing transforms
        device: Device to run inference on ('cuda' or 'cpu')
    
    Returns:
        A probability in range [0,1] where:
        - 0 indicates normal (real face)
        - 1 indicates spoof (fake face)
    """
    model.eval()
    
    # Determine input type
    if isinstance(input_path, str):
        if os.path.isfile(input_path):
            # Single image path
            img_paths = [input_path]
        elif os.path.isdir(input_path):
            # Directory of images
            img_paths = sorted(glob.glob(os.path.join(input_path, "*.jpg")))
            if not img_paths:
                raise ValueError(f"No images found in directory: {input_path}")
        else:
            raise ValueError(f"Input path does not exist: {input_path}")
    elif isinstance(input_path, list):
        # List of image paths
        img_paths = input_path
    else:
        raise ValueError("Input path must be a string or list of strings")
    
    # Process images
    processed_images = []
    for path in img_paths:
        image = Image.open(path).convert("RGB")
        if transform:
            image = transform(image)
        processed_images.append(image)
    
    # Handle case where we have fewer than 4 images
    if len(processed_images) < 4:
        # Duplicate first image to get 4 total images
        while len(processed_images) < 4:
            processed_images.append(processed_images[0])
    elif len(processed_images) > 4:
        # If more than 4 images, use only the first 4
        processed_images = processed_images[:4]
    
    # Stack images and prepare for model
    image_batch = torch.stack(processed_images).unsqueeze(0).to(device)  # [1, 4, C, H, W]
    
    # Make prediction
    with torch.no_grad():
        outputs = model(image_batch)
        probabilities = torch.softmax(outputs, dim=1)
        
    # Return spoof probability (index 1)
    return probabilities[0, 1].item()



# Example paths (replace with actual paths)
single_image = "D:\\IT\\GITHUB\\TakeHomeTest\\dataset\\dev\\normal\\2_4.jpg"
score1 = predict_spoof_probability(single_image, model, transform, device)
print(f"Single image spoof probability: {score1:.4f}")
print(f"Single image classification: {'Spoof (fake)' if score1 > 0.5 else 'Normal (real)'}")


multiple_images = [
    "D:\\IT\\GITHUB\\TakeHomeTest\\dataset\\dev\\spoof\\63_1.jpg",
    "D:\\IT\\GITHUB\\TakeHomeTest\\dataset\\dev\\spoof\\63_2.jpg",
    "D:\\IT\\GITHUB\\TakeHomeTest\\dataset\\dev\\spoof\\63_3.jpg",
    "D:\\IT\\GITHUB\\TakeHomeTest\\dataset\\dev\\spoof\\63_4.jpg"
]
score2 = predict_spoof_probability(multiple_images, model, transform, device)
print(f"Multiple images spoof probability: {score2:.4f}")
print(f"Multiple images classification: {'Spoof (fake)' if score2 > 0.5 else 'Normal (real)'}")


# person_dir = "D:\\IT\\GITHUB\\TakeHomeTest\\dataset\\dev\\spoof\\person_a"
# # Example directory (replace with actual directory)
# score3 = predict_spoof_probability(person_dir, model, transform, device)   
# print(f"Directory spoof probability: {score2:.4f}")
# print(f"Directory classification: {'Spoof (fake)' if score2 > 0.5 else 'Normal (real)'}") 


Single image spoof probability: 0.0427
Multiple images spoof probability: 0.9030
Single image classification: Normal (real)
Multiple images classification: Spoof (fake)


## VIT

In [27]:
import timm
import torch
import torch.nn as nn

class ViT_LivenessClassifier(nn.Module):
    def __init__(self, vit_model_name='vit_base_patch16_224', num_classes=2):
        super(ViT_LivenessClassifier, self).__init__()
        self.vit = timm.create_model(vit_model_name, pretrained=True)
        self.vit.head = nn.Identity()  # Bỏ classification head của ViT

        self.embedding_dim = self.vit.num_features  # Thường là 768

        self.classifier = nn.Sequential(
            nn.Linear(self.embedding_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        # x: [B, 4, C, H, W]
        B, N, C, H, W = x.shape
        x = x.view(B * N, C, H, W)

        embeddings = self.vit(x)  # [B*4, D]
        embeddings = embeddings.view(B, N, -1)  # [B, 4, D]

        # Mean pooling over 4 embeddings
        pooled = embeddings.mean(dim=1)  # [B, D]
        out = self.classifier(pooled)
        return out


  from .autonotebook import tqdm as notebook_tqdm


In [28]:
vit_model = ViT_LivenessClassifier(vit_model_name='vit_base_patch16_224').to(device)

In [29]:
# loss và optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(vit_model.parameters(), lr=2e-5)

In [None]:
training(model, optimizer, criterion, train_loader, val_loader, num_epochs=2, device='cuda')

[Epoch 1/2] Loss: 0.1505
Validation performance:
              precision    recall  f1-score   support

      normal       0.94      0.96      0.95       201
       spoof       0.96      0.94      0.95       204

    accuracy                           0.95       405
   macro avg       0.95      0.95      0.95       405
weighted avg       0.95      0.95      0.95       405

[Epoch 2/2] Loss: 0.1607
Validation performance:
              precision    recall  f1-score   support

      normal       0.94      0.96      0.95       201
       spoof       0.96      0.94      0.95       204

    accuracy                           0.95       405
   macro avg       0.95      0.95      0.95       405
weighted avg       0.95      0.95      0.95       405



In [33]:
evaluate(model, val_loader)

              precision    recall  f1-score   support

      normal       0.94      0.96      0.95       201
       spoof       0.96      0.94      0.95       204

    accuracy                           0.95       405
   macro avg       0.95      0.95      0.95       405
weighted avg       0.95      0.95      0.95       405



In [34]:
# Save the model's state dictionary
torch.save(model.state_dict(), "vit_liveness_multi.pth")

In [None]:
# model.load_state_dict(torch.load("vit_liveness_multi.pth"))

# AutoEncoder

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleAutoEncoder(nn.Module):
    def __init__(self):
        super(SimpleAutoEncoder, self).__init__()
        # Encoder: Reduces the spatial dimensions while increasing the feature depth
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 32, 3, stride=2, padding=1),  # Convolution: [B, 3, 224, 224] -> [B, 32, 112, 112]
            nn.ReLU(),  # Activation function
            nn.Conv2d(32, 64, 3, stride=2, padding=1),  # Convolution: [B, 32, 112, 112] -> [B, 64, 56, 56]
            nn.ReLU(),  # Activation function
            nn.Conv2d(64, 128, 3, stride=2, padding=1),  # Convolution: [B, 64, 56, 56] -> [B, 128, 28, 28]
            nn.ReLU()  # Activation function
        )
        # Decoder: Reconstructs the original image from the encoded representation
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),  # Transposed Convolution: [B, 128, 28, 28] -> [B, 64, 56, 56]
            nn.ReLU(),  # Activation function
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),  # Transposed Convolution: [B, 64, 56, 56] -> [B, 32, 112, 112]
            nn.ReLU(),  # Activation function
            nn.ConvTranspose2d(32, 3, 3, stride=2, padding=1, output_padding=1),  # Transposed Convolution: [B, 32, 112, 112] -> [B, 3, 224, 224]
            nn.Sigmoid()  # Activation function to normalize pixel values between 0 and 1
        )

    def forward(self, x):
        # Forward pass through the encoder
        x = self.encoder(x)
        # Forward pass through the decoder
        x = self.decoder(x)
        return x


### Reconstruction error

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Use only normal images for training
class NormalOnlyDataset(MultiImageLivenessDataset):
    def __init__(self, root_dir, transform=None):
        super().__init__(root_dir, transform)
        self.samples = [s for s in self.samples if s[1] == 0]

train_dataset = NormalOnlyDataset("dataset/train", transform=transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Use all images for validation
val_dataset = MultiImageLivenessDataset("dataset/dev", transform=transform)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)


In [37]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleAutoEncoder().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

In [38]:
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for image_group, _ in train_loader:  # image_group: [B, 4, 3, H, W]
        images = image_group.view(-1, 3, 224, 224).to(device)  # [B*4, 3, H, W]
        outputs = model(images)
        loss = criterion(outputs, images)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")


Epoch 1, Loss: 0.0610
Epoch 2, Loss: 0.0240
Epoch 3, Loss: 0.0092
Epoch 4, Loss: 0.0069
Epoch 5, Loss: 0.0056
Epoch 6, Loss: 0.0048
Epoch 7, Loss: 0.0041
Epoch 8, Loss: 0.0037
Epoch 9, Loss: 0.0036
Epoch 10, Loss: 0.0031


In [None]:
from sklearn.metrics import classification_report

def evaluate_autoencoder(model, dataloader, threshold=None):
    """
    Evaluate the autoencoder model for liveness detection.

    Args:
        model: The trained autoencoder model.
        dataloader: DataLoader for the dataset to evaluate.
        threshold: Threshold for classification. If None, the function will find the best threshold.

    Returns:
        If threshold is None, returns the best threshold. Otherwise, prints the classification report.
    """
    model.eval()  # Set the model to evaluation mode
    y_true, y_pred, scores = [], [], []  # Initialize lists to store true labels, predicted labels, and scores

    with torch.no_grad():  # Disable gradient computation for evaluation
        for image_group, labels in dataloader:
            batch_size = image_group.size(0)  # Get the batch size
            # Flatten the image group for processing: [B*4, 3, 224, 224]
            images = image_group.view(-1, 3, 224, 224).to(device)
            # Reconstruct the images using the autoencoder
            recons = model(images)

            # Compute the Mean Squared Error (MSE) for each image: [B*4]
            mse = F.mse_loss(recons, images, reduction='none')
            mse = mse.view(batch_size, 4, -1).mean(dim=2)  # Reshape and compute mean MSE for each image group: [B, 4]
            group_mse = mse.mean(dim=1)  # Compute the mean MSE for the group: [B]

            # Classification based on the threshold
            if threshold is None:
                # If no threshold is provided, store scores and true labels for threshold optimization
                scores.extend(group_mse.cpu().numpy())
                y_true.extend(labels.numpy())
            else:
                # Classify based on the threshold
                pred_labels = (group_mse > threshold).long()  # Predict labels: 1 if MSE > threshold, else 0
                y_pred.extend(pred_labels.cpu().numpy())
                y_true.extend(labels.numpy())

    if threshold is None:
        # If no threshold is provided, find the best threshold using ROC curve
        from sklearn.metrics import roc_curve
        fpr, tpr, thres = roc_curve(y_true, scores)  # Compute False Positive Rate, True Positive Rate, and thresholds
        best_idx = (tpr - fpr).argmax()  # Find the index of the best threshold (maximizing TPR - FPR)
        best_threshold = thres[best_idx]  # Get the best threshold
        print(f"[INFO] Best threshold: {best_threshold:.4f}")
        return best_threshold  # Return the best threshold
    else:
        # If a threshold is provided, print the classification report
        report = classification_report(y_true, y_pred, target_names=["normal", "spoof"])
        print(report)


In [None]:
# find best threshold
best_thresh = evaluate_autoencoder(model, val_loader, threshold=None)

# evaluate with the best threshold
evaluate_autoencoder(model, val_loader, threshold=best_thresh)


[INFO] Best threshold: 0.0024
              precision    recall  f1-score   support

      normal       0.53      0.49      0.51       201
       spoof       0.53      0.57      0.55       204

    accuracy                           0.53       405
   macro avg       0.53      0.53      0.53       405
weighted avg       0.53      0.53      0.53       405



In [41]:
# Save the model's state dictionary
torch.save(model.state_dict(), "AEReconstruction_liveness_multi.pth")

In [None]:
# model.load_state_dict(torch.load("AEReconstruction_liveness_multi.pth"))

<All keys matched successfully>

### AE+Resnet Classifier

In [42]:
train_set = MultiImageLivenessDataset("dataset/train", transform=transform)
val_set = MultiImageLivenessDataset("dataset/dev", transform=transform)
train_loader = DataLoader(train_set, batch_size=8, shuffle=True)
val_loader = DataLoader(val_set, batch_size=8, shuffle=False)

In [None]:
from torchvision.models import resnet18

class ResNetClassifier(nn.Module):
    def __init__(self):
        super(ResNetClassifier, self).__init__()
        # Load a pre-trained ResNet18 model
        base = resnet18(pretrained=True)
        # Remove the final fully connected (FC) layer to use it as a feature extractor
        base.fc = nn.Identity()
        self.backbone = base
        # Add a new classifier layer for binary classification (2 classes)
        self.classifier = nn.Linear(512, 2)

    def forward(self, x):
        # Extract features using the ResNet backbone
        feat = self.backbone(x)
        # Pass the extracted features through the classifier
        out = self.classifier(feat)
        return out


In [45]:
class AE_ResNet_Pipeline(nn.Module):
    def __init__(self, autoencoder, classifier):
        super(AE_ResNet_Pipeline, self).__init__()
        self.autoencoder = autoencoder
        self.classifier = classifier

    def forward(self, x):
        # x: [B, 4, 3, 224, 224]
        b, s, c, h, w = x.size()
        x = x.view(-1, c, h, w)  # [B*4, 3, H, W]

        # AE reconstruct
        recon = self.autoencoder(x)  # [B*4, 3, H, W]

        # Classifier
        logits = self.classifier(recon)  # [B*4, 2]

        logits = logits.view(b, s, 2).mean(dim=1)  # [B, 2], avg over 4 imgs
        return logits


In [None]:
model = AE_ResNet_Pipeline(SimpleAutoEncoder(), ResNetClassifier()).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
criterion = nn.CrossEntropyLoss()



In [47]:
for epoch in range(10):
    model.train()
    total_loss = 0
    for image_group, labels in train_loader:  # [B, 4, 3, H, W]
        image_group = image_group.to(device)
        labels = labels.to(device)

        outputs = model(image_group)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    print(f"[Epoch {epoch+1}] Loss: {total_loss / len(train_loader):.4f}")


[Epoch 1] Loss: 0.6647
[Epoch 2] Loss: 0.5729
[Epoch 3] Loss: 0.5718
[Epoch 4] Loss: 0.4925
[Epoch 5] Loss: 0.4539
[Epoch 6] Loss: 0.3784
[Epoch 7] Loss: 0.3457
[Epoch 8] Loss: 0.2916
[Epoch 9] Loss: 0.2515
[Epoch 10] Loss: 0.1931


In [48]:
from sklearn.metrics import classification_report

def evaluate_pipeline(model, dataloader):
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for image_group, labels in dataloader:
            image_group = image_group.to(device)
            outputs = model(image_group)
            preds = torch.argmax(outputs, dim=1)

            y_true.extend(labels.numpy())
            y_pred.extend(preds.cpu().numpy())

    print(classification_report(y_true, y_pred, target_names=["normal", "spoof"]))


In [49]:
evaluate_pipeline(model, val_loader)

              precision    recall  f1-score   support

      normal       0.89      0.49      0.63       201
       spoof       0.65      0.94      0.77       204

    accuracy                           0.72       405
   macro avg       0.77      0.71      0.70       405
weighted avg       0.77      0.72      0.70       405



In [50]:
# Save the model's state dictionary
torch.save(model.state_dict(), "AEClassifier_liveness_multi.pth")

In [None]:
# model.load_state_dict(torch.load("AeClassifier_liveness_multi.pth"))