# Hierarchical pendulum SBI, using a csv of (theta, x)

## The dataset: simple static pendulum
Using the position of a pendulum at one point in time from a collection of pendulums on two different planets, determine the position associated with L, $\theta$, and $a_g$ using a deep ensemble.

In [1]:
## first, import all the necessary modules
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
#from src.scripts import utils
import numpyro

In [2]:
import deepbench
from deepbench.physics_object import Pendulum
print(deepbench.__version__)


0.2.01


In [3]:
from sbi import utils, inference, analysis
# from sbi import inference
from sbi.inference import SNPE, simulate_for_sbi, prepare_for_sbi
from sbi.inference.base import infer
import torch

## Prepare dataframe that associates (theta, x)
Here, I run a quick simulation that grid-searches all the theta values.

In [8]:
def save_thetas_and_xs_single(thetas_in, rng_key):
    # except length will have 8 elements as will theta
    length, theta, μ_a_g, σ_a_g = thetas_in

    a_g = rs.normal(loc=μ_a_g, scale=σ_a_g)
    #numpyro.sample("a_g", numpyro.distributions.TruncatedNormal(float(μ_a_g), float(σ_a_g), low = 0.01))
    #rng_key = jax.random.PRNGKey(1)  # You can use any valid random key
    #a_g_1 = numpyro.sample("a_g", numpyro.distributions.TruncatedNormal(float(μ_a_g), float(σ_a_g), low = 0.01), rng_key = rng_key)

    #a_g = [a_g_0, a_g_1]
    #print('a_g', a_g)
    
    
    pendulum = Pendulum(
                pendulum_arm_length=float(length),
                starting_angle_radians=float(theta),
                acceleration_due_to_gravity=float(a_g),
                noise_std_percent={
                    "pendulum_arm_length": 0.0,
                    "starting_angle_radians": 0.1,
                    "acceleration_due_to_gravity": 0.0,
                },
            )
    x = pendulum.create_object(0.75, noiseless=False)
    del pendulum
    return a_g, x

In [9]:
from numpyro.util import enable_x64

enable_x64()

import jax.numpy as jnp
import jax
rng_key = jax.random.PRNGKey(0)
print(rng_key)

[0 0]


In [10]:
# set priors for all of these things

In [16]:
# okay now make a dataframe with a bunch of different options for the parameters
# generate the L, theta, a_g values somewhat randomly between ranges
length_percent_error_all = 0.0
theta_percent_error_all = 0.1
a_g_percent_error_all = 0.0
pos_err = 0.0

time = 0.75

length_df = 100
thetas = np.zeros((length_df, 5))
xs = np.zeros((length_df))
#labels = np.zeros((2*length_df, 2))
#error = []
#y_noisy = []

for r in range(length_df):
    
    rs = np.random.RandomState()#2147483648)# 
    
    
    length = abs(rs.normal(loc=5, scale=2))
    theta = abs(rs.normal(loc=jnp.pi/100, scale=jnp.pi/500))
    #length, theta, μ_a_g, σ_a_g = thetas_in
    μ_a_g = abs(rs.normal(loc=10, scale=2))
    σ_a_g = abs(rs.normal(loc=1, scale=0.5))
    

    thetas_in = [length, theta, μ_a_g, σ_a_g]

    a_g, x = save_thetas_and_xs_single(thetas_in, rs)
        
    
    thetas[r,:] = [length, theta, a_g, μ_a_g, σ_a_g]
    xs[r] = x
    #labels[r,:] = [0, r]
    
    
    

    


In [None]:
# Is there a way to do an easier embedding network?
import torch
import torch.nn as nn


class SummaryNet(nn.Module):
    """
    Summary Network module
    """

    def __init__(self):
        super().__init__()
        # 2D convolutional layer
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, padding='same')
        self.bn1 = nn.BatchNorm2d(8)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding='same')
        self.bn2 = nn.BatchNorm2d(16)

        self.conv3 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding='same')
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding='same')
        self.bn4 = nn.BatchNorm2d(32)

        self.conv5 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding='same')
        self.bn5 = nn.BatchNorm2d(64)
        self.conv6 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding='same')
        self.bn6 = nn.BatchNorm2d(128)

        # Maxpool layer that reduces 32x32 image to 4x4
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # out_features correspond to 4*(Number of summary parameters)
        self.fc = nn.Linear(in_features=128 * 4 * 4, out_features=48)

    def forward(self, x):
        x = x.view(-1, 1, 32, 32)

        x = (self.bn1(F.relu(self.conv1(x))))
        x = self.pool(self.bn2(F.relu(self.conv2(x))))

        x = self.bn3(F.relu(self.conv3(x)))
        x = self.pool(self.bn4(F.relu(self.conv4(x))))

        x = self.bn5(F.relu(self.conv5(x)))
        x = self.pool(self.bn6(F.relu(self.conv6(x))))

        x = x.view(-1, 128 * 4 * 4)
        x = F.relu(self.fc(x))
        return x

