<a href="https://colab.research.google.com/github/Maha028/Multi_module_Deepfake_Audio_Detector/blob/main/Multi_module_Deepfake_Audio_Detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
import kagglehub
# Authenticate before the dataset downloads in the next cell, which need
# credentials whenever a data source is private.
kagglehub.login()


In [None]:
# IMPORT KAGGLE DATA SOURCES.

# Fetch both datasets. The return values are kept in *_path variables
# (presumably the local download locations - confirm against kagglehub docs).
# NOTE(review): neither variable is read again in this notebook; the generated
# scripts use hard-coded /kaggle/input/... paths instead.
awsaf49_asvpoof_2019_dataset_path = kagglehub.dataset_download('awsaf49/asvpoof-2019-dataset')
maha028_wav2vec2_path = kagglehub.dataset_download('maha028/wav2vec2')

print('Data source import complete.')


In [None]:
import os

# Create the directory that the following cells write the generated
# project/*.py modules into (no error if it already exists).
os.makedirs("project", exist_ok=True)


In [None]:
code="""
import torch
import torchaudio
import numpy as np
from scipy.stats import entropy

# Build the torchaudio WAV2VEC2_BASE pipeline model once at import time and
# put it in eval mode; extract_entropy_features() reuses this single instance.
bundle = torchaudio.pipelines.WAV2VEC2_BASE
model = bundle.get_model().eval()

def extract_entropy_features(waveform: torch.Tensor, sample_rate: int = 16000):
    '''Summarise the per-frame entropy of wav2vec2 features for one utterance.

    Args:
        waveform: Audio tensor shaped [channels, samples]; a 1-D [samples]
            tensor is also accepted and treated as mono.
        sample_rate: Sampling rate of ``waveform`` in Hz. Anything other than
            16000 is resampled to 16000 before the model is applied.

    Returns:
        np.ndarray of shape (2,): mean and standard deviation, over frames, of
        the entropy of each frame's normalised absolute feature vector.
    '''
    # Accept a bare [samples] tensor by adding the channel axis.
    if waveform.dim() == 1:
        waveform = waveform.unsqueeze(0)

    # Down-mix multi-channel audio to a single channel.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)

    with torch.inference_mode():
        features, _ = model.extract_features(waveform)
    # Only the last layer is used; drop the batch axis -> [frames, dim].
    last_layer_feats = features[-1].squeeze(0).cpu().numpy()

    # Turn every frame into a distribution over feature dimensions; the clip
    # keeps an all-zero frame from dividing by zero.
    abs_feats = np.abs(last_layer_feats)
    normalized_feats = abs_feats / np.clip(abs_feats.sum(axis=1, keepdims=True), 1e-10, None)

    # One vectorised call along the feature axis instead of a Python loop
    # that invokes entropy() once per frame.
    entropies = entropy(normalized_feats, axis=1)

    return np.array([np.mean(entropies), np.std(entropies)])

"""

# Write the module to disk; project/main.py and project/predict.py import
# extract_entropy_features from it.
with open("project/features_entropy.py", "w") as f:
    f.write(code)


