In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from numba import njit
from tqdm.notebook import tqdm

In [2]:
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, GRU
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
import tensorflow_probability as tfp
tfd = tfp.distributions
tfpl = tfp.layers
import talib

## Simulator

In [16]:
def dynamic_prior(batch_size):
    """
    Generates a random draw from the diffusion model prior.
    """
    params = np.random.gamma(5, 0.5, size=(batch_size, 5))
    hyper_params = np.random.uniform(0.01, 0.1, size=(batch_size, 5))

    return np.c_[params, hyper_params]

def context_gen(batch_size, n_obs):
    return np.random.randint(0, 2, (batch_size, n_obs)).astype(np.float32)

@njit
def diffusion_trial(v, a, ndt, zr=0.5, dt=0.001, max_steps=1e4):
    """Simulates a trial from the diffusion model."""

    n_steps = 0.
    x = a * zr

    # Simulate a single DM path
    while (x > 0 and x < a and n_steps < max_steps):

        # DDM equation
        x += v*dt + np.sqrt(dt) * np.random.normal()

        # Increment step
        n_steps += 1.0

    rt = n_steps * dt
    return rt + ndt if x > 0. else -rt - ndt

# @njit
# def dynamic_diffusion_condition(n_obs, v, a, ndt, zr=0.5, dt=0.005, max_steps=1e4):
#     """Simulates a diffusion process over an entire condition."""
    
    
    
#     x = np.empty(n_obs)
#     for i in range(n_obs):
#         x[i] = diffusion_trial(v, a, ndt, zr, dt, max_steps)
#     return x


# @njit
def dynamic_diffusion_process(prior_samples, n_obs, dt=0.001, s=1.0, max_iter=1e4):
    """
    Performs one run of a dynamic diffusion model process.
    """
    
    params, params_stds = np.split(prior_samples, 2, axis=-1)
    params_t = params
    
    params_t_array = np.zeros((n_obs, params.shape[0]))
    
    rt = np.zeros(n_obs)
    exp_condition = 0
    
    # Iterate over number of trials
    for t in range(n_obs):
        
        # switch condition
        if t == (n_obs / 2):
            exp_condition = 1
        
        # Run diffusion process
        rt[t] = diffusion_trial(params_t[exp_condition], params_t[exp_condition +2], params_t[4])
        
        # Store before transition
        params_t_array[t] = params_t
        
        # Transition and ensure non-negative parameters
        params_t = params_t + params_stds * np.random.randn(params.shape[0])
        
        # Constraints
        params_t[0] = min(max(params_t[0], 0.0), 8)
        params_t[1] = min(max(params_t[1], 0.0), 8)
        params_t[2] = min(max(params_t[2], 0.0), 6)
        params_t[3] = min(max(params_t[3], 0.0), 6)
        params_t[4] = min(max(params_t[3], 0.0), 4)
        
    return np.atleast_2d(rts_v).T, params_t_array, params_stds

In [22]:
dynamic_diffusion_process(dynamic_prior(1)[0], 120)[0].shape

(240,)