In [None]:
!pip install scikeras

In [None]:
import os
import pandas as pd
import json
import matplotlib.pyplot as plt
%matplotlib inline
import librosa
import numpy as np
import tensorflow as tf
import tensorflow_io as tfio
import seaborn as sns
import joblib
from IPython.display import Audio
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Input
from tensorflow.keras.models import Model
from scikeras.wrappers import KerasClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve,  auc

In [None]:
# Audio parameters
# Sampling rate in Hz assumed for the audio files.
SAMPLE_RATE = 16000
DURATION = 20.0 # clip duration in seconds
# Number of samples in a clip of DURATION seconds.
# NOTE(review): DURATION and AUDIO_LEN are not referenced by the visible cells.
AUDIO_LEN = int(SAMPLE_RATE * DURATION)

# Spectrogram parameters
N_MELS = 128 # number of mel bands (frequency axis of the resized spectrogram)
# FFT size; also used as the analysis window length.
N_FFT = 2048
SPEC_WIDTH = 256 # number of frames after resizing (time axis)
HOP_LEN = 512 # stride in samples between consecutive analysis windows
FMAX = SAMPLE_RATE//2 # upper edge of the mel filterbank (Nyquist frequency)

# AdaBoost + CNN parameters
# NOTE(review): NUM_CLASSES is not referenced by the visible cells.
NUM_CLASSES = 1 # single output unit: bonafide vs. spoof
BATCH_SIZE = 32 # adjust to the available memory
EPOCHS = 50 # training epochs per base estimator; adjust to the available time
LEARNING_RATE = 0.0001 # optimizer learning rate; tune against model performance
# Number of boosting rounds (base estimators) for AdaBoost.
N_ESTIMATORS = 10
# Seed passed to AdaBoost.
RANDOM_STATE = 42

In [None]:
# Build the full path of an audio file
def get_file_path(directory, filename):
    """Return the path of ``<filename>.flac`` inside ``directory``."""
    return os.path.join(directory, f'{filename}.flac')

# Read a dataset protocol file
def read_dataset(protocol_path, directory):
    """Read a space-separated, header-less protocol file into a DataFrame.

    Each protocol line has five fields. Following the ASVspoof 2019 LA
    countermeasure protocol layout
    (``SPEAKER_ID AUDIO_FILE_NAME - SYSTEM_ID KEY``), the third field is an
    unused placeholder and the fourth is the spoofing-system ID, so the
    placeholder is the column that gets dropped.

    Args:
        protocol_path: Path of the protocol text file.
        directory: Directory holding the ``.flac`` files named in the protocol.

    Returns:
        DataFrame with columns ``speaker_id``, ``filename``, ``system_id``,
        ``class_name`` and ``filepath``; rows containing missing values are
        removed (the original row index is kept).
    """
    column_names = ['speaker_id', 'filename', 'null', 'system_id', 'class_name']
    df = pd.read_csv(protocol_path, sep=' ', header=None, names=column_names)
    df['filepath'] = df['filename'].apply(lambda x: get_file_path(directory, x))
    # Return a new frame instead of mutating in place.
    return df.drop(columns='null').dropna()

# Convert a class name to its integer label
def label_to_int(class_name):
    """Return 0 for ``'bonafide'`` and 1 for every other value."""
    return 0 if class_name == 'bonafide' else 1

# Add the target and subset columns to a DataFrame
def add_columns(df, subset):
    """Add ``target`` and ``subset`` columns to ``df`` in place and return it.

    ``target`` is the integer label derived from ``class_name`` via
    ``label_to_int``; ``subset`` is the given value repeated on every row.
    """
    new_columns = {
        'target': df['class_name'].map(label_to_int),
        'subset': subset,
    }
    for column_name, values in new_columns.items():
        df[column_name] = values
    return df

