In [None]:
import numpy as np
from Dataset.cmapss import CMAPSS
from sksurv.ensemble import RandomSurvivalForest
import matplotlib.pyplot as plt

### Data

In [None]:
cmapss = CMAPSS(r"Dataset\CMAPSSData\train_FD003.txt", max_cycle=300, max_rul=None)

# tuples with subsets (idx, X, T, E)
train, test, val = cmapss.get_train_test_val(test_size=0.2, val_size=0.2, scale=True)

# now it only works for FD003
failure_modes = cmapss.get_failure_mode(cmapss.df)

In [None]:
def get_event_time_data(subset):
    T = subset[2]
    E = subset[3]
    dtypes = np.dtype('bool,float')
    ET = np.array([(bool(e), t) for e, t in zip(E, T)], dtype=dtypes)
    
    return ET

def get_idx_data(subset):
    return subset[0]

def get_features_data(subset):
    return subset[1]

def get_time_data(subset):
    return subset[2]

def get_event_data(subset):
    return subset[3]

idx_train = get_idx_data(train)
X_train = get_features_data(train)
ET_train = get_event_time_data(train)

idx_test = get_idx_data(test)
X_test = get_features_data(test)
ET_test = get_event_time_data(test)

idx_val = get_idx_data(val)
X_val = get_features_data(val)
ET_val = get_event_time_data(val)

### PCA

In [None]:
from sklearn.decomposition import PCA

pca = PCA(n_components=2)
pca.fit(X_train)
X_pca_train = pca.transform(X_train)

plt.figure(figsize=(8, 5))
plt.title("Data distribution (PCA)")
plt.scatter(X_pca_train[:, 0], X_pca_train[:, 1], c=get_time_data(train), alpha=1, cmap='viridis', s=1, label="Train data")
plt.colorbar()
plt.tight_layout()
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.show()

### Survival Model

In [None]:
%%time

# survival model
survivalForest = RandomSurvivalForest(n_estimators=12, max_depth=5, n_jobs=-1)
survivalForest.fit(X_train, ET_train)

cindex_train = survivalForest.score(X_train, ET_train)
cindex_val = survivalForest.score(X_val, ET_val)
cindex_test = survivalForest.score(X_test, ET_test)

print('Train cindex {:.3f}'.format(cindex_train))
print('Val cindex {:.3f}'.format(cindex_val))
print('Test cindex {:.3f}'.format(cindex_test))

In [None]:
# survival curves
x_time = survivalForest.event_times_
surv_train = survivalForest.predict_survival_function(X_train, return_array=True)
surv_test = survivalForest.predict_survival_function(X_test, return_array=True)
surv_val = survivalForest.predict_survival_function(X_test, return_array=True)

class SurvivalScoreModel:
    def __init__(self, model):
        self.model = model
        
    def predict(self, X):
        if X.ndim==1:
            X = X.reshape((1, -1))
        
        x_time = self.model.event_times_
        survival_curve = self.model.predict_survival_function(X, return_array=True)
        
        return np.trapz(survival_curve, x_time)

survivalScoreModel = SurvivalScoreModel(survivalForest)


survival_score_train = survivalScoreModel.predict(X_train)
survival_score_test = survivalScoreModel.predict(X_test)
survival_score_val = survivalScoreModel.predict(X_val)

In [None]:
T_test = get_time_data(test)
E_test = get_event_data(test).astype(bool)
failure_test = np.array(list(map(lambda x: failure_modes[x], idx_test)))


plt.figure(figsize=(9, 6))
plt.title("Survival Score for test units as a function of time-to-event (only for failure events)")
plt.scatter(T_test[E_test], survival_score_test[E_test], s=8, c=[failure_test[E_test]], cmap='bwr')
plt.xlabel("Time to event [cycles]")
plt.ylabel("Survival Score")
plt.xlim(0, )
plt.ylim(0, )
plt.tight_layout()

In [None]:
units_test = np.unique(idx_test)
cycle = 150
labeled_0 = False
labeled_1 = False

plt.figure(figsize=(7, 5))

for unit in units_test:
    X_units = X_test[idx_test == unit][cycle]
    surv_unit = surv_test[idx_test == unit][cycle]
    mode = failure_modes[unit]
    
    label = None
    
    if mode == 0:
        color='blue'
        if not labeled_0:
            label = "Failure Mode 1"
            labeled_0 = True
    else:
        color='red'
        if not labeled_1:
            label = "Failure Mode 2"
            labeled_1 = True

    plt.plot(x_time, surv_unit, c=color, label=label, alpha=0.8, linewidth=2)
    
plt.legend()
plt.xlim(0, x_time.max())
plt.xlabel("Number of Future Cycles")
plt.ylabel("Survival Porbability")
plt.title("Survival Curves for test units at t=%i" % cycle)
plt.tight_layout()

### Autoencoder

In [None]:
from autoencoder import Autoencoder, AutoencoderDataset, AutoencoderLearner
import torch
from torch.nn import MSELoss

