In [None]:
import numpy as np
import pandas as pd 
import random 
import copy 
import warnings
import sys
import pickle
import time
import shap

from tqdm import tqdm
from copy import deepcopy
from pyod.models.loda import LODA
from sklearn.ensemble import IsolationForest

sys.path.append('../../')
from ACME.ACME import ACME
from ACME.visual_utils import * 
sys.path.remove('../../')

warnings.filterwarnings('ignore')

# set seed for reproducibility
np.random.seed(0)
random.seed(0)

# AcME-AD vs KernelSHAP: time comparison on IF and LODA
Measure the computational time needed to fit each explainer and to explain a single anomaly on the PIADE dataset.

In [None]:
# Load the PIADE CSV, discard the 'interval_start' column and replace every
# missing value with 0.
data = (
    pd.read_csv('ad_industrial_datasets/piade_sequences_1h_data.csv')
    .drop(columns=['interval_start'])
    .fillna(0)
)

In [None]:
# Partition the rows by equipment: one sub-frame per distinct 'equipment_ID'.
equipments = data['equipment_ID'].unique()
data_equipment = [
    data.loc[data['equipment_ID'] == equipment_id]
    for equipment_id in equipments
]

# The example works on the second sub-frame only (index 1). It is copied so
# that columns added later do not touch the sub-frames stored in the list.
data = data_equipment[1].copy()

# Every column except the equipment identifier is used as a model input.
features = data.columns.drop('equipment_ID')
print(features)

## Isolation Forest

In [None]:
def if_score_function(model, X):
    """Turn a model's decision-function output into an anomaly score.

    Parameters
    ----------
    model : object exposing ``decision_function(X)``.
    X : samples forwarded unchanged to ``model.decision_function``.

    Returns
    -------
    ``0.5 * (1 - model.decision_function(X))``: lower decision values map to
    higher scores, and a decision value of 0 maps to 0.5.
    """
    decision = model.decision_function(X)
    return (1 - decision) * 0.5

# Fit an Isolation Forest with default settings on the selected features.
ad_model = IsolationForest().fit(data[features])

# Per-row anomaly score, rescaled by if_score_function.
data['Score'] = if_score_function(ad_model, data[features])

# Recode the model's labels: -1 becomes 1 (anomaly), anything else becomes 0.
data['Prediction'] = (ad_model.predict(data[features]) == -1).astype('int64')

In [None]:
# Draw one row at random among those flagged as anomalous (Prediction == 1);
# .iloc[0] turns the one-row frame into a Series.
anomaly_to_explain = (
    data.loc[data['Prediction'] == 1]
    .sample(n=1)
    .iloc[0]
    .squeeze()
)
print(anomaly_to_explain)
print(type(anomaly_to_explain))

AcME-AD

In [None]:
# Time the whole AcME-AD pipeline for the Isolation Forest: building the
# explainer, the explain() pass over the data, and the local explanation of
# the selected anomaly (including its feature-importance table).
# time.perf_counter() is used instead of time.time() because it is a
# monotonic, high-resolution clock intended for measuring durations; the wall
# clock can be adjusted while the cell is running.
acme_start = time.perf_counter()
acme = ACME(ad_model, 'Score', features=features, task='ad', score_function=if_score_function)
acme = acme.explain(data, robust=True)
acme_loc_exp = acme.explain_local(anomaly_to_explain)
feat_table = acme_loc_exp.feature_importance(local=True)
acme_time = time.perf_counter() - acme_start

print('ACME time:', acme_time)

KernelSHAP

In [None]:
def if_score_function_shap(X):
    """Single-argument scorer handed to ``shap.KernelExplainer``.

    Looks up the notebook-level ``ad_model`` at call time and returns
    ``0.5 * (1 - ad_model.decision_function(X))``.
    """
    decision = ad_model.decision_function(X)
    return (1 - decision) * 0.5

In [None]:
# Background sets for KernelSHAP: random subsets holding 25%, 50%, 75% and
# 100% of the rows. They are all drawn before the timed loop, in this order,
# so sampling is not part of the measurement.
background_shap = [data[features].sample(frac=frac) for frac in (0.25, 0.5, 0.75, 1)]

