In [57]:
import os
import numpy as np
import pandas as pd
import librosa
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score, roc_curve
from tqdm import tqdm

In [39]:
# Dataset location. Set the ASVSPOOF_DATA_ROOT environment variable to run the
# notebook on another machine; otherwise the original local path is used.
DATA_ROOT = os.environ.get(
    "ASVSPOOF_DATA_ROOT", "/Users/ishaan/Documents/ASVspoof 2019 Dataset"
)
BATCH_SIZE = 64      # Samples per mini-batch
N_LFCC = 60          # Number of LFCC coefficients
MAX_SEQ_LEN = 187    # Frames kept per utterance: ~3 s at 16 kHz with hop_length=256 (48000 / 256 ~= 187)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
NUM_EPOCHS = 30      # Number of passes over the training set

In [59]:
def load_protocol(split="train"):
    """Read the protocol table for `split`, keeping only rows whose audio file exists.

    The protocol is read from ``{DATA_ROOT}/protocols/ASVspoof.LA.{split}.txt`` as a
    space-separated, header-less file with the columns ``speaker_id``,
    ``filename`` and ``label``. An ``audio_path`` column pointing at
    ``DATA_ROOT/audio/{split}/flac/{filename}.flac`` is added, the number of
    rows whose file is missing is printed, and those rows are dropped.

    Returns:
        DataFrame with columns speaker_id, filename, label, audio_path.
    """
    column_names = ["speaker_id", "filename", "label"]
    protocol_path = f"{DATA_ROOT}/protocols/ASVspoof.LA.{split}.txt"
    df = pd.read_csv(protocol_path, sep=" ", header=None, names=column_names)

    # All audio of one split lives in a single directory.
    flac_dir = os.path.join(DATA_ROOT, "audio", split, "flac")
    df["audio_path"] = df["filename"].map(
        lambda stem: os.path.join(flac_dir, f"{stem}.flac")
    )

    # Boolean mask of rows whose audio file is present on disk.
    on_disk = df["audio_path"].apply(os.path.exists)
    print(f"Missing files in {split}: {len(df.loc[~on_disk])}")

    return df.loc[on_disk].copy()

In [22]:
# Protocol tables for the three splits (each call prints its missing-file count).
train_df = load_protocol(split="train")
dev_df = load_protocol(split="dev")
eval_df = load_protocol(split="eval")

Missing files in train: 0
Missing files in dev: 0
Missing files in eval: 0


In [33]:
def extract_lfcc(audio_path, n_lfcc=60, sr=16000):
    """Load an audio file and return its cepstral features, one row per frame.

    The waveform is loaded at `sr` Hz and passed to ``librosa.feature.mfcc``
    (512-point FFT, hop of 256 samples, 64 mel bands between 0 and 8000 Hz,
    DCT type 2, liftering disabled).

    NOTE(review): the function is named LFCC, but it calls
    ``librosa.feature.mfcc`` with ``n_mels=64``, i.e. a mel filterbank.
    Confirm whether a linear filterbank was intended.

    Args:
        audio_path: path of the audio file to load.
        n_lfcc: number of cepstral coefficients to keep per frame.
        sr: sampling rate the audio is loaded at.

    Returns:
        Array of shape (time, n_lfcc).
    """
    waveform, _ = librosa.load(audio_path, sr=sr)

    cepstrum_options = dict(
        n_mfcc=n_lfcc,
        dct_type=2,
        lifter=0,
        n_fft=512,
        hop_length=256,
        n_mels=64,
        fmin=0,
        fmax=8000,
    )
    coefficients = librosa.feature.mfcc(y=waveform, sr=sr, **cepstrum_options)

    # librosa returns (n_lfcc, time); callers want time on the first axis.
    return coefficients.T

In [63]:
class ASVSpoofDataset(Dataset):
    """Dataset yielding fixed-length cepstral feature maps and binary labels.

    Each item is ``(features, label)`` where ``features`` is a float32 tensor of
    shape (1, max_seq_len, n_lfcc) and ``label`` is an int64 scalar tensor
    (0 = bonafide, 1 = spoof).

    Args:
        df: table with at least an ``audio_path`` and a ``label`` column.
        max_seq_len: number of frames every item is padded/truncated to.
            Defaults to the notebook-level ``MAX_SEQ_LEN``.
        n_lfcc: number of cepstral coefficients requested from
            ``extract_lfcc``. Defaults to the notebook-level ``N_LFCC`` so the
            config constant is actually honoured.

    Raises:
        KeyError: from ``__getitem__`` when a row's label is neither
            "bonafide" nor "spoof".
    """

    def __init__(self, df, max_seq_len=None, n_lfcc=None):
        self.df = df
        self.label_map = {"bonafide": 0, "spoof": 1}
        # Defaults are resolved here rather than in the signature so the class
        # can be defined before the config cell has run.
        self.max_seq_len = MAX_SEQ_LEN if max_seq_len is None else max_seq_len
        self.n_lfcc = N_LFCC if n_lfcc is None else n_lfcc

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Positional lookup: the frame's index may have gaps after filtering.
        row = self.df.iloc[idx]

        # Load and process LFCCs -> (time, n_lfcc)
        lfccs = extract_lfcc(row["audio_path"], n_lfcc=self.n_lfcc)

        # Pad with zeros at the end, or truncate, to a fixed number of frames.
        n_frames = lfccs.shape[0]
        if n_frames < self.max_seq_len:
            pad = ((0, self.max_seq_len - n_frames), (0, 0))
            lfccs = np.pad(lfccs, pad, mode="constant")
        else:
            lfccs = lfccs[: self.max_seq_len]

        # Add channel dimension (for Conv2d) -> (1, max_seq_len, n_lfcc)
        features = torch.tensor(lfccs, dtype=torch.float32).unsqueeze(0)
        # CrossEntropyLoss expects integer (long) class indices.
        label = torch.tensor(self.label_map[row["label"]], dtype=torch.long)

        return features, label

