In [None]:
import numpy as np
import tensorflow as tf
import random
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from lime import lime_tabular
import shap
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go


In [None]:
# Set seeds for reproducibility
np.random.seed(42)
tf.set_random_seed(42)
random.seed(42)

# SIMPLE REGRESION CASE

In [None]:
# Generar data set sintético
n = 10000
mu= 0
sigma =1

np.random.seed(0)
x1 = np.random.normal(mu,sigma,n)
np.random.seed(1)
x2 = np.random.normal(mu,sigma,n)
np.random.seed(2)
x3 = np.random.normal(mu,sigma,n)
x1 = x1.reshape(-1,1)
x2 = x2.reshape(-1,1)
x3 = x3.reshape(-1,1)


In [None]:
def reshape_data(seq, n_timesteps):
    N = len(seq) - n_timesteps - 1
    nf = seq.shape[1]
    if N <= 0:
        raise ValueError('I need more data!')
    new_seq = np.zeros((N, n_timesteps, nf))
    for i in range(N):
        new_seq[i, :, :] = seq[i:i+n_timesteps]
    return new_seq

In [None]:
# Reshape data for RNN
N_TIMESTEPS = 3
X1 = reshape_data(x1, n_timesteps=N_TIMESTEPS)
X1 = X1[:,::-1,:]
X2 = reshape_data(x2, n_timesteps=N_TIMESTEPS)
X2 = X2[:,::-1,:]
X3 = reshape_data(x3, n_timesteps=N_TIMESTEPS)
X3 = X3[:,::-1,:]

In [None]:
y = X1[:,0]**2 + X2[:,1] - X3[:,2]

In [None]:
# Concatenate X1 X2 X3
X = np.concatenate((X1,X2,X3), axis=2)

In [None]:
#Create the RNN model and training
model = Sequential()
model.add(SimpleRNN(20, activation='tanh', return_sequences=False, input_shape=(N_TIMESTEPS, X.shape[2])))
model.add(Dense(1))

# Step 4: Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Step 5: Train the model
model.fit(X, y, epochs=50 )

In [None]:
#Predicting values 
y_pred = model.predict(X)


In [None]:
import plotly.graph_objects as go
# Create the plot
fig = go.Figure()

# Add the true values line
fig.add_trace(go.Scatter(x=list(range(y.shape[0])), y=y.squeeze(), mode='lines', name='True Values'))

# Add the predicted values line
fig.add_trace(go.Scatter(x=list(range(y.shape[0])), y=y_pred.squeeze(), mode='lines', name='Predicted Values', line=dict(dash='dash')))

# Update layout
fig.update_layout(
    title='True vs Predicted Values',
    xaxis_title='Time',
    yaxis_title='Value'
)

# Show the plot
fig.show()

## Derivadas Parciales

In [None]:
# obtencion de los pesos
model_weights = model.get_weights()
W1 =model_weights[0] #input to hidden
U1=model_weights[1] #hidden to hidden
b1=model_weights[2] #hidden to hidden
W2=model_weights[3] #hidden to output
b2= model_weights[4] #hidden to output

In [None]:
def jacobian_rnn(X, W1, U1, b1, W2, b2):
    def stanh(x):
        return np.diag(1 - np.tanh(x) ** 2)

    # Forward pass
    nsamples, timesteps, ninputs = X.shape


    # List of every layer 
    Z = [
        X, 
        np.empty((nsamples, timesteps, W1.shape[1])), 
        np.empty((nsamples, timesteps, 1))
    ]
    O = [
        X, 
        np.empty((nsamples, timesteps, W1.shape[1])),
        np.empty((nsamples, timesteps, 1))
    ]
    o_t1_l2 = np.zeros((nsamples, W1.shape[1]))

    for t in range(timesteps):
        # INPUT LAYER
        z_t_l1 = Z[0][:, t, :].copy()
        o_t_l1 = z_t_l1.copy()
        
        # HIDDEN LAYER
        z_t_l2 = o_t_l1 @ W1 + o_t1_l2 @ U1 + b1
        o_t_l2 = np.tanh(z_t_l2)
        o_t1_l2 = o_t_l2.copy()
        
        # OUTPUT LAYER
        z_t_l3 = o_t_l2 @ W2 + b2
        o_t_l3 = z_t_l3
        
        #aquí solo te quedas con el ultimo timestep?
        # STORE VARIABLES
        Z[1][:, t, :] = z_t_l2
        O[1][:, t, :] = o_t_l2
        Z[2][:, t, :] = z_t_l3
        O[2][:, t, :] = o_t_l3
            
    # backward pass
    D=[np.eye(ninputs), np.apply_along_axis(stanh, 2, Z[1]), np.eye(1)]
    
    #dy_t_l3/dyz_t_l2
    dydh  = D[1][:,-1,:,:] @ W2 @ D[2]

    #dz_t_l2/dx_t_j
    PD = np.empty((nsamples,timesteps,ninputs,y.shape[1]))
    CD = np.eye(Z[1].shape[2])
    for t in range (timesteps):
        PD[:,t,:,:] = D[0] @ W1 @ CD @ dydh
        CD = D[1][:,t,:,:] @ U1 @ CD

    return PD

