In [1]:
# Mount Google Drive at /content/drive; later cells read audio from and write
# the model checkpoint to paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
!pip install librosa soundfile



In [3]:
import os
import numpy as np
import librosa
import torch
from torch.utils.data import Dataset, DataLoader
from scipy.fftpack import dct

In [4]:
def get_label(filename):
    """Derive the class label from a file name.

    Returns 1 (spoofed) when the name contains the marker "_it",
    otherwise 0 (original recording).
    """
    return int("_it" in filename)

In [5]:
# Audio framing configuration used when slicing recordings into chunks.
SAMPLE_RATE = 16000              # samples per second passed to the loader
WINDOW_SEC, HOP_SEC = 3.0, 1.0   # chunk length and stride between chunk starts, in seconds

# The same quantities expressed in samples.
WINDOW_SAMPLES = int(SAMPLE_RATE * WINDOW_SEC)
HOP_SAMPLES = int(SAMPLE_RATE * HOP_SEC)

In [6]:
def extract_logmel(y, sr):
    """Compute a log-scaled mel spectrogram of a waveform.

    Args:
        y: audio samples.
        sr: sampling rate of ``y``.

    Returns:
        The 80-band mel power spectrogram (n_fft=1024, hop_length=256)
        converted to dB relative to its own maximum value.
    """
    mel_settings = dict(n_fft=1024, hop_length=256, n_mels=80, power=2.0)
    mel_power = librosa.feature.melspectrogram(y=y, sr=sr, **mel_settings)
    log_mel = librosa.power_to_db(mel_power, ref=np.max)
    return log_mel

In [7]:
def extract_lfcc(y, sr, n_lfcc=40, n_fft=1024, hop_length=256, n_filters=128):
    """Compute linear-frequency cepstral coefficients (LFCC) for a waveform.

    Pipeline: STFT power spectrum -> linearly spaced triangular filterbank ->
    natural log -> orthonormal DCT-II along the filter axis -> keep the first
    ``n_lfcc`` coefficients -> subtract the per-coefficient mean over time.

    Args:
        y: audio samples.
        sr: sampling rate of ``y``.
        n_lfcc: number of cepstral coefficients to keep.
        n_fft: FFT size for the STFT and the filterbank.
        hop_length: STFT hop in samples.
        n_filters: number of triangular filters.

    Returns:
        Array whose first axis holds the ``n_lfcc`` coefficients and whose
        second axis is the STFT frame index.
    """
    spectrum = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    power = np.abs(spectrum) ** 2

    bank = create_linear_filterbank(n_filters=n_filters, n_fft=n_fft, sr=sr)

    # The small offset keeps the log finite for empty bands.
    log_energy = np.log(np.dot(bank, power) + 1e-10)
    cepstra = dct(log_energy, type=2, axis=0, norm='ortho')

    return apply_cms(cepstra[:n_lfcc])


def create_linear_filterbank(n_filters, n_fft, sr):
    """Build a bank of triangular filters spaced linearly from 0 Hz to sr/2.

    Args:
        n_filters: number of triangular filters (rows of the result).
        n_fft: FFT size; the bank has ``n_fft // 2 + 1`` frequency columns.
        sr: sampling rate; the filters span 0 .. ``sr / 2``.

    Returns:
        Array of shape ``(n_filters, n_fft // 2 + 1)``. Each row rises
        linearly from 0 at its left edge to 1 at its centre and falls back
        to 0 at its right edge; adjacent filters share edges/centres.
    """
    nyquist = sr / 2
    n_freq_bins = n_fft // 2 + 1
    bin_freqs = np.linspace(0, nyquist, n_freq_bins)

    # n_filters triangles need n_filters + 2 equally spaced edge points.
    edges = np.linspace(0, nyquist, n_filters + 2)
    bank = np.zeros((n_filters, n_freq_bins))

    # Iterating over `bank` yields row views, so writes land in `bank` itself.
    for row, (lo, mid, hi) in zip(bank, zip(edges[:-2], edges[1:-1], edges[2:])):
        upslope = (bin_freqs >= lo) & (bin_freqs <= mid)
        row[upslope] = (bin_freqs[upslope] - lo) / (mid - lo)

        downslope = (bin_freqs >= mid) & (bin_freqs <= hi)
        row[downslope] = (hi - bin_freqs[downslope]) / (hi - mid)

    return bank

