In [None]:
import torch
from sbi import utils as utils
from sbi import analysis as analysis
from sbi.inference.base import infer
import numpy as np
import matplotlib.pyplot as plt
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from scipy.linalg import inv
from numpy.random import multivariate_normal
from notebook.services.config import ConfigManager
from traitlets.config.manager import BaseJSONConfigManager
import matplotlib as mpl
import math
import scipy.integrate
import random
import torch.nn as nn
from typing import Callable, List

# import helper code
from workshop_utils import set_plot_attributes,\
                            interactive_projectile, interactive_pendulum,\
                            displacement_deriv, COLORS, plot_data_vs_model


# set jupyter configurations
%matplotlib inline
%config InlineBackend.figure_format='retina'
default_dpi = mpl.rcParamsDefault['figure.dpi']
mpl.rcParams['figure.dpi'] = default_dpi*1.2  

### 1.4.1 Ballistic motion problem
- projectile is launched with given launch velocity, launch angle, and a drag coefficient
- we can measure height of projectile with certain imprecision

In [None]:
def mortar(
    speed:float,
    angle:int,
    drag:float,
    area:float=0.008,
    mass:float=0.2,
    measurement_noise:float=3.
    )->dict:
    """ Simulate the firing of a projectile with parameters:
    Args:
        speed (float): magnitude of initial speed (positive, m/s).
        angle (int): launch angle in degrees
        drag (float): drag coefficient
        area (float): area of the fired projectile
        mass (float): mass of the fired projectile
    Returns:
        simulation (dict): simulation results containing distance
        travelled and height at time points   
    """
    drag_constant = 0.5 * drag * 1.28 * area # 1.28 is air density
    deriv = lambda t,u: (u[1],
                         -drag_constant/mass * np.hypot(u[1],u[3]) * u[1],
                         u[3],
                         -drag_constant/mass * np.hypot(u[1], u[3]) * u[3] - 9.81)
    
    # Initial conditions: x0, v0_x, z0, v0_z.
    u0 = 0, speed * np.cos(np.radians(angle)), 0., speed * np.sin(np.radians(angle))
    t0, tf = 0, 400 # Set range where we want to integrate
    # solve ivp to get distance traveld and height at each point in time -> df/dt = f(t,y); f(t_i) = y_i, 
    solution = scipy.integrate.solve_ivp(
        deriv, (t0, tf), u0, dense_output=True) # dense_output gives continuous solution
    # A fine grid of time points from 0 until impact time.
    t = np.linspace(0, tf, 2000)
    # get solution of ivp -> distance and height at each time point
    d, _, h, _ = solution.sol(t)
    
    measurements = h + np.random.randn(d.shape[0]) * measurement_noise
    # to obtain the params from the interactive plot, we need to return parameters here as well
    return dict(θ=(speed,angle,drag,area,mass), d=d, h=h, x=measurements)


ballistic_motion = interactive_projectile(mortar)
ballistic_motion

### 1.4.2 Linear regression as baseline
- how can we find the underlying parameters of the observation?
- with linear regression on quadratic polynomial parameters? 

In [None]:
def linear_regression(data:np.array, features:np.array)-> np.array:
    """ Perform linear regression with given features.
    Args:
        data (np.array): what we want to learn the parameters of
        features (np.array): our features to describe the data
    Returns:
        Parameters found through solving (features^t * features)^-1 * features.T * observation
    """
    linreg_param = inv(features.T @ features) @ (features.T @ data)
    
    return linreg_param


x_o = ballistic_motion.result['x']
domain = ballistic_motion.result['d']
features = np.stack([domain**0, domain**1, domain**2]).T
linreg_param = linear_regression(x_o, features)

# generate reconstruction with found parameters
linreg_reconstruction = features @ linreg_param

In [None]:
plot_data_vs_model(
    domain,
    x_o, linreg_reconstruction,
    data_label='x_o',
    model_label='Linear regression reconstruction')

In [None]:
def get_landing_distance(d, x_o):
    """ Compute distance travelled until projectile hits the ground.
    Args:
        d (array): distance travelled by projectile
        x_o (array): height of projectile at given distance
    Returns:
        Distance traveled in meter until projectile hits ground.
    """
    height_greater_zero = x_o > 0
    return d[np.argwhere(height_greater_zero == False).min()]
    
    
def get_distance_at_highest_point(d, x_o):
    """ Compute distance travelled until projectile reaches highest point.
    Args:
        d (array): distance travelled by projectile
        x_o (array): height of projectile at given distance
    Returns:
        Distance traveled in meter until projectile reaches highest point of its trajectory.
    """
    return d[x_o.argmax()]

def get_impact_angle(d_highest_point, h_highest_point, d_impact):
    """ Compute impact angle of projectile
    Args:
        d_highest_point (float): distance travelled at highest point
        h_highest_point (float): height at highest point
        d_impact (float): distance travelled at impact point
    Returns:
        Impact angle
    """
    impact_angle_arctan = np.arctan(h_highest_point/ (d_impact-d_highest_point))
    
    return impact_angle_arctan * 180/np.pi
    

### 1.4.3 Try to reproduce data manually 🛠️
- assuming you can build a forward model
- can you beat the linear regression model?

In [None]:
def distance(prediction:np.array, data:np.array, distance_function:Callable) -> float:
    """ Returns distance between arg1 and arg2 according to specified distance measure.
    Args:
        prediction (array): 
        data (array):
        distance_function (Callable):
    Returns:
        distance between prediction and data.
    """
    ensure_array = lambda x: np.array([x])
    if not hasattr(prediction, "__len__") and not (hasattr(data, "__len__")):
        prediction, data = map(ensure_array, [prediction, data])
    assert len(prediction) == len(data)
    
    return distance_function(prediction, data)