In [None]:
PD = jacobian_rnn(X, W1, U1, b1, W2, b2)

In [None]:
def compute_sensitivities(PD):
    # Sensibilidad media de la salida con respecto a la entrada i en el timestep t
    mean_PD_it = np.mean(PD, axis=0)
    
    # Desviación estándar de la salida con respecto a la entrada i en el timestep t
    sigma_PD_it = np.sqrt(np.mean((PD - mean_PD_it)**2, axis=0))
    
    # Sensibilidad cuadrática media de la salida con respecto a la entrada i en el timestep t
    PD_it_squared = np.mean(PD**2, axis=0)
    
    # Sensibilidad media de la salida con respecto a la entrada i
    mean_PD_i = np.mean(mean_PD_it, axis=0)
    
    # Desviación estándar de la salida con respecto a la entrada i
    sigma_PD_i = np.sqrt(np.mean(PD_it_squared - mean_PD_it**2, axis=0))
    
    # Sensibilidad cuadrática media de la salida con respecto a la entrada i
    PD_i_squared = np.mean(PD_it_squared, axis=0)
    
    return mean_PD_it, sigma_PD_it, PD_it_squared, mean_PD_i, sigma_PD_i, PD_i_squared

In [None]:
mean_PD_it, sigma_PD_it, PD_it_squared, mean_PD_i, sigma_PD_i, PD_i_squared = compute_sensitivities(PD)

print("meanPD_it:", mean_PD_it.shape)
print("sigma_PD_it:", sigma_PD_it.shape)
print("PD_it_squared:", PD_it_squared.shape)
print("meanPD_i:", mean_PD_i.shape)
print("sigma_PD_i:", sigma_PD_i.shape)
print("PD_i_squared:", PD_i_squared.shape)

