In [2]:
import os
import torch
import librosa
import numpy as np
import pandas as pd
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from tqdm import tqdm

# ==========================
# PATH CONFIGURATION
# ==========================
# Dataset root. Set the ASVSPOOF_LA_ROOT environment variable to run on a
# different machine without editing this file; when it is unset the original
# hard-coded location is used.
BASE_PATH = os.environ.get("ASVSPOOF_LA_ROOT", "/home/alaine/Downloads/LA")
# Directory that holds the <file_id>.flac files opened by ASVDataset.
AUDIO_PATH = os.path.join(BASE_PATH, "ASVspoof2019_LA_train/flac")
# Whitespace-separated protocol file; ASVDataset reads the file id, attack id
# and label from fields 1, 3 and 4 of each line.
PROTOCOL = os.path.join(BASE_PATH, "ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt")

# Attack ids whose rows are selected for the "train" dataset ...
TRAIN_ATTACKS = ["A01", "A02", "A03"]
# ... and for the "test" dataset (any mode other than "train").
TEST_ATTACKS = ["A04", "A05"]

# ==========================
# PREPROCESSING
# ==========================
def preprocess_audio(path):
    """Load an audio file as a fixed-length, normalised waveform.

    The file is loaded with ``librosa.load`` at 16 kHz, cut or right-padded
    with zeros to exactly 4 * 16000 samples, and finally passed through
    ``librosa.util.normalize`` (default settings).

    Args:
        path: Location of the audio file to read.

    Returns:
        Tuple ``(y, sr)``: the processed waveform and the sample rate
        returned by ``librosa.load``.
    """
    target_sr = 16000
    clip_seconds = 4

    waveform, sr = librosa.load(path, sr=target_sr)

    n_target = clip_seconds * target_sr
    n_missing = n_target - len(waveform)
    if n_missing < 0:
        # Longer than the target: keep only the leading samples.
        waveform = waveform[:n_target]
    else:
        # Shorter (or exactly equal): append zeros up to the target length.
        waveform = np.pad(waveform, (0, n_missing))

    return librosa.util.normalize(waveform), sr

def extract_spectral(y, sr):
    """Convert a waveform into a min-max scaled log-mel tensor.

    A 128-band mel spectrogram is computed, converted with
    ``librosa.power_to_db``, rescaled by its own minimum and maximum, and
    then cropped or zero-padded along the last axis to 128 columns.

    Args:
        y: Waveform as returned by ``preprocess_audio``.
        sr: Sample rate of ``y``.

    Returns:
        Float tensor with a leading singleton axis added in front of the
        spectrogram (the input expected by ``SpectralCNN``).
    """
    n_bins = 128
    n_frames = 128

    power = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_bins)
    log_mel = librosa.power_to_db(power)

    # Min-max scale; the epsilon keeps a constant spectrogram from dividing by zero.
    scaled = (log_mel - log_mel.min()) / (log_mel.max() - log_mel.min() + 1e-6)

    scaled = scaled[:, :n_frames]
    deficit = n_frames - scaled.shape[1]
    if deficit > 0:
        scaled = np.pad(scaled, ((0, 0), (0, deficit)))

    return torch.tensor(scaled).unsqueeze(0).float()

def extract_temporal(y):
    """Slice a waveform into overlapping frames for the recurrent branch.

    Frames are 400 samples long and start every 160 samples.

    Args:
        y: Waveform as returned by ``preprocess_audio``.

    Returns:
        Float tensor holding the transposed output of ``librosa.util.frame``,
        so that each row is one 400-sample frame (matching the
        ``input_size=400`` of ``TemporalGRU``).
    """
    frame_matrix = librosa.util.frame(y, frame_length=400, hop_length=160)
    # Transpose so the frame index comes first and the samples of a frame last.
    return torch.tensor(frame_matrix.T).float()

