# win stay lose shift

In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
import scipy.stats as stats
import seaborn as sns
from sklearn.metrics import confusion_matrix
np.random.seed(42)
from joblib import Parallel, delayed
import matplotlib.tri as tri
import matplotlib.colors as mcolors
from scipy.interpolate import griddata
from scipy.interpolate import RBFInterpolator
import matplotlib.ticker as mticker
import itertools
from sklearn.metrics import r2_score


In [2]:
# Input folders: the participants' task data and the behaviour files produced
# by the WSLS model run.
folder_path_participants = 'data_risk_added_epileptic'
folder_path_colors_numbers = '13_RL_agent_TDlearn_output_wsls/model_behavior'

# Folder that receives the files written by this notebook; created on demand.
output_dir = "27_RL_agent_TDlearn_output_both_param_recovery"
os.makedirs(name=output_dir, exist_ok=True)

# Filled by the loading loop: one DataFrame per CSV file that was read.
df_participants, df_colors_numbers = [], []


def find_matching_csv(folder_path, df_list, name=None):
    """Read every CSV in ``folder_path`` whose file name contains ``name``.

    Each matching file is loaded with ``pd.read_csv`` and appended to
    ``df_list`` in the order in which ``os.listdir`` returns the files.

    Args:
        folder_path: Directory to scan (not recursive).
        df_list: List that receives the loaded DataFrames; mutated in place.
        name: Substring that a file name must contain. When omitted, the
            module-level variable ``clean_name`` is used, which keeps
            existing two-argument calls working.

    Returns:
        None. Results are delivered through ``df_list``.
    """
    if name is None:
        # Backward-compatible fallback for callers that set the global
        # `clean_name` before calling.
        name = clean_name

    # NOTE(review): this is a substring test, so one identifier that is a
    # prefix of another would match both files - confirm the naming scheme.
    for csv_file in os.listdir(folder_path):
        if name in csv_file and csv_file.endswith('.csv'):
            df_list.append(pd.read_csv(os.path.join(folder_path, csv_file)))





# Load each participant CSV and, right after it, the model-behaviour CSV(s)
# whose file name contains the participant identifier.
for file_name in os.listdir(folder_path_participants):
    if not file_name.endswith('.csv'):
        continue

    file_path = os.path.join(folder_path_participants, file_name)
    df = pd.read_csv(file_path)

    # Keep only trials whose outcome is not the string 'na' (any letter case).
    # NOTE(review): pd.read_csv parses cells such as 'NA' into NaN by default;
    # for NaN this comparison is True, so such rows would be kept - confirm
    # how missing outcomes are encoded in the files.
    has_outcome = df['outcome'].str.lower() != 'na'
    df = df[has_outcome].reset_index(drop=True)
    df_participants.append(df)

    # find_matching_csv is called without a search key and picks it up from
    # the module-level `clean_name`, so it must be assigned before the call.
    clean_name = file_name.removeprefix("task_data_").removesuffix(".csv")
    find_matching_csv(folder_path_colors_numbers, df_colors_numbers)


In [3]:
# Tag every participant trial with a block type: block 1 -> 'uniform',
# block 4 -> 'mix', blocks 2 and 3 -> the value of their 'distribution'
# column when it is 'low' or 'high'. Rows matching none of these stay None.
for df in df_participants:
    df['block_type'] = None
    for block_number, label in ((1, 'uniform'), (4, 'mix')):
        df.loc[df['block'] == block_number, 'block_type'] = label
    for level in ('low', 'high'):
        in_level = df['block'].isin([2, 3]) & (df['distribution'] == level)
        df.loc[in_level, 'block_type'] = level


# Copy the trial description columns from each participant onto the model
# DataFrame at the same list position (the assignment aligns on row index).
for i, participant_df in enumerate(df_participants):
    model_df = df_colors_numbers[i]
    for column in ('myCard', 'yourCard', 'distribution', 'block_type'):
        model_df[column] = participant_df[column]

