# RL Setup
In order to implement our RL algorithms, we first set up a way of initializing our environment. In particular, we set up a class FiniteMDP which sets up an MDP with $S, A$ and $H$ being the number of states, actions and stages in each episode repsectively. We will have reward matrix $R\in \mathbb{R}^{S\times A}$ representing the reward recieved for any action $a$ in any state $s$. We will also need to provide the transition probabilities $P \in \mathbb{R}^{S\times A \times S}$ as well as the initial state distribution $p_0$ for the MDP. We then choose all the necessary values to define and initialize an example MDP.

In [9]:
import copy

import numpy as np
from scipy.special import softmax

In [10]:
class FiniteMDP: #(R, P, initial_state_distribution, H):
    """
    Base class for a finite MDP.
    Parameters
    ----------
    R : numpy.ndarray
    P : numpy.ndarray
    initial_state_distribution : numpy.ndarray or int
        array of size (S,) containing the initial state distribution
        or an integer representing the initial/default state
    Attributes
    ----------
    R : numpy.ndarray
        array of shape (S, A) containing the mean rewards, where
        S = number of states;  A = number of actions.
    P : numpy.ndarray
        array of shape (S, A, S) containing the transition probabilities,
        where P[s, a, s'] = Prob(S_{t+1}=s'| S_t = s, A_t = a).
    H : horizon
    sigma2 : noise variance.
    """

    def __init__(self, R, P, p0, H):
        self.p0 = p0
        S, A = R.shape

        self.S = S
        self.A = A
        self.R = R
        self.P = P
        self.H = H

        self.reward_range = (self.R.min(), self.R.max())

        self.state = None

        self.States = np.arange(S)
        self.Actions = np.arange(A)

    def ResetState(self):
        """
        Reset the environment to a default state.
        """
        self.state = np.random.choice(self.States,p=self.p0)

    def Sample(self, state, action):
        """
        Sample a transition s' from P(s'|state, action).
        """
        prob = self.P[state, action, :]
        next_state = np.random.choice(self.States, p=prob)
        reward = self.R[state, action] 
        return next_state, reward

    def Play(self, action):
        next_state, reward = self.Sample(self.state, action)
        self.state = next_state
        return next_state, reward

### Dynamic Programming Algorithm

