# Experiments with Synthetic Data using a formulation of the Swing-Equation

In [1]:
# Add the 'src' as root folder, to find other modules in the project
import sys
sys.path.append("../../")

In [2]:
# local imports
from simulator.simulator import Swing_5_Equation, Swing_4_Equation

# external imports
import torch
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt

from sbi.inference import SNPE, SNRE, SNLE, simulate_for_sbi, prepare_for_sbi
from sbi.utils import posterior_nn, BoxUniform
from sklearn.metrics import mean_squared_error, mean_absolute_error

ModuleNotFoundError: No module named 'torch'

# Model Definition

Different Swing Equation formulations using Euler Maruyama to solve.
These are just formulations using different parameters describing power grid dynamics with different precision.
In this case the 5 Parameter model should model the frequency dynamics more precise.

1. 4 Parameter Model ``Swing_4_Equation``:

    $\frac{d\omega}{dt} = c_1\omega + c_2\Theta + P_{const} + \epsilon\xi$
2. 5 Parameter Model ``Swing_5_Equation``:
    
    $\frac{d\omega}{dt} = c_1\omega + c_2\Theta + P_0 + P_1t + \epsilon\xi$

For a more detailed explanation, analysis and proposal of the Equations make sure to read (Artikel verlinken), or view the experiment [notebook](swing_equation_parameter.ipynb).

In [None]:
# 4 Parameter Model assumption on true values for theta
# - tensor(c_1, c_2, P_const, epsilon)
theta_true_4 = torch.tensor([0.5, 0.5, 0.1, 0.05])

# 5 Parameter Model assumption on true values for theta
# - tensor(c_1, c_2, P_0, P_1, epsilon)
theta_true_5 = torch.tensor([4.0, 40.0, 3.0, 60.0, 7.0])

# Define Training Parameters

