In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# =========================
# -------- RawTFNet --------
# =========================

class TFAttention(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.fc1 = nn.Linear(channels, channels // 4)
        self.fc2 = nn.Linear(channels // 4, channels)

    def forward(self, x):
        w = x.mean(dim=2)
        w = F.relu(self.fc1(w))
        w = torch.sigmoid(self.fc2(w)).unsqueeze(-1)
        return x * w


class TFResidualBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()

        self.conv1 = nn.Conv1d(in_ch, out_ch, 3, padding=1)
        self.bn1 = nn.BatchNorm1d(out_ch)

        self.conv2 = nn.Conv1d(out_ch, out_ch, 3, padding=1)
        self.bn2 = nn.BatchNorm1d(out_ch)

        self.att = TFAttention(out_ch)
        self.act = nn.LeakyReLU(0.3)

        self.skip = nn.Conv1d(in_ch, out_ch, 1) if in_ch != out_ch else nn.Identity()
        self.pool = nn.MaxPool1d(3)

    def forward(self, x):
        identity = self.skip(x)
        x = self.act(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        x = self.att(x)
        x = self.act(x + identity)
        return self.pool(x)


class RawTFNet(nn.Module):
    def __init__(self, base_channels=32):
        super().__init__()

        self.frontend = nn.Sequential(
            nn.Conv1d(1, base_channels, 251, padding=125),
            nn.BatchNorm1d(base_channels),
            nn.LeakyReLU(0.3),
            nn.MaxPool1d(3)
        )

        self.block1 = TFResidualBlock(base_channels, base_channels * 2)
        self.block2 = TFResidualBlock(base_channels * 2, base_channels * 4)
        self.block3 = TFResidualBlock(base_channels * 4, base_channels * 8)

        self.gru = nn.GRU(base_channels * 8, 512, batch_first=True)
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.frontend(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)

        x = x.permute(0, 2, 1)
        x, _ = self.gru(x)
        x = F.normalize(self.fc1(x[:, -1]), dim=1)
        return self.fc2(x)


# =========================
# -------- RawNet2 --------
# =========================
class FixedSincConv(nn.Module):
    def __init__(self, out_channels=128, kernel_size=129, sample_rate=16000):
        super().__init__()

        if kernel_size % 2 == 0:
            kernel_size += 1

        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.sample_rate = sample_rate

        # Linear-scale fixed filterbank (S3 setup in paper)
        low_hz = 0
        high_hz = sample_rate // 2
        hz = torch.linspace(low_hz, high_hz, out_channels + 1)

        self.low = hz[:-1]
        self.high = hz[1:]

        n = torch.arange(-(kernel_size // 2), kernel_size // 2 + 1)
        self.register_buffer("n", n)
        self.register_buffer("window", torch.hamming_window(kernel_size))

    def forward(self, x):
        device = x.device
        filters = []

        for low, high in zip(self.low, self.high):
            f1 = low / self.sample_rate
            f2 = high / self.sample_rate
            n = self.n.to(device)

            sinc1 = torch.sin(2 * math.pi * f1 * n) / (n + 1e-9)
            sinc2 = torch.sin(2 * math.pi * f2 * n) / (n + 1e-9)
            band = (sinc2 - sinc1) * self.window.to(device)
            filters.append(band)

        filters = torch.stack(filters).view(self.out_channels, 1, self.kernel_size)
        return F.conv1d(x, filters, stride=1, padding=self.kernel_size // 2)

class FMS(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.fc = nn.Linear(channels, channels)

    def forward(self, x):
        s = x.mean(dim=2)
        s = torch.sigmoid(self.fc(s))
        s = s.unsqueeze(2)
        return x * s + s


class RawNetResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()

        self.conv1 = nn.Conv1d(channels, channels, 3, padding=1)
        self.bn1 = nn.BatchNorm1d(channels)
        self.conv2 = nn.Conv1d(channels, channels, 3, padding=1)
        self.bn2 = nn.BatchNorm1d(channels)

        self.fms = FMS(channels)
        self.pool = nn.MaxPool1d(3)

    def forward(self, x):
        residual = x
        x = F.leaky_relu(self.bn1(self.conv1(x)), 0.3)
        x = F.leaky_relu(self.bn2(self.conv2(x)), 0.3)
        x = x + residual
        x = self.pool(x)
        x = self.fms(x)
        return x


class RawNet2_AntiSpoof(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        self.sinc = FixedSincConv(out_channels=128, kernel_size=129)
        self.bn0 = nn.BatchNorm1d(128)

        self.block1 = nn.Sequential(
            RawNetResidualBlock(128),
            RawNetResidualBlock(128)
        )

        self.conv_expand = nn.Conv1d(128, 512, 1)

        self.block2 = nn.Sequential(
            RawNetResidualBlock(512),
            RawNetResidualBlock(512),
            RawNetResidualBlock(512),
            RawNetResidualBlock(512)
        )

        self.gru = nn.GRU(512, 1024, batch_first=True)
        self.fc1 = nn.Linear(1024, 1024)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        x = self.sinc(x)
        x = F.leaky_relu(self.bn0(x), 0.3)

        x = self.block1(x)
        x = self.conv_expand(x)
        x = self.block2(x)

        x = x.permute(0, 2, 1)

        self.gru.flatten_parameters()
        x, _ = self.gru(x)

        x = x[:, -1, :]
        x = F.leaky_relu(self.fc1(x), 0.3)
        return self.fc2(x)
    
    
# =========================
# -------- AASIST --------
# =========================
class AASIST(nn.Module):
    def __init__(self):
        super().__init__()

        self.frontend = nn.Sequential(
            nn.Conv1d(1, 64, 251, padding=125),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(0.3),
            nn.MaxPool1d(3)
        )

        self.block1 = STBlock(64, 128)
        self.block2 = STBlock(128, 256)
        self.block3 = STBlock(256, 512)

        self.gru = nn.GRU(512, 512, batch_first=True)
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.frontend(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)

        x = x.permute(0, 2, 1)
        x, _ = self.gru(x)

        x = F.normalize(self.fc1(x[:, -1]), dim=1)
        return self.fc2(x)

class STBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()

        self.conv = nn.Sequential(
            nn.Conv1d(in_ch, out_ch, 3, padding=1),
            nn.BatchNorm1d(out_ch),
            nn.LeakyReLU(0.3),

            # ðŸ”¥ extra downsampling BEFORE attention
            nn.MaxPool1d(4)
        )

        self.attn = TemporalSelfAttention(out_ch, heads=2)

    def forward(self, x):
        x = self.conv(x)          # (B, C, Tâ†“)
        x = x.permute(0, 2, 1)    # (B, Tâ†“, C)
        x = self.attn(x)
        return x.permute(0, 2, 1)


class TemporalSelfAttention(nn.Module):
    def __init__(self, dim, heads=4):
        super().__init__()
        self.attn = nn.MultiheadAttention(
            embed_dim=dim,
            num_heads=heads,
            batch_first=True
        )

    def forward(self, x):
        # x: (B, T, C)
        out, _ = self.attn(x, x, x)
        return out


# =========================
# -------- RawformerL --------
# =========================
class RawFormerL(nn.Module):
    def __init__(self):
        super().__init__()

        self.frontend = nn.Sequential(
            nn.Conv1d(1, 128, kernel_size=251, stride=5, padding=125),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.3),
            nn.MaxPool1d(3),

            nn.Conv1d(128, 256, kernel_size=5, stride=2, padding=2),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.3),
            nn.MaxPool1d(3)
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=256,
            nhead=8,
            dim_feedforward=1024,
            dropout=0.1,
            batch_first=True
        )

        self.transformer = nn.TransformerEncoder(
            encoder_layer,
            num_layers=4
        )

        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 2)
        )

    def forward(self, x):
        x = x.unsqueeze(1)          # (B,1,T)
        x = self.frontend(x)        # (B,256,T')
        x = x.permute(0, 2, 1)      # (B,T',256)

        x = self.transformer(x)

        mean = x.mean(dim=1)
        std = x.std(dim=1)
        stats = torch.cat([mean, std], dim=1)  # (B,512)

        return self.fc(stats)



A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.6 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "c:\Users\bhatt\miniconda3\envs\torch118\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "c:\Users\bhatt\miniconda3\envs\torch118\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "c:\Users\bhatt\miniconda3\envs\torch118\lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "c:\Users\bhatt\miniconda3\envs\torch118\lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance
 

In [2]:
def compute_eer(scores, labels):
    fpr, tpr, _ = roc_curve(labels, scores, pos_label=1)
    fnr = 1 - tpr
    return 100 * fpr[np.nanargmin(np.abs(fnr - fpr))]
def compute_min_tDCF(scores, labels):
    Ptar, Cmiss, Cfa = 0.01, 1, 1
    fpr, tpr, _ = roc_curve(labels, scores, pos_label=1)
    fnr = 1 - tpr
    return np.min(Cmiss * Ptar * fnr + Cfa * (1 - Ptar) * fpr)


In [6]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset

class ProtocolNPYTestDataset(Dataset):
    def __init__(self, root_dir, protocol_path):
        self.samples = []

        # -------------------------
        # Read protocol file
        # -------------------------
        label_dict = {}

        with open(protocol_path, "r") as f:
            for line in f:
                parts = line.strip().split()
                utt_id = parts[1]  # ASVspoof ID
                label = 1 if parts[-1] == "spoof" else 0
                label_dict[utt_id] = label

        # -------------------------
        # Match .npy files to protocol
        # -------------------------
        for f in os.listdir(root_dir):
            if f.endswith(".npy"):

                utt_id = os.path.splitext(f)[0]

                if utt_id in label_dict:
                    path = os.path.join(root_dir, f)
                    self.samples.append((path, label_dict[utt_id]))

        print(f"[INFO] Loaded {len(self.samples)} files from {root_dir}")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, label = self.samples[idx]

        audio = np.load(path)

        return (
            torch.tensor(audio, dtype=torch.float32),
            torch.tensor(label, dtype=torch.long)
        )
from torch.utils.data import DataLoader

def get_test_loader(test_path):

    protocol_path = "LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.eval.trl.txt"

    dataset = ProtocolNPYTestDataset(test_path, protocol_path)

    loader = DataLoader(
        dataset,
        batch_size=16,
        shuffle=False,
        num_workers=2
    )

    return loader


In [None]:
import os
import torch
import pandas as pd

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

TRAIN_RATIOS = ["25", "50", "75"]
TEST_RATIOS = ["25", "50", "75"]

MODEL_ROOT = "ModelsSaved"
TEST_ROOT = "TestData"

# -----------------------
# Model factory
# -----------------------

def get_model(model_name):
    if model_name == "aasist":
        return AASIST()
    elif model_name == "rawnet2":
        return RawNet2_AntiSpoof()
    elif model_name == "rawformerL":
        return RawFormerL()
    elif model_name == "rawtfnet_16":
        return RawTFNet(base_channels=16)
    elif model_name == "rawtfnet_32":
        return RawTFNet(base_channels=32)
    else:
        raise ValueError("Unknown model")

# -----------------------
# Evaluation loop
# -----------------------

results = []
@torch.no_grad()
def evaluate(model, loader, model_name):
    model.eval()

    scores = []
    labels = []

    for x, y in loader:

        x = x.to(DEVICE)
        y = y.to(DEVICE)

        # RawNet2 expects (B,1,T)
        if model_name == "rawnet2":
            x = x.unsqueeze(1)

        logits = model(x)

        probs = torch.softmax(logits, dim=1)
        spoof_scores = probs[:, 1]  # probability of spoof

        scores.extend(spoof_scores.cpu().numpy())
        labels.extend(y.cpu().numpy())

    return np.array(scores), np.array(labels)

for train_ratio in TRAIN_RATIOS:

    train_folder = os.path.join(MODEL_ROOT, f"{train_ratio}% CS")

    for model_file in os.listdir(train_folder):

        if not model_file.endswith(".pth"):
            continue

        model_name = model_file.replace(f"_cs_{train_ratio}.pth", "")
        model_path = os.path.join(train_folder, model_file)

        print(f"\nLoading model: {model_file}")

        model = get_model(model_name).to(DEVICE)
        checkpoint = torch.load(model_path, map_location=DEVICE)
        model.load_state_dict(checkpoint["model"])        
        model.eval()

        for test_ratio in TEST_RATIOS:

            print(f"Evaluating Train {train_ratio} â†’ Test {test_ratio}")

            test_loader = get_test_loader(
                os.path.join(TEST_ROOT, f"cs_{test_ratio}")
            )

            scores, labels = evaluate(model, test_loader, model_name)

            eer = compute_eer(scores, labels)
            tdcf = compute_min_tDCF(scores, labels)

            results.append({
                "Model": model_name,
                "Train_CS": train_ratio,
                "Test_CS": test_ratio,
                "EER": eer,
                "min_tDCF": tdcf
            })

# -----------------------
# Save results
# -----------------------

df = pd.DataFrame(results)
df.to_csv("CS_Robustness_Results.csv", index=False)

print("\nEvaluation complete. Results saved to CS_Robustness_Results.csv")



Loading model: aasist_cs_25.pth
Evaluating Train 25 â†’ Test 25
[INFO] Loaded 71237 files from TestData\cs_25
