#0. Creating CSV file

In [None]:
from google.colab import drive
drive.mount('/content/drive')

base_dir = "/content/drive/MyDrive/face_liveness_rose"
videos_dir = f"{base_dir}/videos_raw"


Mounted at /content/drive


In [None]:
%%writefile build_rose_youtu_csv.py
# (paste your final working script here)

#!/usr/bin/env python3
"""
Build CSV metadata for the ROSE-Youtu Face Liveness dataset.

Usage example:

    python build_rose_youtu_csv.py \
        --videos_dir "/Users/you/ROSE_Youtu/videos_raw" \
        --output_csv "/Users/you/ROSE_Youtu/rose_youtu_filelist.csv"
"""

import os
import csv
import re
import argparse

# Official training subject IDs from the protocol
TRAIN_IDS = {2, 3, 4, 5, 6, 7, 9, 10, 11, 12}


def get_attack_type(L: str) -> str:
    """Map the first token L to a human-readable attack type."""
    if L == "G":
        return "genuine"
    elif L in ("Ps", "Pq"):
        return "print"
    elif L in ("Vl", "Vm"):
        return "replay"
    elif L in ("Mc", "Mf", "Mu", "Ml"):
        return "mask"
    else:
        return "unknown"


def parse_args():
    parser = argparse.ArgumentParser(
        description="Build ROSE-Youtu CSV metadata from videos."
    )
    parser.add_argument(
        "--videos_dir",
        type=str,
        required=True,
        help="Path to the folder containing all ROSE-Youtu videos (videos_raw).",
    )
    parser.add_argument(
        "--output_csv",
        type=str,
        required=True,
        help="Path where the output CSV will be saved.",
    )
    return parser.parse_args()


def main():
    args = parse_args()
    videos_dir = args.videos_dir
    csv_path = args.output_csv

    if not os.path.isdir(videos_dir):
        raise FileNotFoundError(f"videos_dir does not exist or is not a directory: {videos_dir}")

    rows = []
    skipped = 0

    print(f"Scanning videos in: {videos_dir}")

    for root, dirs, files in os.walk(videos_dir):
        for fname in files:
            if not fname.lower().endswith(".mp4"):
                continue

            full_path = os.path.join(root, fname)
            name = os.path.splitext(fname)[0]
            parts = name.split("_")

            if len(parts) != 7:
                print(f"⚠️ Skipping unexpected name format ({len(parts)} parts): {fname}")
                skipped += 1
                continue

            L, S, D, X, E, p_token, N = parts

            # Label: 1 = Real (G), 0 = Spoof
            label = 1 if L == "G" else 0
            attack_type = get_attack_type(L)

            # Person ID:
            # handle both "p16" and "16" styles
            if p_token.startswith("p"):
                raw_pid = p_token[1:]
            else:
                raw_pid = p_token

            pid_str = re.sub(r"[^0-9]", "", raw_pid)
            if not pid_str:
                print(f"⚠️ Could not parse person id from '{p_token}' in {fname}")
                skipped += 1
                continue

            try:
                person_id = int(pid_str)
            except ValueError:
                print(f"⚠️ Invalid person id '{pid_str}' parsed from '{p_token}' in {fname}")
                skipped += 1
                continue

            # Train/test split by protocol
            split = "train" if person_id in TRAIN_IDS else "test"

            rows.append({
                "video_path": os.path.abspath(full_path),
                "filename": fname,
                "L": L,
                "S": S,
                "D": D,
                "X": X,
                "E": E,
                "person_token": p_token,
                "index_token": N,
                "person_id": person_id,
                "label": label,          # 1 = Real, 0 = Spoof
                "attack_type": attack_type,
                "split": split
            })

    # Save to CSV
    fieldnames = [
        "video_path", "filename",
        "L", "S", "D", "X", "E",
        "person_token", "index_token",
        "person_id", "label", "attack_type", "split"
    ]

    os.makedirs(os.path.dirname(csv_path) or ".", exist_ok=True)

    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            writer.writerow(r)

    print("\n✅ Done.")
    print(f"  Videos found & written: {len(rows)}")
    print(f"  Videos skipped (bad names etc.): {skipped}")
    print(f"  CSV saved to: {csv_path}")


if __name__ == "__main__":
    main()


Writing build_rose_youtu_csv.py


In [None]:
!python build_rose_youtu_csv.py \
  --videos_dir "$videos_dir" \
  --output_csv "$base_dir/metadata/rose_youtu_filelist.csv"


