In [None]:
# Load various imports 
from datetime import datetime
from os import listdir
from os.path import isfile, join

import keras
import librosa
import librosa.display
import librosa.effects


import numpy as np
import pandas as pd

from IPython.display import Audio

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Flatten, GlobalAveragePooling2D, Input, Activation
#from tensorflow.keras.layers.normalization import BatchNormalization
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import optimizers

from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import matplotlib.pyplot as plt
import seaborn as sns
import soundfile as sf

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

def make_attention(x0, l_out_features):
    """Gate a feature map with per-feature sigmoid weights, then max-pool it.

    The input is globally average-pooled, projected to ``l_out_features``
    values by a dense layer and passed through a sigmoid. The resulting gate
    ``g`` gets two singleton axes inserted at position 1 so it broadcasts
    against ``x0``, and is applied as ``x0 * g + g`` before 2x2 max pooling.

    Args:
        x0: Keras tensor accepted by ``GlobalAveragePooling2D``.
        l_out_features: Number of gate values produced by the dense layer;
            must broadcast against the last axis of ``x0``.

    Returns:
        The gated tensor after ``MaxPooling2D(pool_size=(2, 2))``.
    """
    pooled = GlobalAveragePooling2D()(x0)
    projected = Dense(units=l_out_features, input_shape=(pooled.shape[1],))(pooled)
    gate = Activation('sigmoid')(projected)
    # Two expand_dims calls at axis 1 turn (batch, features) into
    # (batch, 1, 1, features) so the gate broadcasts over the middle axes.
    gate = tf.expand_dims(gate, axis=1)
    gate = tf.expand_dims(gate, axis=1)
    gated = (x0 * gate) + gate
    return MaxPooling2D(pool_size=(2, 2))(gated)

def resblock1(x0):
    """First residual block: 20-filter convolutions with a 1x1-projected skip.

    Main branch: Conv3x3 -> BatchNorm -> LeakyReLU(0.3) -> Conv3x3.
    Skip branch: Conv1x1 applied to the block input.
    The two branches are summed and the result is 2x2 max-pooled.

    Args:
        x0: Input Keras tensor.

    Returns:
        The pooled sum of the main and skip branches.
    """
    conv3x3 = dict(kernel_size=3, padding='same', strides=(1, 1))

    branch = Conv2D(filters=20, **conv3x3)(x0)
    branch = BatchNormalization()(branch)
    branch = LeakyReLU(alpha=0.3)(branch)
    branch = Conv2D(filters=20, **conv3x3)(branch)
    # 1x1 convolution on the skip path so it also carries 20 filters.
    shortcut = Conv2D(filters=20, kernel_size=1, padding='valid', strides=(1, 1))(x0)
    merged = branch + shortcut
    return MaxPooling2D(pool_size=(2, 2))(merged)

def resblock2(x0):
    """Pre-activation residual block with 64 filters and a 1x1-projected skip.

    Main branch: (BatchNorm -> LeakyReLU(0.3) -> Conv3x3) applied twice.
    Skip branch: Conv1x1 applied to the raw block input.
    The two branches are summed and the result is 2x2 max-pooled.

    Args:
        x0: Input Keras tensor.

    Returns:
        The pooled sum of the main and skip branches.
    """
    conv3x3 = dict(kernel_size=3, padding='same', strides=(1, 1))

    branch = BatchNormalization()(x0)
    branch = LeakyReLU(alpha=0.3)(branch)
    branch = Conv2D(filters=64, **conv3x3)(branch)
    branch = BatchNormalization()(branch)
    branch = LeakyReLU(alpha=0.3)(branch)
    branch = Conv2D(filters=64, **conv3x3)(branch)
    # 1x1 convolution on the skip path so it also carries 64 filters.
    shortcut = Conv2D(filters=64, kernel_size=1, padding='valid', strides=(1, 1))(x0)
    merged = branch + shortcut
    return MaxPooling2D(pool_size=(2, 2))(merged)

