In [None]:
import gdown

# Fetch the training archive from Google Drive into the working directory.
# Progress output is kept visible (quiet=False) because the archive is several GB.
url = "https://drive.google.com/uc?id=13hVWxm4GbaFUj5SijJIZ40vTPSYnyNxq"
output = "training_data.tar.gz"
gdown.download(url=url, output=output, quiet=False)

Downloading...
From: https://drive.google.com/uc?id=13hVWxm4GbaFUj5SijJIZ40vTPSYnyNxq
To: /content/training_data.tar.gz
100%|██████████| 5.52G/5.52G [01:40<00:00, 55.0MB/s]


'training_data.tar.gz'

In [None]:
%%capture

# Unpack the downloaded archive into the working directory. The %%capture cell
# magic above swallows tar's --verbose per-file listing so it does not flood the notebook.
!tar --gunzip --extract --verbose --file=training_data.tar.gz

In [None]:
import IPython.display as ipd
# Sanity check: render an inline audio player for one clip of the extracted archive.
# The Audio object must stay the last expression of the cell to be displayed.
ipd.Audio("Training_Data/human/human_00000.wav")

In [None]:
# Colab only: mount Google Drive so the CSV files under drive/MyDrive/ML
# read and written by later cells are reachable from /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import librosa
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import csv
import os
import pathlib
import tensorflow as tf
from tensorflow.math import count_nonzero
from keras import backend as K
from keras.models import Sequential
from keras import layers, callbacks
import keras

from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler, PolynomialFeatures
from sklearn.metrics import roc_curve, make_scorer, accuracy_score
from sklearn.linear_model import LogisticRegression, Lasso
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

In [None]:
def equal_error_rate_tf(y_true, y_pred):
    """Approximate the equal error rate (EER) of a batch using TensorFlow ops.

    The decision threshold is swept upwards from 0 in steps of 0.001. At each
    threshold the false-positive rate (share of impostor scores >= threshold)
    and the false-negative rate (share of genuine scores < threshold) are
    computed. The sweep stops at the first threshold where FPR falls below
    FNR, and the mean of the two rates at that threshold is returned.

    Args:
        y_true: Tensor of labels; entries equal to 0 are counted as impostor,
            entries equal to 1 as genuine.
        y_pred: Tensor of scores, same shape as ``y_true``. Scores are assumed
            to lie in [0, 1] (e.g. sigmoid outputs) -- the sweep is capped
            just above 1.

    Returns:
        Scalar float32 tensor with the approximated EER. No gradient flows
        through it.
    """
    step = 0.001
    # Upper bound for the sweep. Without it the loop never terminates when FNR
    # stays at 0 for every threshold (no genuine samples in the batch, or NaN
    # scores), because ``fpr >= fnr`` then holds forever.
    max_threshold = 1.0 + 10 * step

    is_imp = tf.equal(y_true, 0)
    is_gen = tf.equal(y_true, 1)

    # K.epsilon() keeps the divisions finite when a class is absent from the batch.
    n_imp = count_nonzero(is_imp, dtype=tf.float32) + tf.constant(K.epsilon())
    n_gen = count_nonzero(is_gen, dtype=tf.float32) + tf.constant(K.epsilon())

    scores_imp = tf.boolean_mask(y_pred, is_imp)
    scores_gen = tf.boolean_mask(y_pred, is_gen)

    def keep_sweeping(t, fpr, fnr):
        # Continue while FPR has not dropped below FNR and the cap is not reached.
        return tf.logical_and(
            tf.greater_equal(fpr, fnr),
            tf.less_equal(t, max_threshold),
        )

    def evaluate_threshold(t, fpr, fnr):
        # Rates are evaluated at the current threshold; the threshold is then advanced.
        return (
            t + step,
            tf.divide(count_nonzero(tf.greater_equal(scores_imp, t), dtype=tf.float32), n_imp),
            tf.divide(count_nonzero(tf.less(scores_gen, t), dtype=tf.float32), n_gen),
        )

    # Start with FPR=1 and FNR=0 so the first iteration always runs.
    loop_vars = (tf.constant(0.0), tf.constant(1.0), tf.constant(0.0))
    _, fpr, fnr = tf.while_loop(keep_sweeping, evaluate_threshold, loop_vars)

    # stop_gradient replaces the deprecated ``back_prop=False`` loop argument.
    eer = tf.stop_gradient((fpr + fnr) / 2)

    return eer

def equal_error_rate_sklearn(y, y_pred):
  """Estimate the equal error rate from a ROC curve.

  Builds the ROC curve for ``y`` / ``y_pred`` with label 1 as the positive
  class and returns the false-positive rate at the ROC point where the
  false-negative rate (1 - TPR) is closest to the false-positive rate.
  """
  false_pos_rate, true_pos_rate, _ = roc_curve(y, y_pred, pos_label=1)
  false_neg_rate = 1 - true_pos_rate
  rate_gap = np.absolute(false_neg_rate - false_pos_rate)
  closest = np.nanargmin(rate_gap)
  return false_pos_rate[closest]