For most algorithms it will be helpful to have a function which will run the dynamic programming algorithm for a given transition and reward function (NB: we want the function to be general so these don't need to be the true ones).

We recall the Dynamic Programming Algorithm from the notes in which we input the transition functions $P_h$ and reward functions $r_h$, in our case these are $P$ and $R$, and run the following steps.

1. Set $V^*_{H+1}(x)=0$ for all $ x \in \mathcal{S}$
2. For $h=H,\dots, 1$:
3. $\qquad$ Calculate $ Q_h^*(x,a) = r_h(x,a) + \sum_{x' \in \mathcal{S}} P_h(x'|x,a) V_{h+1}^*(x') $
3. $\qquad$ Set $\pi^*_h(x) = \text{argmax}_{a \in \mathcal{A}} Q_h^*(x,a)$
4. $\qquad$ Define $V_h^*(x) = \text{max}_{a \in \mathcal{A}} Q_h^*(x,a) = Q_h^*(x,\pi^*_h(x))$ 

We implement this in a function below.

In [11]:
# define Dynamic Programming Algorithm
def DP(MDP,P,R):
    # define generally for any transition function P and reward R
    H = MDP.H
    Q = np.ones((H, S, A))
    V = np.ones((H+1, S))
    V[H, :] = 0
    for h in range(H):
        for s in MDP.States:
            for a in MDP.Actions:
                Q[H-h-1,s,a] = R[s,a] + P[s,a,:].dot(V[H-h,:])
            
            V[H-h-1,s] = np.max(Q[H-h-1,s,:])
    
    return Q, V

In [12]:
# define function to find expected value of a policy defined using myQ in true MDP
def ExpectedValue(myMDP,myQ):
    H = myMDP.H
    Q = np.ones((myMDP.H, myMDP.S, myMDP.A))
    V = np.ones((myMDP.H+1, myMDP.S))
    V[H, :] = 0
    for h in range(H):
        for s in myMDP.States:
            for a in myMDP.Actions:
                Q[H-h-1,s,a] = myMDP.R[s,a] + sum(myMDP.P[s,a,:]*V[H-h,:])
            
            myaction = np.argmax(myQ[H-h-1,s,:])
            V[H-h-1,s] = Q[H-h-1,s,myaction]
    
    return Q, V

In [13]:
# define function to add extra attributes to the MDP needed for the algorithms
def AddExtraAttributes(myMDP):
    # extract S,A,H from MDP
    H = myMDP.H
    S = myMDP.S
    A = myMDP.A

    # define Q and V functions attached to the MDP
    setattr(myMDP, 'V', np.ones((H+1, S)))
    myMDP.V[H, :] = 0
    setattr(myMDP, 'Q', np.ones((H, S, A)))

    for h in range(myMDP.H):
        myMDP.V[h, :] *= (myMDP.H-h)
        myMDP.Q[h, :, :] *= (myMDP.H-h)

    # to run the DP algorithm for true MDP to get Vstar and Qstar
    setattr(myMDP, 'Vstar', np.ones((H+1, S)))
    myMDP.Vstar[H, :] = 0
    setattr(myMDP, 'Qstar', np.ones((H, S, A)))
    myMDP.Qstar, myMDP.Vstar = DP(myMDP,myMDP.P,myMDP.R)

    # define estimates and counters to MDP
    setattr(myMDP, 'Nsa', np.zeros((S, A)))
    setattr(myMDP, 'Rhat', np.zeros((S, A)))
    setattr(myMDP, 'Phat', np.zeros((S, A, S)))
    setattr(myMDP, 'Vhat', np.ones((H+1, S)))
    myMDP.Vhat[H, :] = 0
    setattr(myMDP, 'Qhat', np.ones((H, S, A)))
    setattr(myMDP, 'Vopt', np.ones((H+1, S)))
    myMDP.Vopt[H, :] = 0
    setattr(myMDP, 'Qopt', np.ones((H, S, A)))

#AddExtraAttributes(myMDP)

In [14]:
# define a function to generate a random environment 
def random_env(S,A,H):
    R = np.random.uniform(0, 1, (S, A))
    P = np.random.uniform(0, 1, (S, A, S))
    p0 = np.ones(S)/S
    # normalize transition probs     
    for ss in range(S):
        for aa in range(A):
            P[ss, aa, :] /= P[ss, aa, :].sum()
    return FiniteMDP(R, P, p0, H)

### Define the standard Q-learning algorithm

In [15]:
# define alg running in T episodes of MDP, learning rate alpha=fn(Nsa)
def Q_Learning(myMDP1, T, Nsa_fn, tau):
    myMDP = copy.deepcopy(myMDP1)
    reg = np.zeros(T)
    rew = np.zeros(T)
    true_reg = np.zeros(T)
    # ResetCounters(myMDP)

    # initialize Qhat,Vhat
    myMDP.Qhat = np.ones((myMDP.H, myMDP.S, myMDP.A)) * myMDP.H
    # myMDP.Qhat = 1.0*myMDP.Qstar
    # myMDP.Vhat = 1.0*myMDP.Vstar
    myMDP.Vhat = np.ones(((myMDP.H + 1, myMDP.S))) * myMDP.H
    myMDP.Vhat[myMDP.H, :] = 0


    episodes_to_reach_threshold = []
    for t in range(T):
        myMDP.ResetState()  # sample initial state
        optval = myMDP.Vstar[0, myMDP.state]

        # calculate expected value from playing with Qhat over episode
        expval = ExpectedValue(myMDP, myMDP.Qhat)[1][0, myMDP.state]

        eprew = 0.0
        for h in range(myMDP.H):
            myState = myMDP.state
            # take action maximizing Qhat function
            myAction = np.argmax(myMDP.Qhat[h, myState, :])
            # record new state/reward from action played
            mynextstate, myreward = myMDP.Play(myAction)
            eprew += myreward
            # UpdateEstMDP(myMDP, myState, myAction, mynextstate, myreward, h) #update N(s,a) counter
            # Upate Qhat,Vhat
            myMDP.Nsa[myState, myAction] += 1
            alpha = Nsa_fn(myMDP.Nsa[myState, myAction])
            myMDP.Qhat[h, myState, myAction] = (1 - alpha) * myMDP.Qhat[
                h, myState, myAction
            ] + alpha * (myreward + myMDP.Vhat[h + 1, mynextstate])
            myMDP.Vhat[h, myState] = max(myMDP.Qhat[h, myState, :])

            # check if threshold is exceeded
            if eprew >= tau:
                episodes_to_reach_threshold.append(t + 1)
                break

        reg[t] = optval - eprew
        true_reg[t] = optval - expval
        rew[t] = eprew

    return rew, reg, true_reg, episodes_to_reach_threshold, myMDP.Qhat

### Define the Softmax Q-learning algorithm

In [16]:
# define alg running in T episodes of MDP, learning rate alpha, beta for softmax defn
def Softmax_Q_Learning(myMDP1, T, Nsa_fn, beta, tau):
    myMDP = copy.deepcopy(myMDP1)
    reg = np.zeros(T)
    rew = np.zeros(T)
    # ResetCounters(myMDP)

    # initialize Qhat,Vhat
    myMDP.Qhat = np.ones((myMDP.H, myMDP.S, myMDP.A)) * myMDP.H
    myMDP.Vhat = np.ones(((myMDP.H + 1, myMDP.S))) * myMDP.H
    myMDP.Vhat[myMDP.H, :] = 0

    episodes_to_reach_threshold = []
    for t in range(T):
        myMDP.ResetState()  # sample initial state
        optval = myMDP.Vstar[0, myMDP.state]

        eprew = 0.0
        for h in range(myMDP.H):
            myState = myMDP.state
            # take action with softmax of Qhat function
            probs = softmax(beta * myMDP.Qhat[h, myState, :])
            myAction = np.random.choice(np.arange(myMDP.A), p=probs)
            # record new state/reward from action played
            mynextstate, myreward = myMDP.Play(myAction)
            eprew += myreward
            # Upate Qhat,Vhat
            myMDP.Nsa[myState, myAction] += 1
            alpha = Nsa_fn(myMDP.Nsa[myState, myAction])
            myMDP.Qhat[h, myState, myAction] = (1 - alpha) * myMDP.Qhat[
                h, myState, myAction
            ] + alpha * (myreward + myMDP.Vhat[h + 1, mynextstate])
            myMDP.Vhat[h, myState] = max(myMDP.Qhat[h, myState, :])

            # check if threshold is exceeded
            if eprew >= tau:
                episodes_to_reach_threshold.append(t + 1)
                break

        reg[t] = optval - eprew
        rew[t] = eprew

    return rew, reg, episodes_to_reach_threshold, myMDP.Qhat