def resblock3(x0):
    """Pre-activation residual block with 64 filters and an identity skip.

    Main branch: (BatchNorm -> LeakyReLU(0.3) -> Conv3x3) applied twice.
    The block input is added back unchanged, so it must already be
    shape-compatible with the 64-filter output. The sum is 2x2 max-pooled.

    Args:
        x0: Input Keras tensor.

    Returns:
        The pooled sum of the main branch and the block input.
    """
    conv3x3 = dict(kernel_size=3, padding='same', strides=(1, 1))

    branch = BatchNormalization()(x0)
    branch = LeakyReLU(alpha=0.3)(branch)
    branch = Conv2D(filters=64, **conv3x3)(branch)
    branch = BatchNormalization()(branch)
    branch = LeakyReLU(alpha=0.3)(branch)
    branch = Conv2D(filters=64, **conv3x3)(branch)
    merged = branch + x0
    return MaxPooling2D(pool_size=(2, 2))(merged)

def gru_layer(x0):
    """Apply two stacked bidirectional GRUs with 64 units per direction.

    The first layer returns the full sequence; the second returns only its
    final output.

    Args:
        x0: Input Keras tensor accepted by a ``GRU`` layer.

    Returns:
        Output tensor of the second bidirectional GRU.
    """
    sequence = Bidirectional(GRU(64, return_sequences=True))(x0)
    return Bidirectional(GRU(64, return_sequences=False))(sequence)
    
def fc(x0):
    """Classification head: Dense(128) -> Dense(1) -> sigmoid.

    No activation is applied between the two dense layers.

    Args:
        x0: 2-D Keras tensor (``x0.shape[1]`` is read as the feature size).

    Returns:
        Tensor with one sigmoid output per example.
    """
    hidden = Dense(units=128, input_shape=(x0.shape[1],))(x0)
    logit = Dense(units=1, input_shape=(hidden.shape[1],), use_bias=True)(hidden)
    return Activation("sigmoid")(logit)

In [None]:
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Activation, Input, Permute, Lambda
import tensorflow as tf

# Network input geometry: rows x columns x channels of one feature matrix.
num_rows, num_columns, num_channels = 80, 404, 1


def init_model():
    """Build the (uncompiled) residual-attention + GRU binary classifier.

    Pipeline: BatchNorm -> SELU -> three (residual block, attention) stages
    with 20, 64 and 64 features -> BatchNorm -> SELU -> squeeze axis 1 ->
    stacked bidirectional GRUs -> dense head with a sigmoid output.

    The input shape comes from the module-level ``num_rows``, ``num_columns``
    and ``num_channels``. ``tf.squeeze(..., axis=1)`` only succeeds if the
    pooling stages have reduced axis 1 to size 1 by that point.

    Returns:
        A ``tf.keras.Model`` mapping the input tensor to the sigmoid output.
    """
    inputs = Input(shape=(num_rows, num_columns, num_channels))
    net = BatchNormalization()(inputs)
    net = Activation('selu')(net)

    # Each stage is a residual block followed by attention gating of the
    # same width.
    stages = ((resblock1, 20), (resblock2, 64), (resblock3, 64))
    for residual_block, width in stages:
        net = residual_block(net)
        net = make_attention(net, width)

    net = BatchNormalization()(net)
    net = Activation('selu')(net)
    # Drop axis 1 so the GRUs receive a 3-D (batch, steps, features) tensor.
    sequence = tf.squeeze(net, axis=1)
    outputs = fc(gru_layer(sequence))
    # Create model
    return tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
import os, fnmatch, glob
# Parallel module-level lists filled by find(): file paths and their labels.
features_list = []
labels_list = []


def find(pattern, path, truth):
    """Record every file under ``path`` whose name matches ``pattern``.

    The directory tree is walked with ``os.walk``. For each matching file the
    joined path is appended to the module-level ``features_list`` and
    ``truth`` is appended to ``labels_list``, keeping the two lists aligned.

    Args:
        pattern: ``fnmatch``-style file-name pattern (e.g. ``'*.wav'``).
        path: Root directory to search recursively.
        truth: Label stored for every match.
    """
    for root, _dirs, files in os.walk(path):
        matches = fnmatch.filter(files, pattern)
        features_list.extend(os.path.join(root, name) for name in matches)
        labels_list.extend([truth] * len(matches))

