In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will summarise the files under the input directory

import os
# Print one line per directory (file count plus one sample name) rather than
# one line per file, so a dataset with many audio files does not flood the
# cell output.
for dirname, _, filenames in os.walk('/kaggle/input'):
    if filenames:
        print(f"{dirname}: {len(filenames)} files (e.g. {filenames[0]})")

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

**CONFIG--**



In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses, callbacks
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, GlobalAveragePooling2D, Dense, Add
import librosa
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score
import gc  # Import garbage collector

# Set random seeds for reproducibility.
# `set_random_seed` seeds Python's `random`, NumPy and TensorFlow in one call,
# so no source of randomness is left unseeded.
SEED = 42
tf.keras.utils.set_random_seed(SEED)

##########################################
# Inspect Dataset Directory Structure
##########################################
# Run this cell to check the available directories:
# !find /kaggle/input/asvpoof-2019-dataset/ -type d

##########################################
# Dataset locations
##########################################
# Both paths go through a nested 'LA/LA' folder and have no 'ASVspoof2019_root' level.
# NOTE(review): this layout is specific to the attached Kaggle dataset — re-check it
# with the `find` command above if the dataset is re-attached or updated.
protocol_dir = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_cm_protocols/"
audio_base_dir = "/kaggle/input/asvpoof-2019-dataset/LA/LA/"  # Base directory containing train, dev, eval audio folders

##########################################
# Feature Extraction using librosa
##########################################
class FeatureExtractor:
    """Turns an audio file into a fixed-size log-power spectrogram.

    Attributes:
        sample_rate: Rate (Hz) the audio is resampled to when loaded.
        n_fft: FFT window size passed to the STFT.
        hop_length: Hop between successive STFT frames, in samples.
        duration: Length in seconds every clip is truncated or zero-padded to.
        target_length: Number of time frames in every returned spectrogram,
            ``ceil(duration * sample_rate / hop_length)``.
        freq_bins: Number of frequency bins, ``n_fft // 2 + 1``.
    """

    def __init__(self, sample_rate=16000, n_fft=1024, hop_length=256, duration=4.0):
        self.sample_rate = sample_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.duration = duration  # seconds
        frames_per_clip = self.duration * self.sample_rate / self.hop_length
        self.target_length = int(np.ceil(frames_per_clip))
        self.freq_bins = self.n_fft // 2 + 1

    def extract_spectrogram(self, audio_path):
        """Load `audio_path` and return its log-power spectrogram.

        The waveform is cut or right-padded with zeros to exactly
        ``duration * sample_rate`` samples, transformed with an STFT, squared,
        passed through ``log1p`` and transposed to (time, freq). The result is
        then cut or zero-padded so its shape is always
        ``(target_length, freq_bins)``.

        Any exception raised along the way is reported with a printed message
        and an all-zero array of the same shape is returned instead.
        """
        try:
            samples, _ = librosa.load(audio_path, sr=self.sample_rate)

            # Force the waveform to the exact clip length.
            wanted_samples = int(self.duration * self.sample_rate)
            shortfall = wanted_samples - len(samples)
            if shortfall < 0:
                samples = samples[:wanted_samples]
            elif shortfall > 0:
                samples = np.pad(samples, (0, shortfall), mode='constant')

            power = np.abs(librosa.stft(samples, n_fft=self.n_fft, hop_length=self.hop_length)) ** 2
            features = np.log1p(power).T  # shape: (time, freq)

            # The STFT frame count can differ from target_length by rounding.
            frame_gap = self.target_length - features.shape[0]
            if frame_gap < 0:
                features = features[:self.target_length, :]
            elif frame_gap > 0:
                features = np.pad(features, ((0, frame_gap), (0, 0)), mode='constant')

            # Defensive check on the frequency axis.
            bin_gap = self.freq_bins - features.shape[1]
            if bin_gap != 0:
                print(f"Warning: Unexpected frequency bins for {audio_path}. Expected {self.freq_bins}, got {features.shape[1]}. Padding/Truncating.")
                if bin_gap < 0:
                    features = features[:, :self.freq_bins]
                else:
                    features = np.pad(features, ((0, 0), (0, bin_gap)), mode='constant')
            return features  # shape: (target_length, freq_bins)
        except Exception as e:
            print(f"Error extracting features from {audio_path}: {e}")
            return np.zeros((self.target_length, self.freq_bins))

