In [30]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import sklearn
from numba import njit,prange,jit,guvectorize
import numba
from imblearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV,KFold,cross_val_score,train_test_split,StratifiedKFold
from sklearn.metrics import confusion_matrix,plot_confusion_matrix, auc as aucc,plot_roc_curve,recall_score,precision_score,f1_score
from sklearn.decomposition import PCA
import time
import os
from scipy.special import expit,softmax
import tensorflow as tf
from scipy.special import expit 

In [32]:
#The main class defining the classifier
class RMTClassifier(object):
    def __init__(self,threshold=1,train_cutoff=0.92):
        self.threshold=threshold
        self.train_cutoff=train_cutoff
        self.epsilon = None
        self.feature_vecs = None
        self.dim_v= None
        self.p = None
    def fit(self,train_mat):
        n,p=train_mat.shape
        self.p=p
        gamma= p/n
        thresh= ((1+np.sqrt(gamma))**2)*self.threshold
        c_mat=np.dot(train_mat.T,train_mat)/n

        evals,evecs=np.linalg.eig(c_mat)
        idx= evals.argsort()
        idx= idx[::-1]
        evals,evecs= evals[idx],evecs[:,idx]
        

        dim_v= evals[evals>thresh].shape[0]
        feature_vecs= np.ascontiguousarray(evecs[:,:dim_v].T,dtype=np.complex128)
        self.dim_v,self.feature_vecs=dim_v,feature_vecs
        train_mat=np.ascontiguousarray(train_mat,dtype=np.complex128)
        
        trial_mol_proj= np.zeros((self.dim_v,self.p),dtype=np.complex128)
        similarity=find_similarity2(train_mat,trial_mol_proj,feature_vecs)
        similarity.sort()
  
        cutoff_idx= int(self.train_cutoff*len(similarity))
        epsilon= similarity[cutoff_idx]
        self.epsilon= epsilon

    def predict(self,test, epsilon_multiple = 1):
        test=np.ascontiguousarray(test,dtype=np.complex128)
        trial_mol_proj= np.zeros((self.dim_v,self.p),dtype=np.complex128)
        test_similarity=find_similarity2(test,trial_mol_proj,self.feature_vecs)
        
        #predictions= np.array([1 if x<self.epsilon*epsilon_multiple else 0 for x in test_similarity])
        predictions= test_similarity,self.epsilon
        return predictions


@guvectorize(["c16[:],c16[:,:],c16[:,:],f8[:]"], "(p),(dim_v,p), (dim_v,p) -> ()", nopython=True, target="parallel")
def find_similarity2(trial_mol,trial_mol_proj,feature_vecs,similarity):
    dummy=np.zeros_like(trial_mol_proj)
    for idx in prange(feature_vecs.shape[0]):
        dotprod= np.vdot(feature_vecs[idx],trial_mol).real
        dummy[idx]=dotprod*feature_vecs[idx]  
    projection=np.sum(dummy,axis=0).real
    similarity[0]=np.linalg.norm(trial_mol-projection)


  dotprod= np.vdot(feature_vecs[idx],trial_mol).real


In [4]:
#This function returns one classifier for the active samples and the other for incative samples
def fit_RMT(x_act,x_inact):
    clf_act=RMTClassifier()
    clf_inact=RMTClassifier()
    clf_act.fit(x_act)
    clf_inact.fit(x_inact)
    return clf_act,clf_inact

#This function uses both the classifiers to predict the scores of the sample being in active class
def clf_predict(clf_act,clf_inact,x_test,e=1,thresh=True,prob=False):
    pred1,epsilon1= clf_act.predict(x_test)
    pred0,epsilon0= clf_inact.predict(x_test)
    diff=expit(pred1-pred0)
    epsilon= expit(epsilon1-epsilon0)
    if prob==True: return diff
    else:    
        if thresh==False: pred=[1 if i<e else 0 for i in diff]
        else: pred=[1 if i<epsilon*e else 0 for i in diff]
        return pred

    
    
#Some random stuff to keep a check on different metrics
def FP_TP_FN_TN(y_test,y_pred):
    y1,y2=y_test.ravel(),y_pred.ravel()
    fp,tp,fn,tn=0,0,0,0
    for i in prange(len(y1)):
        a,b=y1[i],y2[i]
        if(a==1): 
            if(b==1):tp+=1
            else:fn+=1
        else:
            if(b==1):fp+=1
            else:tn+=1
    
    return fp,tp,fn,tn

def optimized_thresh(clf_act,clf_inact,x_train,y_train,y):

    prob1,e1=clf_act.predict(x_train)
    prob0,e0=clf_inact.predict(x_train)
    diff=prob1-prob0
    e_diff=e1-e0
    e,max_acc=0,0
    for i in np.linspace(e_diff-5,e_diff+5,100):
        acc=np.mean((diff<i)==(y_train==y))
        if acc>max_acc:
            max_acc=acc
            e=i
    return e,max_acc

