# Quantitative experiments with Turbofan Engine Degradation Simulation dataset
This dataset contains information was designed to solve predictive maintenance problems. The dataset is accessible here: 
https://www.kaggle.com/datasets/behrad3d/nasa-cmaps

In [1]:
import sys
sys.path.append('C:\\Users\\iamollas\\Desktop\\Altruist New')
model_path = 'C:\\Users\\iamollas\\Desktop\\Altruist New\\experiments\\quantitative\\Models\\D1\\'
weights_path = 'C:\\Users\\iamollas\\Desktop\\Altruist New\\experiments\\quantitative\\Weights\\D1\\'

In [2]:
import random
import csv
import warnings
import json
import lime.lime_tabular as lt
from keras.callbacks import ModelCheckpoint
from keras.layers import Dense
import keras
import keras.backend as K
from keras.callbacks import ModelCheckpoint
from keras.models import Model
from keras.layers import Dense, Flatten, Input, Dropout, LSTM, concatenate, Reshape
import json

import keras
from keras.models import Model
from innvestigate.utils.keras import checks
import innvestigate
import innvestigate.utils as iutils
from altruist import Altruist
from meta_explain import MetaExplain
from utilities.dataset import Dataset
from sklearn.preprocessing import MinMaxScaler, maxabs_scale
import numpy as np
np.seterr(invalid='ignore')
warnings.filterwarnings("ignore")

Using TensorFlow backend.


We will now load our TEDS dataset

In [3]:
teds = Dataset()
x_train, y_train, x_test, y_test, feature_names = teds.load_data_turbofan()

In [4]:
feature_names = ['s_02', 's_03', 's_04', 's_07', 's_08', 's_09', 's_11', 's_12',
                 's_13', 's_14', 's_15', 's_17', 's_20', 's_21']

In [5]:
temp_y_train = [[i] for i in y_train]
temp_y_test = [[i] for i in y_test]
target_scaler = MinMaxScaler()
target_scaler.fit(temp_y_train)
y_train = target_scaler.transform(temp_y_train)
y_test = target_scaler.transform(temp_y_test)

And we will build two neural network models. One linear and one non-linear, and one uneccessary complex model.

In [6]:
import keras.backend as K
def root_mean_squared_error(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))

In [7]:
linear_input = Input(shape=(x_train[0].shape))
linear_hidden = Flatten()(linear_input)
linear_output = Dense(1, activation='linear')(linear_hidden)
linear_neural = Model(linear_input, linear_output)
linear_neural.compile(optimizer='adam', loss=[
                      root_mean_squared_error], metrics=['mae', 'mse'])

neural_input = Input(shape=(x_train[0].shape))
hidden_input = Reshape((14, 50))(neural_input)
neural_r = []
for i in range(14):
    temp_hidden = LSTM(units=51, dropout=0.5,
                       return_sequences=True, activation='tanh')(hidden_input)
    temp_hidden = Dropout(0.5)(temp_hidden)
    temp_hidden = LSTM(units=50, dropout=0.5,
                       return_sequences=False, activation='tanh')(temp_hidden)
    neural_r.append(temp_hidden)
neural_hidden = concatenate(neural_r)
neural_hidden = Dropout(0.5)(neural_hidden)
neural_hidden = Dense(500, activation='tanh')(neural_hidden)  # Relu and selu
neural_hidden = Dropout(0.5)(neural_hidden)
neural_output = Dense(1, activation='linear')(neural_hidden)  # Relu and selu
neural = Model(neural_input, neural_output)
neural.compile(optimizer='adam', loss=[
               root_mean_squared_error], metrics=['mae', 'mse'])

complex_neural_input = Input(shape=(x_train[0].shape))
complex_input = Reshape((14, 50))(complex_neural_input)
complex_r = []
for i in range(14):
    complex_hidden = LSTM(
        units=51, dropout=0.5, return_sequences=True, activation='tanh')(complex_input)
    complex_hidden = Dropout(0.5)(complex_hidden)
    complex_hidden = LSTM(
        units=50, dropout=0.5, return_sequences=False, activation='relu')(complex_hidden)
    complex_hidden = Dropout(0.5)(complex_hidden)
    complex_hidden = Dense(units=50, activation='relu')(complex_hidden)
    complex_r.append(complex_hidden)