# ==========================
# DATASET
# ==========================
class ASVDataset(Dataset):
    """Dataset of ``(spectral, temporal, label)`` triples built from a protocol file.

    Every non-blank protocol line is split on whitespace; field 1 is the
    file id, field 3 the attack id and field 4 the label.  All rows labelled
    ``bonafide`` are kept, plus the rows whose attack id is in the selected
    attack list.  Labels are encoded as 0 for bonafide and 1 otherwise.

    NOTE(review): bonafide rows are selected in *both* modes, so a "train"
    and a "test" dataset built from the same protocol file share all of
    their bonafide utterances.  Metrics measured on such a test set are
    optimistic for the bonafide class; consider a disjoint bonafide split.

    Args:
        audio_path: Directory containing the ``<file_id>.flac`` files.
        protocol_file: Path of the whitespace-separated protocol file.
        mode: ``"train"`` selects ``TRAIN_ATTACKS``; any other value selects
            ``TEST_ATTACKS``.  Ignored for attack selection when ``attacks``
            is given.
        attacks: Optional explicit collection of attack ids to keep.  When
            ``None`` (the default) the list implied by ``mode`` is used.
    """

    def __init__(self, audio_path, protocol_file, mode="train", attacks=None):
        if attacks is None:
            attacks = TRAIN_ATTACKS if mode == "train" else TEST_ATTACKS
        wanted = frozenset(attacks)

        self.audio_path = audio_path
        self.samples = []

        with open(protocol_file) as f:
            for line in f:
                parts = line.split()
                if not parts:
                    # Blank lines (e.g. a trailing newline) carry no sample.
                    continue
                file_id, attack, label = parts[1], parts[3], parts[4]
                is_bonafide = label == "bonafide"
                if is_bonafide or attack in wanted:
                    self.samples.append((file_id, 0 if is_bonafide else 1))

    def __len__(self):
        """Return the number of selected utterances."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load utterance ``idx`` and return ``(spectral, temporal, label)``.

        The audio is read from ``<audio_path>/<file_id>.flac`` on every call;
        nothing is cached.  ``label`` is a float tensor (0.0 bonafide,
        1.0 spoof) as required by ``BCEWithLogitsLoss``.
        """
        file_id, label = self.samples[idx]
        path = os.path.join(self.audio_path, file_id + ".flac")

        y, sr = preprocess_audio(path)
        spectral = extract_spectral(y, sr)
        temporal = extract_temporal(y)

        return spectral, temporal, torch.tensor(label).float()

# ==========================
# MODELS
# ==========================
class SpectralCNN(nn.Module):
    """Convolutional encoder that maps a spectrogram to a 64-dim embedding.

    Two conv(3x3, padding 1) -> ReLU -> max-pool(2) stages are followed by a
    linear layer.  The linear layer expects 32*32*32 flattened features,
    i.e. a 1 x 128 x 128 input (each pooling stage halves both spatial axes).
    """

    def __init__(self):
        super().__init__()
        stages = []
        # (in_channels, out_channels) of each conv stage, in order.
        for in_ch, out_ch in ((1, 16), (16, 32)):
            stages += [
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2),
            ]
        self.conv = nn.Sequential(*stages)
        self.fc = nn.Linear(32 * 32 * 32, 64)

    def forward(self, x):
        """Encode a batch ``x`` and return one 64-dim vector per item."""
        feature_maps = self.conv(x)
        # Keep the batch axis, flatten everything else for the linear layer.
        flat = feature_maps.view(feature_maps.size(0), -1)
        return self.fc(flat)

class TemporalGRU(nn.Module):
    """Recurrent encoder that maps a sequence of 400-sample frames to 64 dims.

    A batch-first GRU with hidden size 128 reads the frames; the last entry
    of the hidden state it returns is projected to 64 features.
    """

    def __init__(self):
        super().__init__()
        self.gru = nn.GRU(input_size=400, hidden_size=128, batch_first=True)
        self.fc = nn.Linear(in_features=128, out_features=64)

    def forward(self, x):
        """Encode a batch of frame sequences ``x`` into 64-dim vectors."""
        per_step_outputs, final_hidden = self.gru(x)
        # Only the hidden state is used; the per-step outputs are ignored.
        return self.fc(final_hidden[-1])

class FusionModel(nn.Module):
    """Late-fusion spoof detector combining the spectral and temporal encoders.

    The 64-dim outputs of ``SpectralCNN`` and ``TemporalGRU`` are
    concatenated (128 features) and passed through a small MLP that emits
    one raw logit per item; apply ``torch.sigmoid`` to obtain a probability.
    """

    def __init__(self):
        super().__init__()
        self.spectral = SpectralCNN()
        self.temporal = TemporalGRU()
        self.classifier = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, spec, temp):
        """Return one logit per batch item as a 1-D tensor of shape ``(batch,)``.

        Args:
            spec: Batch of spectrogram tensors for the spectral branch.
            temp: Batch of frame sequences for the temporal branch.
        """
        f1 = self.spectral(spec)
        f2 = self.temporal(temp)
        fused = torch.cat((f1, f2), dim=1)
        out = self.classifier(fused)
        # Drop only the trailing singleton axis.  A bare squeeze() would also
        # drop the batch axis when the batch holds a single item, producing a
        # 0-d tensor that no longer matches the shape of the label tensor.
        return out.squeeze(-1)

