In [None]:
# !pip uninstall torch torchaudio torchvision -y

Found existing installation: torch 2.2.2

Uninstalling torch-2.2.2:

  Successfully uninstalled torch-2.2.2

Found existing installation: torchaudio 2.2.2

Uninstalling torchaudio-2.2.2:

  Successfully uninstalled torchaudio-2.2.2

Found existing installation: torchvision 0.17.2

Uninstalling torchvision-0.17.2:

  Successfully uninstalled torchvision-0.17.2


[0m

In [None]:
# !pip install torch==2.2.2 torchaudio==2.2.2

Collecting torch==2.2.2

  Using cached torch-2.2.2-cp310-cp310-manylinux1_x86_64.whl (755.5 MB)

Collecting torchaudio==2.2.2

  Using cached torchaudio-2.2.2-cp310-cp310-manylinux1_x86_64.whl (3.3 MB)






















Installing collected packages: torch, torchaudio

Successfully installed torch-2.2.2 torchaudio-2.2.2


[0m

In [1]:
!pip install torch torchaudio torchvision datasets -q

In [27]:
import random
import IPython

import datasets

from datasets.utils import DownloadManager

import numpy as np

import torch
import torchaudio
import torchaudio.transforms as T

from torch.utils.data import DataLoader, Dataset

import os
import sys

from tqdm import tqdm
import torchvision
import soundfile as sf
import torch.nn as nn
import torchvision.models as models
from librosa import util
import librosa.feature

In [28]:
sys.path.append('src')

In [33]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

# Load Dataset .zip from link

Logical access (LA):

- ```speaker_id:``` LA_****, a 4-digit speaker ID
- ```audio_file_name:``` name of the audio file
- ```audio:``` '****.flac'  the path to the downloaded audio file in FLAC format (https://xiph.org/flac/).
- ```system_id:``` ID of the speech spoofing system (A01 - A19); for bonafide speech the system ID is left blank ('-')
- ```key:``` 'bonafide' for genuine speech, or, 'spoof' for spoofing speech

In [18]:
# # For kaggle
# import sys
# sys.path.append('/kaggle/input/modulesspeechdetection')


In [29]:
%%time
# Project-local dataset builder (presumably resolved through the 'src' entry appended to
# sys.path earlier — confirm when running outside this repository layout).
from load_avsspoof19 import ASVspoof2019

# Download manager handed to the builder's split generator below.
dl_manager = DownloadManager()

# Call the builder's private _split_generators hook directly to obtain the split
# descriptors. Later cells index the result as 0 - train, 1 - eval, 2 - test and read
# each entry's gen_kwargs.
speech_dataset = ASVspoof2019()
asv_datasets = speech_dataset._split_generators(dl_manager)

Computing checksums: 100%|##########| 1/1 [00:23<00:00, 23.53s/it]

CPU times: user 22.5 s, sys: 1.18 s, total: 23.7 s
Wall time: 1min


In [30]:
# asv_datasets is ordered as: 0 - train, 1 - eval (used as validation), 2 - test.
# Pull the metadata file path and the audio directory out of each split's gen_kwargs.
(
    (train_metadata_filepath, train_audios_dir),
    (val_metadata_filepath, val_audios_dir),
    (test_metadata_filepath, test_audios_dir),
) = [
    (
        asv_datasets[split_index].gen_kwargs["metadata_filepath"],
        asv_datasets[split_index].gen_kwargs["audios_dir"],
    )
    for split_index in range(3)
]

In [31]:
# Build the example collection of every split from its metadata file and audio directory.
train_samples, val_samples, test_samples = [
    speech_dataset._generate_examples(metadata_filepath, audios_dir)
    for metadata_filepath, audios_dir in (
        (train_metadata_filepath, train_audios_dir),
        (val_metadata_filepath, val_audios_dir),
        (test_metadata_filepath, test_audios_dir),
    )
]

## Listen to random sample

Display some audio from train/validation set.

In [34]:
def get_sample(samples):
    """Return one element drawn at random from ``samples``."""
    (picked,) = random.sample(samples, 1)
    return picked

sample = get_sample(train_samples)
sample

{'speaker_id': 'LA_0083',
 'audio_file_name': 'LA_T_7655679',
 'system_id': 'A02',
 'key': 'spoof',
 'audio': '/root/.cache/huggingface/datasets/downloads/extracted/911103f86670b6f7e96211444d0f39fc5ffab511156a395f67b098c2f45dce18/LA/ASVspoof2019_LA_train/flac/LA_T_7655679.flac'}

In [35]:
print(sample['key'])
IPython.display.Audio(sample['audio'])

spoof


In [37]:
sample = get_sample(val_samples)
sample

{'speaker_id': 'LA_0102',
 'audio_file_name': 'LA_D_9354139',
 'system_id': '-',
 'key': 'bonafide',
 'audio': '/root/.cache/huggingface/datasets/downloads/extracted/911103f86670b6f7e96211444d0f39fc5ffab511156a395f67b098c2f45dce18/LA/ASVspoof2019_LA_dev/flac/LA_D_9354139.flac'}

In [39]:
print(sample['key'])
IPython.display.Audio(sample['audio'])

bonafide


## Audio preprocessing


In [40]:
def audio_preprocess(waveform, sample_rate, resample_rate, desired_duration):
    """
    Resample a waveform to a target rate and force it to a fixed duration.

    Args:
        waveform: tensor whose last dimension is time. The result is sliced as
            ``[:, :n]``, so a 2-D ``(channels, time)`` layout is expected.
        sample_rate: sampling rate of ``waveform`` in Hz.
        resample_rate: target sampling rate in Hz (e.g. 16 kHz or 22.05 kHz).
        desired_duration: target duration in seconds.

    Returns:
        Tensor with ``int(desired_duration * resample_rate)`` samples along the time
        axis. Clips shorter than the target are repeated (looped) until they are long
        enough; longer clips are truncated.

    Raises:
        ValueError: if the waveform has no samples but a non-zero length is requested
            (there is nothing to repeat).
    """
    # Resampling to the same rate is a no-op, so skip building the resampling kernel.
    if sample_rate != resample_rate:
        resampled_waveform = T.Resample(sample_rate, resample_rate)(waveform)
    else:
        resampled_waveform = waveform

    desired_length = int(desired_duration * resample_rate)
    # Compare against the number of samples (last axis). ``len()`` of a
    # (channels, time) tensor would return the channel count instead.
    current_length = resampled_waveform.shape[-1]
    if current_length < desired_length:
        if current_length == 0:
            raise ValueError("Cannot extend an empty waveform to the desired duration")
        # Ceil division: the smallest number of repetitions covering desired_length.
        repeats = -(-desired_length // current_length)
        # A 1-element ``dims`` tiles the last (time) axis only.
        resampled_waveform = resampled_waveform.tile((repeats,))

    return resampled_waveform[:, :desired_length]


def peak_normalize(waveform):
    """
    Scale a waveform so that its largest absolute sample equals 1.

    Args:
        waveform: audio tensor of any shape.

    Returns:
        A new tensor equal to ``waveform`` divided by its peak absolute value; the
        input tensor is left unmodified. Silent (all-zero) or empty input is returned
        as-is, because dividing by a zero peak would fill the result with NaN.
    """
    if waveform.numel() == 0:
        return waveform

    peak = waveform.abs().max()
    if peak == 0:
        return waveform

    return waveform / peak

In [41]:
class AudioDataset(Dataset):
    """
    Map-style dataset that loads one audio file per item and labels it spoof / not spoof.

    Each entry of ``raw_dataset`` is read through the keys ``'audio'`` (path handed to
    ``torchaudio.load``) and ``'key'`` (``'spoof'`` maps to label 1, anything else to 0).
    When ``transform`` is truthy it is used as a two-step pipeline:
    ``transform[0](waveform, source_rate, resample_rate, desired_duration)`` followed by
    ``transform[1](waveform)``.
    """

    def __init__(self, raw_dataset, desired_duration, resample_rate, transform=None):
        self.raw_data, self.transform = raw_dataset, transform
        self.sample_rate, self.duration = resample_rate, desired_duration

    def __len__(self):
        """Number of entries in the underlying raw dataset."""
        return len(self.raw_data)

    def __getitem__(self, idx):
        """Load the audio of entry ``idx`` and return ``(waveform, label)``."""
        record = self.raw_data[idx]
        waveform, source_rate = torchaudio.load(record['audio'])

        if self.transform:
            # Step 1 takes the rates and duration; step 2 only takes the waveform.
            waveform = self.transform[0](waveform, source_rate, self.sample_rate, self.duration)
            waveform = self.transform[1](waveform)

        return waveform, int(record['key'] == 'spoof')

In [42]:
# Target clip length (seconds) and target sampling rate (Hz) shared by every split.
_DURATION = 6
_SAMPLE_RATE = 16_000

train_dataset = AudioDataset(train_samples, _DURATION, _SAMPLE_RATE, transform=[audio_preprocess, peak_normalize])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = AudioDataset(test_samples, _DURATION, _SAMPLE_RATE, transform=[audio_preprocess, peak_normalize])
# Evaluation gains nothing from shuffling; a fixed order keeps runs comparable.
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [43]:
train_dataset[4][0].shape

torch.Size([1, 96000])

# Feature Extraction Definition
Log power spectrum (LPS), Mel-frequency cepstral coefficients (MFCC), constant-Q cepstral coefficients (CQCC).

In [48]:
from feature_ext import get_MFCC, get_CQCC, get_LPS

feature_extractor = 'MFCC'
# feature_extr.MFCC()

# Model definition

In [170]:
# model = torchvision.models.resnet18(pretrained=True)
# n_ftrs = model.fc.in_features
# model.fc=nn.Linear(n_ftrs,2)
# model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
# model.conv1 = nn.Conv2d(1, 64, kernel_size=(9, 3), stride=(3, 1), padding=(1, 1), bias=False)

In [45]:
from models import TE_ResNet

In [56]:
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Instantiate TE_ResNet and move it to `device`.
# NOTE(review): max_length is 200 here, while the training and evaluation calls pass
# max_length=188 to truncate_sequence (the commented-out variant below also uses 188)
# — confirm which value is intended.
model = TE_ResNet(
    embed_size=32,
    num_layers=6,
    heads=8,
    ff_hidden_size=1024,
    dropout=0.1,
    device=device,
    max_length=200
).to(device)

# Alternative instantiation with max_length=188, kept for reference.
# model = TE_ResNet(
#     embed_size=32,
#     num_layers=6,
#     heads=8,
#     ff_hidden_size=1024,
#     dropout=0.1,
#     device=device,
#     max_length=188
# ).to(device)

#print(model)

# Training

In [57]:
def truncate_sequence(features, max_length):
    """
    Force the sequence axis (dim 1) of a 3-D tensor to exactly ``max_length``.

    Args:
        features: tensor of shape ``(batch, seq, feature_dim)``.
        max_length: required size of dim 1.

    Returns:
        ``features`` cut to its first ``max_length`` steps when it is longer, otherwise
        right-padded with zeros along dim 1. The padding is created with the dtype and
        device of ``features``, so the result always keeps the input dtype.
    """
    batch_size, seq_length, feature_dim = features.size()
    if seq_length >= max_length:
        return features[:, :max_length, :]

    # new_zeros inherits dtype/device from `features`; a plain float32 torch.zeros
    # would make torch.cat silently promote e.g. float16 features to float32.
    padding = features.new_zeros((batch_size, max_length - seq_length, feature_dim))
    return torch.cat((features, padding), dim=1)

def train_loop(model, loader, feature_extractor, criterion, optimizer, n_epochs, device, max_length):
    """
    Train ``model`` for ``n_epochs`` passes over ``loader`` and print per-epoch metrics.

    Args:
        model: network called as ``model(features, mask)``; its output is scored with
            ``criterion`` and arg-maxed over dim 1 to compute accuracy.
        loader: DataLoader yielding ``(waveform, label)`` batches;
            ``loader.dataset.sample_rate`` is passed to the feature extractor.
        feature_extractor: one of ``'MFCC'``, ``'CQCC'``, ``'LPS'``.
        criterion: loss called as ``criterion(output, labels)``.
        optimizer: optimizer that updates the model's parameters.
        n_epochs: number of passes over ``loader``.
        device: device the batches and features are moved to.
        max_length: sequence length the features are truncated / zero-padded to.

    Raises:
        ValueError: for an unknown ``feature_extractor``, a batch-size mismatch between
            the model output and the labels, or a loader that yields no samples.
    """
    feature_extr = {
        'MFCC': get_MFCC,
        'CQCC': get_CQCC,
        'LPS': get_LPS
    }

    # Validate and resolve the extractor once instead of on every batch.
    if feature_extractor not in feature_extr:
        raise ValueError(f"Invalid feature extractor: {feature_extractor}")
    extract_features = feature_extr[feature_extractor]
    sample_rate = loader.dataset.sample_rate

    for epoch in range(n_epochs):
        model.train(True)
        sum_loss = 0.0
        num_correct = 0
        total_samples = 0

        for data, labels in loader:
            data = data.to(device)
            labels = labels.to(device)

            features = extract_features(data, sample_rate).to(device)
            features = truncate_sequence(features, max_length)  # Truncate or pad to max_length

            # All-ones mask over (batch, seq).
            # NOTE(review): positions zero-padded by truncate_sequence are also marked
            # True here — confirm the model does not need them masked out.
            mask = torch.ones(features.shape[:2], device=device).bool()

            # Clear gradients before the backward pass so nothing left over from earlier
            # work (e.g. a previous interrupted run) leaks into this update.
            optimizer.zero_grad()

            output = model(features, mask)

            if output.shape[0] != labels.shape[0]:
                raise ValueError(f"Mismatch in batch sizes: output batch size {output.shape[0]}, labels batch size {labels.shape[0]}")

            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

            sum_loss += loss.item()
            predicted = output.argmax(dim=1)
            num_correct += (predicted == labels).sum().item()
            total_samples += data.size(0)

        if total_samples == 0:
            raise ValueError("The data loader produced no samples; nothing to train on")

        train_accuracy = num_correct / total_samples
        # Mean of the per-batch losses.
        train_avg_loss = sum_loss / len(loader)
        print(f'Epoch [{epoch+1}/{n_epochs}], Training Accuracy: {train_accuracy:.4f}, Training Loss: {train_avg_loss:.4f}')



In [49]:
# Training hyper-parameters, optimizer and loss.
# NOTE(review): the optimizer captures model.parameters() when it is constructed, so this
# cell must be re-run whenever `model` is re-created; otherwise it keeps updating the old
# instance's weights. The saved execution counts show this cell as In [49] and the model
# cell as In [56].
num_epochs = 3
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [None]:
# Train on the training loader with the selected feature extractor; features are
# truncated / zero-padded to 188 steps along the sequence axis.
train_loop(
    model,
    train_loader,
    feature_extractor=feature_extractor,
    criterion=criterion,
    optimizer=optimizer,
    n_epochs=num_epochs,
    device=device,
    max_length=188
)

# Evaluation

In [50]:
import torch
from tqdm import tqdm

def evaluate_model(model, test_loader, feature_extractor, criterion, device, max_length):
    """
    Evaluate ``model`` on ``test_loader`` without gradient tracking.

    Args:
        model: network called as ``model(features, mask)``; switched to eval mode here.
        test_loader: DataLoader yielding ``(waveform, label)`` batches;
            ``test_loader.dataset.sample_rate`` is passed to the feature extractor.
        feature_extractor: one of ``'MFCC'``, ``'CQCC'``, ``'LPS'``.
        criterion: loss called as ``criterion(output, label)``.
        device: device the batches and features are moved to.
        max_length: sequence length the features are truncated / zero-padded to.

    Returns:
        ``(test_loss, test_accuracy)``: the mean of the per-batch losses and the fraction
        of correctly classified samples. Both values are also printed.

    Raises:
        ValueError: for an unknown ``feature_extractor``, a batch-size mismatch between
            the model output and the labels, or a loader that yields no samples.
    """
    feature_extr = {
        'MFCC': get_MFCC,
        'CQCC': get_CQCC,
        'LPS': get_LPS
    }

    # Validate and resolve the extractor once instead of on every batch.
    if feature_extractor not in feature_extr:
        raise ValueError(f"Invalid feature extractor: {feature_extractor}")
    extract_features = feature_extr[feature_extractor]
    sample_rate = test_loader.dataset.sample_rate

    model.eval()
    test_loss = 0.0
    test_correct = 0
    total_test_samples = 0

    with torch.no_grad():
        for waveform, label in tqdm(test_loader):
            waveform, label = waveform.to(device), label.to(device)

            features = extract_features(waveform, sample_rate).to(device)
            features = truncate_sequence(features, max_length)  # Truncate or pad to max_length

            # All-ones mask over (batch, seq).
            # NOTE(review): positions zero-padded by truncate_sequence are also marked
            # True here — confirm the model does not need them masked out.
            mask = torch.ones(features.shape[:2], device=device).bool()

            output = model(features, mask)

            if output.shape[0] != label.shape[0]:
                raise ValueError(f"Mismatch in batch sizes: output batch size {output.shape[0]}, label batch size {label.shape[0]}")

            test_loss += criterion(output, label).item()

            predicted = output.argmax(dim=1)
            test_correct += (predicted == label).sum().item()
            total_test_samples += label.size(0)

    if total_test_samples == 0:
        raise ValueError("The data loader produced no samples; nothing to evaluate")

    test_accuracy = test_correct / total_test_samples
    # Mean of the per-batch losses.
    test_loss /= len(test_loader)

    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')
    return test_loss, test_accuracy  # Return values for further analysis if needed


In [51]:
# Evaluate on the test loader with the same feature extractor and sequence length as training.
# NOTE(review): the saved output below reports accuracy 0.1032, while the training cell
# above has no execution count (In [None]) and the model cell carries a later count
# (In [56]) than this one — the recorded numbers may not come from a trained model;
# re-run the notebook top to bottom to confirm.
evaluate_model(
    model,
    test_loader,
    feature_extractor=feature_extractor,
    criterion=criterion,
    device=device,
    max_length=188
)

100%|██████████| 2227/2227 [26:03<00:00,  1.42it/s]

Test Loss: 1.0491, Test Accuracy: 0.1032





(1.0490612892704156, 0.10324690820781336)

In [None]:
# def train_loop(loader):
#     n_epochs = 5
#     for epoch in n_epochs:
#         for data, labels in loader:
#             feature_extr.MFCC() # or any other
#             output = model(data)
#             loss = ...
#     pass