In [None]:
import numpy as np
import numba as nb
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import numba as nb
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

from typing import Callable
from sbi import utils as utils
from sbi import analysis as analysis
from sbi.inference.base import infer
from sbi.inference import SNPE, prepare_for_sbi, simulate_for_sbi
from sbi import analysis, utils
from sbi.inference import SNPE, simulate_for_sbi
from sbi.utils.user_input_checks import (
    check_sbi_inputs,
    process_prior,
    process_simulator,
)
# import required modules
from sbi.utils.get_nn_models import posterior_nn

seed = 0
torch.manual_seed(seed)

# torch.manual_seed only seeds PyTorch. The simulator in this notebook draws
# its noise with np.random inside an @nb.jit(nopython=True) function, and
# Numba keeps a generator state of its own that is only reachable from
# compiled code, so both NumPy and Numba are seeded explicitly as well.
np.random.seed(seed)


@nb.njit
def _seed_numba_rng(value):
    """Seed the random generator used by Numba-compiled functions."""
    np.random.seed(value)


_seed_numba_rng(seed)

In [None]:
@nb.jit(nopython=True)
def bin_trajectory(x: np.ndarray, bins: np.ndarray) -> np.ndarray:
    """
    Bins an array according to bins.

    Bin ``i`` covers the half-open interval ``[bins[i], bins[i + 1])``.
    Values below the first edge are assigned to the first bin and values
    at or above the last edge are assigned to the last bin, so samples
    that leave the binned range stay in the nearest edge bin instead of
    all being mapped to bin 0.

    Parameters
    ----------
    x : np.ndarray
        Array to be binned.
    bins : np.ndarray
        Bin edges in increasing order, including the last (right-most) one.

    Returns
    -------
    binned_x : np.ndarray
        int64 array with the bin index for each value. All zeros when
        fewer than two edges are given.

    """

    n_bins = len(bins) - 1
    binned_x = np.zeros(len(x), dtype=np.int64)
    if n_bins < 1:
        return binned_x

    last_bin = n_bins - 1
    for j in range(len(x)):
        value = x[j]
        # Right edge (and anything beyond it) belongs to the last bin.
        if value >= bins[n_bins]:
            binned_x[j] = last_bin
            continue
        # Values below bins[0] match no interval and keep the initial 0,
        # i.e. they are clamped to the first bin.
        for i in range(n_bins):
            if value >= bins[i] and value < bins[i + 1]:
                binned_x[j] = i
                break
    return binned_x

@nb.jit(nopython=True)
def build_transition_matrix(
    binned_x: np.ndarray, n_bins: int, t: int = 1
) -> np.ndarray:
    """
    Calculates the markov transition matrix for a binned trajectory.

    Entry ``[row, column]`` is the fraction of visits to bin ``column``
    that are followed, ``t`` steps later, by a visit to bin ``row``; every
    visited column therefore sums to one. Columns of bins that are never
    visited as a starting state are left at zero instead of dividing 0 by 0.

    Parameters
    ----------
    binned_x : np.ndarray
        Binned trajectory (integer bin indices in ``[0, n_bins)``).
    n_bins : int
        Number of bins.
    t : int, optional
        Lag time for which the matrix is calculated. The default is 1.

    Returns
    -------
    np.ndarray
        Returns float64 transition matrix of size (n_bins, n_bins).

    """

    counts = np.zeros((n_bins, n_bins), dtype=np.int64)
    col_totals = np.zeros(n_bins, dtype=np.int64)
    for i in range(len(binned_x) - t):
        src = binned_x[i]
        dst = binned_x[i + t]
        counts[dst, src] += 1
        col_totals[src] += 1

    probs = np.zeros((n_bins, n_bins), dtype=np.float64)
    for col in range(n_bins):
        # Skip empty columns: they have no outgoing transitions to normalise.
        if col_totals[col] > 0:
            for row in range(n_bins):
                probs[row, col] = counts[row, col] / col_totals[col]
    return probs