In [None]:
flattened_mean_PD = mean_PD_it.squeeze()
flattened_sigma_PD_it = sigma_PD_it.squeeze()
flattened_PD_it_squared = PD_it_squared.squeeze()
# Crear un DataFrame con pandas y ajustar el índice y las columnas
df_mean_PD = pd.DataFrame(flattened_mean_PD, index=['X1', 'X2', 'X3'], columns=['t', 't-1', 't-2'])
df_sigma_PD_it = pd.DataFrame(flattened_sigma_PD_it, index=['X1', 'X2', 'X3'], columns=['t', 't-1', 't-2'])
df_PD_it_squared= pd.DataFrame(flattened_PD_it_squared, index=['X1', 'X2', 'X3'], columns=['t', 't-1', 't-2'])
# Mostrar el DataFrame resultante
print(df_mean_PD)
print(df_sigma_PD_it)
print(df_PD_it_squared)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def plot_partial_derivative_analysis(PD, mean_PD_it, sigma_PD_it, PD_it_squared, timestep_names=None):
    """
    Generate various plots to analyze the relationship between output and inputs using partial derivatives.

    Args:
        PD (numpy.ndarray): Raw partial derivatives with size (samples, timesteps, inputs, outputs).
        mean_PD_it (numpy.ndarray): Mean of partial derivatives in each timestep with size (timesteps, inputs, outputs).
        sigma_PD_it (numpy.ndarray): Standard deviation of partial derivatives in each timestep with size (timesteps, inputs, outputs).
        PD_it_squared (numpy.ndarray): Mean of squared partial derivatives in each timestep with size (timesteps, inputs, outputs).
        timestep_names (list of str, optional): Names of the timesteps. Defaults to None.
    """
    
    inputs = mean_PD_it.shape[1]
    timesteps = mean_PD_it.shape[0]
    samples = PD.shape[0]
    
    input_names = [f'Input {i+1}' for i in range(inputs)]

    if timestep_names is None:
        timestep_names = [f't-{timesteps-t-1}' for t in range(timesteps)]

    # Scatter plot: Mean vs Std of Partial Derivatives for Inputs Across All Timesteps
    plt.figure(figsize=(10, 8))
    plt.axhline(0, color='blue', linewidth=1)
    plt.axvline(0, color='blue', linewidth=1)
    plt.scatter(0, 0, color='blue', s=10) 

    for t in range(timesteps):
        for i in range(inputs):
            mean_val = mean_PD_it[t, i, 0]
            std_val = sigma_PD_it[t, i, 0]
            plt.scatter(mean_val, std_val, color='darkgray', label=f'{input_names[i]} ({timestep_names[t]})')
            plt.text(mean_val, std_val, f'{input_names[i]} ({timestep_names[t]})', fontsize=8)

    plt.xlabel('Mean of Partial Derivatives')
    plt.ylabel('Standard Deviation of Partial Derivatives')
    plt.title('Mean vs Standard Deviation of Partial Derivatives for Inputs Across All Timesteps')
    plt.grid(True)
    plt.show()

    # Bar plot: Squared Partial Derivatives with Gradient Colors Based on Mean Values
    flattened_data = []
    for t in range(timesteps):
        for i in range(inputs):
            mean_val = mean_PD_it[t, i, 0]
            squared_val = PD_it_squared[t, i, 0]
            label = f'{input_names[i]} ({timestep_names[t]})'
            flattened_data.append((mean_val, squared_val, label))

    flattened_data.sort(key=lambda x: x[1])
    sorted_means, sorted_squared_vals, labels = zip(*flattened_data)
    cmap_red = plt.cm.Reds_r
    cmap_green = plt.cm.Greens
    cmap_gray = plt.cm.Greys
    min_mean = np.min(sorted_means)
    max_mean = np.max(sorted_means)

    plt.figure(figsize=(12, 8))

    for mean_val, squared_val, label in zip(sorted_means, sorted_squared_vals, labels):
        if mean_val < 0:
            norm_val = (mean_val - min_mean) / (0 - min_mean)
            color = cmap_red(norm_val)
        elif mean_val > 0:
            norm_val = (mean_val - 0) / (max_mean - 0)
            color = cmap_green(norm_val)
        else:
            color = cmap_gray(0.5)

        plt.bar(label, squared_val, color=color)

    plt.xlabel('Inputs and Timesteps')
    plt.ylabel('Squared Partial Derivatives')
    plt.title('Squared Partial Derivatives with Gradient Colors Based on Mean Values')
    plt.xticks(rotation=90)
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()

    # Density plot: Partial Derivatives for Each Input and Timestep
    plt.figure(figsize=(12, 8))
    colors = sns.color_palette("husl", timesteps * inputs)

    for t in range(timesteps):
        for i in range(inputs):
            sns.kdeplot(PD[:, t, i, 0], fill=True, alpha=0.4, label=f'{input_names[i]}, {timestep_names[t]}')

    plt.xlabel('Partial Derivatives')
    plt.ylabel('Density')
    plt.title('Density Plots of Partial Derivatives for Each Input and Timestep')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Line plot: Evolution of Partial Derivatives Through Samples
    plt.figure(figsize=(15, 10))
    colors = sns.color_palette("husl", timesteps * inputs)

    for t in range(timesteps):
        for i in range(inputs):
            plt.plot(range(samples), PD[:, t, i, 0], label=f'{input_names[i]}, {timestep_names[t]}', alpha=0.7)

    plt.xlabel('Samples')
    plt.ylabel('Partial Derivatives')
    plt.title('Evolution of Partial Derivatives Through Samples')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Subplots: Evolution of Mean, Std, and Squared Partial Derivatives
    fig, axes = plt.subplots(3, 1, figsize=(15, 15), sharex=True)

    axes[0].set_title('Evolution of the Mean of Partial Derivatives')
    for i in range(inputs):
        axes[0].plot(range(timesteps), mean_PD_it[:, i, 0], marker='o', label=f'Input {i+1}')
    axes[0].set_ylabel('Mean of Partial Derivatives')
    axes[0].legend()
    axes[0].grid(True)
    axes[0].set_xticks(range(timesteps))
    axes[0].set_xticklabels(timestep_names)

    axes[1].set_title('Evolution of the Standard Deviation of Partial Derivatives')
    for i in range(inputs):
        axes[1].plot(range(timesteps), sigma_PD_it[:, i, 0], marker='o', label=f'Input {i+1}')
    axes[1].set_ylabel('Standard Deviation of Partial Derivatives')
    axes[1].legend()
    axes[1].grid(True)
    axes[1].set_xticks(range(timesteps))
    axes[1].set_xticklabels(timestep_names)

    axes[2].set_title('Evolution of the Mean of Squared Partial Derivatives')
    for i in range(inputs):
        axes[2].plot(range(timesteps), PD_it_squared[:, i, 0], marker='o', label=f'Input {i+1}')
    axes[2].set_xlabel('Timesteps')
    axes[2].set_ylabel('Mean of Squared Partial Derivatives')
    axes[2].legend()
    axes[2].grid(True)
    axes[2].set_xticks(range(timesteps))
    axes[2].set_xticklabels(timestep_names)

    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()


plot_partial_derivative_analysis(PD, mean_PD_it, sigma_PD_it, PD_it_squared)

# SINE CASE

In [None]:
# Step 1: Define the function to generate sine wave data
def generate_sin_wave(seq_length):
    time_steps = np.linspace(0, 2 * np.pi, seq_length + 1)
    data = np.sin(time_steps)
    data.resize((seq_length + 1, 1))

    x = data[:-1]
    y = data[1:]
    return time_steps, x, y

