# TD learning; the agent that can see
# Remember to check the number of samples for alpha and beta


in this version we only update q-values based on different rewards. that's it!


fitting: I used 100 values for alpha (learning rate) and beta (inverse temperature) from Beta and Gamma distributions, respectively based on Salman paper. It trains the model using each pair and computes the log-likelihood of the observed choices given the predicted probabilities. highest log likelihood, shows the best alpha beta



evaluation:  model is run again using the best alpha beta and giving confusion matrix


In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
import scipy.stats as stats
import seaborn as sns
from sklearn.metrics import confusion_matrix
import seaborn as sns
np.random.seed(42)
from scipy.optimize import minimize
from joblib import Parallel, delayed


In [None]:
folder_path = 'data_risk_added'
dataframes = [pd.read_excel(os.path.join(folder_path, file)) for file in os.listdir(folder_path) if file.endswith('.xlsx')]

n_participant = len(dataframes)
print(f"there are {n_participant} participants.")


output_dir = "10_RL_agent_TDlearn_output"
os.makedirs(output_dir, exist_ok=True)


dataframes[0].head()

actions = { "arrowdown": 0, "arrowup": 1}
# Q_table = np.array([0.0, 0.0])  # for actions [arrowdown, arrowup]
Q_table_init = np.random.normal(0, 0.1, 2)
Q_table = Q_table_init.copy()
Q_table = np.array(Q_table)

In [None]:
def softmax(Q_values, beta):
    Q_shifted = Q_values - np.max(Q_values)
    exps = np.exp(beta * Q_shifted)
    sums = np.sum(exps)
    new_probs = exps / sums

    return new_probs


def rescorla_wagner_update(Q_values, action, reward, alpha):
    prediction_error = reward - Q_values[action]
    Q_values[action] += alpha * prediction_error
    return Q_values


def train_rescorla_wagner(df, alpha, beta, Q_init=None):
    
    if Q_init is None:
        Q_init = Q_table.copy()
    Q_values = Q_init.copy()
    
    q_value_pairs = []
    choices = []
    predicted_probs = []
    
    for _, row in df.iterrows():
        action = actions[row["choice"]] 
        reward = 0.5 if row["outcome"] == "win" else -0.5
        
        probs = softmax(Q_values, beta)
        predicted_probs.append(probs[action])
        
        Q_values = rescorla_wagner_update(Q_values, action, reward, alpha)
        
        q_value_pairs.append(Q_values.copy())
        choices.append(action)

    return np.array(q_value_pairs), np.array(choices), np.array(predicted_probs)

def compute_log_likelihood(alpha, beta, df_all, Q_table):
    Q_init_participant = Q_table.copy()
    q_values, choices, predicted_probs = train_rescorla_wagner(df_all, alpha, beta, Q_init=Q_init_participant)
    
    predicted_probs = np.clip(predicted_probs, 1e-6, 1)  # prevent log(0)
    log_likelihood = np.sum(np.log(predicted_probs))
    
    return (alpha, beta, log_likelihood)

In [None]:
BIC_models = []

for idx, df_all in enumerate(dataframes):

    Q_init_participant = Q_table.copy()
    
    df_all = df_all[df_all['outcome'].str.lower() != 'na']  

    num_of_samples = 50
    alpha_samples = np.random.uniform(0.01, 1, num_of_samples) 
    beta_samples = np.random.uniform(0, 8, num_of_samples)  


    best_alpha, best_beta = None, None
    best_log_likelihood = -np.inf


    alpha_beta_log_likelihood = {}


    results = Parallel(n_jobs=-1)(delayed(compute_log_likelihood)(alpha, beta, df_all, Q_table) 
                                for alpha in alpha_samples for beta in beta_samples)


    alpha_beta_log_likelihood = {}
    best_log_likelihood = -np.inf
    best_alpha, best_beta = None, None

    for alpha, beta, log_likelihood in results:
        alpha_beta_log_likelihood[(alpha, beta)] = log_likelihood
        if log_likelihood > best_log_likelihood:
            best_log_likelihood = log_likelihood
            best_alpha, best_beta = alpha, beta



    results_df = pd.DataFrame(alpha_beta_log_likelihood.keys(), columns=["alpha", "beta"])
    results_df["log_likelihood"] = alpha_beta_log_likelihood.values()

    #  model prediction 
    q_values, choices, predicted_probs = train_rescorla_wagner(df_all, best_alpha, best_beta,  Q_init=Q_init_participant)
    # how model chooses action
    predicted_choices = (q_values[:, 1] > q_values[:, 0]).astype(int) # if q_values[:, 1] is greater, label the predicted choice as 1 as uparrow; otherwise, label it as 0. 


    # confusion matrix
    conf_matrix = confusion_matrix(choices, predicted_choices)
    
    # bayes information criterion
    n_trials = len(df_all)
    k = 2  # number of free parameters: alpha and beta
    BIC = k * np.log(n_trials) - 2 * best_log_likelihood
    
    BIC_models.append(BIC)

    ###########################################################################################
    ## visulization

    fig, axes = plt.subplots(1, 3, figsize=(18, 6))


    sns.kdeplot(x=results_df["alpha"], y=results_df["beta"], fill=True, cmap="viridis", ax=axes[0])
    mappable = axes[0].collections[0]
    fig.colorbar(mappable, ax=axes[0], label="density")
    axes[0].set_xlabel("learning rate (alpha)")
    axes[0].set_ylabel("inverse temp (beta)")
    axes[0].set_title("density of alpha beta joint probability")


    scatter = sns.scatterplot(x=results_df["alpha"], y=results_df["beta"], 
                            size=results_df["log_likelihood"], hue=results_df["log_likelihood"], 
                            palette="Blues", ax=axes[1], legend=False) 

    norm = plt.Normalize(results_df["log_likelihood"].min(), results_df["log_likelihood"].max())
    sm = plt.cm.ScalarMappable(cmap="Blues", norm=norm)
    sm.set_array([])
    cbar = plt.colorbar(sm, ax=axes[1], label="log likelihood")

    axes[1].set_ylabel("inverse temp (beta)")
    axes[1].set_xlabel("learning rate (alpha)")
    axes[1].set_title("log likelihood scatterplot")




    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues",
                xticklabels=["arrowdown", "arrowup"], 
                yticklabels=["arrowdown", "arrowup"], ax=axes[2], cbar=False)
    axes[2].set_xlabel("prediction")
    axes[2].set_ylabel("true label")
    axes[2].set_title(f"confusion matrix (α={best_alpha:.2f}, β={best_beta:.2f})")
    axes[2].text(0.5, -0.3, f"BIC: {BIC:.2f}", fontsize=12, fontweight="bold",
                ha="center", va="center", transform=axes[2].transAxes)
    


    plt.tight_layout(rect=[0, 0, 1, 0.9]) 
    fig.suptitle(f'participant {idx}', fontsize=16, fontweight='bold')

    filename = os.path.join(output_dir, f"plot_{idx}.pdf")
    plt.savefig(filename, format='pdf')
    plt.close(fig)

    print(f"saved: {filename}")




# saving BIC_models to compare models:
file_path_BIC = os.path.join(output_dir, "BIC_models_blind.txt")

with open(file_path_BIC, "w") as file:
    for bic in BIC_models:
        file.write(f"{bic}\n")

print(f"BIC values saved to {file_path_BIC}")