# DAT257x: Reinforcement Learning Explained

## Lab 4: Dynamic Programming

### Exercise 4.1 Policy Evaluation with 2 Arrays

Policy Evaluation calculates the value function for a policy, given the policy and the full definition of the associated Markov Decision Process.  The full definition of an MDP is the set of states, the set of available actions for each state, the set of rewards, the discount factor, and the state/reward transition function.

In [1]:
import test_dp               # required for testing and grading your code
import gridworld_mdp as gw   # defines the MDP for a 4x4 gridworld

The gridworld MDP defines the probability of state transitions for our 4x4 gridworld using a "get_transitions()" function.  

Let's try it out now, with state=2 and all defined actions.

In [2]:
# try out the gw.get_transitions(state, action) function

state = 2
actions = gw.get_available_actions(state)
for action in actions:
    transitions = gw.get_transitions(state=state, action=action)

    # examine each return transition (only 1 per call for this MDP)
    for (trans) in transitions:
        next_state, reward, probability = trans    # unpack tuple
        print("transition("+ str(state) + ", " + action + "):", "next_state=", next_state, ", reward=", reward, ", probability=", probability)

transition(2, up): next_state= 2 , reward= -1 , probability= 1
transition(2, down): next_state= 6 , reward= -1 , probability= 1
transition(2, left): next_state= 1 , reward= -1 , probability= 1
transition(2, right): next_state= 3 , reward= -1 , probability= 1


**Implement the algorithm for Iterative Policy Evaluation using the 2 array approach**.  In the 2 array approach, one array holds the value estimates for each state computed on the previous iteration, and one array holds the value estimates for the states computing in the current iteration.

A empty function **policy_eval_two_arrays** is provided below; implement the body of the function to correctly calculate the value of the policy using the 2 array approach.  The function defines 5 parameters - a definition of each parameter is given in the comment block for the function.  For sample parameter values, see the calling code in the cell following the function.

In [3]:
def policy_eval_two_arrays(state_count, gamma, theta, get_policy, get_transitions):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:
    
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    
    'gamma' is the MDP discount factor for rewards.
    
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    
    'get_policy' is the stochastic policy function - it takes a state parameter and returns list of tuples, 
        where each tuple is of the form: (action, probability).  It represents the policy being evaluated.
        
    'get_transitions' is the state/reward transiton function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probabiliity).  
        
    """
    V = state_count*[0]
    while True:
        V_old = V[:]
        for s in range(state_count):
            V[s] = 0
            for action, action_prob in get_policy(s):
                transitions = get_transitions(state=s, action=action)
                for (trans) in transitions:
                    next_state, reward, probability = trans    # unpack tuple
                    V[s] += action_prob * probability * (reward + gamma * V_old[next_state])    
        delta = max([abs(V[x] - V_old[x]) for x in range(state_count)])
        if (delta < theta): break
    return V

First, test our function using the MDP defined by gw.* functions.

In [7]:
def get_equal_policy(state):
    # build a simple policy where all 4 actions have the same probability, ignoring the specified state
    policy = ( ("up", .25), ("right", .25), ("down", .25), ("left", .25))
    return policy

n_states = gw.get_state_count()

# test our function
values = policy_eval_two_arrays(state_count=n_states, gamma=.9, theta=.001, get_policy=get_equal_policy, \
    get_transitions=gw.get_transitions)


print("Values=", values)

Values= [0.0, -5.274709263277986, -7.123800104889248, -7.64536148969558, -5.274709263277987, -6.602238720082915, -7.17604178238719, -7.1238001048892485, -7.1238001048892485, -7.176041782387191, -6.602238720082915, -5.274709263277986, -7.645361489695581, -7.1238001048892485, -5.274709263277986]


**Expected output from running above cell:**

`
Values= [0.0, -5.274709263277986, -7.123800104889248, -7.64536148969558, -5.274709263277987, -6.602238720082915, -7.17604178238719, -7.1238001048892485, -7.1238001048892485, -7.176041782387191, -6.602238720082915, -5.274709263277986, -7.645361489695581, -7.1238001048892485, -5.274709263277986]
`

In [8]:
import numpy as np
a = np.append(values, 0)
np.reshape(a, (4,4))

array([[ 0.        , -5.27470926, -7.1238001 , -7.64536149],
       [-5.27470926, -6.60223872, -7.17604178, -7.1238001 ],
       [-7.1238001 , -7.17604178, -6.60223872, -5.27470926],
       [-7.64536149, -7.1238001 , -5.27470926,  0.        ]])

Now, test our function using the test_dp helper.  The helper also uses the gw MDP, but with a different gamma value.
If our function passes all tests, a passcode will be printed.

In [9]:
# test our function using the test_db helper
test_dp.policy_eval_two_arrays_test( policy_eval_two_arrays ) 


Testing: Policy Evaluation (two-arrays)
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (two-arrays) passcode = 9986-145


### Exercise 4.2 Policy Evaluation using in-place method

Policy Evaluation calculates the value function for a policy, given the policy and the full definition of the associated Markov Decision Process.  The full definition of an MDP is the set of states, the set of available actions for each state, the set of rewards, the discount factor, and the state/reward transition function.

In [1]:
import test_dp               # required for testing and grading your code
import gridworld_mdp as gw   # defines the MDP for a 4x4 gridworld

**Implement the algorithm for Iterative Policy Evaluation using the in-place approach**. In the in-place approach, one array holds the values being estimated for each state and the same array is used for estimates of states needed by the algorithm.

A empty function **policy_eval_in_place** is provided below; implement the body of the function to correctly calculate the value of the policy using the 2 array approach. The function defines 5 parameters - a definition of each parameter is given in the comment block for the function. For sample parameter values, see the calling code in the cell following the function.

In [8]:
def policy_eval_in_place(state_count, gamma, theta, get_policy, get_transitions):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:
    
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    
    'gamma' is the MDP discount factor for rewards.
    
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    
    'get_policy' is the stochastic policy function - it takes a state parameter and returns list of tuples, 
        where each tuple is of the form: (action, probability).  It represents the policy being evaluated.
        
    'get_transitions' is the state/reward transiton function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probabiliity).  
         
    """
    V = state_count*[0]
    #
    # insert code here to evaluate the policy using the in-place approach 
    #
    while True:
        delta = 0
        for s in range(state_count):
            v = V[s]
            a = 0
            for action, action_prob in get_policy(s):
                transitions = get_transitions(state=s, action=action)
                for (trans) in transitions:
                    next_state, reward, probability = trans    # unpack tuple
                    a += action_prob * probability * (reward + gamma * V[next_state])    
            V[s] = a    
            delta = max(delta, abs(v - V[s]))
        if (delta < theta): break
    return V