# Index every .wav file of each corpus together with the label it is given.
for corpus_dir, corpus_label in (
    ('/kaggle/input/wavefake/generated_audio/', 'spoof'),
    ('/kaggle/input/ljspeech/', 'bonafide'),
    ('/kaggle/input/jsut-dataset/', 'bonafide'),
):
    find('*.wav', corpus_dir, corpus_label)

In [None]:
def remove_silence(audio, sr):
    """Trim leading/trailing silence from consecutive 0.7-second windows.

    The signal is cut into windows of ``0.7 * sr`` samples (the last window
    may be shorter). Each window is passed to ``librosa.effects.trim`` with
    ``top_db=20`` and the trimmed windows are concatenated in order.

    Args:
        audio: 1-D array of audio samples.
        sr: Sampling rate of ``audio`` in Hz; must be positive.

    Returns:
        A NumPy array with the concatenated trimmed windows. An empty array
        is returned when ``audio`` is empty.

    Raises:
        ValueError: If ``sr`` is not positive (the window would never advance).
    """
    window = 0.7 * sr
    if window <= 0:
        raise ValueError("sr must be positive, got {!r}".format(sr))

    total = len(audio)
    pieces = []
    start = 0
    while start < total:
        # Clamp to the end of the signal so the last window stops exactly there.
        stop = min(start + window, total)
        block = audio[int(start):int(stop)]
        trimmed, _ = librosa.effects.trim(block, top_db=20)
        pieces.append(trimmed)
        start = stop

    if not pieces:
        return np.array([])
    # One concatenation instead of growing a Python list sample by sample.
    return np.concatenate(pieces)


def extract_features(file_name, max_pad_len=404, sample_rate=16000, n_mfcc=80):
    """Load an audio file and return a fixed-size MFCC matrix.

    At most the first 4 seconds are loaded at ``sample_rate``, silence is
    removed with ``remove_silence`` and ``n_mfcc`` MFCCs are computed. The
    frame axis is then truncated or zero-padded on the right to exactly
    ``max_pad_len`` columns.

    Args:
        file_name: Path of the audio file.
        max_pad_len: Number of frames (columns) in the returned matrix.
        sample_rate: Sampling rate requested from ``librosa.load``.
        n_mfcc: Number of MFCC coefficients (rows) to compute.

    Returns:
        ``np.ndarray`` of shape ``(n_mfcc, max_pad_len)``.

    Raises:
        RuntimeError: If loading or feature extraction fails. The original
            exception is chained. Callers stack the returned matrices with
            ``np.array``, so returning the exception object instead (as was
            done before) only postponed the failure to an unrelated place.
    """
    try:
        audio, sample_rate = librosa.load(file_name, duration=4, sr=sample_rate)
        audio = remove_silence(audio, sample_rate)
        mfcc_audio = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc)
    except Exception as e:
        raise RuntimeError(
            "Feature extraction failed for {!r}: {}".format(file_name, e)
        ) from e

    # Truncate first so the pad width below can never be negative.
    mfcc_audio = mfcc_audio[:, :max_pad_len]
    pad_width = max_pad_len - mfcc_audio.shape[1]
    return np.pad(mfcc_audio, pad_width=((0, 0), (0, pad_width)), mode='constant')
    

def start_computation(features_list, labels_list):
    """Turn a batch of file paths and string labels into model-ready arrays.

    Args:
        features_list: Sequence of audio file paths.
        labels_list: Sequence of labels aligned with ``features_list``;
            ``'bonafide'`` is encoded as 0 and anything else as 1.

    Returns:
        Tuple ``(features, labels)``: the stacked results of
        ``extract_features`` with a trailing singleton axis appended, and the
        integer label array.
    """
    features_eval = []
    labels_eval = []

    for position, path in enumerate(features_list):
        features_eval.append(extract_features(path))
        labels_eval.append(0 if labels_list[position] == 'bonafide' else 1)

    labels_eval = np.array(labels_eval)
    # Append a trailing axis of size 1 to the stacked feature array.
    features_eval = np.expand_dims(np.array(features_eval), axis=-1)
    return features_eval, labels_eval

