In [None]:
# Install the Kaggle clients. The download cell below imports `kagglehub`, so it is installed
# explicitly alongside `kaggle`. `%pip` installs into the environment of the running kernel.
%pip install -q kaggle kagglehub


In [None]:
# Run the NVIDIA system-management tool to show the GPU(s) visible to this runtime.
!nvidia-smi

In [None]:
import kagglehub

# Fetch the FSD50K dataset through Kaggle Hub; `path` is the local directory it returns.
path = kagglehub.dataset_download("yousirui1/fsd50k")

print(f"Path to dataset files: {path}")


In [None]:
# STEP 2: look at what the download placed at the top level of the dataset directory

import os

print(f"Root path from kagglehub: {path}")
print("Contents:", os.listdir(path), sep="\n")


In [None]:
import os

# The sub-folders used by the following cells are looked up inside "fsd50k".
root_dir = os.path.join(path, "fsd50k")

print("Inside fsd50k folder:", os.listdir(root_dir), sep="\n")

In [None]:
import os

root_dir = os.path.join(path, "fsd50k")

# Dataset sub-directories referenced by the rest of the notebook.
DEV_AUDIO_DIR, EVAL_AUDIO_DIR, GROUND_TRUTH_DIR, METADATA_DIR = (
    os.path.join(root_dir, sub_dir)
    for sub_dir in (
        "FSD50K.dev_audio_16k",
        "FSD50K.eval_audio_16k",
        "FSD50K.ground_truth",
        "FSD50K.metadata",
    )
)

print(
    f"DEV AUDIO: {DEV_AUDIO_DIR}",
    f"EVAL AUDIO: {EVAL_AUDIO_DIR}",
    f"GROUND TRUTH: {GROUND_TRUTH_DIR}",
    f"METADATA: {METADATA_DIR}",
    sep="\n",
)

# Quick sanity listing: a few audio files plus the full ground-truth / metadata listings.
print(f"\nDEV audio sample: {os.listdir(DEV_AUDIO_DIR)[:5]}")
print(f"GT files: {os.listdir(GROUND_TRUTH_DIR)}")
print(f"Metadata files: {os.listdir(METADATA_DIR)}")

In [None]:
import pandas as pd

# Locate the DEV ground-truth CSV: a "*.csv" file in GROUND_TRUTH_DIR whose name contains "dev".
# Candidates are sorted so the pick does not depend on os.listdir() ordering, and a missing
# file fails loudly here instead of surfacing later as a NameError (or silently reusing a
# stale DEV_GT_PATH left over from an earlier run of this cell).
dev_gt_candidates = sorted(
    f for f in os.listdir(GROUND_TRUTH_DIR)
    if "dev" in f.lower() and f.endswith(".csv")
)
if not dev_gt_candidates:
    raise FileNotFoundError(f"No dev ground-truth CSV found in {GROUND_TRUTH_DIR}")
DEV_GT_PATH = os.path.join(GROUND_TRUTH_DIR, dev_gt_candidates[0])

print("Using GT file:", DEV_GT_PATH)

# One row per entry of the ground-truth file.
dev_gt = pd.read_csv(DEV_GT_PATH)
print("Total Dev Labels:", len(dev_gt))
dev_gt.head()


In [None]:
# Show which columns the ground-truth table provides
print(f"Columns: {dev_gt.columns}")


In [None]:
# STEP 6: work out which column of dev_gt holds the clip identifier

possible_cols = ["fname", "filename", "file_name", "clip_id", "id", "audio_id"]

# First choice: the first column (in table order) whose lower-cased name is a known candidate.
file_col = next(
    (name for name in dev_gt.columns if name.lower() in possible_cols),
    None,
)

# Fallback: the first column stored with the "object" dtype.
if file_col is None:
    file_col = next(
        (name for name in dev_gt.columns if dev_gt[name].dtype == "object"),
        None,
    )

print(f"Detected filename column: {file_col}")


