## Linear Pipeline, Giessen-style synthetic data. 5 sensitive parameters.

### Import Simulation Data and Conduct PCA

Import the input and output data. Here we also import the recorded boolean variables where input parameters lead to a failed output. 

In [None]:
# specify which dataset 
n_samples = 500
n_params = 5

input_file = pd.read_csv(f"../Emulation/Input/input_{n_samples}_{n_params}params.csv")
output_file = pd.read_csv(f"../Emulation/Outputs/Output_{n_samples}_{n_params}params/resampled_all_pressure_traces_rv.csv")


bool_exist = False
if bool_exist:
 boolean_index = pd.read_csv(f"../Emulation/Outputs/Output_{n_samples}_{n_params}params/bool_indices_{n_samples}.csv")

In [None]:
output_file

In [None]:
# Initialize the plot
fig, ax = plt.subplots()

for ind in range(len(output_file)): 
    t = range(100) # Time adjustment
    p_pat = output_file.iloc[ind, :100].values # Pressure transient

    # Plot the pressure transient for each realization
    ax.plot(t, p_pat, label=f'Realisation {ind}')

# Set labels and title
ax.set_xlabel('Time Index')
ax.set_ylabel('Pressure (mmHg)')
ax.set_title('Pressure Transients in Right Ventricle')

# Add legend to the plot
# ax.legend()

# Display the plot
plt.show()

In [None]:
# Clean the input data to remove failed to converge simulations
if bool_exist:
    cleaned_input_df = input_file.drop(boolean_index['0'].values)
   
else: 
    cleaned_input_df = input_file.copy()
    
cleaned_input_df.head()

### Compute PCA Data

In [None]:
import sklearn
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold

df = output_file.copy()

# Copy the data and separate the target variable (only pressure traces)
X = df.iloc[:,:100].copy() # traces only

# Create an instance of StandardScaler
scaler = StandardScaler()

# Fit the scaler to the data and transform it - standardize
X_scaled = scaler.fit_transform(X)

pca = PCA(n_components=10)
X_pca = pca.fit_transform(X_scaled)

# Convert to dataframe
component_names = [f"PC{i+1}" for i in range(X_pca.shape[1])]
X_pca = pd.DataFrame(X_pca, columns=component_names, index=df.index)

X_pca.head()



In [None]:
# Plot Histograms
X_pca.hist(bins=30, figsize=(15, 13), layout=(5, 2), alpha=0.7, color='orange')
plt.suptitle('Histograms of the First 10 Principal Components')
plt.show()

In [None]:
def plot_variance(pca, width=8, dpi=100):
    # Create figure
    fig, axs = plt.subplots(1, 2)
    n = pca.n_components_
    grid = np.arange(1, n + 1)
    # Explained variance
    explained_variance_ratio = pca.explained_variance_ratio_
    axs[0].bar(grid, explained_variance_ratio, log=True)
    axs[0].set(
        xlabel="Component", title="% Explained Variance", ylim=(0.0, 1.0)
    )

    # Cumulative Variance
    cumulative_explained_variance = np.cumsum(explained_variance_ratio)
    axs[1].semilogy(grid, cumulative_explained_variance, "o-")
    axs[1].set(
        xlabel="Component", title="% Cumulative Variance", 
    )
    # Set up figure
    fig.set(figwidth=8, dpi=100)
    fig.tight_layout()
    return axs

plot_variance(pca)

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import  PowerTransformer


pipeline = Pipeline([
                ('scl', StandardScaler()),
                ('pca', PCA(n_components=10)),
                ('post',   PowerTransformer())
            ])

signals_pca = pipeline.fit_transform(X_scaled)

fig, ax = plt.subplots(ncols=10, nrows=2, figsize=(70, 15))

for i in range(signals_pca.shape[1]):
    temp = np.zeros(signals_pca.shape)
    temp[:, i] = signals_pca[:, i]
    
    signals_new = pipeline.inverse_transform(temp)
    
    ax[1][i].hist(signals_pca[:,i], bins=10)
    for signal in signals_new:
        ax[0][i].plot(signal)
        
plt.show()

In [None]:
kf = KFold(n_splits=5, random_state=1, shuffle=True)