complex_net = concatenate(complex_r)
complex_net = Dropout(0.5)(complex_net)
complex_net = Dense(500, activation='tanh')(complex_net)
complex_net = concatenate([complex_net, Flatten()(complex_neural_input)])
complex_net = Dropout(0.5)(complex_net)
complex_net = Dense(1000, activation='sigmoid')(complex_net)
complex_net = Dropout(0.5)(complex_net)
complex_output = Dense(1, activation='linear')(complex_net)
complex_neural = Model(complex_neural_input, complex_output)
complex_neural.compile(optimizer='adam', loss=[
                       root_mean_squared_error], metrics=['mae', 'mse'])

models = {'lNN': linear_neural, 'NN': neural, 'cNN': complex_neural}

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


Let's evaluate our models

In [8]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from math import sqrt

def compute_scores(name, y_test, y_pred):
    temp_y_pred = target_scaler.inverse_transform(y_pred)
    y_pred = np.array([i[0] for i in temp_y_pred])
    print(name)
    print('\t', 'MAE:', mean_absolute_error(temp_y_test, y_pred))
    print('\t', 'MSE:', mean_squared_error(temp_y_test, y_pred))
    print('\t', 'RMSE:', sqrt(mean_squared_error(temp_y_test, y_pred)))
    print('\t', 'R2:', r2_score(temp_y_test, y_pred))

In [9]:
train = False
for name, model in models.items():
    if train:
        check_point = ModelCheckpoint(
            "D1_"+name+".hdf5", monitor="val_loss", verbose=0, save_best_only=True, mode="auto")
        model.fit(x_train, y_train, epochs=500, batch_size=512,
                  validation_split=0.33, verbose=0, callbacks=[check_point])
        model.load_weights("D1_"+name+".hdf5")
    else:
        model.load_weights(model_path+"D1_"+name+".hdf5")
    y_pred = model.predict(x_test)
    compute_scores(name, y_test, y_pred)



lNN
	 MAE: 27.4459676816527
	 MSE: 1367.2966335026808
	 RMSE: 36.97697436922984
	 R2: 0.49828356104652527
NN
	 MAE: 24.772757201740628
	 MSE: 1260.1166593068872
	 RMSE: 35.49812191238978
	 R2: 0.537612228771599
cNN
	 MAE: 22.402879480157456
	 MSE: 1069.820192553104
	 RMSE: 32.70810591509548
	 R2: 0.6074396994942856


We will prepare our predict functions to work well with our python scripts!

In [10]:
def predict_cNN(x):
    prediction = models['cNN'].predict(x)
    return [i[0] for i in prediction]

def predict_NN(x):
    prediction = models['NN'].predict(x)
    return [i[0] for i in prediction]

def predict_lNN(x):
    prediction = models['lNN'].predict(x)
    return [i[0] for i in prediction]

predict_functions = {'lNN': predict_lNN, 'NN': predict_NN, 'cNN': predict_cNN}

With the following function we generate Integrated Gradients and LRP explainers for a model

In [11]:
def analyzer_generators(model):
    Xs = iutils.to_list(model.outputs)
    ret = []
    for x in Xs:
        layer, node_index, tensor_index = x._keras_history
        if checks.contains_activation(layer, activation="linear"):
            if isinstance(layer, keras.layers.Activation):
                ret.append(layer.get_input_at(node_index))
            else:
                layer_wo_act = innvestigate.utils.keras.graph.copy_layer_wo_activation(
                    layer)
                ret.append(layer_wo_act(layer.get_input_at(node_index)))
    modified_model = Model(input=model.input, output=ret)
    modified_model.trainable = False
    modified_model.compile(optimizer='adam', loss=[
                           root_mean_squared_error], metrics=['mae', 'mse'])
    analyzer_IG = innvestigate.create_analyzer(
        'integrated_gradients', modified_model, reference_inputs=16*[0])
    analyzer_LRP = innvestigate.create_analyzer('lrp.z', modified_model)
    return [analyzer_IG, analyzer_LRP]

We initiate these explainers for each of our models

In [12]:
analyzer_NN = analyzer_generators(neural)
analyzer_lNN = analyzer_generators(linear_neural)
analyzer_cNN = analyzer_generators(complex_neural)

