# Parameter Recovery

This notebook conducts 2D parameter recovery simulations for the modified Rachlin discount function ([Vincent & Stewart, 2020](https://doi.org/10.1016/j.cognition.2020.104203)).

$$
V(R, D, k, s) = R \cdot \frac{1}{1+(k \cdot D)^s}
$$

where $R$ is a reward, delivered at a delay $D$. 

The parameters are:
- $k$ is normally interpreted as the discount rate, although technically in this case it is the product of the discount rate and the constant term in Stevens' Power Law.
- $s$ is the exponent in Stevens' Power Law.

**Important note:** For this to be a meaningful parameter recovery exercise, the data generating model defined in `generate_responses` _must_ be exactly the same model that is used for inference in `infer_parameters`.

In [None]:
import math
import os

import numpy as np
import pandas as pd
import pymc3 as pm
from scipy.stats import norm, bernoulli, uniform

import matplotlib.pyplot as plt

%config InlineBackend.figure_format = 'retina'
# Set the default matplotlib font size to 14 for every figure in this notebook
plt.rcParams.update({"font.size": 14})
import matplotlib.ticker as ticker
from matplotlib.colors import hsv_to_rgb

# Seed NumPy's global random number generator; the design shuffling
# (np.random.shuffle) in this notebook draws from this global state.
np.random.seed(1234)

import sys

# Record the interpreter and PyMC3 versions alongside the results
print(f"Python version: {sys.version}")
print(f"PyMC3 version: {pm.__version__}")

# Install Black autoformatter with: pip install nb-black
# %load_ext lab_black

## Define options for this notebook

In [None]:
# --- simulation options ---------------------------------------------------
# number of simulated experiments run for each true parameter combination
n_simulations = 2
# true parameter values (on the log scale) that span the 2D recovery grid
log_s_list = np.log(np.array([0.5, 1, 1.5, 2]))
log_k_list = [-5, -4, -3, -2]
# when True, every simulated experiment is plotted as it is run
should_visualise = False

# --- export options -------------------------------------------------------
export = True
out_dir = "output/"

# --- PyMC3 inference options (unpacked into pm.sample) --------------------
sample_options = dict(
    tune=1000,
    draws=2000,
    chains=2,
    cores=2,
    nuts_kwargs=dict(target_accept=0.95),
)

## Make 2D grid of true parameters

In [None]:
# Build the 2D grid of true (data generating) parameters: rows index the
# log(s) values, columns index the log(k) values, and every cell stores the
# tuple (log s, log k).
param_grid = np.zeros((len(log_s_list), len(log_k_list)), dtype=object)
for row in range(len(log_s_list)):
    for col in range(len(log_k_list)):
        param_grid[row, col] = (log_s_list[row], log_k_list[col])

Create a corresponding set of colours, one for each parameter combination

In [None]:
def make_colours():
    """Build one RGB colour for each cell of the true-parameter grid.

    Each log(k) value (column) gets its own hue and HSV value (brightness);
    each log(s) value (row) gets its own saturation, increasing from 0.2 to
    1.0 across the rows.

    Returns
    -------
    cols : np.ndarray of dtype object, shape (len(log_s_list), len(log_k_list))
        ``cols[row, col]`` is the RGB colour returned by ``hsv_to_rgb`` for
        that parameter combination.

    Notes
    -----
    Only four (hue, value) pairs are defined, so this assumes ``log_k_list``
    has four entries.
    """
    # one (hue, value) pair for each logk value (column)
    hue_list = [19 / 360, 236 / 360, 88 / 360, 324 / 360]
    v_list = [0.745, 0.78, 0.63, 0.72]

    # one saturation for each logs value (row)
    saturation_list = np.linspace(0.2, 1.0, len(log_s_list))

    cols = np.zeros((len(log_s_list), len(log_k_list)), dtype=object)

    for i, saturation in enumerate(saturation_list):
        # hue and value are paired per column, so they must be looked up with
        # the same (column) index rather than with the row index
        for j, (hue, value) in enumerate(zip(hue_list, v_list)):
            cols[i, j] = hsv_to_rgb((hue, saturation, value))

    return cols


cols = make_colours()

Visualise discount functions for these true parameter values.

In [None]:
def plot_true_discount_functions(ax=None):
    """Plot the discount function of every true parameter combination.

    One curve is drawn per cell of ``param_grid``, coloured with the matching
    entry of ``cols``, over delays from 0 to 100.

    Parameters
    ----------
    ax : matplotlib Axes, optional
        Axes to draw on; the current axes are used when omitted.

    Returns
    -------
    The axes that were drawn on.
    """
    ax = plt.gca() if ax is None else ax

    delays = np.linspace(0, 100, 1000)

    n_rows, n_cols = len(log_s_list), len(log_k_list)
    for row, col in ((r, c) for r in range(n_rows) for c in range(n_cols)):
        logs, logk = param_grid[row, col]
        exponent, rate = np.exp(logs), np.exp(logk)
        # modified Rachlin discount fraction: 1 / (1 + (k D)^s)
        discount_fraction = 1 / (1 + (rate * delays) ** exponent)
        ax.plot(delays, discount_fraction, c=cols[row, col], lw=2, label="true")

    ax.set(xlabel="delay [seconds]", ylabel="$RA/RB$")
    return ax

In [None]:
# Draw the true discount functions on the current axes
plot_true_discount_functions()

## Code for the inference procedure

# TODO: ADD ALPHA AS A FREE PARAMETER

In [None]:
def infer_parameters(data):
    """Fit the model to response data and summarise the posterior.

    Builds the PyMC3 model for ``data`` with ``generate_model``, samples it
    using the notebook-level ``sample_options``, and returns the posterior
    means as ``np.array([mean(logs), mean(logk)])``.
    """
    with generate_model(data):
        trace = pm.sample(**sample_options)

    posterior_means = [np.mean(trace[name]) for name in ("logs", "logk")]
    return np.array(posterior_means)


def generate_model(data):
    """Build the PyMC3 model for one set of observed choices.

    ``data`` must provide the columns ``R`` (responses), ``RA``, ``DA``,
    ``RB`` and ``DB`` (rewards and delays of options A and B). The returned
    model has free parameters ``logk`` and ``logs`` and a Bernoulli
    likelihood on the responses.
    """
    # pull the columns out as plain arrays
    responses = data["R"].values
    reward_a, delay_a = data["RA"].values, data["DA"].values
    reward_b, delay_b = data["RB"].values, data["DB"].values

    with pm.Model() as model:
        # priors over the log-parameters
        logk = pm.Normal("logk", mu=np.log(1 / 30), sd=3)
        logs = pm.Normal("logs", mu=0, sd=1)

        # present subjective value of each option
        value_a = pm.Deterministic(
            "VA", value_function(reward_a, delay_a, logk, logs)
        )
        value_b = pm.Deterministic(
            "VB", value_function(reward_b, delay_b, logk, logs)
        )

        # probability of choosing option B, given the value difference
        p_choose_b = pm.Deterministic(
            "P_chooseB", choice_psychometric(value_b - value_a)
        )

        # likelihood of the observed responses
        pm.Bernoulli("R", p=p_choose_b, observed=responses)

    return model


# helper functions for the model


def value_function(reward, delay, logk, logs):
    """Present subjective value of a reward delivered after a delay.

    Implements ``reward / (1 + (k * delay) ** s)`` with ``k = exp(logk)``
    and ``s = exp(logs)``, using PyMC3 math so it can be used inside a model.
    """
    discount_rate = pm.math.exp(logk)
    exponent = pm.math.exp(logs)
    scaled_delay = discount_rate * delay
    denominator = 1.0 + scaled_delay ** exponent
    return reward / denominator


def choice_psychometric(x, ϵ=0.01):
    """Map the decision variable ``x`` to a choice probability.

    A logistic function with slope 1.7 is squeezed into the interval
    ``[ϵ, 1 - ϵ]``, so ``ϵ`` sets the floor and ceiling of the curve.
    """
    logistic = 1 / (1 + pm.math.exp(-1.7 * (x)))
    return ϵ + (1.0 - 2.0 * ϵ) * logistic

## Code to simulate an experiment

In [None]:
def simulate_experiment(params_true, ϵ=0.01):
    """Simulate one experiment and return its behavioural data.

    Generates the designs, simulates a response to each of them under
    ``params_true`` (a ``(logs, logk)`` pair) and returns a single DataFrame
    holding the design columns followed by the response column.
    """
    designs = generate_designs()
    # generate_responses also returns the choice probabilities; only the
    # simulated responses are needed here
    responses = generate_responses(designs, params_true, ϵ)[0]
    return pd.concat([designs, responses], axis=1)


def generate_designs():
    """Generate the designs (RA, DA, RB, DB) of one experiment.

    This should precisely match the set of questions we used in the actual
    experiment: every combination of 10 immediate rewards (RA) and 5 delays
    (DB), with DA fixed at 0 and RB fixed at 60. Trials are grouped into
    blocks of constant DB; the order of the blocks is shuffled using NumPy's
    global random state, while RA runs through its values in order within
    each block.
    """
    RA_vals = np.array([6, 12, 18, 24, 30, 36, 42, 48, 54, 60])
    DB_vals = np.array([7, 15, 29, 56, 101])
    n_trials = RA_vals.size * DB_vals.size  # 50 trials in total

    # random order in which the DB blocks are presented
    block_order = np.arange(DB_vals.size)
    np.random.shuffle(block_order)

    return pd.DataFrame(
        {
            # RA cycles through all of its values inside every DB block
            "RA": np.tile(RA_vals, DB_vals.size),
            "DA": np.zeros(n_trials),
            "RB": np.full(n_trials, 60),
            # each (shuffled) DB value is held constant for a whole block
            "DB": np.repeat(DB_vals[block_order], RA_vals.size),
        }
    )


def generate_responses(designs, params_true, ϵ):
    """Simulate a response to every design under the given true parameters.

    Parameters
    ----------
    designs : DataFrame with columns ``RA``, ``DA``, ``RB`` and ``DB``.
    params_true : pair ``(logs, logk)`` of data generating parameters.
    ϵ : floor/ceiling offset of the choice function.

    Returns
    -------
    (responses, p_choose_B) : a DataFrame with a single column ``R`` of
    Bernoulli draws, and the array of probabilities of choosing option B
    that the draws were made from.
    """
    logs, logk = params_true
    k, s = np.exp(logk), np.exp(logs)

    def present_value(reward, delay):
        # modified Rachlin discounting of a delayed reward
        return reward * (1 / (1 + (k * delay) ** s))

    VA = present_value(designs["RA"].values, designs["DA"].values)
    VB = present_value(designs["RB"].values, designs["DB"].values)

    # logistic choice rule on the value difference, bounded to [ϵ, 1 - ϵ]
    logistic = 1 / (1 + np.exp(-1.7 * (VB - VA)))
    p_choose_B = ϵ + (1 - 2 * ϵ) * logistic

    simulated = bernoulli.rvs(p_choose_B)
    return pd.DataFrame({"R": simulated}), p_choose_B

Example...

In [None]:
# Simulate one experiment with log(s) = log(1) and log(k) = -2, and preview
# the first rows of the resulting design + response table
simulate_experiment((np.log(1), -2.0)).head()

In [None]:
def visualise(data, data_generating_params, recovered_params):
    """Plot one simulated experiment with its true and recovered curves.

    The trials are shown as points at (DB, RA / RB), coloured by the
    response ``R``. The discount function of the recovered parameters is
    overlaid in red and that of the data generating parameters in black.
    Both parameter arguments are ordered ``(logs, logk)``.
    """
    fig, ax = plt.subplots(figsize=(9, 6))
    plt.scatter(data.DB, data.RA / data.RB, c=data.R)

    delays = np.linspace(0, 100, 1000)

    def discount_curve(logs, logk):
        # modified Rachlin discount fraction evaluated over `delays`
        s, k = np.exp(logs), np.exp(logk)
        return 1 / (1 + (k * delays) ** s)

    # recovered parameters
    recovered_curve = discount_curve(recovered_params[0], recovered_params[1])
    plt.plot(delays, recovered_curve, "r", alpha=0.5, lw=2, label="recovered")

    # data generating parameters
    true_logs, true_logk = data_generating_params
    true_curve = discount_curve(true_logs, true_logk)
    plt.plot(delays, true_curve, "k", lw=2, label="true")

    plt.legend()
    plt.show()

## Run the simulations

In [None]:
# Echo the visualisation flag that will be passed to many_simulations below
should_visualise

In [None]:
def many_simulations(
    data_generating_params, N_simulations=50, debug=False, should_visualise=True
):
    """Repeat the simulate-then-infer cycle for one true parameter setting.

    Parameters
    ----------
    data_generating_params : pair ``(logs, logk)`` used to simulate the data.
    N_simulations : number of simulated experiments to run.
    debug : not used by this function.
    should_visualise : when True, plot every simulated experiment.

    Returns
    -------
    (recovered_params, data_generating_params) : ``recovered_params`` is an
    array of shape (N_simulations, 2) with one row of estimates per
    simulated experiment.
    """
    N_PARAMETERS = 2
    recovered_params = np.empty([N_simulations, N_PARAMETERS])

    for i in range(N_simulations):
        print(f"Simulation {i+1} of {N_simulations}")
        expt_data = simulate_experiment(data_generating_params)

        # write the estimates straight into row i of the output array
        estimates = recovered_params[i, :]
        estimates[:] = infer_parameters(expt_data)
        print(estimates)

        if not should_visualise:
            continue
        visualise(expt_data, data_generating_params, estimates)

    return recovered_params, data_generating_params

In [None]:
# Run the recovery simulations for every cell of the true-parameter grid.
# results[row, col] holds the (recovered_params, data_generating_params)
# tuple returned by many_simulations for that cell.
results = np.zeros((len(log_s_list), len(log_k_list)), dtype=object)
for row in range(len(log_s_list)):
    for col in range(len(log_k_list)):
        results[row, col] = many_simulations(
            param_grid[row, col],
            N_simulations=n_simulations,
            should_visualise=should_visualise,
        )

Plot results

In [None]:
def plot_param_recovery(ax=None):
    """Scatter the recovered parameters against the true parameters.

    For every cell of the parameter grid the recovered estimates are plotted
    as semi-transparent points at (log k, s), in the colour of that cell, and
    the true parameter values are overlaid as a black point.

    Parameters
    ----------
    ax : matplotlib Axes, optional
        Axes to draw on; the current axes are used when omitted.

    Returns
    -------
    The axes that were drawn on.
    """
    if ax is None:
        ax = plt.gca()

    for row, _ in enumerate(log_s_list):
        for col, _ in enumerate(log_k_list):

            recovered, true_params = results[row, col]

            # plot inferred values (columns are ordered logs, logk)
            s = np.exp(recovered[:, 0])
            logk = recovered[:, 1]
            # Pass the single RGB colour via `color`, not `c`: a bare
            # 3-element sequence given as `c` is ambiguous and is treated as
            # data to colormap when there happen to be exactly 3 points.
            ax.scatter(x=logk, y=s, color=cols[row, col], alpha=0.4)

            # plot true values
            logs_true, logk_true = true_params
            s_true = np.exp(logs_true)
            ax.scatter(x=logk_true, y=s_true, c="k", label="true")

    # axis labels do not depend on the grid cell, so set them once
    ax.set_xlabel(r"$\log(k)$")
    ax.set_ylabel(r"$s$")

    return ax

Construct the final plot

In [None]:
# Two stacked panels: the true discount functions on top and the parameter
# recovery scatter plot underneath
fig, ax = plt.subplots(nrows=2, ncols=1, figsize=(9, 14))

plot_true_discount_functions(ax=ax[0])
plot_param_recovery(ax=ax[1])


In [None]:
# Save the final figure as a PDF inside out_dir
if export:
    # savefig does not create missing directories, so make sure the output
    # directory exists first (an empty out_dir means the working directory)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    fig.savefig(f'{out_dir}parameter_recovery_2D.pdf', bbox_inches='tight')

# References
- Vincent, B. T., & Stewart, N. (2020). The case of muddled units in temporal discounting. _Cognition_, 198, 1-11. https://doi.org/10.1016/j.cognition.2020.104203