In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from sys import stdout
import seaborn as sns

In [None]:
## Reload stuff
# Re-executes the already-imported `ezmc` package and two of its submodules
# (presumably so that edits made to the package on disk are picked up without
# restarting the kernel -- confirm this is still needed).
from importlib import reload
import ezmc
# NOTE(review): the package is reloaded before its submodules here; if
# `ezmc/__init__.py` re-exports names from them, the order may matter -- confirm.
reload(ezmc)
reload(ezmc.base)
reload(ezmc.metropolis)
import ezmc


# Reinforcement Learning

Let's work through a more realistic example.


## Simulate Data

In [None]:
0  # Leftover scratch cell: evaluates the literal 0 and assigns nothing.

In [None]:
import itertools
from numba import jit

'''Helper functions'''
def invlogit(lo):
    '''
    Inverse logit (logistic sigmoid): maps log-odds `lo` to a probability in [0, 1].

    Evaluated as exp(-log(1 + exp(-lo))) through `np.logaddexp`, which stays
    finite for any magnitude of `lo`. The naive exp(lo) / (1 + exp(lo)) overflows
    to inf / inf = NaN once `lo` is large (roughly above 709 for float64).

    `lo` may be a scalar or array-like; the result has the same shape.
    '''
    return np.exp(-np.logaddexp(0, np.negative(lo)))

def logit(p):
    '''Log-odds of a probability `p`: log(p / (1 - p)). Works on scalars and arrays.'''
    return np.log(p / (1 - p))

def edge_correct(x, e):
    '''
    Clamp `x` to [0, 1], then pull it towards 0.5 by a fraction `e`,
    so that 0 becomes e/2 and 1 becomes 1 - e/2.
    '''
    clamped = np.clip(x, 0, 1)
    return clamped + e * (.5 - clamped)

def generate_outcomes(ntrials=200, probs=[.1, .9], min_len=10, max_len=20):
    '''
    Generates outcomes for a reversal learning paradigm,
    where reward contingencies reverse every `min_len` to `max_len` trials
    (both bounds inclusive).

    Arguments:
        ntrials: number of trials to generate.
        probs: reward probabilities, cycled through one block at a time.
        min_len, max_len: shortest and longest allowed block length.

    Returns a numpy array of exactly `ntrials` 0s and 1s.
    Raises ValueError if the block-length bounds are invalid.
    '''
    if min_len < 1 or max_len < min_len:
        raise ValueError('Need 1 <= min_len <= max_len.')
    # Each block holds at least `min_len` trials, so this many blocks always
    # cover `ntrials`. (A fixed block count silently returned fewer outcomes
    # than requested for long sessions.)
    n_blocks = max(int(np.ceil(ntrials / min_len)), 0)
    # randint's upper bound is exclusive, hence the +1 to include `max_len`.
    block_lengths = np.random.randint(min_len, max_len + 1, size=n_blocks)
    block_probs = list(itertools.islice(itertools.cycle(probs), n_blocks))
    probabilities = np.repeat(block_probs, block_lengths)[:ntrials]
    return np.random.binomial(1, probabilities)


@jit() # Numba jit speeds this up considerably.
def rw_update(outcomes, learning_rate, start=.5):
    '''
    Rescorla-Wagner learning rule.

    Entry i of the returned array is the prediction held *before* outcome i is
    seen. After each outcome the prediction moves towards it by `learning_rate`
    times the prediction error. The first prediction is `start`.
    '''
    n_trials = len(outcomes)
    predictions = np.zeros(n_trials)
    belief = start
    for trial in range(n_trials):
        predictions[trial] = belief
        belief += (outcomes[trial] - belief) * learning_rate
    return predictions

def get_agent_response_probs(outcomes, learning_rate, slope):
    '''
    Given a set of outcomes, a learning rate, and a decision slope (inverse temperature),
    how likely is the agent to respond 1 on each trial?

    Beliefs from the Rescorla-Wagner update are nudged away from exactly 0 and 1
    (so their logit is finite) and then scaled by `slope` in log-odds space.
    Returns an array with one probability per outcome.
    '''
    beliefs = edge_correct(rw_update(outcomes, learning_rate), .001)
    return invlogit(logit(beliefs) * slope)