First, test our function using the MDP defined by gw.* functions.

In [9]:
def get_equal_policy(state):
    # build a simple policy where all 4 actions have the same probability, ignoring the specified state
    policy = ( ("up", .25), ("right", .25), ("down", .25), ("left", .25))
    return policy

n_states = gw.get_state_count()

# test our function
values = policy_eval_in_place(state_count=n_states, gamma=.9, theta=.001, get_policy=get_equal_policy, \
    get_transitions=gw.get_transitions)

print("Values=", values)

Values= [0.0, -5.275906485600302, -7.125803667372325, -7.647729922717661, -5.275906485600302, -6.604213913250977, -7.1785079112764745, -7.126384243656092, -7.125803667372325, -7.178507911276475, -6.604678371775787, -5.276663994322859, -7.647729922717662, -7.1263842436560925, -5.27666399432286]


**Expected output from running above cell:**

`
Values= [0.0, -5.275906485600302, -7.125803667372325, -7.647729922717661, -5.275906485600302, -6.604213913250977, -7.1785079112764745, -7.126384243656092, -7.125803667372325, -7.178507911276475, -6.604678371775787, -5.276663994322859, -7.647729922717662, -7.1263842436560925, -5.27666399432286]
`

Now, test our function using the test_dp helper.  The helper also uses the gw MDP, but with a different gamma value.
If our function passes all tests, a passcode will be printed.

In [10]:
# test our function using the test_db helper
test_dp.policy_eval_in_place_test(policy_eval_in_place)   


Testing: Policy Evaluation (in-place)
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (in-place) passcode = 9991-562


### Exercise 4.3 Policy Iteration

Policy Iteration calculates the optimal policy for an MDP, given its full definition.  The full definition of an MDP is the set of states, the set of available actions for each state, the set of rewards, the discount factor, and the state/reward transition function.

In [1]:
import test_dp               # required for testing and grading your code
import gridworld_mdp as gw   # defines the MDP for a 4x4 gridworld