LOAD_WEIGHTS = True
RUN_TRAINING = False
SAVE_WEIGHTS = False
weights_checkpoint = 'checkpoint.pt'


train_loader = AutoencoderDataset(X_train)
val_loader = AutoencoderDataset(X_val)

anomaly_model = Autoencoder(n_features=X_train.shape[-1],
                         hidden_layers_size=[6],
                         latent_size=12,
                         activation="relu",
                         last_activation="sigmoid")

if LOAD_WEIGHTS:
    anomaly_model.load_weights(weights_checkpoint)

if RUN_TRAINING:
    optimizer = torch.optim.Adam(anomaly_model.parameters(), lr=1e-3)
    loss_function = MSELoss()

    train_loss_list, valid_loss_list = AutoencoderLearner.run_training(anomaly_model, optimizer, loss_function, train_loader, val_loader, epochs=10,
                                                                       early_stopping=True, early_stopping_patience=5, early_stopping_delta=1e-4)
    
    if SAVE_WEIGHTS:
        anomaly_model.save_weights(weights_checkpoint)

    plt.figure(figsize=(8, 5))
    plt.plot(train_loss_list, label="Train Loss")
    plt.plot(valid_loss_list, label="Test Loss")
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
# this cell is used to calculate anomaly score with and without softplus activation

errors = anomaly_model.anomaly_score(X_test)
errors_softplus = anomaly_model.anomaly_score(X_test, anomaly_threshold=0.1)
plt.hist(errors, bins=60)
plt.hist(errors_softplus, bins=60)
plt.show()

for percentile in (50, 90, 95, 99):
    value = np.quantile(errors, percentile / 100)
    print("%sth percentile = %.3f" % (percentile, value))

### Counterfactual Explanations

In [None]:
import counterfactuals
import optimization
from tqdm.notebook import tqdm

def select_best_counterfactuals(counterfactuals_list, n=10):
    counterfactuals_list = list(sorted(counterfactuals_list, key=lambda x: x[1]))
    counterfactuals_list = counterfactuals_list[:n]
    counterfactual_points = list(map(lambda x: x[0], counterfactuals_list))
    
    return counterfactual_points

feature_types = ['float' for i in range(X_train.shape[-1])]

explainer = counterfactuals.CounterfactualExplainer(survivalScoreModel, X_train, feature_types, anomaly_model=anomaly_model, anomaly_threshold=0.1,
                                    weight_distance=1,
                                    weight_target=1,
                                    weight_anomaly=5)

# this explainer do not take the autoencoder into account
explainer_simple = counterfactuals.CounterfactualExplainer(survivalScoreModel, X_train, feature_types, anomaly_model=None,  anomaly_threshold=0.1,
                                    weight_distance=1,
                                    weight_target=1,
                                    weight_anomaly=0 )


# selected point
x=X_test[200]
y = survivalScoreModel.predict(x)


step = 0.02
y_delta = 0.3
x0 = x
# target counterfactual value
y_target = y * (1 + y_delta)

counterfactuals_list = []
counterfactuals_simple_list = []

print("Generating counterfactuals...")
for it in tqdm(range(25)):
    #print("Iteration", it)
    #optimizer = optimization.SimulatedAnnealing(1, 1e-3, iterations=600, step_decrease_rate=0)
    optimizer = optimization.ParticleSwarmOptimization(n_particles=50, patience=10, iterations=800, ftol=-np.inf)
    
    # counterfactuals with autoencoder
    x_exp = explainer.compute_explanation_pso(optimizer, x, x0, y_delta, verbose=False, step=step)
    loss = explainer._loss(x_exp, x, y_delta) 
    counterfactuals_list.append((x_exp, loss))
    
    # counterfactuals without autoencoder
    x_exp = explainer_simple.compute_explanation_pso(optimizer, x, x0, y_delta, verbose=False, step=step)
    loss = explainer_simple._loss(x_exp, x, y_delta) 
    counterfactuals_simple_list.append((x_exp, loss))
    
    #print(it+1, "Expected target: %.1f, Counterfactual target: %.1f, Loss: %.3f" % (y_target, y_exp, loss))
    #explainer.print_individual_losses(x, x_exp, 0.1)
    # print("")
    # add a tuple of (explanation, loss) to previously created list
    

counterfactual_points = select_best_counterfactuals(counterfactuals_list, n=30)
counterfactual_points_simple = select_best_counterfactuals(counterfactuals_simple_list, n=30)

In [None]:
def plot_pca_base(pca, model, X_base, y_delta, figsize=(10, 7)):
    X_base_pca = pca.transform(X_base)
    y_base = model.predict(X_base)
    y_target = model.predict(x) * (1 + y_delta)
    
    fig = plt.figure(figsize=figsize)
    ax = plt.subplot()
    ax.set_facecolor((0.95, 0.95, 0.95))
    base_plot = ax.scatter(X_base_pca[:, 0], X_base_pca[:, 1], c=abs(y_target - y_base), s=5, cmap='viridis', vmin=0, vmax=10, label="Base points", alpha=0.7)
    fig.colorbar(base_plot, label="Distance to target score")
    ax.set_xlabel("PC1")
    ax.set_ylabel("PC2")
    fig.tight_layout()
    #fig.show()
    