# EER is an error rate (lower is better). make_scorer assumes "higher is better"
# unless told otherwise, which would make GridSearchCV / cross_val_score select
# the WORST model. With greater_is_better=False the scorer reports the negated EER.
# NOTE(review): by default the scorer feeds hard predict() labels into the metric;
# to score on continuous outputs also pass needs_threshold=True (older scikit-learn)
# or response_method=... (newer scikit-learn) -- confirm against the installed version.
eer_scorer = make_scorer(equal_error_rate_sklearn, greater_is_better=False)

In [None]:
def preprocess_data_into_file(output_dir, input_dir, max_files=5):
    """Extract per-file audio features with librosa and write them to a CSV file.

    NOTE: the parameter names are historical and read the wrong way round;
    they are kept unchanged so existing calls keep working.

    Args:
        output_dir: Directory that is READ. It must contain the sub-directories
            ``human`` and ``spoof`` holding the audio files.
        input_dir: Path of the CSV file that is WRITTEN (overwritten if present).
        max_files: Maximum number of files processed per class, taken in
            sorted file-name order. Defaults to 5 (the original hard-coded
            limit); pass ``None`` to process every file.

    The CSV has one row per audio file: the file name, the mean of chroma STFT,
    spectral centroid, spectral bandwidth, spectral roll-off and zero-crossing
    rate, the mean of each of 10 MFCC rows, and the class name as label.
    """
    header = [
        'filename',
        'chroma_stft',
        'spectral_centroid',
        'spectral_bandwidth',
        'rolloff',
        'zero_crossing_rate',
        *(f'mfcc{i}' for i in range(1, 11)),
        'label',
    ]

    # Open the CSV once; newline='' is what the csv module expects and avoids
    # blank lines between rows on platforms that translate line endings.
    with open(input_dir, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(header)

        for a_type in ('human', 'spoof'):
            class_dir = f'{output_dir}/{a_type}'
            # os.listdir order is arbitrary; sorting makes the selected subset reproducible.
            for file_name in sorted(os.listdir(class_dir))[:max_files]:
                y, sr = librosa.load(f'{class_dir}/{file_name}')
                frame_features = [
                    librosa.feature.chroma_stft(y=y, sr=sr),
                    librosa.feature.spectral_centroid(y=y, sr=sr),
                    librosa.feature.spectral_bandwidth(y=y, sr=sr),
                    librosa.feature.spectral_rolloff(y=y, sr=sr),
                    librosa.feature.zero_crossing_rate(y),
                ]
                mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=10)

                # Build the row as a list instead of a space-joined string that is
                # split again: a file name containing whitespace would otherwise
                # be broken into several columns.
                row = [file_name]
                row.extend(f'{np.mean(feature)}' for feature in frame_features)
                row.extend(f'{np.mean(coefficient)}' for coefficient in mfcc)
                row.append(a_type)
                writer.writerow(row)

In [None]:
# Build one feature CSV per data split; each CSV is written into the split's own directory.
for split_dir in ('Training_Data', 'Testing_Data'):
    preprocess_data_into_file(split_dir, f'{split_dir}/dataset.csv')

In [None]:
# Load the pre-computed training features from Drive.
data = pd.read_csv('drive/MyDrive/ML/training_2v_id_rd.csv', sep=',')

# The last column holds the class label; encode it as integers.
audio_list = data.iloc[:, -1]
encoder = LabelEncoder()
y = encoder.fit_transform(audio_list)

# Every other column is a numeric feature.
features_raw = np.array(data.iloc[:, :-1], dtype=float)

# Split BEFORE scaling so the validation rows do not leak into the scaler
# statistics; random_state makes the split reproducible across runs.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    features_raw, y, test_size=0.2, random_state=0
)

# Fit the scaler on the training rows only, then apply the same transform everywhere.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

# Full scaled feature matrix, kept because later cells split `X` again.
X = scaler.transform(features_raw)

In [None]:
# Stop training once the monitored validation loss (EarlyStopping's default)
# has not improved by at least min_delta for 20 epochs, and roll back to the best weights.
early_stopping = callbacks.EarlyStopping(
    min_delta=0.0001,
    patience=20,
    restore_best_weights=True,
)

# Three identical hidden blocks: Dense(256, relu) -> Dropout(0.3) -> BatchNormalization.
model = Sequential()
model.add(layers.Dense(256, activation='relu', input_shape=(X_train.shape[1],)))
model.add(layers.Dropout(0.3))
model.add(layers.BatchNormalization())
model.add(layers.Dense(256, activation='relu'))
model.add(layers.Dropout(0.3))
model.add(layers.BatchNormalization())
model.add(layers.Dense(256, activation='relu'))
model.add(layers.Dropout(0.3))
model.add(layers.BatchNormalization())
# Output layer: a single sigmoid unit producing one score in [0, 1] per sample.
# Without it the network emits 256 unbounded batch-norm activations per sample,
# which matches neither the binary cross-entropy loss nor the later cells that
# use model.predict() as one score per row.
model.add(layers.Dense(1, activation='sigmoid'))
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