In [13]:
x_train_flat = x_train.copy().reshape((len(x_train), 700))
flat_features = [str('F_'+str(i)) for i in range(700)]

We prepare each interpretation technique to have the same format!

In [15]:
def fi_lNN(instance, predict_function):
    return [i[0] for i in models['lNN'].get_weights()[0]]

def fi_IG_NN(instance, predict_function):
    a = analyzer_NN[0].analyze(np.array([instance]))[0]
    a = a.reshape((700))
    return a.tolist()

def fi_LRP_NN(instance, predict_function):
    a = analyzer_NN[1].analyze(np.array([instance]))[0]
    a = a.reshape((700))
    return a.tolist()

def fi_IG_lNN(instance, predict_function):
    a = analyzer_lNN[0].analyze(np.array([instance]))[0]
    a = a.reshape((700))
    return a.tolist()

def fi_LRP_lNN(instance, predict_function):
    a = analyzer_lNN[1].analyze(np.array([instance]))[0]
    a = a.reshape((700))
    return a.tolist()

def fi_IG_cNN(instance, predict_function):
    a = analyzer_cNN[0].analyze(np.array([instance]))[0]
    a = a.reshape((700))
    return a.tolist()

def fi_LRP_cNN(instance, predict_function):
    a = analyzer_cNN[1].analyze(np.array([instance]))[0]
    a = a.reshape((700))
    return a.tolist()

explainer = lt.LimeTabularExplainer(training_data=x_train_flat,
                                    feature_names=flat_features,
                                    discretize_continuous=False, mode='regression')

def fi_lime(instance, predict_function):
    def predict(x):
        return np.array([i for i in predict_function(x.reshape((len(x), 50, 14)))])
    b = explainer.explain_instance(instance.reshape((700)), predict, num_samples=1000,
                                   num_features=len(list(flat_features)))[0].local_exp
    b = b[list(b.keys())[0]]
    b.sort()
    return [i[1] for i in list(b)]

def fi_random(instance, predict_function):
    a1 = instance.reshape((700))
    seed = (a1.sum() +
            a1.mean())/10
    random.seed(seed)
    return [random.randrange(-1000, 1000)/1000 for i in range(len(flat_features))]

We compute and save the interpretations for easier reproducibility

In [17]:
from sklearn.preprocessing import MaxAbsScaler
for neural_name, neural_type in predict_functions.items():
    if neural_name == 'lNN':
        fi_techniques = [fi_lNN, fi_IG_lNN, fi_LRP_lNN, fi_lime, fi_random]
        fi_names = ['Inherent', 'IG', 'LRP', 'LIME', 'RAND']
    elif neural_name == 'NN':
        fi_techniques = [fi_IG_NN, fi_LRP_NN, fi_lime, fi_random]
        fi_names = ['IG', 'LRP', 'LIME', 'RAND']
    else:
        fi_techniques = [fi_IG_cNN, fi_LRP_cNN, fi_lime, fi_random]
        fi_names = ['IG', 'LRP', 'LIME', 'RAND']

    importance_train = []
    for instance in x_train:
        importance_instance = []
        for fi in fi_techniques:
            importance_instance.append(maxabs_scale(fi(instance, neural_type)))
        importance_train.append(importance_instance)
    importance_train = np.array(importance_train)

    importance_test = []
    for instance in x_test:
        importance_instance = []
        for fi in fi_techniques:
            importance_instance.append(maxabs_scale(fi(instance, neural_type)))
        importance_test.append(importance_instance)
    importance_test = np.array(importance_test)

    importances = {'train': importance_train.tolist(),
                   'test': importance_test.tolist()}
    with open('D1_'+neural_name+'.txt', 'w') as outfile:
        json.dump(importances, outfile)

Our quantitative experiments!