explained_variance_ratios = []
pca_components = []

for train_index, test_index in kf.split(X_scaled):
    X_train, _ = X_scaled[train_index], X_scaled[test_index]
    
    pca = PCA(n_components=3)
    X_train_pca = pca.fit_transform(X_train)

    # Store the PCA components
    pca_components.append(pca.components_)
    # Store the explained variance ratio of this fold
    explained_variance_ratios.append(pca.explained_variance_ratio_)


explained_variance_ratios = np.array(explained_variance_ratios)

mean_explained_variance_ratio = np.mean(explained_variance_ratios, axis=0)
std_explained_variance_ratio = np.std(explained_variance_ratios, axis=0)

percentage_error = (std_explained_variance_ratio / mean_explained_variance_ratio) * 100
print(f'percentage error: \n{percentage_error}')
print(f'explained variance ratios: \n{explained_variance_ratios}')


### Save PCA Data

In [None]:
os.system(f'mkdir -p ../Emulation/Outputs/Output_{n_samples}_{n_params}params/PCA')

In [None]:
os.system(f'mkdir -p ../Emulation/Outputs/Output_{n_samples}_{n_params}params/PCA')

# Save first 3 Principle Component data
PC_list = []
for i in list(range(10)):

 PC = X_pca.iloc[:,i]
 PC.to_csv(f'../Emulation/Outputs/Output_{n_samples}_{n_params}params/PCA/PC{i+1}.csv', index=False)
 PC_list.append(PC)




In [None]:
PC_df = pd.DataFrame(PC_list).T
PC_df.columns = ["PC1", "PC2", "PC3", "PC4", "PC5", 
                 "PC6", "PC7", "PC8", "PC9", "PC10" ]
output = pd.concat([output_file, PC_df], axis=1)
output

### Train Linear Emulator

In [None]:
import contextlib
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

In [None]:

# Select relevant inputs only
relevant_columns = []
for col in cleaned_input_df.columns:
    relevant_columns.append(col)
    if col == 'T': break

#columns_with_multiple_values = df_x.nunique() > 1
#filtered_input = df_x.loc[:, columns_with_multiple_values]

# Select only first 5 inputs 
filtered_input = cleaned_input_df[relevant_columns]
filtered_input

In [None]:
utils.emulate_linear(filtered_input, output=output['iT'])

In [None]:
output.iloc[:,101:].columns

In [None]:
# Initialize dictionaries to store R2 scores and models
linear_r2_scores = {}
linear_mse_scores = {}
linear_rse_scores = {}
fitted_models = {}

# List of output keys to process
#output_keys = ['CO', 'EF', 'mPAP', 'dPAP', 'sPAP', 'PC1', 'PC2', 'PC3', 'PC4']
output_keys = output.iloc[:,101:].columns
#output_keys = ['t_max_dpdt', 'a_epad', 'epad', 's_a_epad', 's_epad', 'min_dpdt', 'max_dpdt',
#               'A_p', 'P_max', 'esp', 'sys', 'EF',  'Ees/Ea', 'PC1', 'PC2', 'PC3']

# Iterate through the output keys
for key in output_keys:
    model, r2, mse, rse = utils.emulate_linear(input=filtered_input, output=output[key])
    linear_r2_scores[key] = r2
    linear_mse_scores[key] = mse
    linear_rse_scores[key] = rse
    fitted_models[key] = model

# Convert the dictionaries to a DataFrame
results_df = pd.DataFrame({'R2_Score': linear_r2_scores, 'MSE': linear_mse_scores,  'RSE': linear_rse_scores, 'Model': fitted_models})
# Now `results_df` will be a DataFrame with column names as indices, R2 scores, and models
print(results_df)

# Save the DataFrame to a CSV file (models will not be saved in this step)
results_df.to_csv(f'../Emulation/Outputs/Output_{n_samples}_{n_params}params/Emulators/linear_models_and_r2_scores_{n_samples}.csv')

# To save the DataFrame with models, use pickle
results_df.to_pickle(f'../Emulation/Outputs/Output_{n_samples}_{n_params}params/Emulators/linear_models_and_r2_scores_{n_samples}.csv')
results_df.to_pickle(f'/Users/pmzff/Documents/GitHub/GiessenDataAnalysis/Emulators/linear_models_and_r2_scores_{n_samples}.csv')


