In [None]:
import os
import numpy as np
import librosa
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
from sklearn.model_selection import train_test_split

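Audio augmentation helper. The original intent was to apply augmentation only to positive (bonafide) samples; note that the loading cells below currently augment every sample regardless of its label.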

In [None]:
# Function for preprocessing data with audio augmentation
def preprocess_data_with_augmentation(file_path, max_time_steps=109, sample_rate=16000, duration=3, n_mels=80,
                                      augment=True, pad_value=0.0):
    """Load one audio clip and return fixed-width log-mel spectrograms of it and its augmented variants.

    Args:
        file_path: Path of the audio file passed to ``librosa.load``.
        max_time_steps: Number of time frames every returned spectrogram is
            padded or truncated to.
        sample_rate: Sampling rate the audio is resampled to on load.
        duration: Maximum number of seconds read from the file.
        n_mels: Number of mel bands per spectrogram.
        augment: When True (default, the historical behaviour) the result holds
            four spectrograms: original, pitch-shifted, time-stretched and
            amplitude-scaled. When False only the original clip is processed,
            which is what validation/inference data normally needs.
        pad_value: Value used to right-pad spectrograms shorter than
            ``max_time_steps``. The default 0.0 keeps the historical behaviour.
            Because the spectrograms are in dB relative to their own maximum,
            0.0 is the *loudest* possible level; pass a low value such as -80.0
            (librosa's default ``top_db`` floor) to pad with silence instead.

    Returns:
        A list of arrays, each of shape ``(n_mels, max_time_steps)``, in the
        order [original, pitch-shifted, time-stretched, amplitude-scaled]
        (or just [original] when ``augment`` is False).
    """
    audio, _ = librosa.load(file_path, sr=sample_rate, duration=duration)

    variants = [audio]
    if augment:
        # Augmentation parameters come from NumPy's global RNG; seed np.random
        # beforehand if the generated dataset has to be reproducible.
        variants.append(librosa.effects.pitch_shift(audio, sr=sample_rate, n_steps=np.random.uniform(-2, 2)))
        variants.append(librosa.effects.time_stretch(audio, rate=np.random.uniform(0.8, 1.2)))
        # NOTE(review): a pure gain change is largely cancelled by the
        # ref=np.max normalisation below, so this variant is presumably almost
        # identical to the original spectrogram - confirm it adds value.
        variants.append(audio * np.random.uniform(0.5, 1.5))

    mel_spectrograms = []
    for variant in variants:
        # Mel power spectrogram converted to dB relative to its own peak
        mel = librosa.feature.melspectrogram(y=variant, sr=sample_rate, n_mels=n_mels)
        mel = librosa.power_to_db(mel, ref=np.max)

        # Force a common width so the spectrograms can be stacked into one array
        missing_frames = max_time_steps - mel.shape[1]
        if missing_frames > 0:
            mel = np.pad(mel, ((0, 0), (0, missing_frames)), mode='constant', constant_values=pad_value)
        else:
            mel = mel[:, :max_time_steps]

        mel_spectrograms.append(mel)

    return mel_spectrograms

In [None]:
# Set your file paths and constants
# Root of the ASVspoof 2019 LA dataset. Set the ASVSPOOF_LA_ROOT environment
# variable to run on another machine without editing the notebook.
LA_ROOT = os.environ.get('ASVSPOOF_LA_ROOT', '/data/common_source/datasets/asvpoof-2019-dataset/LA/LA')
TRAINING_LABEL = f'{LA_ROOT}/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt'
TRAINING_DATA = f'{LA_ROOT}/ASVspoof2019_LA_train/flac'
VALIDATION_DATA = f'{LA_ROOT}/ASVspoof2019_LA_dev/flac'
VALIDATION_LABEL = f'{LA_ROOT}/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.dev.trl.txt'
SAMPLE_RATE = 16000  # Adjust if your sample rate is different
DURATION = 3  # Adjust the duration of your audio samples
N_MELS = 80  # Adjust the number of mel filters
max_time_steps = 109  # Width (time frames) every spectrogram is padded/truncated to

In [None]:
# Load the training features/labels from the on-disk cache, or build and cache them.
# X ends up with one entry per audio file, each holding that file's stack of
# spectrograms; y holds one label per spectrogram, in the same order.
if os.path.exists('labels/X_train_MEL.npy') and os.path.exists('labels/y_train_MEL.npy'):
    X = np.load('labels/X_train_MEL.npy')
    y = np.load('labels/y_train_MEL.npy')
else:
    # Protocol rows are whitespace separated: the second field is the file id
    # and the last field is the key ("bonafide" -> 1, anything else -> 0).
    train_labels = {}
    with open(TRAINING_LABEL, 'r') as label_file:
        for line in label_file:
            parts = line.strip().split()
            if len(parts) < 2:
                continue  # blank or malformed row: nothing to index
            train_labels[parts[1]] = 1 if parts[-1] == "bonafide" else 0

    X = []
    y = []

    for file_name, label in train_labels.items():
        file_path = os.path.join(TRAINING_DATA, file_name + ".flac")

        # Pass the notebook-level constants explicitly so the features always
        # follow the configuration cell instead of the function's defaults.
        mel_spectrograms = preprocess_data_with_augmentation(
            file_path,
            max_time_steps=max_time_steps,
            sample_rate=SAMPLE_RATE,
            duration=DURATION,
            n_mels=N_MELS,
        )

        X.append(mel_spectrograms)
        # One label per returned spectrogram, however many variants there are
        y.extend([label] * len(mel_spectrograms))

    X = np.array(X)
    y = np.array(y)

    # Save the preprocessed data (create the cache directory first, otherwise
    # np.save fails only after the whole dataset has been processed)
    os.makedirs('labels', exist_ok=True)
    np.save('labels/X_train_MEL.npy', X)
    np.save('labels/y_train_MEL.npy', y)


In [None]:
# Print out the shapes for debugging
print("Shape of X before reshape:", X.shape)
print("Shape of y before reshape:", y.shape)

# Flatten the (file, variant) axes into a single sample axis and append the
# channel axis the ResNet expects. -1 lets NumPy infer the sample count, so the
# number of variants per file is not hard-coded here.
X_new = X.reshape((-1, N_MELS, max_time_steps, 1))

# Every spectrogram needs exactly one label; a mismatch usually means the
# cached X/y files were produced by different runs.
if X_new.shape[0] != y.shape[0]:
    raise ValueError(
        f"Sample/label count mismatch: {X_new.shape[0]} spectrograms vs {y.shape[0]} labels"
    )

print(X_new.shape)

In [None]:
# Define input shape and number of classes.
# Use the reshaped array: X[0] is the whole stack of variants for one file,
# whereas the model consumes a single (mel bands, time frames, channel) sample.
input_shape = X_new.shape[1:]
num_classes = 2  # Assuming you have two classes (0 and 1)

In [None]:
# Load the validation features/labels from the on-disk cache, or build and cache them.
# eval_X ends up with one entry per audio file, each holding that file's stack
# of spectrograms; eval_y holds one label per spectrogram, in the same order.
if os.path.exists('labels/val_X_MEL.npy') and os.path.exists('labels/val_y_MEL.npy'):
    eval_X = np.load('labels/val_X_MEL.npy')
    eval_y = np.load('labels/val_y_MEL.npy')
else:
    # Protocol rows are whitespace separated: the second field is the file id
    # and the last field is the key ("bonafide" -> 1, anything else -> 0).
    eval_labels = {}
    with open(VALIDATION_LABEL, 'r') as eval_label_file:
        for line in eval_label_file:
            parts = line.strip().split()
            if len(parts) < 2:
                continue  # blank or malformed row: nothing to index
            eval_labels[parts[1]] = 1 if parts[-1] == "bonafide" else 0

    eval_X = []
    eval_y = []

    for file_name, label in eval_labels.items():
        file_path = os.path.join(VALIDATION_DATA, file_name + ".flac")

        # NOTE(review): the validation clips go through the same random
        # augmentation as the training clips, so validation metrics are
        # measured partly on perturbed audio - confirm this is intended.
        mel_spectrograms = preprocess_data_with_augmentation(
            file_path,
            max_time_steps=max_time_steps,
            sample_rate=SAMPLE_RATE,
            duration=DURATION,
            n_mels=N_MELS,
        )

        eval_X.append(mel_spectrograms)
        # One label per returned spectrogram, however many variants there are
        eval_y.extend([label] * len(mel_spectrograms))

    eval_X = np.array(eval_X)
    eval_y = np.array(eval_y)

    # Save the preprocessed data (create the cache directory first, otherwise
    # np.save fails only after the whole dataset has been processed)
    os.makedirs('labels', exist_ok=True)
    np.save('labels/val_X_MEL.npy', eval_X)
    np.save('labels/val_y_MEL.npy', eval_y)


In [None]:
# Print out the shapes for debugging
print("Shape of eval_X before reshape:", eval_X.shape)
print("Shape of eval_y before reshape:", eval_y.shape)

# Flatten the (file, variant) axes into a single sample axis and append the
# channel axis the ResNet expects. -1 lets NumPy infer the sample count, so the
# number of variants per file is not hard-coded here.
eval_X_reshaped = eval_X.reshape((-1, N_MELS, max_time_steps, 1))

# Every spectrogram needs exactly one label; a mismatch usually means the
# cached feature/label files were produced by different runs.
if eval_X_reshaped.shape[0] != eval_y.shape[0]:
    raise ValueError(
        f"Sample/label count mismatch: {eval_X_reshaped.shape[0]} spectrograms vs {eval_y.shape[0]} labels"
    )

print(eval_X_reshaped.shape)

In [None]:
from tensorflow.keras import layers, models

def resnet_block(x, filters, kernel_size=3, stride=1, conv_shortcut=False):
    """Apply one two-convolution residual block to ``x`` and return the result.

    Args:
        x: Input tensor of the block.
        filters: Number of filters used by every convolution in the block.
        kernel_size: Side length of the square kernels on the main path.
        stride: Stride of the first main-path convolution (and of the
            projection on the skip path, when enabled).
        conv_shortcut: When True the skip path is a 1x1 convolution followed by
            batch normalisation; when False the input is added back unchanged.

    Returns:
        The ReLU-activated sum of the main path and the skip path.
    """
    kernel = (kernel_size, kernel_size)
    strides = (stride, stride)

    # Skip path: identity, or a strided 1x1 projection when requested
    if conv_shortcut:
        projected = layers.Conv2D(filters, (1, 1), strides=strides)(x)
        residual = layers.BatchNormalization()(projected)
    else:
        residual = x

    # Main path: conv -> BN -> ReLU -> conv -> BN
    main = layers.Conv2D(filters, kernel, strides=strides, padding='same')(x)
    main = layers.BatchNormalization()(main)
    main = layers.Activation('relu')(main)
    main = layers.Conv2D(filters, kernel, padding='same')(main)
    main = layers.BatchNormalization()(main)

    # Merge both paths, then apply the final non-linearity
    merged = layers.add([main, residual])
    return layers.Activation('relu')(merged)

# Build the ResNet model
def build_resnet(input_shape, num_classes):
    """Assemble the ResNet-style classifier used by this notebook.

    Args:
        input_shape: Shape of a single input sample (without the batch axis).
        num_classes: Number of units in the softmax output layer.

    Returns:
        A Keras ``Model`` named ``'resnet_model'`` mapping the input tensor to
        class probabilities.
    """
    stage_filters = (64, 128, 256, 512)

    inputs = layers.Input(shape=input_shape)

    # Stem: strided 7x7 convolution, batch norm, ReLU, then strided max pooling
    stem = layers.Conv2D(64, (7, 7), strides=(2, 2), padding='same')(inputs)
    stem = layers.BatchNormalization()(stem)
    stem = layers.Activation('relu')(stem)
    features = layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(stem)

    # Each stage is a projection-shortcut block followed by an identity-shortcut block
    for filters in stage_filters:
        for use_projection in (True, False):
            features = resnet_block(features, filters, conv_shortcut=use_projection)

    # Head: global average pooling feeding the softmax classifier
    pooled = layers.GlobalAveragePooling2D()(features)
    probabilities = layers.Dense(num_classes, activation='softmax')(pooled)

    return models.Model(inputs=inputs, outputs=probabilities, name='resnet_model')

In [None]:
num_classes = 2

# Shape of one model input: (mel bands, time frames, channel). Derived from the
# configuration constants so it cannot drift from the reshape applied to the data.
input_shape = (N_MELS, max_time_steps, 1)

# Build the model
model = build_resnet(input_shape=input_shape, num_classes=num_classes)

In [None]:
# Configure training: Adam optimiser, cross-entropy on integer class labels,
# accuracy reported alongside the loss
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Stop once the validation loss has gone 10 epochs without improving, and roll
# the model back to the weights of its best epoch
early_stopping = callbacks.EarlyStopping(
    restore_best_weights=True,
    patience=10,
    monitor='val_loss',
)


In [None]:
# Display the model summary (layer list with output shapes and parameter
# counts) as a quick sanity check of the architecture built above
model.summary()

In [None]:
# Train the model
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

# Split the data into training and testing sets.
# X_new keeps all variants of one source file in consecutive rows, so a plain
# row-level split would put augmented copies of the same recording on both
# sides (data leakage). Split over files instead, then expand each chosen file
# to the row indices of all of its variants.
variants_per_file = X.shape[1]
file_ids = np.arange(X.shape[0])
train_files, test_files = train_test_split(file_ids, test_size=0.2, random_state=42)

variant_offsets = np.arange(variants_per_file)
train_idx = (train_files[:, None] * variants_per_file + variant_offsets).ravel()
test_idx = (test_files[:, None] * variants_per_file + variant_offsets).ravel()

X_train, X_test = X_new[train_idx], X_new[test_idx]
y_train, y_test = y[train_idx], y[test_idx]

# Calculate class weights ('balanced' weights each class inversely to its
# frequency); keys/values are cast to plain Python types for a clean dict
classes = np.unique(y_train)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weights_dict = {int(c): float(w) for c, w in zip(classes, class_weights)}

print(class_weights_dict)


In [None]:
# Training hyperparameters
NUM_EPOCHS = 50
BATCH_SIZE = 64

# Fit on the training split with per-class loss weights; the separate
# validation set drives early stopping
fit_options = dict(
    epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(eval_X_reshaped, eval_y),
    class_weight=class_weights_dict,
    callbacks=[early_stopping],
)
history = model.fit(X_train, y_train, **fit_options)

In [None]:
# Score the model on the split held out from the training data
test_metrics = model.evaluate(X_test, y_test)
loss, accuracy = test_metrics
print(f'Test Loss: {loss:.4f}, Test Accuracy: {accuracy * 100:.2f}%')

# Score the model on the separate validation dataset
eval_metrics = model.evaluate(eval_X_reshaped, eval_y)
eval_loss, eval_accuracy = eval_metrics
print(f'Evaluation Loss: {eval_loss:.4f}, Evaluation Accuracy: {eval_accuracy * 100:.2f}%')

In [None]:
import matplotlib.pyplot as plt

# Plot the training and validation curves: one figure for the loss, then one
# for the accuracy. Each row: (train key, validation key, title, y label, legend position)
curve_specs = (
    ('loss', 'val_loss', 'Model loss', 'Loss', 'upper right'),
    ('accuracy', 'val_accuracy', 'Model accuracy', 'Accuracy', 'lower right'),
)

for train_key, val_key, title, y_label, legend_loc in curve_specs:
    plt.figure(figsize=(10, 5))
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc=legend_loc)
    plt.show()

In [None]:
# Save the trained model. Create the output directory first so a missing
# folder cannot make the save fail after training has already finished.
os.makedirs("models", exist_ok=True)
model.save("models/weighted_loss_augmentation.h5")