In [None]:
predict_functions = {'lNN': predict_lNN, 'NN': predict_NN, 'cNN': predict_cNN}
for neural_name, neural_type in predict_functions.items():
    if neural_name == 'lNN':
        fi_techniques = [fi_lNN, fi_IG_lNN, fi_LRP_lNN, fi_lime, fi_random]
        fi_names = ['Inherent', 'IG', 'LRP', 'LIME', 'RAND']
    elif neural_name == 'NN':
        fi_techniques = [fi_IG_NN, fi_LRP_NN, fi_lime, fi_random]
        fi_names = ['IG', 'LRP', 'LIME', 'RAND']
    else:
        fi_techniques = [fi_IG_cNN, fi_LRP_cNN, fi_lime, fi_random]
        fi_names = ['IG', 'LRP', 'LIME', 'RAND']
    meta_names = ['Average', 'Median', 'RuleBased']

    with open(weights_path+'D1_'+neural_name+'.txt') as json_file:
        importances = json.load(json_file)
    importance_train = np.array(importances['train'])
    importance_test = np.array(importances['test'])
    meta_explain = MetaExplain(importance_train, flat_features)

    print('Starting evaluation for', neural_name)
    for noise in ['weak', 'normal', 'strong']:
        for delta in [0, 0.0001, 0.001, 0.01]:
            my_altruist = Altruist(neural_type, x_train, fi_techniques,
                                   flat_features, level=noise, delta=delta)
            fis_scores = []
            meta_scores = []
            nzw_scores = []
            nzw_scores_delta = []
            for i in fi_techniques:
                fis_scores.append([])
                nzw_scores.append([])
                nzw_scores_delta.append([])
            for i in meta_names:
                meta_scores.append([])
                nzw_scores.append([])
                nzw_scores_delta.append([])
            for j in range(len(x_test[:1000])):
                my_altruist.fis = len(fi_techniques)
                a = my_altruist.find_untruthful(x_test[j], importance_test[j])
                for i in range(len(importance_test[j])):
                    cnzw = 0
                    cnzwt = 0
                    for k in importance_test[j][i]:
                        if abs(k) > 0:
                            cnzw += 1
                        if abs(k) > delta:
                            cnzwt += 1
                    nzw_scores[i].append(cnzw)
                    nzw_scores_delta[i].append(cnzwt)
                b = np.array(a[-1])
                for i in range(len(a[0])):
                    fis_scores[i].append(len(a[0][i]))
                temp_meta = []
                temp_meta.append(meta_explain.meta_avg(b))
                temp_meta.append(meta_explain.meta_median(b))
                temp_meta.append(meta_explain.meta_rule_based(a[0], a[2], b))
                for i in range(len(temp_meta)):
                    cnzw = 0
                    cnzwt = 0
                    for k in temp_meta[i]:
                        if abs(k) > 0:
                            cnzw += 1
                        if abs(k) > delta:
                            cnzwt += 1
                    nzw_scores[i+len(importance_test[j])].append(cnzw)
                    nzw_scores_delta[i+len(importance_test[j])].append(cnzwt)
                my_altruist.fis = len(meta_names)
                a = my_altruist.find_untruthful(x_test[j], temp_meta)
                for i in range(len(a[0])):
                    meta_scores[i].append(len(a[0][i]))
            row = [neural_name, noise, delta]
            for fis_score in fis_scores:
                row.append(np.array(fis_score).mean())
            for meta_score in meta_scores:
                row.append(np.array(meta_score).mean())
            with open('D1_'+neural_name+'.csv', 'a', encoding='UTF8') as f:
                writer = csv.writer(f)
                writer.writerow(row)
            row = [neural_name, noise, delta]
            all_names = fi_names+meta_names
            for aname in range(len(all_names)):
                row.append(np.array(nzw_scores[aname]).mean())
                row.append(np.array(nzw_scores_delta[aname]).mean())
            with open('D1_NZW_'+neural_name+'.csv', 'a', encoding='UTF8') as f:
                writer = csv.writer(f)
                writer.writerow(row)

We will evaluate the interpretations on the Sensor level as well. This is  

