In [0]:
import numpy as np

In [0]:
class MDP:
  def __init__(self, T, R, discount):
    '''
    Inputs:
    T -- Transition function: |A| x |S| x |S'| array
    R -- Reward function: |A| x |S| array
    discount -- discount factor: scalar in [0,1)
    '''
    assert T.ndim == 3
    assert (abs(T.sum(2) - 1) < 1e-5).all()
    self.nActions = T.shape[0]
    self.nStates = T.shape[1]
    assert T.shape == (self.nActions, self.nStates, self.nStates)
    self.T = T
    assert R.ndim == 2
    assert R.shape == (self.nActions, self.nStates)
    self.R = R
    assert 0 <= discount < 1
    self.discount = discount
    
  def valueIteration(self, initialV, nIterations=np.inf, tolerance=0.01):
    '''
    Bellman optimality equation:
    V <-- max_a R^a + gamma T^a V
    '''
    oldV = initialV
    epsilon = np.inf
    iteration = 0
    while epsilon > tolerance and iteration < nIterations:
      newV = np.max(self.R + self.discount * np.einsum('ijk,k->ij', self.T, oldV), axis=0)
      epsilon = max(abs(newV - oldV))
      oldV = newV
      iteration += 1
      
    return newV
  
  def extractPolicy(self, V):
    '''
    Extract greedy policy
    pi = argmax(R + gamma * T * V)
    '''
    policy = np.argmax(self.R + self.discount * np.einsum('ijk,k->ij', self.T, V), axis=0)
    
    return policy
  
  def evaluatePolicy(self, policy):
    '''
    V^pi = R^pi + gamma * T^pi * V^pi
    '''
    R_pi = np.array([self.R[policy[s]][s] for s in range(self.nStates)])
    T_pi = np.array([self.T[policy[s]][s] for s in range(self.nStates)])
    
    V = np.linalg.solve(np.identity(self.nStates) - self.discount * T_pi, R_pi)
    
    return V
  
  def policyIteration(self, initialPolicy, nIterations=np.inf):
    '''
    Evaluation: solve V^pi = R^pi + gamma * T^pi * V^pi
    Improvement: pi = argmax(R + gamma * T * V)
    '''
    oldPolicy = initialPolicy
    iteration = 0
    while iteration < nIterations:
      V = self.evaluatePolicy(oldPolicy)
      newPolicy = self.extractPolicy(V)
      iteration += 1
      
    return newPolicy  
     
  def evaluatePolicyPartially(self, policy, initialV, nIterations=np.inf, tolerance=0.01):
    '''
    Repeat V^pi = R^pi + gamma * T^pi * V^pi
    instead of solving the system of linear equations with np.linalg.solve()
    '''
    R_pi = np.array([self.R[policy[s]][s] for s in range(self.nStates)])
    T_pi = np.array([self.T[policy[s]][s] for s in range(self.nStates)])
    
    oldV = initialV
    epsilon = np.inf
    iteration = 0
    while epsilon > tolerance and iteration < nIterations:
      newV = R_pi + self.discount * np.matmul(T_pi, oldV)
      epsilon = max(abs(newV - oldV))
      oldV = newV
      iteration += 1
      
    return newV
  
  def modifiedPolicyIteration(self, initialPolicy, initialV, nEvalIterations=5, nIterations=np.inf, tolerance=0.01):
    '''
    The same as policyIteration except that instead of evaluatePolicy we use evaluatePolicyPartially.
    '''
    policy = initialPolicy
    oldV = initialV
    epsilon = np.inf
    iteration = 0
    while epsilon > tolerance and iteration < nIterations:
      newV = self.evaluatePolicyPartially(policy, oldV, nEvalIterations, tolerance=tolerance)
      policy = self.extractPolicy(newV)
      epsilon = max(abs(newV - oldV))
      oldV = newV
      iteration += 1
      
    return policy