Scanning videos in: /content/drive/MyDrive/face_liveness_rose/videos_raw

✅ Done.
  Videos found & written: 3497
  Videos skipped (bad names etc.): 0
  CSV saved to: /content/drive/MyDrive/face_liveness_rose/metadata/rose_youtu_filelist.csv


In [None]:
import pandas as pd

csv_path = f"{base_dir}/metadata/rose_youtu_filelist.csv"
df = pd.read_csv(csv_path)

print(df.head())
print(df['label'].value_counts())
print(df['split'].value_counts())


                                          video_path                 filename  \
0  /content/drive/MyDrive/face_liveness_rose/vide...  Vm_NT_HW_g_E_22_169.mp4   
1  /content/drive/MyDrive/face_liveness_rose/vide...   G_NT_HS_wg_E_23_10.mp4   
2  /content/drive/MyDrive/face_liveness_rose/vide...    G_NT_5s_wg_E_23_5.mp4   
3  /content/drive/MyDrive/face_liveness_rose/vide...    G_NT_5s_wg_E_23_2.mp4   
4  /content/drive/MyDrive/face_liveness_rose/vide...  Vl_NT_HW_g_E_22_149.mp4   

    L   S   D   X  E  person_token  index_token  person_id  label attack_type  \
0  Vm  NT  HW   g  E            22          169         22      0      replay   
1   G  NT  HS  wg  E            23           10         23      1     genuine   
2   G  NT  5s  wg  E            23            5         23      1     genuine   
3   G  NT  5s  wg  E            23            2         23      1     genuine   
4  Vl  NT  HW   g  E            22          149         22      0      replay   

  split  
0  test  
1  tes

#1. Imports & basic setup

In [None]:
import os
import cv2
import torch
import random
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image


#2. Define transforms (for MobileNetV3)

In [None]:
# ImageNet mean/std (same you used before)
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std  = [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
])

val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
])


# 3. Create the train/test DataFrames

In [None]:
train_df = df[df["split"] == "train"].reset_index(drop=True)
test_df  = df[df["split"] == "test"].reset_index(drop=True)

len(train_df), len(test_df)


(1748, 1749)

#4. PyTorch Dataset: sample one random frame per video

In [None]:
class RoseYoutuFrameDataset(Dataset):
    def __init__(self, df, transform=None, max_attempts=5):
        """
        df: pandas DataFrame with columns ['video_path', 'label', ...]
        transform: torchvision transforms to apply to each frame
        max_attempts: how many times to try reading a valid frame before giving up
        """
        self.df = df.reset_index(drop=True)
        self.transform = transform
        self.max_attempts = max_attempts

    def __len__(self):
        # One sample = one video (we pick a random frame each time)
        return len(self.df)

    def _read_random_frame(self, video_path):
        """Open a video and return a random frame as a PIL.Image."""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")

        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if frame_count <= 0:
            cap.release()
            raise RuntimeError(f"No frames found in video: {video_path}")

        # choose random frame index
        frame_idx = random.randint(0, frame_count - 1)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        cap.release()

        if not ret or frame is None:
            raise RuntimeError(f"Failed to read frame {frame_idx} from {video_path}")

        # OpenCV gives BGR, convert to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(frame)
        return img

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        video_path = row["video_path"]
        label = int(row["label"])  # 0 = spoof, 1 = real

        # retry mechanism in case a random frame is bad
        last_exception = None
        for _ in range(self.max_attempts):
            try:
                img = self._read_random_frame(video_path)
                break
            except Exception as e:
                last_exception = e
                continue
        else:
            # if all attempts failed:
            raise last_exception if last_exception is not None else RuntimeError(
                f"Could not read any frame from {video_path}"
            )

        if self.transform is not None:
            img = self.transform(img)

        return img, label


#5. Create Dataset + DataLoader objects

In [None]:
batch_size = 32

train_dataset = RoseYoutuFrameDataset(train_df, transform=train_transform)
test_dataset  = RoseYoutuFrameDataset(test_df,  transform=val_transform)

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,      # you can try 2–4; if errors on Colab, set to 0
    pin_memory=True,
)

test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)


#6. Sanity check: iterate one batch

In [None]:
batch = next(iter(train_loader))
images, labels = batch

print("Images shape:", images.shape)   # expected: [batch_size, 3, 224, 224]
print("Labels shape:", labels.shape)   # expected: [batch_size]
print("Labels:", labels[:10])


Images shape: torch.Size([32, 3, 224, 224])
Labels shape: torch.Size([32])
Labels: tensor([0, 0, 0, 0, 0, 0, 1, 0, 0, 0])


