In [27]:
pip install wandb

Note: you may need to restart the kernel to use updated packages.


In [28]:
pip install nbformat

Note: you may need to restart the kernel to use updated packages.


In [29]:
import wandb

In [30]:
# wandb.login()

In [64]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pickle
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from eer import compute_eer
from sklearn.metrics import classification_report, confusion_matrix

In [32]:
from datetime import datetime

In [33]:
import os

In [34]:
def get_data(X_path, y_path):
    with open(X_path, 'rb') as f:
        X_train = pickle.load(f)
    with open(y_path, 'rb') as f:
        y_train = pickle.load(f)   
    return X_train, y_train

In [35]:
X_train, y_train = get_data(X_path ='../data/dataset/mfcc_train_60_cnn/X_train_60_cnn.pkl', y_path = '../data/dataset/mfcc_train_60_cnn/y_train_60_cnn.pkl')
X_dev, y_dev = get_data(X_path ='../data/dataset/mfcc_dev_60_cnn/X_dev_60_cnn.pkl', y_path = '../data/dataset/mfcc_dev_60_cnn/y_dev_60_cnn.pkl')

In [36]:
print(X_train[0].shape)
print(X_train[1].shape)

(60, 75)
(60, 88)


In [37]:
def pad_and_tensorize(X, y, max_len):
    # (n_mfcc, T)
    max_len = max([x.shape[1] for x in X])  
    X_padded = [ np.pad(x, ((0, 0), (0, max_len - x.shape[1])), mode='constant') for x in X]
    X_tensor = torch.tensor(np.stack(X_padded), dtype=torch.float32).unsqueeze(1)  # (N, 1, n_mfcc, T)
    y_tensor = torch.tensor(y, dtype=torch.long)
    return X_tensor, y_tensor

In [38]:
max_len = max(max([x.shape[1] for x in X_train]), max([x.shape[1] for x in X_dev]))

In [39]:
print(max_len)

413


In [40]:
X_train_final, y_train_final = pad_and_tensorize(X_train, y_train, max_len)
X_dev_final, y_dev_final = pad_and_tensorize(X_dev, y_dev, max_len)

In [41]:
class CNNSpoofDetector(nn.Module):
    def __init__(self):
        super(CNNSpoofDetector, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(3,3), padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(kernel_size=(2,2))

        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3,3), padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d(kernel_size=(2,2))

        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3,3), padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool3 = nn.AdaptiveMaxPool2d((1, 1))  # Output size: [batch, 64, 1, 1]
        
        self.fc = nn.Linear(64, 1)  
        
    def forward(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = x.view(x.size(0), -1)  # flatten
        x = self.fc(x)
        return x

In [None]:
def train_model(X_train, y_train, X_dev, y_dev, run_name, config):
    wandb.finish()
    os.environ["WANDB_START_METHOD"] = "thread"
    # Initialize wandb experiment and log hyperparameters
    wandb.init(project="asvspoof-baseline", name=run_name, config=config, reinit=True)
    config = wandb.config  # Use wandb.config for convenient access
    
    # ===== Set device and initialize model =====
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CNNSpoofDetector().to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)

    # ===== Prepare DataLoaders =====
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=config.batch_size, shuffle=True)
    dev_loader = DataLoader(TensorDataset(X_dev, y_dev), batch_size=config.batch_size, shuffle=False)

    # ===== Training loop =====
    best_eer = float("inf")  # Track best EER score
    best_model_path = 'best_model.pth'
    num_epochs = config.epochs
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for X_batch, y_batch in train_loader:
        # Move data to device
            X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()

            optimizer.zero_grad()           # Clear gradients
            outputs = model(X_batch).squeeze()  # Forward pass and remove singleton dimension
            loss = criterion(outputs, y_batch)  # Compute binary cross-entropy loss
            loss.backward()                # Backpropagate
            optimizer.step()               # Update model weights

            running_loss += loss.item()    # Accumulate batch loss

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss:.4f}")

        # ------------------ Evaluate EER on development set ------------------
        model.eval()  # Set model to evaluation mode
        all_outputs = []
        all_labels = []

        with torch.no_grad():  # No gradient calculation needed during evaluation
            for X_dev, y_dev in dev_loader:
                X_dev, y_dev = X_dev.to(device), y_dev.to(device).float()
                outputs = model(X_dev).squeeze()  # Forward pass
                all_outputs.extend(outputs.cpu().numpy())  # Save outputs
                all_labels.extend(y_dev.cpu().numpy())     # Save true labels

        all_outputs = np.array(all_outputs)
        all_labels = np.array(all_labels)
        all_probs = 1 / (1 + np.exp(-all_outputs))  # Sigmoid
        bonafide_probs = all_probs[all_labels == 1]
        spoof_probs    = all_probs[all_labels == 0]
    

        # Compute Equal Error Rate and the threshold that achieves it
        eer, threshold = compute_eer(np.array(bonafide_probs), np.array(spoof_probs))
        print(f"Epoch {epoch+1} -- Dev EER: {eer:.4f}, Threshold: {threshold:.4f}")

        # Log metrics to wandb
        wandb.log({
            "epoch": epoch + 1,
            "loss": running_loss,
            "dev_eer": float(eer),
            "threshold": float(threshold)
        })

        if eer < best_eer:
            best_eer = eer
            torch.save(model.state_dict(), best_model_path)
            print(f"✅ Saved new best model with EER: {eer:.4f}")
        print(f"✅ Training complete. Best EER: {best_eer:.4f}")
    wandb.finish()  # Finalize wandb run
    return best_model_path