1. [Simulation Parameters](#simulation-parameters)
2. [Neural Density Estimator Parameters](#neural-density-estimator-parameters)


## Simulation Parameters

In [None]:
# Define the simulation parameters
# Define length and timesteps of the simulations
dt = 0.01
T = 10

# Define 2 simulators with different model precision
simulator_4 = Swing_4_Equation(dt=dt, T=T)
simulator_5 = Swing_5_Equation(dt=dt, T=T)

# Define proposals for parameter distributions

# Modify proposal to be a uniform distribution with possible parameter values
proposal_low_5 = torch.tensor([0.1, 1.0, 0.1, 40.0, 0.1])
proposal_high_5 = torch.tensor([10.0, 100.0, 10.0, 70.0, 10.0])
proposal_5 = BoxUniform(low=proposal_low_5, high=proposal_high_5)

# Define proposal for the 4-parameter model as a uniform distribution
proposal_low_4 = torch.tensor([0.1, 0.1, 0.01, 0.01])
proposal_high_4 = torch.tensor([1.0, 1.0, 0.5, 0.1])
proposal_4 = BoxUniform(low=proposal_low_4, high=proposal_high_4)

# # Define proposal for the 4-parameter model as a normal distribution
# proposal_means_4 = torch.tensor([0.5, 0.5, 0.1, 0.05])
# proposal_stddevs_4 = torch.tensor([0.1, 0.1, 0.05, 0.01])
# proposal_means_4 = torch.max(proposal_means_4, torch.tensor(1e-6))

# # Create a truncated normal distribution centered around proposal_means with standard deviations
# proposal_4 = TruncatedNormal(
#     loc=proposal_means_4, 
#     scale=proposal_stddevs_4,
#     min=torch.tensor([0.1, 0.1, 0.01, 0.01]), 
#     max=torch.tensor([1.0, 1.0, 0.5, 0.1]) 
# )

# # Define proposal for the 5-parameter model
# proposal_means_5 = torch.tensor([4.0, 40.0, 3.0, 60.0, 7.0])
# proposal_stddevs_5 = torch.tensor([1.0, 5.0, 1.0, 5.0, 1.0])
# proposal_means_5 = torch.max(proposal_means_5, torch.tensor(1e-6))

# # Create a truncated normal distribution centered around proposal_means with standard deviations
# proposal_5 = TruncatedNormal(
#     loc=proposal_means_5, 
#     scale=proposal_stddevs_5,
#     min=torch.tensor([0.1, 1.0, 0.1, 40.0, 0.1]),  
#     max=torch.tensor([10.0, 100.0, 10.0, 70.0, 10.0])
# )

# Define simulation process parameters
num_simulations = 20000
num_workers = 20
simulation_batch_size = 50
seed = None
show_progress_bar = True

## Neural Density Estimator Parameters

In [None]:
# Density estimator parameters
model = 'maf'
hidden_features = 50
num_transforms = 10
z_score_x = 'independent'
z_score_theta = 'independent'
num_bins = 10
num_components = 10

# Simulation Process

In [None]:
# Wrap the simulator function for compatibility with SBI
post_simulator_4, proposal_4 = prepare_for_sbi(simulator_4.simulate, proposal_4)
post_simulator_5, proposal_5 = prepare_for_sbi(simulator_5.simulate, proposal_5)

# Run the inference procedure to generate samples and corresponding simulated data points
theta_4, x_4 = simulate_for_sbi(
    simulator=post_simulator_4, 
    proposal=proposal_4, 
    num_simulations=num_simulations, 
    num_workers=num_workers, 
    simulation_batch_size=simulation_batch_size, 
    seed=seed, 
    show_progress_bar=show_progress_bar
)   

theta_5, x_5 = simulate_for_sbi(
    simulator=post_simulator_5, 
    proposal=proposal_5, 
    num_simulations=num_simulations, 
    num_workers=num_workers, 
    simulation_batch_size=simulation_batch_size, 
    seed=seed, 
    show_progress_bar=show_progress_bar
)   

# Training Process

In [None]:
# Define the default observation by simulating the true parameters
default_observation_4 = simulator_4.simulate(theta_true_4)
default_observation_5 = simulator_5.simulate(theta_true_5)

# # Instantiate the neural density estimator
# neural_posterior_4 = posterior_nn(
#     model=model, 
#     hidden_features=hidden_features, 
#     num_transforms=num_transforms, 
#     num_bins=num_bins, 
#     num_components=num_components, 
#     z_score_theta=z_score_theta, 
#     z_score_x=z_score_x
# )

# neural_posterior_5 = posterior_nn(
#     model=model, 
#     hidden_features=hidden_features, 
#     num_transforms=num_transforms, 
#     num_bins=num_bins, 
#     num_components=num_components, 
#     z_score_theta=z_score_theta, 
#     z_score_x=z_score_x
# )

# # Set up the inference procedure with the SNPE procedure
# inference_4 = SNPE(prior=proposal_4, density_estimator=neural_posterior_4)
# inference_5 = SNPE(prior=proposal_5, density_estimator=neural_posterior_5)

inference_4_snpe = SNPE(prior=proposal_4)
inference_5_snpe = SNPE(prior=proposal_5)

inference_4_snle = SNLE(prior=proposal_4)
inference_5_snle = SNLE(prior=proposal_5)

inference_4_snre = SNRE(prior=proposal_4)
inference_5_snre = SNRE(prior=proposal_5)

# Train the neural density estimator
density_estimator_4_snpe = inference_4_snpe.append_simulations(theta_4, x_4).train()
density_estimator_5_snpe = inference_5_snpe.append_simulations(theta_5, x_5).train()

density_estimator_4_snle = inference_4_snle.append_simulations(theta_4, x_4).train()
density_estimator_5_snle = inference_5_snle.append_simulations(theta_5, x_5).train()

density_estimator_4_snre = inference_4_snre.append_simulations(theta_4, x_4).train()
density_estimator_5_snre = inference_5_snre.append_simulations(theta_5, x_5).train()

# Build the posterior for the given parameters
posterior_4_snpe = inference_4_snpe.build_posterior(density_estimator_4_snpe).set_default_x(default_observation_4)
posterior_5_snpe = inference_5_snpe.build_posterior(density_estimator_5_snpe).set_default_x(default_observation_5)

posterior_4_snle = inference_4_snle.build_posterior(density_estimator_4_snle).set_default_x(default_observation_4)
posterior_5_snle = inference_5_snle.build_posterior(density_estimator_5_snle).set_default_x(default_observation_5)

posterior_4_snre = inference_4_snre.build_posterior(density_estimator_4_snre).set_default_x(default_observation_4)
posterior_5_snre = inference_5_snre.build_posterior(density_estimator_5_snre).set_default_x(default_observation_5)

# Model Analysis

- [Statistical, Theoretical Analysis and Results](#statistical-theoretical-analysis-and-results)
- [Visual Analysis and Results](#visual-analysis-and-results)

# Statistical, Theoretical Analysis and Results

In [None]:
# Parameter names
param_names_4 = ['c_1', 'c_2', 'P_const', 'epsilon']
param_names_5 = ['c_1', 'c_2', 'P_0', 'P_1', 'epsilon']

# Define a function to calculate the metrics
def calculate_metrics(true_values, predicted_values):
    # If true_values is a scalar, convert it to a 1D array of the same length as predicted_values
    if np.isscalar(true_values):
        true_values = np.full(predicted_values.shape, true_values)
    
    # Calculate the Mean Squared Error (MSE)
    mse = mean_squared_error(true_values, predicted_values)
    
    # Calculate the Mean Absolute Error (MAE)
    mae = mean_absolute_error(true_values, predicted_values)
    
    return mse, mae

# Get the true parameters
true_parameters_4 = theta_true_4.numpy()
true_parameters_5 = theta_true_5.numpy()

# Get the model predictions
predicted_values_4_snpe = posterior_4_snpe.sample((10000,), x=default_observation_4, show_progress_bars=False).numpy()
predicted_values_5_snpe = posterior_5_snpe.sample((10000,), x=default_observation_5, show_progress_bars=False).numpy()
predicted_values_4_snle = posterior_4_snle.sample((10000,), x=default_observation_4, show_progress_bars=False).numpy()
predicted_values_5_snle = posterior_5_snle.sample((10000,), x=default_observation_5, show_progress_bars=False).numpy()
predicted_values_4_snre = posterior_4_snre.sample((10000,), x=default_observation_4, show_progress_bars=False).numpy()
predicted_values_5_snre = posterior_5_snre.sample((10000,), x=default_observation_5, show_progress_bars=False).numpy()

# Initialize empty dataframes for each model and inference method
errors_snpe_4 = pd.DataFrame(columns=param_names_4, index=['MSE', 'MAE'])
errors_snpe_5 = pd.DataFrame(columns=param_names_5, index=['MSE', 'MAE'])
errors_snle_4 = pd.DataFrame(columns=param_names_4, index=['MSE', 'MAE'])
errors_snle_5 = pd.DataFrame(columns=param_names_5, index=['MSE', 'MAE'])
errors_snre_4 = pd.DataFrame(columns=param_names_4, index=['MSE', 'MAE'])
errors_snre_5 = pd.DataFrame(columns=param_names_5, index=['MSE', 'MAE'])

# Calculate and store the MSE and MAE for each parameter of model 4 for SNPE
for i in range(len(param_names_4)):
    mse, mae = calculate_metrics(true_parameters_4[i], predicted_values_4_snpe[:, i])
    errors_snpe_4.loc['MSE', param_names_4[i]] = mse
    errors_snpe_4.loc['MAE', param_names_4[i]] = mae

    mse, mae = calculate_metrics(true_parameters_4[i], predicted_values_4_snre[:, i])
    errors_snre_4.loc['MSE', param_names_4[i]] = mse
    errors_snre_4.loc['MAE', param_names_4[i]] = mae

    mse, mae = calculate_metrics(true_parameters_4[i], predicted_values_4_snle[:, i])
    errors_snle_4.loc['MSE', param_names_4[i]] = mse
    errors_snle_4.loc['MAE', param_names_4[i]] = mae

# Calculate and store the MSE and MAE for each parameter of model 5 for SNPE
for i in range(len(param_names_5)):
    mse, mae = calculate_metrics(true_parameters_5[i], predicted_values_5_snpe[:, i])
    errors_snpe_5.loc['MSE', param_names_5[i]] = mse
    errors_snpe_5.loc['MAE', param_names_5[i]] = mae

    mse, mae = calculate_metrics(true_parameters_5[i], predicted_values_5_snre[:, i])
    errors_snre_5.loc['MSE', param_names_5[i]] = mse
    errors_snre_5.loc['MAE', param_names_5[i]] = mae

    mse, mae = calculate_metrics(true_parameters_5[i], predicted_values_5_snle[:, i])
    errors_snle_5.loc['MSE', param_names_5[i]] = mse
    errors_snle_5.loc['MAE', param_names_5[i]] = mae

# Define the directory
directory = "../../../results/synthetic_data/tables"

# Ensure the directory exists
os.makedirs(directory, exist_ok=True)

# Save the dataframes to CSV files
errors_snpe_4.to_csv(os.path.join(directory, "errors_snpe_4.csv"))
errors_snpe_5.to_csv(os.path.join(directory, "errors_snpe_5.csv"))
errors_snle_4.to_csv(os.path.join(directory, "errors_snle_4.csv"))
errors_snle_5.to_csv(os.path.join(directory, "errors_snle_5.csv"))
errors_snre_4.to_csv(os.path.join(directory, "errors_snre_4.csv"))
errors_snre_5.to_csv(os.path.join(directory, "errors_snre_5.csv"))

# Print the tables in a 2x3 grid
print("Error Tables:")
print("SNPE Model (4 Parameters):")
print(errors_snpe_4)
print("\nSNPE Model (5 Parameters):")
print(errors_snpe_5)
print("\nSNLE Model (4 Parameters):")
print(errors_snle_4)
print("\nSNLE Model (5 Parameters):")
print(errors_snle_5)
print("\nSNRE Model (4 Parameters):")
print(errors_snre_4)
print("\nSNRE Model (5 Parameters):")
print(errors_snre_5)

# Visual Analysis and Results

In [3]:
def plot_posteriors(posterior_4, posterior_5, theta_true_4, theta_true_5, model_name, num_samples=1000):
    # Parameter names
    param_names_4 = ['c_1', 'c_2', 'P_const', 'epsilon']
    param_names_5 = ['c_1', 'c_2', 'P_0', 'P_1', 'epsilon']

    # Parameter boundaries
    bounds_4 = [[0.1, 1.0], [0.1, 1.0], [0.01, 0.5], [0.01, 0.1]]
    bounds_5 = [[0.1, 10.0], [1.0, 100.0], [0.1, 10.0], [40.0, 70.0], [0.1, 10.0]]

    # Sample from the posteriors
    samples_4 = posterior_4.sample((num_samples,), show_progress_bars=False)
    samples_5 = posterior_5.sample((num_samples,), show_progress_bars=False)

    # Create subplots
    fig, axs = plt.subplots(2, 5, figsize=(20, 10))

    # Plot the first posterior
    for i, param_name in enumerate(param_names_4):
        axs[0, i].hist(samples_4[:, i], bins=30, density=True, label=f'Posterior {param_name}', range=bounds_4[i])
        axs[0, i].axvline(theta_true_4[i].item(), color='r', linestyle='dashed', linewidth=2, label=f'True {param_name}')
        axs[0, i].legend()
    axs[0, 0].set_title('swing4', loc='left')

    # Remove unused subplot
    fig.delaxes(axs[0, 4])

    # Plot the second posterior
    for i, param_name in enumerate(param_names_5):
        axs[1, i].hist(samples_5[:, i], bins=30, density=True, label=f'Posterior {param_name}', range=bounds_5[i])
        axs[1, i].axvline(theta_true_5[i].item(), color='r', linestyle='dashed', linewidth=2, label=f'True {param_name}')
        axs[1, i].legend()
    axs[1, 0].set_title('swing5', loc='left')

    # Show the plot
    plt.tight_layout()

    # Add labels to the plot
    fig.text(0, 1, 'Parameter Value', ha='center', va='center')
    fig.text(0, 1, 'Density', ha='center', va='center', rotation='vertical')

    # Define the directory and filename
    directory = "../../../results/synthetic_data/plots"
    filename = f"posteriors_{model_name.lower()}.pdf"

    # Ensure the directory exists
    os.makedirs(directory, exist_ok=True)

    # Save the plot
    plt.savefig(os.path.join(directory, filename))

# Call the function for SNPE
plot_posteriors(posterior_4_snpe, posterior_5_snpe, theta_true_4, theta_true_5, "SNPE")

# Call the function for SNRE
plot_posteriors(posterior_4_snre, posterior_5_snre, theta_true_4, theta_true_5, "SNRE")

# Call the function for SNLE
plot_posteriors(posterior_4_snle, posterior_5_snle, theta_true_4, theta_true_5, "SNLE")


NameError: name 'posterior_4_snpe' is not defined