In [None]:
results_df['Model']['iT'].intercept_

### Calibrate Emulator 

In [None]:
calibrate_output_keys = ['t_max_dpdt', 'a_epad', 'epad', 's_a_epad', 's_epad', 'min_dpdt', 'max_dpdt',
                         'A_p', 'P_max', 'esp', 'sys', 'EF',  'Ees/Ea', 'iT', 'PC1', 'PC2', 'PC3']
filtered_output = output[calibrate_output_keys]
filtered_output

In [None]:
selected_rows = results_df.loc[calibrate_output_keys]
selected_rows


In [None]:
# Build beta matrix (d * p, where d is dimension of y_obs and p is dimension of X)
beta_matrix = []
intercept = []

# Selects which observation to calibrate on
which_obs = 3

for index, row_entry in selected_rows.iterrows():
    model = row_entry['Model']
    coeffs = model.coef_
    b0 = model.intercept_

    beta_matrix.append(coeffs)
    intercept.append(b0)

# Convert the list to a NumPy array
beta_matrix = np.array(beta_matrix)
intercept = np.array(intercept)
intercept = intercept.reshape(len(intercept),1)

print(beta_matrix.shape)
print(intercept.shape)

# Select observation and reshape to be (d, 1)
Y_obs = np.array(filtered_output.T[which_obs])
Y_obs = Y_obs.reshape(len(Y_obs), 1)
print(Y_obs.shape)

# Scale observation by intercepts of models
Y_scaled = Y_obs - intercept
print(Y_scaled.shape)

# Compute the pseudo-inverse of the coefficient matrix
beta_inv = np.linalg.inv(beta_matrix.T @ beta_matrix) @ beta_matrix.T
x_hat = beta_inv @ Y_scaled



In [None]:
# Convert the calibrated inputs to a DataFrame
x_hat_df = pd.DataFrame(x_hat, index=filtered_input.iloc[which_obs].T.index)

# Concatenate the true input value and calibrated values
result = pd.concat([filtered_input.iloc[which_obs].T, x_hat_df], axis=1)

# Rename the columns
result.columns = ['x_true', 'x_calibrated']

result

In [None]:
# Feed calibrated x_hat back into linear model 
y_calibrated = (beta_matrix @ x_hat) + intercept 

y_compare = np.hstack([Y_obs, y_calibrated])
y_compare = pd.DataFrame(y_compare)
y_compare.columns = ("y_true", "y_calibrated")

y_compare.to_csv('multiple_output_calibration_result_y.csv', index=False)
y_compare

### Plot Observation 

In [None]:
fig, ax = plt.subplots()

ind = which_obs

t = range(100) # Time adjustment
p_pat = output_file.iloc[ind, :100].values # Pressure transient

# Plot the pressure transient for each realization
ax.plot(t, p_pat, label=f'Realisation {ind}')

# Set labels and title
ax.set_xlabel('Time Index')
ax.set_ylabel('Pressure (mmHg)')
ax.set_title('Pressure Transients in Right Ventricle')

# Add legend to the plot
# ax.legend()
fig.set_size_inches(4,2)
# Display the plot
plt.show()

### Bayesian Calibration 

In [None]:
## Define priors
# mean
c_svn_mu = 20.5
r_pat_mu = 0.31
c_pat_mu = 3.8
rv_mu = 1.15
t_mu = 0.61 #1

# variances
c_svn_sd = 3.42**2
r_pat_sd = 0.05**2
c_pat_sd = 0.63**2
rv_sd = 0.48**2
t_sd = 0.0000001 #0.15**2

mu_0 = np.array([c_svn_mu, r_pat_mu, c_pat_mu, rv_mu, t_mu])[:, np.newaxis]
Sd = [c_svn_sd, r_pat_sd, c_pat_sd, rv_sd, t_sd]
Sigma_0 = np.diag(Sd)/5

# Build beta matrix (d * p, where d is dimension of y_obs and p is dimension of X)
beta_matrix = []
intercept = []