def apply_cms(features):
    """Cepstral mean subtraction: remove each row's mean taken along axis 1.

    Returns a new array; the input is not modified.
    """
    row_means = features.mean(axis=1, keepdims=True)
    centered = features - row_means
    return centered

In [8]:
class ReplayDataset(Dataset):
    """Fixed-length audio chunks with original/spoofed labels.

    Every ``.wav`` file (case-insensitive extension) directly inside
    ``root_dir`` is loaded with ``sr=SAMPLE_RATE`` and cut into windows of
    ``WINDOW_SAMPLES`` samples whose starts are ``HOP_SAMPLES`` apart. Only
    full windows are kept, so a trailing partial window is discarded.

    Attributes:
        samples: list of ``(chunk, label, fname)`` tuples, one per window.
        skipped_files: names of ``.wav`` files shorter than one window, which
            therefore contributed no chunk at all.

    Each item is ``(lfcc, label, fname)`` where ``lfcc`` is a float32 tensor
    with a leading channel axis, ``label`` is a long tensor, and ``fname`` is
    the source file name.
    """

    def __init__(self, root_dir):
        import warnings  # local import keeps this class self-contained

        self.samples = []
        self.skipped_files = []

        # os.listdir returns entries in arbitrary order; sorting makes the
        # sample order reproducible across runs and machines.
        for fname in sorted(os.listdir(root_dir)):
            if not fname.lower().endswith('.wav'):
                continue

            path = os.path.join(root_dir, fname)
            label = get_label(fname)

            y, _ = librosa.load(path, sr=SAMPLE_RATE)

            n_before = len(self.samples)
            for start in range(0, len(y) - WINDOW_SAMPLES + 1, HOP_SAMPLES):
                chunk = y[start:start + WINDOW_SAMPLES]
                self.samples.append((chunk, label, fname))

            # A clip shorter than one window yields no chunk; record it so the
            # omission is visible instead of silently shrinking the dataset.
            if len(self.samples) == n_before:
                self.skipped_files.append(fname)

        if self.skipped_files:
            warnings.warn(
                f"{len(self.skipped_files)} file(s) in {root_dir} are shorter than "
                f"{WINDOW_SAMPLES} samples and were skipped: {self.skipped_files}"
            )

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        chunk, label, fname = self.samples[idx]

        # Features are computed on access rather than cached.
        lfcc = extract_lfcc(chunk, SAMPLE_RATE, n_lfcc=40)
        lfcc = torch.as_tensor(lfcc, dtype=torch.float32).unsqueeze(0)  # [1, 40, T]

        return (
            lfcc,
            torch.tensor(label, dtype=torch.long),
            fname
        )

In [9]:
# Build one ReplayDataset per split; each split lives in its own sub-folder
# of base_path ("train", "valid", "test").
base_path = "/content/drive/MyDrive/audio_wav"
train_ds, val_ds, test_ds = (
    ReplayDataset(os.path.join(base_path, split))
    for split in ("train", "valid", "test")
)

