# Fingerprint Spoofing Detection

## 0 - Modules

### Globals

In [None]:
!git clone https://github.com/Giuseppe-Distefano/FingerprintSpoofingDetection
!cp -r ./FingerprintSpoofingDetection/data ./
!rm -rf ./FingerprintSpoofingDetection

!mkdir output
!mkdir output/FeaturesAnalysis
!mkdir output/FeaturesAnalysis/Histograms output/FeaturesAnalysis/Heatmaps
!mkdir output/DimensionalityReduction
!mkdir output/DimensionalityReduction/PCA output/DimensionalityReduction/LDA
!mkdir output/Training
!mkdir output/Calibration_Fusion
!mkdir output/Evaluation

In [None]:
import numpy
import scipy.linalg
import numpy.linalg
import scipy.special
import matplotlib.pyplot
import seaborn
import scipy.optimize
import math
import csv

In [None]:
pi_t = 0.5
Cfn = 1
Cfp = 10
effective_prior = (pi_t*Cfn) / (pi_t*Cfn + (1-pi_t)*Cfp)
output_csv_name = "output/Training/Results.csv"

### Utility

In [None]:
# ----- Convert an array into a column -----
def row_to_column (array):
    return array.reshape((array.size, 1))


# ----- Convert an array into a row -----
def column_to_row (array):
    return array.reshape((1, array.size))


# ----- Compute mean -----
def compute_mean (data):
    return row_to_column(data.mean(1))


# ----- Compute covariance -----
def compute_covariance (data):
    mu = compute_mean(data)
    dc = data - mu.reshape((mu.size, 1))
    cov = numpy.dot(dc, dc.T) / float(data.shape[1])
    return cov


# ----- Compute Pearson correlation -----
def compute_correlation(x, y):
    x1_sum, y1_sum = numpy.sum(x), numpy.sum(y)
    x2_sum, y2_sum = numpy.sum(x**2), numpy.sum(y**2)
    cp_sum = numpy.sum(x * y.T)
    n = x.shape[0]

    num = n*cp_sum - x1_sum*y1_sum
    den = numpy.sqrt((n*x2_sum - x1_sum**2) * (n*y2_sum - y1_sum**2))
    return (num/den)


# ----- Compute logarithm of probability density function of a Gaussian distribution -----
def logpdf_GAU_ND (X, mu, C):
    _,det = numpy.linalg.slogdet(C)
    inv = numpy.linalg.inv(C)

    term1 = -0.5 * X.shape[0] * numpy.log(2*numpy.pi)
    term2 = -0.5 * det
    term3 = -0.5 * numpy.dot((X-mu).T, numpy.dot(inv, (X-mu))).sum(0)

    return (term1+term2+term3)


# ----- Classify samples matching log-likelihood ratio and a threshold -----
def predict_labels (llr, threshold):
    predicted = numpy.zeros(len(llr))
    for i in range(len(llr)):
        if (llr[i]>threshold): predicted[i] = 1
    return predicted


# ----- Count the number of mispredictions -----
def count_mispredictions (predicted, LTE):
    wrong = 0
    for i in range(len(LTE)):
        if (predicted[i]!=LTE[i]): wrong += 1
    return wrong


# ----- Square a matrix and transpose it -----
def square_and_transpose (matrix):
    x = matrix[:,None]
    xxT = x.dot(x.T).reshape((x.size)**2, order='F')
    return xxT


# ----- Compute ZNorm -----
def compute_znorm (DTR, DTE):
    mu_r = row_to_column(DTR.mean(1))
    sigma_r = row_to_column(DTR.std(1))
    normalized_r = (DTR-mu_r) / sigma_r
    normalized_e = (DTE-mu_r) / sigma_r
    return normalized_r, normalized_e


# ----- Randomize data -----
def randomize (D, L, seed):
    numpy.random.seed(seed)
    indexes = numpy.random.permutation(D.shape[1])
    d = numpy.zeros((10, D.shape[1]))
    l = numpy.zeros((L.size,))
    i = 0
    for ind in indexes:
        d[:,i] = D[:,ind]
        l[i] = L[ind]
        i += 1
    return d, l


### Dataset

In [None]:
############################
#     GLOBAL VARIABLES     #
############################
features = int(10)
distinct_classes = int(2)
training_input = "data/Train.txt"
test_input = "data/Test.txt"
histograms_folder = "output/FeaturesAnalysis/Histograms"
heatmaps_folder = "output/FeaturesAnalysis/Heatmaps"


#####################
#     FUNCTIONS     #
#####################
# ----- Read file -----
def read_file (filename):
    D = []
    L = []
    with open(filename) as file:
        for line in file:
            try:
                attributes = line.split(",")[0:features]
                attributes = row_to_column(numpy.array([float(i) for i in attributes]))
                label = int(line.split(",")[-1].strip())
                D.append(attributes)
                L.append(label)
            except:
                pass
    return numpy.hstack(D), numpy.array(L, dtype=numpy.int32)


# ----- Load training set -----
def load_training_set ():
    DTR,LTR = read_file(training_input)
    return DTR,LTR


# ----- Load test set -----
def load_test_set ():
    DTE,LTE = read_file(test_input)
    return DTE,LTE


# ----- Plot histograms of training set -----
def plot_histograms (DTR, LTR):
    # Split according to label
    D0 = DTR[:, LTR==0]
    D1 = DTR[:, LTR==1]

    # Plot histograms for each feature
    for x in range(features):
        matplotlib.pyplot.figure()
        matplotlib.pyplot.title("Feature %d" % x)
        matplotlib.pyplot.hist(D0[x,:], bins=40, density=True, alpha=0.4, edgecolor="black", label="Spoofed")
        matplotlib.pyplot.hist(D1[x,:], bins=40, density=True, alpha=0.4, edgecolor="black", label="Authentic")
        matplotlib.pyplot.legend()
        matplotlib.pyplot.savefig("%s/histogram_%d.png" % (histograms_folder, x))
        matplotlib.pyplot.close()


# ----- Plot heatmaps of training set ---
def plot_heatmaps (DTR, LTR):
    # Consider all samples
    corr = numpy.zeros((features, features))
    for x in range(features):
        for y in range(features):
            corr[x][y] = compute_correlation(DTR[x,:], DTR[y,:])
    seaborn.set()
    heatmap = seaborn.heatmap(numpy.abs(corr), cmap="YlGnBu", linewidth=0.3, square=True, cbar=False)
    figure = heatmap.get_figure()
    figure.savefig("%s/heatmap_all.png" % (heatmaps_folder))

    # Consider only samples labeled as spoofed fingerprints
    corr = numpy.zeros((features, features))
    for x in range(features):
        for y in range(features):
            corr[x][y] = compute_correlation(DTR[x,LTR==0], DTR[y,LTR==0])
    seaborn.set()
    heatmap = seaborn.heatmap(numpy.abs(corr), cmap="coolwarm", linewidth=0.3, square=True, cbar=False)
    figure = heatmap.get_figure()
    figure.savefig("%s/heatmap_spoofed.png" % (heatmaps_folder))

    # Consider only samples labeled as authentic fingerprints
    corr = numpy.zeros((features, features))
    for x in range(features):
        for y in range(features):
            corr[x][y] = compute_correlation(DTR[x,LTR==1], DTR[y,LTR==1])
    seaborn.set()
    heatmap = seaborn.heatmap(numpy.abs(corr), cmap="BuPu", linewidth=0.3, square=True, cbar=False)
    figure = heatmap.get_figure()
    figure.savefig("%s/heatmap_authentic.png" % (heatmaps_folder))


### PCA and LDA

In [None]:
# ----- Principal Component Analysis -----
def apply_pca (D, L, m, output_folder=None):
    # Identify the largest eigenvalues and eigenvectors over to which we project original data
    _,U = numpy.linalg.eigh(compute_covariance(D))
    P = U[:, ::-1][:, 0:m]
    
    if output_folder!=None:
        # Project data
        DP = numpy.dot(P.T, D)
        plot_pca_scatters(DP, L, output_folder)

    return P


