In [40]:
#load libraries
import os
import numpy as np
import pickle
import argparse
import soundfile as sf
import librosa
import time
import pandas as pd
import gc
from sklearn.svm import SVC
import matplotlib.pyplot as plt
from scipy.optimize import brentq
from scipy.interpolate import interp1d
from sklearn.metrics import roc_curve
from sklearn.mixture import GaussianMixture
from sklearn.metrics import roc_auc_score, roc_curve, plot_roc_curve, confusion_matrix, accuracy_score, classification_report, plot_confusion_matrix
from sklearn.preprocessing import label_binarize
from python_speech_features import mfcc
from sklearn.mixture import GaussianMixture as GMM
from sklearn import preprocessing
from pdb import set_trace
from scipy import stats

In [2]:
#Load training data 
#'/Users/asimadnan/Desktop/Mres/ASVSPOOF_DATA/train_sample.pkl'

## Useful Functions

In [3]:
def extract_pkl(path,feature_type):
    X = []
    y = []

    max_len = 50  # 1.25 seconds  # check the timesteps of cqcc and mfcc 
    lens = []
    with open(path, 'rb') as infile:
        data = pickle.load(infile)
        for feat_cqcc, feat_mfcc, label in data:
            #mfcc = mfccs.mean(0) # sum over all timesteps for now    # timesteps X 13
            #lens.append(mfccs.shape[0])
            if feature_type == "cqcc":
                feats = feat_cqcc
            elif feature_type == "mfcc":
                feats = feat_mfcc
            num_dim = feats.shape[1]
            if len(feats) > max_len:
                feats = feats[:max_len]
            elif len(feats) < max_len:
                feats = np.concatenate((feats, np.array([[0.]*num_dim]*(max_len-len(feats)))), axis=0)
            X.append(feats.reshape(-1))
            y.append(label)
    X = np.array(X)
    return X,y

In [4]:
def convert_to_binary(labels):
    return label_binarize(labels, list(set(labels))[::-1]).flatten()

In [5]:
def class_label(n):
    #sorted classes normally get using 
    # model.classes_
    class_list = ['bonafide', 'spoof']
    return class_list[n]

In [6]:
def evaluate( model, model_name,  x_test,y_test,threshold = .5):
    test_prob = model.predict_proba(x_test)
    if test_prob.shape[1] >1:
        test_prob = test_prob[:,1]
    y_pred = np.argmax(probas,axis=1)
    y_pred = list(map(class_label,y_pred))
    y_pred= convert_to_binary(y_pred)
    y_test = convert_to_binary(y_test)
    acc = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, test_prob) 
   
    conf_matrix = confusion_matrix(y_test, y_pred)
    #print(conf_matrix)
    TP = conf_matrix[1][1]
    TN = conf_matrix[0][0]
    FP = conf_matrix[0][1]
    FN = conf_matrix[1][0]
    
    # calculate the sensitivity
    conf_sensitivity = (TP / float(TP + FN))
    # calculate the specificity
    conf_specificity = (TN / float(TN + FP))
    

    fpr, tpr, thr = roc_curve(y_test, test_prob)
    
    y_score = model.decision_function(x_test)
    fpr, tpr, thresholds = roc_curve(y_test, y_score)
    eer = brentq(lambda x : 1. - x - interp1d(fpr, tpr)(x), 0., 1.)
    
    #plt.plot(fpr, tpr, marker='.', label="Model")
    #plt.plot([0, 1], [0, 1], linestyle='--', lw=2, color='r',
    #    label="Chance", alpha=.8)
    
    
    #print("\n\n")
    rr = {'Model': model_name,
          'Accuracy' : acc,
          'EER' : eer,
          'AUC':auc, 
          'Sensitivity': conf_sensitivity, 
          'specificity': conf_specificity}
    print(rr)

In [19]:
# windows Path
train_pkl_path = r'C:\Users\Administrator\Downloads\ASV\ASV_DATA\LA\train_cqcc.pkl'
dev_pkl_path = r'C:\Users\Administrator\Downloads\ASV\ASV_DATA\LA\train_cqcc.pkl'
eval_pkl_path = r'C:\Users\Administrator\Downloads\ASV\ASV_DATA\LA\eval_cqcc.pkl'

