In [1]:
import os, sys
sys.path.append(os.path.abspath(os.path.join('../../..'))) # access sibling directories

from src.python.models import MainSimulator

import numpy as np
from scipy import stats

In [2]:
RNG = np.random.default_rng() # Use 2022 for stable results

# Model formulation

## Signal-detection model

$$
\begin{align}
\mu_h &\sim \mathcal{N}(0, 1) \\
\sigma_h &\sim \textrm{Gamma}(1, 1)\\
\mu_f &\sim \mathcal{N}(0, 1)\\
\sigma_f &\sim \textrm{Gamma}(1, 1)\\
h_m &\sim \mathcal{N}(\mu_h, \sigma_h) \text{ for } m=1,\dots,M\\
f_m &\sim \mathcal{N}(\mu_f, \sigma_f) \text{ for } m=1,\dots,M\\
x_n^{h}|h_m &\sim \textrm{Bin}(O_m, \Phi(h_m)) \text{ for } n=1,\dots,N_m\\
x_n^{f}|f_m &\sim \textrm{Bin}(W_m, \Phi(f_m)) \text{ for } n=1,\dots,N_m
\end{align}
$$

$\Phi =$ Standard normal cumulative distribution function

$h_m = \Phi^{-1}(p_m^{(h)}) =$ probit-transformed hit probability (on old items)

$f_m = \Phi^{-1}(p_m^{(f)}) =$ probit-transformed false alarm probability (on new items)

$O_m =$ Number of old/signal trials per participant m

$W_m =$ Number of new/noise trials per participant m

[Notation follows Rouder & Lu (2005)]

[Priors are informed by the results of Greene, Martin & Naveh-Benjamin (2021)]

We code stimulus type and response as
- 0 = "new"
- 1 = "old".

## MPT model (2HTM)

$$
\begin{align}
d_m &\sim \text{ for } m=1,\dots,M\\
g_m &\sim \text{ for } m=1,\dots,M\\
p_m(\textrm{hit}|\textrm{old}) &\sim d_m + (1-d_m)*g_m \text{ for } m=1,\dots,M\\
p_m(\textrm{false alarm}|\textrm{new}) &\sim (1-d_m)*g_m \text{ for } m=1,\dots,M\\
x_n^{\textrm{old}} &\sim \textrm{Bin}(1, p_m(\textrm{hit}|\textrm{old})) \text{ for } n=1,\dots,N_m\\
x_n^{\textrm{new}} &\sim \textrm{Bin}(1, p_m(\textrm{false alarm}|\textrm{new})) \text{ for } n=1,\dots,N_m
\end{align}
$$

# Simulator

Desired data structure: (x, M, N, 2) 
- x data sets
- M participants
- N trials per participant
- 2 Variables (stimulus_type, response)

In [3]:
# Helpers:
# Find percentage of a value with stats.norm.cdf()
# Find value leading to a specific percentage with stats.norm.ppf()

In [4]:
n_clusters = 100
n_obs = 50

In [5]:
# draw from prior
# TODO: Clarify if using Variance notation or SD notation!!!

class HierarchicalSdtMptSimulator:

    def __init__(self):
        pass

    def draw_from_prior(self, model_index, n_clusters):
        """ Draws parameter values from the specified prior distributions of the hyperprior and the conditional prior.

        Parameters
        ----------
        model_index : int
            Index of the model to be simulated from.
        n_clusters : int
            Number of higher order clusters that the observations are nested in.
        """

        if model_index == 0: # SDT model

            # Hyperpriors
            mu_h = RNG.normal(0.5, 1)
            sigma_h = RNG.gamma(1, 1)
            mu_f = RNG.normal(-1, 1)
            sigma_f = RNG.gamma(1, 1)

            # Group-level priors
            h_m = RNG.normal(loc=mu_h, scale=sigma_h, size=n_clusters)
            f_m = RNG.normal(loc=mu_f, scale=sigma_f, size=n_clusters)

        if model_index == 1: # MPT model
            # TODO
            h_m = 1 # TEMPORARY
            f_m = 1 # TEMPORARY
        
        return h_m, f_m

    def generate_from_likelihood(self, h_m, f_m, n_clusters, n_obs):
        """ Generates a single hierarchical data set from the sampled parameter values.

        Parameters
        ----------
        h_m : np.array
            Probit-transformed hit probability.
        f_m : np.array
            Probit-transformed false alarm probability.
        n_clusters : int
            Number of higher order clusters that the observations are nested in.
        n_obs : int
            Number of observations per cluster.

        Returns
        -------
        X : np.array
            Generated data set with shape (n_clusters, n_obs, 2).
            Contains 2 binary variables with stimulus type and response (for both applies: 0="new" / 1="old").
        """

        # Determine amount of signal (old item) and noise (new item) trials
        assert n_obs%2 == 0, "n_obs has to be dividable by 2."
        n_trials_per_cat = int(n_obs/2)

        # Create stimulus types (0="new" / 1="old")
        stim_cluster = np.repeat([[1,0]], repeats=n_trials_per_cat, axis=1) # for 1 participant
        stim_data_set = np.repeat(stim_cluster, repeats=n_clusters, axis=0) # for 1 data set

        # Create individual responses (0="new" / 1="old")
        X_h = RNG.binomial(n=1, p=stats.norm.cdf(h_m), size=(n_trials_per_cat, n_clusters)).T # Old items
        X_f = RNG.binomial(n=1, p=stats.norm.cdf(f_m), size=(n_trials_per_cat, n_clusters)).T # New items
        X_responses = np.concatenate((X_h, X_f), axis=1)

        # Create final data set
        X = np.stack((stim_data_set, X_responses), axis=2)

        return X

    def generate_single(self, model_index, n_clusters, n_obs):
        """Generates a single hierarchical data set utilizing the draw_from_prior and gen_from_likelihood functions.

        Parameters
        ----------
        model_index : int
            Index of the model to be simulated from.
        n_clusters : int
            Number of higher order clusters that the observations are nested in.
        n_obs : int
            Number of observations per cluster.

        Returns
        -------
        X : np.array
            Generated data set with shape (n_clusters, n_obs, 2).
            Contains 2 binary variables with stimulus type and response (for both applies: 0="new" / 1="old").
        """

        h_m, f_m = self.draw_from_prior(model_index, n_clusters)
        X = self.generate_from_likelihood(h_m, f_m, n_clusters, n_obs)

        return X

