Overview:
In this assignment, you will apply dynamic programming techniques to solve
a simple Markov Decision Process (MDP) problem. MDPs are commonly
used to model sequential decision-making problems, and dynamic
programming is a powerful approach to find the optimal policy and value
function.

Problem Statement:

Imagine a simple grid world with the following characteristics:

  • The grid world is represented as an 25x25 grid.

  • Each cell in the grid can be in one of two states: empty,  or goal.
  
  • The goal state is always in the left most point i.e., the cell in the 1st row and 1st column.
  
  • Empty cells can be traversed by an agent.
  
  • The goal cell is where the agent should reach.
  
  • The agent can move in four directions: up, down, left, and right. It cannot move diagonally.
  
  • At each time step, the agent can choose to move in one of the available directions or stay in the same cell.
  
  • The agent receives a reward of -1 for each time step it takes to reach the goal cell.
  
  • The agent cannot leave the grid.

Your Tasks:


1. Define the MDP for the given grid world. You need to specify the state
space, action space, transition probabilities, reward function, and
discount factor.


A MDP is a tuple <S,A,P,R,y> where
  
  S is a finite set of states, in this case a 25 x 25 grid of 625 states

  A is a finite set of actions, in this case four actions; up, left, down, right

  P is a state transition probability matrix

  R is a reward function

  y is a discount factor

2. Implement the policy iteration using dynamic programming algorithms
to solve the MDP. Here is a simple pseudocode as a refresher:

  a. Start with an initial random policy.
  
  b. Implement policy evaluation to calculate the value function for the policy.
  
  c. Implement policy improvement to update the policy based on the value function.
  
  d. Repeat policy evaluation and improvement until the policy converges.


In [1]:
import numpy as np

# State space S
GRID_SIZE = 25

GOAL_STATE = (0, 0)

#Discount factor y
DISCOUNT_FACTOR = 0.8

# Action space A
ACTIONS = [(0, 1), (0, -1), (1, 0), (-1, 0), (0, 0)]  # Right, Left, Down, Up, Stay

#Ensure the agent doesn't leave the board
def is_valid_state(state):
    return 0 <= state[0] < GRID_SIZE and 0 <= state[1] < GRID_SIZE

#Function to see if we won the game
def is_goal_state(state):
    return state == GOAL_STATE

#Function to move to a new state given an action
def get_next_state(state, action):
    next_state = (state[0] + action[0], state[1] + action[1])
    return next_state if is_valid_state(next_state) else state

#Start with a random policy into a numpy array
def initialize_policy():
    policy = np.random.choice(range(len(ACTIONS)), size=(GRID_SIZE, GRID_SIZE))
    return policy

#
def policy_evaluation(policy, value_function):
    theta = 1e-6
    while True:
        delta = 0
        #For each state
        for i in range(GRID_SIZE):
            for j in range(GRID_SIZE):
                state = (i, j)

                #If its not the goal state update the state value
                if not is_goal_state(state):
                    action = ACTIONS[policy[i, j]]
                    next_state = get_next_state(state, action)
                    reward = -1
                    new_value = reward + DISCOUNT_FACTOR * value_function[next_state[0], next_state[1]]
                    delta = max(delta, abs(new_value - value_function[i, j]))
                    value_function[i, j] = new_value
        if delta < theta:
            break

#Update the policy based on given value function
def policy_update(policy, value_function):
    policy_stable = True
    for i in range(GRID_SIZE):
        for j in range(GRID_SIZE):
            state = (i, j)
            if not is_goal_state(state):
                old_action = policy[i, j]
                action_values = []
                #for each state find the value of each action
                for k, action in enumerate(ACTIONS):
                    next_state = get_next_state(state, action)
                    reward = -1
                    action_values.append(reward + DISCOUNT_FACTOR * value_function[next_state[0], next_state[1]])
                #select the action that maximizes reward
                new_action = np.argmax(action_values)
                policy[i, j] = new_action
                #If nothing changes in the policy break
                if old_action != new_action:
                    policy_stable = False
    return policy_stable

