# Example 3: Estimating Empirical Priors

In the following example, we will walkthrough how to estimate empirical priors for reinforcement learning model based on Gershman (2016).
We will apply this for the 2-armed bandit task we looked at in the previous example.

Let us look at what each column represents:

- `index`: variable to identify each row - this variable is clutter.
- `left`: the stimulus presented on the left side.
- `right`: the stimulus presented on the right side.
- `reward_left`: the reward received when the left stimulus is selected.
- `reward_right`: the reward received when the right stimulus is selected.
- `ppt`: the participant number.
- `responses`: the response of the participant (1 for right, 0 for left).

## Import the data

First, we will import the data and get it ready for the toolbox.

In [None]:
import matplotlib.cm as cmx
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from prettyformatter import pprint

experiment = pd.read_csv('bandit_small.csv', header=0)
experiment.head(10)

Here, we will quickly add a column `observed` to specify for the toolbox, what is the dependent variable we actually want to predict.



In [None]:
experiment['observed'] = experiment['responses'] - 1

## The model

Let us quickly recap through the model we will use.

Each stimulus has an associated value, which is the expected reward that can be obtained from selecting that stimulus.

Let $Q(a)$ be the estimated value of action $a$.
On each trial, $t$, there are two stimuli present, so that $Q(a)$ could be $Q(\text{left})$ or $Q(\text{right})$, where the corresponding Q-values are derived from the associated value of the stimuli present on left or right.
More formally, we can say that the expected value of the action $a$ selected at time $t$ is given by:

$$
Q_t(a) = \mathbb{E}[R_t | A_t = a]
$$

where $R_t$ is the reward received at time $t$, and $A_t$ is the action selected at time $t$.

On each trial $t$, the Softmax policy will select an action (left or right) based on the following policy:

$$
P(a_t) = \frac{e^{Q_{a,t} \epsilon}}{\sum_{i = 1}^{k}{e^{Q_{i,t} \epsilon}}}
$$

where $\epsilon$ is the inverse temperature parameter, and $Q_{a,t}$ is the estimated value of action $a$ at time $t$.
$k$ is the number of actions available, and in our case, $k = 2$.
So, on each trial, the model will select an action (left or right) based on the following policy:

$$
A_t = \begin{cases}
\text{left} & \text{with probability } P(a_{\text{left}}) \\
\text{right} & \text{with probability } P(a_{\text{right}})
\end{cases}
$$

where $A_t$ is the action selected at time $t$, and $Q_t(a)$ is the estimated value of action $a$ at time $t$.

The model will calculate the prediction error according to the following learning rule:

$$
\Delta Q_t(A_t) = \alpha \times \Big[ R_t - Q_t(A_t) \Big]
$$

where $\alpha$ is the learning rate, and $R_t$ is the reward received at time $t$.
Note that this rule is often reported as the Rescorla-Wagner learning rule, where for each action/outcome, the prediction error is the difference between the reward received and the sum of all values present on each trial for that action.
In our case, we only have one value for each action, so the Rescorla-Wagner learning rule is reduced to the Bush and Mosteller separable error term.
Q-values are then updated as follows:

$$
Q_{t+1}(A_t) = Q_t(A_t) + \Delta Q_t(A_t)
$$

In [None]:
from cpm.generators import Parameters, Value

## define our parameters
parameters = Parameters(
    # freely varying parameters are indicated by specifying priors
    alpha=Value(
        value=0.5,
        lower=1e-10,
        upper=1,
        prior="truncated_normal",
        args={"mean": 0.6, "sd": 0.20}, # we pick some random values here for the true prior
    ),
    temperature=Value(
        value=1,
        lower=0,
        upper=10,
        prior="truncated_normal",
        args={"mean": 2, "sd": 2.5}, # we pick some random values here for the true prior
    ),
    # everything without a prior is part of the initial state of the model or fixed constructs (e.g. exemplars in general-context models of categorization)
    values = np.array([0.25, 0.25, 0.25, 0.25]))

In [None]:
from cpm.models import learning, decision, utils

