<a href="https://colab.research.google.com/github/Mohamedragih1/3X3-Grid-World-Markov-Decision-Process/blob/main/Markov_Decision_Process.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 4

# Markov Decision Process

Defines parameters and initializes variables for a Markov Decision Process (MDP) simulation:

- `GRID_SIZE = 3`: Sets the size of the grid for the MDP.
- `R_X` and `R_Y`: Placeholder variables for the current position in the grid (not used in the provided code snippet).
- `action_symbols`: Defines symbols for actions ('↑' for up, '↓' for down, '→' for right, '←' for left).
- `actions`: Alias for `action_symbols`.
- `gamma = 0.99`: Sets the discount factor for future rewards.
- `rewards`: Defines the reward matrix for each state-action pair in the MDP.
- `transition_probs`: Specifies the transition probabilities for each action in the MDP.
- `value_function`: Initializes the value function matrix with zeros.

The `rewards` matrix holds the immediate rewards for transitioning from one state to another based on the action taken. The `transition_probs` dictionary maps each action to a list of probabilities corresponding to the likelihood of transitioning to each possible next state.

The value function `value_function` is initialized as a square matrix of zeros with dimensions `GRID_SIZE` by `GRID_SIZE`, representing the estimated value of each state in the MDP.


In [None]:
import numpy as np

# MDP parameters
GRID_SIZE = 3  # Size of the grid
R_X = 0  # X of the reward state
R_Y = 0  # Y of the reward state

# Define action symbols
action_symbols = ['↑', '↓', '→', '←']  # Symbols for the four possible actions

actions = action_symbols
gamma = 0.99

# Define the rewards matrix
rewards = np.array([[0, -1, 10],
                    [-1, -1, -1],
                    [-1, -1, -1]])

# rewards = np.array([[0, -1, -1, 10],
#                     [-1, -1, -1, -1],
#                     [-1, -1, -1, -1],
#                     [-1, -1, -1, -1]])

# Transition probabilities for each action
transition_probs = {'↑': [0.8, 0.0, 0.1, 0.1],
                    '↓': [0, 0.8, 0.1, 0.1],
                    '→': [0.1, 0.1, 0.8, 0],
                    '←': [0.1, 0.1, 0, 0.8]}

# Initialize value function for each state in the grid
value_function = np.zeros((GRID_SIZE, GRID_SIZE))
policy = np.zeros((GRID_SIZE, GRID_SIZE), dtype=str)

The `get_new_position` function takes as input the current position `(i, j)` in a grid and an action (`'↑'`, `'↓'`, `'→'`, `'←'`) representing a movement direction. It then calculates the new position after taking that action within the constraints of the grid.

- If the action is `'↑'`, it checks if moving up would result in a position above the grid boundary (`max(i - 1, 0)`) and returns the new position `(i - 1, j)` if valid.
- If the action is `'↓'`, it checks if moving down would result in a position below the grid boundary (`min(i + 1, GRID_SIZE - 1)`) and returns the new position `(i + 1, j)` if valid.
- If the action is `'→'`, it checks if moving right would result in a position beyond the grid boundary (`min(j + 1, GRID_SIZE - 1)`) and returns the new position `(i, j + 1)` if valid.
- If the action is `'←'`, it checks if moving left would result in a position before the grid boundary (`max(j - 1, 0)`) and returns the new position `(i, j - 1)` if valid.

This function ensures that the new position is within the bounds of the grid (`GRID_SIZE`) based on the specified action, preventing movements outside the grid boundaries.


In [None]:
def get_new_position(i, j, action):
    if action == '↑':
        return max(i - 1, 0), j
    elif action == '↓':
        return min(i + 1, GRID_SIZE - 1), j
    elif action == '→':
        return i, min(j + 1, GRID_SIZE - 1)
    elif action == '←':
        return i, max(j - 1, 0)

The `valid_action` function takes as input the current position `(i, j)` in a grid and an action (`'↑'`, `'↓'`, `'→'`, `'←'`) representing a movement direction. It then calculates the new position after taking that action within the constraints of the grid.

The function returns `True` if valid move and `False` if not valid.


In [None]:
def valid_action(i, j, action):
    if action == '↑':
        if i <= 0:
            return False
    elif action == '↓':
        if i >= GRID_SIZE - 1:
            return False
    elif action == '→':
        if j >= GRID_SIZE - 1:
            return False

    elif action == '←':
        if j <= 0:
            return False

    return True

# Value Iteration

It uses a convergence threshold (`theta`) and tracks the number of iterations (`iteration_count`) until convergence.