# Randomly sample a fraction of the rows from every group
def sample_per_group(df, column, frac, random_state=None):
    """Return a random fraction of the rows of each group of ``df``.

    Uses ``GroupBy.sample`` rather than ``groupby(...).apply(...)``: applying a
    function over the grouping column is deprecated in recent pandas, which
    puts the key column at risk of disappearing from the result.

    Args:
        df: Input DataFrame.
        column: Column (or list of columns) to group by.
        frac: Fraction of each group's rows to keep.
        random_state: Optional seed or random generator forwarded to pandas
            for reproducible sampling. ``None`` keeps the previous unseeded
            behaviour.

    Returns:
        A new DataFrame with the sampled rows, ordered by group, with a fresh
        0..n-1 index.
    """
    sampled = df.groupby(column).sample(frac=frac, random_state=random_state)
    return sampled.reset_index(drop=True)

In [None]:
# Dataset locations, defined here so the cell runs on a fresh kernel.
# NOTE(review): the default root is an assumed Kaggle mount point and the
# sub-directory names are assumed to follow the ASVspoof 2019 LA release
# layout - confirm both, or set the ASVSPOOF_LA_ROOT environment variable.
DATA_ROOT = os.environ.get('ASVSPOOF_LA_ROOT', '/kaggle/input/asvpoof-2019-dataset/LA/LA')
protocol_dir = os.path.join(DATA_ROOT, 'ASVspoof2019_LA_cm_protocols')
train_dir = os.path.join(DATA_ROOT, 'ASVspoof2019_LA_train', 'flac')
val_dir = os.path.join(DATA_ROOT, 'ASVspoof2019_LA_dev', 'flac')
test_dir = os.path.join(DATA_ROOT, 'ASVspoof2019_LA_eval', 'flac')

# Fail early with an actionable message when the data root is wrong.
if not os.path.isdir(protocol_dir):
    raise FileNotFoundError(
        f"Protocol directory not found: {protocol_dir}. "
        "Set the ASVSPOOF_LA_ROOT environment variable to the ASVspoof 2019 LA root."
    )

# Seed NumPy's global RNG: pandas' sample() draws from it when no
# random_state is given, so the 10% subsample below becomes reproducible.
np.random.seed(RANDOM_STATE)

# Create DataFrames for each dataset
train_df = read_dataset(os.path.join(protocol_dir, 'ASVspoof2019.LA.cm.train.trn.txt'), train_dir)
val_df = read_dataset(os.path.join(protocol_dir, 'ASVspoof2019.LA.cm.dev.trl.txt'), val_dir)
test_df = read_dataset(os.path.join(protocol_dir, 'ASVspoof2019.LA.cm.eval.trl.txt'), test_dir)

# Add the target and subset columns to each DataFrame
train_df = add_columns(train_df, 'train')
val_df = add_columns(val_df, 'val')
test_df = add_columns(test_df, 'test')

# Keep a 10% sample of every speaker's rows in each DataFrame
train_df = sample_per_group(train_df, 'speaker_id', 0.1)
val_df = sample_per_group(val_df, 'speaker_id', 0.1)
test_df = sample_per_group(test_df, 'speaker_id', 0.1)

In [None]:
# Combine the three subsets into a single DataFrame with a fresh index
subset_frames = [train_df, val_df, test_df]
data_df = pd.concat(subset_frames, ignore_index=True)

In [None]:
# Preview the first rows of the combined DataFrame instead of requesting every row
data_df.head()

In [None]:
# Keep only the 'bonafide' and 'spoof' rows for the plot
plot_df = data_df[data_df['class_name'].isin(['bonafide', 'spoof'])]

# Count the rows of every (subset, class_name) pair
plot_df = plot_df.groupby(['subset', 'class_name']).size().reset_index(name='counts')

plt.figure(figsize=(10, 6))
barplot = sns.barplot(x='class_name', y='counts', hue='subset', data=plot_df)

# Write the count above every bar. Iterating the bar containers labels only
# real bars; the axes' raw patch list is reported to include a non-bar patch
# for the legend on some seaborn versions, which would add a stray label.
for container in barplot.containers:
    barplot.bar_label(container, fmt='%.0f', padding=3)

plt.title('Distribusi Data Bonafide dan Spoof Setelah Melakukan Resampling dan Pengambilan Sampel')
plt.show()

print(f"Total keseluruhan data adalah sebanyak {len(data_df)} data")

