# 0. Imports

In [65]:
import numpy as np
import random

# 1. Environment Configs

In [66]:
# Tunable settings for the grid world and the solver
env_conf = dict(
    GRID_SIZE=100,    # side length of the square grid (the world has GRID_SIZE * GRID_SIZE states)
    GAMMA=0.9,        # discount factor applied to the next state's value
    GOAL_STATE=3,     # flat index of the state that carries the reward
    THRESHOLD=1e-5,   # policy evaluation stops once the largest value change drops below this
)

# 2. Create Environment

In [67]:
# Pull each setting out of the config dict into its own module-level constant
GRID_SIZE = env_conf["GRID_SIZE"]
GAMMA = env_conf["GAMMA"]
GOAL_STATE = env_conf["GOAL_STATE"]
THRESHOLD = env_conf["THRESHOLD"]

# The grid is square, so the number of states is the side length squared
NUM_STATES = GRID_SIZE * GRID_SIZE

In [68]:
# Reward vector indexed by state: 1 for the goal state, 0 for every other state
rewards = np.zeros(NUM_STATES, dtype=np.float64)
rewards[GOAL_STATE] = 1.0

In [69]:
# Map each action name to the offset it adds to the flat state index:
# a vertical move jumps a whole row (GRID_SIZE entries), a horizontal move shifts by one
actions = dict(
    up=-GRID_SIZE,
    down=GRID_SIZE,
    left=-1,
    right=1,
)

In [70]:
# Ordered list of the action names; actions are later selected by their position in this list
action_lists = list(actions.keys())
# Sample a random index into action_lists. random.randint includes BOTH endpoints,
# so the upper bound has to be len - 1 for the result to always be a valid index.
random.randint(0, len(action_lists) - 1)

3

In [71]:
# Initialise the value function to zero for every state, and the policy to a
# uniformly random action name per state (the goal state gets no action: None)
V = np.zeros(NUM_STATES)
policy = [
    None if state == GOAL_STATE else action_lists[random.randrange(len(action_lists))]
    for state in range(NUM_STATES)
]

# 3. Supplementary Functions

In [72]:
def action_validation(state: int, action: str):
    """
    Function: Tells whether the agent can take an action from a state without leaving the grid
    Args:
        state(int) : Flat grid index of the agent (row-major, GRID_SIZE columns per row)
        action(str) : Action the agent wants to take ("up", "down", "left" or "right")
    Returns:
        Bool: False when the move would cross a grid edge, True otherwise
              (any action other than the four names above is also reported as True)
    """

    # Recover the 2-D position from the flat index
    row, col = divmod(state, GRID_SIZE)
    last_index = GRID_SIZE - 1

    # Which edges of the grid the agent is currently touching
    on_top_edge, on_bottom_edge = row == 0, row == last_index
    on_left_edge, on_right_edge = col == 0, col == last_index

    # A move is blocked only when it points outward from the edge the agent is on
    leaves_grid = (
        (action == "up" and on_top_edge)
        or (action == "down" and on_bottom_edge)
        or (action == "left" and on_left_edge)
        or (action == "right" and on_right_edge)
    )
    return not leaves_grid
    

In [73]:
def get_next_state(state, action):
    """
    Function: Computes the state the agent lands in after attempting an action
    Args:
        state (int): Position index in the grid world
        action (str): The action the agent intends to take
    Returns:
        next_state (int) : The index reached by the move, or the unchanged
                           state when action_validation rejects the move
    """

    # A valid move shifts the flat index by the offset registered for that action
    if action_validation(state=state, action=action):
        return state + actions[action]

    # A rejected move leaves the agent where it is
    return state


In [74]:
def print_grid(V: np.ndarray, GRID_SIZE: int) -> None:
    """
    Function: Prints the value function as a square grid, rounded to 2 decimals
    Args:
        V (np.ndarray): Flat vector holding V(s) for every state; it must contain
                        exactly GRID_SIZE * GRID_SIZE entries. Plain sequences
                        (lists/tuples) are accepted and converted.
        GRID_SIZE (int): Side length of the square grid
    Raises:
        ValueError: If V cannot be reshaped to (GRID_SIZE, GRID_SIZE)
    """

    # np.asarray is a no-op for arrays and lets list/tuple inputs be reshaped too
    grid = np.asarray(V).reshape((GRID_SIZE, GRID_SIZE))

    # The trailing '\n' argument leaves a blank line after the printed grid
    print(np.round(grid, 2), '\n')


# 4. Policy Iteration

In [75]:
# Policy iteration: alternate between evaluating the current policy and greedily
# improving it, until an improvement pass leaves every state's action unchanged.
# NOTE: V is rebound and policy is mutated at module level, so re-running this
# cell continues from whatever the previous run left behind.
iteration = 0
while True:
    iteration += 1
    print(f"\n=== Iteration {iteration} ===")

    # 1. Policy Evaluation
    # Repeated synchronous sweeps: every update in a sweep reads the previous
    # sweep's V and writes into a copy. Sweeping stops once the largest change
    # in any state's value falls below THRESHOLD.
    while True:
        delta = 0
        new_V = np.copy(V)
        for s in range(NUM_STATES):
            # The goal state is skipped, so its entry in V is never modified here
            if s == GOAL_STATE:
                continue
            a = policy[s]
            # An action that would leave the grid keeps the agent in place
            s_next = get_next_state(s, a)
            # The reward is collected for the state that is entered
            r = rewards[s_next]
            new_V[s] = r + GAMMA * V[s_next]
            delta = max(delta, abs(V[s] - new_V[s]))
        V = new_V
        if delta < THRESHOLD:
            break

    # NOTE(review): this prints the full GRID_SIZE x GRID_SIZE grid on every
    # iteration; the print_grid helper performs the same reshape/round.
    print("Value Function:")
    print(np.round(V.reshape(GRID_SIZE, GRID_SIZE), 2))

    # 2. Policy Improvement
    # For every non-goal state pick the action with the highest one-step
    # lookahead value under the V just computed.
    policy_stable = True
    for s in range(NUM_STATES):
        if s == GOAL_STATE:
            continue

        old_action = policy[s]
        best_action = None
        best_value = float('-inf')

        for a in action_lists:
            # Moves that would leave the grid are not candidates
            if not action_validation(s, a):
                continue
            s_next = get_next_state(s, a)
            r = rewards[s_next]
            value = r + GAMMA * V[s_next]
            # Strict '>' means ties keep the action that appears first in action_lists
            if value > best_value:
                best_value = value
                best_action = a

        policy[s] = best_action
        # Any changed action means another evaluate/improve round is needed
        if best_action != old_action:
            policy_stable = False

    # Display the current policy
    # One arrow per state, "G" for the goal, "?" for any action missing from arrow_map
    print("Policy:")
    arrow_map = {'up': '↑', 'down': '↓', 'left': '←', 'right': '→'}
    policy_grid = []
    for s in range(NUM_STATES):
        if s == GOAL_STATE:
            policy_grid.append("G")
        else:
            policy_grid.append(arrow_map.get(policy[s], "?"))
    policy_grid = np.array(policy_grid).reshape(GRID_SIZE, GRID_SIZE)
    for row in policy_grid:
        print("  ".join(row))

    # Stop once an improvement pass changed no state's action
    if policy_stable:
        print("\n✅ Converged to optimal policy.")
        break


=== Iteration 1 ===
Value Function:
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Policy:
↓  ↓  →  G  ←  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑
↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  ↑  