def build_transition_matricies(
    q: np.ndarray, lag_times: list[int], min_bin: float, max_bin: float, num_bins: int
) -> torch.Tensor:
    """
    Builds transition matrices for given lag times.

    The trajectory is binned once on ``num_bins`` equal-width bins between
    ``min_bin`` and ``max_bin``; one transition matrix is then computed per
    lag time with ``build_transition_matrix``.

    Parameters
    ----------
    q : np.ndarray
        Trajectory. Any shape is accepted; it is flattened before binning,
        so a column array of shape (n_steps, 1) works as well.
    lag_times : list
        List of lag times to compute summary statistics
    min_bin : float
        Minimum value of the bins
    max_bin : float
        Maximum value of the bins
    num_bins : int
        Number of bins between min_bin and max_bin

    Returns
    -------
    matricies : torch.Tensor
        Flattened float32 tensor of length
        ``len(lag_times) * num_bins * num_bins`` holding the transition
        matrices in the order of ``lag_times``; NaN entries are replaced by 0.
    """

    # The binning helper walks the trajectory sample by sample, so collapse
    # inputs such as (n_steps, 1) to one dimension first.
    trajectory = np.asarray(q).ravel()

    bins = np.linspace(min_bin, max_bin, num_bins + 1)
    binned_q = bin_trajectory(trajectory, bins)

    matricies = np.array(
        [
            build_transition_matrix(binned_q, num_bins, t=lag_time)
            for lag_time in lag_times
        ],
        dtype=np.float32,
    )

    # Guard against NaN columns coming out of the per-lag normalisation.
    return torch.from_numpy(np.nan_to_num(matricies, nan=0.0)).flatten()

In [None]:
@nb.jit(nopython=True)
def simulate_brownian_motion(
    num_steps=1000,
    dt=5e-06,
    D=10,
    x0=np.array([-1.5], dtype=np.float64),
    save_every=1,
    k=3,
):
    """
    Simulates one-dimensional Brownian motion with a linear restoring force.

    Every step applies the update
    ``x_new = x_old + D * dt * F + sqrt(2 * D * dt) * g`` with
    ``F = -k * x_old`` and ``g`` a fresh standard-normal draw.

    Parameters
    ----------
    num_steps : int
        Number of integration steps (the initial state counts as step 0).
    dt : float
        Integration time step.
    D : float
        Diffusion coefficient.
    x0 : np.ndarray
        Initial position, array of shape (1,).
    save_every : int
        Stride between stored frames.
    k : float
        Force constant of the restoring force.

    Returns
    -------
    np.ndarray
        float64 array of shape (num_steps // save_every, 1). Row 0 holds
        ``x0`` and row ``j`` holds the position after step
        ``j * save_every``.
    """
    n_steps = int(num_steps)
    stride = int(save_every)
    n_saved = n_steps // stride

    drift_scale = D * dt
    noise_scale = np.sqrt(2 * drift_scale)

    x = np.zeros((n_saved, 1), dtype=np.float64)
    # Frame 0 is the initial condition; the loop below starts at step 1 and
    # never writes it.
    if n_saved > 0:
        x[0] = x0
    xold = x0

    for i in range(1, n_steps):
        # forces evaluation
        Fx = -k * xold

        # Drawing noise
        gx = np.random.standard_normal(size=(1,))

        # integration
        xnew = xold + drift_scale * Fx + noise_scale * gx

        if (i % stride) == 0:
            slot = i // stride
            # When num_steps is not a multiple of save_every the final
            # stride lands one row past the end of x; skip that write.
            if slot < n_saved:
                x[slot] = xnew

        xold = xnew

    return x

In [None]:
# Reference trajectory (D=10, started at 0) whose observed range fixes the
# bin limits that matrix_simulator reuses for every simulation.
x = simulate_brownian_motion(
    num_steps=1000,
    dt=5e-06,
    D=10,
    x0=np.array([0], dtype=np.float64),
    save_every=1,
    k=3,
).flatten()
min_bin, max_bin = x.min(), x.max()
num_bins = 15
print(min_bin, max_bin)

In [None]:
def matrix_simulator(params):
    """
    Simulate one trajectory and summarise it as flattened transition matrices.

    Parameters
    ----------
    params :
        Object exposing ``.cpu()`` (presumably a torch tensor drawn from the
        prior - confirm). Its first entry is used as log10 of the diffusion
        coefficient, i.e. ``D = 10 ** params[0]``.

    Returns
    -------
    The result of ``build_transition_matricies`` for lag times 1, 10, 100 and
    500, using the module-level ``min_bin``, ``max_bin`` and ``num_bins``.
    """
    log10_diffusion = np.array(params.cpu(), dtype=np.float64)
    trajectory = simulate_brownian_motion(
        num_steps=1000,
        dt=5e-06,
        D=10 ** log10_diffusion[0],
        x0=np.array([0], dtype=np.float64),
        save_every=1,
        k=3,
    )
    return build_transition_matricies(
        trajectory, [1, 10, 100, 500], min_bin, max_bin, num_bins
    )