def plot_original_point(x):
    x_pca = pca.transform(x.reshape((1, -1)))
    plt.scatter(x_pca[:, 0], x_pca[:, 1], s=150, color='black', edgecolor='white', label="Original Point")
    

def plot_counterfactuals(pca, points, color, label=None):
    points = np.array(points)
    points_pca = pca.transform(points)
    plt.scatter(points_pca[:, 0], points_pca[:, 1], color=color, label=label, edgecolor='white')


plot_pca_base(pca, survivalScoreModel, X_test, y_delta)
ax = plt.gca()
plot_counterfactuals(pca, counterfactual_points, 'blue', 'With AE')
plot_counterfactuals(pca, counterfactual_points_simple, 'red', 'Without AE')
plot_original_point(x)
plt.legend()
# plt.xlim(0.25, 0.75)
# plt.ylim(-0.4, 0.25)


In [None]:
def plot_counterfactual(explainer, x, x_exp, feature_names=None, sort=False):
    delta_x = abs(x_exp - x)
    
    y = explainer._model.predict(x)
    y_exp = explainer._model.predict(x_exp)
    
    # sort
    if sort:
        x = [x for _, x in sorted(zip(delta_x, x), key=lambda pair: pair[0])]
        x_exp = [x for _, x in sorted(zip(delta_x, x_exp), key=lambda pair: pair[0])]
        feature_names = [x for _, x in sorted(zip(delta_x, feature_names), key=lambda pair: pair[0])]
        
    
    yrange = np.arange(len(x))
    
    plt.figure(figsize=(8, 6))
    plt.title("Original Target=%.1f, Counterfactual Target: %.1f" % (y, y_exp))
    plt.barh(y=yrange+0.15, width=x, height=0.3, label="Original Point", color='black')
    plt.barh(y=yrange-0.15, width=x_exp, height=0.3, label="Counterfactual", color='orange')
    plt.legend()
    plt.ylim(yrange[0]-0.5, yrange[-1] + 0.5)
    plt.yticks(ticks=yrange, labels=list(feature_names))
    plt.xlabel("Normalized feature value")
    plt.tight_layout()
    plt.show()
    
    # 

x_exp = counterfactual_points[0]
plot_counterfactual(explainer, x, x_exp, feature_names=cmapss.df.columns[5:-2], sort=True)
x_exp = counterfactual_points[1]
plot_counterfactual(explainer, x, x_exp, feature_names=cmapss.df.columns[5:-2], sort=True)

In [None]:
import seaborn as sns
import pandas as pd

In [None]:
def get_loss_arrays(explainer, x, y_delta, counterfactual_points):
    loss_target = []
    loss_distance = []
    loss_anomaly = []
    
    if x.ndim==1:
        x = x.reshape((1, -1))
    
    for x_exp in counterfactual_points:
        y = explainer._model.predict(x)
        y_target = y * (1 + y_delta)
        y_exp = explainer._model.predict(x_exp)
        
        lt = explainer._distance_target(y_target, y_exp)
        ld = explainer._distance_features(x, x_exp)
        la = explainer._anomaly_model.anomaly_score(x_exp.reshape((1, -1)))
        
        loss_target.append(lt)
        loss_distance.append(lt)
        loss_anomaly.append(la)
        
    
    return loss_target, loss_distance, loss_anomaly

loss_target, loss_distance, loss_anomaly = get_loss_arrays(explainer, x, 0.3, counterfactual_points)
loss_target_simple, loss_distance_simple, loss_anomaly_simple = get_loss_arrays(explainer, x, 0.3, counterfactual_points_simple)

loss_df = pd.DataFrame()
i=0
for lt, ld, la in zip(loss_target, loss_distance, loss_anomaly):
    loss_df.at[i, "AE"] = True
    loss_df.at[i, "Target"] = lt
    loss_df.at[i, "Distance"] = ld
    loss_df.at[i, "Anomaly Score"] = la
    i+=1
    
for lt, ld, la in zip(loss_target_simple, loss_distance_simple, loss_anomaly_simple):
    loss_df.at[i, "AE"] = False
    loss_df.at[i, "Target"] = lt
    loss_df.at[i, "Distance"] = ld
    loss_df.at[i, "Anomaly Score"] = la
    i+=1
    
loss_df = loss_df.melt(id_vars='AE', var_name='Loss Type')

plt.figure(figsize=(8, 4))
plt.title("Distribution of loss value for counterfactuals")
sns.boxplot(data=loss_df, x="Loss Type", y="value", hue="AE", width=0.4, fliersize=2, saturation=1.0)
#plt.yscale('log')
#plt.ylim(1e-2, 1)
plt.tight_layout()