In [4]:
# Replace the numeric model choices by the string labels that the `actions`
# mapping uses ('arrowup' / 'arrowdown'); other values are left untouched.
choice_labels = {1: 'arrowup', 0: 'arrowdown'}
for df in df_colors_numbers:
    relabelled = df['model_choices'].replace(choice_labels)
    df['model_choices'] = relabelled

In [5]:
# Score every model choice: a trial is a 'win' when the model chose
# 'arrowup' and myCard is greater than yourCard, or chose 'arrowdown' and
# myCard is smaller than yourCard; everything else (ties included) is 'lose'.
for df in df_colors_numbers:
    my_card_higher = df['myCard'] > df['yourCard']
    my_card_lower = df['myCard'] < df['yourCard']
    chose_up = df['model_choices'] == 'arrowup'
    # The label is 'arrowdown' (no underscore), matching the relabelling of
    # model_choices and the `actions` mapping. Comparing against
    # 'arrow_down' can never match, which scores every correct "down"
    # choice as a loss.
    chose_down = df['model_choices'] == 'arrowdown'

    won = (my_card_higher & chose_up) | (my_card_lower & chose_down)
    df['outcome'] = won.map({True: 'win', False: 'lose'})

In [6]:
# Participant identifiers: the name of every CSV in the participants folder
# with its extension dropped and the text "task_data_" removed.
csv_stems = (
    os.path.splitext(file)[0]
    for file in os.listdir(folder_path_participants)
    if file.endswith('.csv')
)
participants = [stem.replace("task_data_", "") for stem in csv_stems]

### alpha = p_stay_after_win
### beta = p_shift_after_lose

In [7]:
# Number of random draws per parameter (alpha and beta are drawn separately).
num_of_samples = 100
# num_of_samples = 1000

# Sampling ranges of the two probabilities.
alpha_min, alpha_max = 0, 1
beta_min, beta_max = 0, 1

# The upper bound is raised by machine epsilon before being passed to
# np.random.uniform.
alpha_samples = np.random.uniform(
    low=alpha_min,
    high=alpha_max + np.finfo(float).eps,
    size=num_of_samples,
)
beta_samples = np.random.uniform(
    low=beta_min,
    high=beta_max + np.finfo(float).eps,
    size=num_of_samples,
)

# Integer codes for the choice labels and the distribution names.
actions = dict(arrowdown=0, arrowup=1)
distributions_map = dict(uniform=0, low=1, high=2)

In [8]:
# Probability that the WSLS model assigns to the choice actually observed on a trial, given the model's parameters and the choice and outcome of the previous trial.

def get_wsls_probability(prev_choice, current_choice, prev_reward, p_stay_win, p_shift_loss):
    """Return the win-stay/lose-shift probability of ``current_choice``.

    Args:
        prev_choice: Choice made on the previous trial.
        current_choice: Choice whose probability is requested.
        prev_reward: Reward of the previous trial; a value greater than 0
            counts as a win, anything else as a loss.
        p_stay_win: Probability of repeating the previous choice after a win.
        p_shift_loss: Probability of changing the choice after a loss.

    Returns:
        ``p_stay_win`` or ``1 - p_stay_win`` after a win (stay / shift), and
        ``p_shift_loss`` or ``1 - p_shift_loss`` after a loss (shift / stay).
    """
    if prev_reward > 0:
        # Previous trial was a win: repeating it is the "stay" response.
        if current_choice == prev_choice:
            return p_stay_win
        return 1 - p_stay_win

    # Previous trial was a loss: changing the choice is the "shift" response.
    if current_choice != prev_choice:
        return p_shift_loss
    return 1 - p_shift_loss



