In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import torchaudio
from transformers import ASTFeatureExtractor, ASTForAudioClassification
from tqdm import tqdm
# Pick the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cpu") if not torch.cuda.is_available() else torch.device("cuda")

# Report which accelerator (if any) this run will use.
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU name:", torch.cuda.get_device_name(0))
else:
    print("GPU name:", "No GPU")


CUDA available: True
GPU name: NVIDIA GeForce RTX 4090 D


In [None]:
# Text label -> integer class index used for targets.
LABEL_MAP = dict(bonafide=1, spoof=0)

# Sample rate (Hz) expected for the audio fed to the feature extractor.
sampling_rate = 16_000

# Feature extractor loaded from a local directory.
feature_extractor = ASTFeatureExtractor.from_pretrained(
    "/root/autodl-fs/ast-extractor"
)

class ASTDataset(Dataset):
    """Dataset of FLAC utterances described by a space-separated protocol file.

    The protocol file has no header row. Column 1 holds the utterance id
    (also the FLAC file stem) and column 4 holds the textual label, which is
    translated to an integer through ``LABEL_MAP``.

    Args:
        csv_path: Path of the space-separated protocol file.
        audio_dir: Directory containing ``<utt_id>.flac`` files.
        nrows: Optional cap on the number of protocol rows to read.
    """

    def __init__(self, csv_path, audio_dir, nrows=None):
        protocol = pd.read_csv(csv_path, sep=" ", header=None, nrows=nrows)
        self.df = protocol
        self.audio_dir = audio_dir
        # Integer targets for every row, resolved up front.
        self.labels = [LABEL_MAP[name] for name in protocol[4]]

    def __len__(self):
        """Number of utterances listed in the protocol file."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load one utterance.

        Returns:
            dict with ``"waveform"`` (1-D numpy array, averaged over dim 0 of
            the loaded audio and resampled to 16 kHz if needed), ``"labels"``
            (0-d tensor) and ``"utt_id"`` (utterance id from column 1).
        """
        record = self.df.iloc[idx]
        file_name = record[1] + ".flac"
        target = LABEL_MAP[record[4]]
        audio, native_sr = torchaudio.load(os.path.join(self.audio_dir, file_name))
        # Collapse dim 0 (channels in torchaudio.load's output) to a mono signal.
        audio = audio.mean(dim=0)
        if native_sr != 16000:
            audio = torchaudio.transforms.Resample(native_sr, 16000)(audio)
        return {
            "waveform": audio.numpy(),
            "labels": torch.tensor(target),
            "utt_id": record[1],
        }

def ast_collate_fn(batch):
    """Collate dataset items into one model-ready batch.

    Args:
        batch: list of dicts with keys ``"waveform"``, ``"labels"`` and
            ``"utt_id"``.

    Returns:
        dict with ``"input_values"`` (output of the module-level
        ``feature_extractor`` for all waveforms), ``"labels"`` (stacked label
        tensors) and ``"utt_id"`` (list of ids in batch order).
    """
    def _gather(key):
        # Pull one field out of every item, preserving batch order.
        return [sample[key] for sample in batch]

    clips = _gather("waveform")
    targets = _gather("labels")
    ids = _gather("utt_id")

    encoded = feature_extractor(
        clips,
        sampling_rate=16000,
        padding=True,
        return_tensors="pt",
    )
    collated = {"input_values": encoded["input_values"]}
    collated["labels"] = torch.stack(targets)
    collated["utt_id"] = ids
    return collated



In [None]:


# Number of utterances per evaluation batch.
batch_size = 8

# Evaluation split: protocol file plus the directory holding its FLAC audio.
eval_dataset = ASTDataset(
    csv_path="/root/autodl-fs/ASVspoof2019.LA.cm.eval.trl.txt",
    audio_dir="/root/autodl-tmp/ASVspoof2019_LA_eval/flac",
)

# Evaluation does not benefit from shuffling; keeping protocol order makes the
# per-utterance outputs reproducible and identically ordered for every model
# that iterates this loader. The loader now honours `batch_size` instead of a
# duplicated literal.
eval_dataloader = DataLoader(
    eval_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=ast_collate_fn,
)