In [None]:
# STEP 7: turn each clip identifier into "<DEV_AUDIO_DIR>/<id>.wav"

dev_gt["file_path"] = [
    os.path.join(DEV_AUDIO_DIR, str(clip_id) + ".wav")
    for clip_id in dev_gt[file_col]
]

# Spot-check: show a few paths and test that the first one is really on disk.
print("First 5 paths:")
print(dev_gt["file_path"].head())

first_path = dev_gt["file_path"].iloc[0]
print("\nFile exists check for first sample:", os.path.exists(first_path))

In [None]:
import os
import pandas as pd

# STEP 8: Load the class/label metadata file

meta_files = os.listdir(METADATA_DIR)
print("Metadata files:", meta_files)

# Pick the metadata entry whose name mentions "class" or "label". Candidates are sorted so the
# choice is deterministic, and a missing match raises here instead of leaving META_PATH
# undefined (or pointing at a stale value from an earlier run of this cell).
meta_candidates = sorted(
    f for f in meta_files
    if "class" in f.lower() or "label" in f.lower()
)
if not meta_candidates:
    raise FileNotFoundError(f"No class/label metadata file found in {METADATA_DIR}")
META_PATH = os.path.join(METADATA_DIR, meta_candidates[0])

print("Using metadata file:", META_PATH)

# The selected file is parsed as JSON (not CSV).
meta_df = pd.read_json(META_PATH)
print("Metadata columns:", meta_df.columns)
meta_df.head()

In [None]:
# Column names shared by the ground-truth table and the metadata table (candidate merge keys)
common_cols = list(set(dev_gt.columns) & set(meta_df.columns))
print(f"Common columns: {common_cols}")


In [None]:
# STEP 9: reshape both tables so they share a per-label key, then merge them

# 1. Flip meta_df so its former column labels become rows, exposed in a 'mid' column.
meta_df_transposed = meta_df.T.reset_index().rename(columns={'index': 'mid'})

print("Transposed Metadata (first 5 rows):")
display(meta_df_transposed.head())

# 2. One row per (clip, mid): cast 'mids' to str, split on commas, then explode the lists.
mids_as_lists = dev_gt['mids'].astype(str).str.split(',')
dev_gt_expanded = dev_gt.assign(mids=mids_as_lists).explode('mids')

print("\nExpanded Dev Ground Truth (first 5 rows):")
display(dev_gt_expanded.head())

# 3. Left-join so every expanded ground-truth row is kept even without a metadata match.
real_df = dev_gt_expanded.merge(
    meta_df_transposed,
    how='left',
    left_on='mids',
    right_on='mid',
)

print(f"\nMerged rows: {len(real_df)}")
display(real_df.head())

In [None]:
# STEP 9: tag every row of the merged table as REAL

labeled_real = real_df.assign(label=1)   # 1 = REAL, 0 = FAKE

# Keep the path, the label and the 2nd/3rd columns of the merged table.
# NOTE(review): the positional slice [1:3] depends on the current column order, so running
# this cell a second time selects different columns.
kept_columns = ["file_path", "label", *labeled_real.columns[1:3]]
real_df = labeled_real[kept_columns]
real_df.head()


In [None]:
# STEP 10: keep only the rows that belong to the ten most frequent classes

# The class column is taken by position: the third column of real_df.
class_col = real_df.columns[2]
class_counts = real_df[class_col].value_counts()
top_classes = class_counts.index[:10].tolist()

subset_real_df = real_df.loc[real_df[class_col].isin(top_classes)]
print(f"Selected classes: {top_classes}")
print(f"Subset size: {len(subset_real_df)}")


In [None]:
# Persist the REAL subset so later cells can reload it from disk.
subset_real_csv = "/content/fsd50k_real_subset.csv"
subset_real_df.to_csv(subset_real_csv, index=False)
print(f"Saved REAL subset CSV to: {subset_real_csv}")

subset_real_df.head()