##########################################
# Custom Max Feature Map (MFM) Layer
##########################################
class MaxFeatureMap(layers.Layer):
    """Max-Feature-Map activation: halves the channel axis with an element-wise max.

    The last axis of the input is split into two equal halves and the
    element-wise maximum of the two halves is returned, so the output has half
    as many channels as the input. The input channel count must therefore be
    even (`tf.split` requires the axis to be divisible by the split count).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return the element-wise max of the two channel halves of `inputs`."""
        first_half, second_half = tf.split(inputs, num_or_size_splits=2, axis=-1)
        return tf.maximum(first_half, second_half)

    def compute_output_shape(self, input_shape):
        """Same shape as the input, with the last dimension halved when it is known."""
        shape = list(input_shape)
        if shape[-1] is not None:
            shape[-1] //= 2
        return tuple(shape)

##########################################
# Residual Block with MFM activation
##########################################
def res_block(input_tensor, filters, stride=1):
    """Residual block whose activations are Max-Feature-Map layers.

    Main path: 3x3 conv (with `stride`) -> BN -> MFM -> 3x3 conv -> BN.
    Both convolutions emit ``filters * 2`` channels; each MFM halves that, so
    the block returns a tensor with `filters` channels.

    The shortcut is the input itself unless a projection is required, in which
    case it is a strided 1x1 conv -> BN with ``filters * 2`` channels. A
    projection is used when `stride` is not 1, or when the input's static
    channel count is known and differs from ``filters * 2``.

    Args:
        input_tensor: Keras tensor entering the block.
        filters: Number of channels the block outputs.
        stride: Stride of the first convolution and of the shortcut projection.

    Returns:
        The Keras tensor produced by MFM(Add(main, shortcut)).
    """
    pre_mfm_channels = filters * 2  # every MFM halves this back to `filters`

    # --- Main path ---
    main = Conv2D(pre_mfm_channels, kernel_size=3, strides=stride, padding='same', use_bias=False)(input_tensor)
    main = BatchNormalization()(main)
    main = MaxFeatureMap()(main)
    main = Conv2D(pre_mfm_channels, kernel_size=3, strides=1, padding='same', use_bias=False)(main)
    main = BatchNormalization()(main)

    # --- Shortcut path ---
    static_in_channels = input_tensor.shape[-1]
    channels_differ = static_in_channels is not None and static_in_channels != pre_mfm_channels
    if stride != 1 or channels_differ:
        # No MFM on the projection: it must keep filters*2 channels to match
        # the main path at the Add.
        shortcut = Conv2D(pre_mfm_channels, kernel_size=1, strides=stride, padding='same', use_bias=False)(input_tensor)
        shortcut = BatchNormalization()(shortcut)
    else:
        shortcut = input_tensor

    # --- Merge, then final MFM activation ---
    merged = Add()([main, shortcut])
    return MaxFeatureMap()(merged)

##########################################
# Build the ResMax Model
##########################################
def build_resmax(input_shape, num_classes=2):
    """Assemble the ResMax classifier as a Keras functional model.

    Layout: a 3x3 stem conv (64 channels) -> BN -> MFM (32 channels), then
    eight residual blocks in four stages (32, 64, 128, 256 channels; every
    stage after the first starts with a stride-2 block), global average
    pooling and a Dense layer that outputs `num_classes` raw logits.

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        num_classes: Width of the final Dense layer.

    Returns:
        An uncompiled `models.Model`.
    """
    # (filters, stride) for each residual block, in order.
    block_plan = (
        (32, 1), (32, 1),
        (64, 2), (64, 1),
        (128, 2), (128, 1),
        (256, 2), (256, 1),
    )

    inputs = Input(shape=input_shape)

    # Stem: the channel count must be even because MFM halves it.
    features = Conv2D(64, kernel_size=3, strides=1, padding='same', use_bias=False)(inputs)
    features = BatchNormalization()(features)
    features = MaxFeatureMap()(features)

    for block_filters, block_stride in block_plan:
        features = res_block(features, filters=block_filters, stride=block_stride)

    pooled = GlobalAveragePooling2D()(features)
    logits = Dense(num_classes)(pooled)
    return models.Model(inputs=inputs, outputs=logits)

