### Assignment: Week 2
## Finding best policies in simple MDPs

Great work making the MDPs in Week 1!

In this assignment, we'll use the simplest RL techniques, policy iteration and value iteration, to find the best policies (those that maximize the discounted total reward) in our MDPs from last week.
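As a reference for the code further down, the updates behind these two techniques can be written roughly as follows. The notation is an assumption, not taken from the assignment: $p(s', r \mid s, a)$ stands for the transition tuples stored in the MDP, $\gamma$ is the discount factor, and the value of every terminal state is fixed at 0.

```latex
% Policy evaluation: one sweep for a fixed policy \pi
V_{k+1}(s) = \sum_{s', r} p(s', r \mid s, \pi(s)) \, \bigl[ r + \gamma V_k(s') \bigr]

% Policy improvement: act greedily with respect to the current values
\pi'(s) = \arg\max_{a} \sum_{s', r} p(s', r \mid s, a) \, \bigl[ r + \gamma V(s') \bigr]

% Value iteration: fold the max into the backup itself
V_{k+1}(s) = \max_{a} \sum_{s', r} p(s', r \mid s, a) \, \bigl[ r + \gamma V_k(s') \bigr]
```

Policy iteration alternates the first two updates until nothing changes, while value iteration repeats the third update until the values stop moving.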

Feel free to use your own MDPs, or import them from the Gymnasium library (the maintained successor to OpenAI Gym).

You can start this assignment during/after reading Grokking Ch-3.

For this you have to install gymnasium, which is an API standard for reinforcement learning with a diverse collection of reference environments. This can be easily done by running:

    pip install gymnasium

In [1]:
%pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Note: you may need to restart the kernel to use updated packages.


## Frozen Lake

Let's now solve the Frozen Lake environment, starting with the slippery 4x4 map.

In [1]:
# Step 0 is to import stuff

import gymnasium as gym
import numpy as np
from gymnasium.envs.toy_text.frozen_lake import generate_random_map

In [2]:
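# Step 1 is to get the MDP
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)
env = env.unwrapped
# P[state][action] is a list of (probability, next_state, reward, terminated) tuples
mdp_transitions = env.P
# In gymnasium, reset() returns (observation, info)
init_state, _ = env.reset()
goal_state = 15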
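To get a feel for the transition table before using it, here is a minimal sketch on a hand-made MDP. `toy_P`, `check_probabilities` and `find_terminal_states` are hypothetical names introduced only for illustration; the toy table just mimics the nested layout of `env.P`, where each entry is a `(probability, next_state, reward, terminated)` tuple.

```python
# Hypothetical 3-state MDP in the same nested-dict layout as env.P:
# P[state][action] -> list of (probability, next_state, reward, terminated)
toy_P = {
    0: {0: [(0.5, 0, 0.0, False), (0.5, 2, 1.0, True)],   # "safe" action
        1: [(0.9, 2, 1.0, True), (0.1, 1, 0.0, True)]},   # "risky" action
    1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},  # hole
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},  # goal
}


def check_probabilities(P, tol=1e-9):
    """Return True if outcome probabilities sum to 1 for every (state, action)."""
    return all(
        abs(sum(prob for prob, _, _, _ in outcomes) - 1.0) < tol
        for actions in P.values()
        for outcomes in actions.values()
    )


def find_terminal_states(P):
    """Return states where every action is a certain, terminated self-loop."""
    return [
        state for state, actions in P.items()
        if all(
            len(outcomes) == 1 and outcomes[0][1] == state and outcomes[0][3]
            for outcomes in actions.values()
        )
    ]


print(check_probabilities(toy_P))
print(find_terminal_states(toy_P))
```

The same two helpers should also work on `mdp_transitions`, assuming the holes and the goal are stored there as single self-loop outcomes.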

In [None]:
# Step 2 is to write the policy

# This is according to the convention of gymnasium
LEFT, DOWN, RIGHT, UP = range(4)
pi = {
    0:RIGHT, 1:RIGHT, 2:DOWN, 3:LEFT,
    4:DOWN, 5:LEFT, 6:DOWN, 7:LEFT,
    8:RIGHT, 9:RIGHT, 10:DOWN, 11:LEFT,
    12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
}

# Or you can do it randomly
# pi = dict()
# for state in mdp_transitions:
#     pi[state] = np.random.choice(list(mdp_transitions[state].keys()))



In [6]:
# Step 3 is computing the value function for this environment and policy

# Let us start with a random value function

val = dict()
for state in mdp_transitions:
    val[state] = np.random.random()

# Since 5, 7, 11, 12 and 15 are terminal states, we know their values are 0

val[5] = 0
val[7] = 0
val[11] = 0
val[12] = 0
val[15] = 0

# Or you could detect the terminal states programmatically instead of hard-coding them
# val = dict()
# for state in mdp_transitions:
#     val[state] = np.random.random()
#     _, next_state, _, terminated = mdp_transitions[state][0][0]
#     if terminated and next_state == state:  # terminal states only loop back to themselves
#         val[state] = 0

# Instead of doing this, you can simply initialize the value function to 0 for all states
# for state in mdp_transitions:
#     val[state] = 0

In [8]:
def get_new_value_fn(val, mdp, pi, gamma = 1.0):
    # One sweep of the Bellman expectation backup for the policy pi:
    # returns the new value function given the old value function and the policy
    new_val = dict()
    for state in mdp:
        new_val[state] = 0
        for prob, next_state, reward, term in mdp[state][pi[state]]:
            # Do not bootstrap from the next state if the transition ends the episode
            new_val[state] += prob * (reward + gamma * val[next_state] * (not term))

    return new_val

In [10]:
# Use the above function to get the new value function, also return how many iterations it took to converge
def policy_evaluation(val, mdp, pi, epsilon=1e-10, gamma=1.0):
    # Iteratively calculate the value function until the largest change between
    # the new and old value function is less than epsilon
    count = 0
    while True:
        count += 1
        # Pass gamma through so the discount factor is not silently reset to 1.0
        new_val = get_new_value_fn(val, mdp, pi, gamma)
        # Use the absolute difference so that decreases in value are detected too
        delta = max(abs(new_val[state] - val[state]) for state in mdp)
        val = new_val
        if delta < epsilon:
            break

    return val, count
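A quick way to check an evaluation routine like this is to run it on an MDP small enough to solve by hand. The sketch below uses a hypothetical 3-state MDP (`toy_P`) and a compact stand-in called `evaluate_policy`; neither is part of the assignment, and `gamma = 0.9` is chosen only so that the closed form is easy to write down.

```python
# Hypothetical 3-state MDP: P[state][action] -> [(prob, next_state, reward, terminated)]
toy_P = {
    0: {0: [(0.5, 0, 0.0, False), (0.5, 2, 1.0, True)],
        1: [(0.9, 2, 1.0, True), (0.1, 1, 0.0, True)]},
    1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},  # hole
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},  # goal
}


def evaluate_policy(P, pi, gamma=0.9, epsilon=1e-12):
    """Iterative policy evaluation; returns (values, number of sweeps)."""
    val = {s: 0.0 for s in P}
    sweeps = 0
    while True:
        sweeps += 1
        new_val = {
            s: sum(p * (r + gamma * val[s2] * (not done))
                   for p, s2, r, done in P[s][pi[s]])
            for s in P
        }
        delta = max(abs(new_val[s] - val[s]) for s in P)
        val = new_val
        if delta < epsilon:
            return val, sweeps


always_safe = {0: 0, 1: 0, 2: 0}
val, sweeps = evaluate_policy(toy_P, always_safe)

# Closed form for state 0: v = 0.5 * 0.9 * v + 0.5 * 1  =>  v = 0.5 / 0.55 = 10/11
print(val[0], 10 / 11, sweeps)
```

If the iterative result and the hand-derived value disagree, the backup or the stopping test is the first place to look.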

In [12]:
# Perform policy improvement using the policy and the value function and return a new policy; the action-value function should be a nested dictionary
def policy_improvement(val, mdp, pi, gamma=1.0):
    new_pi = dict()
    q = dict()
    # Complete this function to get the new policy given the value function and the mdp
    actions=[0,1,2,3]
    for state in range(16):
        q[state] = {action: 0 for action in actions}
        for action in actions:
            for prob,next_state,reward,term in mdp[state][action]:
                q[state][action]+= prob * (reward + gamma* val[next_state] * (not term))
        
        new_pi[state] = max(q[state], key=q[state].get)

    return new_pi, q
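The improvement step can be traced by hand on a tiny example. This is a sketch under stated assumptions: `toy_P` and `greedy_policy` are hypothetical names, `gamma = 0.9` is an arbitrary choice, and `val_risky` is the value of always taking action 1, worked out by hand in the comment.

```python
# Hypothetical 3-state MDP: P[state][action] -> [(prob, next_state, reward, terminated)]
toy_P = {
    0: {0: [(0.5, 0, 0.0, False), (0.5, 2, 1.0, True)],
        1: [(0.9, 2, 1.0, True), (0.1, 1, 0.0, True)]},
    1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},  # hole
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},  # goal
}


def greedy_policy(P, val, gamma=0.9):
    """Compute Q(s, a) from val and return the greedy policy together with Q."""
    q = {
        s: {
            a: sum(p * (r + gamma * val[s2] * (not done))
                   for p, s2, r, done in P[s][a])
            for a in P[s]
        }
        for s in P
    }
    pi = {s: max(q[s], key=q[s].get) for s in P}
    return pi, q


# Value of "always take action 1" in state 0: 0.9 * 1 + 0.1 * 0 = 0.9
val_risky = {0: 0.9, 1: 0.0, 2: 0.0}
new_pi, q = greedy_policy(toy_P, val_risky)

# Q(0, 0) = 0.5 * (0 + 0.9 * 0.9) + 0.5 * 1 = 0.905, which beats Q(0, 1) = 0.9
print(q[0], new_pi[0])
```

Here the greedy step switches state 0 from action 1 to action 0, which is the kind of change policy iteration keeps making until no state wants to switch.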


In [21]:
# Use the above functions to get the optimal policy and optimal value function and return the total number of iterations it took to converge
# Create a random policy and value function to start with or use the ones defined above
def policy_iteration(mdp, epsilon=1e-10, gamma=1.0):
    # Random initial value function; 5, 7, 11, 12 and 15 are terminal states, so their values are 0
    val = {state: np.random.random() for state in mdp}
    for terminal_state in (5, 7, 11, 12, 15):
        val[terminal_state] = 0

    LEFT, DOWN, RIGHT, UP = range(4)
    pi = {
        0:RIGHT, 1:RIGHT, 2:DOWN, 3:LEFT,
        4:DOWN, 5:LEFT, 6:DOWN, 7:LEFT,
        8:RIGHT, 9:RIGHT, 10:DOWN, 11:LEFT,
        12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
    }

    count = 0  # number of evaluation + improvement rounds
    while True:
        count += 1
        # Keep the sweep count in its own variable so it does not overwrite the outer counter
        val, eval_sweeps = policy_evaluation(val, mdp, pi, epsilon, gamma)
        # If evaluation converged in a single sweep, the last improvement step
        # did not change the values, so we treat the policy as stable
        if eval_sweeps == 1:
            break
        pi, _ = policy_improvement(val, mdp, pi, gamma)

    return pi, val, count

In [76]:
# Now perform value iteration, note that the value function is a dictionary and not a list, also return the number of iterations it took to converge
def value_iteration(mdp, gamma=1.0, epsilon=1e-10):
    # Random initial value function; 5, 7, 11, 12 and 15 are terminal states, so their values are 0
    val = {state: np.random.random() for state in mdp}
    for terminal_state in (5, 7, 11, 12, 15):
        val[terminal_state] = 0.0

    count = 0
    actions = [0, 1, 2, 3]

    while True:
        count += 1
        q = np.zeros((len(mdp), len(actions)))  # Q-values for all states and actions
        for state in mdp:
            for action in actions:
                for prob, next_state, reward, term in mdp[state][action]:
                    q[state][action] += prob * (reward + gamma * val[next_state] * (not term))

        # Bellman optimality backup: the new value is the best Q-value in each state
        new_val = np.max(q, axis=1)
        delta = max(abs(val[state] - new_val[state]) for state in mdp)
        for state in mdp:
            val[state] = float(new_val[state])

        if delta < epsilon:
            break

    # Extract the greedy policy from the final Q-values
    pi = {state: int(np.argmax(q[state])) for state in mdp}
    return pi, val, count
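To see how the discount factor can change the answer, here is a minimal pure-Python sketch of value iteration on a hypothetical 3-state MDP. `toy_P` and `value_iteration_toy` are illustrative names, and the two values of `gamma` are arbitrary choices that happen to produce different greedy actions in state 0.

```python
# Hypothetical 3-state MDP: P[state][action] -> [(prob, next_state, reward, terminated)]
toy_P = {
    0: {0: [(0.5, 0, 0.0, False), (0.5, 2, 1.0, True)],   # may loop back to state 0
        1: [(0.9, 2, 1.0, True), (0.1, 1, 0.0, True)]},   # ends the episode at once
    1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},  # hole
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},  # goal
}


def value_iteration_toy(P, gamma, epsilon=1e-12):
    """Repeat the Bellman optimality backup until the values stop changing."""
    val = {s: 0.0 for s in P}
    while True:
        q = {
            s: {
                a: sum(p * (r + gamma * val[s2] * (not done))
                       for p, s2, r, done in P[s][a])
                for a in P[s]
            }
            for s in P
        }
        new_val = {s: max(q[s].values()) for s in P}
        delta = max(abs(new_val[s] - val[s]) for s in P)
        val = new_val
        if delta < epsilon:
            pi = {s: max(q[s], key=q[s].get) for s in P}
            return pi, val


pi_patient, val_patient = value_iteration_toy(toy_P, gamma=0.9)
pi_hasty, val_hasty = value_iteration_toy(toy_P, gamma=0.5)

print(pi_patient[0], val_patient[0])
print(pi_hasty[0], val_hasty[0])
```

With `gamma = 0.9` waiting costs little, so the looping action 0 wins in state 0 with value 10/11. With `gamma = 0.5` each extra step halves the future reward, so the immediate gamble of action 1 wins with value 0.9.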
    

In [15]:
#Function to print the policy you got after running the policy iteration or value iteration on the 4x4 FrozenLake environment
def print_policy(policy, env):
    """
    Prints the policy for the 4x4 FrozenLake environment in a grid layout.
    """
    action_symbols = {0: '←', 1: '↓', 2: '→', 3: '↑'}  #action symbols
    grid_size = env.desc.shape  #get the grid dimensions (e.g., 4x4)
    
    policy_symbols = np.array([action_symbols[policy[state]] for state in sorted(policy)])  #order the symbols by state index
    policy_grid = policy_symbols.reshape(grid_size)  #reshape into a grid

    print("Policy Grid:")
    for row in policy_grid:
        print(" ".join(row))


In [77]:
pi1, val1, count1 = policy_iteration(mdp_transitions)
pi2, val2, count2 = value_iteration(mdp_transitions)



In [78]:
print_policy(pi,env=env)

Policy Grid:
→ → ↓ ←
↓ ← ↓ ←
→ → ↓ ←
← → → ←


In [79]:
print_policy(pi1,env=env)

Policy Grid:
← ↑ ↑ ↑
← ← ← ←
↑ ↓ ← ←
← → ↓ ←


In [80]:
print_policy(pi2,env=env)

Policy Grid:
← ↑ ↑ ↑
← ← ← ←
↑ ↓ ← ←
← → ↓ ←


In [81]:
print(val1)
print(val2)

{0: 0.8235294093675635, 1: 0.8235294085640291, 2: 0.8235294079934693, 3: 0.8235294076974147, 4: 0.8235294095420496, 5: 0.0, 6: 0.5294117629918788, 7: 0.0, 8: 0.8235294098783209, 9: 0.8235294103519006, 10: 0.7647058811069277, 11: 0.0, 12: 0.0, 13: 0.88235294017329, 14: 0.9411764700677003, 15: 0.0}
{0: 0.8235294094317905, 1: 0.8235294086497853, 2: 0.8235294080945126, 3: 0.8235294078063902, 4: 0.8235294096016015, 5: 0.0, 6: 0.5294117630378022, 7: 0.0, 8: 0.823529409928863, 9: 0.8235294103897539, 10: 0.7647058811403122, 11: 0.0, 12: 0.0, 13: 0.8823529402001683, 14: 0.941176470081647, 15: 0.0}
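Both methods should land on the same value function, up to the stopping tolerance. A small sketch of that comparison is below; the two dictionaries are copied from the printed output above, and `max_abs_difference` is a hypothetical helper name.

```python
# Values copied from the printed output of policy iteration (val1) and value iteration (val2)
val1 = {0: 0.8235294093675635, 1: 0.8235294085640291, 2: 0.8235294079934693,
        3: 0.8235294076974147, 4: 0.8235294095420496, 5: 0.0,
        6: 0.5294117629918788, 7: 0.0, 8: 0.8235294098783209,
        9: 0.8235294103519006, 10: 0.7647058811069277, 11: 0.0, 12: 0.0,
        13: 0.88235294017329, 14: 0.9411764700677003, 15: 0.0}
val2 = {0: 0.8235294094317905, 1: 0.8235294086497853, 2: 0.8235294080945126,
        3: 0.8235294078063902, 4: 0.8235294096016015, 5: 0.0,
        6: 0.5294117630378022, 7: 0.0, 8: 0.823529409928863,
        9: 0.8235294103897539, 10: 0.7647058811403122, 11: 0.0, 12: 0.0,
        13: 0.8823529402001683, 14: 0.941176470081647, 15: 0.0}


def max_abs_difference(a, b):
    """Largest absolute gap between two value functions over shared states."""
    return max(abs(a[state] - b[state]) for state in a)


gap = max_abs_difference(val1, val2)
print(gap)
```

For this run the gap is on the order of the `epsilon=1e-10` used as the stopping threshold, which is what you would expect if both methods converged to the same fixed point.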


You can also write a function `test_policy()` to test your policy after training, counting the number of episodes in which you reach the goal state.

In [None]:
def test_policy(pi, env, goalstate):
    # Complete this function to test the policy
    
    return
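One possible shape for such a test is sketched below. It is not the intended solution: `run_policy_trials`, the `episodes` and `max_steps` parameters, and the stand-in `ChainEnv` class are all assumptions made for illustration. The only thing it relies on is the gymnasium convention that `reset()` returns `(observation, info)` and `step(action)` returns `(observation, reward, terminated, truncated, info)`.

```python
def run_policy_trials(pi, env, goal_state, episodes=100, max_steps=100):
    """Follow the policy for several episodes and count how often the goal is reached."""
    successes = 0
    for _ in range(episodes):
        state, _ = env.reset()
        for _ in range(max_steps):
            state, reward, terminated, truncated, _ = env.step(pi[state])
            if terminated or truncated:
                break
        if state == goal_state:
            successes += 1
    return successes


class ChainEnv:
    """Hypothetical deterministic stand-in: states 0..3 in a row, goal at state 3."""

    def reset(self):
        self.state = 0
        return self.state, {}

    def step(self, action):
        # Action 1 moves one step right; any other action stays in place
        if action == 1:
            self.state += 1
        terminated = self.state == 3
        return self.state, float(terminated), terminated, False, {}


always_right = {0: 1, 1: 1, 2: 1, 3: 1}
never_move = {0: 0, 1: 0, 2: 0, 3: 0}

wins_right = run_policy_trials(always_right, ChainEnv(), goal_state=3, episodes=10)
wins_still = run_policy_trials(never_move, ChainEnv(), goal_state=3, episodes=10)
print(wins_right, wins_still)
```

With the notebook's objects, a call along the lines of `run_policy_trials(pi1, env, goal_state)` should work the same way. On the slippery map the count will vary from run to run, so it is worth using a few hundred episodes before comparing policies.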
