### Assignment : Week 2
## Finding best policies in simple MDPs

Great work making the MDPs in Week 1!

In this assignment, we'll use the simplest RL techniques - Policy and Value iteration to find the best policies (which maximize the discounted total reward) in our MDPs from last week.

Feel free to use your own MDPs, or import them from the OpenAI Gym library.

You can start this assignment during/after reading Grokking Ch-3.

## Frozen Lake

Let's now try to solve the Frozen Lake environment for some cases

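To align with Grokking, let us consider an unusual $\gamma = 1$. (Note that the functions defined below default to `gamma=0.99`, so pass `gamma=1` explicitly if you want to use this setting.)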

In [67]:
# Step 0 is to import stuff

import gymnasium as gym

import numpy as np
from gymnasium.envs.toy_text.frozen_lake import generate_random_map

In [3]:
# Step 1 is to get the MDP
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)
# Work with the unwrapped environment, whose `P` attribute holds the transition table
env = env.unwrapped
# mdp_transitions[state][action] is a list of (probability, next_state, reward, done) tuples
mdp_transitions = env.P
# Gymnasium's reset() returns an (observation, info) pair; keep only the start state
init_state, _ = env.reset()
# Index of the last cell of the 4x4 grid, used as the goal
goal_state = 15
print(mdp_transitions)

{0: {0: [(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 4, 0.0, False)], 1: [(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 4, 0.0, False), (0.3333333333333333, 1, 0.0, False)], 2: [(0.3333333333333333, 4, 0.0, False), (0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False)], 3: [(0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False)]}, 1: {0: [(0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 5, 0.0, True)], 1: [(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 5, 0.0, True), (0.3333333333333333, 2, 0.0, False)], 2: [(0.3333333333333333, 5, 0.0, True), (0.3333333333333333, 2, 0.0, False), (0.3333333333333333, 1, 0.0, False)], 3: [(0.3333333333333333, 2, 0.0, False), (0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False)]}, 2: {0: [(0.3333333333333333, 2, 0.0, False), (0.3333333333333333

In [35]:
# Step 2 is to write the policy

# A hand-written policy, following gymnasium's action numbering, would look like this:
# LEFT, DOWN, RIGHT, UP = range(4)
# pi = {
#     0:RIGHT, 1:RIGHT, 2:DOWN, 3:LEFT,
#     4:DOWN, 5:LEFT, 6:DOWN, 7:LEFT,
#     8:RIGHT, 9:RIGHT, 10:DOWN, 11:LEFT,
#     12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
# }

# Alternatively, draw one of the four actions at random for every state
pi = {state: np.random.choice(range(4)) for state in mdp_transitions}

In [36]:
# Step 3 is computing the value function for this environment and policy

# Begin from an arbitrary (random) guess for every state
val = {state: np.random.random() for state in mdp_transitions}

# 5, 7, 11, 12 and 15 are terminal states, so their values are known to be 0
val.update(dict.fromkeys((5, 7, 11, 12, 15), 0))


# Alternative: initialise randomly and zero the terminal states while looping,
# instead of listing them by hand.
# NOTE(review): the condition below inspects the first field of the first outcome
# of action 0 -- confirm that this really identifies terminal states before using it.
# val = dict()
# for state in mdp:
#     val[state] = np.random.random()
#     if mdp[state][0][0][0] == 0:
#         val[state] = 0

# Simplest option: initialise the value function to 0 for all states
# for state in swf_mdp:
#   val[state] = 0

In [37]:
def get_new_value_fn(val, mdp, pi, gamma = 0.99):
    """Apply one policy-evaluation backup to every state.

    Args:
        val: mapping state -> current value estimate.
        mdp: mapping state -> action -> list of
            (probability, next_state, reward, done) outcomes.
        pi: mapping state -> action chosen by the policy.
        gamma: discount factor.

    Returns:
        A new dict mapping each state of ``mdp`` to its backed-up value;
        ``val`` itself is left untouched.
    """
    updated = {}
    for state, actions in mdp.items():
        expected_return = 0
        for prob, next_state, reward, done in actions[pi[state]]:
            # A transition flagged as done contributes no bootstrapped value.
            keep_going = not done
            expected_return += prob * (reward + gamma * val[next_state] * keep_going)
        updated[state] = expected_return

    return updated



In [38]:
# Use the above function to compute the converged value function, and also report how many iterations it took to converge

def checkConvergence(val,old_val,epsilon):
    """Tell whether two value functions agree within ``epsilon`` on every state.

    Args:
        val: mapping state -> new value.
        old_val: mapping state -> previous value; must contain every key of ``val``.
        epsilon: largest absolute per-state change still counted as converged.

    Returns:
        True when no state changed by more than ``epsilon``, otherwise False.
    """
    # Build the full list first so that every state is inspected, not just
    # the states up to the first violation.
    exceeded = [abs(val[state] - old_val[state]) > epsilon for state in val]
    return not any(exceeded)

def policy_evaluation(val, mdp, pi, epsilon=1e-10, gamma=0.99):
    """Iterate the policy-evaluation backup until the value function stops changing.

    Args:
        val: mapping state -> initial value estimate.
        mdp: mapping state -> action -> list of
            (probability, next_state, reward, done) outcomes.
        pi: mapping state -> action chosen by the policy being evaluated.
        epsilon: per-state change below which the values count as converged.
        gamma: discount factor.

    Returns:
        A ``(val, count)`` pair: the converged value function and the number
        of sweeps that were performed (always at least one).
    """
    sweeps = 0
    converged = False

    while not converged:
        # Keep a snapshot of the previous estimate to measure the change.
        previous = val.copy()
        val = get_new_value_fn(previous, mdp, pi, gamma)
        sweeps += 1
        converged = checkConvergence(val, previous, epsilon)

    return val, sweeps



In [39]:
def getMax(qval):
    """Return the key of ``qval`` that holds the largest value.

    The search is seeded from the mapping's own keys, so the action keys do
    not have to include ``0``. Ties are resolved in favour of the key that
    comes first in iteration order.

    Args:
        qval: mapping action -> action value.

    Returns:
        The key with the maximal value. For an empty mapping the legacy
        fallback ``0`` is returned, so existing callers keep working.
    """
    if not qval:
        # Legacy behaviour: the original loop returned its seed (0) here.
        return 0
    return max(qval, key=qval.get)

# Perform policy improvement using the policy and the value function and return a new policy; the action-value function should be a nested dictionary
def policy_improvement(val, mdp, pi, gamma=0.99):
    """Build the policy that is greedy with respect to ``val``.

    Args:
        val: mapping state -> value; its keys are the states that get improved.
        mdp: mapping state -> action -> list of
            (probability, next_state, reward, done) outcomes.
        pi: current policy. Kept for interface compatibility only; the greedy
            policy does not depend on it, so it is not read.
        gamma: discount factor.

    Returns:
        A ``(new_pi, q)`` pair: ``new_pi`` maps each state to its greedy action
        and ``q`` is the nested action-value dictionary ``q[state][action]``.
    """
    new_pi = dict()
    q = dict()
    for s in val:
        action_values = dict()
        for action, outcomes in mdp[s].items():
            total = 0
            for prob, next_state, reward, done in outcomes:
                # Transitions flagged as done contribute no bootstrapped value.
                total += prob * (reward + gamma * val[next_state] * (not done))
            action_values[action] = total
        q[s] = action_values
        new_pi[s] = getMax(action_values)

    return new_pi, q



In [58]:
# Use the above functions to get the optimal policy and optimal value function and return the total number of iterations it took to converge
# Create a random policy and value function to start with or use the ones defined above
def checkSame(op,p):
    """Tell whether policy ``p`` picks the same action as ``op`` in every state of ``op``.

    Args:
        op: mapping state -> action (reference policy; its keys are checked).
        p: mapping state -> action; must contain every key of ``op``.

    Returns:
        True when no state of ``op`` maps to a different action in ``p``.
    """
    return not any(op[state] != p[state] for state in op)

def policy_iteration(mdp, epsilon=1e-10, gamma=0.99):
    """Find a policy by alternating policy evaluation and greedy improvement.

    Everything is initialised from the ``mdp`` argument itself, so the function
    works for any map size and any per-state action set.

    Args:
        mdp: mapping state -> action -> list of
            (probability, next_state, reward, done) outcomes.
        epsilon: convergence threshold forwarded to policy evaluation.
        gamma: discount factor forwarded to evaluation and improvement.

    Returns:
        A ``(pi, val, count)`` triple: the final policy, its value function and
        the number of evaluation/improvement rounds that were performed.
    """
    # Random starting policy, drawn from each state's own set of actions.
    pi = {state: np.random.choice(list(mdp[state])) for state in mdp}

    # Zero is a valid starting guess for every state, so no list of terminal
    # states has to be hard-coded here.
    val = {state: 0.0 for state in mdp}

    count = 0
    while True:
        count += 1
        old_pi = pi.copy()
        val, _ = policy_evaluation(val, mdp, old_pi, epsilon=epsilon, gamma=gamma)
        pi, _ = policy_improvement(val, mdp, old_pi, gamma=gamma)
        # Stop once a full round leaves the policy unchanged.
        if checkSame(old_pi, pi):
            break

    return pi, val, count



In [59]:
#Now perform value iteration, note that the value function is a dictionary and not a list, also return the number of iterations it took to converge
def value_iteration(mdp, gamma=0.99, epsilon=1e-10):
    """Compute a greedy policy and its values by value iteration.

    States are updated in place during a sweep, so states processed later in
    the same sweep already see the refreshed values of earlier ones. The
    actions of each state are read from ``mdp`` itself rather than assumed to
    be ``0..3``.

    Args:
        mdp: mapping state -> action -> list of
            (probability, next_state, reward, done) outcomes.
        gamma: discount factor.
        epsilon: a sweep in which no state changes by more than this ends the loop.

    Returns:
        A ``(pi, val, count)`` triple: the greedy policy, the value function
        and the number of sweeps performed.

    Raises:
        ValueError: if some state of ``mdp`` has no actions.
    """
    val = {s: 0 for s in mdp}
    q = dict()
    count = 0

    while True:
        count += 1
        converged = True

        for s in mdp:
            q[s] = dict()
            for action, outcomes in mdp[s].items():
                total = 0
                for pro, snew, rew, done in outcomes:
                    # Transitions flagged as done contribute no bootstrapped value.
                    total += pro * (rew + gamma * val[snew] * (not done))
                q[s][action] = total

            best = max(q[s].values())
            # Track convergence while sweeping instead of copying the whole table.
            if abs(best - val[s]) > epsilon:
                converged = False
            val[s] = best

        if converged:
            break

    pi = {s: max(q[s], key=q[s].get) for s in mdp}
    return pi, val, count



In [60]:
# Build a second Frozen Lake on a randomly generated 4x4 map and keep its
# unwrapped form, whose `P` attribute is the transition table used as the MDP.
env2 = gym.make('FrozenLake-v1', desc=generate_random_map(size=4)).unwrapped
mdp2 = env2.P

In [65]:
# Solve the random-map MDP with both algorithms so the results can be compared
pi1, val1, count1 = policy_iteration(mdp=mdp2)
pi2, val2, count2 = value_iteration(mdp=mdp2)


You can also write a function `test_policy()` to test your policy after training, i.e. to count the number of times you reach the goal state.

In [66]:
def test_policy(pi, env, goalstate, episodes=100, max_steps=100):
    """Run a policy in an environment and count how often it reaches the goal.

    Args:
        pi: mapping state -> action to take in that state.
        env: environment following the Gymnasium API, i.e. ``reset()`` returns
            ``(observation, info)`` and ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        goalstate: the state that counts as success.
        episodes: number of independent episodes to run.
        max_steps: cap on the steps per episode, so that a policy which never
            terminates cannot loop forever.

    Returns:
        The number of episodes (out of ``episodes``) that ended in ``goalstate``.
    """
    successes = 0
    for _ in range(episodes):
        state, _ = env.reset()
        for _ in range(max_steps):
            state, _, terminated, truncated, _ = env.step(pi[state])
            if terminated or truncated:
                break
        if state == goalstate:
            successes += 1
    return successes


# The name starts with "test_", so tell pytest-style collectors that this is a
# helper and not a test case.
test_policy.__test__ = False