In [0]:
T = np.array([[[0.5,0.5,0,0],[0,1,0,0],[0.5,0.5,0,0],[0,1,0,0]],[[1,0,0,0],[0.5,0,0,0.5],[0.5,0,0.5,0],[0,0,0.5,0.5]]])
# Reward function: |A| x |S| array
R = np.array([[0,0,10,10],[0,0,10,10]])
# Discount factor: scalar in [0,1)
discount = 0.9        
# MDP object
mdp = MDP(T,R,discount)

In [70]:
V = mdp.valueIteration(np.zeros((mdp.nStates)), 1000)
print(V)
policy = mdp.extractPolicy(V)
print(policy)

[31.49636306 38.51527513 43.935435   54.1128575 ]
[0 1 1 1]


In [71]:
V = mdp.evaluatePolicy(policy)
print(V)
V = mdp.evaluatePolicyPartially(policy, np.zeros(mdp.nStates), 1000)
print(V)

[31.58510431 38.60401638 44.02417625 54.20159875]
[31.49636306 38.51527513 43.935435   54.1128575 ]


In [72]:
policy = mdp.policyIteration(np.zeros(mdp.nStates, dtype=np.int32), 1000)
print(policy)
policy = mdp.modifiedPolicyIteration(np.zeros(mdp.nStates, dtype=np.int32), np.zeros(mdp.nStates), 5, 1000)
print(policy)



[0 1 1 1]
[0 1 1 1]


