In [36]:
# Install dependencies directly from Jupyter Notebook
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
!pip install tqdm librosa soundfile scikit-learn

Looking in indexes: https://download.pytorch.org/whl/cpu
Collecting torch
  Downloading https://download.pytorch.org/whl/cpu/torch-2.6.0%2Bcpu-cp313-cp313-win_amd64.whl.metadata (28 kB)
Collecting torchvision
  Downloading https://download.pytorch.org/whl/cpu/torchvision-0.21.0%2Bcpu-cp313-cp313-win_amd64.whl.metadata (6.3 kB)
Collecting torchaudio
  Downloading https://download.pytorch.org/whl/cpu/torchaudio-2.6.0%2Bcpu-cp313-cp313-win_amd64.whl.metadata (6.7 kB)
Collecting filelock (from torch)
  Downloading https://download.pytorch.org/whl/filelock-3.13.1-py3-none-any.whl.metadata (2.8 kB)
Collecting networkx (from torch)
  Downloading https://download.pytorch.org/whl/networkx-3.3-py3-none-any.whl.metadata (5.1 kB)
Collecting fsspec (from torch)
  Downloading https://download.pytorch.org/whl/fsspec-2024.6.1-py3-none-any.whl.metadata (11 kB)
Collecting sympy==1.13.1 (from torch)
  Downloading https://download.pytorch.org/whl/sympy-1.13.1-py3-none-any.whl (6.2 MB)
     ---------------


[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [37]:
import random
import tarfile

import numpy as np
import requests
from tqdm import tqdm

# Step 2: Stream `.flac` files
def stream_flac_files_in_memory(tar_url, limit=200):
    """
    Stream a .tar archive over HTTP and keep its `.flac` members in memory.

    The archive is read sequentially (tarfile stream mode ``r|``) straight from
    the HTTP response body, and reading stops as soon as `limit` files have
    been collected.

    Parameters:
        tar_url (str): URL of the .tar file.
        limit (int): Maximum number of `.flac` files to collect. A value of
            zero or less returns an empty list without issuing a request.

    Returns:
        list: A list of tuples containing (filename, audio_data), where
            audio_data is the raw bytes of the archive member.

    Raises:
        RuntimeError: If the server does not answer with HTTP 200.
    """
    # Nothing requested: avoid opening a connection at all.
    if limit <= 0:
        return []

    response = requests.get(tar_url, stream=True, timeout=30)
    try:
        if response.status_code != 200:
            raise RuntimeError(f"Failed to download TAR archive: {tar_url} (Status code: {response.status_code})")

        # Progress is tracked in bytes of archive members seen so far.
        total_size = int(response.headers.get('content-length', 0))
        extracted_files = []

        with tqdm(total=total_size, unit='B', unit_scale=True, desc="Streaming .tar") as progress:
            with tarfile.open(fileobj=response.raw, mode="r|") as archive:
                for member in archive:
                    if member.name.endswith(".flac"):  # Only `.flac` files
                        # In stream mode the member must be read before advancing.
                        flac_file = archive.extractfile(member)
                        if flac_file:
                            extracted_files.append((member.name, flac_file.read()))
                    progress.update(member.size)
                    if len(extracted_files) >= limit:
                        break

        return extracted_files
    finally:
        # Always release the connection, including on early exit or error.
        response.close()

# Example Usage: Stream the ASVspoof 5 dataset
zenodo_url = "https://zenodo.org/api/records/14498691"
# A timeout keeps the cell from hanging indefinitely, and raise_for_status surfaces
# HTTP errors here instead of as a confusing KeyError on the JSON payload below.
record_response = requests.get(zenodo_url, timeout=30)
record_response.raise_for_status()
response = record_response.json()
file_urls = [file['links']['self'] for file in response['files']]
tar_urls = [url for url in file_urls if url.endswith(".tar/content")]
if not tar_urls:
    raise RuntimeError(f"No .tar archives listed in Zenodo record: {zenodo_url}")

# Stream the first .tar file
flac_files = stream_flac_files_in_memory(tar_urls[0], limit=200)
print(f"Streamed {len(flac_files)} `.flac` files successfully!")

Streaming .tar:   0%|▎                                                                         | 27.8M/6.65G [00:03<14:57, 7.38MB/s]

Streamed 200 `.flac` files successfully!





In [39]:
import soundfile as sf
import io

# Debug and refine validation process
def debug_and_validate_audio(file_name, audio_data):
    """
    Decode raw audio bytes and check that the samples are usable.

    Parameters:
        file_name (str): Name of the audio file (used only in the error message).
        audio_data (bytes): Raw encoded audio data.

    Returns:
        tuple: (audio, sample_rate), where audio is the array decoded as
            float32 and sample_rate is the rate reported by the decoder, or
            (None, None) if decoding or validation fails.
    """
    try:
        # Decode audio using PySoundFile
        with sf.SoundFile(io.BytesIO(audio_data)) as f:
            audio = f.read(dtype='float32')
            sample_rate = f.samplerate

        # An empty decode has nothing to extract features from; reject it here.
        if audio.size == 0:
            raise ValueError("Audio contains no samples.")

        # Ensure audio contains finite values
        if not np.all(np.isfinite(audio)):
            raise ValueError("Audio contains non-finite values.")

        return audio, sample_rate
    except Exception as e:
        # Best-effort: a bad file is reported and skipped rather than aborting the batch.
        print(f"Validation/Debug Error for {file_name}: {e}")
        return None, None

# Run every streamed file through validation and keep only those that pass
validated_files = []
for file_name, audio_data in flac_files:
    validated_audio, sample_rate = debug_and_validate_audio(file_name, audio_data)
    if validated_audio is None:
        continue  # validation failed for this file; leave it out
    validated_files += [(file_name, validated_audio, sample_rate)]

print(f"Validated {len(validated_files)} files successfully!")

Validated 200 files successfully!


In [40]:
import librosa

# Step 4: Extract MFCC features from validated audio
def extract_mfcc_features(audio_data, sample_rate=16000, n_mfcc=13):
    """
    Extract time-averaged MFCC features from audio data.

    Parameters:
        audio_data (np.array): Validated audio array.
        sample_rate (int): Sampling rate for MFCC feature extraction.
        n_mfcc (int): Number of MFCC coefficients to compute. Defaults to 13,
            the value this function previously hard-coded.

    Returns:
        np.array: Mean MFCC feature array (the MFCC matrix averaged over
            axis 1), or None if extraction fails.
    """
    try:
        mfcc = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=n_mfcc)
        # Collapse axis 1 so each coefficient is summarised by a single mean value.
        return np.mean(mfcc, axis=1)
    except Exception as e:
        # Best-effort: report the failure and let the caller skip this file.
        print(f"MFCC Extraction Error: {e}")
        return None