##########################################
# Data Generator using tf.data API
##########################################
def data_generator(protocol_file, audio_folder_path, feature_extractor, batch_size=32, is_eval=False):
    """Build a batched, prefetched `tf.data.Dataset` of (spectrogram, label) pairs.

    The protocol file is read as whitespace-separated text without a header.
    With 5 columns the label comes from the last column; with 4 columns
    (accepted only when `is_eval` is True) it comes from the fourth column.
    The text 'bonafide' maps to label 0 and anything else to label 1.

    Behaviour worth knowing:
      * If the protocol file cannot be read or has an unexpected column count,
        the error is printed and the returned dataset is empty.
      * If the audio directory does not exist, an error is printed and the
        dataset yields nothing.
      * Protocol entries whose '.flac' file is missing are reported and
        skipped, so no placeholder sample is paired with a real label.
      * When `is_eval` is False the protocol entries are visited in a fresh
        random order every time the dataset is iterated. Permuting the entries
        themselves gives a full shuffle without buffering spectrograms in
        memory. When `is_eval` is True the protocol order is kept.

    Args:
        protocol_file: Path to the protocol text file.
        audio_folder_path: Directory containing '<file_name>.flac' files.
        feature_extractor: Object providing `target_length`, `freq_bins` and
            `extract_spectrogram(path)`.
        batch_size: Number of samples per batch.
        is_eval: Keep protocol order and accept the 4-column protocol format.

    Returns:
        A dataset of float32 batches shaped
        (batch, target_length, freq_bins, 1) paired with int32 label batches.
    """
    try:
        # Raw string: '\s' is not a valid escape in a normal string literal.
        data = pd.read_csv(protocol_file, sep=r'\s+', header=None, engine='python')
        n_columns = len(data.columns)
        if n_columns == 5:
            data.columns = ['speaker_id', 'file_name', 'field1', 'system_id', 'label_text']
            label_column = 'label_text'
        elif n_columns == 4 and is_eval:
            data.columns = ['speaker_id', 'file_name', 'field1', 'system_id']
            label_column = 'system_id'
        else:
            raise ValueError(f"Unexpected number of columns ({len(data.columns)}) in protocol file: {protocol_file}")

        file_names = data['file_name'].to_numpy()
        labels = np.where(data[label_column] == 'bonafide', 0, 1)
    except Exception as e:
        print(f"Error reading or processing protocol file {protocol_file}: {e}")
        file_names = np.array([], dtype=object)
        labels = np.array([], dtype=np.int64)

    def gen():
        if not os.path.isdir(audio_folder_path):
            print(f"Error: Audio directory not found: {audio_folder_path}")
            return
        # gen() is re-invoked for every pass over the dataset, so training
        # sees a new permutation each time it iterates.
        if is_eval:
            visit_order = np.arange(len(file_names))
        else:
            visit_order = np.random.permutation(len(file_names))
        for idx in visit_order:
            audio_path = os.path.join(audio_folder_path, f"{file_names[idx]}.flac")
            if not os.path.exists(audio_path):
                print(f"Warning: Audio file not found: {audio_path}. Skipping.")
                continue
            spec = feature_extractor.extract_spectrogram(audio_path)
            spec = np.expand_dims(spec, axis=-1)  # add the channel axis
            yield spec.astype(np.float32), np.int32(labels[idx])

    output_signature = (
        tf.TensorSpec(shape=(feature_extractor.target_length, feature_extractor.freq_bins, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )

    dataset = tf.data.Dataset.from_generator(gen, output_signature=output_signature)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

##########################################
# EER Calculation Function
##########################################
def compute_eer(y_true, y_scores):
    """Approximate the Equal Error Rate, in percent.

    The ROC curve is computed with label 1 as the positive class. The EER is
    taken at the ROC point where |FNR - FPR| is smallest (NaNs ignored) and is
    reported as the mean of FPR and FNR at that point, multiplied by 100.

    Args:
        y_true: Ground-truth labels.
        y_scores: Scores for the positive class (label 1).

    Returns:
        The EER as a percentage.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_scores, pos_label=1)
    false_neg_rate = 1 - true_pos_rate
    crossover = np.nanargmin(np.abs(false_neg_rate - false_pos_rate))
    equal_error = np.mean((false_pos_rate[crossover], false_neg_rate[crossover]))
    return equal_error * 100.0

##########################################
# Training and Evaluation
##########################################
def _score_dataset(model, dataset):
    """Run `model` once over `dataset` and gather labels, logits and spoof scores.

    Returns:
        (y_true, logits, spoof_scores) as NumPy arrays. `spoof_scores` is the
        softmax probability of class index 1. All three are empty when the
        dataset yields no batches.
    """
    label_batches, logit_batches = [], []
    for specs, labels in dataset:
        # Call the model directly: `model.predict` is meant for whole datasets
        # and adds per-call overhead when invoked once per batch in a loop.
        logit_batches.append(model(specs, training=False).numpy())
        label_batches.append(labels.numpy())

    if not label_batches:
        empty = np.array([], dtype=np.float32)
        return np.array([], dtype=np.int32), empty, empty

    y_true = np.concatenate(label_batches)
    logits = np.concatenate(logit_batches)
    spoof_scores = tf.nn.softmax(logits, axis=1).numpy()[:, 1]
    return y_true, logits, spoof_scores


def _auc_and_eer(y_true, y_scores, long_name, short_name):
    """Return (AUC, EER%) for one split, or (nan, nan) when they are undefined."""
    if len(y_true) == 0:
        print(f"Warning: No data processed for {short_name} Set EER/AUC calculation.")
        return float('nan'), float('nan')
    if len(np.unique(y_true)) < 2:
        # roc_auc_score raises when only one class is present.
        print(f"Warning: {short_name} Set contains a single class; AUC/EER are undefined.")
        return float('nan'), float('nan')
    auc = roc_auc_score(y_true, y_scores)
    eer = compute_eer(y_true, y_scores)
    print(f"{long_name} Set AUC: {auc:.4f}, EER: {eer:.2f}%")
    return auc, eer


def train_and_evaluate(protocol_dir, audio_base_dir, batch_size=16, num_epochs=5):
    """Train the ResMax model and report metrics on the dev and eval splits.

    Steps: build the train/dev/eval datasets, build and compile the model
    (Adam, lr 1e-4, sparse categorical cross-entropy on logits), fit with
    ReduceLROnPlateau and EarlyStopping on `val_loss`, then score the dev and
    eval splits.

    Each split is scored in a single pass. Evaluation loss and accuracy are
    computed from the logits collected in that pass, so the evaluation audio
    is read and transformed only once.

    Args:
        protocol_dir: Directory holding the ASVspoof2019 LA protocol files.
        audio_base_dir: Directory holding the 'ASVspoof2019_LA_{train,dev,eval}'
            folders, each with a 'flac' subfolder.
        batch_size: Batch size for all datasets.
        num_epochs: Maximum number of training epochs.

    Returns:
        (model, history, metrics) where metrics is the tuple
        (eval_loss, eval_acc, eer_dev, auc_dev, eer_eval, auc_eval).
        Metrics that cannot be computed are NaN.
    """
    # Protocol files
    train_protocol = os.path.join(protocol_dir, 'ASVspoof2019.LA.cm.train.trn.txt')
    dev_protocol = os.path.join(protocol_dir, 'ASVspoof2019.LA.cm.dev.trl.txt')
    eval_protocol = os.path.join(protocol_dir, 'ASVspoof2019.LA.cm.eval.trl.txt')

    # Audio directories
    # NOTE(review): assumes the audio files sit in a 'flac' subfolder of each split — confirm.
    train_audio_dir = os.path.join(audio_base_dir, 'ASVspoof2019_LA_train', 'flac')
    dev_audio_dir = os.path.join(audio_base_dir, 'ASVspoof2019_LA_dev', 'flac')
    eval_audio_dir = os.path.join(audio_base_dir, 'ASVspoof2019_LA_eval', 'flac')

    feature_extractor = FeatureExtractor()  # default parameters

    print("Creating Training Dataset...")
    train_ds = data_generator(train_protocol, train_audio_dir, feature_extractor, batch_size)
    print("Creating Development Dataset...")
    # is_eval=True keeps protocol order: validation gains nothing from
    # shuffling, and the same dataset can then be reused for EER/AUC scoring.
    dev_ds = data_generator(dev_protocol, dev_audio_dir, feature_extractor, batch_size, is_eval=True)
    print("Creating Evaluation Dataset...")
    eval_ds = data_generator(eval_protocol, eval_audio_dir, feature_extractor, batch_size, is_eval=True)

    input_shape = (feature_extractor.target_length, feature_extractor.freq_bins, 1)
    print(f"Model Input Shape: {input_shape}")

    model = build_resmax(input_shape=input_shape, num_classes=2)
    model.summary()

    model.compile(optimizer=optimizers.Adam(learning_rate=1e-4),
                  loss=losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    lr_reducer = callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, verbose=1)
    early_stopper = callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)
    # Consider adding ModelCheckpoint to save the best model

    print("Starting Training...")
    history = model.fit(train_ds,
                        validation_data=dev_ds,
                        epochs=num_epochs,
                        callbacks=[lr_reducer, early_stopper])

    # --- Development set: EER / AUC ---
    print("Evaluating on Development Set for EER/AUC...")
    y_true_dev, _, y_scores_dev = _score_dataset(model, dev_ds)
    auc_dev, eer_dev = _auc_and_eer(y_true_dev, y_scores_dev, "Development", "Dev")

    # --- Evaluation set: loss / accuracy / EER / AUC from one pass ---
    print("Evaluating on Evaluation Set...")
    y_true_eval, logits_eval, y_scores_eval = _score_dataset(model, eval_ds)
    if len(y_true_eval) == 0:
        eval_loss = float('nan')
        eval_acc = float('nan')
    else:
        loss_fn = losses.SparseCategoricalCrossentropy(from_logits=True)
        eval_loss = float(loss_fn(y_true_eval, logits_eval))
        eval_acc = float(np.mean(np.argmax(logits_eval, axis=1) == y_true_eval))
    print(f"Evaluation Set Loss: {eval_loss:.4f}, Accuracy: {eval_acc*100:.2f}%")

    print("Evaluating on Evaluation Set for EER/AUC...")
    auc_eval, eer_eval = _auc_and_eer(y_true_eval, y_scores_eval, "Evaluation", "Eval")

    return model, history, (eval_loss, eval_acc, eer_dev, auc_dev, eer_eval, auc_eval)

# Make sure the rest of your code (like function definitions before this one,
# and the main execution block after this one) also has correct indentation.
 
    

##########################################
# Plotting Training History
##########################################
def plot_training_history(history):
    """Plot train/dev loss and accuracy side by side, save the figure and show it.

    Args:
        history: Object whose `.history` dict holds the 'loss', 'val_loss',
            'accuracy' and 'val_accuracy' series.

    Side effects:
        Writes 'resmax_training_history.png' to the current directory, prints
        a confirmation message and displays the figure.
    """
    # (train key, dev key, train label, dev label, title, y-axis label)
    panels = (
        ('loss', 'val_loss', 'Train Loss', 'Dev Loss', 'Model Loss', 'Loss'),
        ('accuracy', 'val_accuracy', 'Train Accuracy', 'Dev Accuracy', 'Model Accuracy', 'Accuracy'),
    )

    plt.figure(figsize=(12, 5))
    for slot, (train_key, dev_key, train_label, dev_label, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.plot(history.history[train_key], label=train_label)
        plt.plot(history.history[dev_key], label=dev_label)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig('resmax_training_history.png')
    print("Training history plot saved as resmax_training_history.png")
    plt.show()

##########################################
# Inference on a Single Audio File
##########################################
def predict_audio(model, audio_path, feature_extractor):
    """Classify a single audio file as bonafide or spoof.

    Args:
        model: Model whose `predict` returns logits with class 0 = bonafide
            and class 1 = spoof.
        audio_path: Path to the audio file.
        feature_extractor: Object providing `extract_spectrogram(path)`.

    Returns:
        None if `audio_path` does not exist; otherwise a dict with keys
        'prediction' ("Bonafide" or "Spoof"), 'bonafide_probability' and
        'spoof_probability'.
    """
    print(f"Predicting for: {audio_path}")
    if not os.path.exists(audio_path):
        print("Error: Audio file not found.")
        return None

    spectrogram = feature_extractor.extract_spectrogram(audio_path)
    # Append the channel axis, then prepend the batch axis.
    model_input = np.expand_dims(np.expand_dims(spectrogram, axis=-1), axis=0)

    class_probs = tf.nn.softmax(model.predict(model_input), axis=1).numpy()[0]

    if np.argmax(class_probs) == 0:
        verdict = "Bonafide"
    else:
        verdict = "Spoof"

    return {"prediction": verdict,
            "bonafide_probability": class_probs[0],
            "spoof_probability": class_probs[1]}

##########################################
# Main Execution
##########################################
# `protocol_dir` and `audio_base_dir` are defined once, near the top of this
# cell; they are reused here rather than redefined so the paths cannot drift apart.
model, hist, eval_results = train_and_evaluate(protocol_dir, audio_base_dir, batch_size=16, num_epochs=5)
plot_training_history(hist)

# Unpack in the order train_and_evaluate returns its metrics.
eval_loss, eval_acc, dev_eer, dev_auc, eval_eer, eval_auc = eval_results
print("\n--- Final Summary ---")
print(f"Development Set: EER = {dev_eer:.2f}%, AUC = {dev_auc:.4f}")
print(f"Evaluation Set: Loss = {eval_loss:.4f}, Accuracy = {eval_acc*100:.2f}%")
print(f"Evaluation Set: EER = {eval_eer:.2f}%, AUC = {eval_auc:.4f}")


Creating Training Dataset...
Creating Development Dataset...
Creating Evaluation Dataset...
Model Input Shape: (250, 513, 1)


Starting Training...
Epoch 1/5
     72/Unknown [1m5461s[0m 75s/step - accuracy: 0.9057 - loss: 0.2319

In [None]:


# --- Example Inference ---
# Requires `model` from the training cell above.
# A separate extractor instance is created for the prediction helper.
inference_feature_extractor = FeatureExtractor()
# IMPORTANT: point this at an *actual* file from the evaluation set.
# To find one: !ls /kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_eval/flac/ | head
example_file_path = "/kaggle/input/asvpoof-2019-dataset/LA/LA/ASVspoof2019_LA_eval/flac/LA_E_1000181.flac" # Replace with a real file!

# Only run the prediction when the example file is actually present.
if not os.path.exists(example_file_path):
    print(f"\nWarning: Example inference file not found at {example_file_path}. Skipping inference example.")
else:
    result = predict_audio(model, example_file_path, inference_feature_extractor)
    if result:
        print("\n--- Example Inference ---")
        print(f"File: {example_file_path}")
        print(f"Prediction: {result['prediction']}")
        print(f"Bonafide Probability: {result['bonafide_probability']:.4f}")
        print(f"Spoof Probability: {result['spoof_probability']:.4f}")

print("\nScript finished.")