In [0]:
class RL:
  def __init__(self, mdp, sampleReward):
    ''' 
    Inputs:
    mdp -- Markov decision process (T, R, discount)
    sampleReward -- Function to sample rewards (e.g., bernoulli, Gaussian).
    This function takes one argument: the mean of the distributon and 
    returns a sample from the distribution.
    '''
    self.mdp = mdp
    self.sampleReward = sampleReward
    
  def sampleRewardAndNextState(self, state, action):
    '''Procedure to sample a reward and the next state
    reward ~ Pr(r)
    nextState ~ Pr(s'|s,a)
    Inputs:
    state -- current state
    action -- action to be executed
    Outputs: 
    reward -- sampled reward
    nextState -- sampled next state
    '''
    reward = self.sampleReward(self.mdp.R[action, state])
    cumProb = np.cumsum(self.mdp.T[action, state, :])
    nextState = np.where(cumProb >= np.random.rand(1))[0][0]
    return [reward, nextState]
  
  def selectAction(self, state, Q, epsilon, temperature):
    '''
    This function implements epsilon exploration if epsilon > 0
    if temprature > 0 it would implement Boltzmann exploration.
    and if both epsilon and temprature are zero it would implement greedy policy without exploration.
    '''
    if epsilon > 0:
      if np.random.rand(1) < epsilon:
        action = np.random.randint(self.mdp.nActions)
      else:
        action = np.argmax(Q[:, state])
        
    elif temprature > 0:
      probs = [np.exp(Q[action, state] / temprature) for action in range(self.mdp.nActions)]
      probs /= np.sum(probs)
      action = np.random.choice(self.mdp.nActions, p=probs)
      
    else:
      action = np.argmax(Q[:, state])
      
    return action

  def sampleSoftmaxPolicy(self,policyParams,state):
    '''Procedure to sample an action from stochastic policy
    pi(a|s) = exp(policyParams(a,s))/[sum_a' exp(policyParams(a',s))])
    This function should be called by reinforce() to selection actions

    Inputs:
    policyParams -- parameters of a softmax policy (|A|x|S| array)
    state -- current state

    Outputs: 
    action -- sampled action
    '''

    denominator = np.exp(policyParams[:,state]).sum()
    numerator = np.exp(policyParams[:,state])
    pi = numerator / denominator
    
    action = np.random.choice(self.mdp.nActions, p=pi)
    
    return action
  
  def qLearning(self, s0, initialQ, nEpisodes, nSteps, epsilon=0, temperature=0):
    '''qLearning algorithm.  Epsilon exploration and Boltzmann exploration
    are combined in one procedure by sampling a random action with 
    probabilty epsilon and performing Boltzmann exploration otherwise.  
    When epsilon and temperature are set to 0, there is no exploration.
    Inputs:
    s0 -- initial state
    initialQ -- initial Q function (|A|x|S| array)
    nEpisodes -- # of episodes (one episode consists of a trajectory of nSteps that starts in s0
    nSteps -- # of steps per episode
    epsilon -- probability with which an action is chosen at random
    temperature -- parameter that regulates Boltzmann exploration
    Outputs: 
    Q -- final Q function (|A|x|S| array)
    policy -- final policy
    '''
    Q = initialQ
    rewards = np.zeros(nEpisodes)
    n = np.zeros((self.mdp.nActions, self.mdp.nStates))
    for e in range(nEpisodes):
      state = s0
      for s in range(nSteps):
        action = self.selectAction(state, Q, epsilon, temperature)
        reward, nextState = self.sampleRewardAndNextState(state, action)
        rewards[e] += reward * self.mdp.discount ** s
        n[action][state] += 1
        alpha = 1.0 / n[action][state]
        Q[action, state] += alpha * (reward + self.mdp.discount * np.max(Q[:, nextState]) - Q[action, state])
        Q[action][state] = np.round(Q[action][state], 2)
        state = nextState
        
    policy = np.argmax(Q, axis=0)
    
    return [Q,policy,rewards]


  def reinforce(self,s0,initialPolicyParams,nEpisodes,nSteps):
    '''reinforce algorithm.  Learn a stochastic policy of the form
    pi(a|s) = exp(policyParams(a,s))/[sum_a' exp(policyParams(a',s))]).
    This function should call the function sampleSoftmaxPolicy(policyParams,state) to select actions

    Inputs:
    s0 -- initial state
    initialPolicyParams -- parameters of the initial policy (array of |A|x|S| entries)
    nEpisodes -- # of episodes (one episode consists of a trajectory of nSteps that starts in s0)
    nSteps -- # of steps per episode

    Outputs: 
    policyParams -- parameters of the final policy (array of |A|x|S| entries)
    '''

    policyParams = initialPolicyParams
    n_s_a = np.zeros([self.mdp.nStates, self.mdp.nActions])
    rewards = np.zeros(nEpisodes)
    for e in range(nEpisodes):
      state = s0
      G = []
      trajectory = []
      for s in range(nSteps):
        action = self.sampleSoftmaxPolicy(policyParams, state)
        n_s_a[state,action] += 1
        reward, nextState = self.sampleRewardAndNextState(state, action)
        G.append((self.mdp.discount ** s) * reward)
        trajectory.append((state, action))
        state = nextState
      G = np.cumsum(G[::-1])[::-1]
      rewards[e] = G[0]
      for n in range(nSteps):
        state, action = trajectory[n]
        alpha = 1 / n_s_a[state,action]
        G_n = G[n]
        denominator = np.exp(policyParams[:,state]).sum()
        for a in range(self.mdp.nActions):
          if a == action:
            policyParams[a,state] = policyParams[a,state] + 0.1*G_n*(1-np.exp(policyParams[a,state])/denominator)
          else:
            policyParams[a,state] = policyParams[a,state] + 0.1*G_n*(-np.exp(policyParams[a,state])/denominator)
    return [policyParams, rewards]

  def modelBasedRL(self,s0,defaultT,initialR,nEpisodes,nSteps,epsilon=0):
    '''Model-based Reinforcement Learning with epsilon greedy 
    exploration.  This function should use value iteration,
    policy iteration or modified policy iteration to update the policy at each step

    Inputs:
    s0 -- initial state
    defaultT -- default transition function when a state-action pair has not been vsited
    initialR -- initial estimate of the reward function
    nEpisodes -- # of episodes (one episode consists of a trajectory of nSteps that starts in s0
    nSteps -- # of steps per episode
    epsilon -- probability with which an action is chosen at random

    Outputs: 
    V -- final value function
    policy -- final policy
    '''

    T = defaultT
    R = initialR
    V=np.zeros(self.mdp.nStates)
    n_s_a = np.zeros([self.mdp.nStates, self.mdp.nActions])
    n_s_a_nextState = np.zeros([self.mdp.nStates, self.mdp.nActions, self.mdp.nStates])
    rewards = np.zeros(nEpisodes)
    for e in range(nEpisodes):
      state = s0
      for s in range(nSteps):
        action = np.argmax(R[:,state] + self.mdp.discount * np.matmul(T[:,state,:], V))
        reward, nextState = self.sampleRewardAndNextState(state, action)
        rewards[e] += reward * (self.mdp.discount ** s)
        n_s_a[state,action] += 1
        n_s_a_nextState[state,action,nextState] += 1
        T[action,state,:] = n_s_a_nextState[state,action,:] / n_s_a[state,action]
        R[action,state] = (reward + (n_s_a[state,action]-1) * R[action,state]) / n_s_a[state,action]
        epsilon = np.inf
        while epsilon > 0.01:
          V_new = np.max(R + self.mdp.discount * np.einsum('ijk,k->ij', T, V), axis=0)
          epsilon = max(abs(V_new-V))
          V = V_new
        state = nextState
    policy = np.argmax(R + self.mdp.discount * np.einsum('ijk,k->ij', T, V), axis=0)

    return [V,policy,rewards]