In [25]:
import sbi
# Now let's put them in a tensor form that SBI can read.
theta = torch.tensor(thetas,dtype=torch.float32)
x = torch.tensor(xs,dtype=torch.float32)

#embedding_net = SummaryNet()

# instantiate the neural density estimator
neural_posterior = sbi.utils.posterior_nn(model='maf')#,
                                  #embedding_net=embedding_net,
                                  #hidden_features=hidden_features,
                                  #num_transforms=num_transforms)

# make a fake prior
# L, theta_0, a_g, mu, sigma

'''
length = abs(rs.normal(loc=5, scale=2))
    theta = abs(rs.normal(loc=jnp.pi/100, scale=jnp.pi/500))
    #length, theta, μ_a_g, σ_a_g = thetas_in
    μ_a_g = abs(rs.normal(loc=10, scale=2))
    σ_a_g = abs(rs.normal(loc=1, scale=0.5))
'''
prior_low = [0, jnp.pi / 1000, 0, 0, 0]
prior_high = [10, jnp.pi / 10, 20, 20, 4]

prior = utils.BoxUniform(low=torch.tensor(prior_low), high=torch.tensor(prior_high), device='cpu')

# setup the inference procedure with the SNPE-C procedure
inference = SNPE(prior=prior, density_estimator=neural_posterior, device="cpu")

# Now that we have both the simulated images and parameters defined properly, we can train the SBI.
density_estimator = inference.append_simulations(theta,x).train()
posterior = inference.build_posterior(density_estimator)

AssertionError: Simulated data must be a batch with at least two dimensions.

In [14]:
print(xs)