def get_agent_responses(outcomes, learning_rate, slope):
    '''
    Simulates binary responses by a RL agent.

    Returns a tuple `(responses, action_probs)`: the sampled 0/1 responses and
    the per-trial probabilities of responding 1 that they were drawn from.
    '''
    probs = get_agent_response_probs(outcomes, learning_rate, slope)
    return np.random.binomial(1, probs), probs


In [None]:
# true_pars = [.5, 0.] # Learning rate, Slope
# true_pars = [1, 1.] # Learning rate, Slope
true_pars = [.2, 1.]  # Learning rate, Slope

# Seed NumPy's global generator so the simulated dataset is identical on every
# Restart & Run All.
np.random.seed(0)
outcomes = generate_outcomes(ntrials=1000, probs=[.2, .8], min_len=10, max_len=20)
responses, action_probs = get_agent_responses(outcomes, *true_pars)

t = range(len(outcomes))
plt.figure(figsize=(18, 3))
plt.plot(t, outcomes, 'o', label='Outcomes')
# Responses are drawn at .1 / .9 so they do not sit on top of the outcome markers.
plt.plot(t, .1 + (responses * .8), 'o', label='Responses')
plt.plot(t, action_probs, label='Belief [P(Outcome)]')
plt.xlabel('Trials')
plt.ylabel('')
plt.legend();

In [None]:
def log_likelihood(pars, outcomes, responses):
    '''
    This is the key function.
    Arguments are a vector of parameters, a list of outcomes and a list of responses.
    How likely is a RL agent governed by these parameters to produce these responses,
    given these outcomes?

    `pars` is `(learning_rate, slope)`. Returns the summed log-probability of
    the observed responses (a single float).
    '''
    learning_rate, slope = pars
    # Coerce to an array: with a plain Python list, `responses == 1` is a single
    # False and every trial would silently be scored as a 0 response.
    responses = np.asarray(responses)
    action_probs = get_agent_response_probs(outcomes, learning_rate, slope)
    # Probability assigned to the response that was actually made on each trial.
    lik = np.where(responses == 1, action_probs, 1 - action_probs)
    return np.sum(np.log(lik))

# Sanity check: log-likelihood of the simulated responses under the parameters that generated them.
log_likelihood(true_pars, outcomes, responses)

## Sampling over Grid

This model is pretty simple (only two parameters),
and the log-likelihood function is very fast to run.
This makes it possible to evaluate the log-likelihood across a large grid of parameters,
without the need for MCMC sampling.
Let's do this for comparison.

In [None]:
import itertools
npoints = 101
param_grid = pd.DataFrame(list(itertools.product(np.linspace(0, 1.5, npoints), # Possible learning rates
                                                 np.linspace(0, 2, npoints))),  # Possible slopes
                         columns=['rate', 'slope'])

def grid_log_likelihood(pars):
    '''Log-likelihood of the simulated data at one (rate, slope) grid point.'''
    return log_likelihood(pars, outcomes, responses)

## Get log-likelihood for each rate x slope combination.
## Iterating over the raw value array avoids building a Series per row (as iterrows does).
ll = np.array([grid_log_likelihood(pars)
               for pars in param_grid[['rate', 'slope']].values])
## Replace NaN and -inf with the smallest finite value.
mask = (np.isnan(ll)) | (ll == -np.inf)
ll[mask] = ll[~mask].min()
param_grid['ll'] = ll
## Transform log-likelihoods into normalised probabilities.
## Subtracting the maximum first keeps exp() from underflowing every entry to 0.
prob_adj = np.exp(ll - np.max(ll))
param_grid['prob_adj'] = prob_adj / prob_adj.sum()

In [None]:
# Reshape the long grid into a rate x slope matrix of log-likelihoods.
X = param_grid.pivot_table(values='ll', index='rate', columns='slope')
# Round the axis labels to two decimals so the tick text stays readable.
X.index = np.asarray(X.index).round(2)
X.columns = np.asarray(X.columns).round(2)
ax = sns.heatmap(X, cmap='jet', vmin=None)
ax.invert_yaxis()
ax.set_xlabel('Slope')
ax.set_ylabel('Learning Rate')
ax.set_title('Log-Likelihood')

In [None]:
# Same matrix as above, with the colour scale floored at -800 so that the
# structure near the maximum is not washed out by very poor grid points.
ax = sns.heatmap(X, cmap='jet', vmin=-800)
ax.invert_yaxis()
ax.set_xlabel('Slope')
ax.set_ylabel('Learning Rate')
ax.set_title('Log-Likelihood (truncated scale)')