In [None]:
speaker_id = data_df['speaker_id'].iloc[0]
sample_df = data_df  # alias of data_df (same object, not a copy)

# First bonafide and first spoof recording of the selected speaker.
# NOTE(review): .iloc[0] raises IndexError if the sampled data holds no row of
# that class for this speaker.
selected_bonafide = sample_df[(sample_df['class_name'] == 'bonafide') & (sample_df['speaker_id'] == speaker_id)].iloc[0]
selected_spoof = sample_df[(sample_df['class_name'] == 'spoof') & (sample_df['speaker_id'] == speaker_id)].iloc[0]

for selected_audio in [selected_bonafide, selected_spoof]:
    # Load the audio file
    audio_tensor = tfio.audio.AudioIOTensor(selected_audio['filepath'])
    audio = audio_tensor.to_tensor()

    # Sample rate stored in the file itself
    sample_rate = int(audio_tensor.rate.numpy())

    # Show the speaker ID, class name and file path
    print(f"Speaker ID: {selected_audio['speaker_id']}")
    print(f"Class Name: {selected_audio['class_name']}")
    print(f"File: {selected_audio['filepath']}")

    # Plot the waveform
    plt.figure(figsize=(10, 4))
    plt.plot(audio.numpy())
    plt.title(f"Waveplot for {selected_audio['class_name']}")
    plt.show()

    # Play the audio at the file's own sample rate, so playback speed stays
    # correct even for a file that is not recorded at SAMPLE_RATE.
    display(Audio(audio.numpy().squeeze(), rate=sample_rate))

In [None]:
# Read an audio file and convert it into a fixed-size log-mel spectrogram
def audio_to_spectrogram(filepath):
    """Convert one audio file into a log-mel spectrogram image.

    Steps: load the samples, min-max normalise them, compute a magnitude
    spectrogram, map it to the mel scale, convert to decibels and resize to
    ``[SPEC_WIDTH, N_MELS]``.

    Args:
        filepath: Path of the audio file to read.

    Returns:
        A float tensor of shape ``[SPEC_WIDTH, N_MELS, 1]``.

    Notes:
        * The trailing channel axis is squeezed, so TensorFlow raises for
          input with more than one channel (assumes mono - TODO confirm).
        * A constant signal (max == min) used to produce 0/0 = NaN in the
          normalisation; it now normalises to all zeros. The dB conversion of
          an all-zero spectrogram is presumably still not finite (log of 0),
          so silent clips should be filtered out before this function.
    """
    # Read the audio file
    audio = tfio.audio.AudioIOTensor(filepath)

    # Take the samples, drop the channel axis and convert to float32
    samples = tf.squeeze(audio.to_tensor(), axis=[-1])
    samples = tf.cast(samples, tf.float32)

    # Min-max normalise; divide_no_nan returns 0 where the range is 0
    lowest = tf.reduce_min(samples)
    value_range = tf.subtract(tf.reduce_max(samples), lowest)
    tensor = tf.math.divide_no_nan(tf.subtract(samples, lowest), value_range)

    # Convert the audio into a spectrogram
    spectrogram = tfio.audio.spectrogram(
        tensor,
        nfft=N_FFT,
        window=N_FFT,
        stride=HOP_LEN
    )

    # Map the spectrogram to the mel scale
    mel_spectrogram = tfio.audio.melscale(
        spectrogram,
        rate=SAMPLE_RATE,
        mels=N_MELS,
        fmin=0,
        fmax=FMAX
    )

    # Convert the mel spectrogram to decibels
    mel_spectrogram = tfio.audio.dbscale(mel_spectrogram, top_db=80)

    # Add a channel axis so the result can be treated as an image
    mel_spectrogram = tf.expand_dims(mel_spectrogram, axis=-1)

    # Resize to the configured size
    image = tf.image.resize(mel_spectrogram, [SPEC_WIDTH, N_MELS])

    return image

# Compute a spectrogram for every file path and store the results in a new
# 'spectrogram' column (this adds the column to data_df in place).
data_df['spectrogram'] = data_df['filepath'].apply(audio_to_spectrogram)