# ----- Linear Discriminant Analysis -----
def apply_lda (D, L, m, output_folder=None):
    # Compute between- and within-class covariance matrices
    D0 = D[:, L==0]
    D1 = D[:, L==1]
    diff0 = compute_mean(D0) - compute_mean(D)
    diff1 = compute_mean(D1) - compute_mean(D)
    SB = numpy.outer(diff0,diff0) + numpy.outer(diff1,diff1)
    SW = compute_covariance(D0) + compute_covariance(D1)
    
    # Identify the m largest eigenvalues and eigenvectors over to which we project original data
    U,s,_ = numpy.linalg.svd(SW)
    P1 = numpy.dot(U, row_to_column(1.0/s**0.5)*U.T)
    SBt = numpy.dot(P1, numpy.dot(SB, P1.T))
    U,_,_ = numpy.linalg.svd(SBt)
    P2 = U[:, 0:m]
    W = numpy.dot(P1.T, P2)

    if output_folder!=None:
        # Project data
        DP = numpy.dot(W.T, D)
        plot_lda_histograms(DP, L, output_folder)


# ----- Plot scatters of data projected after PCA -----
def plot_pca_scatters (D, L, output_folder):
    D0 = D[:, L==0]
    D1 = D[:, L==1]

    matplotlib.pyplot.figure()
    matplotlib.pyplot.scatter(D0[0,:], D0[1,:], label="Spoofed")
    matplotlib.pyplot.scatter(D1[0,:], D1[1,:], label="Authentic")
    matplotlib.pyplot.legend()
    matplotlib.pyplot.savefig("%s/scatter_pca.png" % (output_folder))
    matplotlib.pyplot.close()


# ----- Plot histograms of data projected after LDA -----
def plot_lda_histograms (D, L, output_folder):
    # Split according to label
    D0 = D[:, L==0]
    D1 = D[:, L==1]

    # Plot histograms for each feature
    matplotlib.pyplot.figure()
    matplotlib.pyplot.hist(D0[0,:], bins=40, density=True, alpha=0.4, edgecolor="black", label="Spoofed")
    matplotlib.pyplot.hist(D1[0,:], bins=40, density=True, alpha=0.4, edgecolor="black", label="Authentic")
    matplotlib.pyplot.legend()
    matplotlib.pyplot.savefig("%s/histogram_lda.png" % (output_folder))
    matplotlib.pyplot.close()


# ----- Plot scatters of data projected after LDA -----
def plot_lda_scatters (D, L, output_folder):
    D0 = D[:, L==0]
    D1 = D[:, L==1]

    matplotlib.pyplot.figure()
    matplotlib.pyplot.scatter(D0[0,:], D0[1,:], label="Spoofed")
    matplotlib.pyplot.scatter(D1[0,:], D1[1,:], label="Authentic")
    matplotlib.pyplot.legend()
    matplotlib.pyplot.savefig("%s/scatter_lda.png" % (output_folder))
    matplotlib.pyplot.close()


### Costs

In [None]:
# ----- Compute un-normalized Detection Cost Function -----
def compute_unnormalized_DCF (pi_t, Cfn, Cfp, ll_ratios, LTE, threshold=None, is_effective=False):
    # Define threshold if not previously defined
    if threshold is None:
        if is_effective is False:
            effective_prior = (pi_t*Cfn) / (pi_t*Cfn + (1-pi_t)*Cfp)
        else:
            effective_prior = pi_t
        threshold = -numpy.log(effective_prior / (1-effective_prior))
    
    # Build confusion matrix
    predicted = (ll_ratios>threshold).astype(int)
    confusion_matrix = numpy.zeros((2,2))
    for i in range(len(LTE)):
        confusion_matrix[predicted[i], LTE[i].astype(int)] += 1
    
    # Compute DCF
    fnr = confusion_matrix[0][1] / (confusion_matrix[0][1]+confusion_matrix[1][1])
    fpr = confusion_matrix[1][0] / (confusion_matrix[0][0]+confusion_matrix[1][0])
    dcf = pi_t*Cfn*fnr + (1-pi_t)*Cfp*fpr

    return dcf


# ----- Compute normalized Detection Cost Function -----
def compute_normalized_DCF (pi_t, Cfn, Cfp, DCFu):
    dummy_costs = numpy.array([pi_t*Cfn, (1-pi_t)*Cfp])
    index = numpy.argmin(dummy_costs)
    dcf = DCFu / dummy_costs[index]
    return dcf


# ----- Compute actual Detection Cost Function -----
def compute_actual_DCF (pi_t, Cfn, Cfp, ll_ratios, LTE, is_effective):
    dcfu = compute_unnormalized_DCF(pi_t, Cfn, Cfp, ll_ratios, LTE, None, is_effective)
    dcf = compute_normalized_DCF(pi_t, Cfn, Cfp, dcfu)
    return dcf


# ----- Compute minimum Detection Cost Function -----
def compute_min_DCF (pi_t, Cfn, Cfp, ll_ratios, LTE):
    dcf_collection = numpy.zeros(ll_ratios.shape)
    sorted = numpy.sort(ll_ratios)

    for i in range(len(ll_ratios)):
        threshold = sorted[i]
        unnormalized = compute_unnormalized_DCF(pi_t, Cfn, Cfp, ll_ratios, LTE, threshold)
        dcf_collection[i] = compute_normalized_DCF(pi_t, Cfn, Cfp, unnormalized)
    return numpy.min(dcf_collection)

## 1 - Dataset analysis

In [None]:
# ----- Load dataset -----
def load_dataset ():
    DTR, LTR = load_training_set()
    DTE, LTE = load_test_set()
    return (DTR,LTR), (DTE,LTE)


# ----- Analysis of features -----
def features_analysis (DTR, LTR):
    plot_histograms(DTR, LTR)
    plot_heatmaps(DTR, LTR)


# ----- Dimensionality reduction -----
def dimensionality_reduction (D, L):
    m = 2
    apply_pca(D, L, m, "output/DimensionalityReduction/PCA")
    apply_lda(D, L, m, "output/DimensionalityReduction/LDA")

### Run dataset analysis

In [None]:
# ---- Dataset analysis ----
(DTR,LTR), (DTE,LTE) = load_dataset()
DTR,LTR = randomize(DTR, LTR, 0)
features_analysis(DTR, LTR)
dimensionality_reduction(DTR, LTR)

## 2 - Training

### Generative models