In [None]:
## Transform the existing matrix
# Subtracting the overall maximum before exponentiating scales the best grid
# point to exactly 1.
relative_likelihood = np.exp(X - X.max().max())
ax = sns.heatmap(relative_likelihood, cmap='jet')
ax.invert_yaxis()
ax.set_xlabel('Slope')
ax.set_ylabel('Learning Rate')
ax.set_title('Likelihood')

## Or use the existing column
# X = param_grid.pivot_table(index='rate', columns='slope', values='prob_adj')
# X.columns = np.array(X.columns).round(2)
# X.index = np.array(X.index).round(2)
# sns.heatmap(X , cmap='jet', vmin=None)
# plt.gca().invert_yaxis()
# plt.xlabel('Slope')
# plt.ylabel('Learning Rate')

We can also sample values directly from this grid (with some noise) to obtain posterior samples
comparable to what we would obtain from MCMC sampling.

In [None]:
## Sample rows (parameter combinations) with probabilities proportional to their likelihoods.
n_grid_samples = 1000
p = np.exp(ll  - np.max(ll))
sample_indices = np.random.choice(param_grid.index, size=n_grid_samples, replace=True, p=p/p.sum())
## `sample_indices` holds index *labels*, so look them up with .loc rather than .iloc.
grid_posterior = param_grid.loc[sample_indices]
def r(n=n_grid_samples): # Add noise
    '''Small Gaussian jitter (sd = .0001): one value per grid-posterior sample by default.'''
    return np.random.normal(0, .0001, n)

def draw_x(x, y):
    '''Mark the point (x, y) with a large red "x" centred on it.'''
    marker_font = {'color':'red', 'size':20}
    plt.text(x, y, 'x', fontdict=marker_font,
             horizontalalignment='center', verticalalignment='center')

In [None]:
# Jitter is drawn for the slope first and the rate second.
jittered_slope = grid_posterior['slope'] + r()
jittered_rate = grid_posterior['rate'] + r()
# NOTE(review): newer seaborn releases may require x=/y= keywords and `fill`
# instead of `shade` -- confirm against the installed version.
sns.kdeplot(jittered_slope, jittered_rate, shade=False)
plt.xlim(0, 2)
plt.ylim(0, 1)
plt.xlabel('Slope')
plt.ylabel('Learning Rate')
# Red cross at the generating (slope, learning rate).
draw_x(true_pars[1], true_pars[0])

In [None]:
# Same samples as a scatter plot; jitter is drawn for slope first, then rate.
scatter_slope = grid_posterior['slope'] + r()
scatter_rate = grid_posterior['rate'] + r()
plt.scatter(scatter_slope, scatter_rate, alpha=.2)
plt.xlim(0, 2)
plt.ylim(0, 1)
plt.xlabel('Slope')
plt.ylabel('Learning Rate')
# Red cross at the generating (slope, learning rate).
draw_x(true_pars[1], true_pars[0])

In [None]:
# Marginal histograms of the grid-sampled learning rate and slope.
grid_posterior[['rate', 'slope']].hist()

In [None]:
# Learning-rate samples restricted to draws whose slope lies within .025 of the true slope.
m = (grid_posterior['slope'] - true_pars[1]).abs() < .025
grid_posterior.loc[m, 'rate'].hist()

In [None]:
# Re-express the samples as difference (rate - slope) and sum (rate + slope).
a = grid_posterior['rate']
b = grid_posterior['slope']
plt.scatter(a - b, a + b)
plt.xlabel('<- Learning noise | Decision noise ->')
plt.ylabel('Sensitivity')
# Dashed vertical reference line at zero difference, spanning the current y-range.
y_low, y_high = plt.ylim()
plt.vlines(0, y_low, y_high, linestyle='dashed')


## Sample

In [None]:
def init_func():
    '''Starting point for a chain: two independent Uniform(0, .5) draws, returned as a list.'''
    return [np.random.uniform(0, .5) for _ in range(2)]

def f(pars):
    '''
    Unnormalised log-posterior of `pars` = (learning_rate, slope): an independent
    Normal(0, 2) log-prior on each parameter plus the log-likelihood of the data.
    A NaN log-likelihood is replaced by a large penalty (-1e5).
    '''
    log_prior = stats.norm.logpdf(pars, 0, 2).sum()
    loglik = log_likelihood(pars, outcomes, responses)
    if not np.isnan(loglik):
        return log_prior + loglik
    return log_prior - 1e+5