In [None]:
# Take the first spectrogram straight from data_df, the frame that owns the column
first_spectrogram = data_df.loc[0, 'spectrogram']

# The spectrogram is resized to [SPEC_WIDTH, N_MELS], i.e. (time, mel) per the
# config comments. Transpose it to (mel, time) so frequency runs along the y
# axis and time along the x axis, matching the axis labels below.
plt.figure(figsize=(10, 4))
plt.imshow(tf.transpose(tf.squeeze(first_spectrogram)), aspect='auto', origin='lower')
plt.title('Spectrogram')
plt.ylabel('Frequency bin')
plt.xlabel('Time frame')
plt.colorbar(format='%+2.0f dB')
plt.tight_layout()
plt.show()
print("Ukuran spektrogram adalah :", first_spectrogram.shape)

In [None]:
# Split the rows by their 'subset' label
train_data = data_df.loc[data_df['subset'].eq('train')]
val_data = data_df.loc[data_df['subset'].eq('val')]
test_data = data_df.loc[data_df['subset'].eq('test')]

# Stack each subset's spectrograms into a single NumPy array
X_train, X_val, X_test = (
    np.stack(frame['spectrogram'].to_list())
    for frame in (train_data, val_data, test_data)
)

# Extract each subset's targets as a NumPy array
y_train, y_val, y_test = (
    frame['target'].to_numpy()
    for frame in (train_data, val_data, test_data)
)

In [None]:
# Report the shape of every feature/target array, in train, val, test order
named_arrays = [
    ('X_train', X_train), ('y_train', y_train),
    ('X_val', X_val), ('y_val', y_val),
    ('X_test', X_test), ('y_test', y_test),
]
for array_name, array in named_arrays:
    print(f"{array_name} shape: {array.shape}")