In [10]:
# Only the training loader shuffles; validation and test keep dataset order.
train = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
val, test = (
    DataLoader(dataset=split_ds, batch_size=32, shuffle=False)
    for split_ds in (val_ds, test_ds)
)

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [12]:
class MFM(nn.Module):
    """Max-Feature-Map layer: a convolution followed by a channel-pair maximum.

    The convolution emits ``2 * out_channels`` feature maps; the output is
    the element-wise maximum of the first and second halves of those maps,
    so the layer returns ``out_channels`` maps.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super().__init__()
        doubled_channels = out_channels * 2
        self.conv = nn.Conv2d(
            in_channels,
            doubled_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )

    def forward(self, x):
        # The channel count is even by construction, so chunk() yields two equal halves.
        first_half, second_half = torch.chunk(self.conv(x), chunks=2, dim=1)
        return torch.max(first_half, second_half)

In [13]:
class LCNNBlock(nn.Module):
    """One LCNN stage: an MFM convolution followed by batch normalisation."""

    def __init__(self, in_ch, out_ch, kernel_size=3, stride=1, padding=1):
        super().__init__()
        self.mfm = MFM(
            in_ch,
            out_ch,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )
        self.bn = nn.BatchNorm2d(num_features=out_ch)

    def forward(self, x):
        return self.bn(self.mfm(x))

In [14]:
class LCNN(nn.Module):
    """Light CNN classifier over single-channel 2-D feature maps.

    Four (LCNNBlock -> 2x2 max-pool) stages widen the channels
    1 -> 32 -> 64 -> 128 -> 256; the result is globally average-pooled to a
    256-vector and mapped to ``num_classes`` logits by a linear layer.
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # (in_channels, out_channels) for each stage, in order. The layers
        # are appended block, pool, block, pool, ... so the Sequential
        # indices (and therefore state_dict keys) stay 0..7.
        channel_plan = [(1, 32), (32, 64), (64, 128), (128, 256)]
        stages = []
        for in_ch, out_ch in channel_plan:
            stages.append(LCNNBlock(in_ch, out_ch))
            stages.append(nn.MaxPool2d(2))
        self.features = nn.Sequential(*stages)

        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))

        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        pooled = self.global_pool(self.features(x))
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)

In [15]:
# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

model = LCNN(num_classes=2)
model = model.to(device)

In [16]:
# Adam over all model parameters with a small L2 penalty (weight_decay).
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

# Cross-entropy over the two-class logits produced by the model.
criterion = nn.CrossEntropyLoss()

In [17]:
# Shape sanity check on random input. It runs in eval mode under no_grad so
# the BatchNorm running statistics are not updated with noise before training;
# the previous train/eval mode is restored afterwards.
# Input mirrors the dataset items: 1 channel x 40 LFCC rows x 188 frames
# (presumably 1 + 48000 // 256 frames for a 3 s chunk — TODO confirm). The
# frame count is not critical because the model ends in adaptive pooling.
was_training = model.training
model.eval()
with torch.no_grad():
    dummy = torch.randn(4, 1, 40, 188).to(device)
    out = model(dummy)
model.train(was_training)
print(out.shape) #[4, 2]

torch.Size([4, 2])


In [18]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader``.

    Args:
        model: network to train; switched to training mode.
        loader: iterable of ``(features, label, extra)`` batches; the third
            element is ignored.
        optimizer: optimiser stepped once per batch.
        criterion: loss called as ``criterion(logits, label)``.
        device: device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy)`` where the loss is weighted by batch size and
        both are averaged over all samples seen.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for features, targets, _ in loader:
        features = features.to(device)   # (B, 1, n_features, T)
        targets = targets.to(device)     # (B,)

        optimizer.zero_grad()
        outputs = model(features)
        batch_loss = criterion(outputs, targets)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so a smaller final batch is averaged correctly.
        loss_sum += batch_loss.item() * features.size(0)
        n_correct += (outputs.argmax(dim=1) == targets).sum().item()
        n_seen += targets.size(0)

    return loss_sum / n_seen, n_correct / n_seen


In [19]:
def evaluate(model, loader, criterion, device):
    """Compute average loss and accuracy over ``loader`` without training.

    The model is switched to eval mode and gradients are disabled for the
    whole pass.

    Args:
        model: network to evaluate.
        loader: iterable of ``(features, label, extra)`` batches; the third
            element is ignored.
        criterion: loss called as ``criterion(logits, label)``.
        device: device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy)`` averaged over all samples seen.
    """
    with torch.no_grad():
        model.eval()
        loss_sum, n_correct, n_seen = 0.0, 0, 0

        for features, targets, _ in loader:
            features, targets = features.to(device), targets.to(device)

            outputs = model(features)
            batch_loss = criterion(outputs, targets)

            loss_sum += batch_loss.item() * features.size(0)
            predicted = outputs.max(dim=1).indices
            n_correct += (predicted == targets).sum().item()
            n_seen += targets.size(0)

        return loss_sum / n_seen, n_correct / n_seen

