In [1]:
import numpy as np
import pandas as pd
import random
from scipy.io import loadmat

In [2]:
# np.random.seed(123)

In [23]:
def sample_Bernouli(theta = .5, n_samples = 1):
    """
    Draw independent samples from a Bernoulli distribution.

    Arguments
    ----------
    theta : float
        Probability of drawing a 1 on each sample.
    n_samples : int
        Number of samples to draw.

    Output
    -------
    samples : numpy.ndarray
        Integer array of length n_samples containing only 0s and 1s.
    """
    # np.random.rand draws from the half-open interval [0, 1), so the strict
    # comparison makes P(sample == 1) equal to theta exactly; with `<=` a draw
    # of exactly 0.0 would still produce a 1 when theta is 0.
    return (np.random.rand(n_samples) < theta).astype(int)
def shuffle_Binomial():
    """Return 21 ones and 21 zeros (42 integers) in random order."""
    balanced = np.repeat([1, 0], 21)
    # In-place shuffle driven by NumPy's global random state.
    np.random.shuffle(balanced)
    return balanced

def shuffle_Amt(run = 1):
    """
    Return the reward amounts assigned to one run, in random order.

    84 amounts are spread evenly from 1 to 100 and rounded to integers.
    Run 1 receives the amounts at odd 1-based positions, run 2 those at
    even 1-based positions (42 amounts each). Any other value of `run`
    yields -1 instead of an array.
    """
    if run == 1:
        first = 0   # odd 1-based positions start at 0-based index 0
    elif run == 2:
        first = 1   # even 1-based positions start at 0-based index 1
    else:
        return -1

    amounts = np.round(np.linspace(1, 100, 84)).astype(int)
    selected = amounts[first::2].copy()
    np.random.shuffle(selected)
    return selected