U = np.random.uniform
# sample parameters from a uniform distribution
speed = U(50, 250)
angle = U(10, 45)
drag = U(0.2, 0.7)


# pass parameters to simulator and see result
simulation = mortar(speed, angle, drag)

x_reconstruction = simulation['x']
domain = simulation['d']

plot_data_vs_model(
    domain,
    x_o,
    x_reconstruction,
    data_label='x_o',
    model_label='reconstruction')


# choose a distance function
mse = lambda prediction, data: np.square(prediction - data).mean()
chebyshev = lambda prediction, data:\
                max([np.abs(data[i]-prediction[i]) for i in range(data.shape[0])])

# and an acceptance threshold ε
ε = 100000

error = distance(x_reconstruction, x_o, mse)
accepted = error <= ε


start_bold = '\033[1m'
end_bold = '\033[0m'
print(f'Distance of simulation with parameters\nlaunch velocity={speed:.1f},\
launch angle={angle:.1f},\
and drag coefficient={drag:.2f}:{start_bold}{error:.2f}{end_bold}')
print(f'Based on chosen {start_bold}ε={ε}{end_bold}, the simulation was {start_bold}\
{"accepted" if {accepted} else "not accepted"}{end_bold}')

### 1.4.4 Automate the search for parameters 🛠️
- there should be a smarter way to approach this
- your task is now to automate what we just tried manually and make decisions which paramters to keep, based on distance

In [None]:
def rejection_abc(ε:float, distance_function:Callable, N:int=1000)-> List:
    """ Can your approach be automated?
    Args:
        ε (float): acceptance threshold
        distance_function (Callable): function to compute distance between prediction and observation
        N (int): number of simulations
    Returns:
        List of parameters where distance of simulation to observation is <= ε
    """
    θ_accepted = []
    
    for _ in range(N):
        # sample θ from a uniform prior
        speed = U(50, 250)
        angle = U(10, 45)
        drag = U(0.2, 0.7)
        ######################################################################## Solution
        # pass to simulator
        simulation = mortar(speed, angle, drag)
        domain, abc_reconstruction = simulation['d'], simulation['x']
        # compute distance
        dist = distance(x_o, abc_reconstruction, distance_function)
        # check if within epsilon
        if dist <= ε:
            # if so return parameters
            θ_accepted.append((speed, angle, drag))
        ######################################################################## Solution end
    return θ_accepted

ε = 1000
θ_accepted = rejection_abc(ε, mse)
print(f'Your rejection-ABC implementation has found {len(θ_accepted)}\
 parameters that produce observations within ε={ε}.')

In [None]:
# plot with one of the found parameters
speed, angle, drag = random.choice(θ_accepted)
sim_result = mortar(speed, angle, drag)
error = distance(sim_result["x"], x_o, mse)

plot_data_vs_model(
    sim_result['d'],
    x_o,
    sim_result['x'],
    data_label='x_o',
    model_label='rejection-ABC reconstruction')


print(f'Distance of simulation with automatically found parameters\nlaunch velocity={speed:.1f},\
launch angle={angle:.1f}, and drag coefficient={drag:.2f}: {start_bold}{error:.2f}{end_bold}')

- You just discovered rejection ABC!
- Reflect on how choice of $\epsilon$, $d$ was made

### 1.4.5 When does this break?
- suppose our observation is more complex

In [None]:
def pendulum_sim(θ:np.array)->np.array:
    """
    Blackbox simulator that takes in a parameter vector
    and produces an observation of angular dispalement
    of a damped pendulum.
    """
    dampening_factor, mass, length = θ
    theta_0 = [0,3]
    t = np.linspace(0,20,150)    
    # solve ODE
    solution = scipy.integrate.odeint(displacement_deriv, theta_0,t,args =(dampening_factor,mass,9.81,length))
    angular_velocity_measurements =  solution[:,0] + np.random.randn(t.shape[0]) * 0.05
    # return observation
    return angular_velocity_measurements

pendulum = interactive_pendulum(pendulum_sim)
pendulum

### 1.4.6 What can we do about this?
- What summary statistics for this data would come to your  mind?
- Can we learn summary statistics with neural networks?

### 1.4.7 Introduce SBI as a potentially useful approach to solve our problem

In [None]:
# define an initial, three-dimensional prior 
num_dim = 3
prior = utils.BoxUniform(low=torch.Tensor([0,0.01,1]), high=torch.Tensor([1,5,4]))

# and infer the posterior over θ using the sbi toolkit
posterior = infer(pendulum_sim, prior, method='SNPE', num_simulations=500)

In [None]:
x_o = pendulum.result['x_o']
samples = posterior.sample((10000,), x=x_o)
θ = samples.mean(dim=0).numpy()
log_probability = posterior.log_prob(samples, x=x_o)
_ = analysis.pairplot(samples, figsize=(6,6))

In [None]:
_, ax = plt.subplots()
ax.plot(np.linspace(0,20,150), pendulum_sim(θ), label='sbi reconstruction', color=COLORS['sbi'])
ax.plot(np.linspace(0,20,150), x_o, label='x_o', color=COLORS['data'], marker='x')
set_plot_attributes(ax, legend=True, xlabel='$t$', grid=True, ylabel='Angular displacement')