In [None]:
class Custom_Generator(keras.utils.Sequence):
    """Keras ``Sequence`` that extracts features one batch at a time.

    Only file paths and string labels are stored; ``start_computation`` is
    called on demand for each requested batch.
    """

    def __init__(self, features, labels, batch_size):
        """Store the file paths, their labels and the batch size."""
        self.features, self.labels, self.batch_size = features, labels, batch_size

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        n_batches = np.ceil(len(self.features) / float(self.batch_size))
        return n_batches.astype(int)

    def __getitem__(self, idx):
        """Return the ``(features, labels)`` arrays for batch number ``idx``."""
        window = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        return start_computation(self.features[window], self.labels[window])

In [None]:
# Stratified 70/15/15 split. The 30% hold-out is halved into x_test/y_test
# (passed to fit() as validation data) and x_eval/y_eval (used for the final
# predictions).
x_train, x_holdout, y_train, y_holdout = train_test_split(
    features_list,
    labels_list,
    stratify=labels_list,
    test_size=0.3,
    random_state=42,
)
x_test, x_eval, y_test, y_eval = train_test_split(
    x_holdout,
    y_holdout,
    stratify=y_holdout,
    test_size=0.5,
    random_state=42,
)

In [None]:
# Number of audio files whose features are extracted per generator batch.
batch_size = 32

# Lazy batch generators: x_train/y_train for fitting, x_test/y_test for validation.
my_training_batch_generator = Custom_Generator(x_train, y_train, batch_size)
my_validation_batch_generator = Custom_Generator(x_test, y_test, batch_size)

In [None]:
# -p keeps this cell re-runnable: plain mkdir fails once the directory exists.
!mkdir -p model-history

In [None]:
import csv
import tensorflow.keras.backend as K
from tensorflow import keras
import os

model_directory = './model-history/'

class StoreModelHistory(keras.callbacks.Callback):
    """Append each epoch's metrics (plus the learning rate) to a CSV file.

    Rows are appended to ``model_history.csv`` inside ``model_directory``.
    A header row is written only when the file is created (or is empty).
    """

    def on_epoch_end(self, batch, logs=None):
        """Write one CSV row for the epoch that just finished.

        Args:
            batch: Epoch index supplied by Keras (parameter name kept for
                backward compatibility).
            logs: Metrics dict for the epoch. An ``'lr'`` entry holding the
                optimizer's current learning rate is added if absent.
        """
        if logs is None:
            logs = {}
        if 'lr' not in logs:
            # `learning_rate` is the canonical attribute; `lr` is a legacy alias.
            logs['lr'] = K.get_value(self.model.optimizer.learning_rate)

        # Create the target directory on demand so the callback does not
        # depend on a separate shell cell having been run.
        os.makedirs(model_directory, exist_ok=True)
        history_path = os.path.join(model_directory, 'model_history.csv')
        needs_header = (not os.path.isfile(history_path)
                        or os.path.getsize(history_path) == 0)

        # newline='' is what the csv module expects for files it writes.
        with open(history_path, 'a', newline='') as f:
            writer = csv.DictWriter(f, logs.keys())
            if needs_header:
                writer.writeheader()
            writer.writerow(logs)

In [None]:
import os
from keras.callbacks import ReduceLROnPlateau 

epochs = 10
verbose = 1
optimizer = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999)

model = init_model()

checkpoint_path = './model.ckpt'
checkpoint_dir = os.path.dirname(checkpoint_path)

# Pass the configured optimizer object instead of the 'adam' string, which
# silently ignored the settings above.
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=2, min_lr=0.00001)