## define our models
def model(parameters, trial):
    # pull out the parameters
    alpha = parameters.alpha
    temperature = parameters.temperature
    values = np.array(parameters.values) # copy essentially prevents us from accidentally overwriting the original values
    # pull out the trial information
    stimulus = np.array([trial.stimulus_left, trial.stimulus_right]).astype(int)
    feedback = np.array([trial.reward_left, trial.reward_right])
    human_choice = trial.responses.astype(int) - 1 # convert to 0-indexing
    mute = np.zeros(4)  # mute learning for all cues not presented

    # activate the value of each available action
    # here there are two possible actions, that can take up on 4 different values
    # so we subset the values to only include the ones that are activated...
    # ...according to which stimuli was presented
    activation = values[stimulus - 1]
    # convert the activations to a 2x1 matrix, where rows are actions/outcomes
    activations = activation.reshape(2, 1)
    # calculate a policy based on the activations
    response = decision.Softmax(activations=activations, temperature=temperature)
    response.compute() # compute the policy
    if np.isnan(response.policies).any():
        # if the policy is NaN for a given action, then we need to set it to 1
        print(f"NaN in policy with parameters: {alpha.value}, {epsilon.value}\n")
        print(response.policies)
        response.policies[np.isnan(response.policies)] = 1

    model_choice = response.choice() # get the choice based on the policy
    reward = feedback[human_choice] # get the reward of the chosen action
    # update the value of the chosen action
    mute[stimulus[human_choice] - 1] = 1 # unmute the learning for the chosen action
    teacher = np.array([reward])
    update = learning.SeparableRule(weights=values, feedback=teacher, input=mute, alpha=alpha)
    update.compute()
    values += update.weights.flatten()
    ## compile output
    output = {
        "trial"    : trial.trial.astype(int),               # trial number
        "policy"   : response.policies,         # policies
        "response" : model_choice,              # choice based on the policy
        "reward"   : reward,                    # reward of the chosen action
        "values"   : values,                    # updated values
        "change"   : update.weights,            # change in the values
        "activation" : activations.flatten(),     # activation of the values
        "dependent"  : np.array([response.policies[1]]),        # dependent variable
    }
    return output

In [None]:
## set up wrappers and run the model to check for bugs
from cpm.generators import Simulator, Wrapper

wrapper = Wrapper(model=model, parameters=parameters, data=experiment[experiment.ppt == 1])
wrapper.run() # run the model

output = wrapper.export() # export sim data into pandas dataframe
output.tail() # check for output

## Simulate the data

In this section, we will generate some data from the model we defined above.
We will use the toolbox to simulate the data and then estimate the empirical priors from the simulated data.

In [None]:
true = parameters.sample(experiment.shape[0]) # sample a set of parameters

simulator = Simulator(wrapper=wrapper,
                      parameters=fit.parameters, # parameters from the optimiser object is directly accessible for simulation
                      data=experiment.groupby("ppt")) # data grouped by participants
simulator.run()

generated = simulator.export()
experiment.response = generated.responses

## Compare the hierarchical vs non-hierarchical model

In [None]:
## Set up the fits
from cpm.optimisation import minimise, FminBound

fit = FminBound(
    model=wrapper,  # Wrapper class with the model we specified from before
    data=experiment,  # the data as a list of dictionaries
    minimisation=minimise.LogLikelihood.bernoulli,
    parallel=False,
    prior=False,
    ppt_identifier="ppt",
    display=False,
    number_of_starts=5,
    # everything below is optional and passed directly to the scipy implementation of the optimiser
    approx_grad=True
)
fit.optimise()

In [None]:
## we now set up the hierarchical fitting
from cpm.hierarchical import EmpiricalBayes as hierarchical

fit_with_prior = FminBound(
    model=wrapper,  # Wrapper class with the model we specified from before
    data=experiment,  # the data as a list of dictionaries
    minimisation=minimise.LogLikelihood.bernoulli,
    parallel=False,
    prior=True,
    ppt_identifier="ppt",
    display=False,
    number_of_starts=5,
    # everything below is optional and passed directly to the scipy implementation of the optimiser
    approx_grad=True
)
eb = hierarchical(optimiser=fit, iteration=20, chain=2)
# disable numpy errors - there is some out-of-bounds errors in the softmax function that we can ignore
np.seterr(all="ignore")
eb.optimise()

Next, we will extract the best performing hyperparameters for the priors and reinitialise the model.

In [None]:
hyperparameters = eb.hyperparameters
hyper = hyperparameters[hyperparameters.lme == hyperparameters.lme.max()]
means = [hyper[hyper.parameter == "alpha"].squeeze()['mean'],
         hyper[hyper.parameter == "beta"].squeeze()['mean']]

ssd = [hyper[hyper.parameter == "alpha"].squeeze()['sd'],
       hyper[hyper.parameter == "beta"].squeeze()['sd']]

In [None]:
## redefine parameters
estimated_parameters = Parameters(
    # freely varying parameters are indicated by specifying priors
    alpha=Value(
        value=0.5,
        lower=1e-10,
        upper=1,
        prior="truncated_normal",
        args={"mean": means[0], "sd": ssd[0]},
    ),
    temperature=Value(
        value=1,
        lower=0,
        upper=10,
        prior="truncated_normal",
        args={"mean": means[1], "sd": ssd[1]},
    ),
    # everything without a prior is part of the initial state of the model or fixed constructs (e.g. exemplars in general-context models of categorization)
    values = np.array([0.25, 0.25, 0.25, 0.25]))

## reinitialise the model
wrapper_with_prior = Wrapper(model=model, parameters=estimated_parameters, data=experiment[experiment.ppt == 1])
fit_with_prior = FminBound(
    model=wrapper,  # Wrapper class with the model we specified from before
    data=experiment,  # the data as a list of dictionaries
    minimisation=minimise.LogLikelihood.bernoulli,
    parallel=False,
    prior=True,
    ppt_identifier="ppt",
    display=False,
    number_of_starts=5,
    # everything below is optional and passed directly to the scipy implementation of the optimiser
    approx_grad=True
)
fit_with_prior.optimise()