In [6]:
# Test class on one data set
simulator_single = HierarchicalSdtMptSimulator()
simulator_single.generate_single(0, 100, 50)

array([[[1, 0],
        [1, 1],
        [1, 0],
        ...,
        [0, 1],
        [0, 0],
        [0, 0]],

       [[1, 1],
        [1, 1],
        [1, 0],
        ...,
        [0, 0],
        [0, 0],
        [0, 0]],

       [[1, 1],
        [1, 1],
        [1, 1],
        ...,
        [0, 0],
        [0, 0],
        [0, 1]],

       ...,

       [[1, 0],
        [1, 0],
        [1, 1],
        ...,
        [0, 0],
        [0, 0],
        [0, 0]],

       [[1, 1],
        [1, 1],
        [1, 1],
        ...,
        [0, 0],
        [0, 0],
        [0, 0]],

       [[1, 0],
        [1, 0],
        [1, 0],
        ...,
        [0, 0],
        [0, 0],
        [0, 0]]], dtype=int64)

In [8]:
# Test class on multiple data sets
def n_clust_obs():
    """
    Nasty hack to make compatible with BayesFlow.
    Defines a fixed number of clusters and observations.
    """
    
    M = 50
    N = 50
    return (M, N)

simulator = MainSimulator(HierarchicalSdtMptSimulator())
simulator(5, (100,50), 2)

(array([[1., 0.],
        [1., 0.],
        [0., 1.],
        [0., 1.],
        [0., 1.]], dtype=float32),
 None,
 array([[[[1., 1.],
          [1., 0.],
          [1., 1.],
          ...,
          [0., 0.],
          [0., 0.],
          [0., 0.]],
 
         [[1., 1.],
          [1., 1.],
          [1., 1.],
          ...,
          [0., 0.],
          [0., 0.],
          [0., 0.]],
 
         [[1., 0.],
          [1., 0.],
          [1., 0.],
          ...,
          [0., 0.],
          [0., 0.],
          [0., 0.]],
 
         ...,
 
         [[1., 1.],
          [1., 0.],
          [1., 0.],
          ...,
          [0., 0.],
          [0., 0.],
          [0., 1.]],
 
         [[1., 0.],
          [1., 0.],
          [1., 0.],
          ...,
          [0., 0.],
          [0., 0.],
          [0., 0.]],
 
         [[1., 0.],
          [1., 0.],
          [1., 0.],
          ...,
          [0., 0.],
          [0., 0.],
          [0., 0.]]],
 
 
        [[[1., 0.],
          [1., 0.],

## Prior predictive checks

### Parameter distributions

In [None]:
# hit prob and false alarm prob for each model

### Data distributions

In [None]:
# hit rates and false alarm rates for each model

# MPT model

## Simulator

In [None]:
# TODO: Sample Params from hyperpriors then transform to [0,1] space applying stats.cdf()
# probit transformation = applying cumulative normal dist? -> same for SDT!!
# See Klauer (2010)

# probability = cumulative normal (param)

In [41]:
# Non-hierarchical modeling of single responses
d = 0.7 # TreeBUGS default
g = 0.5 # TreeBUGS default
size = 10

hits = RNG.binomial(n=1, p=(d + (1-d)*g), size=size) # p(hit|old) = d + (1-d)*g
false_alarms = RNG.binomial(n=1, p=((1-d)*g), size=size)# p(FA|new) = (1-d)*g

In [42]:
print(hits, np.mean(hits))
print(false_alarms, np.mean(false_alarms))

[1 1 1 1 1 1 1 1 1 0] 0.9
[0 0 0 0 0 1 0 0 0 0] 0.1