In [20]:
# --- Training configuration ---
SAVE_PATH = "/content/drive/MyDrive/best_lcnn_model.pth"
num_epochs = 20
patience = 5        # epochs without improvement tolerated before stopping
min_delta = 1e-4    # validation accuracy must beat the best by more than this

# --- Early-stopping state ---
best_val_acc = 0.0
counter = 0

for epoch in range(num_epochs):
    train_loss, train_acc = train_one_epoch(model, train, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val, criterion, device)

    # Checkpoint only on a strict improvement of validation accuracy.
    improved = val_acc > best_val_acc + min_delta
    if improved:
        best_val_acc, counter = val_acc, 0
        torch.save(model.state_dict(), SAVE_PATH)
        print(f"Best model saved at epoch {epoch+1}")
    else:
        counter += 1
        print(f"No improvement for {counter}/{patience} epochs")

    print(
        f"Epoch {epoch+1}/{num_epochs} | "
        f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
        f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}"
    )

    if counter >= patience:
        print("\nEarly stopping triggered.")
        break

print(f"\nBest Val Acc: {best_val_acc:.4f}")
print(f"Best model saved to: {SAVE_PATH}")


Best model saved at epoch 1
Epoch 1/20 | Train Loss: 0.1883, Train Acc: 0.9305 | Val Loss: 0.0906, Val Acc: 0.9477
Best model saved at epoch 2
Epoch 2/20 | Train Loss: 0.0505, Train Acc: 0.9865 | Val Loss: 0.0332, Val Acc: 0.9869
Best model saved at epoch 3
Epoch 3/20 | Train Loss: 0.0362, Train Acc: 0.9891 | Val Loss: 0.0348, Val Acc: 0.9935
No improvement for 1/5 epochs
Epoch 4/20 | Train Loss: 0.0189, Train Acc: 0.9957 | Val Loss: 0.0437, Val Acc: 0.9771
No improvement for 2/5 epochs
Epoch 5/20 | Train Loss: 0.0156, Train Acc: 0.9954 | Val Loss: 0.0476, Val Acc: 0.9804
No improvement for 3/5 epochs
Epoch 6/20 | Train Loss: 0.0124, Train Acc: 0.9974 | Val Loss: 0.0205, Val Acc: 0.9935
No improvement for 4/5 epochs
Epoch 7/20 | Train Loss: 0.0081, Train Acc: 0.9984 | Val Loss: 0.0255, Val Acc: 0.9935
No improvement for 5/5 epochs
Epoch 8/20 | Train Loss: 0.0039, Train Acc: 1.0000 | Val Loss: 0.0297, Val Acc: 0.9869

Early stopping triggered.

Best Val Acc: 0.9935
Best model saved to: 

In [21]:
from collections import defaultdict
import torch.nn.functional as F
import numpy as np

In [28]:
# Restore the best checkpoint. map_location lets a checkpoint saved on a GPU
# runtime load on whatever device this session is using (including CPU-only).
model.load_state_dict(
    torch.load("/content/drive/MyDrive/best_lcnn_model.pth", map_location=device)
)
model.eval()

file_probs = defaultdict(list)   # file_id -> list of spoof probs (one per chunk)
file_labels = {}                 # file_id -> true label

with torch.no_grad():
    for mels, labels, file_ids in test:
        logits = model(mels.to(device))                   # (B, 2)
        probs = F.softmax(logits, dim=1)                  # (B, 2)
        # One device-to-host transfer per batch instead of one .item() per chunk.
        spoof_probs = probs[:, 1].cpu().tolist()          # class-1 = spoof

        for fid, prob, label in zip(file_ids, spoof_probs, labels.tolist()):
            file_probs[fid].append(prob)
            file_labels[fid] = label