# Sanity check: show the feature shape and labels of the first batch only.
for batch in eval_dataloader:
    print(batch["input_values"].shape)
    print(batch["labels"])
    break


    


torch.Size([8, 1024, 128])
tensor([1, 0, 0, 0, 1, 0, 1, 1])
torch.Size([8, 1024, 128])
tensor([1, 0, 0, 0, 1, 0, 0, 0])


In [None]:
# Local directory of the pretrained AST checkpoint shared by all six models.
model_path = "/root/autodl-fs/ast-audioset"


def _fresh_ast_classifier():
    """Build one AST classifier from ``model_path`` with a 2-class head on ``device``.

    ``ignore_mismatched_sizes=True`` lets the checkpoint load even though its
    classifier head has a different number of outputs; that head is then
    newly initialised.
    """
    classifier = ASTForAudioClassification.from_pretrained(
        model_path,
        num_labels=2,
        ignore_mismatched_sizes=True,
    )
    return classifier.to(device)


# One independent model instance per experiment variant.
# NOTE(review): the lp/pt/ft and wl/ws suffixes presumably name the training
# regime and data variant — confirm against the training notebooks.
model_lp_wl = _fresh_ast_classifier()
model_pt_wl = _fresh_ast_classifier()
model_ft_wl = _fresh_ast_classifier()
model_lp_ws = _fresh_ast_classifier()
model_pt_ws = _fresh_ast_classifier()
model_ft_ws = _fresh_ast_classifier()




