In [None]:
# Set root directory of the project as the current working directory
import os

# Guard the directory change: `initial_dir` only exists once this cell has run,
# so re-running the cell no longer moves one directory higher each time.
if "initial_dir" not in globals():
    initial_dir = os.getcwd()  # Save initial directory (notebooks/)
    os.chdir('..')  # Move to project/

# Project-local imports (kept after the directory change above)
from config.defaults import Config
from src.models import load_preprocessed_data, load_saved_model
from src.TimeSHAP import local_event_explainer
from tensorflow.keras.models import Model

# Load the default configuration
config = Config()

import plotly.graph_objects as go
import numpy as np
import shap
from scipy.stats import pearsonr
import pandas as pd

# set the random seed for reproducibility
np.random.seed(42)


In [2]:
import logging

# Raise the root logger's level to WARNING so that INFO records are suppressed
logging.root.setLevel(logging.WARNING)

In [3]:
# Task identifier passed to both the data loader and the model loader
model_task = "lstm_regression"

config = Config()

# Load preprocessed data (seven values are returned and unpacked here)
(
    X_train, X_val, X_test,
    y_train_reg, y_val_reg, y_test_reg,
    metadata,
) = load_preprocessed_data(
    model_task=model_task,
    eol_capacity=config.eol_capacity,
)

# Load the trained model
model = load_saved_model(model_task, config)

In [4]:
def f_timeshap(X):
    """Prediction function for TimeSHAP (3D input).

    Forwards ``X`` unchanged to the global ``model`` with Keras progress
    output disabled.
    Input: (batch_size, 120, 1), Output: (batch_size, 1).
    """
    predictions = model.predict(X, verbose=0)
    return predictions

def f_shap(X):
    """Prediction function for SHAP (2D input).

    Reshapes the flat 2D input (batch_size, 120) into the 3D layout
    (batch_size, 120, 1) and returns the global ``model``'s prediction with
    Keras progress output disabled. Output: (batch_size, 1).
    """
    sequences = X.reshape(-1, 120, 1)
    return model.predict(sequences, verbose=0)

# Baseline for TimeSHAP: the first 50 training sequences, kept 3D  # Shape: (50, 120, 1)
baseline_timeshap = X_train[:50]

# Baseline for SHAP: the same 50 sequences flattened to 2D  # Shape: (50, 120)
baseline_shap = baseline_timeshap.reshape(-1, 120)

In [13]:
def compute_correlations(test_data, baseline_shap, baseline_timeshap, label, mode="event", pruned_idx=0, reverse=False,
                         shap_nsamples=300, timeshap_nsamples=3000, n_plots=3):
    """Correlate KernelSHAP and TimeSHAP attributions, one sequence at a time.

    For each sequence, KernelSHAP values (via ``f_shap``) and TimeSHAP event
    explanations (via ``local_event_explainer`` with ``f_timeshap``) are
    computed and compared with the Pearson correlation coefficient. The first
    ``n_plots`` sequences are also plotted, and summary statistics are printed.

    Parameters
    ----------
    test_data : array-like
        Sequences to explain; indexed as ``test_data[idx:idx+1]`` and
        flattened with ``reshape(1, -1)`` for SHAP.
    baseline_shap : array-like
        Background data passed to ``shap.KernelExplainer``.
    baseline_timeshap : array-like
        Background data passed to ``local_event_explainer``.
    label : str
        Text used in printed messages and figure titles.
    mode, reverse
        Only echoed in printed messages and figure titles; they do not
        influence the computation in this function.
    pruned_idx : int
        Forwarded to ``local_event_explainer``.
    shap_nsamples : int
        ``nsamples`` for ``explainer.shap_values`` (default 300, as before).
    timeshap_nsamples : int
        ``nsamples`` for ``local_event_explainer`` (default 3000, as before).
    n_plots : int
        Number of leading sequences to plot (default 3, as before).

    Returns
    -------
    list
        One Pearson correlation per sequence; empty if ``test_data`` is empty.
    """
    n_sequences = test_data.shape[0]
    if n_sequences == 0:
        # np.min / np.max would raise on an empty list, so report and stop here.
        print(f"\nResults for {label} (mode={mode}, pruned_idx={pruned_idx}, reverse={reverse}):")
        print("No sequences provided; no correlations computed.")
        return []

    # The explainer depends only on the prediction function and the background
    # data, so it is built once rather than once per sequence.
    explainer = shap.KernelExplainer(f_shap, baseline_shap)

    correlations = []
    for idx in range(n_sequences):
        test_3d = test_data[idx:idx+1]
        test_2d = test_3d.reshape(1, -1)

        # SHAP
        shap_values = explainer.shap_values(test_2d, silent=True, nsamples=shap_nsamples)
        shap_first = shap_values[0].flatten()

        # TimeSHAP
        # NOTE(review): the result is used like a DataFrame with a
        # "Shapley Value" column and an index of timesteps — confirm.
        event_explanation = local_event_explainer(
            f_timeshap, test_3d, baseline_timeshap, pruned_idx=pruned_idx, nsamples=timeshap_nsamples)

        correlation, _ = pearsonr(shap_first, event_explanation["Shapley Value"])
        correlations.append(correlation)
        print(f"Correlation for sequence {idx} ({label}, mode={mode}, pruned_idx={pruned_idx}, reverse={reverse}): {correlation:.4f}")

        if idx < n_plots:
            fig = go.Figure()
            # x-axis follows the actual number of SHAP values instead of a fixed 120
            fig.add_trace(go.Scatter(x=np.arange(shap_first.size), y=shap_first, mode='lines+markers', name='SHAP', line=dict(color='blue')))
            fig.add_trace(go.Scatter(x=event_explanation.index, y=event_explanation["Shapley Value"], mode='lines+markers', name='TimeSHAP', line=dict(color='red')))
            fig.update_layout(
                title=f"SHAP vs TimeSHAP for Sequence {idx} ({label}, mode={mode}, pruned_idx={pruned_idx}, reverse={reverse})",
                xaxis_title="Timestep",
                yaxis_title="Explanation Value",
                template="plotly_white"
            )
            fig.add_hline(y=0, line_dash="dash", line_color="gray")
            fig.show()

    print(f"\nResults for {label} (mode={mode}, pruned_idx={pruned_idx}, reverse={reverse}):")
    print(f"Mean Correlation: {np.mean(correlations):.4f}")
    print(f"Median Correlation: {np.median(correlations):.4f}")
    print(f"Min Correlation: {np.min(correlations):.4f}")
    print(f"Max Correlation: {np.max(correlations):.4f}")
    print(f"Standard Deviation: {np.std(correlations):.4f}")
    return correlations

In [6]:
# Draw 20 distinct sequence indices from the test set (NumPy global RNG, no replacement)
sample_idx = np.random.choice(np.arange(X_test.shape[0]), size=20, replace=False)
test_data = X_test[sample_idx]

# NOTE(review): the label reads "Synthetic Background" although `test_data` is
# sampled from X_test here — confirm the label matches the baselines in use.
correlations_event_hs = compute_correlations(
    test_data,
    baseline_shap,
    baseline_timeshap,
    "Synthetic Background",
    mode="event",
    pruned_idx=0,
    reverse=True,
)

Correlation for sequence 0 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9603


Correlation for sequence 1 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9078


Correlation for sequence 2 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9402


Correlation for sequence 3 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.8682
Correlation for sequence 4 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9531
Correlation for sequence 5 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9375
Correlation for sequence 6 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.7661
Correlation for sequence 7 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9281
Correlation for sequence 8 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9486
Correlation for sequence 9 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9433
Correlation for sequence 10 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9119
Correlation for sequence 11 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9573
Correlation for sequence 12 (Synthetic Background, mode=event, pruned_idx=0, reverse=True): 0.9611
Correlation for s

## Sequences with knee point

In [8]:
n_sequences = 20

# Step 1: Capacity at the last timestep of every test sequence ([:, -1, 0]),
# read with one vectorized slice instead of a Python loop.
final_capacity = np.asarray(X_test[:, -1, 0], dtype=float)

# (sequence index, end capacity) table, kept under its original name and layout
end_capacities = np.column_stack((np.arange(final_capacity.shape[0]), final_capacity))

# Step 2: Get indices of the n_sequences sequences with the lowest end capacity
# argsort sorts ascending, so the first n_sequences positions are the lowest values;
# the positions are already the sequence indices.
lowest_indices = np.argsort(final_capacity)[:n_sequences]

# Step 3: Extract those lowest-end-capacity sequences from X_test
lowest_20_sequences = X_test[lowest_indices]

# Verify the shape
print("Shape of lowest_20_sequences:", lowest_20_sequences.shape)

# Plot the sequences in a line plot; the x-axis follows the actual sequence
# length rather than a hard-coded 120.
timesteps = np.arange(lowest_20_sequences.shape[1])
fig = go.Figure()
for idx, sequence in enumerate(lowest_20_sequences):
    fig.add_trace(go.Scatter(
        x=timesteps,
        y=sequence.flatten(),
        mode='lines',
        name=f"Sequence {idx}",
        line=dict(width=1)
    ))
fig.update_layout(
    title="Lowest Capacity at end Sequences",
    xaxis_title="Timestep",
    yaxis_title="Capacity",
    template="plotly_white"
)
fig.show()

Shape of lowest_20_sequences: (20, 120, 1)


In [9]:
# Compute correlations for the lowest 20 sequences
# NOTE: this reuses the name `correlations_event_hs`, replacing any result
# previously stored under it.
correlations_event_hs = compute_correlations(
    lowest_20_sequences,
    baseline_shap,
    baseline_timeshap,
    "Lowest capacity",
    mode="event",
    pruned_idx=0,
    reverse=True,
)

Correlation for sequence 0 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9059


Correlation for sequence 1 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.8909


Correlation for sequence 2 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9067


Correlation for sequence 3 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9083
Correlation for sequence 4 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9077
Correlation for sequence 5 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9093
Correlation for sequence 6 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9207
Correlation for sequence 7 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9123
Correlation for sequence 8 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9154
Correlation for sequence 9 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9132
Correlation for sequence 10 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9121
Correlation for sequence 11 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9110
Correlation for sequence 12 (Lowest capacity, mode=event, pruned_idx=0, reverse=True): 0.9104
Correlation for sequence 13 (Lowest capacity, mode=event, pruned_id

## With Hidden States

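Not working currently: the run below was interrupted manually (`KeyboardInterrupt`) after five sequences, and the correlations obtained up to that point are much lower than in the sections above.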

In [24]:
def compute_correlations_hidden(test_data, baseline_shap, baseline_timeshap, label, mode="event", pruned_idx=0, reverse=False,
                                shap_nsamples=300, timeshap_nsamples=3000, n_plots=3):
    """Correlate KernelSHAP (model output) with TimeSHAP (hidden-state mean).

    For each sequence, KernelSHAP values are computed on ``f_shap`` and
    TimeSHAP event explanations on ``f_timeshap_hidden_mean``; the two are
    compared with the Pearson correlation coefficient. The first ``n_plots``
    sequences are plotted, and summary statistics are printed at the end.
    ``f_timeshap_hidden_mean`` is looked up when this function is called, so
    it must be defined before the first call.

    Parameters
    ----------
    test_data : array-like
        Sequences to explain; indexed as ``test_data[idx:idx+1]`` and
        flattened with ``reshape(1, -1)`` for SHAP.
    baseline_shap : array-like
        Background data passed to ``shap.KernelExplainer``.
    baseline_timeshap : array-like
        Background data passed to ``local_event_explainer``.
    label : str
        Text used in printed messages and figure titles.
    mode, reverse
        Only echoed in printed messages and figure titles; they do not
        influence the computation in this function.
    pruned_idx : int
        Forwarded to ``local_event_explainer``.
    shap_nsamples : int
        ``nsamples`` for ``explainer.shap_values`` (default 300, as before).
    timeshap_nsamples : int
        ``nsamples`` for ``local_event_explainer`` (default 3000, as before).
    n_plots : int
        Number of leading sequences to plot (default 3, as before).

    Returns
    -------
    list
        One Pearson correlation per sequence; empty if ``test_data`` is empty.
    """
    n_sequences = test_data.shape[0]
    if n_sequences == 0:
        # np.min / np.max would raise on an empty list, so report and stop here.
        print(f"\nResults for {label} (mode={mode}, pruned_idx={pruned_idx}, reverse={reverse}):")
        print("No sequences provided; no correlations computed.")
        return []

    # The explainer depends only on the prediction function and the background
    # data, so it is built once rather than once per sequence.
    explainer = shap.KernelExplainer(f_shap, baseline_shap)

    correlations = []
    for idx in range(n_sequences):
        test_3d = test_data[idx:idx+1]
        test_2d = test_3d.reshape(1, -1)

        # SHAP (final output)
        shap_values = explainer.shap_values(test_2d, silent=True, nsamples=shap_nsamples)
        shap_first = shap_values[0].flatten()  # (120,)

        # TimeSHAP (hidden state mean)
        event_explanation = local_event_explainer(
            f_timeshap_hidden_mean, test_3d, baseline_timeshap,
            pruned_idx=pruned_idx, nsamples=timeshap_nsamples
        )

        correlation, _ = pearsonr(shap_first, event_explanation["Shapley Value"])
        correlations.append(correlation)
        print(f"Correlation for sequence {idx} ({label}, mode={mode}, pruned_idx={pruned_idx}, reverse={reverse}): {correlation:.4f}")

        if idx < n_plots:
            fig = go.Figure()
            # x-axis follows the actual number of SHAP values instead of a fixed 120
            fig.add_trace(go.Scatter(x=np.arange(shap_first.size), y=shap_first, mode='lines+markers', name='SHAP (Output)', line=dict(color='blue')))
            fig.add_trace(go.Scatter(x=event_explanation.index, y=event_explanation["Shapley Value"], mode='lines+markers', name='TimeSHAP (Hidden Mean)', line=dict(color='red')))
            fig.update_layout(
                title=f"SHAP (Output) vs TimeSHAP (Hidden Mean) for Sequence {idx} ({label}, mode={mode})",
                xaxis_title="Timestep",
                yaxis_title="Explanation Value",
                template="plotly_white"
            )
            fig.add_hline(y=0, line_dash="dash", line_color="gray")
            fig.show()

    print(f"\nResults for {label} (mode={mode}, pruned_idx={pruned_idx}, reverse={reverse}):")
    print(f"Mean Correlation: {np.mean(correlations):.4f}")
    print(f"Median Correlation: {np.median(correlations):.4f}")
    print(f"Min Correlation: {np.min(correlations):.4f}")
    print(f"Max Correlation: {np.max(correlations):.4f}")
    print(f"Standard Deviation: {np.std(correlations):.4f}")
    return correlations

In [20]:
# Display the loaded model's summary (e.g. to look up layer names such as "lstm")
model.summary()

In [None]:
# Create a new model that outputs the LSTM hidden state.
# It shares the trained model's input and stops at the layer named "lstm".

lstm_layer_output = model.get_layer("lstm").output  # Shape: (batch_size, 32)
hidden_state_model = Model(inputs=model.input, outputs=lstm_layer_output)

def f_timeshap_hidden_mean(X):
    """Prediction function for TimeSHAP based on the LSTM hidden state.

    Runs ``hidden_state_model`` on ``X`` (progress output disabled) and
    averages the result over axis 1, keeping that axis so there is one value
    per input row.
    """
    activations = hidden_state_model.predict(X, verbose=0)  # (batch_size, 32)
    per_sample_mean = np.mean(activations, axis=1, keepdims=True)  # (batch_size, 1)
    return per_sample_mean

In [21]:
# Sanity-check the hidden-state model directly on a constant input
dummy_input = np.ones((1, 120, 1))  # ones rather than zeros, so the input is non-trivial
hidden_states = hidden_state_model.predict(dummy_input, verbose=0)

print("Hidden states shape:", hidden_states.shape)  # Should be (1, 32)
print("Hidden states sample:", hidden_states[0][:5])  # First 5 values of the first row
print("Hidden states mean:", np.mean(hidden_states, axis=1))  # Should not be zero


Hidden states shape: (1, 32)
Hidden states sample: [ 0.01498125 -0.05079614 -0.47480452 -0.06650043 -0.2100757 ]
Hidden states mean: [-0.01559193]


In [23]:
def f_timeshap_hidden_mean(X, verbose=False):
    """Prediction function for TimeSHAP based on the LSTM hidden state.

    Runs ``hidden_state_model`` on ``X`` and returns the mean over axis 1,
    keeping that axis so there is one value per input row.

    Parameters
    ----------
    X : array-like
        Batch of inputs passed to ``hidden_state_model.predict``.
    verbose : bool, default False
        If True, print the input shape, the first five hidden-state values of
        the first row and its mean. Off by default because explainers call
        this function repeatedly and the prints would flood the output.

    Returns
    -------
    numpy.ndarray
        Per-row mean of the hidden states.
    """
    hidden_states = hidden_state_model.predict(X, verbose=0)  # (batch_size, 32)
    mean_values = np.mean(hidden_states, axis=1, keepdims=True)  # (batch_size, 1)
    if verbose:
        print("Input shape:", X.shape)
        print("Hidden states sample:", hidden_states[0][:5])
        print("Mean value:", mean_values[0])
    return mean_values

# Quick check on the first sequence of `test_data`, with diagnostics enabled
test_input = test_data[0:1]  # (1, 120, 1)
result = f_timeshap_hidden_mean(test_input, verbose=True)
print("Result:", result)

Input shape: (1, 120, 1)
Hidden states sample: [ 0.00771899 -0.01324994 -0.01000896 -0.0199471  -0.0091048 ]
Mean value: [0.00753458]
Result: [[0.00753458]]


In [19]:
# Compare SHAP on the model output with TimeSHAP on the hidden-state mean
correlations_hidden = compute_correlations_hidden(
    test_data,
    baseline_shap,
    baseline_timeshap,
    "Synthetic Background (Hidden Mean)",
    mode="event",
    pruned_idx=0,
    reverse=True,
)

Correlation for sequence 0 (Synthetic Background (Hidden Mean), mode=event, pruned_idx=0, reverse=True): -0.2150


Correlation for sequence 1 (Synthetic Background (Hidden Mean), mode=event, pruned_idx=0, reverse=True): 0.5195


Correlation for sequence 2 (Synthetic Background (Hidden Mean), mode=event, pruned_idx=0, reverse=True): 0.4415


Correlation for sequence 3 (Synthetic Background (Hidden Mean), mode=event, pruned_idx=0, reverse=True): 0.2540
Correlation for sequence 4 (Synthetic Background (Hidden Mean), mode=event, pruned_idx=0, reverse=True): 0.3487


KeyboardInterrupt: 

## Synthetic data

In [7]:
def generate_synthetic_sequences(n_seq=3, length=120, noise_std=0.1, drop_point=60):
    """Generate synthetic test sequences: a noisy plateau followed by a linear decline.

    Each sequence is 0.9 plus Gaussian noise up to ``drop_point``, then a
    linear ramp from 0.9 down to 0.1 plus Gaussian noise for the remaining
    ``length - drop_point`` timesteps. Noise is drawn from NumPy's global RNG.

    Parameters
    ----------
    n_seq : int
        Number of sequences.
    length : int
        Timesteps per sequence.
    noise_std : float
        Standard deviation of the added Gaussian noise.
    drop_point : int
        Timestep at which the decline starts (default 60, the value that was
        previously hard-coded and only worked with ``length=120``).

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_seq, length, 1)``.

    Raises
    ------
    ValueError
        If ``drop_point`` is not within ``[0, length]``.
    """
    if not 0 <= drop_point <= length:
        raise ValueError(f"drop_point must be between 0 and length ({length}), got {drop_point}")

    # The decline fills whatever remains after the plateau, so any length works
    post_drop_length = length - drop_point
    sequences = np.zeros((n_seq, length, 1))
    for i in range(n_seq):
        sequences[i, :drop_point, 0] = 0.9 + np.random.normal(0, noise_std, drop_point)
        sequences[i, drop_point:, 0] = np.linspace(0.9, 0.1, post_drop_length) + np.random.normal(0, noise_std, post_drop_length)
    return sequences

def generate_synthetic_background(n_seq=50, length=120, drop_range=(20, 100), noise_std=0.1):
    """Generate synthetic background sequences with a random drop point each.

    For every sequence a drop point is drawn with
    ``np.random.randint(drop_range[0], drop_range[1])``. Values before it are
    0.9 plus Gaussian noise; from it onward they follow a linear ramp from 0.9
    to 0.1 plus Gaussian noise. The drawn drop points are printed.

    Returns an array of shape ``(n_seq, length, 1)``.
    """
    low = drop_range[0]
    high = drop_range[1]
    background = np.zeros((n_seq, length, 1))
    knees = []
    for row in background:
        knee = np.random.randint(low, high)
        knees.append(knee)
        # Plateau first, then decline: the draw order on the global RNG is preserved
        row[:knee, 0] = 0.9 + np.random.normal(0, noise_std, knee)
        tail = length - knee
        row[knee:, 0] = np.linspace(0.9, 0.1, tail) + np.random.normal(0, noise_std, tail)
    print("Background drop points:", knees)
    return background

def generate_constant_background(n_seq=50, length=120, value=0.9):
    """Generate a constant background: an ``(n_seq, length, 1)`` array filled with ``value``."""
    shape = (n_seq, length, 1)
    return np.ones(shape) * value

# Generate data (test data is drawn before the background, so the order of
# draws on the global RNG is fixed)
synthetic_test_data = generate_synthetic_sequences()
synthetic_background = generate_synthetic_background()
constant_background = generate_constant_background()

# Baselines from the synthetic background: 3D for TimeSHAP, flattened 2D for SHAP
baseline_timeshap_synth = synthetic_background
baseline_shap_synth = baseline_timeshap_synth.reshape(50, 120)

# Baselines from the constant background: 3D for TimeSHAP, flattened 2D for SHAP
baseline_timeshap_const = constant_background
baseline_shap_const = baseline_timeshap_const.reshape(50, 120)

Background drop points: [20, 97, 79, 21, 86, 52, 73, 80, 71, 73, 59, 20, 32, 32, 35, 43, 29, 87, 58, 32, 28, 91, 73, 36, 72, 41, 90, 39, 45, 61, 80, 80, 93, 63, 28, 57, 52, 26, 24, 29, 45, 94, 97, 85, 73, 79, 60, 33, 81, 87]


In [11]:
# Run the comparison on the synthetic test data against the synthetic background
correlations_event_hs = compute_correlations(
    synthetic_test_data,
    baseline_shap_synth,
    baseline_timeshap_synth,
    "Synthetic Background",
    mode="event",
    pruned_idx=0,
    reverse=True,
)

Provided model function fails when applied to the provided data set.


AttributeError: The layer sequential_1 has never been called and thus has no defined input.

### Test with synthetic data

In [43]:
def generate_synthetic_sequences(n_seq=20, length=120, noise_std=0.1, drop_point=60):
    """Generate synthetic test data with noise in the pre-drop period.

    Each sequence is a noisy constant capacity (0.9 plus Gaussian noise) up
    to ``drop_point``, then a linear decline from 0.9 to 0.1 plus Gaussian
    noise for the remaining ``length - drop_point`` timesteps. Noise is drawn
    from NumPy's global RNG.

    Parameters
    ----------
    n_seq : int
        Number of sequences.
    length : int
        Timesteps per sequence.
    noise_std : float
        Standard deviation of the added Gaussian noise.
    drop_point : int
        Timestep at which the decline starts (default 60, the value that was
        previously hard-coded and only worked with ``length=120``).

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_seq, length, 1)``.

    Raises
    ------
    ValueError
        If ``drop_point`` is not within ``[0, length]``.
    """
    if not 0 <= drop_point <= length:
        raise ValueError(f"drop_point must be between 0 and length ({length}), got {drop_point}")

    # The decline fills whatever remains after the plateau, so any length works
    post_drop_length = length - drop_point
    sequences = np.zeros((n_seq, length, 1))
    for i in range(n_seq):
        # Noisy constant capacity before drop
        sequences[i, :drop_point, 0] = 0.9 + np.random.normal(0, noise_std, drop_point)
        # Linear decline after drop with noise
        sequences[i, drop_point:, 0] = np.linspace(0.9, 0.1, post_drop_length) + np.random.normal(0, noise_std, post_drop_length)
    return sequences

def generate_synthetic_background(n_seq=50, length=120, drop_range=(20, 100), noise_std=0.1):
    """Generate synthetic background sequences with a random drop point each.

    For every sequence a drop point is drawn with
    ``np.random.randint(drop_range[0], drop_range[1])``. Values before it are
    a noisy constant (0.9 plus Gaussian noise); from it onward they follow a
    linear decline that starts at 0.9 and ends at 0.1, plus Gaussian noise.

    Returns an array of shape ``(n_seq, length, 1)``.
    """
    low = drop_range[0]
    high = drop_range[1]
    background = np.zeros((n_seq, length, 1))
    knees = []
    for row in background:
        knee = np.random.randint(low, high)
        knees.append(knee)
        # Plateau before the drop
        row[:knee, 0] = 0.9 + np.random.normal(0, noise_std, knee)
        # Decline after the drop; the ramp starts at the plateau level (0.9)
        tail = length - knee
        ramp = np.linspace(0.9, 0.1, tail)
        jitter = np.random.normal(0, noise_std, tail)
        row[knee:, 0] = ramp + jitter
    return background

# Generate synthetic test and background data (test data first, so the order
# of draws on the global RNG is fixed)
synthetic_test_data = generate_synthetic_sequences()
synthetic_background = generate_synthetic_background()

# Update the baseline for SHAP and TimeSHAP
# NOTE: this rebinds the global names `baseline_timeshap` and `baseline_shap`,
# so cells run afterwards see the synthetic background under those names.
baseline_timeshap = synthetic_background  # Shape: (50, 120, 1)
baseline_shap = baseline_timeshap.reshape(50, 120)  # Shape: (50, 120)