#Combine the above two ie. evaluate improve evaluate etc until convergence
def policy_iteration():
    #Initialize a random first policy
    policy = initialize_policy()
    #Fill the grid with zeros
    value_function = np.zeros((GRID_SIZE, GRID_SIZE))
    while True:
        policy_evaluation(policy, value_function)
        policy_stable = policy_update(policy, value_function)
        #if no changes are made ent the loop
        if policy_stable:
            break
    return policy, value_function



3. Visualize the results:

  a. Show the optimal policy found using both policy iteration and value iteration.
  
  b. Calculate and display the optimal value function.


In [2]:
    optimal_policy, optimal_value_function = policy_iteration()

    def visualize_policy(policy):
        action_symbols = ['Right', 'Left', 'Down', 'Up', 'S']
        for i in range(GRID_SIZE):
            row = ''
            for j in range(GRID_SIZE):
                if is_goal_state((i, j)):
                    row += 'G '
                else:
                    row += action_symbols[policy[i, j]] + ' '
            print(row)

    def visualize_value_function(value_function):
        for i in range(GRID_SIZE):
            row = ''
            for j in range(GRID_SIZE):
                if is_goal_state((i, j)):
                    row += 'G '
                else:
                    row += f'{value_function[i, j]:.2f} '
            print(row)

    print("Optimal Policy:")
    visualize_policy(optimal_policy)

    print("\nOptimal Value Function:")
    visualize_value_function(optimal_value_function)

Optimal Policy:
G Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left 
Up Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left Left

4. Write a report describing your approach to the problem. It must contain
the following:

  a. The choice of data structure to implement the MDP and a justification

  I used a simple numpy array randomly initialized with np.random.choice. It was just the most straightforward structure I had the most experience with.


In [3]:
# Function to perform a greedy evaluation of a given policy
def greedy_evaluation(policy, start_state, value_function):
    state = start_state
    steps = 0
    while not is_goal_state(state):
        action = ACTIONS[policy[state[0], state[1]]]
        next_state = get_next_state(state, action)
        state = next_state
        steps += 1
    return steps

# Function to run experiments and report median steps
def run_experiments(policy, value_function, num_trials=3):
    start_states = [(1, 1), (10, 10), (20, 20)]  # Example start states

    steps_list = []
    for start_state in start_states:
        steps_trial = []
        for _ in range(num_trials):
            steps = greedy_evaluation(policy, start_state, value_function)
            steps_trial.append(steps)
        steps_list.append(np.median(steps_trial))

    return steps_list

In [8]:
# b. Performance of a random policy
random_policy = initialize_policy()
random_value_function = np.zeros((GRID_SIZE, GRID_SIZE))
random_steps = run_experiments(random_policy, random_value_function)
print(f"Random Policy Performance: {random_steps}")

KeyboardInterrupt: ignored

Random performance will enter a permanent loop and will never complete because you know its random


In [11]:
intermediate_policy, intermediate_value_function = policy_iteration()
intermediate_steps = run_experiments(intermediate_policy, intermediate_value_function)
print(f"Intermediate Policy Performance: {intermediate_steps}")

Intermediate Policy Performance: [2.0, 20.0, 40.0]


  d. The performance of the optimal policy found using your approach. You can compute it by greedily evaluating it and reporting the median (out of three trials) number of steps it took to reach the goal state from 3 different start states. You should use the same start states as defined in the previous section for your experiments.

In [10]:
optimal_policy, optimal_value_function = policy_iteration()
optimal_steps = run_experiments(optimal_policy, optimal_value_function)
print(f"Optimal Policy Performance: {optimal_steps}")

Optimal Policy Performance: [2.0, 20.0, 40.0]


It would appear that since the intermediate and opitmal policys perform the same so the game is easily learned.