In [None]:
# Step 2: Generate the data
seq_length = 600  # You can change this value
_, x, y = generate_sin_wave(seq_length)

In [None]:
# Reshape data for RNN
N_TIMESTEPS = 8
x = reshape_data(x, n_timesteps=N_TIMESTEPS)
y = y[N_TIMESTEPS:-1]

In [None]:
# Step 3: Create the RNN model
model = Sequential()
model.add(SimpleRNN(12, activation='tanh', return_sequences=False, input_shape=(N_TIMESTEPS, 1)))
model.add(Dense(1))

# Step 4: Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Step 5: Train the model
model.fit(x, y, epochs=100)

In [None]:
y_pred = model.predict(x)

In [None]:
import plotly.graph_objects as go
# Create the plot
fig = go.Figure()

# Add the true values line
fig.add_trace(go.Scatter(x=list(range(y.shape[0])), y=y.squeeze(), mode='lines', name='True Values'))

# Add the predicted values line
fig.add_trace(go.Scatter(x=list(range(y.shape[0])), y=y_pred.squeeze(), mode='lines', name='Predicted Values', line=dict(dash='dash')))

# Update layout
fig.update_layout(
    title='True vs Predicted Values',
    xaxis_title='Time',
    yaxis_title='Value'
)

# Show the plot
fig.show()

In [None]:
import matplotlib.pyplot as plt
plt.plot(y, lw=3, alpha=0.3, label='Truth')
plt.plot(y_pred, '--', label='Predictions')
plt.legend(loc='best')

## Derivadas Parciales

In [None]:
# obtencion de los pesos
model_weights = model.get_weights()
W1 =model_weights[0] #input to hidden
U1=model_weights[1] #hidden to hidden
b1=model_weights[2] #hidden to hidden
W2=model_weights[3] #hidden to output
b2= model_weights[4] #hidden to output

In [None]:
PD = jacobian_rnn(x, W1, U1, b1, W2, b2)

In [None]:
mean_PD_it, sigma_PD_it, PD_it_squared, mean_PD_i, sigma_PD_i, PD_i_squared = compute_sensitivities(PD)

print("meanPD_it:", mean_PD_it.shape)
print("sigma_PD_it:", sigma_PD_it.shape)
print("PD_it_squared:", PD_it_squared.shape)
print("meanPD_i:", mean_PD_i.shape)
print("sigma_PD_i:", sigma_PD_i.shape)
print("PD_i_squared:", PD_i_squared.shape)

In [None]:
plot_partial_derivative_analysis(PD, mean_PD_it, sigma_PD_it, PD_it_squared)

## LIME

In [None]:
from lime import lime_tabular

In [None]:
explainer = lime_tabular.RecurrentTabularExplainer(training_data=x, 
                                                   training_labels=y, 
                                                   mode='regression',
                                                   feature_names=['X1'],
                                                   discretize_continuous=False,
                                                   class_names=None, 
                                                   random_state=42)

In [None]:
lime_coefs = np.empty((591, 8))
for p in range(x.shape[0]):
    exp = explainer.explain_instance(x[p], model.predict, num_features=8, labels=None)
    # exp.show_in_notebook()
    lime_coefs[p,:] = np.array([i[1] for i in exp.as_list()])

In [None]:
import matplotlib.pyplot as plt
plt.plot(y, lw=3, alpha=0.3, label='Truth')
plt.plot(lime_coefs[:,0], label='Xt')
plt.plot(lime_coefs[:,1], '--', label='Xt-1')
plt.plot(lime_coefs[:,2], '.-', label='Xt-2')
plt.plot(lime_coefs[:,3], label='Xt-3')
plt.plot(lime_coefs[:,4], '--', label='Xt-4')
plt.plot(lime_coefs[:,5], '.-', label='Xt-5')
plt.plot(lime_coefs[:,6], label='Xt-6')
plt.plot(lime_coefs[:,7], '--', label='Xt-7')
plt.legend(loc='best')