In [74]:
rl = RL(mdp, np.random.normal)

# Test Q-learning
[Q, policy, rewards] = rl.qLearning(s0=0, initialQ=np.zeros((mdp.nActions, mdp.nStates)), nEpisodes=1000, nSteps=100, epsilon=0.3)
print("\nQ-learning results:")
print(Q)
print(policy)

# Test REINFORCE 
[policy, rewards] = rl.reinforce(s0=0, initialPolicyParams=np.random.rand(mdp.nActions,mdp.nStates), nEpisodes=1000, nSteps=100)
print("\nREINFORCE results:")
print(policy)

# Test model-based RL
[V, policy, rewards] = rl.modelBasedRL(s0=0, defaultT=np.ones([mdp.nActions,mdp.nStates,mdp.nStates])/mdp.nStates,initialR=np.zeros([mdp.nActions,mdp.nStates]), nEpisodes=1000, nSteps=100, epsilon=0.3)
print("\nModel-based RL results:")
print(V)
print(policy)



Q-learning results:
[[11.72 17.79 25.   28.06]
 [ 8.74 21.81 27.02 37.36]]
[0 1 1 1]

REINFORCE results:
[[ 4.454234   -3.93237654 -4.32857704 -4.39085301]
 [-4.34940406  5.52074779  4.99487285  5.42246958]]

Model-based RL results:
[16.10735194 19.68413259 18.4389601  27.72037902]
[0 1 0 0]


