In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
import os
import torch
import torchaudio
import librosa
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, roc_curve
import matplotlib.pyplot as plt
from tqdm import tqdm


In [2]:
class ASVspoofDataset(Dataset):
    def __init__(self, data_dir, label_file, sample_rate=16000):
        self.data_dir = data_dir
        self.sample_rate = sample_rate
        self.files, self.labels = self.load_labels(label_file)

    def load_labels(self, label_file):
        files = []
        labels = {}
        with open(label_file, 'r') as f:
            for line in f.readlines():
                parts = line.strip().split()
                filename = parts[1]  # Filename
                label = 1 if parts[-1] == 'spoof' else 0
                files.append(filename)
                labels[filename] = label
        return files, labels

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        filename = self.files[idx]
        label = self.labels[filename]
        filepath = os.path.join(self.data_dir, filename + ".flac")  # .flac extension
        
        waveform, sr = torchaudio.load(filepath)
        if sr != self.sample_rate:
            waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sample_rate)(waveform)
        
        return waveform, torch.tensor(label, dtype=torch.long)


In [5]:
base_path = "./ASVspoof2019_root/LA"  # Base LA path
train_dir = os.path.join(base_path, "ASVspoof2019_LA_train", "flac")  # .flac folder path
train_label_file = os.path.join(base_path, "ASVspoof2019_LA_cm_protocols", "ASVspoof2019.LA.cm.train.trn.txt")



In [6]:
ls ./ASVspoof2019_root/LA/ASVspoof2019_LA_train/flac | head


ls: cannot access './ASVspoof2019_root/LA/ASVspoof2019_LA_train/flac': No such file or directory


In [8]:
import os
print(os.listdir("/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_train/flac")[:5])


['LA_T_9552332.flac', 'LA_T_2040122.flac', 'LA_T_5827423.flac', 'LA_T_8315701.flac', 'LA_T_2298291.flac']


In [9]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
import torchaudio

class ASVspoofDataset(Dataset):
    def __init__(self, data_dir, label_file, sample_rate=16000):
        self.data_dir = data_dir
        self.sample_rate = sample_rate
        self.files, self.labels = self.load_labels(label_file)

    def load_labels(self, label_file):
        files = []
        labels = {}
        with open(label_file, 'r') as f:
            for line in f.readlines():
                parts = line.strip().split()
                filename = parts[1]  # e.g., LA_T_1000137
                label = 1 if parts[-1] == 'spoof' else 0
                files.append(filename)
                labels[filename] = label
        return files, labels

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        filename = self.files[idx]
        label = self.labels[filename]
        filepath = os.path.join(self.data_dir, filename + ".flac")
        
        waveform, sr = torchaudio.load(filepath)
        if sr != self.sample_rate:
            waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sample_rate)(waveform)
        
        return waveform, torch.tensor(label, dtype=torch.long)


In [16]:
class ASVspoofDataset(Dataset):
    def __init__(self, data_dir, label_file, sample_rate=16000, fixed_length=64000):
        self.data_dir = data_dir
        self.sample_rate = sample_rate
        self.fixed_length = fixed_length
        self.files, self.labels = self.load_labels(label_file)

    def load_labels(self, label_file):
        files = []
        labels = {}
        with open(label_file, 'r') as f:
            for line in f.readlines():
                parts = line.strip().split()
                filename = parts[1]
                label = 1 if parts[-1] == 'spoof' else 0
                files.append(filename)
                labels[filename] = label
        return files, labels

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        filename = self.files[idx]
        label = self.labels[filename]
        filepath = os.path.join(self.data_dir, filename + ".flac")

        waveform, sr = torchaudio.load(filepath)
        if sr != self.sample_rate:
            waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sample_rate)(waveform)

        # Pad or trim to fixed length
        if waveform.shape[1] < self.fixed_length:
            padding = self.fixed_length - waveform.shape[1]
            waveform = torch.nn.functional.pad(waveform, (0, padding))
        else:
            waveform = waveform[:, :self.fixed_length]

        return waveform, torch.tensor(label, dtype=torch.long)


In [17]:
# Updated base path and label file path based on your structure
train_dir = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_train/flac"
train_label_file = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt"

# Load dataset
train_dataset = ASVspoofDataset(data_dir=train_dir, label_file=train_label_file)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # smaller batch size for Kaggle GPU


In [24]:
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1):
        super(ResBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding=kernel_size//2)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, stride, padding=kernel_size//2)
        self.bn2 = nn.BatchNorm1d(out_channels)

        # Shortcut layer to match dimensions
        self.shortcut = nn.Sequential()
        if in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=1),
                nn.BatchNorm1d(out_channels)
            )

    def forward(self, x):
        residual = self.shortcut(x)
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        x += residual
        return F.relu(x)


class RawNet2ResNet(nn.Module):
    def __init__(self, input_dim=16000, num_classes=2):
        super(RawNet2ResNet, self).__init__()
        self.conv1 = nn.Conv1d(1, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm1d(32)
        self.resblock1 = ResBlock(32, 64)
        self.resblock2 = ResBlock(64, 64)
        
        # Corrected input_size for GRU
        self.gru = nn.GRU(64, 64, batch_first=True)
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.resblock1(x)
        x = self.resblock2(x)
        x, _ = self.gru(x.permute(0, 2, 1))  # shape: (batch, seq_len, features)
        x = self.fc(x[:, -1, :])             # use last GRU output
        return x



In [25]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RawNet2ResNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def train_model(model, train_loader, criterion, optimizer, epochs=5):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for waveforms, labels in tqdm(train_loader):
            waveforms, labels = waveforms.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(waveforms)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")


In [None]:
train_model(model, train_loader, criterion, optimizer)


  6%|▋         | 206/3173 [2:10:34<31:52:58, 38.69s/it]