In [None]:
class SimpleCNN(nn.Module):
    """
    Simple single layer CNN with ReLU activation

    The input is a batch of flattened transition matrices; it is reshaped to
    (batch, num_lags, num_bins, num_bins), passed through one convolution and
    the activation, and flattened again per sample.

    Parameters
    ----------
    out_channels : int
        Number of output channels.
    kernel_size : int
        Size of the convolution kernel.
    stride : int
        Stride of the convolution.
    num_bins : int
        Number of bins for transition matrix.
    num_lags : int
        Number of lag times for which a transition matrix is generated.
    activation : torch.nn.Module
        Activation function (a class or factory, called without arguments).
    """

    def __init__(
        self,
        out_channels: int,
        kernel_size: int,
        stride: int,
        num_bins: int,
        num_lags: int,
        activation: Callable[[], nn.Module] = nn.ReLU,
    ):
        super(SimpleCNN, self).__init__()

        # NOTE(review): the end of this constructor and the forward() header
        # were unreadable in the source. The wiring below follows the class
        # docstring and the attributes forward() reads; confirm padding and
        # any further layers against the original notebook.
        self.num_bins = num_bins
        self.num_lags = num_lags

        # One input channel per lag time.
        self.conv1 = nn.Conv2d(
            in_channels=num_lags,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
        )
        self.activation = activation()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Check the original matrix size and lag times!!!
        # The flat input must hold num_lags * num_bins * num_bins values per sample.
        x = x.view((-1, self.num_lags, self.num_bins, self.num_bins))
        x = self.activation(self.conv1(x))
        return x.flatten(start_dim=1)

# Embedding network for the flattened transition matrices. num_lags=4 has to
# equal the number of lag times matrix_simulator passes to
# build_transition_matricies ([1, 10, 100, 500]), because forward() reshapes
# its input to (-1, num_lags, num_bins, num_bins).
model = SimpleCNN(out_channels=10, kernel_size=3, stride=1, num_bins=num_bins, num_lags=4)

In [None]:
# Lower / upper bounds of the uniform prior. matrix_simulator uses the sampled
# parameter as an exponent (D = 10 ** params[0]).
low_limit, high_limit = [-1], [2]

In [None]:
# Uniform box prior over the single simulator parameter. Use the GPU when one
# is present and fall back to the CPU otherwise, instead of failing on a
# hard-coded 'cuda' device.
prior_device = 'cuda' if torch.cuda.is_available() else 'cpu'
prior = utils.BoxUniform(
    low=torch.tensor([low_limit[0]], device=prior_device),
    high=torch.tensor([high_limit[0]], device=prior_device),
)

# process_prior hands back the prior together with the parameter count and a
# flag that process_simulator needs.
prior, num_parameters, prior_returns_numpy = process_prior(prior)

simulator_wrapper = process_simulator(matrix_simulator, prior, prior_returns_numpy)

check_sbi_inputs(simulator_wrapper, prior)

In [None]:
# NOTE(review): the previous cell already called process_prior,
# process_simulator and check_sbi_inputs by hand. This call rebinds
# matrix_simulator and prior to whatever prepare_for_sbi returns, so running
# the cell a second time feeds the already returned objects back in, and
# simulator_wrapper from the previous cell is not used in the cells visible
# here. Presumably only one of the two set-ups is needed - confirm against
# the installed sbi version.
matrix_simulator, prior = prepare_for_sbi(matrix_simulator, prior)

In [None]:
# Density estimator of type 'nsf' with the CNN `model` as embedding network
# for the simulated summary statistics.
neural_posterior = posterior_nn(model='nsf', embedding_net=model)

# Train on the GPU when one is present; fall back to the CPU instead of
# failing on a hard-coded 'cuda' device.
inference = SNPE(
    prior,
    device='cuda' if torch.cuda.is_available() else 'cpu',
    density_estimator=neural_posterior,
)

In [None]:
# Draw 50000 parameter sets from the prior and simulate each one with a single
# worker. NOTE: this rebinds `x`, which earlier held the reference trajectory
# used to derive min_bin / max_bin.
theta, x = simulate_for_sbi(matrix_simulator, prior, num_simulations=50000, num_workers=1)

In [None]:
# Keep the training data on the GPU when one is present; fall back to the CPU
# instead of failing on a hard-coded 'cuda' device.
train_device = 'cuda' if torch.cuda.is_available() else 'cpu'

density_estimator = inference.append_simulations(
    theta, x, data_device=train_device
).train(training_batch_size=256, show_train_summary=True)

posterior = inference.build_posterior(density_estimator)

In [None]:
# Save the posterior for later use. The file has to be opened for binary
# writing and torch.save takes the object first and the file second; its
# return value is not assigned so `posterior` stays bound to the trained
# posterior.
# NOTE(review): 'your_path.pkl' is a placeholder path.
with open('your_path.pkl', 'wb') as f:
    torch.save(posterior, f)