In [None]:
# ----- Naive Bayes without tied covariance -----
def naive_bayes (DTR, LTR, DTE, LTE):
    # Compute mean and covariance for each class
    D0 = DTR[:, LTR==0]
    D1 = DTR[:, LTR==1]
    mu0, cov0 = compute_mean(D0), compute_covariance(D0)*numpy.identity(DTR.shape[0])
    mu1, cov1 = compute_mean(D1), compute_covariance(D1)*numpy.identity(DTR.shape[0])

    # Compute likelihoods
    S = numpy.empty((2,DTE.shape[1]))
    for i in range(DTE.shape[1]):
        S[0,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu0, cov0))
        S[1,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu1, cov1))

    # Compute log-likelihood ratio and use it to classify samples
    llr = numpy.log(S[1,:] / S[0,:])
    predicted_labels = predict_labels(llr, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, llr


# ----- Naive Bayes with tied covariance -----
def tied_naive_bayes (DTR, LTR, DTE, LTE):
    # Compute mean for each class and tied covariance matrix
    D0 = DTR[:, LTR==0]
    D1 = DTR[:, LTR==1]
    mu0, cov0 = compute_mean(D0), compute_covariance(D0)*numpy.identity(DTR.shape[0])
    mu1, cov1 = compute_mean(D1), compute_covariance(D1)*numpy.identity(DTR.shape[0])
    tied_cov = (cov0*D0.shape[1] + cov1*D1.shape[1]) / DTR.shape[1]

    # Compute likelihoods
    S = numpy.empty((2,DTE.shape[1]))
    for i in range(DTE.shape[1]):
        S[0,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu0, tied_cov))
        S[1,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu1, tied_cov))
    
    # Compute log-likelihood ratio and use it to classify samples
    llr = numpy.log(S[1,:] / S[0,:])
    predicted_labels = predict_labels(llr, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, llr


# ----- Multivariate Gaussian without tied covariance -----
def mvg (DTR, LTR, DTE, LTE):
    # Compute mean and covariance for each class
    D0 = DTR[:, LTR==0]
    D1 = DTR[:, LTR==1]
    mu0, cov0 = compute_mean(D0), compute_covariance(D0)
    mu1, cov1 = compute_mean(D1), compute_covariance(D1)

    # Compute likelihoods
    S = numpy.empty((2,DTE.shape[1]))
    for i in range(DTE.shape[1]):
        S[0,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu0, cov0))
        S[1,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu1, cov1))
    
    # Compute log-likelihood ratio and use it to classify samples
    llr = numpy.log(S[1,:] / S[0,:])
    predicted_labels = predict_labels(llr, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, llr


# ----- Multivariate Gaussian with tied covariance -----
def tied_mvg (DTR, LTR, DTE, LTE):
    # Compute mean for each class and tied covariance matrix
    D0 = DTR[:, LTR==0]
    D1 = DTR[:, LTR==1]
    mu0, cov0 = compute_mean(D0), compute_covariance(D0)
    mu1, cov1 = compute_mean(D1), compute_covariance(D1)
    tied_cov = (cov0*D0.shape[1] + cov1*D1.shape[1]) / DTR.shape[1]

    # Compute likelihoods
    S = numpy.empty((2,DTE.shape[1]))
    for i in range(DTE.shape[1]):
        S[0,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu0, tied_cov))
        S[1,i] = numpy.exp(logpdf_GAU_ND(row_to_column(DTE[:,i]), mu1, tied_cov))
    
    # Compute log-likelihood ratio and use it to classify samples
    llr = numpy.log(S[1,:] / S[0,:])
    predicted_labels = predict_labels(llr, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, llr


In [None]:
# ----- Train model using K-Fold -----
def gen_kfold (D, L, K, pca_value, pi_value):
    classifiers = [
        (mvg, "MVG"),
        (tied_mvg, "MVG with tied covariance"),
        (naive_bayes, "Naive Bayes"),
        (tied_naive_bayes, "Naive Bayes with tied covariance")
    ]
    output_csv = open(output_csv_name, "a")
    N = int(D.shape[1]/K)

    if pca_value==0:
        print("No PCA, pi: %.3f\n" % (pi_value))
    else:
        print("PCA: %d, pi: %.3f\n" % (pca_value, pi_value))
    
    for j,(fun,name) in enumerate(classifiers):
        wrong_predictions = 0
        numpy.random.seed(j)
        ll_ratios = []
        indexes = numpy.random.permutation(D.shape[1])

        for i in range(K):
            # Select which subset to use for evaluation
            idxTest = indexes[i*N:(i+1)*N]
            if i>0: idxTrainLeft = indexes[0:i*N]
            elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
            if i==0: idxTrain = idxTrainRight
            elif (i+1)==K: idxTrain = idxTrainLeft
            else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
            DTR,LTR = D[:,idxTrain], L[idxTrain]
            DTE,LTE = D[:,idxTest], L[idxTest]

            # Apply PCA if necessary
            if pca_value!=0:
                P = apply_pca(DTR, LTR, pca_value)
                DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)

            # Apply classifier
            wrong, llr = fun(DTR, LTR, DTE, LTE)
            wrong_predictions += wrong
            ll_ratios.append(llr)

        # Evaluate accuracy, error rate, and minDCF
        error_rate = wrong_predictions / D.shape[1]
        accuracy = 1 - error_rate
        cost = compute_min_DCF(pi_value, Cfn, Cfp, numpy.hstack(ll_ratios), L)

        # Save results in CSV format
        output_csv.write("%s,%d,_,%.3f,_,_,_,_,_,%.3f,%.3f,%.5f\n" % (name, pca_value, pi_value, 100.0*accuracy, 100.0*error_rate, cost))
        print("%s,%d,_,%.3f,_,_,_,_,_,%.3f,%.3f,%.5f\n" % (name, pca_value, pi_value, 100.0*accuracy, 100.0*error_rate, cost))

        # Print results to console
        print("  %s" % (name))
        print("    Accuracy: %.3f%%" % (100.0*accuracy))
        print("    Error rate: %.3f%%" % (100.0*error_rate))
        print("    min DCF: %.3f" % (cost))
        print("\n")

    output_csv.close()

### Discriminative models

In [None]:
# ----- Logistic Regression objective -----
def lr_obj_wrap (DTR, LTR, lambda_value, pi_value):
    def lr_obj (v):
        w,b = v[0:-1], v[-1]
        N = DTR.shape[1]
        term1 = lambda_value/2 * numpy.linalg.norm(w)**2
        term2 = 0
        term3 = 0

        nt = DTR[:,LTR==1].shape[1]
        nf = N-nt

        for i in range(N):
            ci = LTR[i]
            zi = 2*ci-1
            xi = DTR[:,i]
            internal_sum = b + numpy.dot(w.T, xi)
            internal_prod = -numpy.dot(zi, internal_sum)
            if LTR[i]==0: term3 += numpy.logaddexp(0, internal_prod)
            else: term2 += numpy.logaddexp(0, internal_prod)
        loss = term1 + (pi_value/nt)*term2 + ((1-pi_value)/nf)*term3
        return loss
    return lr_obj


# ----- Compute Logistic Regression scores -----
def lr_compute_scores (DTE, v):
    s = numpy.empty((DTE.shape[1]))
    w,b = v[0:-1], v[-1]
    for i in range(DTE.shape[1]):
        xt = DTE[:,i]
        s[i] = b + numpy.dot(w.T, xt)
    return s


# ----- Numerical optimization -----
def numerical_optimization (function, x0, grad=None):
    if grad is None: x,_,_=scipy.optimize.fmin_l_bfgs_b(function,x0,fprime=numpy.gradient(function))
    else: x,_,_=scipy.optimize.fmin_l_bfgs_b(function,x0,fprime=grad)
    return x


# ----- Logistic Regression gradient -----
def lr_compute_gradient (DTR, LTR, lambda_value, pi_value):
    z = numpy.empty((LTR.shape[0]))
    z = 2*LTR-1

    def gradient (v):
        w,b = v[0:-1], v[-1]
        term1 = lambda_value * w
        term2 = 0
        term3 = 0
        
        nt = DTR[:, LTR == 1].shape[1]
        nf = DTR.shape[1]-nt
        
        for i in range(DTR.shape[1]):
            S = numpy.dot(w.T, DTR[:,i]) + b
            zi_si = numpy.dot(z[i], S)
            if LTR[i]==1: term2 += numpy.dot(numpy.exp(-zi_si),(numpy.dot(-z[i],DTR[:,i])))/(1+numpy.exp(-zi_si))
            else: term3 += numpy.dot(numpy.exp(-zi_si),(numpy.dot(-z[i],DTR[:,i])))/(1+numpy.exp(-zi_si))
        dw = term1 + (pi_value/nt)*term2 + (1-pi_value)/(nf)*term3

        term1 = 0           
        term2 = 0
        for i in range(DTR.shape[1]):
            S=numpy.dot(w.T,DTR[:,i])+b
            zi_si = numpy.dot(z[i], S)
            if LTR[i] == 1: term1 += (numpy.exp(-zi_si) * (-z[i]))/(1+numpy.exp(-zi_si))
            else: term2 += (numpy.exp(-zi_si) * (-z[i]))/(1+numpy.exp(-zi_si))
        db = (pi_value/nt)*term1 + (1-pi_value)/(nf)*term2

        return numpy.hstack((dw, db))
    return gradient