# ==========================
# EVALUATION
# ==========================
def evaluate(model, loader, device):
    """Score ``model`` on ``loader`` and print binary classification metrics.

    Logits are passed through a sigmoid and thresholded at 0.5; class 1
    (spoof) is the positive class for precision, recall and F1.

    Args:
        model: Network called as ``model(spec, temp)`` that returns logits.
        loader: Iterable yielding ``(spec, temp, label)`` batches.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        Dict with the keys ``accuracy``, ``precision``, ``recall``, ``f1``
        and ``confusion_matrix`` (the same values that are printed).
    """
    model.eval()
    preds, labels = [], []

    with torch.no_grad():
        for spec, temp, label in loader:
            spec, temp = spec.to(device), temp.to(device)
            # reshape(-1): a batch holding a single item may come back as a
            # 0-d tensor, which cannot be iterated when extending the list.
            probs = torch.sigmoid(model(spec, temp)).reshape(-1)
            preds.extend((probs > 0.5).long().cpu().tolist())
            labels.extend(label.reshape(-1).long().tolist())

    metrics = {
        "accuracy": accuracy_score(labels, preds),
        "precision": precision_score(labels, preds),
        "recall": recall_score(labels, preds),
        "f1": f1_score(labels, preds),
        "confusion_matrix": confusion_matrix(labels, preds),
    }

    print("\n===== Evaluation =====")
    print("Accuracy:", metrics["accuracy"])
    print("Precision:", metrics["precision"])
    print("Recall:", metrics["recall"])
    print("F1 Score:", metrics["f1"])
    print("Confusion Matrix:\n", metrics["confusion_matrix"])

    return metrics