Some weights of ASTForAudioClassification were not initialized from the model checkpoint at /root/autodl-fs/ast-audioset and are newly initialized because the shapes did not match:
- classifier.dense.bias: found shape torch.Size([527]) in the checkpoint and torch.Size([2]) in the model instantiated
- classifier.dense.weight: found shape torch.Size([527, 768]) in the checkpoint and torch.Size([2, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of ASTForAudioClassification were not initialized from the model checkpoint at /root/autodl-fs/ast-audioset and are newly initialized because the shapes did not match:
- classifier.dense.bias: found shape torch.Size([527]) in the checkpoint and torch.Size([2]) in the model instantiated
- classifier.dense.weight: found shape torch.Size([527, 768]) in the checkpoint and torch.Size([2, 768]) in the model instantiated
You should probably TRAIN t

In [None]:
# Checkpoint file for each model variant.
model_path_lp_wl = "/root/autodl-tmp/ast_wl_lp/epoch_20.pt"
model_path_pt_wl = "/root/autodl-tmp/ast_wl_partial/epoch_10.pt"
model_path_ft_wl = "/root/autodl-tmp/ast_wl_fully/epoch_7.pt"
model_path_lp_ws = "/root/autodl-tmp/ast_ws_lp/epoch_20.pt"
model_path_pt_ws = "/root/autodl-tmp/ast_ws_partial/epoch_15.pt"
model_path_ft_ws = "/root/autodl-tmp/ast_ws_fully/epoch_5.pt"

# Load the trained weights into each model and switch it to evaluation mode.
# `map_location=device` remaps tensors saved from a GPU run onto whatever
# device this session selected, so the cell also works on the CPU fallback.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
for _model, _checkpoint in (
    (model_lp_wl, model_path_lp_wl),
    (model_pt_wl, model_path_pt_wl),
    (model_ft_wl, model_path_ft_wl),
    (model_lp_ws, model_path_lp_ws),
    (model_pt_ws, model_path_pt_ws),
    (model_ft_ws, model_path_ft_ws),
):
    _model.load_state_dict(torch.load(_checkpoint, map_location=device))
    _model.eval()







In [None]:
import torch
import pandas as pd
from tqdm import tqdm
def write_model_predictions(model, dataloader, device, output_csv_path):
    """Run ``model`` over ``dataloader`` and save per-utterance predictions to CSV.

    Args:
        model: Module whose forward pass returns an object exposing ``.logits``
            with one row per utterance and (at least) two class columns.
        dataloader: Iterable of dict batches with keys ``"input_values"``
            (tensor), ``"labels"`` (tensor) and ``"utt_id"`` (sequence of ids).
        device: Device on which inference runs.
        output_csv_path: Destination CSV path.

    The CSV columns are ``id, label, prediction, logit_bonafide, logit_spoof``.
    Class indices follow the notebook's ``LABEL_MAP`` (spoof = 0,
    bonafide = 1), so ``logit_spoof`` is taken from logit column 0 and
    ``logit_bonafide`` from logit column 1. The previous version had these
    two columns swapped.
    """
    columns = ["id", "label", "prediction", "logit_bonafide", "logit_spoof"]

    model.eval()
    model.to(device)
    all_results = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc=f"Evaluating {output_csv_path}"):
            x = batch["input_values"].to(device)
            utt_ids = batch["utt_id"]

            logits = model(x).logits
            preds = torch.argmax(logits, dim=1)

            # Move everything to host memory once per batch before row-wise export.
            labels_np = batch["labels"].cpu().numpy()
            preds_np = preds.cpu().numpy()
            logits_np = logits.cpu().numpy()

            for uid, label, pred, logit in zip(utt_ids, labels_np, preds_np, logits_np):
                all_results.append({
                    'id': uid,
                    'label': int(label),
                    'prediction': int(pred),
                    # Index 1 is the bonafide class, index 0 the spoof class.
                    'logit_bonafide': float(logit[1]),
                    'logit_spoof': float(logit[0]),
                })

    # Passing `columns` keeps the header (and column order) even for an empty run.
    df = pd.DataFrame(all_results, columns=columns)
    df.to_csv(output_csv_path, index=False)
    print(f"✅ Saved predictions to {output_csv_path}")


# Export predictions for every model variant, one CSV per model.
model_list = dict(
    model_lp_wl=model_lp_wl,
    model_pt_wl=model_pt_wl,
    model_ft_wl=model_ft_wl,
    model_lp_ws=model_lp_ws,
    model_pt_ws=model_pt_ws,
    model_ft_ws=model_ft_ws,
)

for model_name in model_list:
    model = model_list[model_name]
    # The output file name is derived from the dictionary key.
    csv_name = f"ast_{model_name}_eval_predictions.csv"
    write_model_predictions(
        model=model,
        dataloader=eval_dataloader,
        device=device,
        output_csv_path=csv_name,
    )


In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import det_curve


# NOTE(review): this path points at a ConvNeXt *dev* predictions file, not at
# one of the ast_*_eval_predictions.csv files written earlier in this
# notebook — confirm this is the file you intend to score.
df = pd.read_csv(
    "/root/autodl-tmp/convnext_result/conv_model_conv_dev_predictions.csv"
)

# Ground truth (1 = bonafide, 0 = spoof), hard predictions, and raw logits
# arranged as [spoof, bonafide] so that column 1 is the bonafide logit.
y_true = df["label"].to_numpy()
y_pred = df["prediction"].to_numpy()
logits = df[["logit_spoof", "logit_bonafide"]].to_numpy().astype(np.float32)

# Fraction of utterances whose stored prediction equals the label.
accuracy = np.mean(y_true == y_pred)

print(f"Accuracy: {accuracy:.6f}")

def softmax(x):
    """Row-wise softmax of a 2-D array of logits.

    Each row's maximum is subtracted before exponentiation so large logits do
    not overflow; the result of every row sums to 1.
    """
    row_peak = np.max(x, axis=1, keepdims=True)
    weights = np.exp(x - row_peak)
    return weights / weights.sum(axis=1, keepdims=True)

# Class probabilities; column 1 is the bonafide (positive-class) score.
probs = softmax(logits)
y_score = probs[:, 1]

# DET curve: false-positive and false-negative rates per decision threshold.
fpr, fnr, thresholds = det_curve(y_true, y_score, pos_label=1)

# Equal error rate: take the operating point where the two error rates are
# closest and average them there.
eer_idx = np.nanargmin(np.abs(fnr - fpr))
eer_threshold = thresholds[eer_idx]
eer = 0.5 * (fpr[eer_idx] + fnr[eer_idx])


# Priors and costs used by the t-DCF computations below.
cost_model = dict(
    Ptar=0.99 * (1 - 0.05),   # prior of a target trial
    Pnon=0.01 * (1 - 0.05),   # prior of a non-target trial
    Pspoof=0.05,              # prior of a spoofing attack
    Cmiss_asv=1,              # cost of the ASV system missing a target
    Cfa_asv=10,               # cost of the ASV system accepting a non-target
    Cmiss_cm=1,               # cost of the countermeasure rejecting bonafide
    Cfa_cm=10,                # cost of the countermeasure accepting a spoof
)

# Fixed ASV error rates assumed for the CM-only t-DCF (no real ASV scores used).
Pmiss_asv, Pfa_asv = 0.01, 0.01
Pmiss_spoof_asv = 0.05

# Normalised t-DCF at a single operating point
def compute_tdcf_at_threshold(y_true, y_score, threshold, cost_model):
    """Normalised t-DCF of the countermeasure at one decision threshold.

    Args:
        y_true: Array of labels (1 = positive/bonafide, 0 = negative/spoof).
        y_score: Array of scores; a trial is accepted when score >= threshold.
        threshold: Decision threshold applied to ``y_score``.
        cost_model: Dict with keys ``Ptar``, ``Pnon``, ``Pspoof``,
            ``Cmiss_asv``, ``Cfa_asv``, ``Cmiss_cm``, ``Cfa_cm``.

    Reads the module-level ``Pmiss_asv``, ``Pfa_asv`` and ``Pmiss_spoof_asv``.

    Returns:
        ``(C1 * miss_rate + C2 * false_accept_rate) / min(C1, C2)``.
    """
    accepted = y_score >= threshold
    positives = y_true == 1
    negatives = y_true == 0

    # Miss: positive trial rejected. False accept: negative trial accepted.
    miss = np.sum(~accepted & positives) / np.sum(positives)
    fa = np.sum(accepted & negatives) / np.sum(negatives)

    cm = cost_model
    weight_miss = cm['Ptar'] * (cm['Cmiss_cm'] - cm['Cmiss_asv'] * Pmiss_asv) - \
        cm['Pnon'] * cm['Cfa_asv'] * Pfa_asv
    weight_fa = cm['Cfa_cm'] * cm['Pspoof'] * (1 - Pmiss_spoof_asv)

    raw_cost = weight_miss * miss + weight_fa * fa
    return raw_cost / min(weight_miss, weight_fa)

# Normalised t-DCF evaluated at the threshold where the EER was found.
tdcf_at_eer = compute_tdcf_at_threshold(y_true, y_score, eer_threshold, cost_model)

# Minimum normalised t-DCF across every point of the DET curve
def compute_min_tdcf(fpr, fnr, cost_model):
    """Smallest normalised t-DCF over all operating points.

    Args:
        fpr: Array of false-positive rates, one per threshold.
        fnr: Array of false-negative rates aligned with ``fpr``.
        cost_model: Dict with keys ``Ptar``, ``Pnon``, ``Pspoof``,
            ``Cmiss_asv``, ``Cfa_asv``, ``Cmiss_cm``, ``Cfa_cm``.

    Reads the module-level ``Pmiss_asv``, ``Pfa_asv`` and ``Pmiss_spoof_asv``.

    Returns:
        ``min((C1 * fnr + C2 * fpr) / min(C1, C2))``.
    """
    cm = cost_model
    weight_miss = cm['Ptar'] * (cm['Cmiss_cm'] - cm['Cmiss_asv'] * Pmiss_asv) - \
        cm['Pnon'] * cm['Cfa_asv'] * Pfa_asv
    weight_fa = cm['Cfa_cm'] * cm['Pspoof'] * (1 - Pmiss_spoof_asv)

    # Cost at every threshold, normalised by the cheaper of the two weights.
    normalised_curve = (weight_miss * fnr + weight_fa * fpr) / min(weight_miss, weight_fa)
    return np.min(normalised_curve)

# Best achievable normalised t-DCF along the DET curve computed earlier.
min_tdcf = compute_min_tdcf(fpr, fnr, cost_model)

# Summary of the countermeasure-only metrics, one per line.
print("\n CM-only Evaluation")
print(
    f"EER          : {eer:.6f}",
    f"t-DCF@EER    : {tdcf_at_eer:.6f}",
    f"min-tDCF     : {min_tdcf:.6f}",
    sep="\n",
)