In [None]:
with open("project/features_melcnn.py", "w") as f:
    f.write("""
import torch
import torch.nn as nn
import torchaudio.transforms as T
import numpy as np
import torchaudio
import matplotlib.pyplot as plt

class MelCNN(nn.Module):
    '''Small 2-D CNN that maps a single-channel 2-D input to two class logits.

    Three 3x3 convolution stages (16, 32, 64 channels) are each followed by
    ReLU; the first two are max-pooled by 2 and the last is globally
    average-pooled, so the spatial size of the input may vary.
    '''

    def __init__(self):
        super().__init__()
        stages = [
            # stage 1: 1 -> 16 channels, halve both spatial dims
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            # stage 2: 16 -> 32 channels, halve both spatial dims
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            # stage 3: 32 -> 64 channels, collapse to 1x1
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
        ]
        self.conv_layers = nn.Sequential(*stages)
        self.fc = nn.Linear(64, 2)  # 2 classes

    def forward(self, x):
        # x: [batch, 1, height, width] -> logits [batch, 2]
        pooled = self.conv_layers(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

# Module-level mel transform for 16 kHz audio (64 mel bands, n_fft=1024, hop 256).
# NOTE(review): nothing in the visible code references this object -
# extract_mel_spectrogram() builds its own transform on every call; confirm
# before removing.
mel_spectrogram = T.MelSpectrogram(sample_rate=16000, n_mels=64, n_fft=1024, hop_length=256)

def extract_mel_spectrogram(waveform, sr, n_mels=64, show_plot=False):
    '''Compute a dB-scaled mel spectrogram for one utterance.

    Args:
        waveform: Audio tensor shaped [channels, samples].
        sr: Sampling rate of ``waveform`` in Hz; the transform is built for
            this rate, no resampling is performed.
        n_mels: Number of mel bands.
        show_plot: When True, display the spectrogram with matplotlib. Off by
            default because the training script calls this once per utterance
            and an unconditional figure per call floods the output and leaks
            figures that are never closed.

    Returns:
        Tensor shaped [n_mels, time].
    '''
    mel_spec_transform = torchaudio.transforms.MelSpectrogram(
        sample_rate=sr,
        n_fft=1024,
        hop_length=256,
        n_mels=n_mels
    )
    mel_spec = mel_spec_transform(waveform)
    # multiplier=10 converts to decibels; amin guards against log(0).
    mel_spec = torchaudio.functional.amplitude_to_DB(mel_spec, multiplier=10, amin=1e-10, db_multiplier=0)

    if mel_spec.shape[0] > 1:
        mel_spec = mel_spec.mean(dim=0, keepdim=True)  # Mono

    if show_plot:
        # Visualization of the Mel-Spectrogram
        fig = plt.figure(figsize=(10, 4))
        plt.imshow(mel_spec[0].detach().cpu().numpy(), cmap='viridis', origin='lower', aspect='auto')
        plt.title('Mel-Spectrogram')
        plt.xlabel('Time (Frames)')
        plt.ylabel('Frequency (Mel bins)')
        plt.colorbar(format="%+2.0f dB")
        plt.tight_layout()
        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    return mel_spec.squeeze(0)  # Returns shape: [n_mels, time]
""")

In [None]:
# Source of project/features_rawcnn.py: defines RawWaveformCNN, three Conv1d
# stages over a single-channel input followed by a 2-way linear layer.
# NOTE(review): the closing quotes sit on the same line as the last statement,
# so the written file has no trailing newline.
code="""
import torch
import torch.nn as nn
import numpy as np

class RawWaveformCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=7, stride=2, padding=3), nn.ReLU(), nn.MaxPool1d(4),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2), nn.ReLU(), nn.MaxPool1d(4),
            nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1), nn.ReLU(), nn.AdaptiveAvgPool1d(1)
        )
        self.fc = nn.Linear(64, 2)

    def forward(self, x):
        x = self.net(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)"""
# Write the module to disk; project/main.py and project/predict.py import
# RawWaveformCNN from it.
with open("project/features_rawcnn.py", "w") as f:
    f.write(code)

In [None]:
code="""
import torchaudio.transforms as T
import numpy as np

# Shared MFCC transform for 16 kHz audio: 20 coefficients computed from a
# 64-band mel spectrogram (n_fft=1024, hop_length=256). Built once at import
# time and reused by extract_mfcc_features().
mfcc_transform = T.MFCC(sample_rate=16000, n_mfcc=20, melkwargs={"n_fft":1024, "hop_length":256, "n_mels":64})

def extract_mfcc_features(waveform, sample_rate=16000):
    '''Return the time-averaged MFCC vector of one utterance as a numpy array.

    Multi-channel audio is averaged to one channel and anything not sampled at
    16000 Hz is resampled to 16000 Hz before the shared transform is applied.
    '''
    has_several_channels = waveform.shape[0] > 1
    if has_several_channels:
        waveform = waveform.mean(dim=0, keepdim=True)

    needs_resampling = sample_rate != 16000
    if needs_resampling:
        waveform = T.Resample(orig_freq=sample_rate, new_freq=16000)(waveform)

    coefficients = mfcc_transform(waveform)
    # Average over dim 2 (time), then drop the remaining singleton axes.
    time_averaged = coefficients.mean(dim=2)
    return time_averaged.squeeze().cpu().numpy()
"""