In [7]:
# Mac Path
train_pkl_path = '/Users/asimadnan/Desktop/Mres/ASVSPOOF_DATA/train_sample.pkl'
dev_pkl_path = '/Users/asimadnan/Desktop/Mres/ASVSPOOF_DATA/dev_sample.pkl'
eval_pkl_path = '/Users/asimadnan/Desktop/Mres/ASVSPOOF_DATA/eval_sample.pkl'

# Training,Dev, and Eval MFCC Features 

In [8]:
X_train_mfcc,y_train_mfcc = extract_pkl(train_pkl_path,'mfcc')

In [9]:
X_train_mfcc.shape

(6, 650)

In [10]:
X_eval_mfcc,y_eval_mfcc = extract_pkl(eval_pkl_path,'mfcc')

In [11]:
X_eval_mfcc.shape

(41, 650)

In [12]:
X_eval_mfcc,y_eval_mfcc = extract_pkl(dev_pkl_path,'mfcc')

In [13]:
X_eval_mfcc.shape

(7, 650)

# Training,Dev, and Eval CQCC Features 

In [25]:
X_train_cqcc,y_train_cqcc = extract_pkl(train_pkl_path,'cqcc')

In [26]:
X_train_cqcc.shape

(6, 3000)

In [27]:
X_eval_cqcc,y_eval_cqcc = extract_pkl(eval_pkl_path,'cqcc')

# Classifiers Training - MFCC

## SVM

In [28]:
clf_mfcc = SVC(probability=True)
clf_mfcc.fit(X_train_mfcc, y_train_mfcc)

SVC(probability=True)

In [29]:
evaluate( clf_mfcc, 'SVM',  X_test_mfcc,y_test_mfcc,threshold = .5)

NameError: name 'X_test_mfcc' is not defined

## GMM

In [14]:
gmm_bon = GMM(n_components = 144, covariance_type='diag',n_init = 50) # min shape[0] = 135 # max = 1112
# 2580 1112 337.8709302325581 289
gmm_sp  = GMM(n_components = 144, covariance_type='diag',n_init = 50)  # min shape[0] = 64  # max = 1318

In [21]:
bondata = []
spdata = []
feature_type = 'mfcc'
dest = '/Users/asimadnan/Desktop/Mres/Experiments/Models/'

In [22]:
 with open(train_pkl_path, 'rb') as infile:
        data = pickle.load(infile)
        for feat_cqcc, feat_mfcc, label in data:
            # feature selection
            if feature_type == "cqcc":
                feats = feat_cqcc
            elif feature_type == "mfcc":
                feats = feat_mfcc
            # label selection
            if (label == 'bonafide'):
                #i += 1
                bondata.append(feats)
            elif(label == 'spoof'):
                spdata.append(feats)
            '''# debug
            if (i > 10):
                break'''
Xbon = np.vstack(bondata)
print('Bon feature stacked, shape as: ', Xbon.shape)
Xsp = np.vstack(spdata)
print('Sp feature stacked, shape as: ', Xsp.shape)


Bon feature stacked, shape as:  (994, 13)
Sp feature stacked, shape as:  (890, 13)


In [23]:
t0 = time.time()
gmm_bon.fit(Xbon)
print('Bon gmm trained, time spend:', time.time() - t0)

t0 = time.time()
gmm_sp.fit(Xsp)
print('Sp gmm trained, time spend:', time.time() - t0)

pickle.dump(gmm_bon, open(dest + 'bon' + '.gmm', 'wb'))
pickle.dump(gmm_sp, open(dest + 'sp' + '.gmm', 'wb'))
print('GMM model created')



Bon gmm trained, time spend: 18.723251342773438
Sp gmm trained, time spend: 11.82088589668274
GMM model created


In [None]:
# GMM Test

In [24]:
gmm_bon = pickle.load(open(dest + 'bon' + '.gmm','rb'))
gmm_sp  = pickle.load(open(dest + 'sp' + '.gmm','rb'))

In [28]:
bondata = []
spdata = []
    # debug
    #j = 0