# ----- Linear Logistic Regression -----
def linear_logistic_regression (DTR, LTR, DTE, LTE, lam, pi):
    x0 = numpy.zeros(DTR.shape[0]+1)
    x = numerical_optimization(lr_obj_wrap(DTR, LTR, lam, pi), x0, lr_compute_gradient(DTR, LTR, lam, pi))
    
    # Compute scores
    scores = lr_compute_scores(DTE, x)
    predicted_labels = predict_labels(scores, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, scores


# ----- Quadratic Logistic Regression -----
def quadratic_logistic_regression (DTR, LTR, DTE, LTE, lam, pi):
    DTRe = numpy.apply_along_axis(square_and_transpose, 0, DTR)
    DTEe = numpy.apply_along_axis(square_and_transpose, 0, DTE)
    phi_T = numpy.array(numpy.vstack([DTRe, DTR]))
    phi_E = numpy.array(numpy.vstack([DTEe, DTE]))
        
    x0 = numpy.zeros(phi_T.shape[0]+1)
    x = numerical_optimization(lr_obj_wrap(phi_T, LTR, lam, pi), x0, lr_compute_gradient(phi_T, LTR, lam, pi))

    # Compute scores and wrong predictions
    scores = lr_compute_scores(phi_E, x)
    predicted_labels = predict_labels(scores, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, scores


In [None]:
# ----- Train model using K-Fold -----
def dis_kfold (D, L, K, pca_value, z_value, pi_value, lambda_value):
    classifiers = [
        (linear_logistic_regression, "Linear Logistic Regression"),
        (quadratic_logistic_regression, "Quadratic Logistic Regression")
    ]
    output_csv = open(output_csv_name, "a")
    N = int(D.shape[1]/K)

    if pca_value==0:
        if z_value==0:
            print("No PCA, No ZNorm, pi: %.3f, lambda: %.7f\n" % (pi_value, lambda_value))
        else:
            print("No PCA, ZNorm, pi: %.3f, lambda: %.7f\n" % (pi_value, lambda_value))
    else:
        if z_value==0:
            print("PCA: %d, No ZNorm, pi: %.3f, lambda: %.7f\n" % (pca_value, pi_value, lambda_value))
        else:
            print("PCA: %d, ZNorm, pi: %.3f, lambda: %.7f\n" % (pca_value, pi_value, lambda_value))
    
    for j,(fun,name) in enumerate(classifiers):
        wrong_predictions = 0
        numpy.random.seed(j)
        ll_ratios = []
        indexes = numpy.random.permutation(D.shape[1])

        for i in range(K):
            # Select which subset to use for evaluation
            idxTest = indexes[i*N:(i+1)*N]
            if i>0: idxTrainLeft = indexes[0:i*N]
            elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
            if i==0: idxTrain = idxTrainRight
            elif (i+1)==K: idxTrain = idxTrainLeft
            else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
            DTR,LTR = D[:,idxTrain], L[idxTrain]
            DTE,LTE = D[:,idxTest], L[idxTest]

            # Apply ZNorm if necessary
            if z_value!=0:
                DTR,DTE = compute_znorm(DTR, DTE)

            # Apply PCA if necessary
            if pca_value!=0:
                P = apply_pca(DTR, LTR, pca_value)
                DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)

            # Apply classifier
            wrong, scores = fun(DTR, LTR, DTE, LTE, lambda_value, pi_value)
            wrong_predictions += wrong
            ll_ratios.append(scores)

        # Evaluate accuracy, error rate, and minDCF
        error_rate = wrong_predictions / D.shape[1]
        accuracy = 1 - error_rate
        cost = compute_min_DCF(pi_value, Cfn, Cfp, numpy.hstack(ll_ratios), L)

        # Save results in CSV format
        output_csv.write("%s,%d,%d,%.3f,%.7f,_,_,_,_,%.3f,%.3f,%.5f\n" % (name, pca_value, z_value, pi_value, lambda_value, 100.0*accuracy, 100.0*error_rate, cost))
        print("%s,%d,%d,%.3f,%.7f,_,_,_,_,%.3f,%.3f,%.5f\n" % (name, pca_value, z_value, pi_value, lambda_value, 100.0*accuracy, 100.0*error_rate, cost))

        # Print results to console
        print("  %s" % (name))
        print("    Accuracy: %.3f%%" % (100.0*accuracy))
        print("    Error rate: %.3f%%" % (100.0*error_rate))
        print("    min DCF: %.3f" % (cost))
        print("\n")
    
    output_csv.close()

In [None]:
# ----- Train a specific model -----
def train_qlr (D, L, K, pca_value, z_value, pi_value, lambda_value):
    N = int(D.shape[1]/K)

    wrong_predictions = 0
    numpy.random.seed(1)
    ll_ratios = []
    labels = []
    indexes = numpy.random.permutation(D.shape[1])

    for i in range(K):
        # Select which subset to use for evaluation
        idxTest = indexes[i*N:(i+1)*N]
        if i>0: idxTrainLeft = indexes[0:i*N]
        elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
        if i==0: idxTrain = idxTrainRight
        elif (i+1)==K: idxTrain = idxTrainLeft
        else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
        DTR,LTR = D[:,idxTrain], L[idxTrain]
        DTE,LTE = D[:,idxTest], L[idxTest]

        # Apply ZNorm if necessary
        if z_value!=0:
            DTR,DTE = compute_znorm(DTR, DTE)

        # Apply PCA if necessary
        if pca_value!=0:
            P = apply_pca(DTR, LTR, pca_value)
            DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)

        # Apply classifier
        wrong, scores = quadratic_logistic_regression(DTR, LTR, DTE, LTE, lambda_value, pi_value)
        wrong_predictions += wrong
        ll_ratios.append(scores)
        labels.append(LTE)
    
    return numpy.hstack(ll_ratios)

### Support Vector Machines

In [None]:
# ----- ___ -----
def compute_modified_H (D, L, K):
    # Distance
    row = numpy.zeros(D.shape[1]) + K
    d = numpy.vstack([D, row])
    
    # Assign a class label to each sample
    z = numpy.zeros(len(L))
    for i in range(len(L)):
        if (L[i]==0): z[i] = -1
        else: z[i] = 1
    
    G_ij = numpy.dot(d.T, d)
    z_ij = numpy.dot(row_to_column(z), column_to_row(z))
    H = z_ij * G_ij

    return d,z,H


# ----- ___ -----
def compute_kernel_H (D, L, K):
    # Assign a class label to each sample
    z = numpy.zeros(len(L))
    for i in range(len(L)):
        if (L[i]==0): z[i] = -1
        else: z[i] = 1
    
    ker = (numpy.dot(D.T, D)+1)**2 + K**2
    p1 = numpy.dot(row_to_column(z), column_to_row(z))
    H = p1 * ker

    return z,H


# ----- ___ -----
def compute_polynomial_kernel_H (D, L, c, d, K):
    # Assign a class label to each sample
    z = numpy.zeros(len(L))
    for i in range(len(L)):
        if (L[i]==0): z[i] = -1
        else: z[i] = 1
    
    ker = (numpy.dot(D.T, D)+c)**d + K**2
    p1 = numpy.dot(row_to_column(z), column_to_row(z))
    H = p1 * ker

    return z,H


# ----- ___ -----
def compute_radial_kernel_H (D, L, gamma, K):
    # Assign a class label to each sample
    z = numpy.zeros(len(L))
    for i in range(len(L)):
        if (L[i]==0): z[i] = -1
        else: z[i] = 1

    ker = numpy.zeros((D.shape[1], D.shape[1]))
    for i in range(D.shape[1]):
        for j in range(D.shape[1]):
            ker[i,j] = numpy.exp(-gamma * (numpy.linalg.norm(D[:,i]-D[:,j])**2)) + K**2
    p1 = numpy.dot(row_to_column(z), column_to_row(z))
    H = p1 * ker

    return z,H


# ----- Compute gradient -----
def compute_gradient (a, H):
    p1 = numpy.dot(H, row_to_column(a))
    p2 = numpy.dot(column_to_row(a), p1)
    s = a.sum()
    return 0.5*p2.ravel()-s, p1.ravel()-numpy.ones(a.size)


# ----- Optimal dual solution -----
def minimize_dual (D, z, H, C):
    bounds = [(0,C)] * D.shape[1]
    alpha, dual, _ = scipy.optimize.fmin_l_bfgs_b(compute_gradient, numpy.zeros(D.shape[1]), args=(H,), bounds=bounds, factr=1.0)
    return alpha, -dual


# ----- Find optimal primal solution -----
def primal_model (D, alpha, z, K, DTE, LTE):
    w = numpy.dot(D, row_to_column(alpha) * row_to_column(z))
    S = numpy.dot(column_to_row(w), D)
    loss = numpy.maximum(numpy.zeros(S.shape), 1-z*S).sum()

    row = numpy.zeros(DTE.shape[1]) + K
    DTEe = numpy.vstack([DTE, row])
    Se = numpy.dot(w.T, DTEe)
    predicted_labels = 1 * (Se>0)
    wrong = numpy.array(predicted_labels!=LTE).sum()

    return wrong, Se.ravel()