def task_act_stim(isActFirst = True):
    """
    Build the trial-by-trial design of the Action / Stimulus value learning task.

    The task is a probabilistic reinforcement learning task meant to test the
    relative contribution of action and stimulus value learning in
    Parkinson's disease. The design has two sessions; each session has two
    runs and each run has two blocks (conditions) of 42 trials, i.e. 8 blocks
    and 336 trials in total.

    The correct responses of part of the blocks are predefined and read from
    two .mat files, one per session:
        '../data/ExpStruct_ActFirst_winOnly.mat'
        '../data/ExpStruct_StimFirst_winOnly.mat'
    The first session reads the file selected by `isActFirst`, the second
    session reads the other one. The remaining blocks get a random balanced
    sequence from shuffle_Binomial().

    Arguments
    ----------
    isActFirst : bool
        True if the task starts with Action-value learning,
        False if the task starts with Stimulus-value learning.

    Output
    -------
    data : pandas.DataFrame
        One row per trial, with columns:
            'session'          : 1 or 2
            'run'              : 1 or 2 within each session
            'block'            : 'Act' or 'Stim', the condition of the block
            'stimActFirst'     : 'Act' or 'Stim', the condition the task starts with
            'trialNumber'      : 1 .. 336
            'yellowOnLeftSide' : 0/1, balanced within each block
            'leftCanBePushed'  : 0/1, balanced within each block
            'winAmtLeft'       : amount attached to the left side (from shuffle_Amt)
            'winAmtRight'      : 100 - winAmtLeft
            'winAmtYellow'     : amount attached to the side showing yellow
            'winAmtBlue'       : 100 - winAmtYellow
            'winAmtPushable'   : amount attached to the side that can be pushed
            'winAmtPullable'   : 100 - winAmtPushable
            'pushCorrect'      : 0/1 sequence, predefined or shuffled per block
            'yellowCorrect'    : 0/1 sequence, predefined or shuffled per block
    """
    n_trials = 42                 # trials per block
    n_blocks = 8                  # 2 sessions x 2 runs x 2 blocks
    session_len = 4 * n_trials    # 4 blocks per session
    total_len = n_blocks * n_trials

    # Two sessions; within each session run 1 is followed by run 2.
    session = np.repeat([1, 2], session_len)
    run = np.tile(np.repeat([1, 2], 2 * n_trials), 2)
    trialNumber = np.arange(1, total_len + 1)

    # Counterbalanced side of the yellow stimulus, one balanced sequence per block.
    yellowOnLeftSide = np.concatenate([shuffle_Binomial() for _ in range(n_blocks)])

    # Counterbalanced side of the pushable response, one balanced sequence per block.
    leftCanBePushed = np.concatenate([shuffle_Binomial() for _ in range(n_blocks)])

    # Reward amount on the left side: both blocks of a run share the run's
    # amount set (run 1, run 2, then again for the second session).
    winAmtLeft = np.concatenate([shuffle_Amt(run = r) for r in (1, 1, 2, 2, 1, 1, 2, 2)])
    winAmtRight = 100 - winAmtLeft

    # Amounts re-expressed by colour (yellow / blue).
    winAmtYellow = yellowOnLeftSide*winAmtLeft + (1 - yellowOnLeftSide)*winAmtRight
    winAmtBlue = 100 - winAmtYellow

    # Amounts re-expressed by action (pushable / pullable).
    winAmtPushable = leftCanBePushed*winAmtLeft + (1 - leftCanBePushed)*winAmtRight
    winAmtPullable = 100 - winAmtPushable

    # Condition label of every block; fixed by the starting condition.
    if isActFirst:
        block_order = ['Act', 'Stim', 'Stim', 'Act', 'Stim', 'Act', 'Act', 'Stim']
        first_label = 'Act'
    else:
        block_order = ['Stim', 'Act', 'Act', 'Stim', 'Act', 'Stim', 'Stim', 'Act']
        first_label = 'Stim'
    block = np.repeat(block_order, n_trials)
    stimActFirst = np.repeat(first_label, total_len)

    pushCorrect = np.zeros(total_len).astype(int)
    yellowCorrect = np.zeros(total_len).astype(int)

    # The first session reads the file matching the starting condition,
    # the second session reads the other file.
    use_act_file = bool(isActFirst)
    for ses in range(2):
        if use_act_file:
            data = loadmat('../data/ExpStruct_ActFirst_winOnly.mat')
        else:
            data = loadmat('../data/ExpStruct_StimFirst_winOnly.mat')

        rows = slice(ses * session_len, (ses + 1) * session_len)

        # Assumes every 'triallist*' entry holds 42 values in its first row
        # -- TODO confirm against the .mat files.
        # NOTE(review): the predefined lists always fill the 1st and 4th block
        # of a session for pushCorrect and the 2nd and 3rd for yellowCorrect,
        # for both files, while the 'block' labels of the two sessions are
        # mirrored. Confirm this matches the layout of the StimFirst file.
        pushCorrect[rows] = np.concatenate([data['triallist1_1'][0],
                                            shuffle_Binomial(),
                                            shuffle_Binomial(),
                                            data['triallist2_2'][0]])
        yellowCorrect[rows] = np.concatenate([shuffle_Binomial(),
                                              data['triallist1_2'][0],
                                              data['triallist2_1'][0],
                                              shuffle_Binomial()])
        use_act_file = not use_act_file

    # Assemble the design once, after both sessions have been filled in.
    dataAct = {'session':session,
               'run':run,
               'block':block,
               'stimActFirst':stimActFirst,
               'trialNumber':trialNumber,
               'yellowOnLeftSide':yellowOnLeftSide,
               'leftCanBePushed':leftCanBePushed,
               'winAmtLeft':winAmtLeft,
               'winAmtRight':winAmtRight,
               'winAmtYellow':winAmtYellow,
               'winAmtBlue':winAmtBlue,
               'winAmtPushable':winAmtPushable,
               'winAmtPullable':winAmtPullable,
               'pushCorrect':pushCorrect,
               'yellowCorrect':yellowCorrect}
    output = pd.DataFrame(dataAct)

    return output

In [21]:
# Generate the design for a task that starts with the Action-value condition.
task_design = task_act_stim(isActFirst=True)

In [65]:
# Mean of 'pushCorrect' over the 'Act' block of session 1, run 1.
in_first_act_block = (
    (task_design['session'] == 1)
    & (task_design['run'] == 1)
    & (task_design['block'] == 'Act')
)
task_design.loc[in_first_act_block, 'pushCorrect'].mean()

0.47619047619047616