**Implement the algorithm for Policy Iteration**.  Policy Iteration calculates the optimal policy for an MDP by doing repeated steps of policy evaluation and policy improvement.

A empty function **policy_iteration** is provided below; implement the body of the function to correctly calculate the optimal policy for an MDP.  The function defines 5 parameters - a definition of each parameter is given in the comment block for the function.  For sample parameter values, see the calling code in the cell following the function.

Note that there is a subtle difference between the algorithm for Policy Evaluation, which assumes the policy is stochastic, and the Policy Evaluation step for the Policy Iteration algorithm, which assumes the policy is deterministic.  This means that you cannot directly call your previous code, but you can reuse large pieces of it for the Policy Evaluation step.

In [2]:
def policy_iteration(state_count, gamma, theta, get_available_actions, get_transitions):
    """
    This function computes the optimal value function and policy for the specified MDP, using the Policy Iteration algorithm.
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    
    'gamma' is the MDP discount factor for rewards.
    
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    
    'get_available_actions' returns a list of the MDP available actions for the specified state parameter.
    
    'get_transitions' is the MDP state / reward transiton function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probabiliity).  
    """
    V = state_count*[0]                # init all state value estimates to 0
    pi = state_count*[0]
    
    # init with a policy with first avail action for each state
    for s in range(state_count):
        avail_actions = get_available_actions(s)
        pi[s] = avail_actions[0]
        
    # insert code here to iterate using policy evaluation and policy improvement (see Policy Iteration algorithm)
    while True:
        while True:
            delta = 0
            for s in range(state_count):
                v = V[s]
                a = 0
                for (trans) in get_transitions(state=s, action=pi[s]):
                    next_state, reward, probability = trans    # unpack tuple
                    a += probability * (reward + gamma * V[next_state])
                V[s] = a
                delta = max(delta, abs(v - V[s]))
            if (delta < theta): break
                
        policy_stable = True
        
        for s in range(state_count):
            old_action = pi[s]
            arg_max = ""
            max_ = -float('inf')
            for action in get_available_actions(s):
                a = 0
                for (trans) in get_transitions(state=s, action=action):
                    next_state, reward, probability = trans
                    a += probability * (reward + gamma * V[next_state])
                if a > max_:
                    arg_max = action
                    max_ = a
            pi[s] = arg_max
            if pi[s] != old_action:
                policy_stable = False
        if policy_stable: break
            
    return (V, pi)        # return both the final value function and the final policy

First, test our function using the MDP defined by gw.* functions.

In [3]:
n_states = gw.get_state_count()

# test our function
values, policy = policy_iteration(state_count=n_states, gamma=.9, theta=.001, get_available_actions=gw.get_available_actions, \
    get_transitions=gw.get_transitions)

print("Values=", values)
print("Policy=", policy)

Values= [0.0, -1.0, -1.9, -2.71, -1.0, -1.9, -2.71, -1.9, -1.9, -2.71, -1.9, -1.0, -2.71, -1.9, -1.0]
Policy= ['up', 'left', 'left', 'down', 'up', 'up', 'up', 'down', 'up', 'up', 'down', 'down', 'up', 'right', 'right']


**Expected output from running above cell:**

`
Values= [0.0, -1.0, -1.9, -2.71, -1.0, -1.9, -2.71, -1.9, -1.9, -2.71, -1.9, -1.0, -2.71, -1.9, -1.0]
Policy= ['up', 'left', 'left', 'down', 'up', 'up', 'up', 'down', 'up', 'up', 'down', 'down', 'up', 'right', 'right']
`

In [4]:
import numpy as np
a = np.append(values, 0)
np.reshape(a, (4,4))

array([[ 0.  , -1.  , -1.9 , -2.71],
       [-1.  , -1.9 , -2.71, -1.9 ],
       [-1.9 , -2.71, -1.9 , -1.  ],
       [-2.71, -1.9 , -1.  ,  0.  ]])

Now, test our function using the test_dp helper.  The helper also uses the gw MDP, but with a different gamma value.
If our function passes all tests, a passcode will be printed.

In [5]:
a = np.append(policy, policy[0])
np.reshape(a, (4,4))

array([['up', 'left', 'left', 'down'],
       ['up', 'up', 'up', 'down'],
       ['up', 'up', 'down', 'down'],
       ['up', 'right', 'right', 'up']], dtype='<U5')