# Build (file name, MFCC vector) pairs, leaving out files whose extraction returned None
features = []
for file_name, audio_data, sample_rate in validated_files:
    mfcc_features = extract_mfcc_features(audio_data, sample_rate=sample_rate)
    if mfcc_features is None:
        continue
    features += [(file_name, mfcc_features)]

print(f"Extracted MFCC features for {len(features)} files successfully!")

Extracted MFCC features for 200 files successfully!


In [41]:
import torch
from torch.utils.data import Dataset, DataLoader

# Step 5: Define a custom dataset class
class AudioDataset(Dataset):
    """Dataset that pairs feature arrays with labels, index by index."""

    def __init__(self, features, labels):
        """
        Initialize the dataset with features and labels.

        Parameters:
            features (list): List of MFCC feature arrays.
            labels (list): List of binary labels (0 for real, 1 for spoof).

        Raises:
            ValueError: If features and labels differ in length.
        """
        # Fail fast: a mismatch would otherwise surface only as an IndexError
        # (or silently ignored labels) when items are fetched.
        if len(features) != len(labels):
            raise ValueError(
                f"features and labels must have the same length "
                f"(got {len(features)} and {len(labels)})"
            )
        self.features = features
        self.labels = labels

    def __len__(self):
        """Return the number of (feature, label) pairs."""
        return len(self.features)

    def __getitem__(self, idx):
        """
        Fetch a single item (feature, label) by index.

        Parameters:
            idx (int): Index of the item.

        Returns:
            tuple: Feature tensor and label tensor, both float32.
        """
        feature = torch.tensor(self.features[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return feature, label

# Create mock labels (binary classification: 0 = real, 1 = spoof)
# NOTE: these labels are random placeholders, so metrics computed from them carry
# no meaning. Replace these with actual labels if available.
SEED = 42
random.seed(SEED)  # keep the placeholder labels identical across kernel restarts
labels = [random.randint(0, 1) for _ in range(len(features))]

# Instantiate the dataset
audio_dataset = AudioDataset(
    features=[feature[1] for feature in features],  # Extract MFCC arrays
    labels=labels
)

# Split dataset into training and testing sets
train_size = int(0.8 * len(audio_dataset))
test_size = len(audio_dataset) - train_size

# Dedicated generators make the split and the shuffle order reproducible
# without depending on the global torch RNG state.
train_dataset, test_dataset = torch.utils.data.random_split(
    audio_dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SEED),
)

# Prepare dataloaders
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    generator=torch.Generator().manual_seed(SEED),
)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f"Prepared datasets: {len(train_dataset)} training samples, {len(test_dataset)} testing samples.")

Prepared datasets: 160 training samples, 40 testing samples.


In [42]:
import torch.nn as nn
import torch