# ----- ___ -----
def primal_model_kernel (DTR, alpha, z, K, DTE, LTE):
    S = numpy.sum(numpy.dot(column_to_row(alpha*z), (numpy.dot(DTR.T, DTE)+1)**2 + K), axis=0)
    predicted_labels = 1 * (S>0)
    wrong = numpy.array(predicted_labels!=LTE).sum()

    return wrong, S.ravel()


# ----- Linear SVM ---
def linear_svm (DTR, LTR, DTE, LTE, K, C, gamma):
    D,z,H = compute_modified_H(DTR, LTR, K)
    alpha, dual = minimize_dual(D, z, H, C)
    wrong, scores = primal_model(D, alpha, z, K, DTE, LTE)
    return wrong, scores


# ----- Quadratic SVM -----
def quadratic_svm (DTR, LTR, DTE, LTE, K, C, gamma):
    z,H = compute_kernel_H(DTR, LTR, K)
    alpha, dual = minimize_dual(DTR, z, H, C)
    wrong, scores = primal_model_kernel(DTR, alpha, z, K, DTE, LTE)
    return wrong, scores


# ----- Polynomial Kernel SVM -----
def polynomial_kernel_svm (DTR, LTR, DTE, LTE, K, C, gamma):
    c = 1
    d = 2
    z,H = compute_polynomial_kernel_H(DTR, LTR, c, d, K)
    alpha, dual = minimize_dual(DTR, z, H, C)
    scores = numpy.sum(numpy.dot(column_to_row(alpha*z), (numpy.dot(DTR.T, DTE)+c)**d+K), axis=0)
    predicted_labels = 1*(scores>0)
    wrong = numpy.array(predicted_labels!=LTE).sum()
    return wrong, scores


# ----- Radial Basis Function Kernel SVM -----
def radial_kernel_svm (DTR, LTR, DTE, LTE, K, C, gamma):
    z,H = compute_radial_kernel_H(DTR, LTR, gamma, K)
    alpha, dual = minimize_dual(DTR, z, H, C)
    ker = numpy.zeros((DTR.shape[1], DTE.shape[1]))
    for i in range(DTR.shape[1]):
        for j in range(DTE.shape[1]):
            ker[i,j] = numpy.exp(-gamma * (numpy.linalg.norm(DTR[:,i]-DTE[:,j])**2)) + K**2
    scores = numpy.sum(numpy.dot((column_to_row(alpha*z)), ker), axis=0)
    predicted_labels = 1*(scores>0)
    wrong = numpy.array(predicted_labels!=LTE).sum()
    return wrong, scores


In [None]:
# ----- Train model using K-Fold -----
def svm_kfold (D, L, K, pca_value, z_value, C_value, gamma_value):
    classifiers = [
        (linear_svm, "Linear SVM"),
        (quadratic_svm, "Quadratic SVM"),
        (polynomial_kernel_svm, "Polynomial Kernel SVM"),
        (radial_kernel_svm, "Radial Basis Function Kernel SVM")
    ]
    output_csv = open(output_csv_name, "a")
    N = int(D.shape[1]/K)

    if pca_value==0:
        if z_value==0:
            print("No PCA, No ZNorm, C: %.5f, gamma: %.5f\n" % (C_value, gamma_value))
        else:
            print("No PCA, ZNorm, C: %.5f, gamma: %.5f\n" % (C_value, gamma_value))
    else:
        if z_value==0:
            print("PCA: %d, C: %.5f, No ZNorm, gamma: %.5f\n" % (pca_value, C_value, gamma_value))
        else:
            print("PCA: %d, C: %.5f, ZNorm, gamma: %.5f\n" % (pca_value, C_value, gamma_value))

    for j,(fun,name) in enumerate(classifiers):
        wrong_predictions = 0
        numpy.random.seed(j)
        ll_ratios = []
        indexes = numpy.random.permutation(D.shape[1])

        for i in range(K):
            # Select which subset to use for evaluation
            idxTest = indexes[i*N:(i+1)*N]
            if i>0: idxTrainLeft = indexes[0:i*N]
            elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
            if i==0: idxTrain = idxTrainRight
            elif (i+1)==K: idxTrain = idxTrainLeft
            else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
            DTR,LTR = D[:,idxTrain], L[idxTrain]
            DTE,LTE = D[:,idxTest], L[idxTest]

            # Apply ZNorm if necessary
            if z_value!=0:
                DTR,DTE = compute_znorm(DTR, DTE)

            # Apply PCA if necessary
            if pca_value!=0:
                P = apply_pca(DTR, LTR, pca_value)
                DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)
            
            # Apply classifier
            wrong, scores = fun(DTR, LTR, DTE, LTE, K, C_value, gamma_value)
            wrong_predictions += wrong
            ll_ratios.append(scores)
        
        # Evaluate accuracy, error rate, and minDCF
        error_rate = wrong_predictions / D.shape[1]
        accuracy = 1 - error_rate
        cost = compute_min_DCF(pi_t, Cfn, Cfp, numpy.hstack(ll_ratios), L)

        # Save results in CSV format
        output_csv.write("%s,%d,%d,_,_,%.5f,%.5f,_,_,%.3f,%.3f,%.5f\n" % (name, pca_value, z_value, C_value, gamma_value, 100.0*accuracy, 100.0*error_rate, cost))
        print("%s,%d,%d,_,_,%.5f,%.5f,_,_,%.3f,%.3f,%.5f\n" % (name, pca_value, z_value, C_value, gamma_value, 100.0*accuracy, 100.0*error_rate, cost))

        # Print results to console
        print("  %s" % (name))
        print("    Accuracy: %.3f%%" % (100.0*accuracy))
        print("    Error rate: %.3f%%" % (100.0*error_rate))
        print("    min DCF: %.3f" % (cost))
        print("\n")

    output_csv.close()


In [None]:
# ----- Train a specific model -----
def train_pol_svm (D, L, K, pca_value, z_value, C_value, gamma_value):
    N = int(D.shape[1]/K)

    wrong_predictions = 0
    numpy.random.seed(1)
    ll_ratios = []
    indexes = numpy.random.permutation(D.shape[1])

    for i in range(K):
        # Select which subset to use for evaluation
        idxTest = indexes[i*N:(i+1)*N]
        if i>0: idxTrainLeft = indexes[0:i*N]
        elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
        if i==0: idxTrain = idxTrainRight
        elif (i+1)==K: idxTrain = idxTrainLeft
        else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
        DTR,LTR = D[:,idxTrain], L[idxTrain]
        DTE,LTE = D[:,idxTest], L[idxTest]

        # Apply ZNorm if necessary
        if z_value!=0:
            DTR,DTE = compute_znorm(DTR, DTE)

        # Apply PCA if necessary
        if pca_value!=0:
            P = apply_pca(DTR, LTR, pca_value)
            DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)
            
        # Apply classifier
        wrong, scores = polynomial_kernel_svm(DTR, LTR, DTE, LTE, K, C_value, gamma_value)
        wrong_predictions += wrong
        ll_ratios.append(scores)

    return numpy.hstack(ll_ratios)


### Gaussian Mixture Models

In [None]:
# ----- ___ -----
def logpdf_GAU_ND_GMM(X, mu, C):
    M,N = X.shape[0], X.shape[1]
    Y = numpy.empty([1,N])
    
    for i in range(N):
        x = X[:,i:i+1]
        _,det = numpy.linalg.slogdet(C)
        inv = numpy.linalg.inv(C)
        density = -(M/2)*numpy.log(2*math.pi) - 1/2*det - 1/2*numpy.dot((x-mu).T, numpy.dot(inv, (x-mu)))
        Y[:,i]=density
        
    return Y.ravel()


# ----- Compute log-density of a GMM for a set of samples contained in matrix X -----
def logpdf_GMM (x, gmm):    
    y = []
    for weight,mu,sigma in gmm:
        lc = logpdf_GAU_ND_GMM(x, mu, sigma) + numpy.log(weight)
        y.append(column_to_row(lc))
    S = numpy.vstack(y)
    logdensity = scipy.special.logsumexp(y, axis=0)
    return S,logdensity


