Suhanee Kandalkar

D16AD 30

Experiment 06: Policy Evaluation and Policy Iteration

---



In [3]:
import numpy as np

# Grid-world MDP configuration.
GRID_DIM = 4  # Side length of the square grid (4x4 cells)
# Action labels. Order matters: np.argmax returns the first maximum, so ties in
# policy improvement resolve to the earliest entry of this list.
ACTIONS = ['UP', 'DOWN', 'LEFT', 'RIGHT']
GAMMA = 0.9  # Discount factor applied to the successor state's value
CONVERGENCE_THRESHOLD = 1e-6  # Policy evaluation stops once the largest update in a sweep is below this

# Goal cell: the bottom-right corner. It is derived from GRID_DIM so that
# resizing the grid cannot leave a stale (or out-of-bounds) index behind.
GOAL_STATE = (GRID_DIM - 1, GRID_DIM - 1)

# Reward table indexed by (row, col): 1 on the goal cell, 0 everywhere else.
REWARD_FUNCTION = np.zeros((GRID_DIM, GRID_DIM))
REWARD_FUNCTION[GOAL_STATE] = 1

# Transition function for the MDP (deterministic environment)
def state_transition(state, action):
    """Return the cell reached by taking `action` from `state`.

    Args:
        state: (row, col) pair identifying the current cell.
        action: One of 'UP', 'DOWN', 'LEFT', 'RIGHT'.

    Returns:
        The neighbouring (row, col) cell in the chosen direction, or `state`
        itself when the move would leave the grid or the action label is not
        recognised.
    """
    row, col = state

    # 'UP'/'DOWN' change the row index, 'LEFT'/'RIGHT' the column index.
    if action == 'UP':
        return (row - 1, col) if row > 0 else state
    if action == 'DOWN':
        return (row + 1, col) if row + 1 < GRID_DIM else state
    if action == 'LEFT':
        return (row, col - 1) if col > 0 else state
    if action == 'RIGHT':
        return (row, col + 1) if col + 1 < GRID_DIM else state

    # Unknown action label: the agent does not move.
    return state

# Starting point for policy iteration: a randomly drawn policy and an all-zero value table.
policy = np.random.choice(a=ACTIONS, size=(GRID_DIM, GRID_DIM))  # π(s): one action label per cell
value_function = np.zeros(shape=(GRID_DIM, GRID_DIM))  # V(s): one value per cell

# Policy Evaluation: Computes V(s) for a given policy using Bellman Expectation Equation
def policy_evaluation(policy):
    """Evaluate `policy` by sweeping the grid until the values stop changing.

    Each cell's value is replaced by the reward of the cell the policy's action
    leads to, plus GAMMA times that cell's current value. Updates are written
    straight into the module-level `value_function`, so cells visited later in
    a sweep already see the values updated earlier in the same sweep.

    Args:
        policy: Array indexed by (row, col) holding the action label chosen in
            each cell.

    Returns:
        None. The result is left in the module-level `value_function`.

    Note:
        Every cell is backed up the same way, the goal cell included. If the
        policy's action at the goal keeps the agent on it, the reward is
        collected on every step and the goal's value approaches
        1 / (1 - GAMMA) rather than 0.
    """
    converged = False
    while not converged:
        largest_change = 0  # Biggest absolute update seen in this sweep
        # np.ndindex walks the cells row by row, like nested row/column loops.
        for cell in np.ndindex(GRID_DIM, GRID_DIM):
            successor = state_transition(cell, policy[cell])
            # Deterministic Bellman backup for the policy's action.
            backed_up = REWARD_FUNCTION[successor] + GAMMA * value_function[successor]
            largest_change = max(largest_change, abs(value_function[cell] - backed_up))
            value_function[cell] = backed_up
        converged = largest_change < CONVERGENCE_THRESHOLD

# Policy Improvement: Updates policy π(s) based on V(s)
def policy_improvement():
    """Make the module-level `policy` greedy with respect to `value_function`.

    For every cell, each action in ACTIONS is scored as the reward of the cell
    it leads to plus GAMMA times that cell's current value; the highest-scoring
    action is written into `policy`. Ties go to the action listed first in
    ACTIONS, because np.argmax returns the first maximum.

    Returns:
        True if no cell's action changed (the policy is stable), False if at
        least one cell was assigned a different action.
    """
    changed = False
    for cell in np.ndindex(GRID_DIM, GRID_DIM):
        previous = policy[cell]

        # Q(s, a) for every action, in ACTIONS order.
        successors = [state_transition(cell, candidate) for candidate in ACTIONS]
        q_values = [
            REWARD_FUNCTION[successor] + GAMMA * value_function[successor]
            for successor in successors
        ]

        greedy = ACTIONS[np.argmax(q_values)]
        policy[cell] = greedy
        if previous != greedy:
            changed = True
    return not changed

# Policy Iteration: Iteratively evaluates and improves the policy until convergence
def policy_iteration():
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Works on the module-level `policy` and `value_function`, which are updated
    in place by `policy_evaluation` and `policy_improvement`. The loop ends the
    first time an improvement step leaves every cell's action unchanged.

    Returns:
        None. Results are left in the module-level `policy` and `value_function`.
    """
    stable = False
    while not stable:
        policy_evaluation(policy)  # Bring V(s) up to date for the current π(s)
        stable = policy_improvement()  # Greedy step; reports whether π(s) was left unchanged

# Run policy iteration on the grid world, then report what it converged to.
policy_iteration()

# Each print emits a heading line followed by the array on the next line.
print("Optimal Policy (π*):", policy, sep="\n")
print("\nOptimal State-Value Function V*(s):", value_function, sep="\n")


Optimal Policy (π*):
[['DOWN' 'DOWN' 'DOWN' 'DOWN']
 ['DOWN' 'DOWN' 'DOWN' 'DOWN']
 ['DOWN' 'DOWN' 'DOWN' 'DOWN']
 ['RIGHT' 'RIGHT' 'RIGHT' 'DOWN']]

Optimal State-Value Function V*(s):
[[5.90489714 6.56099714 7.28999714 8.09999714]
 [6.56099714 7.28999714 8.09999714 8.99999714]
 [7.28999714 8.09999714 8.99999714 9.99999714]
 [8.09999714 8.99999714 9.99999714 9.99999714]]