[[ 7.58798179e+00  2.07597931e-02  1.57562023e+01  1.42910159e+01
   1.84377293e+00  5.92555688e-02]
 [ 2.05339683e+00  2.85406868e-02  1.13419898e+01  1.20854526e+01
   1.78660172e+00 -1.17401254e-02]
 [ 5.41048906e+00  3.10157844e-02  9.65805085e+00  9.77499190e+00
   7.86417236e-01  8.40525588e-02]
 [ 5.89551163e+00  2.10998454e-02  7.37501978e+00  7.74171478e+00
   5.68880503e-01  6.13586092e-02]
 [ 4.65010046e+00  2.59832434e-02  1.33441194e+01  1.06048101e+01
   1.40196020e+00  3.87906924e-02]
 [ 3.63916820e+00  3.71920366e-02  6.93015253e+00  7.00727495e+00
   4.96412826e-01  8.03115525e-02]
 [ 1.17014982e+00  2.43437293e-02  1.08487147e+01  8.74500622e+00
   1.82928529e+00 -1.62962656e-02]
 [ 8.13093751e+00  2.26712859e-02  8.49684037e+00  9.61914095e+00
   1.16070774e+00  1.57255549e-01]
 [ 7.81344622e+00  2.18606391e-02  8.66833626e+00  8.91278844e+00
   7.60943405e-01  1.17466968e-01]
 [ 7.18380606e+00  2.93259361e-02  1.04340244e+01  1.10153835e+01
   7.18935034e-01  1.2360

Okay define the priors as uniform distributions.

In [None]:
num_dim = 3

# there are three priors for the non-hierarchical run of SBI:
low_bounds = torch.tensor([1, np.pi/500, 1])
high_bounds = torch.tensor([10, 3*np.pi/200, 20])
# length, theta, μ_a_g, σ_a_g, σ = thetas

prior = utils.BoxUniform(low = low_bounds, high = high_bounds)
print(prior.sample())

# there are 
low_bounds = torch.tensor([1, np.pi/500, 1, 0.1])
high_bounds = torch.tensor([10, 3*np.pi/200, 20, 5])
# length, theta, μ_a_g, σ_a_g, σ = thetas

prior_hierarchical = utils.BoxUniform(low = low_bounds, high = high_bounds)
print(prior.sample())

In [None]:
def simulator(thetas):#, percent_errors):
    # just plop the pendulum within here
    length, theta, a_g = thetas
    #print('heres what were inputting', thetas, a_g)
    #length_percent_error_all, theta_percent_error_all, a_g_percent_error_all = \
    #    percent_errors
    pendulum = Pendulum(
        pendulum_arm_length=float(length),
        starting_angle_radians=float(theta),
        acceleration_due_to_gravity=float(a_g),
        noise_std_percent={
            "pendulum_arm_length": 0.0,
            "starting_angle_radians": 0.1,
            "acceleration_due_to_gravity": 0.0,
        },
    )
    output = np.array(pendulum.create_object(np.linspace(0,2,100), noiseless=False))
    #torch.tensor(pendulum.create_object(0.75, noiseless=False))
    
    return output



def linear_gaussian(theta):
    output = theta + 1.0 + torch.randn_like(theta) * 0.1
    return output

## Hierarchical SBI
Okay so normally the simulator functions within SBI to return one observation for every time you sample from the prior. So how do we build this within a hierarchical structure? Also how do we preserve the groupings?
1. Do we need to simulate a set of observations (ie multiple observations)
2. Do we need a trickle down effect where you draw from population level parameter and then this determines which group you fall into, so it's still one observation all the way through?

A simulator simply takes parameters (thetas) and outputs data (x). So in the hierarchical pendulum example, we would be giving it length, theta, μ_a_g, and σ_a_g. It would take these and have a deterministic way to get to a_g, maybe from randomly drawing from the μ_a_g and σ_a_g distribution?

In the first case, we would draw two a_g values randomly and then for each of these we would have four pendulums.

In [None]:
# is it actually possible to set up a hierarchical model in this way?
# can we actually that the pendulums are actually on two different planets?
# how do you set up a simulator so that it knows about placing pendulums into groups?

import jax


# okay so you need to pass it parameters on all levels
# each of these parameters will have a prior
def hierarchical_simulator_set(thetas):
    # except length will have 8 elements as will theta
    length0, length1, length2, length3, theta0, theta1, theta2, theta3, μ_a_g, σ_a_g = thetas

    length = [length0, length1, length2, length3]
    theta = [theta0, theta1, theta2, theta3]
    # draw two a_g values
    rng_key = jax.random.PRNGKey(0)  # You can use any valid random key
    #a_g = np.zeros(2)
    #print(μ_a_g, σ_a_g)
    #print(float(μ_a_g), float(σ_a_g))
    a_g_0 = numpyro.sample("a_g", numpyro.distributions.TruncatedNormal(float(μ_a_g), float(σ_a_g), low = 0.01), rng_key = rng_key)
    rng_key = jax.random.PRNGKey(1)  # You can use any valid random key
    a_g_1 = numpyro.sample("a_g", numpyro.distributions.TruncatedNormal(float(μ_a_g), float(σ_a_g), low = 0.01), rng_key = rng_key)

    a_g = [a_g_0, a_g_1]
    #print('a_g', a_g)
    
    output = []#np.zeros((2,4))
    for j in range(2):
        for i in range(4):
            #print('input params are', float(length[i]), float(theta[i]), float(a_g[j]))
            pendulum = Pendulum(
                pendulum_arm_length=float(length[i]),
                starting_angle_radians=float(theta[i]),
                acceleration_due_to_gravity=float(a_g[j]),
                noise_std_percent={
                    "pendulum_arm_length": 0.0,
                    "starting_angle_radians": 0.1,
                    "acceleration_due_to_gravity": 0.0,
                },
            )
            #np.linspace(0,2,100)
            #print('output', np.array(pendulum.create_object(0.75, noiseless=False)))
            output.append(pendulum.create_object(0.75, noiseless=False))
    #torch.tensor(pendulum.create_object(0.75, noiseless=False))
    
    return output

def hierarchical_simulator_trickle_down(thetas):
    # just plop the pendulum within here

    #μ_a_g
    #σ_a_g
    #a_g = numpyro.sample("a_g", dist.TruncatedNormal(μ_a_g, σ_a_g, low = 0.01))
    # ^ I'm not sure how to to draw one parameter nested from another
    # in SBI you don't make a sampling distribution, you set an exact equality
    # unless you're introducing noise
    # and then there's some element of random sampling

    length, theta, μ_a_g, σ_a_g = thetas #, σ
    
    '''
    μ_a_g = numpyro.sample("μ_a_g", dist.TruncatedNormal(12.5, 2, low = 0.01))
    # scale parameters should be log uniform so that they don't go negative 
    # and so that they're not uniform
    # 1 / x in linear space
    σ_a_g = numpyro.sample("σ_a_g", dist.TruncatedNormal(2, 0.5, low = 0.01))
    '''
    n_planets = 2
    n_pendulums = 8

    rng_key = jax.random.PRNGKey(0)  # You can use any valid random key

    ## plates are a numpyro primitive or context manager for handing conditionally independence
    ## for instance, we wish to model a_g for each planet independently
    with numpyro.plate("planet_i", n_planets):
        #a_g = numpyro.sample("a_g", numpyro.distributions.TruncatedNormal(μ_a_g, σ_a_g, low = 0.01))
        a_g = numpyro.sample("a_g", numpyro.distributions.TruncatedNormal(μ_a_g, σ_a_g, low = 0.01), rng_key = rng_key)
        
        
        print(a_g)
        #a_g = numpyro.sample("a_g", utils.MultivariateNormal(μ_a_g, σ_a_g))
        #a_g = numpyro.sample("a_g", torch.normal(μ_a_g, σ_a_g))
        
    '''
    ## we also wish to model L and theta for each pendulum independently
    ## here we draw from an uniform distribution
    with numpyro.plate("pend_i", n_pendulums):
        L = 5#numpyro.sample("L", dist.TruncatedNormal(5, 2, low = 0.01))
        theta = numpyro.sample("theta", dist.TruncatedNormal(jnp.pi/100,jnp.pi/500, low = 0.00001))
    '''
    ## σ is the error on the position measurement for each moment in time
    ## we also model this
    ## eventually, we should also model the error on each parameter independently?
    ## draw from an exponential distribution parameterized by a rate parameter
    ## the mean of an exponential distribution is 1/r where r is the rate parameter
    ## exponential distributions are never negative. This is good for error.
    #σ = numpyro.sample("σ", dist.Exponential(exponential))#dist.Uniform(0, 0.1))#dist.HalfNormal(2.0))
    
    ## the moments in time are not independent, so we do not place the following in a plate
    ## instead, the brackets segment the model by pendulum and by planet,
    ## telling us how to conduct the inference
    #modelx = L * jnp.sin(theta[pendulum_code] * jnp.cos(jnp.sqrt(a_g[planet_code] / L) * times))
    
    
    #print('heres what were inputting', thetas, a_g)
    #length_percent_error_all, theta_percent_error_all, a_g_percent_error_all = \
    #    percent_errors
    pendulum = Pendulum(
        pendulum_arm_length=float(length),
        starting_angle_radians=float(theta),
        acceleration_due_to_gravity=float(a_g),
        noise_std_percent={
            "pendulum_arm_length": 0.0,
            "starting_angle_radians": 0.1,
            "acceleration_due_to_gravity": 0.0,
        },
    )
    output = np.array(pendulum.create_object(np.linspace(0,2,100), noiseless=False))
    #torch.tensor(pendulum.create_object(0.75, noiseless=False))
    
    return output

In [None]:
posterior = infer(simulator, prior, "SNPE", num_simulations=1000)

In [None]:
#x_o_1 = simulator([5, np.pi/100, 5])
x_o_1 = simulator([5, np.pi/200, 7])
print(x_o_1)
posterior_samples_1 = posterior.sample((10000,), x=x_o_1)

# plot posterior samples
_ = analysis.pairplot(
    posterior_samples_1, 
    labels = ['L',r'$\theta_0$','$a_g$'],
    limits = [[0,10],[np.pi/200,3*np.pi/200],[0,10]],
    truths = [5, np.pi/100, 5],
    figsize=(5, 5)
)


In [None]:
#hierarchical_simulator_set
low_bounds = torch.tensor([1,1,1,1,
                           np.pi/500,np.pi/500,np.pi/500,np.pi/500,
                           1, 0.1])
high_bounds = torch.tensor([10,10,10,10,
                           3*np.pi/200,3*np.pi/200,3*np.pi/200,3*np.pi/200,
                           20, 5])
# length, theta, μ_a_g, σ_a_g, σ = thetas

prior_hierarchical_set = utils.BoxUniform(low = low_bounds, high = high_bounds)
print(prior_hierarchical_set.sample())

In [None]:
# test the simulator
params = prior_hierarchical_set.sample()
print('params', params, 'x', hierarchical_simulator_set(params))

In [None]:
posterior = infer(hierarchical_simulator_set, prior_hierarchical_set, "SNPE", num_simulations=100)

In [None]:
params = prior_hierarchical_set.sample()
print('true params', params)
x_o_1 = hierarchical_simulator_set(params)
print('observed xs', x_o_1)
# give it all the same position
xs = [0.018, 0.018, 0.018, 0.018, 0.018, 0.018, 0.018, 0.018]
posterior_samples_1 = posterior.sample((10000,), x=xs)#x_o_1)

# plot posterior samples
_ = analysis.pairplot(
    posterior_samples_1, 
    #labels = ['L',r'$\theta_0$','$a_g$'],
    #limits = [[0,10],[np.pi/200,3*np.pi/200],[0,10]],
    truths = params,
    figsize=(15, 15)
)