In [43]:
def generate_run_name(config, model_name="cnn"):
    """
    Generate a descriptive wandb run name based on model name, key hyperparameters, and timestamp.
    
    Example output:
        cnn-lr0.001-bs32-ep10-20250622-1730
    """
    timestamp = datetime.now().strftime("%Y%m%d-%H%M")
    
    # Optional: format lr nicely
    lr_str = f"{config['lr']:.0e}" if config['lr'] < 1e-2 else str(config['lr'])

    run_name = f"{model_name}-lr{lr_str}-bs{config['batch_size']}-ep{config['epochs']}-{timestamp}"
    return run_name

In [44]:
config = {
    "lr": 1e-3,
    "batch_size": 32,
    "epochs": 200,
    "best_model_path": "best_model.pth"
}

In [45]:
run_name = generate_run_name(config, model_name="cnn")

In [46]:
print(run_name)

cnn-lr1e-03-bs32-ep200-20250623-0613


In [None]:
best_model_path = train_model(X_train_final, y_train_final, X_dev_final, y_dev_final, run_name, config)

Epoch 1/200, Loss: 107.0072
Epoch 1 -- Dev EER: 0.0667, Threshold: 0.1032
✅ Saved new best model with EER: 0.0667
✅ Training complete. Best EER: 0.0667
Epoch 2/200, Loss: 41.9766
Epoch 2 -- Dev EER: 0.0503, Threshold: 0.3996
✅ Saved new best model with EER: 0.0503
✅ Training complete. Best EER: 0.0503
Epoch 3/200, Loss: 24.8991
Epoch 3 -- Dev EER: 0.0483, Threshold: 0.0349
✅ Saved new best model with EER: 0.0483
✅ Training complete. Best EER: 0.0483
Epoch 4/200, Loss: 18.3727
Epoch 4 -- Dev EER: 0.0360, Threshold: 0.3394
✅ Saved new best model with EER: 0.0360
✅ Training complete. Best EER: 0.0360
Epoch 5/200, Loss: 13.5709
Epoch 5 -- Dev EER: 0.0361, Threshold: 0.0071
✅ Training complete. Best EER: 0.0360
Epoch 6/200, Loss: 10.3497
Epoch 6 -- Dev EER: 0.0271, Threshold: 0.1600
✅ Saved new best model with EER: 0.0271
✅ Training complete. Best EER: 0.0271
Epoch 7/200, Loss: 9.1370
Epoch 7 -- Dev EER: 0.0349, Threshold: 0.0029
✅ Training complete. Best EER: 0.0271
Epoch 8/200, Loss: 8.82

0,1
dev_eer,▄▃▃▂▄▂▂▂▂▂▂▄▂▂▃▂▁█▃▂▂▂▁▁▂▁▁▂▁▁▁▁▂▂█▂▂▂▂▂
epoch,▁▁▁▂▂▂▂▂▂▃▃▃▃▃▄▄▄▄▄▄▅▅▅▆▆▆▆▆▆▆▆▆▆▆▇▇▇▇██
loss,█▄▄▃▃▁▁▁▁▁▁▁▁▁▂▁▁▁▁▁▁▂▂▁▁▃▁▁▂▂▂▁▁▁▁▁▂▂▁▁
threshold,█▂▅▄▆▅▄▃▆▅▅▁▅▅▃▂▂▁▂▂▁▂▄▂▂▂▂▁▁▁▂▁▁▂▁▁▁▁▁▁

0,1
dev_eer,0.02081
epoch,200.0
loss,0.00078
threshold,1e-05


In [65]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNSpoofDetector().to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# DataLoader
train_loader = DataLoader(TensorDataset(X_train_final, y_train_final), batch_size=32, shuffle=True)
dev_loader = DataLoader(TensorDataset(X_dev_final, y_dev_final), batch_size=64, shuffle=False)