In [None]:
# Install the generation / audio dependencies. `%pip` (rather than `!pip`) installs into the
# environment of the running kernel.
%pip install -q diffusers transformers accelerate soundfile torchaudio librosa


In [None]:
import torch
from diffusers import AudioLDM2Pipeline

# Run on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Half precision is requested only on CUDA; the CPU path uses float32.
# NOTE(review): `pipe` is assigned again by the AudioLDMPipeline cell further down, so this
# AudioLDM2 pipeline is replaced before any audio is generated.
pipe = AudioLDM2Pipeline.from_pretrained(
    "cvssp/audioldm2",
    torch_dtype={"cuda": torch.float16}.get(device, torch.float32),
)
pipe = pipe.to(device)


In [None]:
import pandas as pd

# Reload the REAL subset that was written to disk earlier.
real_df = pd.read_csv("/content/fsd50k_real_subset.csv")
print(f"Real samples: {len(real_df)}")
real_df.head()


In [None]:
# The class column is again taken by position (third column of the reloaded table).
CLASS_COL = real_df.columns[2]

# Distinct class values, in order of first appearance.
classes = real_df[CLASS_COL].drop_duplicates().tolist()
print(f"Classes selected: {classes}")


In [None]:
import torch
from diffusers import AudioLDMPipeline

# Run on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Text-to-audio pipeline used by the generation loop; float16 on CUDA, float32 on CPU.
pipe = AudioLDMPipeline.from_pretrained(
    "cvssp/audioldm",
    torch_dtype={"cuda": torch.float16}.get(device, torch.float32),
)
pipe = pipe.to(device)


In [None]:
import os

# Root folder for the generated (FAKE) clips; created if it does not exist yet.
FAKE_ROOT = "/content/fake_audio"
os.makedirs(FAKE_ROOT, exist_ok=True)

print(f"Fake audio root: {FAKE_ROOT}")



In [None]:
import soundfile as sf
import re

clips_per_class = 5   # can be raised later (10–20 for the paper)

# Gradients are not needed for generation, so the whole loop runs under no_grad.
with torch.no_grad():
    for cls in classes:
        # One sub-folder per class, with spaces replaced so the name is path-friendly.
        cls_clean = cls.replace(" ", "_")
        cls_dir = os.path.join(FAKE_ROOT, cls_clean)
        os.makedirs(cls_dir, exist_ok=True)

        # The text prompt uses the class name with underscores turned back into spaces.
        prompt = "realistic " + cls.replace('_', ' ') + " environmental sound"
        print(f"\nGenerating FAKE for class: {cls}")

        for i in range(clips_per_class):
            output = pipe(
                prompt,
                num_inference_steps=30,
                audio_length_in_s=5.0,
            )
            audio = output.audios[0]

            fname = f"fake_{cls_clean}_{i}.wav"
            save_path = os.path.join(cls_dir, fname)

            # Written with a 16000 Hz sample rate.
            sf.write(save_path, audio, 16000)
            print("Saved:", save_path)


In [None]:
import pandas as pd

# One record per file found in each class folder under FAKE_ROOT, labelled 0 (FAKE).
fake_rows = [
    {
        "file_path": os.path.join(class_dir, wav_name),
        "label": 0,              # 0 = FAKE
        CLASS_COL: class_name,
    }
    for class_name in classes
    for class_dir in [os.path.join(FAKE_ROOT, class_name.replace(" ", "_"))]
    for wav_name in os.listdir(class_dir)
]

fake_df = pd.DataFrame(fake_rows)

print(f"Total FAKE samples: {len(fake_df)}")
fake_df.head()