with open(dev_pkl_path, 'rb') as infile:
    data = pickle.load(infile)
    for feat_cqcc, feat_mfcc, label in data:
        # feature selection
        if feature_type == "cqcc":
            feats = feat_cqcc
        elif feature_type == "mfcc":
            feats = feat_mfcc
        # label selection
        if (label == 'bonafide'):
            #j += 1
            bondata.append(feats)
        elif(label == 'spoof'):
            spdata.append(feats)
        # debug
        #if (j > 10):
        #    break
print(len(bondata), bondata[0].shape)
print(len(spdata), spdata[0].shape)



4 (280, 13)
3 (294, 13)


In [34]:
predb = []
preds = []
j_bon = len(bondata)
k_sp  = len(spdata)


for i in range(j_bon):
    if (i % 50 == 0):
        print('Evaluating Bon sample at',i/j_bon * 100, '%')
    X = bondata[i]
    bscore = gmm_bon.score(X)
    sscore = gmm_sp.score(X)

    #predb.append(np.exp(bscore)-np.exp(sscore))
    predb.append(bscore-sscore)

for i in range(k_sp):
    if (i % 50 == 0):
        print('Evaluating Sp sample at',i/k_sp * 100, '%')
    X = spdata[i]
    bscore = gmm_bon.score(X)
    sscore = gmm_sp.score(X)

    #preds.append(np.exp(bscore)-np.exp(sscore))
    preds.append(bscore-sscore)

predb1 = np.asarray(predb)
preds1 = np.asarray(preds)



Evaluating Bon sample at 0.0 %
Evaluating Sp sample at 0.0 %


In [35]:
predb1

array([ 3.38630196,  2.21148223, -0.17349277,  5.98835172])

In [36]:
preds1

array([ 4.60933951, -5.810361  ,  2.19222455])

In [37]:
predb1[predb1 < 0] = 0
predb1[predb1 > 0] = 1
predbresult1 = np.sum(predb1)
print(predbresult1, 'Bon samples were CORRECTLY evaluated out of', j_bon,'samples. Bon_Accuracy = ', predbresult1/j_bon )# 0.7356


preds1[preds1 > 0] = 0
preds1[preds1 < 0] = 1
predsresult = np.sum(preds1)
print(predsresult, 'Sp samples were CORRECTLY evaluated out of', k_sp, 'samples. Sp_Accuracy = ', predsresult/k_sp)# 0.4092

print('Total GMM Classifier Accuracy = ',(predbresult1 + predsresult)/(j_bon + k_sp))

3.0 Bon samples were CORRECTLY evaluated out of 4 samples. Bon_Accuracy =  0.75
1.0 Sp samples were CORRECTLY evaluated out of 3 samples. Sp_Accuracy =  0.3333333333333333
Total GMM Classifier Accuracy =  0.5714285714285714


# Looking at All CSV Files provided with Data 
## (LA; speech synthesis/conversion attacks)  (PA; replay attacks)

In [42]:
train_labels = '/Users/asimadnan/Desktop/Mres/ASVSPOOF_DATA/LA/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt'

In [47]:
la_labels = pd.read_csv(train_labels, delimiter = " ", header=None)
la_labels.columns = ['SPEAKER_ID','AUDIO_FILE_NAME','not_used','SYSTEM_ID','KEY']
la_labels

Unnamed: 0,SPEAKER_ID,AUDIO_FILE_NAME,not_used,SYSTEM_ID,KEY
0,LA_0079,LA_T_1138215,-,-,bonafide
1,LA_0079,LA_T_1271820,-,-,bonafide
2,LA_0079,LA_T_1272637,-,-,bonafide
3,LA_0079,LA_T_1276960,-,-,bonafide
4,LA_0079,LA_T_1341447,-,-,bonafide
...,...,...,...,...,...
25375,LA_0098,LA_T_9717580,-,A06,spoof
25376,LA_0098,LA_T_9779814,-,A06,spoof
25377,LA_0098,LA_T_9783312,-,A06,spoof
25378,LA_0098,LA_T_9839348,-,A06,spoof


In [49]:
pa__train_labels = '/Users/asimadnan/Desktop/Mres/ASVSPOOF_DATA/PA/ASVspoof2019_PA_cm_protocols/ASVspoof2019.PA.cm.train.trn.txt'