def metric_RMT(clf1,clf0,x_test,y_test,plot=True):
    x,y,p,r,f1,e=[],[],[],[],[],[]
    best_f1=0
    arr[0]=0
    prob=clf_predict(clf1,clf0,x_test,thresh=False,prob=True)
    for idx,val in enumerate(arr):
      #if idx%5==0:
          #print(f'progress:{idx}%')
        y_pred=np.where(prob<val,1,0)
        fp,tp,fn,tn= FP_TP_FN_TN(np.array(y_test),np.array(y_pred))
        tpr=tp/(tp+fn)
        fpr=fp/(fp+tn)
        if (tp+fp)==0 : precision=1
        else: precision= tp/(tp+fp)
        recall= tpr
        if precision+recall==0: f1_sc=0
        else:f1_sc=(2*precision*recall)/(precision+recall)
        f1.append(f1_sc)
        p.append(precision)
        r.append(recall)
        e.append(val)
        x.append(fpr)
        y.append(tpr)
        print('e=',val,'f1=',f1_sc)
        print(confusion_matrix(y_test,y_pred))

        if (best_f1<f1_sc):
            best_f1,i=f1_sc,idx  
 
    AUC_ROC,AUC_PR=aucc(x,y),aucc(r[1:],p[1:])
    if(plot==True):
    
        fig1,fig2=plt.figure(),plt.figure()
        ax1,ax2=fig1.add_subplot(111),fig2.add_subplot(111)
        ax1.set_title('ROC')
        ax1.set_ylabel('True Positive')
        ax1.set_xlabel('False Positive')
        ax1.text(0.75,0.15,f'AUC:{str(np.round(AUC_ROC,4))}')
        ax1.plot(x,y)
        ax1.axis([0,1,0,1])
        ax2.set_title('Precision/Recall')
        ax2.set_ylabel('Precision')
        ax2.set_xlabel('Recall')
        ax2.text(0.75,0.15,f'AUC-PR:{str(np.round(AUC_PR,4))}')
        ax2.plot(r,p)
        ax2.axis([0,1,0,1])
        plt.show()
  
    return AUC_ROC,AUC_PR,p,r,f1,best_f1,e,AUC_ROC*AUC_PR

# Example application on MNIST

In [12]:
#Loading and preprocessing

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
classes=[0,1,2,3,4,5,6,7,8,9]

def preprocess_and_arrange(x_data,y_data):
    x_data=(x_data/127.5)-1
    x_data=np.reshape(x_data,(len(x_data),28*28))
    idx=y_data.argsort()
    y_data=y_data[idx]
    x_data=x_data[idx]

    return x_data,y_data
    
x_train,y_train=preprocess_and_arrange(x_train,y_train)
x_test,y_test=preprocess_and_arrange(x_test,y_test)

#index list for all classes
idx=[]                            
c='a'
for i in np.arange(len(y_train)):
    if c!=y_train[i]:
        idx.append(i)
        c=y_train[i]
idx.append(len(y_train))
print(x_train.shape,x_test.shape)

(60000, 784) (10000, 784)


In [23]:
#train and optimize classifiers for each class, one to find whether the sample is form class x and the other for not in class x, total of 20 classifiers
classy1,classy0,es,individual_accs=[],[],[],[]
for i in np.arange(len(classes)):
    y=classes[i]
    act=x_train[idx[i]:idx[i+1]]
    inact=np.concatenate((x_train[0:idx[i]],x_train[idx[i+1]:]))
    clf_act,clf_inact=fit_RMT(act,inact)
    e,max_acc=optimized_thresh(clf_act,clf_inact,x_train,y_train,y)
    classy1.append(clf_act),classy0.append(clf_inact),es.append(e),individual_accs.append(max_acc)


85.79111814498901 s


In [33]:
#returns the probabilities of each classifier for each class
def evaluate(x_test,classy0,classy1,es):
    probs=np.zeros((len(x_test),len(classy0)))
    for i in np.arange(len(classy0)):
        clf_act=classy1[i]
        clf_inact=classy0[i]
        prob1,e1=clf_act.predict(x_test)
        prob0,e0=clf_inact.predict(x_test)
        diff=prob1-prob0-es[i]
        diff=(diff-np.mean(diff))/np.std(diff)
        probs[:,i]=diff
    
    probs=softmax(-probs,axis=1)
    return probs

In [34]:
probs=evaluate(x_test,classy0,classy1,es)

In [35]:
accuracy=np.mean(np.argmax(probs,axis=1)==y_test)
print("Accuracy on test = ",accuracy)

Accuracy on test =  0.952
