# Audio Spoofing Detection Notebook

This notebook aims to detect spoofed audio using Symbolic Regression with PySR. 

In [2]:
# Setup and Initial Imports
%load_ext autoreload
%autoreload 2

import sys
import os
import time
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

# Add the project root folder to PYTHONPATH
sys.path.append(os.path.abspath('..'))

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Import Machine Learning and SR Libraries

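In this cell, we import the scikit-learn and LightGBM utilities used for data processing and evaluation. PySR itself is imported later, right before the symbolic regression models are defined.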

In [3]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix, det_curve
from lightgbm import LGBMClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC

## Define File Paths

Define the paths for the processed features and protocol file. Make sure to adjust these paths as needed.

In [4]:
# Input locations, both written relative to the notebook's working directory:
# the pickled feature file and the CM protocol file (read below to obtain labels).
features_path = '../data/processed/filterbanks_features.pkl'
protocol_path = '../data/raw/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt'

## Load Features

Load the feature data from the pickle file.

In [5]:
# Deserialize the feature records into `data`.
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so only
# load feature files that were produced by a trusted pipeline.
with open(features_path, 'rb') as f:
    data = pickle.load(f)

In [9]:
# Shape of the first record's feature vector (quick sanity check).
data[0]['features'].shape

(640,)

## Read Protocol File

Define a function to read the protocol file, which maps audio files to labels, and load the DataFrame.

In [10]:
def read_cm_protocol(filepath: str) -> pd.DataFrame:
    """
    Parse a space-separated CM protocol file into a DataFrame.

    The file is read without a header row; its five fields are named in order
    and the placeholder fourth field is discarded.

    Args:
        filepath (str): Path to the protocol file.

    Returns:
        pd.DataFrame: One row per protocol line with the columns
        speaker_id, audio_file, system_id, and key.
    """
    fields = ("speaker_id", "audio_file", "system_id", "unused", "key")
    protocol = pd.read_csv(filepath, sep=' ', names=list(fields), index_col=False)
    # The "unused" field carries no information for this notebook.
    return protocol.drop(columns=["unused"])

# Load the protocol from `protocol_path` and preview its first rows.
protocol_df = read_cm_protocol(protocol_path)
print("Protocol Data (first 5 rows):")
print(protocol_df.head())

Protocol Data (first 5 rows):
  speaker_id    audio_file system_id       key
0    LA_0079  LA_T_1138215         -  bonafide
1    LA_0079  LA_T_1271820         -  bonafide
2    LA_0079  LA_T_1272637         -  bonafide
3    LA_0079  LA_T_1276960         -  bonafide
4    LA_0079  LA_T_1341447         -  bonafide


## Create DataFrame for Features and Labels

Create a DataFrame where each row contains a file name and its corresponding features. Label 1 represents "spoof" and 0 represents "bonafide"; files that do not appear in the protocol are labeled 0 (bonafide) by default.

In [11]:
# Map every protocol audio id to its key ('bonafide' / 'spoof').
label_dict = dict(zip(protocol_df['audio_file'], protocol_df['key']))

# One row per record: the file name followed by its feature values.
feature_columns = [f"feature_{i}" for i in range(len(data[0]['features']))]
df_features = pd.DataFrame(
    [(d['file'], *d['features']) for d in data],
    columns=["file"] + feature_columns
)

# Strip the audio extension so file names line up with the protocol ids.
file_ids = (
    df_features["file"]
    .str.replace('.flac', '', regex=False)
    .str.replace('.wav', '', regex=False)
)
protocol_keys = file_ids.map(label_dict)

# Files missing from the protocol still fall back to bonafide (0), as before,
# but the fallback is now reported instead of happening silently.
n_unmatched = int(protocol_keys.isna().sum())
if n_unmatched:
    print(f"Warning: {n_unmatched} file(s) not found in the protocol; labeled as bonafide (0).")

# 1 = spoof, 0 = bonafide.
df_features["label"] = (protocol_keys == 'spoof').astype(int)

print("Features DataFrame (first 5 rows):")
print(df_features.head())

Features DataFrame (first 5 rows):
                file  feature_0  feature_1  feature_2  feature_3  feature_4  \
0  LA_T_1000137.flac   0.000010   0.000022   0.000072   0.001275   0.046314   
1  LA_T_1000406.flac   0.000011   0.000016   0.000036   0.000889   0.011094   
2  LA_T_1000648.flac   0.000003   0.000003   0.000006   0.000099   0.001488   
3  LA_T_1000824.flac   0.000017   0.000050   0.000262   0.011847   0.271454   
4  LA_T_1001074.flac   0.000002   0.000004   0.000015   0.000821   0.015475   

   feature_5  feature_6  feature_7  feature_8  ...  feature_631  feature_632  \
0   0.565488   2.776286   6.029241  10.804525  ...     0.000097     0.000081   
1   0.033539   0.144185   1.887749   7.177183  ...     0.000002     0.000002   
2   0.003842   0.004142   0.003567   0.004978  ...     0.000042     0.000038   
3   2.040871   4.335693   3.998420   1.990429  ...     0.000004     0.000006   
4   0.060690   0.101386   0.129564   0.271895  ...     0.000003     0.000002   

   featur

## Data Exploration

Display the distribution of labels to verify the balance of the dataset.

In [12]:
# Human-readable names for the numeric labels.
label_names = {0: 'bonafide', 1: 'spoof'}
labels = df_features['label']

# Absolute counts and percentages (two decimals) per class.
label_dist = labels.value_counts().rename(index=label_names)
label_pct = (
    labels.value_counts(normalize=True)
    .mul(100)
    .rename(index=label_names)
    .round(2)
)

df_label_summary = pd.DataFrame({'Count': label_dist, 'Percent (%)': label_pct})
print("Label Distribution:")
print(df_label_summary)

Label Distribution:
          Count  Percent (%)
label                       
spoof     22800        89.83
bonafide   2580        10.17


## Data Preprocessing

Separate the features (X) and labels (y), standardize the features, and split the data into training and testing sets.

In [13]:
# Raw (unscaled) feature matrix and label vector.
X = df_features.drop(columns=["file", "label"]).values
y = df_features["label"].values

# Split BEFORE scaling: fitting the scaler on the full matrix would let the
# test rows influence the mean/variance used to transform the training data
# (data leakage). The same random_state keeps the partition unchanged.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit the scaler on the training partition only, then reuse its statistics
# for the test partition. Note that `X` itself stays unscaled.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

## Dataset Statistics

Define a function to display the label distribution in the training and testing sets.

In [14]:
def dataset_stats(y_train: np.ndarray, y_test: np.ndarray) -> None:
    """
    Print a per-class summary of the training and testing labels.

    For each partition the table shows the number of samples per class and
    the matching percentage rounded to two decimals. Labels 0 and 1 are
    displayed as 'bonafide' and 'spoof'.

    Args:
        y_train (np.ndarray): Array of training labels.
        y_test (np.ndarray): Array of testing labels.
    """
    class_names = {0: 'bonafide', 1: 'spoof'}

    def _summarise(labels):
        # Counts and percentages for one partition, indexed by class name.
        series = pd.Series(labels)
        counts = series.value_counts().rename(index=class_names)
        share = series.value_counts(normalize=True).mul(100).rename(index=class_names).round(2)
        return counts, share

    train_counts, train_share = _summarise(y_train)
    test_counts, test_share = _summarise(y_test)

    summary = pd.DataFrame({
        'Train Count': train_counts, 'Train %': train_share,
        'Test Count': test_counts, 'Test %': test_share
    })
    print("\nDataset distribution after split:")
    print(summary)

# Report the class balance of the training and testing partitions.
dataset_stats(y_train, y_test)


Dataset distribution after split:
          Train Count  Train %  Test Count  Test %
spoof           18233     89.8        4567   89.97
bonafide         2071     10.2         509   10.03


## Define, Train, and Evaluate Models

Define a dictionary of models, then train and evaluate each one. For each model, we calculate the training time, accuracy, confusion matrix, and Equal Error Rate (EER).

In [1]:
# Print the path of the Python interpreter backing this kernel (environment sanity check).
# NOTE(review): this cell was executed out of order (In [1]) and its recorded output
# exposes an absolute local path; consider clearing the output before sharing.
import sys
print(sys.executable)

/home/vicosbe/victo/audio_symbolic_regression/audio_symbolic_regression_env/bin/python


In [None]:
from pysr import PySRRegressor

In [None]:
# Candidate PySR configurations, ordered from the smallest operator set to the largest.
models = {
    "PySR (Basic)": PySRRegressor(
        niterations=80,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
    ),
    "PySR (Extended)": PySRRegressor(
        niterations=100,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log", "sin", "cos"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
    ),
    "PySR (Complex)": PySRRegressor(
        niterations=200,
        binary_operators=["+", "-", "*", "/", "pow"],
        unary_operators=["exp", "log", "sin", "cos", "abs"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
    ),
}

# CSV file that accumulates the metrics of every trained model.
csv_file = "model_metrics.csv"

# Write the header row only when the file does not exist yet.
if not os.path.exists(csv_file):
    pd.DataFrame(
        columns=["Model", "Training Time (sec)", "Accuracy", "EER"]
    ).to_csv(csv_file, index=False)

print("\nTreinando modelos PySR...")

for name, model in tqdm(models.items(), desc="Models"):
    print(f"\n{name}:")

    # Wall-clock time spent inside fit().
    start_time = time.time()
    model.fit(X_train, y_train)
    train_time = time.time() - start_time

    # The regressor outputs continuous values; 0.5 is the decision threshold.
    y_pred_proba = model.predict(X_test)
    y_pred = (y_pred_proba > 0.5).astype(int)

    # EER estimate: the FPR at the DET point where FPR and FNR are closest.
    fpr, fnr, thresholds = det_curve(y_test, y_pred_proba)
    idx = np.nanargmin(np.abs(fnr - fpr))
    eer = fpr[idx]
    acc = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    print(f"Tempo de treinamento: {train_time:.2f} segundos")
    print(f"Acurácia: {acc:.4f}")
    print("Matriz de Confusão:\n", cm)
    print(f"EER: {eer:.4f}")

    # File name derived from the model name: spaces -> "_", parentheses removed.
    model_filename = name.translate(str.maketrans(" ", "_", "()")) + ".pkl"
    with open(model_filename, "wb") as f:
        pickle.dump(model, f)
    print(f"Modelo salvo em {model_filename}")

    # Append one metrics row for this model (no header, the file already has one).
    metrics = {
        "Model": name,
        "Training Time (sec)": round(train_time, 2),
        "Accuracy": round(acc, 4),
        "EER": round(eer, 4),
    }
    pd.DataFrame([metrics]).to_csv(csv_file, mode="a", header=False, index=False)
    print(f"Métricas para {name} adicionadas ao arquivo {csv_file}")

In [None]:
# Same three operator sets as before, each created with select_k_features=10.
models = {
    "PySR_K10 (Basic)": PySRRegressor(
        niterations=40,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
        select_k_features=10,
    ),
    "PySR_K10 (Extended)": PySRRegressor(
        niterations=60,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log", "sin", "cos"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
        select_k_features=10,
    ),
    "PySR_K10 (Complex)": PySRRegressor(
        niterations=80,
        binary_operators=["+", "-", "*", "/", "pow"],
        unary_operators=["exp", "log", "sin", "cos", "abs"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
        select_k_features=10,
    ),
}

# CSV file that accumulates the metrics of every trained model.
csv_file = "model_metrics.csv"

# Create the file with its header row on first use only.
if not os.path.exists(csv_file):
    pd.DataFrame(
        columns=["Model", "Training Time (sec)", "Accuracy", "EER"]
    ).to_csv(csv_file, index=False)

print("\nTreinando modelos PySR...")

for name, model in tqdm(models.items(), desc="Models"):
    print(f"\n{name}:")

    # Time the fit() call.
    start_time = time.time()
    model.fit(X_train, y_train)
    train_time = time.time() - start_time

    # Continuous regressor output, thresholded at 0.5 for the class decision.
    y_pred_proba = model.predict(X_test)
    y_pred = (y_pred_proba > 0.5).astype(int)

    # EER estimate: the FPR at the DET point where FPR and FNR are closest.
    fpr, fnr, thresholds = det_curve(y_test, y_pred_proba)
    idx = np.nanargmin(np.abs(fnr - fpr))
    eer = fpr[idx]
    acc = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    print(f"Tempo de treinamento: {train_time:.2f} segundos")
    print(f"Acurácia: {acc:.4f}")
    print("Matriz de Confusão:\n", cm)
    print(f"EER: {eer:.4f}")

    # File name derived from the model name: spaces -> "_", parentheses removed.
    model_filename = name.translate(str.maketrans(" ", "_", "()")) + ".pkl"
    with open(model_filename, "wb") as f:
        pickle.dump(model, f)
    print(f"Modelo salvo em {model_filename}")

    # Append this model's metrics as a single CSV row.
    metrics = {
        "Model": name,
        "Training Time (sec)": round(train_time, 2),
        "Accuracy": round(acc, 4),
        "EER": round(eer, 4),
    }
    pd.DataFrame([metrics]).to_csv(csv_file, mode="a", header=False, index=False)
    print(f"Métricas para {name} adicionadas ao arquivo {csv_file}")

In [None]:
# Batch

# Same three operator sets, created with batching=True and batch_size=200.
# (The first entry previously contained a stray ", ," that made the whole
# cell a SyntaxError.)
models = {
    "PySR_batch (Basic)": PySRRegressor(
        niterations=40,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
        batching=True,
        batch_size=200,
    ),
    "PySR_batch (Extended)": PySRRegressor(
        niterations=60,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log", "sin", "cos"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
        batching=True,
        batch_size=200,
    ),
    "PySR_batch (Complex)": PySRRegressor(
        niterations=80,
        binary_operators=["+", "-", "*", "/", "pow"],
        unary_operators=["exp", "log", "sin", "cos", "abs"],
        loss="loss(x, y) = (x - y)^2",
        random_state=42,
        batching=True,
        batch_size=200,
    ),
}

# CSV file that accumulates the metrics of every trained model.
csv_file = "model_metrics.csv"

# Write the header row only when the file does not exist yet.
if not os.path.exists(csv_file):
    pd.DataFrame(columns=["Model", "Training Time (sec)", "Accuracy", "EER"]).to_csv(csv_file, index=False)

print("\nTreinando modelos PySR...")

for name, model in tqdm(models.items(), desc="Models"):
    print(f"\n{name}:")
    start_time = time.time()

    model.fit(X_train, y_train)
    train_time = time.time() - start_time

    # The regressor outputs continuous values; 0.5 is the decision threshold.
    y_pred_proba = model.predict(X_test)
    y_pred = (y_pred_proba > 0.5).astype(int)

    # EER estimate: the FPR at the DET point where FPR and FNR are closest.
    fpr, fnr, thresholds = det_curve(y_test, y_pred_proba)
    idx = np.nanargmin(np.absolute(fnr - fpr))
    eer = fpr[idx]
    acc = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    print(f"Tempo de treinamento: {train_time:.2f} segundos")
    print(f"Acurácia: {acc:.4f}")
    print("Matriz de Confusão:\n", cm)
    print(f"EER: {eer:.4f}")

    # File name derived from the model name: spaces -> "_", parentheses removed.
    model_filename = f"{name.replace(' ', '_').replace('(', '').replace(')', '')}.pkl"
    with open(model_filename, "wb") as f:
        pickle.dump(model, f)
    print(f"Modelo salvo em {model_filename}")

    # Append one metrics row for this model (no header, the file already has one).
    metrics = {"Model": name, "Training Time (sec)": round(train_time, 2), "Accuracy": round(acc, 4), "EER": round(eer, 4)}
    pd.DataFrame([metrics]).to_csv(csv_file, mode="a", header=False, index=False)
    print(f"Métricas para {name} adicionadas ao arquivo {csv_file}")

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, det_curve, DetCurveDisplay
# Recode the labels to -1 (spoof) and +1 (bonafide), as required by L2MarginLoss.
y_train_margin = np.where(y_train == 1, -1, 1)
y_test_margin = np.where(y_test == 1, -1, 1)

# PySR model trained with a margin loss instead of the squared error.
models = {
    "PySR (L2MarginLoss)": PySRRegressor(
        niterations=60,
        binary_operators=["+", "-", "*", "/"],
        unary_operators=["exp", "log", "abs"],
        loss="L2MarginLoss()",
        random_state=42
    )
}

# Same CSV file used by the other models.
csv_file = "model_metrics.csv"

if not os.path.exists(csv_file):
    pd.DataFrame(columns=["Model", "Training Time (sec)", "Accuracy", "EER"]).to_csv(csv_file, index=False)

print("\nTraining PySR model...")

for name, model in tqdm(models.items(), desc="PySR"):
    print(f"\n{name}:")
    start_time = time.time()

    model.fit(X_train, y_train_margin)
    train_time = time.time() - start_time

    # Raw model output: negative values point to spoof (-1), positive to bonafide (+1).
    y_scores = model.predict(X_test)

    # Flip the sign so that a HIGHER score means "more likely spoof". The
    # evaluation labels below use 1 = spoof, and det_curve expects scores that
    # grow with the positive class; passing the raw scores inverted the curve.
    spoof_scores = -y_scores

    # Back to the 0 = bonafide, 1 = spoof convention (a score of exactly 0 is bonafide).
    y_pred = (spoof_scores > 0).astype(int)
    y_test_bin = np.where(y_test_margin == -1, 1, 0)

    acc = accuracy_score(y_test_bin, y_pred)
    cm = confusion_matrix(y_test_bin, y_pred)

    # EER estimate: the FPR at the DET point where FPR and FNR are closest.
    fpr, fnr, thresholds = det_curve(y_test_bin, spoof_scores)
    idx = np.nanargmin(np.abs(fpr - fnr))
    eer = fpr[idx]

    print(f"Training time: {train_time:.2f} seconds")
    print(f"Accuracy: {acc:.4f}")
    print("Confusion Matrix:\n", cm)
    print(f"EER: {eer:.4f}")

    # Save the fitted model.
    model_filename = f"{name.replace(' ', '_').replace('(', '').replace(')', '')}.pkl"
    with open(model_filename, "wb") as f:
        pickle.dump(model, f)
    print(f"Model saved to {model_filename}")

    # Append the metrics to the CSV file.
    metrics = {
        "Model": name,
        "Training Time (sec)": round(train_time, 2),
        "Accuracy": round(acc, 4),
        "EER": round(eer, 4)
    }

    pd.DataFrame([metrics]).to_csv(csv_file, mode="a", header=False, index=False)
    print(f"Metrics for {name} appended to {csv_file}")

    # DET curve (optional). A second plt.title call used to overwrite the
    # per-model title; only the per-model title is kept.
    DetCurveDisplay(fpr=fpr, fnr=fnr).plot()
    plt.title(f"DET Curve - {name}")
    plt.savefig("det_curve_pysr.png", dpi=300)
    plt.show()