In [143]:
# Draft of a single-participant RL simulation (see the docstring below).
# NOTE(review): this block is unfinished and does not parse as written: it
# contains a stray ',' line, a `for` loop without a body and an `elif` without
# a matching `if`. The notes below mark each spot that has to be resolved.
# NOTE(review): the first parameter is named `task_act_stim` (the same name as
# the design-building function), while the docstring and the body call it
# `task_frame`; the body also uses `bet` and `n_trials` where the signature
# declares `beta` and `n_trilas`.
# NOTE(review): `init_probability` has a mutable list default and is never
# read in the body.
def simulate_rl(task_act_stim, alpha_A, alpha_C, weight, beta, n_trilas = 10, init_probability=[.5, .5]):
    """
    General Comment   
    ----------
    
    Simulates a individual behavior for Action and Stimulus Value Learning 
    according to a RL model with the weightening parameter,

    Notw that in this simulation, a simple Rescorla-Wagner rule is used for reinforcement learning
    and the softmax function is used for the choice response

    This function is to simulate data for, for example, parameter recovery.
    Simulates data for one participant.
    
    Two rewarded feedback and non-rewarded feedback are presented in each trial.
  

    Arguments
    ----------

    task_frame : pandas.DataFrame
         Size of n_trials rows.
            'w': rewarded feedback coded to 1 and non-rewarded feedback coded to 0
        
    alpha_A : float [0, 1]
        The learning rate related to Action Value Learning.
      
    alpha_C : float [0, 1]
        The learning rate related to Color Value Learning.
      
    weight : float [0, 1]
        The reelative contribution of Action and Stimulus Values Learning to get rewarded.

    beta : float  [0, )
        The sensitivity parameter in the soft_max choice rule.
        the higher value leads to the more sensitivity to value differences between two options

    init_probability : float 
        The initial probability of reward for Run 1 and Run 2
        The value should ne between 0 and 1 (default .5)

    Output
    -------

    data : pandas.DataFrame
         Columns contains the task_frame, plus:
        'alpha_A', 'alpha_C', 'weight', 'bet', 'w'

    """
    
    # NOTE(review): `task_fram` is never used afterwards, and the key 'alpha_A'
    # is repeated four times, so the literal collapses to four distinct keys.
    task_fram = {'alpha_A':[],
                 'alpha_C':[],
                 'alpha_A':[],
                 'weight':[],
                 'bet':[],
                 'alpha_A':[],
                 'alpha_A':[],                
                }
    # NOTE(review): `task_frame` is not a parameter of this function;
    # presumably the first argument was meant -- confirm.
    data = task_frame.copy()
    # NOTE(review): the next line is a stray ',' and is a syntax error.
 , 
    # Attach the generating parameters as constant columns.
    data['alpha_A'] = alpha_A
    data['alpha_C'] = alpha_C
    data['weight'] = weight
    # NOTE(review): `bet` is undefined here; the parameter is called `beta`.
    data['bet'] = bet
    
    # NOTE(review): this loop has no body, and `n_trials` is undefined
    # (the parameter is spelled `n_trilas`).
    for n in range(n_trials):
        
    # NOTE(review): from here on the code uses names that are neither
    # parameters nor defined in the visible part of this file: `gen_alpha`,
    # `task_design`, `initial_value_learning`, `gen_sensitivity`,
    # `_simulate_delta_rule_2A`, `_soft_max_2A` and `stats`. It looks like a
    # template pasted from another simulation function -- confirm and adapt.
    data = pd.concat([data, _simulate_delta_rule_2A(task_design=task_design,
                                                                   alpha=gen_alpha,
                                                                   initial_value_learning=initial_value_learning)],
                         axis=1)

    # NOTE(review): `elif` without a preceding `if`; the branch that handled a
    # scalar learning rate appears to have been removed.
    elif type(gen_alpha) is list:
        if len(gen_alpha) == 2:
            data['alpha_pos'] = gen_alpha[0]
            data['alpha_neg'] = gen_alpha[1]
            data = pd.concat([data, _simulate_delta_rule_2A(task_design=task_design,
                                                                       alpha=None,
                                                                       initial_value_learning=initial_value_learning,
                                                                       alpha_pos=gen_alpha[0],
                                                                       alpha_neg=gen_alpha[1])],
                             axis=1)

        elif len(gen_alpha) == 3:
            pass # implement here Stefano's learning rule
        else:
            raise ValueError("The gen_alpha list should be of either length 2 or 3.")
    else:
        raise TypeError("The gen_alpha should be either a list or a float/int.")

    # Intended tail of the simulation: per-row choice probability, then one
    # Bernoulli draw per row using that probability.
    data['sensitivity'] = gen_sensitivity
    data['p_cor'] = data.apply(_soft_max_2A, axis=1)
    data['accuracy'] = stats.bernoulli.rvs(data['p_cor'].values) # simulate choices

    # NOTE(review): the frame built by task_act_stim() in this file has no
    # 'participant', 'block_label' or 'trial_block' columns, so this
    # set_index would fail on it -- confirm which design frame is expected.
    data = data.set_index(['participant', 'block_label', 'trial_block'])
    return data