In [None]:
# Stack REAL and FAKE rows, then shuffle them with a fixed seed.
final_df = (
    pd.concat([real_df, fake_df], ignore_index=True)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

print(f"FINAL dataset size: {len(final_df)}")
print(final_df["label"].value_counts())

# Write the combined table to disk.
final_csv_path = "/content/real_vs_fake_fsd50k.csv"
final_df.to_csv(final_csv_path, index=False)

print(f"Saved final dataset to: {final_csv_path}")


In [None]:
# NOTE(review): this cell reads `all_labels`, `probs` and `plt`; in the visible part of the
# notebook those names are first created by the evaluation cell that follows, so this cell
# cannot run before it on a fresh kernel.
label_prob_pairs = list(zip(all_labels, probs))
fake_p = [prob for lab, prob in label_prob_pairs if lab == 0]
real_p = [prob for lab, prob in label_prob_pairs if lab == 1]

# Overlaid histograms of the collected values for FAKE (label 0) and REAL (label 1) samples.
plt.figure()
plt.hist(fake_p, bins=30, alpha=0.7, label="FAKE")
plt.hist(real_p, bins=30, alpha=0.7, label="REAL")
plt.title("Prediction Confidence Distribution")
plt.legend()
plt.show()


In [None]:
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np

# Put the model into evaluation mode before scoring.
model.eval()

all_labels = []
probs = []

with torch.no_grad():
    for i, (xb, yb) in enumerate(val_loader):
        # The loader may hand back (None, None) for a batch; such batches are skipped.
        if xb is None or yb is None:
            print(f"Skipping batch {i} due to None values from custom_collate_fn.")
            continue

        xb, yb = xb.to(device), yb.to(device)

        with torch.cuda.amp.autocast():
            out = model(xb)
            # Softmax over the class dimension turns the logits into probabilities.
            batch_probs = F.softmax(out, dim=1)

        # Column 1 is the probability of class 1 (REAL).
        batch_probs_real = batch_probs[:, 1].cpu().numpy()
        batch_labels = yb.cpu().numpy()

        # Only keep batches whose probabilities and labels line up one-to-one.
        if len(batch_probs_real) == len(batch_labels):
            probs.extend(batch_probs_real)
            all_labels.extend(batch_labels)
        else:
            print(f"WARNING: Inconsistent lengths in batch {i}! Probs len: {len(batch_probs_real)}, Labels len: {len(batch_labels)}. Skipping this batch.")

# Arrays make the per-class filtering below straightforward.
all_labels = np.array(all_labels)
probs = np.array(probs)

# Plot only when something was collected; otherwise report the problem.
if len(all_labels) > 0 and len(probs) > 0:
    fake_p = [p for p, t in zip(probs, all_labels) if t == 0]
    real_p = [p for p, t in zip(probs, all_labels) if t == 1]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.hist(fake_p, bins=30, alpha=0.7, label="FAKE predictions")
    ax.hist(real_p, bins=30, alpha=0.7, label="REAL predictions")
    ax.set_title("Prediction Confidence Distribution")
    ax.set_xlabel("Probability of being REAL")
    ax.set_ylabel("Frequency")
    ax.legend()
    plt.show()
else:
    print("WARNING: No valid samples collected for plotting histogram. Please check data loading.")


In [None]:
import pandas as pd

# Reload the combined REAL + FAKE table from disk
final_csv_path = "/content/real_vs_fake_fsd50k.csv"   # change if different
final_df = pd.read_csv(final_csv_path)

print(f"Total samples: {len(final_df)}")
print(final_df["label"].value_counts())   # 1 = REAL, 0 = FAKE
final_df.head()


In [None]:
from sklearn.model_selection import train_test_split

# Keep one row per audio file before splitting. The REAL rows were expanded to one row per
# label id by the earlier `explode('mids')` step, so the same `file_path` can occur several
# times; without de-duplication one clip could end up in both the training and the
# validation split and inflate the validation scores.
final_df_unique = final_df.drop_duplicates(subset="file_path").reset_index(drop=True)

# Stratified split to keep real/fake balance
train_df, val_df = train_test_split(
    final_df_unique,
    test_size=0.2,
    stratify=final_df_unique["label"],
    random_state=42
)

print("Train size:", len(train_df))
print("Val size:", len(val_df))
print("Train label counts:\n", train_df["label"].value_counts())
print("Val label counts:\n", val_df["label"].value_counts())


In [None]:
import librosa
import numpy as np

SR = 16000          # sample rate
N_MELS = 128        # Mel bands
N_FFT = 1024        # FFT window
HOP_LENGTH = 256    # hop
FIXED_TIME_FRAMES = 128  # we will pad/crop to this many frames

def extract_logmel(path):
    """Load an audio file and return a fixed-size log-mel spectrogram.

    Parameters
    ----------
    path : str
        Audio file to load; it is read at ``SR`` Hz.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(N_MELS, FIXED_TIME_FRAMES)`` in dB relative to the clip's peak.
        Longer clips are cropped to the first ``FIXED_TIME_FRAMES`` frames; shorter clips
        are right-padded with the clip's own minimum dB value.
    """
    y, sr = librosa.load(path, sr=SR)

    # compute mel spectrogram
    mel = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        n_mels=N_MELS
    )
    logmel = librosa.power_to_db(mel, ref=np.max)

    # time dimension = axis=1. We pad/crop to FIXED_TIME_FRAMES
    n_frames = logmel.shape[1]
    if n_frames < FIXED_TIME_FRAMES:
        # With ref=np.max the loudest bin sits at 0 dB and everything else is negative, so
        # zero-padding would append frames at peak level. Pad with the clip's floor instead
        # so the padded region reads as silence.
        pad_value = logmel.min() if logmel.size else 0.0
        logmel = np.pad(
            logmel,
            ((0, 0), (0, FIXED_TIME_FRAMES - n_frames)),
            mode="constant",
            constant_values=pad_value,
        )
    else:
        logmel = logmel[:, :FIXED_TIME_FRAMES]

    # shape = (N_MELS, FIXED_TIME_FRAMES)
    return logmel


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class AntiFoleyDataset(Dataset):
    """Dataset over a table with ``file_path`` and ``label`` columns.

    Each item is ``(x, y)``: ``x`` is the float tensor built from
    ``extract_logmel(file_path)`` with a leading channel axis, ``y`` is the label as a
    long tensor (0 = fake, 1 = real).
    """

    def __init__(self, df):
        # A fresh 0..n-1 index so positional lookups match the dataset index.
        self.df = df.reset_index(drop=True)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        sample = self.df.iloc[idx]
        features = extract_logmel(sample["file_path"])      # (N_MELS, T)
        x = torch.tensor(features)[None].float()            # (1, N_MELS, T)
        y = torch.tensor(sample["label"]).long()            # 0 = fake, 1 = real
        return x, y