In [29]:
# File-level ground truth and score: each file's score is the mean spoof
# probability over its chunks. Both arrays follow file_probs' key order.
y_true = np.array([file_labels[fid] for fid in file_probs])
y_score = np.array([np.mean(file_probs[fid]) for fid in file_probs])  # mean over chunks


In [41]:
import numpy as np
from sklearn.metrics import roc_curve
file_ids = list(file_probs.keys())

# File-level score = mean spoof probability over the file's chunks.
y_score = np.array([np.mean(file_probs[fid]) for fid in file_ids])
y_true  = np.array([file_labels[fid] for fid in file_ids])  # 0=bonafide, 1=spoof

threshold = 0.5
y_pred = (y_score >= threshold).astype(int)

file_acc = (y_pred == y_true).mean()
file_error = 1.0 - file_acc

print(f"File-level Accuracy: {file_acc:.4f}")
print(f"File-level Error Rate: {file_error:.4f}")

# Confusion counts with spoof (1) as the positive class.
TP = np.sum((y_pred == 1) & (y_true == 1))  # spoof → spoof
TN = np.sum((y_pred == 0) & (y_true == 0))  # bonafide → bonafide
FP = np.sum((y_pred == 1) & (y_true == 0))  # bonafide → spoof
FN = np.sum((y_pred == 0) & (y_true == 1))  # spoof → bonafide

# False accept = a spoof let through as bonafide (FN over all spoof files).
# False reject = a bonafide file flagged as spoof (FP over all bonafide files).
# An empty class yields 0.0 rather than a division by zero.
n_spoof = TP + FN
n_bonafide = TN + FP
FAR = FN / n_spoof if n_spoof else 0.0
FRR = FP / n_bonafide if n_bonafide else 0.0

print(f"\nFAR (False Accept Rate): {FAR:.4f}")
print(f"FRR (False Reject Rate): {FRR:.4f}")

# EER: the ROC operating point where the two error rates are closest; it is
# reported as their average at that point (no interpolation between points).
fpr, tpr, thresholds = roc_curve(y_true, y_score, pos_label=1)
fnr = 1 - tpr

eer_idx = np.argmin(np.abs(fpr - fnr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2
eer_threshold = thresholds[eer_idx]

print(f"\nEER: {eer*100:.2f}%")
print(f"EER Threshold: {eer_threshold:.4f}")


File-level Accuracy: 1.0000
File-level Error Rate: 0.0000

FAR (False Accept Rate): 0.0000
FRR (False Reject Rate): 0.0000

EER: 0.00%
EER Threshold: 0.9479


In [34]:
def get_original_id(filename):
    """Map a (possibly spoofed) file name to the name of its original recording.

    Only the final extension is removed, so stems that themselves contain
    dots are kept intact. Everything from the first "_it" marker onward is
    then dropped and ".wav" is appended.

    Examples:
        "spk1_it3.wav" -> "spk1.wav"
        "spk1.wav"     -> "spk1.wav"
        "spk.1_it2.wav" -> "spk.1.wav"
    """
    stem, _ext = os.path.splitext(filename)
    return stem.partition("_it")[0] + ".wav"


In [35]:
# Read file names straight from the dataset's sample list. Iterating the
# DataLoader instead would compute LFCC features for every chunk only to
# discard them.
train_originals = {
    get_original_id(fname) for _, _, fname in train.dataset.samples
}


In [36]:
# Read file names straight from the dataset's sample list. Iterating the
# DataLoader instead would compute LFCC features for every chunk only to
# discard them.
test_originals = {
    get_original_id(fname) for _, _, fname in test.dataset.samples
}


In [37]:
# Original recordings that appear (directly or via a spoofed copy) in both
# the training and the test split.
leakage = train_originals.intersection(test_originals)

print(f"Number of overlapping original files: {len(leakage)}")
print(leakage)


Number of overlapping original files: 0
set()
