In [4]:
import torch
import numpy as np
import fairseq
import os
import matplotlib.pyplot as plt
import librosa
from tqdm import tqdm
from sklearn.manifold import TSNE
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
from fairseq.checkpoint_utils import load_model_ensemble_and_task
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim

ModuleNotFoundError: No module named 'fairseq'

In [None]:
# Sanity check: push one clip through a pretrained Hugging Face checkpoint and
# print what the model returns.
# NOTE(review): Wav2Vec2FeatureExtractor and Wav2Vec2Model are not imported in
# the visible import cell (presumably they come from `transformers`) - confirm
# and add the import, otherwise this cell raises NameError on a fresh kernel.
input_audio, sample_rate = librosa.load("/content/bla.wav",  sr=16000)

model_name = "facebook/wav2vec2-large-xlsr-53"
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)
model = Wav2Vec2Model.from_pretrained(model_name)

# `i` holds the extractor output (its `input_values` field is fed to the model);
# `o` holds the model output. Gradients are disabled for this forward pass.
i= feature_extractor(input_audio, return_tensors="pt", sampling_rate=sample_rate)
with torch.no_grad():
  o= model(i.input_values)
# Show the available output fields and the shapes of the two that are inspected here.
print(o.keys())
print(o.last_hidden_state.shape)
print(o.extract_features.shape)

In [None]:
class SSLModel(nn.Module):
    """Thin wrapper that loads a fairseq checkpoint and exposes its features.

    The wrapper is put in eval mode at construction time so that feature
    extraction is deterministic (any dropout-style layers in the checkpoint
    are disabled). Call ``.train()`` on the wrapper explicitly if the
    underlying model is meant to be fine-tuned.

    Args:
        device: Device the caller intends to run on. It is only stored; the
            model is moved lazily to whatever device/dtype the first input
            tensor uses (see ``extract_feat``).
        cp_path: Path to the fairseq checkpoint file. Defaults to the
            previously hard-coded ``'xlsr2_300m.pt'``.

    Attributes:
        model: First model of the ensemble returned by fairseq.
        device: The ``device`` argument, unchanged.
        out_dim: Feature dimension advertised to downstream code (1024).
    """

    def __init__(self, device, cp_path='xlsr2_300m.pt'):
        super(SSLModel, self).__init__()
        model, _, _ = fairseq.checkpoint_utils.load_model_ensemble_and_task([cp_path])
        self.model = model[0]
        self.device = device
        self.out_dim = 1024
        # nn.Module instances start in training mode; switch to inference mode
        # so extracted features do not depend on random dropout.
        self.eval()

    def extract_feat(self, input_data):
        """Run the wrapped model on a batch of waveforms and return its features.

        Args:
            input_data: Tensor of shape (batch, samples) or (batch, samples, 1).
                For 3-D input only index 0 of the last axis is used.

        Returns:
            The ``'x'`` entry of the model output when called with
            ``mask=False, features_only=True``.
        """
        reference = next(self.model.parameters())
        if reference.device != input_data.device or reference.dtype != input_data.dtype:
            # Follow the input's device/dtype. Module.to() keeps the current
            # train/eval flag, so the mode chosen by the caller is preserved
            # (the previous code forced train mode here as a side effect).
            self.model.to(input_data.device, dtype=input_data.dtype)

        if input_data.ndim == 3:
            input_tmp = input_data[:, :, 0]
        else:
            input_tmp = input_data

        emb = self.model(input_tmp, mask=False, features_only=True)['x']
        return emb

In [None]:
def extract_features(audio_file_path, device, model, min_duration=4.0, sample_rate=16000):
    """Load one audio file, force it to a fixed length and extract model features.

    Args:
        audio_file_path: Path of the audio file to read with ``librosa.load``.
        device: Torch device the waveform tensor is moved to.
        model: Object exposing ``extract_feat(tensor)``; it receives a float32
            tensor of shape (1, samples).
        min_duration: Target clip length in seconds. Shorter clips are
            zero-padded at the end, longer clips are truncated.
        sample_rate: Sampling rate the file is resampled to on load.

    Returns:
        The model output moved to CPU, converted to a NumPy array and squeezed
        (presumably (frames, feature_dim) for a single clip - confirm against
        the model used).
    """
    audio, sr = librosa.load(audio_file_path, sr=sample_rate)

    # Work in samples rather than seconds: converting a duration difference
    # back to a sample count can be off by one through float rounding, which
    # would give clips (and therefore feature arrays) of unequal length.
    target_len = int(min_duration * sr)
    if len(audio) < target_len:
        audio = np.pad(audio, (0, target_len - len(audio)), mode='constant')
    else:
        audio = audio[:target_len]

    # Shape (1, samples). The previous (1, 1, samples) layout hit the model
    # wrapper's 3-D branch, which keeps only index 0 of the last axis and so
    # reduced the whole clip to a single sample.
    input_values = torch.tensor(audio, dtype=torch.float32).unsqueeze(0).to(device)

    with torch.no_grad():
        # Forward pass through the model
        features = model.extract_feat(input_values)

    return features.cpu().numpy().squeeze()