a triplet (S,R,D_s), which take one letter in the set {a,b,c} as categorical value, defined as:

								a		b		c
			--------------------------------------------------------------------------------
			S:   Room size (square meters)		2-5		5-10		10-20
			R:   T60 (ms)				50-200		200-600		600-1000
			D_s: Talker-to-ASV distance (cm)	10-50		50-100		100-150


   	4) ATTACK_ID:		a duple (D_a,Q), which take one letter in the set {A,B,C} as categorical value, defined as

								A		B		C
			-----------------------------------------------------------------------------
			Z: Attacker-to-talker distance (cm)	10-50		50-100		> 100
			Q: Replay device quality		perfect		high		low

			for bonafide speech, ATTACK_ID is left blank ('-')
		

In [51]:
pa_labels = pd.read_csv(pa__train_labels, delimiter = " ", header=None)
pa_labels.columns = ['SPEAKER_ID','AUDIO_FILE_NAME','ENVIRONMENT_ID','ATTACK_ID','KEY']
pa_labels


Unnamed: 0,SPEAKER_ID,AUDIO_FILE_NAME,ENVIRONMENT_ID,ATTACK_ID,KEY
0,PA_0079,PA_T_0000001,aaa,-,bonafide
1,PA_0079,PA_T_0000002,aaa,-,bonafide
2,PA_0079,PA_T_0000003,aaa,-,bonafide
3,PA_0079,PA_T_0000004,aaa,-,bonafide
4,PA_0079,PA_T_0000005,aaa,-,bonafide
...,...,...,...,...,...
53995,PA_0098,PA_T_0053996,ccc,CC,spoof
53996,PA_0098,PA_T_0053997,ccc,CC,spoof
53997,PA_0098,PA_T_0053998,ccc,CC,spoof
53998,PA_0098,PA_T_0053999,ccc,CC,spoof


# Classifiers Training - CQCC

In [19]:
clf_cqcc = SVC()
clf_cqcc.fit(X_train_cqcc, y_train_cqcc)

SVC()

In [None]:
#Classifer testing

In [12]:
clf_mfcc.score(X_test_mfcc, y_test_mfcc)

1.0

In [None]:
class_probabilities = model.predict_proba(X_test_mfcc)


In [21]:
clf_cqcc.score(X_test_cqcc, y_test_cqcc)

0.8571428571428571

In [41]:
#EER

In [51]:
def eer(model,X,y):
    y_score = model.decision_function(X)
    fpr, tpr, thresholds = roc_curve(y, y_score, pos_label='spoof')
    eer = brentq(lambda x : 1. - x - interp1d(fpr, tpr)(x), 0., 1.)
    thresh = interp1d(fpr, thresholds)(eer)
    print ('EER', eer)


In [52]:
eer(clf_mfcc,X_test_mfcc, y_test_mfcc)

EER 0.25


In [53]:
eer(clf_cqcc,X_test_cqcc, y_test_cqcc)

EER 0.0


In [None]:
# (t-DCF) calculation

In [None]:
results = pd.DataFrame(columns=['Classifier', 'Feature', 'Score','EER'])


In [None]:
# Ensemble Cqcc & Mfcc

In [None]:
cqcc vs mfcc

In [None]:
# t_DCF

In [None]:
import sys
import numpy as np
import eval_metrics as em
import matplotlib.pyplot as plt

# Replace CM scores with your own scores or provide score file as the first argument.
cm_score_file = 'scores/cm_dev.txt'
# Replace ASV scores with organizers' scores or provide score file as the second argument.
asv_score_file = 'scores/asv_dev.txt'



# Fix tandem detection cost function (t-DCF) parameters
Pspoof = 0.05
cost_model = {
    'Pspoof': Pspoof,  # Prior probability of a spoofing attack
    'Ptar': (1 - Pspoof) * 0.99,  # Prior probability of target speaker
    'Pnon': (1 - Pspoof) * 0.01,  # Prior probability of nontarget speaker
    'Cmiss_asv': 1,  # Cost of ASV system falsely rejecting target speaker
    'Cfa_asv': 10,  # Cost of ASV system falsely accepting nontarget speaker
    'Cmiss_cm': 1,  # Cost of CM system falsely rejecting target speaker
    'Cfa_cm': 10,  # Cost of CM system falsely accepting spoof
}

