In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found anywhere below the Kaggle input directory.
input_file_paths = (
    os.path.join(folder, name)
    for folder, _, names in os.walk('/kaggle/input')
    for name in names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Cell 1 - setup
!pip install -q librosa==0.10.0 soundfile==0.12.1 tqdm

import os, random, math, warnings
import numpy as np, pandas as pd
from tqdm import tqdm
import librosa, soundfile as sf
import matplotlib.pyplot as plt

# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Masking, LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix

# Reproducibility: seed Python's, NumPy's and TensorFlow's generators with the same value
SEED = 42
for _seed_generator in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_generator(SEED)

# Silence FutureWarning messages so the cell outputs stay readable
warnings.simplefilter("ignore", category=FutureWarning)

# Report the runtime: TensorFlow version and the GPUs it can see
print("TensorFlow version:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))


In [None]:
# Cell 2 - dataset path detection
DATASET_PATH = "/kaggle/input/deep-voice-deepfake-voice-recognition/KAGGLE/AUDIO"  # << edit if needed

if not os.path.exists(DATASET_PATH):
    # Fallback: pick the first folder in /kaggle/input whose name hints at a
    # deepfake dataset ("deepfake" is already matched by both markers below).
    base = "/kaggle/input"
    DATASET_PATH = None
    # os.listdir raises when `base` is missing; treat that as "nothing to
    # detect" so the explicit error below is what the user sees.
    # Sorting makes the choice deterministic when several folders match.
    candidate_names = sorted(os.listdir(base)) if os.path.isdir(base) else []
    for d in candidate_names:
        if any(x in d.lower() for x in ["deep", "fake"]):
            candidate = os.path.join(base, d)
            if os.path.isdir(candidate):
                DATASET_PATH = candidate
                break

if DATASET_PATH is None:
    raise FileNotFoundError("Couldn't auto-detect dataset. Upload your dataset to Kaggle and set DATASET_PATH.")

# verify structure (expect REAL and FAKE folders inside); only warn, because
# labels are later inferred from each file's path, not from this layout
expected = [os.path.join(DATASET_PATH, sub) for sub in ["REAL", "FAKE"]]
for path in expected:
    if not os.path.isdir(path):
        print(f" Warning: expected subfolder not found: {path}")

print("Using dataset path:", DATASET_PATH)


In [None]:
# Cell 3 - helpers
def infer_label_from_path(path):
    """Infer the class label of an audio file from its location.

    Path components are inspected one at a time, most specific first: the
    immediate parent folder, then the file name, then the remaining ancestor
    folders from nearest to farthest. The first component that contains a
    marker decides the label. Checking components in this order (instead of
    searching the whole path string for "real") prevents a clip stored in a
    FAKE folder from being labelled real just because some ancestor folder or
    the file name happens to contain "real".

    Args:
        path: File path; both "/" and "\\" separators are understood.

    Returns:
        0 for real, 1 for fake, None when no component carries a marker.
    """
    def classify(name):
        # Within a single component "real" wins, as it did in the
        # original whole-path check.
        if "real" in name:
            return 0
        # "deepfake" is covered by the "fake" marker.
        if any(marker in name for marker in ("fake", "synth")):
            return 1
        return None

    parts = [part for part in path.lower().replace("\\", "/").split("/") if part]
    if not parts:
        return None

    *ancestors, filename = parts
    # parent folder -> file name -> grandparent -> ... -> root
    search_order = ancestors[-1:] + [filename] + ancestors[-2::-1]

    for component in search_order:
        label = classify(component)
        if label is not None:
            return label
    return None


def extract_mfcc_chunks(file_path,
                        sr=22050,
                        chunk_duration=1.0,
                        n_mfcc=40,
                        n_fft=2048,
                        hop_length=512,
                        max_pad_len=44):
    """
    Cut an audio file into fixed-length chunks and compute MFCCs per chunk.

    The file is loaded as mono at sample rate `sr` and sliced into consecutive
    windows of `chunk_duration` seconds; a shorter final window is zero-padded
    to full length. Each window's MFCC matrix is transposed to (frames, n_mfcc)
    and then truncated or zero-padded along the frame axis to `max_pad_len`.

    Returns: float32 array (n_chunks, max_pad_len, n_mfcc); n_chunks is 0 when
    the file cannot be loaded (the error is printed) or yields no chunks.
    """
    def _no_chunks():
        return np.zeros((0, max_pad_len, n_mfcc), dtype=np.float32)

    try:
        signal, _ = librosa.load(file_path, sr=sr, mono=True)
    except Exception as e:
        print(f" Error loading {file_path}: {e}")
        return _no_chunks()

    window = int(chunk_duration * sr)
    features = []

    for offset in range(0, len(signal), window):
        segment = signal[offset:offset + window]
        if len(segment) == 0:
            continue

        # zero-pad a short trailing segment up to a full window
        shortfall = window - len(segment)
        if shortfall > 0:
            segment = np.pad(segment, (0, shortfall))

        frames = librosa.feature.mfcc(y=segment, sr=sr, n_mfcc=n_mfcc,
                                      n_fft=n_fft, hop_length=hop_length).T

        # force exactly max_pad_len frames per chunk
        n_frames = frames.shape[0]
        if n_frames >= max_pad_len:
            frames = frames[:max_pad_len, :]
        else:
            frames = np.pad(frames, ((0, max_pad_len - n_frames), (0, 0)))

        features.append(frames)

    if len(features) == 0:
        return _no_chunks()

    return np.array(features, dtype=np.float32)


In [None]:
# Cell 4 - collect and extract features
from joblib import Parallel, delayed

# Controls
CHUNK_DURATION = 1.0   # seconds
N_MFCC = 40
MAX_PAD_LEN = 44       # frames per 1s chunk
MAX_SAMPLES_PER_CLASS = None  # set to int for limit, None = all

# Gather labeled audio paths; files whose label cannot be inferred are skipped
all_audio_paths = []
for root, _, files in os.walk(DATASET_PATH):
    for f in files:
        if f.lower().endswith(('.wav', '.flac', '.mp3', '.ogg')):
            p = os.path.join(root, f)
            label = infer_label_from_path(p)
            if label is not None:
                all_audio_paths.append((p, label))

# os.walk yields entries in filesystem-dependent order. Sort so that the
# per-class cap below and the seeded shuffle select and order the same files
# on every run.
all_audio_paths.sort()

print("Total labeled audio files found:", len(all_audio_paths))

# Split by class and optionally cap each class (this does not equalise the
# class sizes unless the cap is smaller than both classes)
real_paths = [p for p, l in all_audio_paths if l == 0]
fake_paths = [p for p, l in all_audio_paths if l == 1]

if MAX_SAMPLES_PER_CLASS:
    real_paths = real_paths[:MAX_SAMPLES_PER_CLASS]
    fake_paths = fake_paths[:MAX_SAMPLES_PER_CLASS]

labeled_files = [(p, 0) for p in real_paths] + [(p, 1) for p in fake_paths]
random.shuffle(labeled_files)  # shuffle before extraction
print("Using files -> real:", len(real_paths), "fake:", len(fake_paths))

# Parallel MFCC extraction
def process_file(p, label):
    """Extract MFCC chunks for one file and pair each chunk with its label.

    Returns a tuple (chunks, labels) where `labels` repeats `label` once per
    chunk, or None when the file produced no chunks or extraction raised
    (the error is printed rather than propagated).
    """
    try:
        mfcc_stack = extract_mfcc_chunks(p,
                                         chunk_duration=CHUNK_DURATION,
                                         n_mfcc=N_MFCC,
                                         max_pad_len=MAX_PAD_LEN)
        chunk_count = mfcc_stack.shape[0]
        if chunk_count <= 0:
            return None
        return mfcc_stack, [label] * chunk_count
    except Exception as e:
        print(f" Error in {p}: {e}")
        return None

# One delayed job per labeled file; tqdm tracks jobs as they are handed out
extraction_jobs = (
    delayed(process_file)(p, label)
    for p, label in tqdm(labeled_files, desc="Extracting MFCC chunks")
)
results = Parallel(n_jobs=-1, backend="multiprocessing")(extraction_jobs)

# Keep only files that produced chunks (process_file returns None otherwise)
successful = [r for r in results if r]
X_chunks = [file_chunks for file_chunks, _ in successful]
y_chunks = [chunk_label for _, file_labels in successful for chunk_label in file_labels]

if len(X_chunks) == 0:
    raise RuntimeError("No chunks extracted. Check dataset & parameters.")

# Join per-file arrays along the chunk axis
X = np.concatenate(X_chunks, axis=0)   # (total_chunks, max_pad_len, n_mfcc)
y = np.array(y_chunks)
print("Final dataset shape (chunks):", X.shape, y.shape)

# Persist features and labels so later sessions can skip extraction
for out_name, out_array in (('X_mfcc_chunks.npy', X), ('y_mfcc_chunks.npy', y)):
    np.save(out_name, out_array)
print(" Saved X_mfcc_chunks.npy and y_mfcc_chunks.npy")


In [None]:
# Cell 5 - scale, split
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib

# Chunk geometry; time_steps and n_mfcc are reused when the model is built
n_samples, time_steps, n_mfcc = X.shape

# Stratified split FIRST, so that the scaler below never sees test data.
# NOTE(review): the split is per chunk, so chunks cut from one recording can
# land in both train and test. A group-wise split would need a per-chunk file
# id, which the extraction cell does not keep.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=SEED
)

# Scale per coefficient: frames are flattened to (total_frames, n_mfcc), the
# scaler is fitted on the training frames only, and the same statistics are
# applied to the test frames. copy=False is safe here because X_train/X_test
# are fresh arrays produced by the split; X itself is left untouched, so
# re-running this cell gives the same result.
scaler = StandardScaler(copy=False)
X_train = scaler.fit_transform(X_train.reshape(-1, n_mfcc)).reshape(-1, time_steps, n_mfcc)
X_test = scaler.transform(X_test.reshape(-1, n_mfcc)).reshape(-1, time_steps, n_mfcc)

print("Train:", X_train.shape, y_train.shape, "Test:", X_test.shape, y_test.shape)

# Check class balance
unique, counts = np.unique(y_train, return_counts=True)
print("Class distribution (train):", dict(zip(unique, counts)))
unique, counts = np.unique(y_test, return_counts=True)
print("Class distribution (test):", dict(zip(unique, counts)))

# Save scaler for inference
joblib.dump(scaler, 'scaler.joblib')
print(" Saved scaler.joblib")

# Optional: save split datasets
# np.savez_compressed("train_test_split.npz", 
#                     X_train=X_train, y_train=y_train, 
#                     X_test=X_test, y_test=y_test)


In [None]:
# Cell 6 - model
def make_lstm_model(time_steps, n_mfcc,
                    lstm_units=(128, 64),
                    dense_units=32,
                    dropout_rates=(0.3, 0.3, 0.2),
                    lr=1e-4):
    """Build and compile a two-layer LSTM binary classifier.

    Architecture: Masking -> LSTM -> BatchNorm -> Dropout -> LSTM ->
    BatchNorm -> Dropout -> Dense(relu) -> BatchNorm -> Dropout ->
    Dense(1, sigmoid).

    Args:
        time_steps: Number of frames per input sequence.
        n_mfcc: Number of features per frame.
        lstm_units: Sizes of the two LSTM layers; only the first two entries
            are used.
        dense_units: Size of the hidden dense layer.
        dropout_rates: Dropout after LSTM 1, LSTM 2 and the dense layer; only
            the first three entries are used.
        lr: Adam learning rate.

    Returns:
        A compiled Keras model (binary cross-entropy loss, accuracy metric).

    Raises:
        ValueError: If `lstm_units` has fewer than 2 entries or
            `dropout_rates` has fewer than 3.
    """
    # Fail early with a clear message instead of an IndexError mid-construction.
    if len(lstm_units) < 2:
        raise ValueError(f"lstm_units needs at least 2 entries, got {len(lstm_units)}")
    if len(dropout_rates) < 3:
        raise ValueError(f"dropout_rates needs at least 3 entries, got {len(dropout_rates)}")

    model = Sequential([
        # Time steps whose features are all exactly 0 are skipped downstream.
        # NOTE(review): the inputs are standardized before training, so
        # all-zero frames are unlikely and this layer is probably a no-op - confirm.
        Masking(mask_value=0., input_shape=(time_steps, n_mfcc)),

        LSTM(lstm_units[0], return_sequences=True),
        BatchNormalization(),
        Dropout(dropout_rates[0]),

        LSTM(lstm_units[1]),
        BatchNormalization(),
        Dropout(dropout_rates[1]),

        Dense(dense_units, activation='relu'),
        BatchNormalization(),
        Dropout(dropout_rates[2]),

        Dense(1, activation='sigmoid')
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr, amsgrad=True),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

# Build the network for the chunk geometry unpacked from X.shape in the
# scale/split cell, then print the layer-by-layer summary.
model = make_lstm_model(time_steps, n_mfcc)
model.summary()


In [None]:
# Cell 7 - train
epochs = 5
batch_size = 32

# All three callbacks watch the validation loss.
# Note: with epochs=5 the early-stopping patience of 6 can never be reached.
early_stopping = EarlyStopping(monitor='val_loss', patience=6,
                               restore_best_weights=True, verbose=1)
lr_on_plateau = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)
best_checkpoint = ModelCheckpoint('best_lstm.h5', monitor='val_loss',
                                  save_best_only=True, verbose=1)
callbacks = [early_stopping, lr_on_plateau, best_checkpoint]

fit_options = dict(
    validation_split=0.1,   # better: split train/val earlier if dataset is small
    epochs=epochs,
    batch_size=batch_size,
    shuffle=True,
    callbacks=callbacks,
    verbose=2,
)
history = model.fit(X_train, y_train, **fit_options)

# Keep the per-epoch metrics dict on disk for later plotting
np.save("training_history.npy", history.history)


In [None]:
# Cell 8 - evaluate
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve

# Score the held-out chunks and threshold the sigmoid output at 0.5
preds = model.predict(X_test, batch_size=64)
y_pred = (preds.flatten() >= 0.5).astype(int)

# Text metrics
print(classification_report(y_test, y_pred, digits=4, zero_division=0))
print("ROC-AUC:", roc_auc_score(y_test, preds))

# Confusion matrix (rows = true class, columns = predicted class)
cm = confusion_matrix(y_test, y_pred)
print("Confusion matrix:\n", cm)

# Two heatmaps side by side: raw counts and row-normalized rates
import seaborn as sns
class_names = ['real', 'fake']
fig, axes = plt.subplots(1, 2, figsize=(10,4))

heatmap_panels = [
    (cm, 'd', "Confusion Matrix (Counts)"),
    (cm/cm.sum(axis=1, keepdims=True), ".2f", "Confusion Matrix (Normalized)"),
]
for panel_ax, (matrix, number_format, panel_title) in zip(axes, heatmap_panels):
    sns.heatmap(matrix, annot=True, fmt=number_format, cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=panel_ax)
    panel_ax.set_title(panel_title)
    panel_ax.set_xlabel("Predicted")
    panel_ax.set_ylabel("True")

plt.tight_layout()
plt.show()

# ROC curve with the chance diagonal for reference
fpr, tpr, _ = roc_curve(y_test, preds)
plt.plot(fpr, tpr, label=f"AUC={roc_auc_score(y_test, preds):.4f}")
plt.plot([0,1], [0,1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()

# Store labels, hard predictions and raw scores together
np.savez_compressed("eval_results.npz", y_true=y_test, y_pred=y_pred, preds=preds)


In [None]:
# Cell 9 - save and inference helper
# Write the trained model to a file in the current working directory so it
# can be reloaded later without retraining.
model.save('lstm_deepfake_audio.h5')
print("Saved model: lstm_deepfake_audio.h5")

def predict_file(path, model, scaler_path='scaler.joblib',
                 chunk_duration=1.0, n_mfcc=N_MFCC, max_pad_len=MAX_PAD_LEN,
                 threshold=0.5):
    """Classify one audio file as REAL or FAKE from its chunk-level scores.

    The file is cut into MFCC chunks, the chunks are standardized with the
    scaler stored at `scaler_path`, every chunk is scored by `model`, and the
    mean score is compared against `threshold`.

    Returns:
        (avg_score, preds, label): mean score, per-chunk model outputs, and
        "FAKE" when avg_score >= threshold, otherwise "REAL". When no chunks
        could be extracted: (None, None, "No audio chunks extracted").
    """
    # The scaler is re-read from disk on every call.
    scaler = joblib.load(scaler_path)
    chunks = extract_mfcc_chunks(path, chunk_duration=chunk_duration,
                                 n_mfcc=n_mfcc, max_pad_len=max_pad_len)
    chunk_count, frame_count, coeff_count = chunks.shape
    if chunk_count == 0:
        return None, None, "No audio chunks extracted"

    # Standardize frame by frame, then restore the (chunks, frames, coeffs) layout
    scaled_frames = scaler.transform(chunks.reshape(-1, coeff_count))
    chunks = scaled_frames.reshape(-1, frame_count, coeff_count)

    preds = model.predict(chunks, verbose=0)
    avg_score = preds.mean()
    if avg_score >= threshold:
        label = "FAKE"
    else:
        label = "REAL"
    return avg_score, preds, label

# Example usage:
# avg_score, per_chunk, label = predict_file("/kaggle/working/test.wav", model)
# print(f"Prediction: {label} (avg_score={avg_score:.4f})")


In [None]:
def predict_folder(folder_path, model, scaler_path='scaler.joblib',
                   chunk_duration=1.0, n_mfcc=N_MFCC, max_pad_len=MAX_PAD_LEN,
                   threshold=0.5):
    """Run `predict_file` on every audio file below `folder_path`.

    Files are matched by extension (.wav, .flac, .mp3, .ogg,
    case-insensitive) and visited in sorted order so the output is the same
    on every run. Files for which `predict_file` returns no score are
    skipped. Each prediction is printed as it is produced.

    Returns:
        List of (file_path, avg_score, label) tuples, one per scored file.
    """
    results = []
    for root, dirs, files in os.walk(folder_path):
        # Sorting in place makes os.walk descend into sub-folders in a fixed order.
        dirs.sort()
        for f in sorted(files):
            if f.lower().endswith(('.wav', '.flac', '.mp3', '.ogg')):
                file_path = os.path.join(root, f)
                avg_score, preds, label = predict_file(file_path, model,
                                                       scaler_path=scaler_path,
                                                       chunk_duration=chunk_duration,
                                                       n_mfcc=n_mfcc,
                                                       max_pad_len=max_pad_len,
                                                       threshold=threshold)
                if avg_score is not None:
                    results.append((file_path, avg_score, label))
                    # The arrow was stored as mis-decoded UTF-8 bytes; restored here.
                    print(f"{file_path} → {label} (avg_score={avg_score:.4f})")
    return results

In [None]:
# Predict on multiple folders
folders = [
    "/kaggle/input/deep-voice-deepfake-voice-recognition/DEMONSTRATION/DEMONSTRATION",
    "/kaggle/input/deep-voice-deepfake-voice-recognition/KAGGLE"
]

# Accumulate the (file, avg_score, label) rows of every folder in one list
all_results = []
for folder in folders:
    print("\nRunning predictions on:", folder)
    all_results += predict_folder(folder, model)

# Write the combined table to a single CSV
import pandas as pd
result_columns = ["file", "avg_score", "label"]
df_results = pd.DataFrame(all_results, columns=result_columns)
df_results.to_csv("all_predictions.csv", index=False)
print("\n Saved predictions to all_predictions.csv")


# Handle the CSV dataset (if it has file paths)
import pandas as pd

csv_path = "/kaggle/input/deep-voice-deepfake-voice-recognition/KAGGLE/DATASET-balanced.csv"
df = pd.read_csv(csv_path)
print("CSV loaded with shape:", df.shape)

# Use the first column named "path" or "file" as the list of audio files;
# "path" keeps priority so existing CSVs behave as before.
path_column = next((col for col in ("path", "file") if col in df.columns), None)

if path_column is not None:
    print("\nRunning predictions from CSV paths...")
    csv_results = []
    for p in df[path_column]:
        avg_score, preds, label = predict_file(p, model, scaler_path="scaler.joblib")
        # predict_file returns avg_score=None when no chunks could be extracted
        if avg_score is not None:
            csv_results.append((p, avg_score, label))
    pd.DataFrame(csv_results, columns=["file", "avg_score", "label"]).to_csv("csv_predictions.csv", index=False)
    print(" Saved predictions to csv_predictions.csv")
else:
    # The apostrophe in this message was stored as mis-decoded UTF-8 bytes; restored here.
    print(" CSV doesn't contain file paths column. Please check its columns:", df.columns)