# Wrap both splits in datasets.
train_dataset, val_dataset = (AntiFoleyDataset(split) for split in (train_df, val_df))

# Batches of 16 with two worker processes; only the training loader shuffles.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16, num_workers=2)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=16, num_workers=2)

(len(train_dataset), len(val_dataset))


In [None]:
import torch.nn as nn
import torch.nn.functional as F

class AntiFoleyNet(nn.Module):
    """Small CNN classifier: three conv+pool stages, adaptive pooling, two linear layers.

    ``forward`` takes a batch shaped ``(B, 1, N_MELS, T)`` and returns ``(B, 2)`` logits
    (real / fake).
    """

    def __init__(self):
        super().__init__()
        # The attribute names below are the keys used in the module's state_dict.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(kernel_size=2)
        self.apool = nn.AdaptiveAvgPool2d(output_size=(8, 8))  # (C, 8, 8)

        self.fc1 = nn.Linear(in_features=128 * 8 * 8, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=2)  # 2 classes: real / fake

        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        # x: (B, 1, N_MELS, T); every stage is conv -> ReLU -> 2x2 max-pool.
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.pool(F.relu(conv(x)))

        # (B, 128, 8, 8) -> (B, 128 * 8 * 8)
        flat = self.apool(x).flatten(start_dim=1)

        hidden = self.dropout(F.relu(self.fc1(flat)))
        return self.fc2(hidden)