In [49]:
# # Initialize the CNN model and move it to the appropriate device (CPU or GPU)
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Use GPU if available, otherwise fallback to CPU
# model = CNNSpoofDetector().to(device)

# # Define the loss function: binary cross-entropy with logits (for 0 vs. 1 classification)
# criterion = nn.BCEWithLogitsLoss()

# # Use Adam optimizer for model training
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# best_eer = float('inf')
# best_model_path = 'best_model.pth'

# # Loop over the number of training epochs
# num_epochs = 5
# for epoch in range(num_epochs):
#     model.train()  # Set the model to training mode
#     running_loss = 0.0  # Accumulate loss over batches

#     for X_batch, y_batch in train_loader:
#         # Move data to device
#         X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()

#         optimizer.zero_grad()           # Clear gradients
#         outputs = model(X_batch).squeeze()  # Forward pass and remove singleton dimension
#         loss = criterion(outputs, y_batch)  # Compute binary cross-entropy loss
#         loss.backward()                # Backpropagate
#         optimizer.step()               # Update model weights

#         running_loss += loss.item()    # Accumulate batch loss

#     print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss:.4f}")

#     # ------------------ Evaluate EER on development set ------------------
#     model.eval()  # Set model to evaluation mode
#     all_outputs = []
#     all_labels = []

#     with torch.no_grad():  # No gradient calculation needed during evaluation
#         for X_dev, y_dev in dev_loader:
#             X_dev, y_dev = X_dev.to(device), y_dev.to(device).float()
#             outputs = model(X_dev).squeeze()  # Forward pass
#             all_outputs.extend(outputs.cpu().numpy())  # Save outputs
#             all_labels.extend(y_dev.cpu().numpy())     # Save true labels

#     all_outputs = np.array(all_outputs)
#     all_labels = np.array(all_labels)
#     all_probs = 1 / (1 + np.exp(-all_outputs))  # Sigmoid
#     bonafide_probs = all_probs[all_labels == 1]
#     spoof_probs    = all_probs[all_labels == 0]
    

#     # Compute Equal Error Rate and the threshold that achieves it
#     eer, threshold = compute_eer(np.array(bonafide_probs), np.array(spoof_probs))
#     print(f"Epoch {epoch+1} -- Dev EER: {eer:.4f}, Threshold: {threshold:.4f}")

#     if eer < best_eer:
#         best_eer = eer
#         torch.save(model.state_dict(), best_model_path)
#         print(f"✅ Saved new best model with EER: {eer:.4f}") 

In [None]:
model = CNNSpoofDetector().to(device)
model.load_state_dict(torch.load(best_model_path))
model.eval()

all_outputs, all_labels = [], []
with torch.no_grad():
    for X_dev, y_dev in dev_loader:
        X_dev, y_dev = X_dev.to(device), y_dev.to(device).float()
        outputs = model(X_dev).squeeze()
        all_outputs.extend(outputs.cpu().numpy())
        all_labels.extend(y_dev.cpu().numpy())

all_outputs = np.array(all_outputs)
all_labels = np.array(all_labels)
all_probs = 1 / (1 + np.exp(-all_outputs))  # Sigmoid
bonafide_probs = all_probs[all_labels == 1]
spoof_probs = all_probs[all_labels == 0]

eer, threshold = compute_eer(np.array(bonafide_probs), np.array(spoof_probs))
print(f"Final evaluation -- EER: {eer:.4f}, Threshold: {threshold:.4f}")

  model.load_state_dict(torch.load(best_model_path))


Final evaluation -- EER: 0.0176, Threshold: 0.0002


In [51]:
# model_eer_0341_path = 'cnn_mfcc_eer_0.0341_0617.pth'
# torch.save(model.state_dict(), model_eer_0341_path)
# print(f"✅ model has saved as {model_eer_0341_path}")

In [67]:
# Turn logits into binary predictions
y_pred = (np.array(all_outputs) > threshold).astype(int)

# Print classification report
print("Classification Report:")
print(classification_report(all_labels, y_pred))

# Print confusion matrix
print("Confusion Matrix:")
print(confusion_matrix(all_labels, y_pred))

Classification Report:
              precision    recall  f1-score   support

         0.0       0.99      1.00      0.99     22296
         1.0       0.99      0.87      0.93      2548

    accuracy                           0.99     24844
   macro avg       0.99      0.94      0.96     24844
weighted avg       0.99      0.99      0.99     24844

Confusion Matrix:
[[22279    17]
 [  326  2222]]