def simulate_wsls_choices(prev_choice, rewards, p_stay_win, p_shift_loss):
    """Simulate a sequence of binary choices under win-stay/lose-shift.

    The first simulated choice is ``prev_choice`` itself. Every later choice
    is drawn with ``np.random.choice``: the previous simulated choice is
    repeated with probability ``p_stay_win`` when ``rewards[t - 1] > 0`` and
    with probability ``1 - p_shift_loss`` otherwise.

    Args:
        prev_choice: Choice (0 or 1) used for the first trial.
        rewards: Reward of every trial, taken as given; it is not recomputed
            from the simulated choices.
        p_stay_win: Probability of staying after a win.
        p_shift_loss: Probability of shifting after a loss.

    Returns:
        NumPy array with one simulated choice per entry of ``rewards``.
    """
    total = len(rewards)
    simulated = [prev_choice] if total > 0 else []
    latest = prev_choice

    for t in range(1, total):
        won_before = rewards[t - 1] > 0
        p_stay = p_stay_win if won_before else 1 - p_shift_loss
        p_switch = 1 - p_stay

        # Order the weights so that index 0 / 1 is the probability of
        # choosing option 0 / 1 on this trial.
        if latest == 0:
            weights = [p_stay, p_switch]
        else:
            weights = [p_switch, p_stay]

        latest = np.random.choice([0, 1], p=weights)
        simulated.append(latest)

    return np.array(simulated)

In [9]:
# Per-participant results; the loop below appends one entry to each list for
# every DataFrame in df_colors_numbers.
BIC_models = []
AIC_models = []
best_alpha_models = []
best_beta_models = []
accuracy_models = []
precision_models = []
sensitivity_recall_models = []
specificity_models = []
f1_score_models = []
mcFadden_r2_models = []
r2_models = []