# Step 6: Define AASIST model
class AASIST(nn.Module):
    """
    Fully connected binary classifier over fixed-size feature vectors.

    The network is three Linear layers (input_dim -> 64 -> 32 -> 1) with ReLU
    activations, dropout after the second layer, and a sigmoid output.

    NOTE(review): despite the class name, only these dense layers are defined
    here - confirm whether a different architecture was intended.

    Parameters:
        input_dim (int): Size of each input feature vector. Defaults to 13
            (number of MFCC features).
        dropout_p (float): Dropout probability. Defaults to 0.2.
    """

    def __init__(self, input_dim=13, dropout_p=0.2):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 64)  # First fully connected layer
        self.fc2 = nn.Linear(64, 32)  # Second fully connected layer
        self.fc3 = nn.Linear(32, 1)   # Output layer (binary classification)
        self.dropout = nn.Dropout(dropout_p)  # Dropout for regularization

    def forward(self, x):
        """
        Map a batch of feature vectors to sigmoid outputs.

        Parameters:
            x (torch.Tensor): Input whose last dimension is input_dim.

        Returns:
            torch.Tensor: Sigmoid outputs with a last dimension of size 1.
        """
        x = torch.relu(self.fc1(x))
        x = self.dropout(torch.relu(self.fc2(x)))
        x = torch.sigmoid(self.fc3(x))  # Sigmoid for binary classification
        return x

# Instantiate the classifier and print its layer structure
model = AASIST()
print(model)

AASIST(
  (fc1): Linear(in_features=13, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=1, bias=True)
  (dropout): Dropout(p=0.2, inplace=False)
)


In [43]:
# Step 7: Train the AASIST model
def train_model(model, dataloader, criterion, optimizer, epochs=10):
    """
    Train the AASIST model and print the mean batch loss after every epoch.

    Parameters:
        model (nn.Module): AASIST model to train.
        dataloader (DataLoader): Training data loader yielding (inputs, labels).
        criterion: Loss function.
        optimizer: Optimization algorithm.
        epochs (int): Number of training epochs.

    Raises:
        ValueError: If the dataloader yields no batches.
    """
    model.train()  # Set model to training mode
    for epoch in range(epochs):
        epoch_loss = 0.0
        num_batches = 0
        for inputs, labels in dataloader:
            optimizer.zero_grad()  # Reset gradients
            outputs = model(inputs)  # Forward pass
            # Match the label shape explicitly: a bare squeeze() also removes the
            # batch axis of a single-sample batch, which the loss then rejects.
            loss = criterion(outputs.reshape(labels.shape), labels)  # Compute loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights
            epoch_loss += loss.item()  # Accumulate loss
            num_batches += 1
        if num_batches == 0:
            # Previously this surfaced as a ZeroDivisionError in the print below.
            raise ValueError("dataloader yielded no batches; nothing to train on.")
        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/num_batches:.4f}")

# Set up the loss (binary cross-entropy) and the Adam optimizer over the model's parameters
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# Run the training loop on the training split for 10 epochs
train_model(
    model=model,
    dataloader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=10,
)

Epoch 1/10, Loss: 2.3237
Epoch 2/10, Loss: 1.3685
Epoch 3/10, Loss: 1.4789
Epoch 4/10, Loss: 1.0559
Epoch 5/10, Loss: 1.0488
Epoch 6/10, Loss: 0.8370
Epoch 7/10, Loss: 0.8104
Epoch 8/10, Loss: 0.8543
Epoch 9/10, Loss: 0.8967
Epoch 10/10, Loss: 0.7870


In [44]:
from sklearn.metrics import roc_auc_score

# Step 8: Evaluate the AASIST model
def evaluate_model(model, dataloader):
    """
    Evaluate the AASIST model using AUC-ROC metric.

    Parameters:
        model (nn.Module): Trained AASIST model.
        dataloader (DataLoader): Testing data loader yielding (inputs, labels).

    Returns:
        float: AUC score for evaluation.

    NOTE(review): how roc_auc_score behaves when the labels contain a single
    class depends on the scikit-learn version - confirm before relying on it.
    """
    model.eval()  # Set model to evaluation mode
    all_labels = []
    all_preds = []
    with torch.no_grad():  # Disable gradient computation for evaluation
        for inputs, labels in dataloader:
            # Flatten to 1-D: a bare squeeze() turns a single-sample batch into a
            # 0-d tensor, which cannot be iterated by list.extend.
            outputs = model(inputs).reshape(-1)  # Forward pass
            all_preds.extend(outputs.cpu().tolist())  # Collect predictions
            all_labels.extend(labels.reshape(-1).cpu().tolist())  # Collect true labels

    # Compute AUC-ROC score
    auc_score = roc_auc_score(all_labels, all_preds)
    print(f"AUC Score: {auc_score:.4f}")
    return auc_score

# Evaluate the model on the test dataset and keep the returned AUC in `test_auc`
test_auc = evaluate_model(model, test_loader)

AUC Score: 0.5893
