### Assignment : Week 2
## Finding best policies in simple MDPs

Great work making the MDPs in Week 1!

In this assignment, we'll use the simplest RL techniques — policy iteration and value iteration — to find the best policies (those that maximize the discounted total reward) in our MDPs from last week.

Feel free to use your own MDPs, or import them from the Gymnasium library (the maintained successor to OpenAI Gym).

You can start this assignment during/after reading Grokking Ch-3.

For this you have to install gymnasium, which is an API standard for reinforcement learning with a diverse collection of reference environments. This can be easily done by running:

    pip install gymnasium

## Frozen Lake

Let's now try to solve the Frozen Lake environment for some cases

In [28]:
# Step 0 is to import stuff

import gymnasium as gym
import numpy as np
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
from pprint import pprint

In [29]:
# Side length of the square grid; state s sits at (s // GRID_SIZE, s % GRID_SIZE).
GRID_SIZE = 4

# States that end an episode; generate_transitions makes them absorbing.
TERMINAL_STATES = set((5, 7, 11, 12, 15))

# Action names, in the order the transition tables are built.
ACTIONS = [
    "Up",
    "Down",
    "Left",
    "Right",
]

# Slip model: the intended move and the two perpendicular moves are equally likely.
PROBABILITIES = dict.fromkeys(("Intended", "Left", "Right"), 1 / 3)

In [30]:
def left_action(action):
    """Return the direction a quarter turn counter-clockwise from ``action``.

    Used to find where the agent ends up heading when it slips to its left.
    An unrecognised action name yields ``None``.
    """
    quarter_turn_left = {
        "Up": "Left",
        "Left": "Down",
        "Down": "Right",
        "Right": "Up",
    }
    return quarter_turn_left.get(action)
    
def right_action(action):
    """Return the direction a quarter turn clockwise from ``action``.

    Used to find where the agent ends up heading when it slips to its right.
    An unrecognised action name yields ``None``.
    """
    quarter_turn_right = {
        "Left": "Up",
        "Down": "Left",
        "Right": "Down",
        "Up": "Right",
    }
    return quarter_turn_right.get(action)

In [31]:
def get_next_state(state, action):
    """Return the state reached by moving one cell from ``state`` in ``action``.

    States are numbered row by row on a GRID_SIZE x GRID_SIZE grid. A move
    that would leave the grid is clamped, so the agent stays on the edge cell.
    Any action other than "Up", "Down", "Left" or "Right" leaves the position
    unchanged.
    """
    row, col = divmod(state, GRID_SIZE)
    far_edge = GRID_SIZE - 1

    if action in ("Up", "Down"):
        # Vertical move: only the row changes.
        row = max(row - 1, 0) if action == "Up" else min(row + 1, far_edge)
    elif action in ("Left", "Right"):
        # Horizontal move: only the column changes.
        col = max(col - 1, 0) if action == "Left" else min(col + 1, far_edge)

    return GRID_SIZE * row + col

def generate_transitions(state):
    """Build the transition table for one state.

    Returns a dict mapping every action name in ACTIONS to a list of
    ``(probability, next_state, reward, done)`` tuples. Terminal states are
    absorbing: each action loops back with probability 1. For every other
    state each action has three outcomes -- the intended move and a slip to
    either side -- weighted by PROBABILITIES.

    NOTE(review): every transition carries reward 0, including the ones that
    enter the goal state -- confirm whether the goal is meant to pay out.
    """
    if state in TERMINAL_STATES:
        # Absorbing state: nothing the agent does moves it.
        return {action: [(1.0, state, 0, True)] for action in ACTIONS}

    def outcome(label, direction):
        """Return the transition tuple for moving in ``direction``."""
        target = get_next_state(state, direction)
        return (PROBABILITIES[label], target, 0, target in TERMINAL_STATES)

    return {
        action: [
            outcome("Intended", action),
            outcome("Left", left_action(action)),
            outcome("Right", right_action(action)),
        ]
        for action in ACTIONS
    }

# Build the full MDP: one transition table per grid cell, keyed by state index.
mdp = {
    state: generate_transitions(state)
    for state in range(GRID_SIZE * GRID_SIZE)
}

# Show the resulting table
pprint(mdp)

from gymnasium.envs.toy_text.frozen_lake import generate_random_map

# Step 1: Initialize the FrozenLake environment
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)
# Work on the raw environment so its transition table `P` can be read directly.
env = env.unwrapped
mdp_transitions = env.P
# Gymnasium's reset() returns an (observation, info) pair; keep only the
# observation so init_state is the start state itself rather than a tuple.
init_state = env.reset()[0]
goal_state = 15

# Step 2: Define policy
# Integer action codes used as the keys of env.P[state].
LEFT, DOWN, RIGHT, UP = range(4)
pi = {
    0: RIGHT, 1: RIGHT, 2: DOWN, 3: LEFT,
    4: DOWN, 5: LEFT, 6: DOWN, 7: LEFT,
    8: RIGHT, 9: RIGHT, 10: DOWN, 11: LEFT,
    12: LEFT, 13: RIGHT, 14: RIGHT, 15: LEFT
}