# participants loop: for every data set, evaluate the WSLS log-likelihood of
# its choices for every (alpha, beta) pair and keep the best pair.
for idx, df_all in enumerate(df_colors_numbers):
    print(f"Processing participant {idx + 1} of {len(df_colors_numbers)}")
    df_all = df_all[df_all['outcome'].str.lower() != 'na'].reset_index(drop=True)
    rewards = df_all['outcome'].apply(lambda x: 1 if x.lower() == 'win' else 0).values
    true_choices = df_all['model_choices'].map(actions).values
    trials_myCard = df_all["myCard"]
    trials_myCard_unique = df_all["myCard"].unique()
    trials_distribution = df_all['distribution'].map(distributions_map).values

    # Table column of every trial (myCard - 1), computed once so the inner
    # loop does not pay for a pandas label lookup per trial and per pair.
    card_columns = trials_myCard.to_numpy() - 1
    # One table cell per (distribution, card).
    # NOTE(review): the column count assumes myCard takes the values
    # 1..len(unique values); a gap in the observed cards would index past
    # the table - confirm against the task design.
    table_shape = (len(distributions_map), len(trials_myCard_unique))

    best_alpha, best_beta = None, None
    best_log_likelihood = -np.inf

    results = []
    for alpha in alpha_samples:
        for beta in beta_samples:
            log_likelihood = 0

            # Last choice / reward stored per (distribution, card) cell;
            # NaN marks a cell that has not been written yet.
            prev_choice = np.full(table_shape, np.nan)
            prev_choice[trials_distribution[0], card_columns[0]] = true_choices[0]
            prev_reward = np.full(table_shape, np.nan)

            for t in range(1, len(rewards)):  # trial by trial
                prev_cell = (trials_distribution[t - 1], card_columns[t - 1])
                prev_reward[prev_cell] = rewards[t - 1]

                prob = get_wsls_probability(prev_choice[prev_cell], true_choices[t], prev_reward[prev_cell], alpha, beta)
                prob = np.clip(prob, 1e-6, 1 - 1e-6)  # avoid log(0)
                log_likelihood += np.log(prob)

                # Store the current choice in the cell of the CURRENT trial,
                # which is the cell the next iteration reads as `prev_cell`.
                # Using the previous trial's distribution as the row made the
                # next lookup return NaN or a stale choice whenever the
                # distribution changed between two trials.
                prev_choice[trials_distribution[t], card_columns[t]] = true_choices[t]

            results.append((alpha, beta, log_likelihood))

            if log_likelihood > best_log_likelihood:
                best_alpha, best_beta = alpha, beta
                best_log_likelihood = log_likelihood

    # Simulate choices with the best pair. The simulation is driven by the
    # rewards recorded above, not by rewards of the simulated choices.
    best_predicted_choices = simulate_wsls_choices(true_choices[0], rewards, best_alpha, best_beta)

    # labels=[0, 1] always yields a 2x2 matrix, so the four-way unpacking
    # also works when only one class occurs in the data.
    conf_matrix = confusion_matrix(true_choices, best_predicted_choices, labels=[0, 1])
    TN, FP, FN, TP = conf_matrix.ravel()
    accuracy = (TP + TN) / (TP + TN + FP + FN)
    precision = TP / (TP + FP) if (TP + FP) else 0
    recall = TP / (TP + FN) if (TP + FN) else 0
    specificity = TN / (TN + FP) if (TN + FP) else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) else 0

    # Information criteria with k = 2 free parameters (alpha and beta).
    n_trials = len(df_all)
    k = 2
    BIC = k * np.log(n_trials) - 2 * best_log_likelihood
    AIC = 2 * k - 2 * best_log_likelihood

    # McFadden pseudo-R2 against a null model that predicts every trial with
    # the overall proportion of choice 1.
    p_null = np.mean(true_choices)
    p_null = np.clip(p_null, 1e-6, 1 - 1e-6)
    log_likelihood_null = np.sum(true_choices * np.log(p_null) + (1 - true_choices) * np.log(1 - p_null))
    mcFadden_r2 = 1 - (best_log_likelihood / log_likelihood_null)
    r2 = r2_score(true_choices, best_predicted_choices)

    best_alpha_models.append(best_alpha)
    best_beta_models.append(best_beta)
    BIC_models.append(BIC)
    AIC_models.append(AIC)
    accuracy_models.append(accuracy)
    precision_models.append(precision)
    sensitivity_recall_models.append(recall)
    specificity_models.append(specificity)
    f1_score_models.append(f1)
    mcFadden_r2_models.append(mcFadden_r2)
    r2_models.append(r2)

    # Running score of the simulated choices: starts from 10 and moves by
    # +0.5 for a correct choice and -0.5 otherwise (ties count as incorrect).
    total_reward = []
    for i in range(len(best_predicted_choices)):
        last_reward = total_reward[-1] if total_reward else 10
        if ((df_all.loc[i, 'myCard'] > df_all.loc[i, 'yourCard'] and best_predicted_choices[i] == 1) or
            (df_all.loc[i, 'myCard'] < df_all.loc[i, 'yourCard'] and best_predicted_choices[i] == 0)):
            total_reward.append(last_reward + 0.5)
        else:
            total_reward.append(last_reward - 0.5)





Processing participant 1 of 7
Processing participant 2 of 7
Processing participant 3 of 7
Processing participant 4 of 7
Processing participant 5 of 7
Processing participant 6 of 7
Processing participant 7 of 7


# now saving the model evaluation values

In [10]:
# One row per participant: best-fitting parameters and evaluation metrics.
summary_columns = {
    "participants": participants,
    "best_alpha": best_alpha_models,
    "best_beta": best_beta_models,
    "BIC": BIC_models,
    "AIC": AIC_models,
    "accuracy": accuracy_models,
    "precision": precision_models,
    "sensitivity_recall": sensitivity_recall_models,
    "specificity": specificity_models,
    "f1_score": f1_score_models,
    "mcFadden_r2": mcFadden_r2_models,
    "r2": r2_models,
}
summary_df = pd.DataFrame(summary_columns)

# Write the table into the output folder, without the row index.
summary_path = os.path.join(output_dir, "models_evaluation_wsls.csv")
summary_df.to_csv(summary_path, index=False)