for i, b_data in enumerate(background_shap):
    # Measure explainer construction plus the explanation of the single
    # anomaly. perf_counter() is monotonic and high-resolution, unlike the
    # adjustable wall clock returned by time.time().
    shap_start = time.perf_counter()
    explainer = shap.KernelExplainer(if_score_function_shap, b_data[features].values)
    shap_values = explainer.shap_values(anomaly_to_explain[features].values)
    shap_time = time.perf_counter() - shap_start
    print('SHAP time', i, ':', shap_time)

## LODA

In [None]:
# Restart from a fresh copy of the same equipment subset, i.e. without the
# 'Score' and 'Prediction' columns that were added for the Isolation Forest.
data = data_equipment[1].copy(deep=True)

In [None]:
# Fit LODA with a 1% contamination rate and keep its training labels as the
# prediction column.
ad_model = LODA(contamination=0.01).fit(data[features])
data['Prediction'] = ad_model.labels_

# Rescale the raw training scores linearly onto an interval centred on the
# model's threshold: lb/ub sit at threshold -/+ l, where l is the largest
# distance of any training score from the threshold plus the margin EPS.
# The threshold therefore maps to 0.5 and every training score lands
# strictly inside (0, 1).
raw_scores = ad_model.decision_scores_
EPS = 1e-1
l = np.max(np.abs(raw_scores - ad_model.threshold_)) + EPS
lb, ub = ad_model.threshold_ - l, ad_model.threshold_ + l
data['Score'] = (raw_scores - lb) / (ub - lb)

def score_function(model, X):
    """Min-max rescale ``model.decision_function(X)`` with the notebook bounds.

    Uses the notebook-level ``lb`` and ``ub`` (read at call time) and returns
    ``(model.decision_function(X) - lb) / (ub - lb)``.
    """
    span = ub - lb
    shifted = model.decision_function(X) - lb
    return shifted / span

In [None]:
# Draw one row at random among those LODA labelled as anomalous
# (Prediction == 1); .iloc[0] turns the one-row frame into a Series.
anomaly_to_explain = (
    data.loc[data['Prediction'] == 1]
    .sample(n=1)
    .iloc[0]
    .squeeze()
)

AcME-AD

In [None]:
# Time the whole AcME-AD pipeline for LODA: building the explainer, the
# explain() pass over the data, and the local explanation of the selected
# anomaly (including its feature-importance table).
# time.perf_counter() is used instead of time.time() because it is a
# monotonic, high-resolution clock intended for measuring durations.
# `features` is passed by keyword, matching the Isolation Forest call.
acme_start = time.perf_counter()
acme = ACME(ad_model, 'Score', features=features, task='ad', score_function=score_function)
acme = acme.explain(data, robust=True)
acme_loc_exp = acme.explain_local(anomaly_to_explain)
feat_table = acme_loc_exp.feature_importance(local=True)
acme_time = time.perf_counter() - acme_start

print('ACME time:', acme_time)

KernelSHAP

In [None]:
def loda_score_function_shap(X):
    """Single-argument scorer for KernelSHAP on the LODA model.

    Returns ``score_function(ad_model, X)``, i.e. the same threshold-centred
    normalised score that AcME-AD is given for LODA. The Isolation Forest
    wrapper ``if_score_function_shap`` must not be used here: it would apply
    the ``0.5 * (1 - decision)`` rescaling to LODA's decision function and
    KernelSHAP would explain a different quantity than AcME-AD.
    """
    return score_function(ad_model, X)


# Background sets for KernelSHAP: random subsets holding 25%, 50%, 75% and
# 100% of the rows. They are all drawn before the timed loop, in this order,
# so sampling is not part of the measurement.
background_shap = [data[features].sample(frac=frac) for frac in (0.25, 0.5, 0.75, 1)]

for i, b_data in enumerate(background_shap):
    # Measure explainer construction plus the explanation of the single
    # anomaly. perf_counter() is monotonic and high-resolution, unlike the
    # adjustable wall clock returned by time.time().
    shap_start = time.perf_counter()
    explainer = shap.KernelExplainer(loda_score_function_shap, b_data[features].values)
    shap_values = explainer.shap_values(anomaly_to_explain[features].values)
    shap_time = time.perf_counter() - shap_start
    print('SHAP time', i, ':', shap_time)