# Competitive RL in the basal ganglia

This is a notebook to explore the idea that the basal ganglia implement a competitive RL system that gives rise to unbiased estimates of state-action values $Q(s,a)$ over short amounts of time.

The main idea is that the basal ganglia rely on two opposing RL systems: a _positive RL agent_ implemented by the direct pathway and a _negative RL agent_ implemented by the indirect pathway. The direct pathway assigns values to actions to take; the indirect pathway assigns values to actions to avoid. The positive RL agent is a standard RL agent. Under certain circumstances, this competitive system learns more quickly and more efficiently than a standard RL system.

## Defining the task

The problems with canonical RL systems are well illustrated by the Probabilistic Stimulus Selection (PSS) task, first introduced by Frank, Seeberger, and O'Reilly (2004). The PSS task is implemented here. In the PSS task, the set of possible actions $\mathbf{A}$ contains only six actions: $\mathbf{A} = \{A, B, C, D, E, F\}$. Something counts as an _action_ if and only if it belongs to $\mathbf{A}$. Unbeknownst to the agent, each action yields a reward with a characteristic probability, and the probability of obtaining a reward depends only on the action taken $a_t$, and not on the state $s_t$ in which it was taken, i.e. $P(r_{t+1} > 0 \mid s_{t}, a_{t}) = P(r_{t+1} > 0 \mid a_t)$.

In [1]:
import random

class PSS_Object():
    """A generic class for PSS objects"""
    ACTIONS = ("A", "C", "E", "F", "D", "B")

    # Probability of obtaining a reward for each action
    REWARD_TABLE = {"A" : 0.8, "C" : 0.7, "E" : 0.6,
                    "F" : 0.4, "D" : 0.3, "B" : 0.2}

    def is_action(self, action):
        """An action is valid only if it belongs to the list of possible actions"""
        return action in self.ACTIONS

    def prob_reward(self, action):
        """Returns the probability of obtaining a reward given an action"""
        if not self.is_action(action):
            raise ValueError("Unknown action: %r" % (action,))
        return self.REWARD_TABLE[action]

    def get_reward(self, action):
        """Returns a probabilistic reward associated with an action"""
        # random.random() is uniform in [0, 1), so this succeeds with
        # probability prob_reward(action)
        if random.random() < self.prob_reward(action):
            return 1.0
        else:
            return 0.0


o = PSS_Object()
o.is_action("A")
        



True
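As a quick sanity check on the reward scheme, the following sketch samples each action many times and compares the observed reward frequency with its nominal probability. It is self-contained rather than built on `PSS_Object`; the helper names `sample_reward` and `empirical_rates`, the sample size, and the seed are illustrative assumptions, not part of the notebook.

```python
import random

# Reward probabilities copied from REWARD_TABLE above
REWARD_TABLE = {"A": 0.8, "C": 0.7, "E": 0.6,
                "F": 0.4, "D": 0.3, "B": 0.2}

def sample_reward(action, rng):
    """Bernoulli reward: 1.0 with probability REWARD_TABLE[action], else 0.0"""
    return 1.0 if rng.random() < REWARD_TABLE[action] else 0.0

def empirical_rates(n_samples=10000, seed=0):
    """Estimates P(r > 0 | a) for every action by repeated sampling"""
    rng = random.Random(seed)  # local generator, so the run is reproducible
    return {a: sum(sample_reward(a, rng) for _ in range(n_samples)) / n_samples
            for a in REWARD_TABLE}

rates = empirical_rates()
for action in sorted(rates):
    # Observed frequency next to the nominal probability
    print(action, round(rates[action], 3), REWARD_TABLE[action])
```

Because the reward depends only on the action, no state needs to be passed to the sampler.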

## The State class

In the PSS task, a state $s$ is defined as the presentation of two options, $o_1$ and $o_2$: $s = (o_1, o_2)$. We encapsulate this concept in a new class `PSS_State`, which contains the available options as a tuple. The two options in a state cannot be identical, i.e. $o_1 \neq o_2$. Two states $s_i$ and $s_j$ are equal if their options are equal, irrespective of order; that is, $s_i = s_j \Leftrightarrow (o_{1}^{i}, o_{2}^{i}) = (o_{1}^{j}, o_{2}^{j}) \lor (o_{1}^{i}, o_{2}^{i}) = (o_{2}^{j}, o_{1}^{j})$.

In [2]:
class PSS_State(PSS_Object):
    """A state in the PSS task"""
    def __init__(self, options = ("A", "B")):
        # Reject invalid options up front, so that every instance has
        # well-defined left and right options
        if not self.is_options(options):
            raise ValueError("Invalid options: %r" % (options,))
        self.options = tuple(options)
        self.left = options[0]
        self.right = options[1]

    def is_options(self, options):
        """Checks whether a given tuple is a pair of distinct, valid options"""
        return (len(options) == 2 and options[0] != options[1] and
                all(x in self.ACTIONS for x in options))

    def __eq__(self, other):
        """Equality if the options are the same, independent of order"""
        if not isinstance(other, PSS_State):
            return NotImplemented
        return (self.left == other.left and self.right == other.right) or \
               (self.left == other.right and self.right == other.left)

    def __hash__(self):
        """Order-independent hash, consistent with __eq__"""
        return hash(frozenset(self.options))

    def __repr__(self):
        return "(%s,%s)" % (self.left, self.right)

    def __str__(self):
        return self.__repr__()

s1 = PSS_State(("A", "B"))
s2 = PSS_State(("B", "A"))
s1 == s2

True
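The order-independent equality defined above can also be obtained from a `frozenset`, which is hashable and can therefore serve as a dictionary key, for instance when storing one value per state-action pair. The sketch below is one possible alternative, not the notebook's implementation; `state_key` and `q_values` are hypothetical names.

```python
def state_key(options):
    """Order-independent, hashable key for a pair of distinct options"""
    key = frozenset(options)
    # A pair with two identical options collapses to a one-element set
    if len(options) != 2 or len(key) != 2:
        raise ValueError("a state needs exactly two distinct options")
    return key

# Values indexed by (state key, action); the initial value 0.0 is arbitrary
q_values = {(state_key(("A", "B")), "A"): 0.0}

print(state_key(("A", "B")) == state_key(("B", "A")))   # same state
print((state_key(("B", "A")), "A") in q_values)         # same dictionary entry
```

The trade-off is that a `frozenset` discards the left/right position of each option, which the class above keeps in `left` and `right`.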

## The Decision class

We also define a class `PSS_Decision`, every instance $d$ of which is a combination of a state $s_t$, the action $a_t$ that an agent has taken in that state, and the reward $r_{t+1}$ that the agent has received as a consequence of its action: $d = (s_t, a_t, r_{t+1})$. Instances of this class will be used for collecting measures of an agent's performance. A decision is considered _successful_ if $r_{t+1} > 0$, and _optimal_ if $a_t$ is the option associated with the greater probability of reward among the two options in $s_t$.

In [3]:
class PSS_Decision(PSS_Object):
    """A decision made during the PSS task"""
    def __init__(self, state = None, action = None, reward = 0.0):
        if not (self.is_state(state) and self.is_action(action)
                and action in state.options):
            raise ValueError("Invalid decision: %r, %r" % (state, action))
        self.state = state
        self.action = action
        self.reward = reward    # Store the reward that was actually received

    def is_state(self, state):
        """A state is valid only if it is a PSS_State with defined options"""
        return (isinstance(state, PSS_State) and
                getattr(state, "options", None) is not None)

    @property
    def successful(self):
        """Success is reward > 0."""
        return self.reward > 0

    @property
    def optimal(self):
        """An action is optimal if no other option has a higher reward probability"""
        probs = [self.prob_reward(x) for x in self.state.options]
        return self.prob_reward(self.action) == max(probs)

    def __repr__(self):
        """The decision as a string"""
        return "<%s, %s, %0.1f>" % (self.state, self.action, self.reward)

d = PSS_Decision(PSS_State(), "A", 1.0)
d.optimal

True
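Since decisions are meant for measuring performance, here is a minimal sketch of how a list of decisions could be summarized as the proportions of successful and optimal choices. To stay self-contained it represents each decision as a plain tuple `(options, action, reward)` instead of a `PSS_Decision`; `is_optimal` and `summarize` are hypothetical helpers, and the four decisions are made-up examples.

```python
# Reward probabilities copied from REWARD_TABLE above
REWARD_TABLE = {"A": 0.8, "C": 0.7, "E": 0.6,
                "F": 0.4, "D": 0.3, "B": 0.2}

def is_optimal(options, action):
    """True if the action has the highest reward probability among the options"""
    return action == max(options, key=REWARD_TABLE.get)

def summarize(decisions):
    """Proportions of successful and of optimal decisions.
    Each decision is a plain tuple (options, action, reward)."""
    n = len(decisions)
    successful = sum(1 for _, _, r in decisions if r > 0)
    optimal = sum(1 for opts, a, _ in decisions if is_optimal(opts, a))
    return successful / n, optimal / n

decisions = [(("A", "B"), "A", 1.0),   # optimal and successful
             (("B", "A"), "B", 0.0),   # neither
             (("C", "D"), "D", 1.0),   # successful but not optimal
             (("E", "F"), "E", 0.0)]   # optimal but not successful
print(summarize(decisions))
```

The third and fourth examples show why the two measures are kept separate: with probabilistic rewards, a suboptimal choice can pay off and an optimal one can fail.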

## The PSS Task

The PSS task is a repetitive, two-alternative forced-choice task. The task is made of two consecutive phases: a _training_ phase, where the agent makes repeated choices between fixed pairs of alternatives and learns the value of every action, and a _test_ phase, where the agent faces new combinations of options. In human experiments, a third phase, _practice_, occurs before training to ensure that participants understand the task; this phase is not needed in a model.

Participants proceed through one or more training blocks, until they have reached a predefined criterion.
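One way such a criterion could be checked is sketched below: after each training block, the proportion of choices of the better option is computed for every training pair and compared with a threshold. The notebook does not state the thresholds, so the values in `CRITERION` are placeholder assumptions, and the function works on plain `(options, action)` tuples rather than on the task's history.

```python
# Hypothetical thresholds (assumption, not taken from the notebook): minimum
# proportion of choices of the better option, per training pair.
# Keys are written as (better option, worse option).
CRITERION = {("A", "B"): 0.65, ("C", "D"): 0.60, ("E", "F"): 0.50}

def criterion_reached(choices, criterion=CRITERION):
    """choices: list of (options, action) tuples from one training block.
    Returns True only if every training pair meets its threshold."""
    for (better, worse), threshold in criterion.items():
        # Collect the actions taken on this pair, in either presentation order
        picks = [a for opts, a in choices if set(opts) == {better, worse}]
        if not picks:
            return False    # A pair that was never presented cannot pass
        if picks.count(better) / len(picks) < threshold:
            return False
    return True

good_block = ([(("A", "B"), "A")] * 7 + [(("B", "A"), "B")] * 3 +
              [(("C", "D"), "C")] * 7 + [(("D", "C"), "D")] * 3 +
              [(("E", "F"), "E")] * 6 + [(("F", "E"), "F")] * 4)
print(criterion_reached(good_block))
```

If the check fails, the participant (or agent) would go through another training block before moving on to the test phase.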

In [4]:
import random
import copy
from collections import deque

class PSS_Task(PSS_Object):
    """An object implementing the PSS task"""
    TRAINING_BLOCK = ((("A", "B"),) * 10 +
                      (("B", "A"),) * 10 +
                      (("C", "D"),) * 10 +
                      (("D", "C"),) * 10 +
                      (("E", "F"),) * 10 +
                      (("F", "E"),) * 10)
    
    TEST_BLOCK = ((("A", "C"),) * 2 + (("C", "A"),) * 2 +
                  (("A", "D"),) * 2 + (("D", "A"),) * 2 +
                  (("A", "E"),) * 2 + (("E", "A"),) * 2 +
                  (("B", "C"),) * 2 + (("C", "B"),) * 2 +
                  (("B", "D"),) * 2 + (("D", "B"),) * 2 +
                  (("B", "E"),) * 2 + (("E", "B"),) * 2)
    
    PHASES = ("Training", "Test")
    
    def __init__(self):
        """Initializes a PSS task experiment"""
        self.index = 0
        self.phase = "Training"

        self.train = self.instantiate_block(self.TRAINING_BLOCK)
        self.test = self.instantiate_block(self.TEST_BLOCK)
        self.blocks = dict(zip(self.PHASES, [self.train, self.test]))
        self.history = dict(zip(self.PHASES, [[], []]))

        # Present the first trial, so that the agent has a state to act on
        self.state = self.next_state()

    def instantiate_block(self, block):
        """Instantiates and randomizes a block of trials"""
        trials = [PSS_State(x) for x in block]
        random.shuffle(trials)
        return deque(trials)

    def criterion_reached(self):
        """Reached criterion for successful learning (placeholder: always True)"""
        return True

    def next_state(self):
        """Next state (transitions are independent of actions)"""
        if len(self.blocks[self.phase]) == 0:
            if self.phase != "Training":
                return None    # End of the experiment
            if self.criterion_reached():
                # Move to the Test phase
                self.phase = "Test"
            else:
                # Repeat training with a freshly randomized block
                self.blocks["Training"] = self.instantiate_block(self.TRAINING_BLOCK)
        # Look the block up again: the phase or the block may have changed
        return self.blocks[self.phase].popleft()

    def execute_action(self, action):
        """Executes an action and returns the new state and a reward"""
        if self.state is None:
            raise RuntimeError("The experiment is over")
        if self.is_action(action):
            r = self.get_reward(action)

            # Update history with the current state, the action, and its reward
            d = PSS_Decision(self.state, action, r)
            self.history[self.phase].append(d)

            self.state = self.next_state()
            return (self.state, r)
            
            

    
p = PSS_Task()
p.history
p.blocks


{'Test': deque([(E,A),
        (E,B),
        (B,C),
        (C,B),
        (B,C),
        (B,E),
        (C,B),
        (B,D),
        (E,B),
        (A,E),
        (A,D),
        (B,E),
        (A,E),
        (D,B),
        (C,A),
        (E,A),
        (A,C),
        (C,A),
        (A,C),
        (D,A),
        (B,D),
        (D,A),
        (D,B),
        (A,D)]),
 'Training': deque([(A,B),
        (C,D),
        (B,A),
        (A,B),
        (A,B),
        (E,F),
        (C,D),
        (B,A),
        (E,F),
        (B,A),
        (E,F),
        (E,F),
        (B,A),
        (C,D),
        (D,C),
        (C,D),
        (E,F),
        (D,C),
        (C,D),
        (C,D),
        (D,C),
        (A,B),
        (A,B),
        (E,F),
        (D,C),
        (E,F),
        (B,A),
        (B,A),
        (A,B),
        (E,F),
        (A,B),
        (D,C),
        (C,D),
        (D,C),
        (C,D),
        (F,E),
        (B,A),
        (C,D),
        (E,F),
        (B,A),
        (D,C),
   

In [5]:
import random, copy, collections
z = [4, 5, 2, 1]
random.shuffle(copy.copy(z))   # Shuffles a throwaway copy; z itself is unchanged
q = collections.deque(z)
q
q.popleft()                    # Removes and returns the leftmost element

4

# Agents

Here we define the agents and how they learn. An agent $A$ perceives the experiment's state and decides which action to perform. Because not all actions are available in every state, the agent must choose among the options that the current state offers.

All agents are instances of the class `PSS_Agent`, and all inherit a simple mechanism to interact with the experiment.

## A Q-Learning agent



In [6]:
class PSS_Agent():
    def __init__(self):
        pass
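For orientation, here is a minimal, self-contained sketch of what a tabular Q-learning agent for a two-option choice could look like. It is an illustration under stated assumptions, not the notebook's `PSS_Agent`: the class name `QAgentSketch`, the learning rate `alpha`, the softmax `temperature`, and the zero initial values are all hypothetical choices. The discounted future term of the usual Q-learning update is dropped (equivalent to a discount factor of 0), an assumption made here because the next state does not depend on the action taken.

```python
import math
import random

class QAgentSketch:
    """Hypothetical tabular Q-learning agent for a two-option choice task"""
    def __init__(self, alpha=0.1, temperature=0.2, seed=None):
        self.alpha = alpha              # Learning rate (assumed value)
        self.temperature = temperature  # Softmax temperature (assumed value)
        self.q = {}                     # Maps (state key, action) to a value
        self.rng = random.Random(seed)

    def value(self, state, action):
        """Q(s, a), with states compared independently of option order"""
        return self.q.get((frozenset(state), action), 0.0)

    def choose(self, state):
        """Softmax (Boltzmann) choice among the options of the state"""
        weights = [math.exp(self.value(state, a) / self.temperature)
                   for a in state]
        return self.rng.choices(state, weights=weights)[0]

    def learn(self, state, action, reward):
        """Q(s,a) <- Q(s,a) + alpha * (r - Q(s,a)), i.e. discount factor 0"""
        old = self.value(state, action)
        self.q[(frozenset(state), action)] = old + self.alpha * (reward - old)

# Train on a single pair, with reward probabilities taken from REWARD_TABLE
REWARD_PROB = {"A": 0.8, "B": 0.2}
agent = QAgentSketch(seed=1)
env_rng = random.Random(2)
for _ in range(2000):
    state = ("A", "B")
    action = agent.choose(state)
    reward = 1.0 if env_rng.random() < REWARD_PROB[action] else 0.0
    agent.learn(state, action, reward)

print(agent.value(("A", "B"), "A"), agent.value(("A", "B"), "B"))
```

After training, the value of `A` should sit well above the value of `B`, and both stay between 0 and 1 because each value is a running average of rewards in that range.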