In [65]:
class LCNN(nn.Module):
    """Two conv/pool stages followed by a GRU over time and a linear classifier.

    Input:  (batch, 1, time, n_features)
    Output: (batch, num_classes) unnormalised logits.

    Args:
        num_classes: number of output classes.
        n_features: size of the input feature axis (coefficients per frame).
            The GRU input width is derived from it, so the model is no longer
            tied to exactly 60 coefficients. Must be at least 4 because the
            feature axis is halved twice by max-pooling.

    Raises:
        ValueError: if ``n_features`` is smaller than 4.
    """

    def __init__(self, num_classes=2, n_features=60):
        super().__init__()

        if n_features < 4:
            raise ValueError(
                f"n_features must be >= 4 to survive two 2x2 poolings, got {n_features}"
            )

        # Feature extraction
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(5, 5), padding=(2, 2))
        self.bn1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d((2, 2))

        # Deep feature learning
        self.conv2 = nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1))
        self.bn2 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d((2, 2))

        # Temporal modeling. Both convolutions preserve the spatial size and each
        # 2x2 max-pool floors the feature axis to half, hence two floor-divisions.
        pooled_features = (n_features // 2) // 2
        self.gru = nn.GRU(128 * pooled_features, 64, batch_first=True)  # Input: (batch, seq, features)

        # Classification
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        # Shapes below are for the default input (batch, 1, 187, 60).
        x = self.pool1(torch.relu(self.bn1(self.conv1(x))))  # (b, 64, 93, 30)
        x = self.pool2(torch.relu(self.bn2(self.conv2(x))))  # (b, 128, 46, 15)

        # Prepare for GRU: move time in front of channels, then merge
        # channels and features into one vector per timestep.
        x = x.permute(0, 2, 1, 3).flatten(2)  # (batch, 46, 128*15=1920)
        x, _ = self.gru(x)                     # (batch, 46, 64)
        x = x[:, -1, :]                        # Last timestep

        return self.fc(x)

In [69]:
def train_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over `loader` and return the mean batch loss.

    Args:
        model: network to train; switched to training mode.
        loader: iterable of (inputs, labels) batches.
        optimizer: optimiser stepping the model's parameters.
        criterion: loss called as ``criterion(outputs, labels)``.

    Returns:
        Sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []
    for inputs, labels in tqdm(loader, desc="Training"):
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
    return sum(batch_losses) / len(loader)

def validate(model, loader, criterion):
    """Evaluate `model` on `loader` without gradient tracking.

    Args:
        model: network to evaluate; switched to eval mode.
        loader: iterable of (inputs, labels) batches.
        criterion: loss called as ``criterion(outputs, labels)``.

    Returns:
        Tuple ``(mean_batch_loss, eer)`` where ``eer`` is computed by
        ``compute_eer`` from the softmax probability of class 1.
    """
    model.eval()
    total_loss = 0
    all_labels, all_scores = [], []

    with torch.no_grad():
        for inputs, labels in tqdm(loader, desc="Validation"):
            inputs = inputs.to(DEVICE)
            outputs = model(inputs)
            # Logits live on DEVICE, so the targets must be moved there too;
            # mixing CUDA logits with CPU labels raises a device-mismatch error.
            loss = criterion(outputs, labels.to(DEVICE))
            total_loss += loss.item()

            # Scores and labels are collected on the CPU for the ROC computation.
            scores = torch.softmax(outputs.cpu(), dim=1)[:, 1]
            all_labels.extend(labels.cpu().numpy())
            all_scores.extend(scores.numpy())

    return total_loss / len(loader), compute_eer(all_labels, all_scores)

def compute_eer(labels, scores):
    """Approximate the equal error rate from the ROC curve.

    Finds the ROC operating point where the false-positive rate and the
    false-negative rate (1 - TPR) are closest, and returns the false-positive
    rate at that point.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(labels, scores)
    false_neg_rate = 1 - true_pos_rate
    # nanargmin skips NaN entries when locating the closest crossing.
    crossover = np.nanargmin(np.abs(false_pos_rate - false_neg_rate))
    return false_pos_rate[crossover]

In [None]:
def main():
    """Train an LCNN on the train split and report dev-set metrics every epoch.

    Builds datasets and loaders for the "train" and "dev" protocol splits,
    trains with Adam and cross-entropy for NUM_EPOCHS epochs, and prints the
    training loss, validation loss and validation EER after each epoch.

    NOTE(review): `best_eer` is initialised but neither read nor updated in the
    visible part of this function. Confirm whether best-model tracking is
    missing or the loop body continues beyond this excerpt.
    """
    # Load data
    train_df = load_protocol("train")
    dev_df = load_protocol("dev")
    train_dataset = ASVSpoofDataset(train_df)
    dev_dataset = ASVSpoofDataset(dev_df)
    
    # Only the training loader shuffles; the dev loader keeps protocol order.
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
    dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, num_workers=4)
    
    # Initialize model
    model = LCNN().to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    # Training loop
    best_eer = 1.0
    for epoch in range(NUM_EPOCHS):
        train_loss = train_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_eer = validate(model, dev_loader, criterion)
        
        # Epochs are reported 1-based.
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
        print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val EER: {val_eer:.4f}")