def read_labels(labels_file):
    """Parse a whitespace-separated protocol file into ``{audio_name: label}``.

    Each non-blank line is split on whitespace; field 1 (0-based) is used as
    the audio name and field 4 as the textual label. The label is encoded as
    1 when it equals ``'spoof'`` and 0 otherwise. If an audio name occurs more
    than once, the last occurrence wins.

    Args:
        labels_file: Path to the protocol text file.

    Returns:
        dict mapping audio name (str) to 0 or 1.

    Raises:
        IndexError: If a non-blank line has fewer than five fields.
    """
    labels_dict = {}
    with open(labels_file, 'r') as file:
        for line in file:
            parts = line.split()
            if not parts:
                # Blank or whitespace-only line (e.g. trailing newline at end
                # of file): nothing to parse.
                continue
            audio_name = parts[1]
            labels_dict[audio_name] = 1 if parts[4] == 'spoof' else 0
    return labels_dict

Extract features for training, validation, and testing sets:

In [None]:
# Feature-extraction driver: for every dataset split, read the protocol file,
# extract features for each listed audio file that exists on disk, and save the
# stacked features and labels as .npy files.
if __name__ == "__main__":
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = SSLModel(device)

    for phase in ['Train', 'dev', 'eval']:
        # All paths use the lower-cased split name.
        audio_path = f"F:\\Awais_data\\Datasets\\PartialSpoof\\{phase.lower()}\\con_wav"
        labels_file = f"F:\\Awais_data\\Datasets\\PartialSpoof\\protocols\\PartialSpoof_LA_cm_protocols\\PartialSpoof.LA.cm.{phase.lower()}.trl.txt"
        output_features_file = f"F:\\Awais_data\\Datasets\\PartialSpoof\\Features\\training\\SSL\\XLSR_{phase.lower()}_features.npy"
        output_labels_file = f"F:\\Awais_data\\Datasets\\PartialSpoof\\Features\\training\\SSL\\XLSR_{phase.lower()}_labels.npy"

        # np.save does not create missing directories; make sure the target
        # exists *before* the long extraction loop instead of failing after it.
        os.makedirs(os.path.dirname(output_features_file), exist_ok=True)

        labels_dict = read_labels(labels_file)

        features_list = []
        labels_list = []

        for audio_name, label in tqdm(labels_dict.items(), desc=f"Extracting features for {phase}"):
            audio_file = os.path.join(audio_path, audio_name + ".wav")
            if not os.path.exists(audio_file):
                # Protocol entries without a matching file are skipped, so the
                # saved arrays may be shorter than the protocol.
                print(f"File '{audio_file}' not found. Skipping...")
                continue
            features = extract_features(audio_file, device, model)
            features_list.append(features)
            labels_list.append(label)

        # Features and labels are appended together, so row i of one array
        # corresponds to row i of the other.
        features_array = np.array(features_list)
        labels_array = np.array(labels_list)

        np.save(output_features_file, features_array)
        np.save(output_labels_file, labels_array)

Train Light CNN and MLP Models
  
  1. Prepare the dataset and dataloaders:

In [None]:
class AudioDataset(Dataset):
    """Dataset pairing an indexable collection of features with its labels.

    Items are returned as ``(float32 feature tensor, int64 label tensor)``.
    """

    def __init__(self, features, labels):
        self.features, self.labels = features, labels

    def __len__(self):
        # The number of samples is defined by the label collection.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = torch.tensor(self.features[idx], dtype=torch.float32)
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample, target

# Load features and labels.
# File names follow the pattern written by the extraction cell:
# XLSR_<split>_features.npy / XLSR_<split>_labels.npy with a lower-cased split
# name (the previous names without the XLSR_ prefix were never written).
features_dir = 'F:\\Awais_data\\Datasets\\PartialSpoof\\Features\\training\\SSL'
X_train = np.load(os.path.join(features_dir, 'XLSR_train_features.npy'))
y_train = np.load(os.path.join(features_dir, 'XLSR_train_labels.npy'))
X_val = np.load(os.path.join(features_dir, 'XLSR_dev_features.npy'))
y_val = np.load(os.path.join(features_dir, 'XLSR_dev_labels.npy'))
X_test = np.load(os.path.join(features_dir, 'XLSR_eval_features.npy'))
y_test = np.load(os.path.join(features_dir, 'XLSR_eval_labels.npy'))