#7. Load MobileNetV3-Large (Pretrained)

In [None]:
import torch
import torch.nn as nn
from torchvision import models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Load pretrained MobileNetV3-Large
model = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.IMAGENET1K_V1)

# Modify final classifier layer → output 1 logit (binary)
in_features = model.classifier[-1].in_features
model.classifier[-1] = nn.Linear(in_features, 1)

model = model.to(device)
print(model)


Using device: cuda
Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_large-8738ca79.pth


100%|██████████| 21.1M/21.1M [00:00<00:00, 73.6MB/s]


MobileNetV3(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1), bi

#8. Define Loss Function + Optimizer

In [None]:
criterion = nn.BCEWithLogitsLoss()

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-4,         # good starting LR
    weight_decay=1e-5
)


#9. Training Loop (Simple & Clean)

In [None]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    total_correct = 0
    total_samples = 0

    for images, labels in loader:
        images = images.to(device)
        labels = labels.float().to(device)  # BCE needs float

        optimizer.zero_grad()

        outputs = model(images).squeeze(1)   # [batch] shape
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item() * images.size(0)

        # Predictions: sigmoid → threshold 0.5
        preds = (torch.sigmoid(outputs) >= 0.5).long()
        total_correct += (preds.cpu() == labels.cpu().long()).sum().item()
        total_samples += labels.size(0)

    avg_loss = total_loss / total_samples
    accuracy = total_correct / total_samples

    return avg_loss, accuracy


#10. Validation Loop

In [None]:
def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.float().to(device)

            outputs = model(images).squeeze(1)
            loss = criterion(outputs, labels)

            total_loss += loss.item() * images.size(0)

            preds = (torch.sigmoid(outputs) >= 0.5).long()
            total_correct += (preds.cpu() == labels.cpu().long()).sum().item()
            total_samples += labels.size(0)

    avg_loss = total_loss / total_samples
    accuracy = total_correct / total_samples

    return avg_loss, accuracy


#11. Main Training Loop

In [None]:
epochs = 10   # we can increase later
train_losses = []
test_losses = []
train_accs = []
test_accs = []

for epoch in range(1, epochs + 1):
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, test_loader, criterion, device)

    train_losses.append(train_loss)
    test_losses.append(val_loss)
    train_accs.append(train_acc)
    test_accs.append(val_acc)

    print(f"Epoch {epoch}/{epochs}")
    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Val   Loss: {val_loss:.4f} | Val   Acc: {val_acc:.4f}")


Epoch 1/10
  Train Loss: 0.2343 | Train Acc: 0.9027
  Val   Loss: 0.0535 | Val   Acc: 0.9846
Epoch 2/10
  Train Loss: 0.0209 | Train Acc: 0.9943
  Val   Loss: 0.0628 | Val   Acc: 0.9737
Epoch 3/10
  Train Loss: 0.0215 | Train Acc: 0.9943
  Val   Loss: 0.0340 | Val   Acc: 0.9891
Epoch 4/10
  Train Loss: 0.0101 | Train Acc: 0.9971
  Val   Loss: 0.0588 | Val   Acc: 0.9817
Epoch 5/10
  Train Loss: 0.0040 | Train Acc: 0.9994
  Val   Loss: 0.0141 | Val   Acc: 0.9960
Epoch 6/10
  Train Loss: 0.0019 | Train Acc: 1.0000
  Val   Loss: 0.0199 | Val   Acc: 0.9937
Epoch 7/10
  Train Loss: 0.0028 | Train Acc: 0.9994
  Val   Loss: 0.0173 | Val   Acc: 0.9937
Epoch 8/10
  Train Loss: 0.0016 | Train Acc: 1.0000
  Val   Loss: 0.0264 | Val   Acc: 0.9914
Epoch 9/10
  Train Loss: 0.0009 | Train Acc: 1.0000
  Val   Loss: 0.0179 | Val   Acc: 0.9937
Epoch 10/10
  Train Loss: 0.0098 | Train Acc: 0.9954
  Val   Loss: 0.0676 | Val   Acc: 0.9806


#12. Save Your Model

In [18]:
save_path = f"{base_dir}/checkpoints/mobilenetv3_liveness.pth"
os.makedirs(f"{base_dir}/checkpoints", exist_ok=True)
torch.save(model.state_dict(), save_path)

print("Model saved to:", save_path)

Model saved to: /content/drive/MyDrive/face_liveness_rose/checkpoints/mobilenetv3_liveness.pth