In [None]:
# Build the CNN model
def create_model(input_shape):
    """Build and compile the CNN binary classifier.

    Architecture: four Conv2D -> MaxPooling2D -> BatchNormalization blocks,
    then Flatten, two Dense + Dropout(0.3) stages and a single sigmoid output
    unit. Compiled with binary cross-entropy, RMSprop at ``LEARNING_RATE`` and
    the accuracy metric.

    Args:
        input_shape: Shape of one input sample, e.g. ``(height, width, 1)``.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = Sequential([
        # Declare the input with an explicit Input layer; passing
        # `input_shape` to the first layer is discouraged by current Keras.
        Input(shape=input_shape),

        # 1st conv block
        Conv2D(32, (3, 3), activation='relu'),
        MaxPooling2D((3, 3), strides=(2, 2), padding='same'),
        BatchNormalization(),

        # 2nd conv block
        Conv2D(32, (3, 3), activation='relu'),
        MaxPooling2D((3, 3), strides=(2, 2), padding='same'),
        BatchNormalization(),

        # 3rd conv block
        Conv2D(32, (2, 2), activation='relu'),
        MaxPooling2D((2, 2), strides=(2, 2), padding='same'),
        BatchNormalization(),

        # 4th conv block
        Conv2D(64, (2, 2), activation='relu'),
        MaxPooling2D((2, 2), strides=(2, 2), padding='same'),
        BatchNormalization(),

        # Classifier head
        Flatten(),
        Dense(64, activation='relu'),
        Dropout(0.3),

        # Additional dense layer
        Dense(32, activation='relu'),
        Dropout(0.3),

        Dense(1, activation='sigmoid'),
    ])

    # Create an optimizer with the configured learning rate
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=LEARNING_RATE)

    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

# Seed TensorFlow's global RNG before the network is created, so that weight
# initialisation and the other TF-level random operations start from a fixed
# seed on every fresh run.
tf.random.set_seed(RANDOM_STATE)

# Wrap the CNN in a scikit-learn compatible classifier
cnn_input_shape = (X_train.shape[1], X_train.shape[2], 1)
keras_model = KerasClassifier(model=create_model(cnn_input_shape), epochs=EPOCHS, batch_size=BATCH_SIZE)

# Use the wrapped CNN as the base estimator of AdaBoost
ada_model = AdaBoostClassifier(estimator=keras_model, n_estimators=N_ESTIMATORS, random_state=RANDOM_STATE)

# Show the summary of the base CNN
ada_model.estimator.model.summary()

In [None]:
# Train the AdaBoost ensemble on the training split.
# NOTE(review): X_val / y_val are built earlier but are not used by any
# visible cell, so no validation data is passed here.
ada_model.fit(X_train, y_train)

In [None]:
# Mapping from integer label to class name
class_dict = dict(enumerate(('bonafide', 'spoof')))

# Predict on the evaluation set
y_test_pred = ada_model.predict(X_test)

# Build the classification report, then print it
report = classification_report(
    y_test,
    y_test_pred,
    target_names=class_dict.values(),
)
print(report)

In [None]:
# Build a DataFrame of the true and the predicted labels
error_df = pd.DataFrame({'Actual': y_test, 'Predicted': y_test_pred})

# Flag whether each prediction is correct
error_df['Correct'] = error_df['Actual'] == error_df['Predicted']

# Preview the error-analysis table. The previous call sized the preview with
# the length of an unrelated frame (data_df), which requested every row.
error_df.head()

In [None]:
# Count correct and incorrect predictions for every actual class
outcome_counts = error_df.groupby(['Actual', 'Correct']).size()
error_analysis = outcome_counts.unstack(fill_value=0)

# Draw the counts as a heatmap
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(error_analysis, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title('Error Analysis')
ax.set_xlabel('Correct Prediction')
ax.set_ylabel('Actual Class')
plt.show()

In [None]:
# Compute the confusion matrix
cm = confusion_matrix(y_test, y_test_pred)

# Draw it as an annotated heatmap labelled with the class names
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    xticklabels=class_dict.values(),
    yticklabels=class_dict.values(),
    ax=ax,
)
ax.set_title('Confusion Matrix')
ax.set_ylabel('Actual Class')
ax.set_xlabel('Predicted Class')

# Show the plot
plt.show()

In [None]:
# Probability score of the positive class (column 1 of predict_proba)
y_score = ada_model.predict_proba(X_test)[:, 1]

# ROC points and the area under the curve
fpr, tpr, thresholds = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

# Draw the ROC curve together with the chance diagonal
fig, ax = plt.subplots(figsize=(10, 7))
ax.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic (ROC)')

# Annotate the threshold that maximises TPR - FPR
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]
optimal_point = (fpr[optimal_idx], tpr[optimal_idx])
ax.annotate(
    'Threshold: {:.3f}'.format(optimal_threshold),
    xy=optimal_point,
    xytext=(optimal_point[0], optimal_point[1] - 0.2),
    arrowprops=dict(facecolor='red', shrink=0.05),
)

ax.legend(loc="lower right")
plt.show()

In [None]:
# Audio files to classify, in the order they are reported below
new_filepaths = [
    '/kaggle/input/audiotesting/bonafide/LA_E_1027220.flac',
    '/kaggle/input/audiotesting/spoof/LA_E_1611480.flac',
    '/kaggle/input/audiotesting/bonafide/LA_E_3003752.flac',
    '/kaggle/input/audiotesting/spoof/LA_E_2834763.flac',
    '/kaggle/input/audiotesting/bonafide/LA_E_3379393.flac',
    '/kaggle/input/audiotesting/spoof/LA_E_5932896.flac',
    '/kaggle/input/audiotesting/bonafide/LA_E_3757378.flac',
    '/kaggle/input/audiotesting/spoof/LA_E_6163791.flac',
    '/kaggle/input/audiotesting/bonafide/LA_E_4581379.flac',
    '/kaggle/input/audiotesting/spoof/LA_E_6828287.flac',
]

# Build one spectrogram per file and stack them into a single batch
X_new = np.stack([audio_to_spectrogram(path) for path in new_filepaths])

# Predict the class of every file
y_pred = ada_model.predict(X_new)

# Map the integer predictions to class names
y_pred_classes = [class_dict[i] for i in y_pred]

# Print the results
for i, (pred, pred_class) in enumerate(zip(y_pred, y_pred_classes)):
    print(f"File {i+1} memiliki nilai prediksi {pred} dan diklasifikasikan sebagai {pred_class}")