def delta_rule(task_design, alpha, initial_value_learning, alpha_pos=None, alpha_neg=None):
    """Q learning (delta learning rule) for two alternatives
    (one correct, one incorrect).

    The rows of `task_design` are processed in order as the trials of a
    single learner. On the first row every option holds the initial value;
    on every later row the values of that row's correct and incorrect
    options are moved towards their feedback before being recorded.

    NOTE(review): the previous draft of this function did not parse (orphan
    `else:`) and used several undefined names. This version is a
    reconstruction that keeps the update rule of the draft; the handling of
    the first row and the number of options are inferred -- confirm.

    Parameters
    ----------

    task_design : DataFrame
        `pandas.DataFrame` with one row per trial.
        Columns read by this function:
        "f_cor", "f_inc" (feedback of the correct / incorrect option) and
        "cor_option", "inc_option" (1-based option numbers).

    alpha : float
        The generating learning rate.
        It should be a value between 0 (no updating) and 1 (full updating).
        Ignored when both alpha_pos and alpha_neg are provided.

    initial_value_learning : float
        The initial value for Q learning.

    alpha_pos : float, default None
        If a value for both alpha_pos and alpha_neg is provided,
        this learning rate is used for positive prediction errors.

    alpha_neg : float, default None
        If a value for both alpha_pos and alpha_neg is provided,
        this learning rate is used for prediction errors that are
        zero or negative.

    Returns
    -------

    Q_frame : DataFrame
        `pandas.DataFrame` with one row per trial and the columns
        "Q_cor" and "Q_inc": the learned value of the correct and of the
        incorrect option on that trial.

    Raises
    ------

    ValueError
        If `alpha` is None and alpha_pos / alpha_neg are not both provided.

    """
    separate_learning_rates = alpha_pos is not None and alpha_neg is not None
    if not separate_learning_rates and alpha is None:
        raise ValueError("Provide either alpha or both alpha_pos and alpha_neg.")

    n_trials = task_design.shape[0]
    cor_option = task_design.cor_option.values
    inc_option = task_design.inc_option.values
    f_cor = task_design.f_cor.values
    f_inc = task_design.f_inc.values

    # Pre-sized outputs instead of growing arrays with np.append every trial.
    Q_cor = np.empty(n_trials)
    Q_inc = np.empty(n_trials)
    if n_trials == 0:
        return pd.DataFrame({'Q_cor':Q_cor, 'Q_inc':Q_inc})

    # Option numbers are 1-based, so the largest number is the option count.
    n_options = int(max(cor_option.max(), inc_option.max()))
    Q = np.ones(n_options)*initial_value_learning

    for n in range(n_trials):
        index_cor = int(cor_option[n]-1)
        index_inc = int(inc_option[n]-1)

        # The first trial only records the initial values.
        if n > 0:
            if separate_learning_rates:
                # The sign of each prediction error selects the learning rate.
                pe_cor = f_cor[n] - Q[index_cor]
                pe_inc = f_inc[n] - Q[index_inc]
                rate_cor = alpha_pos if pe_cor > 0 else alpha_neg
                rate_inc = alpha_pos if pe_inc > 0 else alpha_neg
            else:
                rate_cor = rate_inc = alpha
            Q[index_cor] += rate_cor*(f_cor[n] - Q[index_cor])
            Q[index_inc] += rate_inc*(f_inc[n] - Q[index_inc])

        Q_cor[n] = Q[index_cor]
        Q_inc[n] = Q[index_inc]

    return pd.DataFrame({'Q_cor':Q_cor, 'Q_inc':Q_inc})

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 64)