# Write the module to disk; project/main.py and project/predict.py import
# extract_mfcc_features from it.
with open("project/features_mfcc.py", "w") as f:
    f.write(code)

In [None]:
code="""import os
import torchaudio

def load_protocol_file(protocol_path):
    '''Parse a whitespace-separated protocol file into (utt_id, label) pairs.

    The second field of each line is taken as the utterance id and the last
    field as the class name; "bonafide" maps to 1 and anything else to 0.
    Blank lines and lines with fewer than two fields are skipped instead of
    raising IndexError.

    Args:
        protocol_path: Path of the protocol text file.

    Returns:
        List of (utt_id, label) tuples in file order.
    '''
    samples = []
    with open(protocol_path, 'r') as f:
        for line in f:
            parts = line.split()
            if len(parts) < 2:
                # Blank or truncated line: there is no utterance id to read.
                continue
            label = 1 if parts[-1] == "bonafide" else 0
            samples.append((parts[1], label))
    return samples

def load_audio(audio_dir, utt_id):
    '''Load <audio_dir>/<utt_id>.flac and return (waveform, sample_rate).'''
    flac_name = utt_id + ".flac"
    signal, rate = torchaudio.load(os.path.join(audio_dir, flac_name))
    return signal, rate
"""
# Write the helpers to project/utils.py.
# NOTE(review): no visible module imports utils; project/main.py carries its
# own copies of load_protocol_file and load_audio.
with open("project/utils.py", "w") as f:
    f.write(code)