In [None]:
exp = explainer.explain_instance(x[0], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

# Extracting the coefficients
lime_coefficients = exp.as_list()

# Printing the coefficients
print("LIME Coefficients for the instance:")
for feature, weight in lime_coefficients:
    print(f"{feature}: {weight}")

In [None]:
exp = explainer.explain_instance(x[75], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
exp = explainer.explain_instance(x[150], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
exp = explainer.explain_instance(x[225], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
exp = explainer.explain_instance(x[300], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
exp = explainer.explain_instance(x[375], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
exp = explainer.explain_instance(x[450], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
exp = explainer.explain_instance(x[525], model.predict, num_features=8, labels=None)
exp.show_in_notebook()

In [None]:
y_lime = 0.17 * x[:, 3, :] + 0.18 * x[:, 4, :] - 0.13 * x[:, 7, :] -0.12 * x[:, 5, :] - 0.03 * x[:, 6, :] + 0.27 * x[:, 0, :] + 0.42 * x[:, 2, :] - 0.20 * x[:, 1, :] 

In [None]:
plt.plot(y, lw=3, alpha=0.3, label='Truth')
plt.plot(y_pred, '--', label='Predictions')
plt.plot(y_lime, '.-', label='Predictions LIME')
plt.legend(loc='best')

## SHAP

In [None]:
import shap
import seaborn as sns

shap.initjs()
x = np.array(x)
explainer = shap.DeepExplainer(model, x)

shap_values = explainer.shap_values(x)

In [None]:
shap_values=np.squeeze(shap_values)

In [None]:
import pandas as pd
shap_values_array = np.array(shap_values[0])
print("Shape of shap_values_array:", shap_values_array.shape)
shap_pd_list = []
x_pd_list = []
names_features=['X1']
timestep_labels=['t','t-1','t-2','t-3','t-4','t-5','t-6','t-7']
for i in range(shap_values_array.shape[2]):
    shap_pd_list.append(pd.DataFrame(shap_values_array[:,:,i], columns=[str(names_features[i]) + '_' + str(timestep_labels[j]) for j in range(N_TIMESTEPS)]))
    x_pd_list.append(pd.DataFrame(x[:,:,i], columns=[str(names_features[i]) + '_' + str(timestep_labels[j]) for j in range(N_TIMESTEPS)]))
shap_pd = pd.concat(shap_pd_list, axis=1)
x_pd = pd.concat(x_pd_list, axis=1)

In [None]:
shap.summary_plot(shap_pd.values, x_pd)
shap.summary_plot(shap_pd.values, x_pd, plot_type='bar')

## PDP

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def PDP(model, data, time_step, feature_index, num_grid_points=50):
    """
    Generates Partial Dependence Plot (PDP) and Individual Conditional Expectation (ICE) plots
    for a given feature at a specific time step.

    :param model: Trained Keras RNN model.
    :param data: Input data (3D array: [batch_size, time_steps, features]).
    :param time_step: Time step at which the PDP is to be generated.
    :param feature_index: Index of the feature for which PDP is to be generated.
    :param num_grid_points: Number of points for plotting.
    """
    # Ensure the specified time step is within the range of the data's time steps
    assert 0 <= time_step < data.shape[1], "time_step is out of bounds."

    # Get range for the feature
    feature_range = np.linspace(np.min(data[:, time_step, feature_index]), 
                                np.max(data[:, time_step, feature_index]), 
                                num_grid_points)
    
    # Storage for model ice
    ice = np.zeros((data.shape[0], len(feature_range)))

    # Calculate ice for each value in feature range
    for i, value in enumerate(feature_range):
        modified_data = data.copy()
        modified_data[:, time_step, feature_index] = value
        ice[:, i] = model.predict(modified_data).flatten()

    # Calculate PDP
    pdp = ice.mean(axis=0)

    # Plotting
    plt.figure(figsize=(12, 6))

    # ICE Plots
    for i in range(data.shape[0]):
        plt.plot(feature_range, ice[i, :], color='grey', alpha=0.5)

    # PDP
    plt.plot(feature_range, pdp, color='red', linewidth=3, label='PDP')

    plt.xlabel(f'Feature {names_features[feature_index]} at Time Step {timestep_labels[time_step]}')
    plt.ylabel('Model Response')
    plt.title(f'ICE and PDP for Feature {names_features[feature_index]} at Time Step {timestep_labels[time_step]}')
    plt.legend()
    plt.show()

    return pdp, ice

In [None]:
def generate_all_PDPs(model, data, num_grid_points=50):
    """
    Generates PDPs for each feature at each time step for the given RNN model.

    :param model: Trained Keras RNN model.
    :param data: Input data (3D array: [batch_size, time_steps, features]).
    :param num_grid_points: Number of points for plotting.
    """
    num_time_steps = data.shape[1]
    print(num_time_steps)
    num_features = data.shape[2]

    pdp_results = {}

    for time_step in range(num_time_steps):
        for feature_index in range(num_features):
            pdp = PDP(model, data, time_step, feature_index, num_grid_points)
            pdp_results[(time_step, feature_index)] = pdp

    return pdp_results


In [None]:
generate_all_PDPs(model, x)

## Permutation Importance

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_error  # or use another appropriate metric depending on your task

def permutation_importance(model, X, y, metric=mean_squared_error):
    """
    Compute permutation feature importance for each time step for each feature 
    in a Keras RNN model.

    :param model: Trained Keras RNN model.
    :param X: Input data (3D array: [batch_size, time_steps, features]).
    :param y: True output values.
    :param metric: Performance metric function to evaluate the model (default is mean squared error).
    :return: Dictionary of feature importances for each time step.
    """
    # Store the original model performance
    original_performance = metric(y, model.predict(X))

    # Number of time steps and features
    num_time_steps = X.shape[1]
    num_features = X.shape[2]

    # Dictionary to hold importances
    importances = {}

    for time_step in range(num_time_steps):
        for feature_index in range(num_features):
            # Save original feature
            saved_feature = X[:, time_step, feature_index].copy()

            # Permute the feature at the specific time step
            np.random.shuffle(X[:, time_step, feature_index])

            # Calculate new performance
            shuffled_performance = metric(y, model.predict(X))

            # Restore original feature
            X[:, time_step, feature_index] = saved_feature

            # Calculate importance
            importance = shuffled_performance - original_performance
            importances[(time_step, feature_index)] = importance

    return importances


In [None]:
permutation_importance(model, x, y, mean_squared_error)

# DAILY DEMAND CASE

In [None]:
# Cargar los datos desde el archivo CSV
data = pd.read_csv('daily_demand_tr.csv', delimiter=';', skiprows=0)

print(data.head())

# Separar las características de entrada (WD y temp) y la variable de salida (dem)
X = data[['WD', 'TEMP']].values
y = data['DEM'].values
#names_features=data.columns.tolist()
print(X)
# Normalizar los datos
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
X = scaler_X.fit_transform(X)
y = scaler_y.fit_transform(y.reshape(-1, 1))

In [None]:
# Reshape data for RNN
N_TIMESTEPS = 8
X = reshape_data(X, n_timesteps=N_TIMESTEPS)
X = X[:,::-1,:]
y = y[N_TIMESTEPS:-1]

In [None]:
#Select only timesteps 1, 2 and 7
N_TIMESTEPS_SELECTED=3
selected_time_steps = [1, 2, 7]
X_selected = X[:, selected_time_steps, :]

In [None]:
#Create the RNN model and training
from keras.optimizers import Adam
model = Sequential()
model.add(SimpleRNN(32, activation='tanh', return_sequences=False, input_shape=(N_TIMESTEPS_SELECTED, 2)))
model.add(Dense(1))

# Step 4: Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Step 5: Train the model
model.fit(X_selected, y, epochs=500 )

In [None]:
#Predicting values y guardar entradas y salidas

y_pred = model.predict(X_selected)

In [None]:
import plotly.graph_objects as go
# Create the plot
fig = go.Figure()

# Add the true values line
fig.add_trace(go.Scatter(x=list(range(y.shape[0])), y=y.squeeze(), mode='lines', name='True Values'))

# Add the predicted values line
fig.add_trace(go.Scatter(x=list(range(y.shape[0])), y=y_pred.squeeze(), mode='lines', name='Predicted Values', line=dict(dash='dash')))

# Update layout
fig.update_layout(
    title='True vs Predicted Values',
    xaxis_title='Time',
    yaxis_title='Value'
)

# Show the plot
fig.show()

In [None]:
#Plotting predictions

import matplotlib.pyplot as plt
plt.plot(y[200:400], lw=3, alpha=0.3, label='Truth')
plt.plot(y_pred[200:400], '--', label='Predictions')
plt.legend(loc='best')

## Derivadas Parciales

In [None]:
# obtencion de los pesos
model_weights = model.get_weights()
W1 =model_weights[0] #input to hidden
U1=model_weights[1] #hidden to hidden
b1=model_weights[2] #hidden to hidden
W2=model_weights[3] #hidden to output
b2= model_weights[4] #hidden to output

In [None]:
PD = jacobian_rnn(X_selected, W1, U1, b1, W2, b2)

In [None]:
mean_PD_it, sigma_PD_it, PD_it_squared, mean_PD_i, sigma_PD_i, PD_i_squared = compute_sensitivities(PD)

print("meanPD_it:", mean_PD_it.shape)
print("sigma_PD_it:", sigma_PD_it.shape)
print("PD_it_squared:", PD_it_squared.shape)
print("meanPD_i:", mean_PD_i.shape)
print("sigma_PD_i:", sigma_PD_i.shape)
print("PD_i_squared:", PD_i_squared.shape)

In [None]:
plot_partial_derivative_analysis(PD, mean_PD_it, sigma_PD_it, PD_it_squared, [f't-{7}', f't-{2}', f't-{1}'])

## LIME

In [None]:
explainer = lime_tabular.RecurrentTabularExplainer(training_data=X_selected, 
                                                   training_labels=y, 
                                                   mode='regression',
                                                   feature_names=['WD_t-1','TEMP_t-1','WD_t-2','TEMP_t-2','WD_t-7','TEMP_t-7'],
                                                   discretize_continuous=False,
                                                   class_names=None, 
                                                   random_state=42)

In [None]:
N = 100
lime_coefs = np.empty((N, N_TIMESTEPS_SELECTED, 2))
for p in range(N):
    exp = explainer.explain_instance(X_selected[p], model.predict, num_features=lime_coefs.shape[1] * lime_coefs.shape[2], labels=None)
    # exp.show_in_notebook()
    lime_coefs[p,:,:] = np.array([i[1] for i in exp.as_list()]).reshape(lime_coefs.shape[1:])

In [None]:
lime_coefs = np.empty((X_selected.shape[0], N_TIMESTEPS_SELECTED, 2))
for p in range(X_selected.shape[0]):
    exp = explainer.explain_instance(X_selected[p], model.predict, num_features=lime_coefs.shape[1] * lime_coefs.shape[2], labels=None)
    # exp.show_in_notebook()
    lime_coefs[p,:,:] = np.array([i[1] for i in exp.as_list()]).reshape(lime_coefs.shape[1:])

In [None]:
plt.plot(y[:N], lw=3, alpha=0.3, label='Truth')
linetypes=['', '--', '.-', '.']
timestep_labels=[1,2,7]
for i in range(N_TIMESTEPS_SELECTED * 2):
    plt.plot(lime_coefs[:N, i % N_TIMESTEPS_SELECTED, int(i >= N_TIMESTEPS_SELECTED)], linetypes[i % len(linetypes)], label = "WD t-" + str(timestep_labels[i]) if i < N_TIMESTEPS_SELECTED else "TEMP t-" + str(timestep_labels[i-N_TIMESTEPS_SELECTED]))

plt.legend(loc='best')

In [None]:
import plotly.graph_objects as go

# Plot the truth
fig = go.Figure()
fig.add_trace(go.Scatter(x=np.arange(N), y=y[:N].reshape(-1,), mode='lines', line=dict(width=3, color='blue', dash='dash'), name='Truth'))

# Define linetypes
linetypes = ['dash', 'dot', 'dashdot', 'solid']

# Plot Lime coefficients
for i in range(2 * N_TIMESTEPS_SELECTED):
    if i < N_TIMESTEPS_SELECTED:
        name = "WD t-" + str(timestep_labels[i])
        color = 'red'
    else:
        name = "TEMP t-" + str(timestep_labels[i-N_TIMESTEPS_SELECTED])
        color = 'green'
    fig.add_trace(go.Scatter(x=np.arange(N), y=lime_coefs[:N, i % N_TIMESTEPS_SELECTED, int(i >= N_TIMESTEPS_SELECTED)], mode='lines', line=dict(width=2, color=color, dash=linetypes[i % len(linetypes)]), name=name))

# Update layout
fig.update_layout(title='Lime Coefficients',
                  xaxis_title='Sample',
                  yaxis_title='Coefficient Value',
                  legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
                  template='plotly_white')

fig.show()


In [None]:
# Define date ranges for winter (January and February) and summer (July and August)
winter_indices = data[(data['DATE'] >= '2008-01-01') & (data['DATE'] <= '2008-02-28')].index
summer_indices = data[(data['DATE'] >= '2008-07-01') & (data['DATE'] <= '2008-08-31')].index

# Select two weeks from each season (first 14 days of January and July)
winter_data_indices = winter_indices[:16]
summer_data_indices = summer_indices[:16]

X_winter = X_selected[winter_data_indices]
y_winter = y[winter_data_indices]
X_summer = X_selected[summer_data_indices]
y_summer = y[summer_data_indices]


In [None]:
#SUMMER 
plt.plot(y_summer, lw=3, alpha=0.3, label='Truth')
# Apply LIME to explain the predictions for each day in the selected two weeks of summer
lime_coefficients_summer = np.empty((16, N_TIMESTEPS_SELECTED, 2))
linetypes=['', '--', '.-', '.']
timestep_labels=[1,2,7]
for p in range(len(X_summer)):
    exp = explainer.explain_instance(X_summer[p].reshape(-1), model.predict, num_features=6)
    lime_coefficients_summer[p,:,:] = np.array([i[1]for i in exp.as_list()]).reshape(lime_coefficients_summer.shape[1:])
for i in range(N_TIMESTEPS_SELECTED * 2):
    plt.plot(lime_coefficients_summer[:N, i % N_TIMESTEPS_SELECTED, int(i >= N_TIMESTEPS_SELECTED)], linetypes[i % len(linetypes)], label = "WD t-" + str(timestep_labels[i]) if i < N_TIMESTEPS_SELECTED else "TEMP t-" + str(timestep_labels[i-N_TIMESTEPS_SELECTED]))
    plt.legend(loc='best')

In [None]:
#WINTER 
plt.plot(y_winter, lw=3, alpha=0.3, label='Truth')
# Apply LIME to explain the predictions for each day in the selected two weeks of winter
lime_coefficients_winter = np.empty((16, N_TIMESTEPS_SELECTED, 2))
for i in range(len(X_winter)):
    exp = explainer.explain_instance(X_winter[i].reshape(-1), model.predict, num_features=6)
    lime_coefficients_winter[p,:,:] = np.array([i[1]for i in exp.as_list()]).reshape(lime_coefficients_winter.shape[1:])
for i in range(N_TIMESTEPS_SELECTED * 2):
    plt.plot(lime_coefficients_winter[:N, i % N_TIMESTEPS_SELECTED, int(i >= N_TIMESTEPS_SELECTED)], linetypes[i % len(linetypes)], label = "WD t-" + str(timestep_labels[i]) if i < N_TIMESTEPS_SELECTED else "TEMP t-" + str(timestep_labels[i-N_TIMESTEPS_SELECTED]))
    plt.legend(loc='best')

In [None]:
for i in range(X_winter.shape[0]):
    exp = explainer.explain_instance(X_winter[i], model.predict, num_features=N_TIMESTEPS_SELECTED*2, labels=None)
    exp.show_in_notebook()

## SHAP

In [None]:
import shap

# print the JS visualization code to the notebook
shap.initjs()

explainer = shap.DeepExplainer(model, X_selected)

shap_values = explainer.shap_values(X_selected)

In [None]:
shap.force_plot(explainer.expected_value[0], shap_values[0][0], np.array(['WD','TEMP']))


In [None]:
shapes_of_shap_values = [np.array(values).shape for values in shap_values]

print("Shapes of shap_values arrays:", shapes_of_shap_values)

In [None]:
num_features=[np.array(values).shape for values in shap_values]

# Constructing feature names
feature_names = []
for feature_type in ['WD', 'TEMP']:
    for timestep in range(N_TIMESTEPS_SELECTED):
        feature_names.append(f'{feature_type}_{timestep}')

# Checking if feature_names includes all features represented in shap_values
if len(feature_names) != num_features:
    print("Error: The feature_names arg must include all features represented in shap_values.")
    print("Number of features in shap_values:", num_features)
    print("Number of elements in feature_names:", len(feature_names))
else:
    print("Feature names aligned correctly.")

print(num_features)

In [None]:
import pandas as pd
shap_values_array = np.array(shap_values[0])
print("Shape of shap_values_array:", shap_values_array.shape)
shap_pd_list = []
x_pd_list = []
names_features=['WD','TEMP']
for i in range(shap_values_array.shape[2]):
    shap_pd_list.append(pd.DataFrame(shap_values_array[:,:,i], columns=[str(names_features[i]) + '_t' + str(timestep_labels[j]) for j in range(N_TIMESTEPS_SELECTED)]))
    x_pd_list.append(pd.DataFrame(X_selected[:,:,i], columns=[str(names_features[i]) + '_t' + str(timestep_labels[j]) for j in range(N_TIMESTEPS_SELECTED)]))
shap_pd = pd.concat(shap_pd_list, axis=1)
x_pd = pd.concat(x_pd_list, axis=1)

In [None]:
shap.summary_plot(shap_pd.values, x_pd)

In [None]:
shap.summary_plot(shap_pd.values, x_pd, plot_type='bar')

In [None]:
# Apply SHAP using DeepExplainer
explainer = shap.DeepExplainer(model, X_selected)
shap_values = explainer.shap_values(X_selected)

# Select a specific instance for SHAP plots
instance_index = 0

# Extract the correct SHAP values and input data for the instance
shap_value_instance = shap_values[0][instance_index].flatten()
base_value_instance = explainer.expected_value[0]
data_instance = X_selected[instance_index].flatten()

# Ensure SHAP values and data have the correct dimensions
assert shap_value_instance.shape == data_instance.shape, f"Shapes do not match: {shap_value_instance.shape} vs {data_instance.shape}"

# Waterfall plot for a specific instance
shap.waterfall_plot(shap.Explanation(values=shap_value_instance, base_values=base_value_instance, data=data_instance))
plt.show()

# Reshape for summary plots
x_reshaped = X_selected.reshape((X_selected.shape[0], -1))  # Flatten x for summary plot
shap_values_reshaped = shap_values[0].reshape((shap_values[0].shape[0], -1))  # Flatten shap values for summary plot

# Generate summary plot with bar type
shap.summary_plot(shap_values_reshaped, x_reshaped, plot_type="bar")
plt.show()
names_feat=['WD_t1','TEMP_t1','WD_t2','TEMP_t2','WD_t3','TEMP_t3']
# Generate scatter plots for all features
num_features = x_reshaped.shape[1]
for i in range(num_features):
    explanation = shap.Explanation(
        values=shap_values_reshaped[:, [i]],
        base_values=np.mean(base_value_instance),
        data=x_reshaped[:, [i]],
        feature_names=[f'Feature {i}']
    )
    shap.plots.scatter(explanation,color=explanation)
    
    plt.show()
    

## PDP

In [None]:
generate_all_PDPs(model, X_selected)

## Permutation Importance

In [None]:
permutation_importance(model, X_selected, y, mean_squared_error)