In [None]:
predict_functions = {'lNN': predict_lNN, 'NN': predict_NN, 'cNN': predict_cNN}
for neural_name, neural_type in predict_functions.items():
    if neural_name == 'lNN':
        fi_techniques = [fi_lNN, fi_IG_lNN, fi_LRP_lNN, fi_lime, fi_random]
        fi_names = ['Inherent', 'IG', 'LRP', 'LIME', 'RAND']
    elif neural_name == 'NN':
        fi_techniques = [fi_IG_NN, fi_LRP_NN, fi_lime, fi_random]
        fi_names = ['IG', 'LRP', 'LIME', 'RAND']
    else:
        fi_techniques = [fi_IG_cNN, fi_LRP_cNN, fi_lime, fi_random]
        fi_names = ['IG', 'LRP', 'LIME', 'RAND']
    meta_names = ['Average', 'Median', 'RuleBased']

    with open(weights_path+'D1_'+neural_name+'.txt') as json_file:
        importances = json.load(json_file)
    importance_train = np.array(importances['train'])
    importance_train_temp = []
    importance_test = np.array(importances['test'])
    importance_test_temp = []
    for i in importance_train:
        importance_train_temp.append(i.reshape((len(i), 50, 14)).mean(axis=1))
    for i in importance_test:
        importance_test_temp.append(i.reshape((len(i), 50, 14)).mean(axis=1))
    importance_train = np.array(importance_train_temp)
    importance_test = np.array(importance_test_temp)

    sensor_names = [str('F_'+str(i)) for i in range(14)]

    meta_explain = MetaExplain(importance_train, sensor_names)

    print('Starting evaluation for', neural_name)
    for noise in ['weak', 'normal', 'strong']:
        for delta in [0, 0.0001, 0.001, 0.01, 0.1]:
            my_altruist = Altruist(neural_type, x_train, fi_techniques, sensor_names,
                                   level=noise, delta=delta, data_type='per Sensor')
            fis_scores = []
            meta_scores = []
            nzw_scores = []
            nzw_scores_delta = []
            for i in fi_techniques:
                fis_scores.append([])
                nzw_scores.append([])
                nzw_scores_delta.append([])
            for i in meta_names:
                meta_scores.append([])
                nzw_scores.append([])
                nzw_scores_delta.append([])
            for j in range(len(x_test[:1000])):
                my_altruist.fis = len(fi_techniques)
                a = my_altruist.find_untruthful(x_test[j], importance_test[j])
                for i in range(len(importance_test[j])):
                    cnzw = 0
                    cnzwt = 0
                    for k in importance_test[j][i]:
                        if abs(k) > 0:
                            cnzw += 1
                        if abs(k) > delta:
                            cnzwt += 1
                    nzw_scores[i].append(cnzw)
                    nzw_scores_delta[i].append(cnzwt)
                b = np.array(a[-1])
                for i in range(len(a[0])):
                    fis_scores[i].append(len(a[0][i]))
                temp_meta = []
                temp_meta.append(meta_explain.meta_avg(b))
                temp_meta.append(meta_explain.meta_median(b))
                temp_meta.append(meta_explain.meta_rule_based(a[0], a[2], b))
                for i in range(len(temp_meta)):
                    cnzw = 0
                    cnzwt = 0
                    for k in temp_meta[i]:
                        if abs(k) > 0:
                            cnzw += 1
                        if abs(k) > delta:
                            cnzwt += 1
                    nzw_scores[i+len(importance_test[j])].append(cnzw)
                    nzw_scores_delta[i+len(importance_test[j])].append(cnzwt)
                my_altruist.fis = len(meta_names)
                a = my_altruist.find_untruthful(x_test[j], temp_meta)
                for i in range(len(a[0])):
                    meta_scores[i].append(len(a[0][i]))

            row = [neural_name, noise, delta]
            for fis_score in fis_scores:
                row.append(np.array(fis_score).mean())
            for meta_score in meta_scores:
                row.append(np.array(meta_score).mean())
            with open('D1_PS'+neural_name+'.csv', 'a', encoding='UTF8') as f:
                writer = csv.writer(f)
                writer.writerow(row)
            row = [neural_name, noise, delta]
            all_names = fi_names+meta_names
            for aname in range(len(all_names)):
                row.append(np.array(nzw_scores[aname]).mean())
                row.append(np.array(nzw_scores_delta[aname]).mean())
            with open('D1_PS_NZW_'+neural_name+'.csv', 'a', encoding='UTF8') as f:
                writer = csv.writer(f)
                writer.writerow(row)

Finally, we will perform the ablation study in per sensor level!