# Selects which observation to calibrate on
which_obs = 3

for index, row_entry in selected_rows.iterrows():
    model = row_entry['Model']
    coeffs = model.coef_
    b0 = model.intercept_

    beta_matrix.append(coeffs)
    intercept.append(b0)

# Convert the list to a NumPy array
beta_matrix = np.array(beta_matrix)
intercept = np.array(intercept)
intercept = intercept.reshape(len(intercept),1)

# Select observation and reshape to be (d, 1)
Y_obs = np.array(filtered_output.T[which_obs])
Y_obs = Y_obs.reshape(len(Y_obs), 1)

# Scale observation by intercepts of models
Y_scaled = Y_obs - intercept

# Compute the posterior covariance
Sigma_post_inv = (beta_matrix.T @ beta_matrix) + np.linalg.inv(Sigma_0)
Sigma_post = np.linalg.inv(Sigma_post_inv)

# Cmpute the posterior mean
Mu_post = Sigma_post @ (beta_matrix.T @ Y_scaled + np.linalg.inv(Sigma_0) @ mu_0)


In [None]:
# Convert the calibrated inputs to a DataFrame
Mu_post_df = pd.DataFrame(Mu_post, index=filtered_input.iloc[which_obs].T.index)
true_observation = pd.DataFrame(filtered_input.iloc[which_obs])

# Feed calibrated x_hat back into linear model 
y_calibrated = (beta_matrix @ Mu_post) + intercept 

y_compare = np.hstack([Y_obs, y_calibrated])
y_compare = pd.DataFrame(y_compare)
y_compare.columns = ("y_true", "y_calibrated")

# Concatenate the true input value and calibrated values
bayes_result = pd.concat([filtered_input.iloc[which_obs].T, Mu_post_df, y_compare], axis=1)

# Rename the columns
bayes_result.columns = ['x_true', 'x_calibrated', "y_true", "y_calibrated"]

bayes_result

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import norm

# Convert (5,1) arrays to (5,) for proper indexing
prior_means = mu_0.flatten()
prior_stds = np.sqrt(np.diag(Sigma_0))  # Extract standard deviations

posterior_means = Mu_post.flatten()
posterior_stds = np.sqrt(np.diag(Sigma_post))  # Extract standard deviations

# True values from filtered_input
true_values = filtered_input.iloc[which_obs].values

# Parameter names
param_names = ["c_svn", "r_pat", "c_pat", "rv_Eact", "T"]

# Create subplots for distributions
fig, axes = plt.subplots(1, 5, figsize=(18, 4))

for i, ax in enumerate(axes):
    # Define x-range based on prior and posterior means
    x_min = min(prior_means[i] - 3 * prior_stds[i], posterior_means[i] - 3 * posterior_stds[i])
    x_max = max(prior_means[i] + 3 * prior_stds[i], posterior_means[i] + 3 * posterior_stds[i])
    x = np.linspace(x_min, x_max, 100)

    # Compute PDFs
    prior_pdf = norm.pdf(x, prior_means[i], prior_stds[i])
    posterior_pdf = norm.pdf(x, posterior_means[i], posterior_stds[i])

    # Plot prior and posterior distributions
    ax.plot(x, prior_pdf, label="Prior", linestyle="dashed", color="blue")
    ax.plot(x, posterior_pdf, label="Posterior", linestyle="solid", color="red")

    # Plot true value as a vertical line
    ax.axvline(true_values[i], color="green", linestyle="dotted", label="True Value")

    # Labels and title
    ax.set_title(param_names[i])
    ax.set_xlabel("Value")
    ax.set_ylabel("Density")
    ax.legend()

plt.tight_layout()
plt.show()

# ---- Plot Covariance Matrices ----
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Plot Prior Covariance Matrix
sns.heatmap(Sigma_0, annot=True, fmt=".3f", cmap="RdBu", xticklabels=param_names, yticklabels=param_names, ax=axes[0])
axes[0].set_title("Prior Covariance Matrix")

# Plot Posterior Covariance Matrix
sns.heatmap(Sigma_post, annot=True, fmt=".4f", cmap="PiYG", xticklabels=param_names, yticklabels=param_names, ax=axes[1])
axes[1].set_title("Posterior Covariance Matrix")