# ----- ___ -----
def ML_GMM_iteration(D, gmm, diag, tied):
    prevLL = None
    oldLL = None
    deltaLL = 1.0
    iteration = 0

    while deltaLL>1e-6:
        componentsLL = []
        for w, mu, C in gmm:
            ll = logpdf_GAU_ND_GMM(D, mu, C) + numpy.log(w)
            componentsLL.append(column_to_row(ll))
        LL = numpy.vstack(componentsLL)
        post = numpy.exp(LL - scipy.special.logsumexp(LL, axis=0))
        oldLL = prevLL
        prevLL = scipy.special.logsumexp(LL, axis=0).sum() / D.shape[1]
        
        if oldLL is not None: deltaLL = prevLL - oldLL
        iteration += 1
        psi = 0.01
        updatedGMM = []
        for i in range(post.shape[0]):
            Z = post[i].sum()
            F = (post[i:i+1, :]*D).sum(1)
            S = numpy.dot((post[i:i+1, :])*D, D.T)
            new_weight = Z / D.shape[1]
            new_mean = row_to_column(F/Z)
            new_sigma = S/Z - numpy.dot(new_mean, new_mean.T)
            
            if tied:
                c = 0
                for j in range(post.shape[0]):
                    Z = post[j].sum()
                    F = (post[j:j+1, :]*D).sum(1)
                    S = numpy.dot((post[j:j+1, :])*D, D.T)
                    c += Z * (S/Z - numpy.dot(row_to_column(F/Z), row_to_column(F/Z).T))
                new_sigma = c / D.shape[1]
            
            if diag: new_sigma = new_sigma * numpy.eye(new_sigma.shape[0])
            
            U, s, _ = numpy.linalg.svd(new_sigma)
            s[s<psi] = psi
            new_sigma=numpy.dot(U, row_to_column(s)*U.T)
            updatedGMM.append((new_weight, new_mean, new_sigma))

        gmm = updatedGMM
        componentsLL = []
        for w, mu, C in gmm:
            ll = logpdf_GAU_ND_GMM(D, mu, C) + numpy.log(w)
            componentsLL.append(column_to_row(ll))
        LL = numpy.vstack(componentsLL)
        post = LL - scipy.special.logsumexp(LL, axis=0)
        post = numpy.exp(post)
        oldLL = prevLL
        prevLL = scipy.special.logsumexp(LL, axis=0).sum() / D.shape[1]
        deltaLL = prevLL - oldLL
    
    return gmm


# ----- ___ -----
def ML_GMM_LBG (D, weights, means, sigma, G, diag, tied):
    gmm = [(weights,means,sigma)]

    while len(gmm)<=G:
        if len(gmm)!=1: gmm = ML_GMM_iteration(D, gmm, diag, tied)
        if len(gmm)==G: break

        newGMM = []
        for(weight, mu, sigma) in gmm:
            U,s,_ = numpy.linalg.svd(sigma)
            s[s<0.01] = 0.01
            sigma = numpy.dot(U, row_to_column(s)*U.T)
            
            newGMM.append((weight*0.5, mu+s[0]**0.5*U[:, 0:1]*0.1, sigma)) 
            newGMM.append((weight*0.5, mu-s[0]**0.5*U[:, 0:1]*0.1, sigma))
        gmm = newGMM

    return gmm


# ----- GMM classifier - Full covariance -----
def gmm_full_covariance (DTR, LTR, DTE, LTE, g0, g1):
    # Consider only labels=0
    D0 = DTR[:,LTR==0]
    w0 = 1.0
    mu0 = compute_mean(D0)
    sigma0 = compute_covariance(D0)
    U,s,_ = numpy.linalg.svd(sigma0)
    s[s<0.01] = 0.01
    C0 = numpy.dot(U, row_to_column(s)*U.T)
    gmm0 = ML_GMM_LBG(D0, w0, mu0, C0, g0, False, False)
    _,score0 = logpdf_GMM(DTE, gmm0)

    # Consider only labels=1
    D1 = DTR[:,LTR==1]
    w1 = 1.0
    mu1 = compute_mean(D1)
    sigma1 = compute_covariance(D1)
    U,s,_ = numpy.linalg.svd(sigma1)
    s[s<0.01] = 0.01
    C1 = numpy.dot(U, row_to_column(s)*U.T)
    gmm1 = ML_GMM_LBG(D1, w1, mu1, C1, g1, False, False)
    _,score1 = logpdf_GMM(DTE, gmm1)

    # Compute scores and wrong predictions
    scores = numpy.vstack((score0, score1))
    marginals = column_to_row(scipy.special.logsumexp(scores, axis=0))
    f = numpy.exp(scores - marginals)
    wrong_predictions = (f.argmax(0)!=LTE).sum()
    scores = (score1-score0)[0]

    return wrong_predictions, scores


# ----- GMM classifier - Diagonal covariance -----
def gmm_diagonal_covariance (DTR, LTR, DTE, LTE, g0, g1):
    # Consider only labels=0
    D0 = DTR[:,LTR==0]
    w0 = 1.0
    mu0 = compute_mean(D0)
    sigma0 = compute_covariance(D0)
    sigma0 *= numpy.eye(sigma0.shape[0])
    U,s,_ = numpy.linalg.svd(sigma0)
    s[s<0.01] = 0.01
    C0 = numpy.dot(U, row_to_column(s)*U.T)
    gmm0 = ML_GMM_LBG(D0, w0, mu0, C0, g0, True, False)
    _,score0 = logpdf_GMM(DTE, gmm0)

    # Consider only labels=1
    D1 = DTR[:,LTR==1]
    w1 = 1.0
    mu1 = compute_mean(D1)
    sigma1 = compute_covariance(D1)
    sigma1 *= numpy.eye(sigma1.shape[0])
    U,s,_ = numpy.linalg.svd(sigma1)
    s[s<0.01] = 0.01
    C1 = numpy.dot(U, row_to_column(s)*U.T)
    gmm1 = ML_GMM_LBG(D1, w1, mu1, C1, g1, True, False)
    _,score1 = logpdf_GMM(DTE, gmm1)

    # Compute scores and wrong predictions
    scores = numpy.vstack((score0, score1))
    marginals = column_to_row(scipy.special.logsumexp(scores, axis=0))
    f = numpy.exp(scores - marginals)
    wrong_predictions = (f.argmax(0)!=LTE).sum()
    scores = (score1-score0)[0]

    return wrong_predictions, scores


# ----- GMM classifier - Tied covariance -----
def gmm_tied_covariance (DTR, LTR, DTE, LTE, g0, g1):
    # Consider only labels=0
    D0 = DTR[:,LTR==0]
    w0 = 1.0
    mu0 = compute_mean(D0)
    sigma0 = compute_covariance(D0)
    U,s,_ = numpy.linalg.svd(sigma0)
    s[s<0.01] = 0.01
    C0 = numpy.dot(U, row_to_column(s)*U.T)
    gmm0 = ML_GMM_LBG(D0, w0, mu0, C0, g0, False, True)
    _,score0 = logpdf_GMM(DTE, gmm0)

    # Consider only labels=1
    D1 = DTR[:,LTR==1]
    w1 = 1.0
    mu1 = compute_mean(D1)
    sigma1 = compute_covariance(D1)
    U,s,_ = numpy.linalg.svd(sigma1)
    s[s<0.01] = 0.01
    C1 = numpy.dot(U, row_to_column(s)*U.T)
    gmm1 = ML_GMM_LBG(D1, w1, mu1, C1, g1, False, True)
    _,score1 = logpdf_GMM(DTE, gmm1)

    # Compute scores and wrong predictions
    scores = numpy.vstack((score0, score1))
    marginals = column_to_row(scipy.special.logsumexp(scores, axis=0))
    f = numpy.exp(scores - marginals)
    wrong_predictions = (f.argmax(0)!=LTE).sum()
    scores = (score1-score0)[0]

    return wrong_predictions, scores