In [None]:
predict_functions = {'NN': predict_NN, 'cNN': predict_cNN}
for neural_name, neural_type in predict_functions.items():
    if neural_name == 'NN':
        fi_techniques = [fi_LRP_NN, fi_lime, fi_random]
        fi_names = ['LRP', 'LIME', 'RAND']  # ['IG', 'LRP', 'LIME', 'RAND']
    else:
        fi_techniques = [fi_LRP_cNN, fi_lime, fi_random]
        fi_names = ['LRP', 'LIME', 'RAND']  # ['IG', 'LRP', 'LIME', 'RAND']
    meta_names = ['Average', 'Median', 'RuleBased']

    with open(weights_path+'D1_'+neural_name+'.txt') as json_file:
        importances = json.load(json_file)
    importance_train = np.array(importances['train'])
    importance_train_temp = []
    importance_test = np.array(importances['test'])
    importance_test_temp = []
    for i in importance_train:
        importance_train_temp.append(i.reshape((len(i), 50, 14)).mean(axis=1))
    for i in importance_test:
        importance_test_temp.append(i.reshape((len(i), 50, 14)).mean(axis=1))
    importance_train = np.array(importance_train_temp)
    importance_test = np.array(importance_test_temp)
    importance_train = np.delete(importance_train, 2, axis=1)
    importance_test = np.delete(importance_test, 2, axis=1)

    sensor_names = [str('F_'+str(i)) for i in range(14)]

    meta_explain = MetaExplain(importance_train, sensor_names)

    print('Starting evaluation for', neural_name)
    for noise in ['normal']:
        for delta in [0.0001]:
            my_altruist = Altruist(neural_type, x_train, fi_techniques, sensor_names,
                                   level=noise, delta=delta, data_type='per Sensor')
            fis_scores = []
            meta_scores = []
            nzw_scores = []
            nzw_scores_delta = []
            for i in fi_techniques:
                fis_scores.append([])
                nzw_scores.append([])
                nzw_scores_delta.append([])
            for i in meta_names:
                meta_scores.append([])
                nzw_scores.append([])
                nzw_scores_delta.append([])
            for j in range(len(x_test[:1000])):
                my_altruist.fis = len(fi_techniques)
                a = my_altruist.find_untruthful(x_test[j], importance_test[j])
                for i in range(len(importance_test[j])):

                    cnzw = 0
                    cnzwt = 0
                    for k in importance_test[j][i]:
                        if abs(k) > 0:
                            cnzw += 1
                        if abs(k) > delta:
                            cnzwt += 1
                    nzw_scores[i].append(cnzw)
                    nzw_scores_delta[i].append(cnzwt)
                b = np.array(a[-1])
                for i in range(len(a[0])):
                    fis_scores[i].append(len(a[0][i]))
                temp_meta = []
                temp_meta.append(meta_explain.meta_avg(b))
                temp_meta.append(meta_explain.meta_median(b))
                temp_meta.append(meta_explain.meta_rule_based(a[0], a[2], b))
                for i in range(len(temp_meta)):
                    cnzw = 0
                    cnzwt = 0
                    for k in temp_meta[i]:
                        if abs(k) > 0:
                            cnzw += 1
                        if abs(k) > delta:
                            cnzwt += 1
                    nzw_scores[i+len(importance_test[j])].append(cnzw)
                    nzw_scores_delta[i+len(importance_test[j])].append(cnzwt)
                my_altruist.fis = len(meta_names)
                a = my_altruist.find_untruthful(x_test[j], temp_meta)
                for i in range(len(a[0])):
                    meta_scores[i].append(len(a[0][i]))

            row = [neural_name, noise, delta]
            for fis_score in fis_scores:
                row.append(np.array(fis_score).mean())
            for meta_score in meta_scores:
                row.append(np.array(meta_score).mean())
            with open('D1_PS_AblationIG'+neural_name+'.csv', 'a', encoding='UTF8') as f:
                writer = csv.writer(f)
                writer.writerow(row)
            row = [neural_name, noise, delta]
            all_names = fi_names+meta_names
            for aname in range(len(all_names)):
                row.append(np.array(nzw_scores[aname]).mean())
                row.append(np.array(nzw_scores_delta[aname]).mean())
            with open('D1_PS_AblationIG_NZW_'+neural_name+'.csv', 'a', encoding='UTF8') as f:
                writer = csv.writer(f)
                writer.writerow(row)