# `monitor` only takes effect together with save_best_only; without it the
# checkpoint is overwritten every epoch and ends up holding the last epoch's
# weights rather than the best ones.
checkpoint_callback = keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                      monitor='val_loss',
                                                      save_best_only=True,
                                                      save_weights_only=True,
                                                      verbose=1)

# NOTE(review): EarlyStopping and ReduceLROnPlateau share patience=2, so
# training may stop in the same epoch the learning rate is first reduced.
# Confirm this is intended.
es = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        mode='min',
        verbose=1,
        patience=2)

# steps_per_epoch / validation_steps are deliberately omitted: the generators
# are Sequences, so Keras takes the number of batches from len(generator).
# The previous values were derived from a batch size of 16 while the
# generators were built with 32, which asked for about twice as many batches
# as the generators can supply.
model.fit(
    my_training_batch_generator,
    epochs=epochs,
    verbose=verbose,
    validation_data=my_validation_batch_generator,
    callbacks=[checkpoint_callback, reduce_lr, StoreModelHistory(), es],
)

In [None]:
import csv

history_dataframe = pd.read_csv(model_directory+'model_history.csv',sep=',')

# One x position per logged row. A hard-coded epoch count breaks the plots
# whenever early stopping (or an appended earlier run) changes the number of
# rows in the CSV.
epoch_axis = range(1, len(history_dataframe) + 1)

plt.style.use("ggplot")

# Plot training & validation curves: loss first, then accuracy.
for metric, title, y_label in (('loss', 'Model loss', 'Loss'),
                               ('accuracy', 'Accuracy', 'Accuracy')):
    plt.plot(epoch_axis, history_dataframe[metric])
    plt.plot(epoch_axis, history_dataframe['val_' + metric], linestyle='--')
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

In [None]:
from keras.models import load_model
from keras.utils import plot_model

# Rebuild the architecture and restore weights from a stored checkpoint.
# NOTE(review): this path differs from the './model.ckpt' written by the
# training cell; presumably a previously exported copy. Confirm.
new_model = init_model()
new_model.load_weights('/kaggle/input/model-trained/model.ckpt')

In [None]:
def extract_features_pred(file_name, max_pad_len=404, sample_rate=16000):
    """Extract inference features with the same recipe used for training.

    The training pipeline loads audio at 16 kHz, removes silence, computes
    80 MFCCs and pads to 404 frames, and the model input is built for
    80 x 404 x 1 matrices. The defaults here now match those values; the
    previous 8 kHz / 200-frame / no-silence-removal variant produced matrices
    that do not fit the model input.

    Args:
        file_name: Path of the audio file.
        max_pad_len: Number of frames (columns) in the returned matrix.
        sample_rate: Sampling rate requested from ``librosa.load``.

    Returns:
        ``np.ndarray`` of shape ``(80, max_pad_len)``.

    Raises:
        RuntimeError: If loading or feature extraction fails. The original
            exception is chained. Callers stack the returned matrices with
            ``np.array``, so a ``None`` placeholder would only postpone the
            failure.
    """
    try:
        audio, sample_rate = librosa.load(file_name, duration=4, sr=sample_rate)
        audio = remove_silence(audio, sample_rate)
        mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=80)
    except Exception as e:
        raise RuntimeError("Error encountered while parsing file: " + file_name) from e

    # Truncate first so the pad width below can never be negative.
    mfccs = mfccs[:, :max_pad_len]
    pad_width = max_pad_len - mfccs.shape[1]
    mfccs = np.pad(mfccs, pad_width=((0, 0), (0, pad_width)), mode='constant')

    return mfccs

def start_computation_pred(features_list, labels_list):
    """Build prediction-time feature and label arrays for one batch.

    Args:
        features_list: Sequence of audio file paths.
        labels_list: Sequence of string labels; ``'bonafide'`` is encoded as
            0 and anything else as 1.

    Returns:
        Tuple ``(features, labels)``: the stacked results of
        ``extract_features_pred`` with a trailing singleton axis appended,
        and the integer label array.
    """
    extracted = [extract_features_pred(path) for path in features_list]
    encoded = [0 if tag == 'bonafide' else 1 for tag in labels_list]

    labels_dev = np.array(encoded)
    # Append a trailing axis of size 1 to the stacked feature array.
    features_dev = np.expand_dims(np.array(extracted), axis=-1)

    return features_dev, labels_dev