# Load organizers' ASV scores
asv_data = np.genfromtxt(asv_score_file, dtype=str)
asv_sources = asv_data[:, 0]
asv_keys = asv_data[:, 1]
asv_scores = asv_data[:, 2].astype(np.float)

# Load CM scores
cm_data = np.genfromtxt(cm_score_file, dtype=str)
cm_utt_id = cm_data[:, 0]
cm_sources = cm_data[:, 1]
cm_keys = cm_data[:, 2]
cm_scores = cm_data[:, 3].astype(np.float)

# Extract target, nontarget, and spoof scores from the ASV scores
tar_asv = asv_scores[asv_keys == 'target']
non_asv = asv_scores[asv_keys == 'nontarget']
spoof_asv = asv_scores[asv_keys == 'spoof']

# Extract bona fide (real human) and spoof scores from the CM scores
bona_cm = cm_scores[cm_keys == 'bonafide']
spoof_cm = cm_scores[cm_keys == 'spoof']

# EERs of the standalone systems and fix ASV operating point to EER threshold
eer_asv, asv_threshold = em.compute_eer(tar_asv, non_asv)
eer_cm = em.compute_eer(bona_cm, spoof_cm)[0]


[Pfa_asv, Pmiss_asv, Pmiss_spoof_asv] = em.obtain_asv_error_rates(tar_asv, non_asv, spoof_asv, asv_threshold)


# Compute t-DCF
tDCF_curve, CM_thresholds = em.compute_tDCF(bona_cm, spoof_cm, Pfa_asv, Pmiss_asv, Pmiss_spoof_asv, cost_model, True)

# Minimum t-DCF
min_tDCF_index = np.argmin(tDCF_curve)
min_tDCF = tDCF_curve[min_tDCF_index]


print('ASV SYSTEM')
print('   EER            = {:8.5f} % (Equal error rate (target vs. nontarget discrimination)'.format(eer_asv * 100))
print('   Pfa            = {:8.5f} % (False acceptance rate of nontargets)'.format(Pfa_asv * 100))
print('   Pmiss          = {:8.5f} % (False rejection rate of targets)'.format(Pmiss_asv * 100))
print('   1-Pmiss,spoof  = {:8.5f} % (Spoof false acceptance rate)'.format((1 - Pmiss_spoof_asv) * 100))

print('\nCM SYSTEM')
print('   EER            = {:8.5f} % (Equal error rate for countermeasure)'.format(eer_cm * 100))

print('\nTANDEM')
print('   min-tDCF       = {:8.5f}'.format(min_tDCF))


# Visualize ASV scores and CM scores
plt.figure()
ax = plt.subplot(121)
plt.hist(tar_asv, histtype='step', density=True, bins=50, label='Target')
plt.hist(non_asv, histtype='step', density=True, bins=50, label='Nontarget')
plt.hist(spoof_asv, histtype='step', density=True, bins=50, label='Spoof')
plt.plot(asv_threshold, 0, 'o', markersize=10, mfc='none', mew=2, clip_on=False, label='EER threshold')
plt.legend()
plt.xlabel('ASV score')
plt.ylabel('Density')
plt.title('ASV score histogram')

ax = plt.subplot(122)
plt.hist(bona_cm, histtype='step', density=True, bins=50, label='Bona fide')
plt.hist(spoof_cm, histtype='step', density=True, bins=50, label='Spoof')
plt.legend()
plt.xlabel('CM score')
#plt.ylabel('Density')
plt.title('CM score histogram')


# Plot t-DCF as function of the CM threshold.
plt.figure()
plt.plot(CM_thresholds, tDCF_curve)
plt.plot(CM_thresholds[min_tDCF_index], min_tDCF, 'o', markersize=10, mfc='none', mew=2)
plt.xlabel('CM threshold index (operating point)')
plt.ylabel('Norm t-DCF');
plt.title('Normalized tandem t-DCF')
plt.plot([np.min(CM_thresholds), np.max(CM_thresholds)], [1, 1], '--', color='black')
plt.legend(('t-DCF', 'min t-DCF ({:.5f})'.format(min_tDCF), 'Arbitrarily bad CM (Norm t-DCF=1)'))
plt.xlim([np.min(CM_thresholds), np.max(CM_thresholds)])
plt.ylim([0, 1.5])

plt.show()