train_dataset = AudioDataset(X_train, y_train)
val_dataset = AudioDataset(X_val, y_val)
test_dataset = AudioDataset(X_test, y_test)

# Only the training loader shuffles; validation/test keep file order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

Define and train the Light CNN and MLP models:

In [None]:
class MLP(nn.Module):
    """Two-layer perceptron classifier that outputs raw class scores (logits).

    ``forward`` returns logits because ``nn.CrossEntropyLoss`` applies
    log-softmax itself; feeding it already-normalised probabilities squashes
    the gradients and degrades training. Use ``predict_proba`` when
    probabilities are needed.

    Args:
        input_dim: Size of the last axis of the input features.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of classes.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        # Kept as an attribute for predict_proba; not used in forward().
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return logits of shape (batch, output_dim).

        Args:
            x: Tensor of shape (batch, input_dim), or (batch, frames,
                input_dim). 3-D input is averaged over axis 1 first, so
                frame-level feature sequences yield one prediction per item
                (assumes axis 1 is the frame/time axis - confirm against the
                stored feature layout).
        """
        if x.ndim == 3:
            x = x.mean(dim=1)
        x = self.fc1(x)
        x = self.relu(x)
        return self.fc2(x)

    def predict_proba(self, x):
        """Return class probabilities (softmax over the class axis)."""
        return self.softmax(self.forward(x))

# Define and train MLP
# input_dim=1024 matches SSLModel.out_dim; output_dim=2 matches the binary
# labels produced by read_labels (1 = 'spoof', 0 = anything else).
# `device` is the notebook-level variable created in the feature-extraction cell.
mlp = MLP(input_dim=1024, hidden_dim=512, output_dim=2).to(device)
# nn.CrossEntropyLoss applies log-softmax internally and expects raw class scores.
criterion = nn.CrossEntropyLoss()
# The optimizer is bound to `mlp`'s parameters only.
optimizer = optim.Adam(mlp.parameters(), lr=0.001)

def train_model(model, train_loader, val_loader, num_epochs=10, loss_fn=None, opt=None):
    """Train ``model`` and print validation loss/accuracy after every epoch.

    Args:
        model: Module to train. Batches are moved to the device of its first
            parameter, so the model must already be on the desired device.
        train_loader: Iterable of ``(features, labels)`` training batches.
        val_loader: Iterable of ``(features, labels)`` validation batches.
        num_epochs: Number of passes over ``train_loader``.
        loss_fn: Loss callable ``loss_fn(outputs, labels)``. Defaults to the
            notebook-level ``criterion``.
        opt: Optimizer stepping ``model``'s parameters. Defaults to the
            notebook-level ``optimizer``.

    Returns:
        None. Metrics are printed, one line per epoch.
    """
    # Fall back to the notebook globals only when the caller gave nothing, so
    # existing three-argument calls behave as before.
    if loss_fn is None:
        loss_fn = criterion
    if opt is None:
        opt = optimizer
    run_device = next(model.parameters()).device

    for epoch in range(num_epochs):
        model.train()
        for features, labels in train_loader:
            features, labels = features.to(run_device), labels.to(run_device)
            outputs = model(features)
            loss = loss_fn(outputs, labels)
            opt.zero_grad()
            loss.backward()
            opt.step()

        # Validation phase
        model.eval()
        val_loss = 0.0
        correct = 0
        seen = 0
        with torch.no_grad():
            for features, labels in val_loader:
                features, labels = features.to(run_device), labels.to(run_device)
                outputs = model(features)
                val_loss += loss_fn(outputs, labels).item()
                predicted = outputs.argmax(dim=1)
                correct += (predicted == labels).sum().item()
                # Count what was actually evaluated instead of relying on a
                # global dataset that may not belong to this loader.
                seen += labels.size(0)

        # max(..., 1) keeps an empty validation loader from dividing by zero.
        mean_val_loss = val_loss / max(len(val_loader), 1)
        val_accuracy = correct / max(seen, 1)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {mean_val_loss}, Accuracy: {val_accuracy}')

# Train the MLP with the default number of epochs; validation metrics are
# printed once per epoch.
train_model(mlp, train_loader, val_loader)