# ==========================
# TRAINING
# ==========================
def train_model(epochs=5, batch_size=8, lr=1e-4, save_path="hybrid_spoof_model.pth"):
    """Train the fusion model, save its weights, then evaluate it.

    NOTE(review): both datasets are built from the same protocol file and
    ``ASVDataset`` keeps every bonafide row in either mode, so the bonafide
    utterances seen in evaluation are the ones used for training.  Only the
    spoof classes (``TEST_ATTACKS``) are genuinely held out.

    Args:
        epochs: Number of passes over the training set.
        batch_size: Batch size for both the train and the test loader.
        lr: Learning rate of the Adam optimiser.
        save_path: Where the trained ``state_dict`` is written.

    Returns:
        The trained model (on the device that was used for training).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)

    train_data = ASVDataset(AUDIO_PATH, PROTOCOL, mode="train")
    test_data = ASVDataset(AUDIO_PATH, PROTOCOL, mode="test")

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=0)

    model = FusionModel().to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # The model emits raw logits; this loss applies the sigmoid internally.
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(epochs):
        model.train()
        loss_sum, n_seen = 0.0, 0
        for spec, temp, label in tqdm(train_loader):
            spec, temp, label = spec.to(device), temp.to(device), label.to(device)
            label = label.reshape(-1)

            optimizer.zero_grad()
            # Flatten so logits and labels are both (batch,), including for a
            # final batch that holds a single item.
            output = model(spec, temp).reshape(-1)
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            # Weight by batch size so a short last batch is not over-counted.
            loss_sum += loss.item() * label.numel()
            n_seen += label.numel()

        print("Epoch", epoch + 1, "completed")
        if n_seen:
            print(f"Mean training loss: {loss_sum / n_seen:.4f}")

    # Save unified hybrid model
    torch.save(model.state_dict(), save_path)
    print(f"\nHybrid model saved as {save_path}")

    evaluate(model, test_loader, device)
    return model

# Run the full train -> save -> evaluate pipeline when executed as a script
# (or as a notebook cell, where __name__ is also "__main__").
if __name__=="__main__":
    train_model()

Using device: cuda


100%|██████████| 1748/1748 [05:36<00:00,  5.20it/s]


Epoch 1 completed


100%|██████████| 1748/1748 [05:24<00:00,  5.38it/s]


Epoch 2 completed


100%|██████████| 1748/1748 [05:19<00:00,  5.47it/s]


Epoch 3 completed


100%|██████████| 1748/1748 [05:16<00:00,  5.52it/s]


Epoch 4 completed


100%|██████████| 1748/1748 [05:22<00:00,  5.43it/s]


Epoch 5 completed

Hybrid model saved as hybrid_spoof_model.pth

===== Evaluation =====
Accuracy: 0.9152259332023576
Precision: 0.9986676535899334
Recall: 0.8876315789473684
F1 Score: 0.939881574364333
Confusion Matrix:
 [[2571    9]
 [ 854 6746]]


In [4]:
import torch
import librosa
import numpy as np
import os
from torch import nn
from moviepy import VideoFileClip

# =========================
# CHANGE THIS
# =========================
# File to classify.  Paths ending in one of `video_extensions` (defined below)
# are treated as video and have their audio extracted first; anything else is
# passed to librosa as audio.
INPUT_PATH = "/home/alaine/Downloads/LA/ASVspoof2019_LA_dev/flac/LA_D_1023711.flac"
# state_dict checkpoint that is loaded into FusionModel below.
MODEL_PATH = "hybrid_spoof_model.pth"

# =========================
# PREPROCESSING
# =========================
def preprocess_audio(path):
    """Read ``path`` at 16 kHz and return a 4-second normalised waveform.

    The signal is truncated or zero-padded at the end to 4 * 16000 samples
    and then passed through ``librosa.util.normalize``.

    Args:
        path: Audio file to load.

    Returns:
        ``(y, sr)`` - the processed waveform and the sample rate returned
        by ``librosa.load``.
    """
    y, sr = librosa.load(path, sr=16000)

    max_len = 4 * 16000
    n_samples = len(y)
    # Too long -> keep the head; otherwise pad the tail with zeros.
    y = y[:max_len] if n_samples > max_len else np.pad(y, (0, max_len - n_samples))

    y = librosa.util.normalize(y)
    return y, sr

def extract_spectral(y, sr):
    """Build the spectral-branch input: a min-max scaled log-mel tensor.

    Computes a 128-band mel spectrogram, converts it with
    ``librosa.power_to_db``, rescales it by its own min and max, fixes the
    last axis to 128 columns (crop or zero-pad) and adds a leading
    singleton axis.

    Args:
        y: Waveform as returned by ``preprocess_audio``.
        sr: Sample rate of ``y``.

    Returns:
        Float tensor for ``SpectralCNN`` (still without a batch axis).
    """
    width = 128

    mel_db = librosa.power_to_db(
        librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
    )

    # The small epsilon avoids a zero denominator for a constant spectrogram.
    mel_db = (mel_db - mel_db.min()) / (mel_db.max() - mel_db.min() + 1e-6)

    mel_db = mel_db[:, :width]
    missing = width - mel_db.shape[1]
    if missing > 0:
        mel_db = np.pad(mel_db, ((0, 0), (0, missing)))

    spec = torch.tensor(mel_db).unsqueeze(0)
    return spec.float()

def extract_temporal(y):
    """Build the temporal-branch input: overlapping 400-sample frames.

    Args:
        y: Waveform as returned by ``preprocess_audio``.

    Returns:
        Float tensor holding the transposed result of ``librosa.util.frame``
        (frame length 400, hop 160), one frame per row.
    """
    framed = librosa.util.frame(y, frame_length=400, hop_length=160)
    # Rows become frames so the tensor matches TemporalGRU's input_size=400.
    by_frame = framed.T
    return torch.tensor(by_frame).float()

# =========================
# MODEL DEFINITIONS
# =========================
class SpectralCNN(nn.Module):
    """CNN encoder for the spectrogram branch (64-dim output).

    Layer layout and attribute names must stay as they are so that the
    saved ``state_dict`` keys (``conv.0``, ``conv.3``, ``fc``) still load.
    """

    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # 32 channels x 32 x 32 after two 2x poolings of a 128 x 128 input.
        self.fc = nn.Linear(in_features=32 * 32 * 32, out_features=64)

    def forward(self, x):
        """Return a ``(batch, 64)`` embedding for a batch of spectrograms."""
        encoded = self.conv(x)
        batch = encoded.size(0)
        return self.fc(encoded.view(batch, -1))

class TemporalGRU(nn.Module):
    """GRU encoder for the frame-sequence branch (64-dim output).

    Attribute names must stay ``gru`` and ``fc`` so the saved
    ``state_dict`` still loads.
    """

    def __init__(self):
        super().__init__()
        self.gru = nn.GRU(input_size=400, hidden_size=128, batch_first=True)
        self.fc = nn.Linear(in_features=128, out_features=64)

    def forward(self, x):
        """Return a ``(batch, 64)`` embedding for a batch of frame sequences."""
        # nn.GRU returns (outputs, hidden); only the hidden state is needed.
        hidden = self.gru(x)[1]
        last = hidden[-1]
        return self.fc(last)

class FusionModel(nn.Module):
    """Late-fusion spoof detector (inference copy of the training architecture).

    Concatenates the 64-dim outputs of ``SpectralCNN`` and ``TemporalGRU``
    and maps the 128 fused features to one raw logit per item.  Attribute
    names must stay ``spectral``, ``temporal`` and ``classifier`` so the
    saved ``state_dict`` still loads.
    """

    def __init__(self):
        super().__init__()
        self.spectral = SpectralCNN()
        self.temporal = TemporalGRU()
        self.classifier = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, spec, temp):
        """Return one logit per batch item as a 1-D tensor of shape ``(batch,)``.

        Args:
            spec: Batch of spectrogram tensors for the spectral branch.
            temp: Batch of frame sequences for the temporal branch.
        """
        f1 = self.spectral(spec)
        f2 = self.temporal(temp)
        fused = torch.cat((f1, f2), dim=1)
        out = self.classifier(fused)
        # Remove only the trailing singleton axis; a bare squeeze() would also
        # collapse the batch axis for a single input.  A one-element result
        # still supports .item().
        return out.squeeze(-1)

# =========================
# VIDEO HANDLING
# =========================
def extract_audio_from_video(video_path):
    """Write the audio track of a video to ``temp_audio.wav`` at 16 kHz.

    Args:
        video_path: Path of the video file to read.

    Returns:
        The path of the written file, always ``"temp_audio.wav"`` in the
        current working directory.  The caller is responsible for deleting it.

    Raises:
        ValueError: If the video has no audio track.
    """
    temp_audio = "temp_audio.wav"
    clip = VideoFileClip(video_path)
    try:
        if clip.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        clip.audio.write_audiofile(temp_audio, fps=16000, logger=None)
    finally:
        # Release the underlying reader even when extraction fails.
        clip.close()
    return temp_audio

# =========================
# LOAD MODEL
# =========================
# Prefer the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Rebuild the architecture, then restore the trained weights into it.
model = FusionModel()
model.to(device)
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
# Inference only: switch the module to evaluation mode.
model.eval()

# =========================
# HANDLE INPUT
# =========================
video_extensions = (".mp4", ".mkv", ".avi", ".mov")

# Treat the input as audio unless its extension marks it as a video, in which
# case the audio track is extracted to a temporary file first.
audio_path = INPUT_PATH
if INPUT_PATH.lower().endswith(video_extensions):
    print("Video detected. Extracting audio...")
    audio_path = extract_audio_from_video(INPUT_PATH)

# =========================
# PREDICTION
# =========================
try:
    y, sr = preprocess_audio(audio_path)

    # Add the batch axis the model expects and move the inputs to its device.
    spectral = extract_spectral(y, sr).unsqueeze(0).to(device)
    temporal = extract_temporal(y).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(spectral, temporal)
        # Sigmoid of the logit = probability of the positive class (spoof).
        probability = torch.sigmoid(output).item()
finally:
    # Remove the extracted audio even if inference failed.  Only a file that
    # was created for this run (path differs from the user's input) is
    # deleted, never the input itself.
    if audio_path != INPUT_PATH and os.path.exists(audio_path):
        os.remove(audio_path)

print("\n----- RESULT -----")

is_spoof = probability > 0.5
if is_spoof:
    print("Prediction: AI (spoof)")
else:
    print("Prediction: REAL (bonafide)")

# Report confidence in the predicted class: P(spoof) for a spoof verdict,
# 1 - P(spoof) for a bonafide verdict.
confidence = probability if is_spoof else 1.0 - probability
print(f"Confidence: {confidence:.4f}")


----- RESULT -----
Prediction: AI (spoof)
Confidence: 0.9948
