# MDP and Policy iteration

In [1]:
import numpy as np

# Define grid size and actions
grid_size = (100, 100)
actions = ['up', 'down', 'left', 'right']
action_map = {
    'up': (-1, 0),
    'down': (1, 0),
    'left': (0, -1),
    'right': (0, 1)
}
gamma = 0.9  # Discount factor

# Initialize policies randomly for each state
policy = np.random.choice(actions, size=grid_size)
values = np.zeros(grid_size)

# Define the rewards array
rewards = np.zeros(grid_size)
goal_state = (np.random.randint(0, grid_size[0]), np.random.randint(0, grid_size[1]))
rewards[goal_state] = 100  # Assign a positive reward for reaching the goal

# Add obstacles with negative rewards
num_obstacles = int(grid_size[0] * grid_size[1] * 0.1)  # 10% of grid cells as obstacles
for _ in range(num_obstacles):
    obstacle = (np.random.randint(0, grid_size[0]), np.random.randint(0, grid_size[1]))
    if obstacle != goal_state:  # Ensure the goal is not an obstacle
        rewards[obstacle] = -10  # Assign negative rewards to obstacles

# Helper function for state transition
def transition(state, action):
    x, y = state
    dx, dy = action_map[action]
    new_x, new_y = x + dx, y + dy
    if 0 <= new_x < grid_size[0] and 0 <= new_y < grid_size[1]:
        return (new_x, new_y)
    return state  # If out-of-bounds, stay in the current state

# Policy evaluation
def policy_evaluation(policy, rewards, gamma=0.9, theta=1e-3):
    while True:
        delta = 0
        for x in range(grid_size[0]):
            for y in range(grid_size[1]):
                state = (x, y)
                action = policy[state]
                next_state = transition(state, action)
                v = values[state]
                values[state] = rewards[state] + gamma * values[next_state]
                delta = max(delta, abs(v - values[state]))
        if delta < theta:
            break
    return values

# Policy improvement
def policy_improvement(values, rewards, gamma=0.9):
    policy_stable = True
    for x in range(grid_size[0]):
        for y in range(grid_size[1]):
            state = (x, y)
            old_action = policy[state]
            action_values = {}
            for action in actions:
                next_state = transition(state, action)
                action_values[action] = rewards[state] + gamma * values[next_state]
            policy[state] = max(action_values, key=action_values.get)
            if old_action != policy[state]:
                policy_stable = False
    return policy_stable

# Policy iteration loop
is_policy_stable = False
while not is_policy_stable:
    values = policy_evaluation(policy, rewards, gamma)
    is_policy_stable = policy_improvement(values, rewards, gamma)

# Print final results
print("Final policy stabilized:")
print(policy)


Final policy stabilized:
[['down' 'down' 'down' ... 'down' 'down' 'down']
 ['down' 'down' 'right' ... 'down' 'down' 'left']
 ['down' 'down' 'down' ... 'down' 'down' 'down']
 ...
 ['up' 'up' 'up' ... 'up' 'up' 'up']
 ['up' 'up' 'up' ... 'up' 'up' 'up']
 ['up' 'up' 'up' ... 'up' 'left' 'left']]


# Q Learning

In [6]:
import numpy as np
import random

# Define grid size and actions
grid_size = (100, 100)  # Reduce for faster testing
actions = ['up', 'down', 'left', 'right']
action_map = {
    'up': (-1, 0),
    'down': (1, 0),
    'left': (0, -1),
    'right': (0, 1)
}
gamma = 0.9  # Discount factor

# Initialize start and goal states
start_state = (np.random.randint(0, grid_size[0]), np.random.randint(0, grid_size[1]))
goal_state = (np.random.randint(0, grid_size[0]), np.random.randint(0, grid_size[1]))

# Ensure start and goal are not the same
while start_state == goal_state:
    goal_state = (np.random.randint(0, grid_size[0]), np.random.randint(0, grid_size[1]))

# Define the rewards array
rewards = np.zeros(grid_size)
rewards[goal_state] = 100  # Assign a positive reward for reaching the goal

# Add obstacles with negative rewards
num_obstacles = int(grid_size[0] * grid_size[1] * 0.1)  # 10% of grid cells as obstacles
for _ in range(num_obstacles):
    obstacle = (np.random.randint(0, grid_size[0]), np.random.randint(0, grid_size[1]))
    if obstacle != goal_state and obstacle != start_state:
        rewards[obstacle] = -10  # Assign negative rewards to obstacles

# Helper function for state transition
def transition(state, action):
    x, y = state
    dx, dy = action_map[action]
    new_x, new_y = x + dx, y + dy
    if 0 <= new_x < grid_size[0] and 0 <= new_y < grid_size[1]:
        return (new_x, new_y)
    return state  # If out-of-bounds, stay in the current state

# Initialize the Q-table
q_table = np.zeros((grid_size[0], grid_size[1], len(actions)))

# Hyperparameters
alpha = 0.5  # Learning rate
epsilon = 0.5  # Exploration rate (for epsilon-greedy policy)
epsilon_decay = 0.99  # Decay rate for exploration
epsilon_min = 0.1  # Minimum exploration rate
episodes = 1000  # Number of episodes to train the agent
max_steps_per_episode = 1000  # Maximum steps per episode

# Q-learning algorithm
for episode in range(episodes):
    state = start_state
    step_count = 0

    while state != goal_state and step_count < max_steps_per_episode:
        step_count += 1

        # Choose an action using epsilon-greedy strategy
        if np.random.rand() < epsilon:
            action = np.random.choice(actions)  # Explore: choose a random action
        else:
            action = actions[np.argmax(q_table[state[0], state[1]])]  # Exploit: choose the best action

        # Get the next state after taking the chosen action
        next_state = transition(state, action)

        # Retrieve the reward for the next state
        reward = rewards[next_state]

        # Find the best action for the next state (max Q-value)
        best_next_action = np.argmax(q_table[next_state[0], next_state[1]])

        # Calculate the TD target
        td_target = reward + gamma * q_table[next_state[0], next_state[1], best_next_action]

        # Calculate the TD error
        td_error = td_target - q_table[state[0], state[1], actions.index(action)]

        # Update the Q-value using the learning rate (alpha)
        q_table[state[0], state[1], actions.index(action)] += alpha * td_error

        # Move to the next state
        state = next_state

    # Decay epsilon after each episode
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

print("Trained Q-table:")
print(q_table)


Trained Q-table:
[[[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  ...
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  ...
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  ...
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 ...

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  ...
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  ...
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  ...
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]]


In [7]:
print(q_table[0])

[[  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.          -9.921875     0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.          -9.99996185]
 [ -9.375        0.           0.           0.        ]
 [  0.           0.          -9.99999996   0.        ]
 [  0.         -10.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.           0.           0.           0.        ]
 [  0.    