# Metropolis sampler over the log-posterior `f`: four chains, each started from `init_func()`.
sampler = ezmc.MetropolisSampler(
    func=f,
    par_names=['learning_rate', 'slope'],
    n_chains=4,
    init_func=init_func,
    proposal_sd=.05,
    verbose=5,
)

In [None]:
# Run the sampler with n=6000 (presumably iterations per chain -- confirm against ezmc).
sampler.sample_chains(n=6000)

In [None]:
# Trace plot of the chains as returned by the sampler (no burn-in or thinning arguments are passed here).
chains = sampler.get_chains()
fig = ezmc.viz.traceplot(chains);

In [None]:
# results = sampler.get_results(burn_in=2000, thin=20)
# Presumably drops the first 1000 draws and keeps every 4th -- confirm against ezmc.
results = sampler.get_results(burn_in=1000, thin=4)
fig = ezmc.viz.traceplot(results, sampler.par_names);

In [None]:
import arviz as az
# NOTE(review): thin=10 here, while `results` above is built with thin=4 -- confirm the difference is intended.
posterior = sampler.to_arviz(burn_in=1000, thin=10)

In [None]:
# ArviZ trace plot of the converted posterior.
az.plot_trace(posterior)

In [None]:
# Autocorrelation of the retained draws.
az.plot_autocorr(posterior);

In [None]:
# Joint scatter of the two parameters.
# NOTE(review): newer ArviZ releases replace `plot_joint` with `plot_pair` -- confirm the installed version.
az.plot_joint(posterior, kind='scatter', joint_kwargs=dict(alpha=.1))

In [None]:
# Same joint plot, drawn as a KDE instead of individual points.
az.plot_joint(posterior, kind='kde')

In [None]:
# Ridge-style forest plot; combined=True presumably pools the chains into one density per parameter -- confirm against ArviZ.
az.plot_forest(posterior, kind='ridgeplot',
               linewidth=1, combined=True, ridgeplot_overlap=1, colors='skyblue',
               figsize=(9, 4))

In [None]:
# Marginal histograms of the MCMC draws for each sampled parameter.
results[sampler.par_names].hist()

In [None]:
# Posterior means as point estimates; the posterior standard deviations serve
# as the accompanying uncertainty.
estimates = results.mean()
estimates_se = results.std()
# trans_est_pars = [estimates[p] for p in sampler.par_names]
# est_pars = untransform_pars(trans_est_pars)
# Keep only the sampled parameters, in the sampler's own order.
est_pars = [estimates[name] for name in sampler.par_names]
est_pars

In [None]:
# Standard deviation of each column of `results`.
estimates_se

In [None]:
# Pairwise relationships between the parameters and the 'll' column
# (assumes `sampler.par_names` is a list and `results` has an 'll' column -- confirm).
sns.pairplot(results[sampler.par_names + ['ll']])

In [None]:
# Re-run the agent at the posterior-mean parameters and compare with the generating agent.
sim_responses, sim_action_probs = get_agent_responses(outcomes, *est_pars)

t = range(len(outcomes))
ax = plt.figure(figsize=(18, 3)).gca()
ax.plot(t, outcomes, 'o', label='Outcomes')
# Responses are drawn at .1 / .9 so they do not sit on top of the outcome markers.
ax.plot(t, .1 + (responses * .8), 'o', label='Responses')
ax.plot(t, action_probs, label='True Belief')
ax.plot(t, sim_action_probs, label='Estimated Belief')
ax.set_xlabel('Trials')
ax.set_ylabel('')
ax.legend()

In [None]:
# One belief trajectory per posterior draw (100 draws), with the generating
# agent's trajectory on top.
t = range(len(outcomes))
plt.figure(figsize=(18, 3))
for k, (i, row) in enumerate(results.sample(100).iterrows()):
    p = row[sampler.par_names].values
    # Only the probabilities are plotted, so there is no need to simulate responses.
    sim_action_probs = get_agent_response_probs(outcomes, *p)
    # Label only the first line: underscore-prefixed labels are left out of the
    # legend, so there is one legend entry and no line has to be drawn twice.
    plt.plot(t, sim_action_probs, alpha=.1, color='b',
             label='Estimated Belief' if k == 0 else '_nolegend_')
plt.plot(t, action_probs, label='True Belief', color='r')
plt.xlabel('Trials')
plt.legend()
plt.show()