- `theta`: Convergence threshold to determine when to stop iterating.
- `iteration_count`: Tracks the number of iterations for convergence.

The algorithm iterates through the MDP grid, skips terminal states, and updates non-terminal states' values using the Bellman optimality equation. It stops iterating when the maximum change in the value function (`delta`) falls below the threshold (`theta`).


In [None]:
def value_iteration():
    theta = 0.001
    iteration_count = 0
    while True:
        delta = 0
        for i in range(GRID_SIZE):
            for j in range(GRID_SIZE):
                if rewards[i][j] == 10 or rewards[i][j] == rewards[R_X][R_Y]:  # Skip terminal states
                    continue

                v = value_function[i][j]
                new_v = float('-inf')
                best_action = None
                for action in actions:
                    if not valid_action(i, j, action):
                        continue

                    action_probabilities = transition_probs[action]
                    action_value = rewards[i][j]

                    for k, prob in enumerate(action_probabilities):
                        new_i, new_j = get_new_position(i, j, action_symbols[k])
                        action_value += prob * gamma * value_function[new_i][new_j]

                    if action_value > new_v:
                        new_v = action_value
                        best_action = action

                value_function[i][j] = new_v
                policy[i][j] = best_action
                delta = max(delta, abs(v - value_function[i][j]))
        iteration_count += 1
        if delta < theta:
            break

    return iteration_count

# Run Value Iteration Function

Runs the Value Iteration algorithm.

- Returns: The number of iterations required for convergence (`iteration_count`).

Calls the `value_iteration()` function to compute the optimal value function.


In [None]:
def run_value_iteration(reward_value):

    global rewards
    rewards[R_X][R_Y] = reward_value
    return value_iteration()

## Main call

In [None]:
r_values = [100, 3, 0, -3]
for r in r_values:
    value_function[R_X, R_Y] = r
    value_function[0, -1] = 10


    num_iterations = run_value_iteration(r)

    print(f"\nOptimal Value Function for r = {r} (Converged in {num_iterations} iterations):")
    print(value_function)
    policy[R_X, R_Y] = 'r'
    policy[0,-1] = 'T'
    print(f"\nOptimal Policy for r = {r}")
    print(policy)


Optimal Value Function for r = 100 (Converged in 13 iterations):
[[100.          97.20351875  10.        ]
 [ 97.20351875  94.7512665   88.20399982]
 [ 94.48181286  92.35290246  89.76213039]]

Optimal Policy for r = 100
[['r' '←' 'T']
 ['↑' '←' '↓']
 ['↑' '←' '←']]

Optimal Value Function for r = 3 (Converged in 41 iterations):
[[ 3.          8.46194904 10.        ]
 [ 5.38366785  7.11322545  8.46194227]
 [ 4.57497057  5.79414801  6.96501766]]

Optimal Policy for r = 3
[['r' '→' 'T']
 ['→' '→' '↑']
 ['→' '→' '↑']]

Optimal Value Function for r = 0 (Converged in 3 iterations):
[[ 0.          8.46193956 10.        ]
 [ 5.08334043  7.1132055   8.46193936]
 [ 4.54187018  5.79411233  6.96500905]]

Optimal Policy for r = 0
[['r' '→' 'T']
 ['→' '→' '↑']
 ['→' '→' '↑']]

Optimal Value Function for r = -3 (Converged in 3 iterations):
[[-3.          8.46193928 10.        ]
 [ 4.78307131  7.11320493  8.46193928]
 [ 4.50887324  5.79411129  6.9650088 ]]

