In [1]:
import numpy as np
import pandas as pd

## Race diffusion model:

Race diffusion model is an evidence accumulation model which assumes the decision maker assigns an accumulator to each option and select the option the wins the race between accumulators (i.e., reaches the threshold first). The accumulation process in this model can be formulated as follows:

$$X_i(t+\Delta t) = X_i(t) + v_i \Delta t + \sigma (\Delta t)^{\frac{1}{2}} \xi, ~~~~~~~~~ \xi \sim \mathcal{N}(0, 1)$$

For more discussion on diffusion models you can check:

- [Ratcliff, R., Smith, P. L., Brown, S. D., & McKoon, G. (2016). Diffusion decision model: Current issues and history. Trends in cognitive sciences, 20(4), 260-281.](https://www.sciencedirect.com/science/article/pii/S1364661316000255)

- [Evans, Nathan J., Wagenmakers, Eric-Jan (2020) Evidence Accumulation Models: Current Limitations and Future Directions, The Quantitative Methods for Psychology, 16(2), 73-90.](https://www.tqmp.org/RegularArticles/vol16-2/p073/)

- [Heathcote, A., & Matzke, D. (2022). Winner takes all! What are race models, and why and how should psychologists use them?. Current Directions in Psychological Science, 31(5), 383-394.](https://journals.sagepub.com/doi/full/10.1177/09637214221095852)

- [Rasanan, A. H. H., Evans, N. J., Rieskamp, J., & Rad, J. A. (2024). Response time and accuracy modeling through the lens of fractional dynamics: A foundational primer. In Computation and Modeling for Fractional Order Systems (pp. 1-27). Academic Press.](https://www.sciencedirect.com/science/article/pii/B9780443154041000060)

In [2]:
def race_diffusion_trial(v, b, ndt=0, sigma=1, dt=0.001):
    x = np.array([0.0, 0.0]) # Both accumulators start from zero
    rt = 0
    ch = -1
    stop = False
    while not stop:
        x[0] += v[0]*dt + sigma*np.random.normal(0, 1)*np.sqrt(dt)
        x[1] += v[1]*dt + sigma*np.random.normal(0, 1)*np.sqrt(dt)
        rt += dt
        if x[0]>= b and x[1]<b: # First accumulator reached first
            ch = 0.0
            stop = True
        elif x[1]>= b and x[0]<b: # Second accumulator reached first
            ch = 1.0
            stop = True
        elif x[0]>= b and x[1]>=b: # Both accumulators reached at the same time 
            rt = 0
            x = np.array([0.0, 0.0]) # restart the process
    
    return rt+ndt, ch

## Mapping function examples:


1) Polynomial mapping function:

$$m(U_1, U_2) = c \times \Big(\frac{U_1}{U_2}\Big)^{\alpha}$$

2) Sigmoidal mapping function:
$$m(U_1, U_2) = s \frac{U_1}{1 + \exp\big(- \tau (U_1 - U_2)\big)}$$

In [3]:
def polynomial(U1, U2, scaling, sensitivity):
    return scaling * (U1/U2)**sensitivity

def sigmoidal(U1, U2, scaling, sensitivity):
    return scaling * U1/ (1 + np.exp(-sensitivity*(U1 - U2)))

## Data simulation:

- 100 subjects
- 200 trials for each subject

In [4]:
def simulate_group_data(mapping_function, 
                        min_val0=None, max_val0=None,
                        min_val1=None, max_val1=None,
                        n_subject=100, trial_per_sbj=200):
    data = {'sbj': [], 'trial': [], 'value_0': [], 'value_1': [],
            'gaze_0': [], 'gaze_1': [], 'true_drift_0': [], 'true_drift_1': [],
            'true_threshold': [],'true_ndt':[], 'rt': [], 'choice': []}
    
    for sbj in range(n_subject):
        scaling = np.random.uniform(0.3, 1)
        sensitivity = np.random.uniform(0.5, 1.5)
        gam = np.random.uniform(0.3, 1)
        threshold = np.random.uniform(0.5, 3)
        ndt = np.random.uniform(0.1, 1)

        for t in range(trial_per_sbj):
            data['sbj'].append(sbj+1)
            data['trial'].append(t+1)

            if t % 2 == 0:
                value0 = np.random.uniform(min_val0, max_val0)
                value1 = np.random.uniform(min_val1, max_val1)
                g0 = np.random.uniform(0.55, .95)
                g1 = 1 - g0
            else:
                value1 = np.random.uniform(min_val0, max_val0)
                value0 = np.random.uniform(min_val1, max_val1)
                g1 = np.random.uniform(0.55, .95)
                g0 = 1 - g1        

            data['value_0'].append(value0)
            data['value_1'].append(value1)
            data['gaze_0'].append(g0)
            data['gaze_1'].append(g1)

            drift0 = mapping_function(g0*value0 + gam*g1*value0, 
                                      g1*value1 + gam*g0*value1,
                                      scaling, sensitivity)
            drift1 = mapping_function(g1*value1 + gam*g0*value1,
                                      g0*value0 + gam*g1*value0, 
                                      scaling, sensitivity)

            data['true_drift_0'].append(drift0)
            data['true_drift_1'].append(drift1)
            data['true_threshold'].append(threshold)
            data['true_ndt'].append(ndt)

            rt, ch = race_diffusion_trial([drift0, drift1], threshold, ndt=ndt)

            data['rt'].append(rt)
            data['choice'].append(ch)
    
    return pd.DataFrame(data)

In [5]:
data_polynomial = simulate_group_data(polynomial,
                                      min_val0=1, max_val0=3,
                                      min_val1=2, max_val1=4,
                                      n_subject=100, trial_per_sbj=200)
data_polynomial = data_polynomial[data_polynomial['rt']<=10].reset_index(drop=True)
data_polynomial.to_csv('simulation_polynomial.csv')


data_sigmoidal = simulate_group_data(sigmoidal,
                                     min_val0=1, max_val0=4,
                                     min_val1=3, max_val1=6,
                                     n_subject=100, trial_per_sbj=200)
data_sigmoidal = data_sigmoidal[data_sigmoidal['rt']<=10].reset_index(drop=True)
data_sigmoidal.to_csv('simulation_sigmoidal.csv')