In [0]:
class Bandit:
    def __init__(self,mdp,sampleReward):
        '''Constructor for the RL class

        Inputs:
        mdp -- Markov decision process (T, R, discount)
        sampleReward -- Function to sample rewards (e.g., bernoulli, Gaussian).
        This function takes one argument: the mean of the distributon and 
        returns a sample from the distribution.
        '''

        self.mdp = mdp
        self.sampleReward = sampleReward

    def sampleRewardAndNextState(self,state,action):
        '''Procedure to sample a reward and the next state
        reward ~ Pr(r)
        nextState ~ Pr(s'|s,a)

        Inputs:
        state -- current state
        action -- action to be executed

        Outputs: 
        reward -- sampled reward
        nextState -- sampled next state
        '''

        reward = self.sampleReward(self.mdp.R[action,state])
        cumProb = np.cumsum(self.mdp.T[action,state,:])
        nextState = np.where(cumProb >= np.random.rand(1))[0][0]
        return [reward,nextState]


    def epsilonGreedyBandit(self,nIterations):
        '''Epsilon greedy algorithm for bandits (assume no discount factor)

        Inputs:
        nIterations -- # of arms that are pulled

        Outputs: 
        empiricalMeans -- empirical average of rewards for each arm (array of |A| entries)
        '''
        
        state = 0
        n_a = np.zeros(self.mdp.nActions)
        empiricalMeans = np.zeros(self.mdp.nActions)
        rewards = np.zeros(nIterations)
        for i in range(nIterations):
                epsilon = 1 / (i + 1)
                if np.random.rand(1) < epsilon:
                       action = np.random.randint(self.mdp.nActions)
                else:
                       action = np.argmax(empiricalMeans) 
                reward, _ = self.sampleRewardAndNextState(state, action)
                rewards[i] = reward
                empiricalMeans[action] = (n_a[action] * empiricalMeans[action] + reward) / (n_a[action] + 1)
                n_a[action] += 1
        return empiricalMeans, rewards

    def thompsonSamplingBandit(self,prior,nIterations,k=1):
        '''Thompson sampling algorithm for Bernoulli bandits (assume no discount factor)

        Inputs:
        prior -- initial beta distribution over the average reward of each arm (|A|x2 matrix such that prior[a,0] is the alpha hyperparameter for arm a and prior[a,1] is the beta hyperparameter for arm a)  
        nIterations -- # of arms that are pulled
        k -- # of sampled average rewards

        Outputs: 
        empiricalMeans -- empirical average of rewards for each arm (array of |A| entries)
        '''
        state = 0
        empiricalMeans = np.zeros(self.mdp.nActions)
        rewards = np.zeros(nIterations)
        for i in range(nIterations):
               for a in range(self.mdp.nActions):
                      theta = np.random.beta(prior[a,0], prior[a,1], k)
                      empiricalMeans[a] = np.mean(theta)
               action = np.argmax(empiricalMeans)
               reward, _ = self.sampleRewardAndNextState(state, action)
               rewards[i] = reward
               if reward == 1:
                       prior[action,0] += 1
               else:
                       prior[action,1] += 1

        return empiricalMeans, rewards

    def UCBbandit(self,nIterations):
        '''Upper confidence bound algorithm for bandits (assume no discount factor)

        Inputs:
        nIterations -- # of arms that are pulled

        Outputs: 
        empiricalMeans -- empirical average of rewards for each arm (array of |A| entries)
        '''

        state = 0
        n = 1
        n_a = np.ones(self.mdp.nActions)
        empiricalMeans = np.zeros(self.mdp.nActions)
        rewards = np.zeros(nIterations)
        for i in range(nIterations):
                action = np.argmax(empiricalMeans + np.sqrt(2 * np.log(n) / n_a))
                reward, _ = self.sampleRewardAndNextState(state, action)
                rewards[i] = reward
                empiricalMeans[action] = (n_a[action] * empiricalMeans[action] + reward) / (n_a[action] + 1)
                n_a[action] += 1
                n += 1

        return empiricalMeans, rewards


In [76]:

def sampleBernoulli(mean):
    ''' function to obtain a sample from a Bernoulli distribution

    Input:
    mean -- mean of the Bernoulli
    
    Output:
    sample -- sample (0 or 1)
    '''

    if np.random.rand(1) < mean: return 1
    else: return 0


# Multi-arm bandit problems (3 arms with probabilities 0.3, 0.5 and 0.7)
T = np.array([[[1]],[[1]],[[1]]])
R = np.array([[0.3],[0.5],[0.7]])
discount = 0.999
mdp = MDP(T,R,discount)
bandit = Bandit(mdp,sampleBernoulli)

# Test epsilon greedy strategy
[empiricalMeans, reward] = bandit.epsilonGreedyBandit(nIterations=200)
print("\nEpsilonGreedyBandit results:")
print(empiricalMeans)

# Test Thompson sampling strategy
[empiricalMeans, reward] = bandit.thompsonSamplingBandit(prior=np.ones([mdp.nActions,2]),nIterations=200)
print("\nThompsonSamplingBandit results:")
print(empiricalMeans)

# Test UCB strategy
[empiricalMeans, reward] = bandit.UCBbandit(nIterations=200)
print("\nUCBbandit results:")
print(empiricalMeans)



EpsilonGreedyBandit results:
[0.335 0.    0.   ]

ThompsonSamplingBandit results:
[0.0055013  0.54223695 0.68615258]

UCBbandit results:
[0.26086957 0.53846154 0.65217391]
