In [None]:
# pip install iisignature

In [1]:
import numpy as np
import pandas as pd
from iisignature import *
import numbers
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.patches as patches
plt.style.use("seaborn-darkgrid")
import random
import itertools
import warnings
warnings.filterwarnings('ignore')

In [2]:
from scipy.spatial import distance
from sklearn import linear_model
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report,accuracy_score
import itertools
from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import IsolationForest
import copy
import math
from scipy.ndimage import shift
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import as_float_array,shuffle
from sklearn.metrics import roc_curve,roc_auc_score,auc
from sklearn.covariance import EmpiricalCovariance, MinCovDet
from sklearn.metrics import fbeta_score

### Conformance Anomaly Detection Method with Signature

#### Main equations

In [1]:
def sign_splitdata(x,y,x_test,order):
    signa = [list(sig(np.array(stream), order)) for stream in x]
    signa_test = [list(sig(np.array(stream), order)) for stream in x_test]
    
    scaler = MinMaxScaler()
    X_center = np.array(scaler.fit_transform(signa))
    X_testcenter = np.array(scaler.transform(signa_test))
    # math.factorial(order)*
    X1,X2,y1,y2 = train_test_split(X_center,y,test_size=0.5)
    
    return X1,X2,y1,X_testcenter

In [2]:
def sign_splitdata_all(x,y,x_test,order):
    signa = [list(sig(np.array(stream), order)) for stream in x]
    signa_test = [list(sig(np.array(stream), order)) for stream in x_test]
    
    scaler = MinMaxScaler()
    X_center = np.array(scaler.fit_transform(signa))
    X_testcenter = np.array(scaler.transform(signa_test))
    # math.factorial(order)*
    X1,X2,y1,y2 = train_test_split(X_center,y,test_size=0.5)
    
    return X1,X2,y1,y2,X_testcenter,X_center

In [5]:
def conformance(x, X2):
    cov_X2 = np.linalg.pinv(np.cov((x-X2).T))
    inf = distance.mahalanobis(x, X2[0], cov_X2)
    for i in range(len(X2)):
        mal = distance.mahalanobis(x, X2[i], cov_X2)
        if inf > mal:
            inf = mal
    return inf

In [6]:
def thershold_all(X1,X2,epsilon):
    threshold_1,pred_label_1 = thershold(X1,X2,epsilon)
    threshold_2,pred_label_2 = thershold(X2,X1,epsilon)
    return (threshold_1+threshold_2)/2,pred_label_1,pred_label_2

In [7]:
def thershold(X1,X2,epsilon):
    dist = []
    pred_label = []
    for i in range(len(X1)):
        dist.append(conformance(X1[i],X2))
    dist = np.array(dist)
    threshold = np.quantile(dist,epsilon)
    for i in range(len(X1)):
        if dist[i] > threshold:
            dist = np.array(dist)
            pred_label.append(1)
        else:
            pred_label.append(0)
    return threshold,pred_label

In [8]:
def pred_anomaly(X_test,X2,thresh,X_test_label):
    anoma = []
    conf = []

    for i in range(len(X_test)):
        conf.append(conformance(X_test[i],X2))
        if conf[i] > thresh:
            anoma.append(1)
        else:
            anoma.append(0)
    return conf,anoma

In [9]:
def mergeDict(dict1, dict2):
    dict3 = {**dict1, **dict2}
    for key, value in dict3.items():
        if key in dict1 and key in dict2:
               dict3[key] = [dict1[key],value]
    return dict3

In [53]:
def pred_result_diff_signature_order(x,y,x_test,y_test,order,epsilon):
    X1={}
    X2={}
    X1_label={}
    X_test={}
    thershold_ratio = {}
    X1_pred_label={}
    X2_pred_label={}
    pred_conf = {}
    pred_test_label ={}

    for sig_order in range(1,(order+1)):
        key = 'Signature order {}'.format(sig_order)
        
        X1[key],X2[key],X1_label[key],X_test[key] = sign_splitdata(x,y,x_test,sig_order)
        thershold_ratio[key],X1_pred_label[key]= thershold(X1[key],X2[key],epsilon)
        pred_conf[key],pred_test_label[key] = pred_anomaly(X_test[key],X2[key],thershold_ratio[key],y_test)
    
    return X1_label,X1_pred_label,thershold_ratio,pred_conf,pred_test_label
    

In [13]:
def plot_pred_result(pred_conf,y_test,epsilon):
    plt.figure(figsize=(6, 6))
    for key in pred_conf.keys():
       false_positive_rate, true_positive_rate, _ = roc_curve(y_test, pred_conf[key])
       area_under_curve = auc(false_positive_rate, true_positive_rate)
       plt.plot(false_positive_rate, true_positive_rate,
                 linewidth=3, label='{} (AUC={:0.3f})'.format(key, area_under_curve))
    plt.plot([0, 1], [0, 1], color='k', linestyle='--', linewidth=3)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc='center left', bbox_to_anchor=(1.04,0.5), borderaxespad=0)
    plt.title('AUCROC curve for epsilon {}'.format(epsilon))
    plt.grid(True)
    plt.show()

#### Transform methods

In [20]:
def invis_transform(X):
    l = []
    l.append([X[0],0])
    for i in range(len(X)):
        l.append([X[i],1])
    return l

In [16]:
 # add time (transform)
def time_transform(X):
    tt=np.linspace(0,1,len(X))
    l = []
    for j in range(len(tt)):
        l.append([tt[j],X[j]])
    return l

In [43]:
def non_transform(X):
    return X

In [21]:
def transf(X,transform):
    return [transform(x) for x in X]

In [19]:
# cited from https://github.com/crispitagorico/sigkernel/blob/master/sigkernel/transformers.py
def transform(paths, at=False, ll=False, scale=1.):
    paths = scale*paths
    if ll:
        paths = LeadLag().fit_transform(paths)
    if at:
        paths = AddTime().fit_transform(paths)
    return np.array(paths)

def normalize(sigs, width, depth):
    new_sigs = []
    for sig in sigs:
        new_sig = np.zeros_like(sig)
        for k in range(depth):
            dim = width*(width**(k)-1)
            new_sig[dim:dim + width**(k+1)] = math.factorial(k+1)*sig[dim:dim + width**(k+1)]
        new_sigs.append(new_sig)
    return np.array(new_sigs)

class AddTime(BaseEstimator, TransformerMixin):
    def __init__(self, init_time=0., total_time=1.):
        self.init_time = init_time
        self.total_time = total_time

    def fit(self, X, y=None):
        return self

    def transform_instance(self, X):
        t = np.linspace(self.init_time, self.init_time + 1, len(X))
        return np.c_[t, X]

    def transform(self, X, y=None):
        return [self.transform_instance(x) for x in X]

class Reversion(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return [as_float_array(x[::-1]) for x in X]


class LeadLag(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform_instance(self, X):
        lag = []
        lead = []

        for val_lag, val_lead in zip(X[:-1], X[1:]):
            lag.append(val_lag)
            lead.append(val_lag)

            lag.append(val_lag)
            lead.append(val_lead)

        lag.append(X[-1])
        lead.append(X[-1])

        return np.c_[lag, lead]

    def transform(self, X, y=None):
        return [self.transform_instance(x) for x in X]