# ----- GMM classifier - Tied diagonal covariance -----
def gmm_tied_diagonal_covariance (DTR, LTR, DTE, LTE, g0, g1):
    # Consider only labels=0
    D0 = DTR[:,LTR==0]
    w0 = 1.0
    mu0 = compute_mean(D0)
    sigma0 = compute_covariance(D0)
    sigma0 *= numpy.eye(sigma0.shape[0])
    U,s,_ = numpy.linalg.svd(sigma0)
    s[s<0.01] = 0.01
    C0 = numpy.dot(U, row_to_column(s)*U.T)
    gmm0 = ML_GMM_LBG(D0, w0, mu0, C0, g0, True, True)
    _,score0 = logpdf_GMM(DTE, gmm0)

    # Consider only labels=1
    D1 = DTR[:,LTR==1]
    w1 = 1.0
    mu1 = compute_mean(D1)
    sigma1 = compute_covariance(D1)
    sigma1 *= numpy.eye(sigma1.shape[0])
    U,s,_ = numpy.linalg.svd(sigma1)
    s[s<0.01] = 0.01
    C1 = numpy.dot(U, row_to_column(s)*U.T)
    gmm1 = ML_GMM_LBG(D1, w1, mu1, C1, g1, True, True)
    _,score1 = logpdf_GMM(DTE, gmm1)

    # Compute scores and wrong predictions
    joint = numpy.vstack((score0, score1))
    marginals = column_to_row(scipy.special.logsumexp(joint, axis=0))
    f = numpy.exp(joint - marginals)
    wrong_predictions = (f.argmax(0)!=LTE).sum()
    scores = (score1-score0)[0]

    return wrong_predictions, scores


In [None]:
# ----- GMM training -----
def gmm_kfold (D, L, K, pca_value, z_value, g0_value, g1_value):
    classifiers = [
        (gmm_full_covariance, "GMM Full Covariance"),
        (gmm_diagonal_covariance, "GMM Diagonal Covariance"),
        (gmm_tied_covariance, "GMM Tied Covariance"),
        (gmm_tied_diagonal_covariance, "GMM Tied Diagonal Covariance")
    ]
    output_csv = open(output_csv_name, "a")
    N = int(D.shape[1]/K)

    if pca_value==0:
        if z_value==0:
            print("No PCA, No ZNorm, G0: %d, G1: %d\n" % (g0_value, g1_value))
        else:
            print("No PCA, ZNorm, G0: %d, G1: %d\n" % (g0_value, g1_value))
    else:
        if z_value==0:
            print("PCA: %d, No ZNorm, G0: %d, G1: %d\n" % (pca_value, g0_value, g1_value))
        else:
            print("PCA: %d, ZNorm, G0: %d, G1: %d\n" % (pca_value, g0_value, g1_value))

    for j,(fun,name) in enumerate(classifiers):
        wrong_predictions = 0
        numpy.random.seed(j)
        scores = []
        labels = []
        indexes = numpy.random.permutation(D.shape[1])

        for i in range(K):
            # Select which subset to use for evaluation
            idxTest = indexes[i*N:(i+1)*N]
            if i>0: idxTrainLeft = indexes[0:i*N]
            elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
            if i==0: idxTrain = idxTrainRight
            elif (i+1)==K: idxTrain = idxTrainLeft
            else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
            DTR,LTR = D[:,idxTrain], L[idxTrain]
            DTE,LTE = D[:,idxTest], L[idxTest]

            # Apply Z-Normalization if necessary
            if z_value!=0:
                DTR,DTE = compute_znorm(DTR, DTE)

            # Apply PCA if necessary
            if pca_value!=0:
                P = apply_pca(DTR, LTR, pca_value)
                DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)
            
            # Apply classifier
            wrong, score = fun(DTR, LTR, DTE, LTE, g0_value, g1_value)
            wrong_predictions += wrong
            scores.append(score)
            labels.append(LTE)
        
        # Evaluate accuracy and error rate
        error_rate = wrong_predictions / D.shape[1]
        accuracy = 1 - error_rate
        cost = compute_min_DCF(pi_t, Cfn, Cfp, numpy.hstack(scores), numpy.hstack(labels))

        # Save results in CSV format
        output_csv.write("%s,%d,%d,_,_,_,_,%d,%d,%.3f,%.3f,%.5f\n" % (name, pca_value, z_value, g0_value, g1_value, 100.0*accuracy, 100.0*error_rate, cost))
        print("%s,%d,%d,_,_,_,_,%d,%d,%.3f,%.3f,%.5f\n" % (name, pca_value, z_value, g0_value, g1_value, 100.0*accuracy, 100.0*error_rate, cost))

        # Print results to console
        print("  %s" % (name))
        print("    Accuracy: %.3f%%" % (100.0*accuracy))
        print("    Error rate: %.3f%%" % (100.0*error_rate))
        print("    min DCF: %.3f\n" % (cost))
        print("\n")

    output_csv.close()


In [None]:
# ----- Train a specific model -----
def train_diagonal_gmm (D, L, K, pca_value, z_value, g0_value, g1_value):
    N = int(D.shape[1]/K)

    wrong_predictions = 0
    numpy.random.seed(1)
    scores = []
    labels = []
    indexes = numpy.random.permutation(D.shape[1])

    for i in range(K):
        # Select which subset to use for evaluation
        idxTest = indexes[i*N:(i+1)*N]
        if i>0: idxTrainLeft = indexes[0:i*N]
        elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
        if i==0: idxTrain = idxTrainRight
        elif (i+1)==K: idxTrain = idxTrainLeft
        else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
        DTR,LTR = D[:,idxTrain], L[idxTrain]
        DTE,LTE = D[:,idxTest], L[idxTest]

        # Apply Z-Normalization if necessary
        if z_value!=0:
            DTR,DTE = compute_znorm(DTR, DTE)

        # Apply PCA if necessary
        if pca_value!=0:
            P = apply_pca(DTR, LTR, pca_value)
            DTR,DTE = numpy.dot(P.T, DTR), numpy.dot(P.T, DTE)
            
        # Apply classifier
        wrong, score = gmm_diagonal_covariance(DTR, LTR, DTE, LTE, g0_value, g1_value)
        wrong_predictions += wrong
        scores.append(score)
        labels.append(LTE)

    return numpy.hstack(scores)


### Run training

In [None]:
# ----- Train model -----
def train_model (D, L):
    K = 5

    output_csv = open(output_csv_name, "w")
    output_csv.write("Model,PCA,ZNorm,pi,lambda,C,gamma,G0,G1,Accuracy,Error rate,minDCF\n")
    output_csv.close()
    
    pca_values = [0, 9, 8, 7, 6, 5]         # value=0 when we don't apply PCA
    pi_values = [0.1, 0.5, 0.9, effective_prior]
    lambda_values = [1e-6, 1e-4, 1e-3, 1e-1, 1e+0, 1e+1, 1e+2]
    gmm_values = [2, 4, 8]
    C_values = [1e-4, 1e-3, 1e-2, 1e-1, 1e+0]
    gamma_values = [1e-4, 1e-3, 1e-2, 1e-1, 1e+0]
    z_values = [0, 1]
    
    # pca_values = [0, 9]                     # value=0 when we don't apply PCA
    # pi_values = [0.1, 0.5]
    # lambda_values = [1e-6, 1e-1]
    # gmm_values = [2, 8]
    # C_values = [1e-2, 1e+0]
    # gamma_values = [1e-3, 1e-1]

    for pca_value in pca_values:
        for pi_value in pi_values:
            # Generative models
            gen_kfold(D, L, K, pca_value, pi_value)

            # Discriminative models
            for z_value in z_values:
                for lambda_value in lambda_values:
                    dis_kfold(D, L, K, pca_value, z_value, pi_value, lambda_value, z_value)
        
        # Support Vector Machines
        for z_value in z_values:
            for C_value in C_values:
                for gamma_value in gamma_values:
                    svm_kfold(D, L, K, pca_value, z_value, C_value, gamma_value)
        
        # Gaussian Mixture Models
        if pca_value!=8 and pca_value!=6:
            for z_value in z_values:
                for g0_value in gmm_values:
                    for g1_value in gmm_values:
                        if g0_value>g1_value:
                            gmm_kfold(D, L, K, pca_value, z_value, g0_value, g1_value)