In [None]:
code="""import os
import torch
import numpy as np
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import torch.nn.functional as F
import torchaudio
import random
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Import our feature extraction and model definitions
from features_entropy import extract_entropy_features  # Disabled until model downloaded
from features_melcnn import MelCNN, extract_mel_spectrogram
from features_rawcnn import RawWaveformCNN
from features_mfcc import extract_mfcc_features

# Utils to load protocol and audio
def load_protocol_file(protocol_path):
    '''Parse a whitespace-separated protocol file into (utt_id, label) pairs.

    The second field of each line is the utterance id and the last field the
    class name; "bonafide" maps to 1 and anything else to 0. Blank lines and
    lines with fewer than two fields are skipped instead of raising
    IndexError.

    Args:
        protocol_path: Path of the protocol text file.

    Returns:
        List of (utt_id, label) tuples in file order.
    '''
    samples = []
    with open(protocol_path, 'r') as f:
        for line in f:
            parts = line.split()
            if len(parts) < 2:
                # Blank or truncated line: there is no utterance id to read.
                continue
            label = 1 if parts[-1] == "bonafide" else 0
            samples.append((parts[1], label))
    return samples
def pad_batch(batch_data, add_channel_dim=True):
    '''Right-pad tensors with zeros along the last axis and stack them.

    Every tensor is padded to the longest last-axis length in the batch. With
    ``add_channel_dim`` each tensor also gets a new leading axis before
    stacking, so the result has one more dimension than without it.
    '''
    longest = max(item.size(-1) for item in batch_data)
    padded = [F.pad(item, (0, longest - item.size(-1))) for item in batch_data]
    if add_channel_dim:
        padded = [item.unsqueeze(0) for item in padded]
    return torch.stack(padded)



def load_audio(audio_dir, utt_id):
    '''Load <audio_dir>/<utt_id>.flac and return (waveform, sample_rate).'''
    flac_name = utt_id + ".flac"
    signal, rate = torchaudio.load(os.path.join(audio_dir, flac_name))
    return signal, rate

# Training functions for classical classifiers
def train_entropy_classifier(X, y):
    '''Fit and return a LogisticRegression (max_iter=500) on features X, labels y.'''
    estimator = LogisticRegression(max_iter=500)
    # fit() returns the fitted estimator itself.
    return estimator.fit(X, y)

def train_mfcc_classifier(X, y):
    '''Fit and return a 100-tree RandomForestClassifier on features X, labels y.'''
    forest = RandomForestClassifier(n_estimators=100)
    # fit() returns the fitted estimator itself.
    return forest.fit(X, y)

# Training functions for CNNs
def train_cnn_classifier(model, data, labels, epochs=12, batch_size=16, lr=1e-3, add_channel_dim=True):
    '''Train a CNN with Adam and cross-entropy loss over fixed-order mini-batches.

    Args:
        model: torch.nn.Module producing class logits of shape [batch, classes].
        data: Sequence of tensors; each batch is zero-padded to a common
            last-axis length by pad_batch().
        labels: Sequence of integer class ids aligned with ``data``.
        epochs: Number of passes over ``data``.
        batch_size: Number of samples per optimisation step.
        lr: Adam learning rate.
        add_channel_dim: Forwarded to pad_batch().

    Returns:
        The trained model, left on the selected device (CUDA when available).

    Raises:
        ValueError: If ``data`` is empty. Previously this surfaced as a
            ZeroDivisionError when the first epoch loss was printed.
    '''
    dataset_size = len(data)
    if dataset_size == 0:
        raise ValueError("train_cnn_classifier received no training samples.")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()

    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for start in range(0, dataset_size, batch_size):
            batch_data = data[start:start + batch_size]
            batch_labels = labels[start:start + batch_size]

            inputs = pad_batch(batch_data, add_channel_dim=add_channel_dim).to(device)
            # CrossEntropyLoss needs int64 class indices; force the dtype so
            # int32 label arrays do not fail.
            targets = torch.as_tensor(batch_labels, dtype=torch.long).to(device)

            optimizer.zero_grad()
            # Drops axis 2 only when it has size 1: a [1, samples] waveform
            # padded with add_channel_dim=True arrives as [B, 1, 1, samples].
            # Inputs whose axis 2 is larger than 1 pass through unchanged.
            inputs = inputs.squeeze(2)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Weight by batch length so the epoch figure is a per-sample mean.
            running_loss += loss.item() * len(batch_data)

        print(f"Epoch {epoch+1}/{epochs} - Loss: {running_loss/dataset_size:.4f}")

    return model

def get_indices(split, utt_to_idx):
    '''Map each (utt_id, label) pair in ``split`` to its index in ``utt_to_idx``.'''
    positions = []
    for utt_id, _ in split:
        positions.append(utt_to_idx[utt_id])
    return positions

def subset_feats(indices, entropy_feats, mfcc_feats, melcnn_inputs, rawcnn_inputs, labels_np):
    '''Select the rows at ``indices`` from every feature container.

    The entropy, MFCC and label containers are indexed with the whole index
    list at once; the two CNN input containers are picked element by element.
    Returns a dict keyed 'entropy', 'mfcc', 'melcnn', 'rawcnn', 'labels'.
    '''
    def pick(items):
        return [items[i] for i in indices]

    subset = {}
    subset['entropy'] = entropy_feats[indices]
    subset['mfcc'] = mfcc_feats[indices]
    subset['melcnn'] = pick(melcnn_inputs)
    subset['rawcnn'] = pick(rawcnn_inputs)
    subset['labels'] = labels_np[indices]
    return subset


def evaluate_cnn(model, data, labels, batch_size=16, add_channel_dim=True):
    '''Run a CNN over ``data`` in batches and collect its argmax predictions.

    Args:
        model: torch.nn.Module producing class logits of shape [batch, classes].
        data: Sequence of tensors; each batch is zero-padded by pad_batch().
        labels: Sequence of ground-truth class ids aligned with ``data``.
        batch_size: Number of samples per forward pass.
        add_channel_dim: Forwarded to pad_batch().

    Returns:
        Tuple (true_labels, preds) of equal-length lists. This is not an
        accuracy value; callers compute the metric themselves.
    '''
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Inputs are moved to ``device`` below, so the model has to live there too;
    # otherwise a CPU model on a CUDA machine fails with a device mismatch.
    model = model.to(device)
    model.eval()
    preds = []
    true_labels = []
    with torch.no_grad():
        for start in range(0, len(data), batch_size):
            batch_data = data[start:start + batch_size]
            inputs = pad_batch(batch_data, add_channel_dim=add_channel_dim).to(device)
            # Same singleton-axis removal as in train_cnn_classifier().
            inputs = inputs.squeeze(2)
            outputs = model(inputs)
            predicted = torch.argmax(outputs, dim=1).cpu().numpy()
            preds.extend(predicted)
            true_labels.extend(labels[start:start + batch_size])

    # Return both true labels and predictions for evaluation
    return true_labels, preds

def main():
    '''Train, evaluate and save the four detectors on an ASVspoof 2019 LA subset.

    Steps: read the train protocol, shuffle it, keep at most MAX_SAMPLES
    utterances, split them 60/20/20 (stratified), extract all four feature
    types once, then train and score the entropy, MFCC, MelCNN and RawCNN
    models. The fitted models are written to the current working directory.

    Raises:
        ValueError: If the selected subset contains only one class.
    '''
    PROTOCOL_PATH = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt"
    AUDIO_DIR = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_train/flac"

    dataset = load_protocol_file(PROTOCOL_PATH)

    # Shuffle so the subset taken below is a random sample of the protocol.
    random.shuffle(dataset)

    # Check label distribution before selecting samples
    labels_all = [label for _, label in dataset]
    unique, counts = np.unique(labels_all, return_counts=True)
    print("Full dataset label distribution:", dict(zip(unique, counts)))

    # Keep at most MAX_SAMPLES utterances from the shuffled dataset
    MAX_SAMPLES = 5000
    selected_dataset = dataset[:MAX_SAMPLES]

    # Check label distribution in selected dataset too
    selected_labels = [label for _, label in selected_dataset]
    unique_sel, counts_sel = np.unique(selected_labels, return_counts=True)
    print(f"Selected {MAX_SAMPLES} samples label distribution:", dict(zip(unique_sel, counts_sel)))

    if len(unique_sel) < 2:
        raise ValueError(f"Selected {MAX_SAMPLES} samples contain only one class! Consider increasing sample size or reshuffling.")

    # Train / Val / Test split (60/20/20 stratified): 20% test first, then a
    # quarter of the remaining 80% becomes validation.
    train_val, test = train_test_split(selected_dataset, test_size=0.2, stratify=selected_labels, random_state=42)
    train, val = train_test_split(train_val, test_size=0.25, stratify=[label for _, label in train_val], random_state=42)

    entropy_feats = []
    mfcc_feats = []
    melcnn_inputs = []
    rawcnn_inputs = []
    labels = []

    print("Extracting features...")
    for utt_id, label in tqdm(selected_dataset):
        waveform, sr = load_audio(AUDIO_DIR, utt_id)

        entropy_feats.append(extract_entropy_features(waveform, sr))

        # MFCC features (numpy)
        mfcc_feats.append(extract_mfcc_features(waveform, sr))

        # Mel spectrogram for MelCNN (tensor [n_mels, time])
        melcnn_inputs.append(extract_mel_spectrogram(waveform, sr))

        # Raw waveform for RawCNN (mono tensor [1, samples])
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        rawcnn_inputs.append(waveform)

        labels.append(label)

    # Convert lists to numpy arrays
    entropy_feats = np.array(entropy_feats)
    mfcc_feats = np.array(mfcc_feats)
    labels_np = np.array(labels)

    # Features were extracted in selected_dataset order, so an utterance's
    # position in that list is also its row in every feature container.
    utt_to_idx = {utt_id: i for i, (utt_id, _) in enumerate(selected_dataset)}

    train_indices = get_indices(train, utt_to_idx)
    val_indices = get_indices(val, utt_to_idx)
    test_indices = get_indices(test, utt_to_idx)

    train_feats = subset_feats(train_indices, entropy_feats, mfcc_feats, melcnn_inputs, rawcnn_inputs, labels_np)
    val_feats = subset_feats(val_indices, entropy_feats, mfcc_feats, melcnn_inputs, rawcnn_inputs, labels_np)
    test_feats = subset_feats(test_indices, entropy_feats, mfcc_feats, melcnn_inputs, rawcnn_inputs, labels_np)

    # ===== ENTROPY CLASSIFIER =====
    print("Training Entropy-based Logistic Regression classifier...")
    entropy_clf = train_entropy_classifier(train_feats['entropy'], train_feats['labels'])
    val_preds = entropy_clf.predict(val_feats['entropy'])
    test_preds = entropy_clf.predict(test_feats['entropy'])
    print("Entropy Classifier - Val Accuracy:", accuracy_score(val_feats['labels'], val_preds))
    print("Entropy Classifier - Test Accuracy:", accuracy_score(test_feats['labels'], test_preds))

    # ===== MFCC CLASSIFIER =====
    print("Training MFCC-based Random Forest classifier...")
    mfcc_clf = train_mfcc_classifier(train_feats['mfcc'], train_feats['labels'])
    val_preds = mfcc_clf.predict(val_feats['mfcc'])
    test_preds = mfcc_clf.predict(test_feats['mfcc'])
    print("MFCC Classifier - Val Accuracy:", accuracy_score(val_feats['labels'], val_preds))
    print("MFCC Classifier - Test Accuracy:", accuracy_score(test_feats['labels'], test_preds))

    # ===== MelCNN =====
    # evaluate_cnn() returns (true_labels, preds); turn that pair into an
    # accuracy instead of printing the raw lists.
    print("Training MelSpectrogram CNN classifier...")
    melcnn_model = train_cnn_classifier(MelCNN(), train_feats['melcnn'], train_feats['labels'], add_channel_dim=True)
    val_true, val_preds = evaluate_cnn(melcnn_model, val_feats['melcnn'], val_feats['labels'], add_channel_dim=True)
    test_true, test_preds = evaluate_cnn(melcnn_model, test_feats['melcnn'], test_feats['labels'], add_channel_dim=True)
    melcnn_val_acc = accuracy_score(val_true, val_preds)
    melcnn_test_acc = accuracy_score(test_true, test_preds)
    print("MelCNN - Val Accuracy:", melcnn_val_acc)
    print("MelCNN - Test Accuracy:", melcnn_test_acc)

    # ===== RawCNN =====
    # Waveforms already carry their channel axis ([1, samples]), so no extra
    # one is added; training now uses the same setting as evaluation and the
    # batches reaching the model have the same shape as before.
    print("Training Raw Waveform CNN classifier...")
    rawcnn_model = train_cnn_classifier(RawWaveformCNN(), train_feats['rawcnn'], train_feats['labels'], add_channel_dim=False)
    val_true, val_preds = evaluate_cnn(rawcnn_model, val_feats['rawcnn'], val_feats['labels'], add_channel_dim=False)
    test_true, test_preds = evaluate_cnn(rawcnn_model, test_feats['rawcnn'], test_feats['labels'], add_channel_dim=False)
    rawcnn_val_acc = accuracy_score(val_true, val_preds)
    rawcnn_test_acc = accuracy_score(test_true, test_preds)
    print("RawCNN - Val Accuracy:", rawcnn_val_acc)
    print("RawCNN - Test Accuracy:", rawcnn_test_acc)

    print("Training complete!")

    # Persist all four models in the current working directory.
    torch.save(melcnn_model.state_dict(), "melcnn_model.pth")
    torch.save(rawcnn_model.state_dict(), "rawcnn_model.pth")
    import joblib
    joblib.dump(entropy_clf, "entropy_clf.joblib")
    joblib.dump(mfcc_clf, "mfcc_clf.joblib")

if __name__ == "__main__":
    main()
"""
# Write the training script to disk; the next cell runs it with
# `python project/main.py`.
with open("project/main.py", "w") as f:
    f.write(code)