# Use the GPU when available, then build the network on that device and print its layout.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = AntiFoleyNet()
model = model.to(device)
print(model)


In [None]:
import torch.optim as optim

# Cross-entropy over the two output logits, optimised with Adam at a 1e-4 learning rate.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)


In [None]:
# ============================================================
# COMPLETE FINAL TRAINING CODE — AntiFoley++ (ONE CELL)
# ============================================================

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score
from sklearn.utils import resample
from copy import deepcopy
import numpy as np
import librosa
import pandas as pd
import os # Import os for path handling in error messages

# ================================
# 1. DATASET (LOG-MEL)
# ================================
SR = 16000          # sample rate
N_MELS = 128        # Mel bands
N_FFT = 1024        # FFT window
HOP_LENGTH = 256    # hop
FIXED_TIME_FRAMES = 128  # we will pad/crop to this many frames

class AntiFoleyDataset(Dataset):
    """Dataset that turns rows with ``file_path`` / ``label`` into log-mel tensors.

    Each item is ``(x, y_label)``: ``x`` is a float tensor of shape
    ``(1, N_MELS, FIXED_TIME_FRAMES)`` and ``y_label`` a long tensor (0 = fake, 1 = real).
    When a file cannot be loaded or processed the error is printed and ``(None, None)``
    is returned, so the collate function has to filter such samples out.
    """

    def __init__(self, df):
        # A fresh 0..n-1 index so positional lookups match the dataset index.
        self.df = df.reset_index(drop=True)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        file_path = row["file_path"]
        try:
            y, sr = librosa.load(file_path, sr=SR)

            # compute mel spectrogram
            mel = librosa.feature.melspectrogram(
                y=y,
                sr=sr,
                n_fft=N_FFT,
                hop_length=HOP_LENGTH,
                n_mels=N_MELS
            )
            logmel = librosa.power_to_db(mel, ref=np.max)

            # time dimension = axis=1. We pad/crop to FIXED_TIME_FRAMES
            n_frames = logmel.shape[1]
            if n_frames < FIXED_TIME_FRAMES:
                # With ref=np.max the loudest bin sits at 0 dB and everything else is
                # negative, so zero-padding would append frames at peak level. Pad with
                # the clip's floor instead so the padded region reads as silence.
                pad_value = logmel.min() if logmel.size else 0.0
                logmel = np.pad(
                    logmel,
                    ((0, 0), (0, FIXED_TIME_FRAMES - n_frames)),
                    mode="constant",
                    constant_values=pad_value,
                )
            else:
                logmel = logmel[:, :FIXED_TIME_FRAMES]

            x = torch.tensor(logmel).unsqueeze(0).float()  # (1, N_MELS, T)
            y_label = torch.tensor(row["label"]).long()    # 0 = fake, 1 = real

            return x, y_label
        except Exception as e:
            # Deliberate best-effort: report the bad file and let the collate step drop it.
            print(f"Error loading or processing file: {file_path}. Error: {e}")
            return None, None # Return None for problematic samples

def custom_collate_fn(batch):
    """Collate ``(x, y)`` samples, dropping any pair that contains ``None``.

    Returns the default-collated batch of the remaining samples, or ``(None, None)`` when
    no sample is left.
    """
    usable = [(feat, lab) for feat, lab in batch if not (feat is None or lab is None)]
    if len(usable) == 0:
        # Nothing usable in this batch: signal that to the caller.
        return None, None
    return torch.utils.data.dataloader.default_collate(usable)


# ================================
# 2. BALANCE DATA (CRITICAL)
# ================================
# Split the training table by label (1 = REAL, 0 = FAKE).
real_df_bal = train_df.loc[train_df["label"].eq(1)]
fake_df_bal = train_df.loc[train_df["label"].eq(0)]

# Draw FAKE rows with replacement until there are as many as REAL rows.
fake_df_bal = resample(
    fake_df_bal,
    n_samples=len(real_df_bal),
    replace=True,
    random_state=42,
)

