In [None]:
# !pip install sbi
# !pip install cython
# !pip install pymc==2.3.8
# !pip install git+https://github.com/hddm-devs/kabuki
# !pip install git+https://github.com/hddm-devs/hddm
# !pip install git+https://github.com/AlexanderFengler/ssm_simulators

In [1]:
from copy import deepcopy
import ssms
import torch
import numpy as np
import pandas as pd
import sbi
from sbi.inference import MNLE
from sbi.inference import MCMCPosterior
from ssms.basic_simulators import simulator
import hddm

In [None]:
#Prior
m = torch.distributions.uniform.Uniform(low = torch.tensor(ssms.config.model_config['ddm']['param_bounds'][0]),
                                        high = torch.tensor(ssms.config.model_config['ddm']['param_bounds'][1]))
#Sample from the prior to collect theta matrix
thetas = m.sample((100000,))
#For every theta, draw 1 sample.
x = ssms.basic_simulators.simulator(model = 'ddm', theta = thetas, n_samples = 1)
#Format the output of the simulator
x = torch.Tensor(np.hstack((x['rts'], x['choices'])))
#Edit choices to [0,1] since that is the format MNLE expects.
x[:,1] = (x[:,1] + 1)/2

In [None]:
#Initialise prior
prior = sbi.utils.BoxUniform(low=torch.tensor(ssms.config.model_config['ddm']['param_bounds'][0]), 
                             high=torch.tensor(ssms.config.model_config['ddm']['param_bounds'][1]), device = 'cuda')   
#Initialise trainer and train on simulated data
trainer = MNLE(prior=prior, device = 'cuda')
trainer = trainer.append_simulations(thetas, x)
mnle = trainer.train(training_batch_size = 1000)

In [None]:
class MixedWrapper():
    def __init__(
        self, MixedDensityEstimator):
        self.mde = MixedDensityEstimator
        self.dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    def predict_on_batch(self, data):
        #internally, data comes in the form [[theta | rt, choice]]. So to collect x we select the last two columns.
        x = torch.tensor(data[:,-2:]).to(self.dev)
        x[:,1] = (x[:,1] + 1)/2
        #Select all but the last two columns to collect theta
        theta = torch.tensor(data[:,:-2]).to(self.dev)
        return self.mde.log_prob(x, theta).cpu().detach().numpy()

In [None]:
#Initialise wrapper
model_cust = MixedWrapper(MixedDensityEstimator = mnle)

In [None]:
#Simulate testing data from a fixed theta.
fixed_theta = torch.tensor([1.0, 1.5, 0.5, 0.5])
sim_out = simulator(model = 'ddm', 
                    theta = fixed_theta,
                    n_samples = 1000)
#Edit output of simulator
sim_out = torch.Tensor(np.hstack((sim_out['rts'], sim_out['choices'])))

#Create a pandas in the format that HDDM wants
theta_array = fixed_theta.repeat(1000,1)
x = sim_out
edited_choices = (x[:,1] + 1) / 2
x = torch.tensor(np.vstack((x[:,0], edited_choices)).T)


df = pd.DataFrame(x).astype("float")
df.columns = ['rt', 'response']
df['v']=theta_array[:,0]
df['a']=theta_array[:,1]
df['z']=theta_array[:,2]
df['t']=theta_array[:,3]

In [None]:
df

In [None]:
#Fit
model_config = ssms.config.model_config['ddm']
model_config["choices"] = [1,0]

hddmnn_model = hddm.HDDMnn(data = df,
                           informative = False,
                           include = model_config['hddm_include'],# Note: This include statement is an example, you may pick any other subset of the parameters of your model here
                           model = 'ddm',
                           model_config = model_config,
                           network = model_cust)

In [None]:
#Sample from the model
hddmnn_model.sample(700, burn = 100)

In [None]:
#Plot the posteriors
hddmnn_model.plot_posteriors()