In [None]:
# Keras expects the binary targets as a float32 column vector.
y_train = np.asarray(y_train, dtype='float32').reshape(-1, 1)

# Train for at most 250 epochs; early stopping watches the validation split.
# `classifier` holds the object returned by model.fit.
classifier = model.fit(
    X_train,
    y_train,
    epochs=250,
    validation_data=(X_test, y_test),
    callbacks=[early_stopping],
)

In [None]:
# Round the predicted scores to hard 0/1 labels and report per-class metrics on the validation split.
val_pred_labels = np.round(model.predict(X_test)).flatten()
classification_report(y_test, val_pred_labels, output_dict=True)

{'0': {'f1-score': 0.9283582089552239,
  'precision': 0.9472081218274112,
  'recall': 0.9102439024390244,
  'support': 2050},
 '1': {'f1-score': 0.9819774718397997,
  'precision': 0.9770859277708592,
  'recall': 0.9869182389937107,
  'support': 7950},
 'accuracy': 0.9712,
 'macro avg': {'f1-score': 0.9551678403975118,
  'precision': 0.9621470247991353,
  'recall': 0.9485810707163675,
  'support': 10000},
 'weighted avg': {'f1-score': 0.9709855229484616,
  'precision': 0.9709609775524525,
  'recall': 0.9712,
  'support': 10000}}

In [None]:
# Returns [loss, accuracy] on the validation split: binary cross-entropy plus the
# 'accuracy' metric configured in model.compile.
model.evaluate(X_test, y_test)



[0.07858716696500778, 0.9711999893188477]

In [None]:
# Score the unlabeled test set and export one "file_name,score" row per file.
test_data = pd.read_csv('drive/MyDrive/ML/testing_id_rd.csv', sep=',')

# Reuse the scaler fitted on the training data: re-fitting it here
# (fit_transform) would standardise the test features with different
# statistics than the ones the model was trained on.
# Column 0 is skipped as non-feature data -- presumably the file_name column; verify against the CSV.
# A separate name is used so the validation split in X_test is not overwritten.
X_submission = scaler.transform(np.array(test_data.iloc[:, 1:], dtype=float))

# .copy() avoids writing into a slice of test_data (SettingWithCopyWarning).
filenames = test_data[['file_name']].copy()
# ravel() flattens the (n, 1) prediction array into one score per row.
filenames['score'] = model.predict(X_submission).ravel()
filenames.to_csv("drive/MyDrive/ML/testing_scores.csv", header=False, index=False)

In [None]:
# Disabled helper for reporting grid-search results. It is kept commented out
# together with its only call site in the baseline-model cell.
# The EER helper defined in this notebook is equal_error_rate_sklearn.
# def print_optimizer_info(X_test, y_test, optimizer):
#     print(optimizer.best_estimator_)
#     print(optimizer.best_params_)
#     print(equal_error_rate_sklearn(y_test, optimizer.predict_proba(X_test)[:,1]))

In [None]:
# Baseline: a classical scikit-learn model trained on the same feature matrix `X`
# that was built in the data-loading cell.
# Labels are re-encoded from the last column of `data`.
audio_list = data.iloc[:, -1]
encoder = LabelEncoder()
y = encoder.fit_transform(audio_list)

# Disabled experiment: polynomial feature expansion.
#transform = PolynomialFeatures(2)
#X = transform.fit_transform(X)

# NOTE: this rebinds X_train / X_test / y_train / y_test used by the neural-network
# cells, and the split is drawn without a random_state, so it differs between runs.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Disabled experiment: logistic-regression grid search scored with eer_scorer.
#param_grid = {'C': [0.01, 0.05, 0.1, 0.5, 1, 5, 10],          \
#             'penalty' : ['l2', 'l1', 'elasticnet'], \
#            'solver' : ['newton-cg', 'lbfgs', 'liblinear', 'sag', 'saga']}

#estimator = LogisticRegression(class_weight='balanced')              
#optimizer = GridSearchCV(estimator, param_grid, scoring = eer_scorer, cv=3)
#optimizer.fit(X_train, y_train)
#print_optimizer_info(X_test, y_test, optimizer)

# Active baseline: random forest with default hyper-parameters and a fixed seed.
random_forest_clf = RandomForestClassifier(random_state=0)
#cross_val_score(random_forest_clf, X, y, scoring=eer_scorer, cv=3).mean()
random_forest_clf.fit(X_train, y_train)
# Hold-out accuracy; as the last expression of the cell this value is its displayed output.
accuracy_score(y_test, random_forest_clf.predict(X_test))

# Disabled experiment: L1-regularised logistic regression cross-validated with eer_scorer.
#estimator = LogisticRegression(penalty='l1', class_weight='balanced', solver='liblinear')
#cross_val_score(estimator, X, y, scoring = eer_scorer, cv=3).mean()

0.8908333333333334