# ----- Sort training results with respect to minDCF -----
def sort_training_results ():
    # Sort results basing on minDCF
    with open("../output/Training/Results.csv", "r") as file:
        reader = csv.DictReader(file)
        next(reader)
        lines = list(reader)
    sorted_lines = sorted(lines, key=lambda x: float(x["minDCF"]), reverse=False)
    
    # Store ranking on a new CSV file
    fieldnames = ["Model", "PCA", "Z Norm", "pi", "lambda", "C", "gamma", "G0", "G1", "Accuracy", "Error rate", "minDCF"]
    with open("../output/Training/SortedResults.csv", "w+") as file:
        file.write("Model,PCA,ZNorm,pi,lambda,C,gamma,G0,G1,Accuracy,Error rate,minDCF\n")
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writerows(sorted_lines)
    
    best_models = sorted_lines[:3]
    for model in best_models:
        print("%s \n" % model)

                        
# ----- Select the three best models and train them -----
def train_top3_models (D, L):
    train_model(D, L)
    sort_training_results()

    # ---- Quadratic Logistic Regression ----
    # --- PCA=8, ZNorm, pi=effective_prior, lambda=1e-2 ---
    qlr_scores = train_qlr(D, L, 5, 8, 1, effective_prior, 1e-2)
    
    # ---- Polynomial Kernel SVM ----
    # --- No PCA, No ZNorm, C=1, gamma=1e-3 ---
    svm_scores = train_pol_svm(D, L, 5, 0, 0, 1e+0, 1e-3)

    # ---- Diagonal Gaussian Mixture Model ----
    # --- No PCA, No ZNorm, G0=8, G1=2 ---
    gmm_scores = train_diagonal_gmm(D, L, 5, 0, 0, 8, 2)

    return qlr_scores, svm_scores, gmm_scores

In [None]:
qlr_scores, svm_scores, gmm_scores = train_top3_models(DTR, LTR)

## 3 - Calibration and Fusion

In [None]:
# ----- Bayes Error plot -----
def bayes_error_plot (ll_ratios, labels, title):
    eff_plo = numpy.linspace(-4, 4, 10)
    eff_p = 1 / (1 + numpy.exp(-1*eff_plo))
    actDCF = minDCF = []

    for i in range(10):
        actDCF.append(compute_actual_DCF(eff_p[i], 1, 1, numpy.hstack(ll_ratios), numpy.hstack(labels), True))
        minDCF.append(compute_min_DCF(eff_p[i], 1, 1, numpy.hstack(ll_ratios), numpy.hstack(labels)))

    matplotlib.pyplot.plot(eff_plo, actDCF, label='Actual DCF', color="b")
    matplotlib.pyplot.plot(eff_plo, minDCF, label='Min DCF', color="r", linestyle="--")
    matplotlib.pyplot.xlabel("Effective prior log-odds") 
    matplotlib.pyplot.ylabel("DCF value")
    matplotlib.pyplot.legend(["act DCF", "min DCF"])
    matplotlib.pyplot.title("Bayes Error Plot"+ title)
    matplotlib.pyplot.ylim([0, 1])
    matplotlib.pyplot.xlim([-4, 4])
    matplotlib.pyplot.savefig("output/Calibration_Fusion/" + title+ '.png')


# ----- Score calibration -----
def score_calibration (DTE, v, pi_value=effective_prior):
    s = numpy.empty((DTE.shape[1]))
    w,b = v[0:-1], v[-1]
    for i in range(DTE.shape[1]):
        xt = DTE[:,i]
        s[i] = b + numpy.dot(w.T, xt) - numpy.log((pi_value) / (1-pi_value))
    return numpy.array(s)


# ----- Class calibration -----
def class_calibration (DTR, LTR, DTE, LTE, pi_value, lambda_value):
    x0 = numpy.zeros(DTR.shape[0] + 1)
    x = numerical_optimization(lr_obj_wrap(DTR, LTR, lambda_value, pi_value), x0, lr_compute_gradient(DTR, LTR, lambda_value, pi_value))

    scores = score_calibration(DTE, x, pi_value)
    predicted_labels = predict_labels(scores, 0)
    wrong_predictions = count_mispredictions(predicted_labels, LTE)

    return wrong_predictions, scores


# ----- Calibration function -----
def compute_calibration (D, L, K, pi_value, lambda_value):
    N = int(D.shape[1]/K)
    
    wrong_predictions = 0
    numpy.random.seed(1)
    ll_ratios = []
    labels = []
    indexes = numpy.random.permutation(D.shape[1])

    for i in range(K):
        # Select which subset to use for evaluation
        idxTest = indexes[i*N:(i+1)*N]
        if i>0: idxTrainLeft = indexes[0:i*N]
        elif (i+1)<K: idxTrainRight = indexes[(i+1)*N:]
        if i==0: idxTrain = idxTrainRight
        elif (i+1)==K: idxTrain = idxTrainLeft
        else: idxTrain = numpy.hstack([idxTrainLeft, idxTrainRight])
        DTR,LTR = D[:,idxTrain], L[idxTrain]
        DTE,LTE = D[:,idxTest], L[idxTest]

        # Apply classifier
        wrong, scores = class_calibration(DTR, LTR, DTE, LTE, pi_value, lambda_value)
        wrong_predictions += wrong
        ll_ratios.append(scores)
        labels.append(LTE)

    return numpy.hstack(ll_ratios)


# ----- Calibrate scores -----
def calibrate_scores (scores, labels, model_title):
    scores = numpy.hstack(scores)

    numpy.random.seed(100)
    indexes = numpy.random.permutation(scores.shape[0])
    sc = numpy.zeros((1, scores.shape[0]))
    lab = numpy.zeros((labels.size,))
    i = 0
    for ind in indexes:
        sc[0,i] = scores[ind]
        lab[i] = labels[ind]
        i += 1

    calibrated = compute_calibration(sc, lab, 2, effective_prior, 1e-4)
    
    bayes_error_plot(scores, labels, model_title+'_uncalibrated')
    bayes_error_plot(numpy.hstack(calibrated), lab, model_title+'_calibrated')

    return calibrated


# ----- Fuse two models -----
def fuse_models (model1, scores1, model2, scores2, labels):
    s = [ numpy.hstack(scores1), numpy.hstack(scores2) ]
    s = numpy.vstack(s)
    numpy.random.seed(5)
    indexes = numpy.random.permutation(s.shape[1])
    sc = numpy.zeros((2, s.shape[1]))
    lab = numpy.zeros((labels.size,))
    i = 0
    for ind in indexes:
        sc[:,i] = s[:,ind]
        lab[i] = labels[ind]
        i += 1

    calibrated = compute_calibration(sc, lab, 2, effective_prior, 1e-3)
    actualDCF = compute_actual_DCF(pi_t, Cfn, Cfp, numpy.hstack(calibrated), numpy.hstack(lab), False)

    bayes_error_plot(numpy.hstack(calibrated), numpy.hstack(lab), model1+'_'+model2)

    return calibrated


In [None]:
# ----- Calibrate scores -----
def scores_calibration (L, qlr_scores, svm_scores, gmm_scores):
    qlr_scores = calibrate_scores(qlr_scores, L, 'QLR')
    svm_scores = numpy.hstack(numpy.array([]),numpy.array([]))#calibrate_scores(svm_scores, L, 'SVM')
    gmm_scores = numpy.hstack(numpy.array([]),numpy.array([]))#calibrate_scores(gmm_scores, L, 'GMM')

    return qlr_scores, svm_scores, gmm_scores


# ----- Fuse top3 models -----
def models_fusion (L, qlr_scores, svm_scores, gmm_scores):
    qlr_svm_scores = fuse_models('QLR', qlr_scores, 'SVM', svm_scores, L)
    qlr_gmm_scores = fuse_models('QLR', qlr_scores, 'GMM', gmm_scores, L)
    svm_gmm_scores = fuse_models('SVM', svm_scores, 'GMM', gmm_scores, L)
    
    return qlr_svm_scores, qlr_gmm_scores, svm_gmm_scores

### Run calibration and fusion

In [None]:
qlr_scores_cal, svm_scores_cal, gmm_scores_cal = scores_calibration(LTR, qlr_scores, svm_scores, gmm_scores)
qlr_svm_scores, qlr_gmm_scores, svm_gmm_scores = models_fusion(LTR, qlr_scores_cal, svm_scores_cal, gmm_scores_cal)

## 4 - Evaluation

In [None]:
# ----- Evaluate model -----
def model_evaluation (DTR, LTR, DTE, LTE):
    pass

In [None]:
model_evaluation(DTR, LTR, DTE, LTE)