In [None]:
!python project/main.py


In [None]:
import shutil
import os

# Step 1: The trained model artifacts to bundle (change paths if needed)
model_files = [
    "/content/melcnn_model.pth",
    "/content/rawcnn_model.pth",
    "/content/entropy_clf.joblib",
    "/content/mfcc_clf.joblib",
]

# Step 2: Staging directory that will be zipped
download_dir = "/content/models_to_download"
os.makedirs(download_dir, exist_ok=True)

# Step 3: Stage every artifact that exists; warn about the ones that do not
for file_path in model_files:
    if not os.path.exists(file_path):
        print(f"Warning: {file_path} does not exist!")
        continue
    shutil.copy(file_path, download_dir)

# Step 4: Zip the staging directory into /content/model_backup.zip
shutil.make_archive("/content/model_backup", 'zip', download_dir)

# Step 5: Hand the archive to the browser for download (Colab only)
from google.colab import files
files.download("/content/model_backup.zip")


In [None]:
code="""
import torch
import torch.nn.functional as F
import joblib
import numpy as np
import torchaudio
import os

#Import your model classes and feature extractors here
from features_entropy import extract_entropy_features
from features_melcnn import MelCNN, extract_mel_spectrogram
from features_rawcnn import RawWaveformCNN
from features_mfcc import extract_mfcc_features

# Choose the inference device first so the checkpoints can be mapped onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load models
# NOTE: joblib files are pickles - only load artifacts produced by your own
# training run.
entropy_clf = joblib.load("/content/entropy_clf.joblib")
mfcc_clf = joblib.load("/content/mfcc_clf.joblib")

# map_location lets checkpoints saved from a CUDA model load on a CPU-only
# machine; without it torch.load tries to restore the tensors onto CUDA.
melcnn_model = MelCNN()
melcnn_model.load_state_dict(torch.load("/content/melcnn_model.pth", map_location=device))
melcnn_model.to(device)
melcnn_model.eval()

rawcnn_model = RawWaveformCNN()
rawcnn_model.load_state_dict(torch.load("/content/rawcnn_model.pth", map_location=device))
rawcnn_model.to(device)
rawcnn_model.eval()

# Class id -> human-readable verdict.
label_map = {0: "fake", 1: "real"}

def pad_tensor(tensor, target_len):
    '''Zero-pad the last axis of ``tensor`` on the right up to ``target_len``.

    A tensor that is already at least ``target_len`` long is returned as is
    (the same object, never truncated).
    '''
    missing = target_len - tensor.shape[-1]
    if missing <= 0:
        return tensor
    return F.pad(tensor, (0, missing))

def predict_audio(audio_path):
    '''Classify one audio file with all four models and combine their votes.

    Args:
        audio_path: Path of the audio file to load with torchaudio.

    Returns:
        Dict with "overall_prediction" ("fake" or "real"), "overall_confidence"
        (mean confidence of the models that voted for the winning label) and
        "model_predictions" (label and confidence for each of "entropy_clf",
        "mfcc_clf", "melcnn" and "rawcnn").
    '''
    waveform, sr = torchaudio.load(audio_path)
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)  # Mono

    # Extract features
    entropy_feat = extract_entropy_features(waveform, sr).reshape(1, -1)  # shape (1, features)
    mfcc_feat = extract_mfcc_features(waveform, sr).reshape(1, -1)  # shape (1, features)
    mel_spec = extract_mel_spectrogram(waveform, sr)  # Tensor shape [n_mels, time]

    # ENTROPY prediction. predict_proba columns follow classes_, so translate
    # the winning column back into the class label instead of assuming they match.
    entropy_pred_prob = entropy_clf.predict_proba(entropy_feat)[0]
    entropy_column = np.argmax(entropy_pred_prob)
    entropy_pred_label = entropy_clf.classes_[entropy_column]
    entropy_confidence = entropy_pred_prob[entropy_column]

    # MFCC prediction
    mfcc_pred_prob = mfcc_clf.predict_proba(mfcc_feat)[0]
    mfcc_column = np.argmax(mfcc_pred_prob)
    mfcc_pred_label = mfcc_clf.classes_[mfcc_column]
    mfcc_confidence = mfcc_pred_prob[mfcc_column]

    # MelCNN prediction: add batch and channel axes -> [1, 1, n_mels, time]
    mel_spec = mel_spec.unsqueeze(0).unsqueeze(0).to(device)
    with torch.no_grad():
        mel_outputs = melcnn_model(mel_spec)
        mel_probs = torch.softmax(mel_outputs, dim=1).cpu().numpy()[0]
        mel_pred_label = np.argmax(mel_probs)
        mel_confidence = mel_probs[mel_pred_label]

    # RawCNN prediction: add the batch axis -> [1, 1, samples]
    raw_input = waveform.to(device).unsqueeze(0)
    with torch.no_grad():
        raw_outputs = rawcnn_model(raw_input)
        raw_probs = torch.softmax(raw_outputs, dim=1).cpu().numpy()[0]
        raw_pred_label = np.argmax(raw_probs)
        raw_confidence = raw_probs[raw_pred_label]

    preds = [entropy_pred_label, mfcc_pred_label, mel_pred_label, raw_pred_label]
    confidences = [entropy_confidence, mfcc_confidence, mel_confidence, raw_confidence]

    def total_confidence(label):
        # Summed confidence of the models that voted for ``label``.
        return sum(conf for pred, conf in zip(preds, confidences) if pred == label)

    # Majority vote. With four voters a 2-2 split is possible; break it by
    # total confidence instead of always falling back to the smaller label id.
    labels, counts = np.unique(preds, return_counts=True)
    leading_labels = labels[counts == counts.max()]
    majority_label = max(leading_labels, key=total_confidence)

    # Average confidence for majority label among models that predicted it
    avg_confidence = np.mean([conf for pred, conf in zip(preds, confidences) if pred == majority_label])

    # Output results
    result = {
        "overall_prediction": label_map[int(majority_label)],
        "overall_confidence": float(avg_confidence),
        "model_predictions": {
            "entropy_clf": {"label": label_map[int(entropy_pred_label)], "confidence": float(entropy_confidence)},
            "mfcc_clf": {"label": label_map[int(mfcc_pred_label)], "confidence": float(mfcc_confidence)},
            "melcnn": {"label": label_map[int(mel_pred_label)], "confidence": float(mel_confidence)},
            "rawcnn": {"label": label_map[int(raw_pred_label)], "confidence": float(raw_confidence)},
        },
    }

    return result

# Example usage. Guarded so that importing this module does not run inference
# on a hard-coded path; `python project/predict.py` behaves as before.
if __name__ == "__main__":
    audio_path = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_dev/flac/LA_D_1002130.flac"  # Replace with your audio path
    prediction_result = predict_audio(audio_path)
    print(prediction_result)
"""
# Write the inference script to disk; the next cell runs it with
# `python project/predict.py`.
with open("project/predict.py", "w") as f:
    f.write(code)


In [None]:
!python project/predict.py

In [None]:
!pip install "pip<24.1" --upgrade

In [None]:
!pip install fairseq==0.12.2