Optimal Policy for r = -3
[['r' '→' 'T']
 [

## Conclusion

When r is bigger then 10 the algorithm will go toward it otherwise go to the 10.

# BONUS

Additional to `value_function` we will add `policy` which will store the optimal policy

In [None]:
# Initialize a random policy
policy = np.random.choice(action_symbols, size=(GRID_SIZE, GRID_SIZE))

# Policy Iteration Function

Implements the Policy Iteration algorithm to compute the optimal policy and value function.

- `iteration_count`: Counter for iterations.

The algorithm consists of two main steps:

1. **Policy Evaluation**:
   - Iteratively evaluates the current policy until convergence using a specified convergence threshold (`theta`).
   - Updates the value function based on the Bellman expectation equation.

2. **Policy Improvement**:
   - Determines the best action for each state by considering possible actions and their expected rewards.
   - Updates the policy if a better action is found for any state.
   - Checks for policy stability, where no further changes occur in the policy.

The algorithm continues iterating between policy evaluation and improvement until the policy becomes stable.

The function returns the number of iterations (`iteration_count`) required for convergence.


In [None]:
def policy_iteration():
    theta = 0.01
    iteration_count = 0  # Counter for iterations
    while True:
        iteration_count += 1

        # Policy Evaluation
        while True:
            delta = 0
            for i in range(GRID_SIZE):
                for j in range(GRID_SIZE):
                    if rewards[i][j] == 10 or rewards[i][j] == rewards[R_X][R_Y]:  # Skip terminal states
                        continue

                    v = value_function[i][j]
                    action = policy[i][j]
                    action_probabilities = transition_probs[action]
                    action_value = rewards[i][j]
                    for k, prob in enumerate(action_probabilities):
                        new_i, new_j = get_new_position(i, j, action_symbols[k])
                        action_value += prob * gamma * value_function[new_i][new_j]
                    value_function[i][j] = action_value
                    delta = max(delta, abs(v - value_function[i][j]))
            if delta < theta:  # Convergence threshold
                break

        # Policy Improvement
        policy_stable = True
        for i in range(GRID_SIZE):
            for j in range(GRID_SIZE):
                if rewards[i][j] == 10 or rewards[i][j] == rewards[R_X][R_Y]:  # Skip terminal states
                    continue

                old_action = policy[i][j]
                best_action = None
                best_value = float('-inf')

                for action in actions:
                    if not valid_action(i, j, action):
                        continue

                    action_probabilities = transition_probs[action]
                    action_value = rewards[i][j]

                    for k, prob in enumerate(action_probabilities):
                        new_i, new_j = get_new_position(i, j, action_symbols[k])
                        action_value += prob * gamma * value_function[new_i][new_j]

                    if action_value > best_value:
                        best_value = action_value
                        best_action = action
                policy[i][j] = best_action

                # Check for policy stability based on action change
                if old_action != best_action:
                    policy_stable = False

        # Break if policy remains stable after full iteration
        if policy_stable:
            break

    return iteration_count  # Return the number of iterations


# Run Policy Iteration Function

The `run_policy_iteration(reward_value)` function runs the Policy Iteration algorithm to compute the optimal policy.

- `reward_value`: The reward value to set for the specified state.

Calls the `policy_iteration()` function to compute the optimal policy and value function.

The function returns the number of iterations required for convergence in the Policy Iteration algorithm.


In [None]:
def run_policy_iteration(reward_value):
    global rewards
    rewards[R_X][R_Y] = reward_value
    return policy_iteration()

## Main call

In [None]:
r_values = [100, 3, 0, -3]
for r in r_values:
    value_function[R_X, R_Y] = r
    value_function[0, -1] = 10

    num_iterations = run_policy_iteration(r)

    optimal_policy = np.where(rewards == 10, 'T', policy)

    optimal_policy[R_X][R_Y] = 'r'
    print(f"\nOptimal Values for r = {r} (Stabilized in {num_iterations} iterations):")
    print(value_function)

    policy[R_X, R_Y] = 'r'
    policy[0,-1] = 'T'
    print(f"\nOptimal Policy for r = {r}")
    print(optimal_policy)


Optimal Values for r = 100 (Stabilized in 5 iterations):
[[100.          97.20352503  10.        ]
 [ 97.20352503  94.7512797   88.20192869]
 [ 94.48183143  92.35292609  89.76179727]]

Optimal Policy for r = 100
[['r' '←' 'T']
 ['↑' '←' '↓']
 ['↑' '←' '←']]

Optimal Values for r = 3 (Stabilized in 4 iterations):
[[ 3.          8.46182121 10.        ]
 [ 2.30865806  7.11295718  8.46190306]
 [ 4.23526615  5.79366842  6.9649018 ]]

Optimal Policy for r = 3
[['r' '→' 'T']
 ['↑' '→' '↑']
 ['→' '→' '↑']]

Optimal Values for r = 0 (Stabilized in 2 iterations):
[[ 0.          8.46193924 10.        ]
 [ 5.08260177  7.11320484  8.46193926]
 [ 4.54105705  5.79411115  6.96500876]]

Optimal Policy for r = 0
[['r' '→' 'T']
 ['→' '→' '↑']
 ['→' '→' '↑']]

Optimal Values for r = -3 (Stabilized in 1 iterations):
[[-3.          8.46193927 10.        ]
 [ 4.78337961  7.1132049   8.46193927]
 [ 4.50921201  5.79411125  6.96500879]]

Optimal Policy for r = -3
[['r' '→' 'T']
 ['→' '→' '↑']
 ['→' '→' '↑']]