In [None]:
class My_Custom_Generator(keras.utils.Sequence):
    """Keras ``Sequence`` for prediction, extracting features batch by batch.

    Same batching scheme as ``Custom_Generator``, but each batch is built
    with ``start_computation_pred``.
    """

    def __init__(self, features, labels, batch_size):
        """Store the file paths, their labels and the batch size."""
        self.features, self.labels, self.batch_size = features, labels, batch_size

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        n_batches = np.ceil(len(self.features) / float(self.batch_size))
        return n_batches.astype(int)

    def __getitem__(self, idx):
        """Return the ``(features, labels)`` arrays for batch number ``idx``."""
        window = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        return start_computation_pred(self.features[window], self.labels[window])

In [None]:
batch_size = 32
my_test_batch_generator = My_Custom_Generator(x_eval, y_eval, batch_size)
# No `steps` argument: Keras then iterates over every batch of the Sequence.
# The previous int(len(x_eval) / batch_size) silently dropped the final
# partial batch from the evaluation.
preds = new_model.predict(x=my_test_batch_generator, verbose=1)

In [None]:
# Keep only as many ground-truth labels as there are predictions.
labels_dev = np.array(y_eval[:preds.shape[0]])

# Integer encoding: 'bonafide' -> 0, anything else -> 1.
labels = [0 if tag == "bonafide" else 1 for tag in labels_dev]

In [None]:
# Convert the model outputs to hard 0/1 predictions by rounding with np.rint.
x = np.rint(preds.flatten()).astype(int)

In [None]:
# Display names for class indices 0 and 1 ('bonafide' is encoded as 0, everything else as 1).
c_names = ['bonafide', 'spoof']

In [None]:
# Classification report comparing the integer ground truth (`labels`) with the rounded predictions (`x`).
print(classification_report(labels, x, target_names=c_names))

In [None]:
from sklearn.metrics import auc
# Compute the true-positive rates (TPR), false-positive rates (FPR) and thresholds
fpr, tpr, thresholds = roc_curve(labels, preds, pos_label=1)
# Compute the area under the ROC curve (AUC)
roc_auc = auc(fpr, tpr)
# Draw the ROC curve together with the chance diagonal
fig, ax = plt.subplots()
ax.plot(fpr, tpr, label='AUC = %0.2f' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic')
ax.legend(loc="lower right")
plt.show()

In [None]:
import numpy as np
from sklearn.metrics import roc_curve, roc_auc_score
# Here y_true is the array of true class labels (0 or 1)
# and y_pred is the array of probabilities predicted by the model.


def calculate_eer(y_true, y_pred):
    """Estimate the equal error rate (EER) from the ROC curve.

    The ROC operating point where ``|FNR - FPR|`` is smallest is located and
    the false-positive rate at that point is returned.

    Args:
        y_true: True binary labels, with 1 as the positive class.
        y_pred: Predicted scores or probabilities for the positive class.

    Returns:
        The FPR at the operating point closest to ``FNR == FPR``.
    """
    fpr, tpr, _thresholds = roc_curve(y_true, y_pred, pos_label=1)
    fnr = 1 - tpr  # FNR; the FPR doubles as the false-acceptance rate (FAR)
    crossing = np.nanargmin(np.absolute(fnr - fpr))
    return fpr[crossing]


eer = calculate_eer(labels, preds)
print("Mean EER: {:.4f}".format(eer))

# ROC AUC of the predicted scores against the true labels.
# The previous loop computed this identical value twice and stored it in a
# variable named `auc`, which shadowed sklearn's `auc` function for every
# later cell. The printed mean is unchanged.
roc_auc_value = roc_auc_score(labels, preds)
aucs = [roc_auc_value]

# Mean AUC (kept for compatibility; equal to the single value above)
mean_auc = np.mean(aucs)
print(mean_auc)