plt.tight_layout()
plt.show()

### MSE of Bayes Calibration for Multiple Observations

In [None]:
## Define priors
# mean
c_svn_mu = 20.5
r_pat_mu = 0.31
c_pat_mu = 3.8
rv_mu = 1.15
t_mu = 1

# variances
c_svn_sd = 3.42**2
r_pat_sd = 0.05**2
c_pat_sd = 0.63**2
rv_sd = 0.48**2
t_sd = 0.15**2

mu_0 = np.array([c_svn_mu, r_pat_mu, c_pat_mu, rv_mu, t_mu])[:, np.newaxis]
Sd = [c_svn_sd, r_pat_sd, c_pat_sd, rv_sd, t_sd]
Sigma_0 = np.diag(Sd)

# Build beta matrix (d * p, where d is dimension of y_obs and p is dinemnsion of X)
beta_matrix = []
intercept = []

for index, row_entry in selected_rows.iterrows():
    model = row_entry['Model']
    coeffs = model.coef_
    b0 = model.intercept_

    beta_matrix.append(coeffs)
    intercept.append(b0)

# Convert the list to a NumPy array
beta_matrix = np.array(beta_matrix)
intercept = np.array(intercept)
intercept = intercept.reshape(len(intercept),1)

x_differences = []

for row in range(10):
 # Select observation and reshape to be (d, 1)
 Y_obs = np.array(filtered_output.T[row])
 Y_obs = Y_obs.reshape(len(Y_obs), 1)
 
 # Scale observation by intercepts of models
 Y_scaled = Y_obs - intercept


 # Compute the posterior covariance
 Sigma_post_inv = (beta_matrix.T @ beta_matrix) + np.linalg.inv(Sigma_0)
 Sigma_post = np.linalg.inv(Sigma_post_inv)

 # Cmpute the posterior mean
 Mu_post = Sigma_post @ (beta_matrix.T @ Y_scaled + np.linalg.inv(Sigma_0) @ mu_0)

 # Compute squared-diff between true and calibrated x
 true = np.array(filtered_input.iloc[row].T)
 true = true.reshape(len(mu_0),1)
 diff = (Mu_post - true)**2 

 # Append arrary
 x_differences.append(diff)

# Compute MSE
bayes_mse_x = np.mean(np.hstack(x_differences), axis=1)

bayes_mse_x_df = pd.DataFrame(bayes_mse_x)
bayes_mse_x_df.columns = ['MSE']
bayes_mse_x_df.index = filtered_input.columns
#bayes_mse_x_df.to_csv('bayes_MSE_multi_output_x.csv')

bayes_mse_x_df

### Including model error and observation error

### Bayesian Calibration  
Assume we have the following linear regression model $$y = \beta X + \beta_0 + \epsilon_{model}, \epsilon_{model} \sim N_p(0, \sigma_{model}^2I)$$ 

Assume we also have the following observation model $$y_{obs} = y + \epsilon_{obs}, \epsilon_{obs} \sim N_p(0, \sigma_{obs}^2I)$$. 

The posterior distribution can be shown that $$\pi(X|y_{obs}) \sim N(\mu_{post}, \Sigma_{post})$$ where $$\mu_{post} = \Sigma_{post} \left( \beta^T \left( \sigma_{model}^2 + \sigma_{obs}^2\right)^{-1} y_{obs} + \Sigma_0^{-1}\mu_0\right)$$ and $$ \Sigma_{post} = \left( \beta^T( \sigma_{model}^2 + \sigma_{obs}^2)^{-1}\beta + \Sigma_0^{-1}\right)^{-1}$$


In [None]:
# Selects which observation to calibrate on
which_obs = 1

## Define priors
# mean
c_svn_mu = 20.5
r_pat_mu = 0.31
c_pat_mu = 3.8
rv_mu = 1.15
t_mu = filtered_input['T'][which_obs]

# variances
c_svn_sd = 3.42**2
r_pat_sd = 0.05**2
c_pat_sd = 0.63**2
rv_sd = 0.48**2
t_sd = 0.00000001 #0.15**2