# Combine both halves and shuffle with a fixed seed.
train_df_balanced = pd.concat([real_df_bal, fake_df_bal])
train_df_balanced = train_df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

train_dataset = AntiFoleyDataset(train_df_balanced)
val_dataset = AntiFoleyDataset(val_df)

# ================================
# 3. FAST DATALOADERS
# ================================
BATCH_SIZE = 32

# Settings shared by both loaders.
loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=0,               # Kept at 0 for debugging. Set back to 4 later if stable.
    pin_memory=True,
    persistent_workers=False,    # Must stay False while num_workers is 0
    collate_fn=custom_collate_fn,  # Drops samples the dataset returned as None
)

# Only the training loader shuffles.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

# ================================
# 4. AntiFoley++ MODEL (CNN + TRANSFORMER)
# ================================
class AntiFoleyPlus(nn.Module):
    """CNN front-end followed by a Transformer encoder and a two-logit head.

    ``forward`` takes a batch shaped ``(B, 1, H, W)`` and returns ``(B, 2)`` logits.
    """

    def __init__(self):
        super().__init__()

        # The attribute names below are the keys used in the module's state_dict.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        self.bn1 = nn.BatchNorm2d(num_features=32)
        self.bn2 = nn.BatchNorm2d(num_features=64)
        self.bn3 = nn.BatchNorm2d(num_features=128)

        self.pool = nn.MaxPool2d(kernel_size=2)
        self.apool = nn.AdaptiveAvgPool2d(output_size=(16, 16))

        # Projects each 128-channel column to the Transformer width.
        self.embed = nn.Linear(in_features=128, out_features=256)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=256,
            nhead=8,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)

        self.fc1 = nn.Linear(in_features=256, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=2)

        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        # Each stage: conv -> ReLU -> batch-norm -> 2x2 max-pool.
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = self.pool(norm(F.relu(conv(x))))

        # (B, 128, 16, 16) -> average over dim 2 -> (B, 128, 16) -> (B, 16, 128) sequence.
        tokens = self.apool(x).mean(dim=2).permute(0, 2, 1)

        # Encode the sequence, then average over the sequence axis.
        encoded = self.transformer(self.embed(tokens))
        pooled = encoded.mean(dim=1)

        hidden = self.dropout(F.relu(self.fc1(pooled)))
        return self.fc2(hidden)

# Use the GPU when available and build the AntiFoley++ network on that device.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = AntiFoleyPlus()
model = model.to(device)

# ================================
# 5. FOCAL LOSS
# ================================
class FocalLoss(nn.Module):
    """Focal loss built on per-sample cross-entropy.

    Each sample's cross-entropy ``ce`` is weighted by ``(1 - exp(-ce)) ** gamma`` and the
    weighted values are averaged over the batch.
    """

    def __init__(self, gamma=2.0):
        super().__init__()
        self.gamma = gamma
        # reduction='none' keeps one loss value per sample so each can be re-weighted.
        self.ce = nn.CrossEntropyLoss(reduction='none')

    def forward(self, logits, targets):
        per_sample_ce = self.ce(logits, targets)
        # exp(-ce) is the probability the model assigned to the target class.
        p_target = (-per_sample_ce).exp()
        focal_weight = (1 - p_target).pow(self.gamma)
        return (focal_weight * per_sample_ce).mean()

criterion = FocalLoss(gamma=2.0)

# ================================
# 6. OPTIMIZER + SCHEDULER + AMP
# ================================
# Let cuDNN benchmark convolution algorithms for the input sizes it sees.
torch.backends.cudnn.benchmark = True

# AdamW over all model parameters.
optimizer = torch.optim.AdamW(
    model.parameters(),
    weight_decay=1e-3,
    lr=2e-5,
)

# Halve the learning rate after 3 scheduler steps without a lower monitored value.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    factor=0.5,
    patience=3,
    mode="min",
)

# Gradient scaler used together with autocast in the training loop.
scaler = torch.cuda.amp.GradScaler()