# Step 3: Initialize value function
# Every state starts at 0, which already covers the terminal states
# (5, 7, 11, 12 and 15), so no separate assignment is needed for them.
val = {state: 0 for state in mdp_transitions}

{0: {'Down': [(0.3333333333333333, 4, 0, False),
              (0.3333333333333333, 1, 0, False),
              (0.3333333333333333, 0, 0, False)],
     'Left': [(0.3333333333333333, 0, 0, False),
              (0.3333333333333333, 4, 0, False),
              (0.3333333333333333, 0, 0, False)],
     'Right': [(0.3333333333333333, 1, 0, False),
               (0.3333333333333333, 0, 0, False),
               (0.3333333333333333, 4, 0, False)],
     'Up': [(0.3333333333333333, 0, 0, False),
            (0.3333333333333333, 0, 0, False),
            (0.3333333333333333, 1, 0, False)]},
 1: {'Down': [(0.3333333333333333, 5, 0, True),
              (0.3333333333333333, 2, 0, False),
              (0.3333333333333333, 0, 0, False)],
     'Left': [(0.3333333333333333, 0, 0, False),
              (0.3333333333333333, 5, 0, True),
              (0.3333333333333333, 1, 0, False)],
     'Right': [(0.3333333333333333, 2, 0, False),
               (0.3333333333333333, 1, 0, False),
               (

In [32]:
def get_new_value_fn(val, mdp, pi, gamma=1.0):
    """Run one synchronous policy-evaluation sweep and return the new values.

    Args:
        val: Mapping from state to its current value estimate.
        mdp: Mapping ``state -> action -> list of (prob, next_state, reward,
            done)`` transition tuples.
        pi: Mapping from state to the action the policy takes there.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        A new dict mapping each state in ``mdp`` to its expected one-step
        return under ``pi``, computed from ``val`` (``val`` is not modified).
    """
    new_val = {}
    for state in mdp:
        outcomes = mdp[state][pi[state]]
        # A transition flagged `done` ends the episode, so nothing follows it:
        # only its immediate reward counts, never the next state's value.
        new_val[state] = sum(
            prob * (reward + (0.0 if done else gamma * val[next_state]))
            for prob, next_state, reward, done in outcomes
        )
    return new_val

In [33]:
def policy_evaluation(val, mdp, pi, epsilon=1e-10, gamma=1.0):
    """Iterate value sweeps for a fixed policy until the values settle.

    Starting from ``val``, repeatedly applies ``get_new_value_fn`` and stops
    once the largest per-state change in a sweep drops below ``epsilon``.

    Returns:
        ``(values, sweeps)``: the final value mapping and the number of sweeps
        that were run (always at least one).
    """
    sweeps = 0
    settled = False
    while not settled:
        updated = get_new_value_fn(val, mdp, pi, gamma)
        largest_change = max(abs(updated[s] - val[s]) for s in mdp)
        val, sweeps = updated, sweeps + 1
        settled = largest_change < epsilon
    return val, sweeps

In [34]:
def policy_improvement(val, mdp, pi, gamma=1.0):
    """Return the greedy policy with respect to ``val`` and the Q-table used.

    Args:
        val: Mapping from state to its value estimate.
        mdp: Mapping ``state -> action -> list of (prob, next_state, reward,
            done)`` transition tuples. Action keys may be of any hashable
            type (integer codes, names, ...).
        pi: Current policy. Accepted for signature compatibility; it is not
            read.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        ``(new_pi, q)``: ``new_pi`` maps each state to its highest-valued
        action (ties go to the action listed first for that state), and ``q``
        maps ``state -> action -> action value``.
    """
    new_pi = {}
    q = {}
    for state in mdp:
        # Score the actions this state actually offers instead of assuming
        # four integer action codes.
        q[state] = {
            action: sum(
                # A `done` transition ends the episode: count only its reward.
                prob * (reward + (0.0 if done else gamma * val[next_state]))
                for prob, next_state, reward, done in outcomes
            )
            for action, outcomes in mdp[state].items()
        }
        new_pi[state] = max(q[state], key=q[state].get)
    return new_pi, q

In [35]:
def policy_iteration(mdp, epsilon=1e-10, gamma=1.0):
    """Find a policy by alternating policy evaluation and greedy improvement.

    Args:
        mdp: Mapping ``state -> action -> list of (prob, next_state, reward,
            done)`` transition tuples.
        epsilon: Convergence threshold passed to ``policy_evaluation``.
        gamma: Discount factor.

    Returns:
        ``(pi, val, count)``: the policy that improvement left unchanged, the
        value mapping from its evaluation, and the number of
        evaluate-then-improve rounds performed.
    """
    # Draw the starting policy from each state's own action keys rather than
    # assuming the actions are the integers 0..3.
    pi = {s: np.random.choice(list(mdp[s])) for s in mdp}
    val = {s: 0 for s in mdp}
    count = 0
    while True:
        val, _ = policy_evaluation(val, mdp, pi, epsilon, gamma)
        new_pi, _ = policy_improvement(val, mdp, pi, gamma)
        count += 1
        # Stop as soon as greedy improvement no longer changes any action.
        if new_pi == pi:
            break
        pi = new_pi
    return pi, val, count

In [36]:
def value_iteration(mdp, gamma=1.0, epsilon=1e-10):
    """Compute state values and a greedy policy by value iteration.

    Args:
        mdp: Mapping ``state -> action -> list of (prob, next_state, reward,
            done)`` transition tuples. Action keys may be of any hashable
            type.
        gamma: Discount factor applied to the value of the next state.
        epsilon: Stop once the largest per-state change in a sweep is below
            this threshold.

    Returns:
        ``(pi, val, count)``: the greedy policy for the final values (ties go
        to the action listed first for a state), the final value mapping, and
        the number of sweeps performed.
    """
    def action_value(values, outcomes):
        """Return the expected one-step return over ``outcomes``."""
        # A `done` transition ends the episode, so only its reward counts.
        return sum(
            prob * (reward + (0.0 if done else gamma * values[next_state]))
            for prob, next_state, reward, done in outcomes
        )

    val = {s: 0 for s in mdp}
    count = 0
    while True:
        # Back up every state from the previous sweep's values, maximising
        # over the actions that state actually offers.
        new_val = {
            s: max(action_value(val, outcomes) for outcomes in mdp[s].values())
            for s in mdp
        }
        delta = max(abs(new_val[s] - val[s]) for s in mdp)
        val = new_val
        count += 1
        if delta < epsilon:
            break

    # Read the greedy policy off the converged values.
    pi = {}
    for s in mdp:
        outcomes_by_action = mdp[s]
        pi[s] = max(
            outcomes_by_action,
            key=lambda a: action_value(val, outcomes_by_action[a]),
        )
    return pi, val, count

In [37]:
def print_policy(policy, env):
    """Print ``policy`` as a grid of arrows shaped like the environment map.

    ``policy`` maps the state indices ``0 .. len(policy) - 1`` to action codes
    0-3, which are drawn as left, down, right and up arrows respectively. The
    grid shape is taken from ``env.desc.shape``.
    """
    arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}
    cells = [arrows[policy[index]] for index in range(len(policy))]
    layout = np.array(cells).reshape(env.desc.shape)

    print("Policy Grid:")
    for line in layout:
        print(" ".join(line))

In [38]:
def test_policy(pi, env, goalstate, episodes=100, max_steps=None):
    """Run ``pi`` for several episodes and count how many end on the goal.

    Args:
        pi: Mapping from state to the action to take there.
        env: Environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns ``(state, reward, terminated, truncated,
            info)``.
        goalstate: State that counts as a success when an episode ends on it.
        episodes: Number of episodes to run.
        max_steps: Optional cap on the steps taken per episode. ``None`` (the
            default) means no cap, so an episode runs until the environment
            reports termination or truncation.

    Returns:
        The number of episodes whose final state equals ``goalstate``.
    """
    successes = 0
    for _ in range(episodes):
        state, _ = env.reset()
        steps = 0
        while True:
            state, _, terminated, truncated, _ = env.step(pi[state])
            steps += 1
            # An episode is over when the environment terminates it OR cuts
            # it short (e.g. a time limit); stepping past either is invalid.
            if terminated or truncated:
                break
            if max_steps is not None and steps >= max_steps:
                break
        if state == goalstate:
            successes += 1
    return successes


In [39]:
# Solve the FrozenLake MDP with both dynamic-programming methods.
pi1, val1, count1 = policy_iteration(mdp_transitions)
pi2, val2, count2 = value_iteration(mdp_transitions)

# Print results
for heading, learned_pi, n_iterations in (
    ("Policy Iteration:", pi1, count1),
    ("\nValue Iteration:", pi2, count2),
):
    print(heading)
    print_policy(learned_pi, env)
    print(f"Converged in {n_iterations} iterations.")

# Test policies
success1 = test_policy(pi1, env, goal_state)
success2 = test_policy(pi2, env, goal_state)
print(f"\nPolicy Iteration Success Rate: {success1}/100")
print(f"Value Iteration Success Rate: {success2}/100")

Policy Iteration:
Policy Grid:
← ↑ ↑ ↑
← ← ← ←
↑ ↓ ← ←
← → ↓ ←
Converged in 7 iterations.

Value Iteration:
Policy Grid:
← ↑ ↑ ↑
← ← ← ←
↑ ↓ ← ←
← → ↓ ←
Converged in 806 iterations.

Policy Iteration Success Rate: 77/100
Value Iteration Success Rate: 84/100


You can also write a function `test_policy()` that runs your trained policy for a number of episodes and counts how many times it reaches the goal state.