In [6]:
# test our function using the test_db helper
test_dp.policy_iteration_test( policy_iteration ) 


Testing: Policy Iteration
passed test: return value is tuple
passed test: length of tuple = 2
passed test: v is list of length=15
passed test: values of v elements
passed test: pi is list of length=15
passed test: values of pi elements
PASSED: Policy Iteration passcode = 9970-010


### Exercise 4.4 Value Iteration

Value Iteration calculates the optimal policy for an MDP, given its full definition.  The full definition of an MDP is the set of states, the set of available actions for each state, the set of rewards, the discount factor, and the state/reward transition function.

In [1]:
import test_dp               # required for testing and grading your code
import gridworld_mdp as gw   # defines the MDP for a 4x4 gridworld

**Implement the algorithm for Value Iteration**.  Value Iteration calculates the optimal policy for an MDP by iteration of a single step combining both policy evaluation and policy improvement.

A empty function **value_iteration** is provided below; implement the body of the function to correctly calculate the optimal policy for an MDP.  The function defines 5 parameters - a definition of each parameter is given in the comment block for the function.  For sample parameter values, see the calling code in the cell following the function.

In [2]:
def value_iteration(state_count, gamma, theta, get_available_actions, get_transitions):
    """
    This function computes the optimal value function and policy for the specified MDP, using the Value Iteration algorithm.
    
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    
    'gamma' is the MDP discount factor for rewards.
    
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    
    'get_available_actions' returns a list of the MDP available actions for the specified state parameter.
    
    'get_transitions' is the MDP state / reward transiton function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probabiliity).  
    """
    V = state_count*[0]                # init all state value estimates to 0
    pi = state_count*[0]
    
    # init with a policy with first avail action for each state
    for s in range(state_count):
        avail_actions = get_available_actions(s)
        pi[s] = avail_actions[0]
        
    # insert code here to iterate using policy evaluation and policy improvement (see Policy Iteration algorithm)
    while True:
        delta = 0
        for s in range(state_count):
            v = V[s]
            max_ = -float('inf')
            for action in get_available_actions(s):
                a = 0
                for (trans) in get_transitions(state=s, action=action):
                    next_state, reward, probability = trans
                    a += probability * (reward + gamma * V[next_state])
                if a > max_:
                    max_ = a
            V[s] = max_
            delta = max(delta, abs(v - V[s]))
        if (delta < theta): break
    for s in range(state_count):
        arg_max = ""
        max_ = -float('inf')
        for action in get_available_actions(s):
            a = 0
            for (trans) in get_transitions(state=s, action=action):
                next_state, reward, probability = trans
                a += probability * (reward + gamma * V[next_state])
            if a > max_:
                arg_max = action
                max_ = a
        pi[s] = arg_max
    return (V, pi)        # return both the final value function and the final policy

First, test our function using the MDP defined by gw.* functions.

In [3]:
n_states = gw.get_state_count()

# test our function
values, policy = value_iteration(state_count=n_states, gamma=.9, theta=.001, get_available_actions=gw.get_available_actions, \
    get_transitions=gw.get_transitions)

print("Values=", values)
print("Policy=", policy)

Values= [0.0, -1.0, -1.9, -2.71, -1.0, -1.9, -2.71, -1.9, -1.9, -2.71, -1.9, -1.0, -2.71, -1.9, -1.0]
Policy= ['up', 'left', 'left', 'down', 'up', 'up', 'up', 'down', 'up', 'up', 'down', 'down', 'up', 'right', 'right']


**Expected output from running above cell:**

`
Values= [0.0, -1.0, -1.9, -2.71, -1.0, -1.9, -2.71, -1.9, -1.9, -2.71, -1.9, -1.0, -2.71, -1.9, -1.0]
Policy= ['up', 'left', 'left', 'down', 'up', 'up', 'up', 'down', 'up', 'up', 'down', 'down', 'up', 'right', 'right']
`

Now, test our function using the test_dp helper.  The helper also uses the gw MDP, but with a different gamma value.
If our function passes all tests, a passcode will be printed.

In [4]:
# test our function using the test_db helper
test_dp.value_iteration_test( value_iteration ) 


Testing: Value Iteration
passed test: return value is tuple
passed test: length of tuple = 2
passed test: v is list of length=15
passed test: values of v elements
passed test: pi is list of length=15
passed test: values of pi elements
PASSED: Value Iteration passcode = 9990-000