# ================================
# 7. TRAINING WITH EARLY STOPPING
# ================================
EPOCHS = 1
patience = 1   # epochs without validation improvement before stopping early

best_val_loss = float("inf")
patience_counter = 0
best_model_state = None   # copy of the weights with the lowest validation loss so far

train_loss_list = [] # Initialize list to store training losses
val_loss_list = []   # Initialize list to store validation losses

for epoch in range(EPOCHS):

    # ---- training pass ----
    model.train()
    total_loss = 0.0
    train_batches = 0   # batches actually used (fully invalid batches are skipped)
    train_preds, train_labels = [], []

    for xb, yb in train_loader:
        # Handle cases where custom_collate_fn returns None for the entire batch
        if xb is None or yb is None:
            continue

        xb = xb.to(device, non_blocking=True)
        yb = yb.to(device, non_blocking=True)

        optimizer.zero_grad()

        with torch.cuda.amp.autocast():
            out = model(xb)
            loss = criterion(out, yb)

        scaler.scale(loss).backward()
        # Gradients are still multiplied by the loss scale at this point; unscale them first
        # so the clipping threshold of 2.0 applies to the true gradient norm.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()
        train_batches += 1
        train_preds.extend(out.argmax(1).cpu().numpy())
        train_labels.extend(yb.cpu().numpy())

    # Average over the batches that contributed a loss; skipped batches must not dilute the
    # mean, and an empty loader must not divide by zero.
    train_loss = total_loss / max(train_batches, 1)
    train_acc = accuracy_score(train_labels, train_preds) if train_labels else float("nan")
    train_loss_list.append(train_loss) # Store training loss

    # ---- validation pass ----
    model.eval()
    val_loss = 0.0
    val_batches = 0
    val_preds, val_labels = [], []

    with torch.no_grad():
        for xb, yb in val_loader:
            # Handle cases where custom_collate_fn returns None for the entire batch
            if xb is None or yb is None:
                continue

            xb = xb.to(device)
            yb = yb.to(device)

            with torch.cuda.amp.autocast():
                out = model(xb)
                loss = criterion(out, yb)

            val_loss += loss.item()
            val_batches += 1
            val_preds.extend(out.argmax(1).cpu().numpy())
            val_labels.extend(yb.cpu().numpy())

    val_loss /= max(val_batches, 1)
    val_acc = accuracy_score(val_labels, val_preds) if val_labels else float("nan")
    val_loss_list.append(val_loss) # Store validation loss
    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}/{EPOCHS} | "
          f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    # ---- early stopping on validation loss ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_model_state = deepcopy(model.state_dict())
        print("✅ New best model saved")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("🛑 Early stopping triggered")
            break

# ================================
# 8. LOAD & SAVE BEST MODEL
# ================================
# Restore the best weights before saving. best_model_state is still None when no epoch ever
# improved on the initial validation loss (for example when the validation loss is NaN or no
# epoch ran); in that case the current weights are saved instead of crashing.
if best_model_state is not None:
    model.load_state_dict(best_model_state)
else:
    print("WARNING: no best model state was recorded; saving the current weights instead.")
torch.save(model.state_dict(), "/content/antifoley_96_model.pth")
print("✅ Final Best Model Saved")

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# Score the validation set with the model in evaluation mode.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for xb, yb in val_loader:
        # custom_collate_fn returns (None, None) when every sample of a batch failed to
        # load; skip those batches the same way the training cell does.
        if xb is None or yb is None:
            continue

        xb = xb.to(device)
        out = model(xb)
        preds = out.argmax(dim=1)

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(yb.numpy())

acc = accuracy_score(all_labels, all_preds)
print("Final Validation Accuracy:", acc)

# labels=[0, 1] pins the row order to FAKE, REAL so the two target names always match,
# even when one of the classes does not occur in the collected labels/predictions.
print("\nClassification Report:")
print(classification_report(all_labels, all_preds, labels=[0, 1], target_names=["FAKE", "REAL"]))