mu_0 = np.array([c_svn_mu, r_pat_mu, c_pat_mu, rv_mu, t_mu])[:, np.newaxis]
Sd = [c_svn_sd, r_pat_sd, c_pat_sd, rv_sd, t_sd]
Sigma_0 = np.diag(Sd)


# Model error
epsilon_model = np.diag(selected_rows['RSE']**2)

# Observation error 
obs_error_scale = 0.05
obs_error = np.std(filtered_output)*obs_error_scale
epsilon_obs = np.diag(obs_error)

# total error
full_error = epsilon_obs + epsilon_model

# Build beta matrix (d * p, where d is dimension of y_obs and p is dimension of X)
beta_matrix = []
intercept = []


for index, row_entry in selected_rows.iterrows():
    model = row_entry['Model']
    coeffs = model.coef_
    b0 = model.intercept_

    beta_matrix.append(coeffs)
    intercept.append(b0)

# Convert the list to a NumPy array
beta_matrix = np.array(beta_matrix)
intercept = np.array(intercept)
intercept = intercept.reshape(len(intercept),1)

# Select observation and reshape to be (d, 1)
Y_obs = np.array(filtered_output.T[which_obs])
Y_obs = Y_obs.reshape(len(Y_obs), 1)

# Scale observation by intercepts of models
Y_scaled = Y_obs - intercept

# Compute the posterior covariance
Sigma_post_inv = (beta_matrix.T @ np.linalg.inv(full_error) @ beta_matrix) + np.linalg.inv(Sigma_0)
Sigma_post = np.linalg.inv(Sigma_post_inv)

# Cmpute the posterior mean
Mu_post = Sigma_post @ (beta_matrix.T @ np.linalg.inv(full_error) @ Y_scaled + np.linalg.inv(Sigma_0) @ mu_0)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import norm

# Convert (5,1) arrays to (5,) for proper indexing
prior_means = mu_0.flatten()
prior_stds = np.sqrt(np.diag(Sigma_0))  # Extract standard deviations

posterior_means = Mu_post.flatten()
posterior_stds = np.sqrt(np.diag(Sigma_post))  # Extract standard deviations

# True values from filtered_input
true_values = filtered_input.iloc[which_obs].values

# Parameter names
param_names = ["c_svn", "r_pat", "c_pat", "rv_Eact", "T"]

# Create subplots for distributions
fig, axes = plt.subplots(1, 5, figsize=(18, 4))

for i, ax in enumerate(axes):
    # Define x-range based on prior and posterior means
    x_min = min(prior_means[i] - 3 * prior_stds[i], posterior_means[i] - 3 * posterior_stds[i])
    x_max = max(prior_means[i] + 3 * prior_stds[i], posterior_means[i] + 3 * posterior_stds[i])
    x = np.linspace(x_min, x_max, 100)

    # Compute PDFs
    prior_pdf = norm.pdf(x, prior_means[i], prior_stds[i])
    posterior_pdf = norm.pdf(x, posterior_means[i], posterior_stds[i])

    # Plot prior and posterior distributions
    ax.plot(x, prior_pdf, label="Prior", linestyle="dashed", color="blue")
    ax.plot(x, posterior_pdf, label="Posterior", linestyle="solid", color="red")

    # Plot true value as a vertical line
    ax.axvline(true_values[i], color="green", linestyle="dotted", label="True Value")

    # Labels and title
    ax.set_title(param_names[i])
    ax.set_xlabel("Value")
    ax.set_ylabel("Density")
    ax.legend()

plt.tight_layout()
plt.show()

# ---- Plot Covariance Matrices ----
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Plot Prior Covariance Matrix
sns.heatmap(Sigma_0, annot=True, fmt=".3f", cmap="RdBu", xticklabels=param_names, yticklabels=param_names, ax=axes[0])
axes[0].set_title("Prior Covariance Matrix")

# Plot Posterior Covariance Matrix
sns.heatmap(Sigma_post, annot=True, fmt=".4f", cmap="PiYG", xticklabels=param_names, yticklabels=param_names, ax=axes[1])
axes[1].set_title("Posterior Covariance Matrix")

plt.tight_layout()
plt.show()