#### **Russell Value iteration and  Policy iteration**
#### **Practice 2 Unsolved Version**
Code the activities in the area marked as [TO CODE Activity x]
See RL-SAMBD-Assignment2.pdf documentation

In [1]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import session_info

In [2]:
# Parameters
# Module-level hyperparameters.
# NOTE(review): the solver runs in cell In [8] pass theta=1e-8 and gamma=0.9
# explicitly, so the results printed there were not produced with these two
# values -- confirm which setting is intended.
gamma = 0.99  # Discount factor
theta = 1e-6  # Convergence threshold

In [3]:
# RussellsGrid Environment
class RussellsGrid(gym.Env):
    """
    3x4 stochastic grid world implemented as a Gymnasium environment.

    The agent walks on a grid of ``nrow`` x ``ncol`` cells. Entering the positive goal yields +1,
    entering the negative goal yields -1, and every other move costs -0.04.

    Actions:
    - 0: Up
    - 1: Right
    - 2: Down
    - 3: Left

    States:
    - The agent's cell index ``row * ncol + col``, i.e. 0 to nrow*ncol-1.

    Rewards:
    - +1 for entering the positive goal.
    - -1 for entering the negative goal.
    - -0.04 for any other transition.

    Environment Dynamics:
    - With probability 0.8 the agent moves in the intended direction.
    - The remaining 0.2 is split evenly (0.2/3 each) over the other three directions.
    - A move that would leave the grid or enter the invalid cell leaves the agent where it is.

    Special Positions (row, col):
    - Start position: (2, 0)
    - Positive goal: (0, 3)
    - Negative goal: (1, 3)
    - Invalid position: (1, 1)

    Transition table:
    - ``P[state][action]`` is a list of ``(probability, next_state, reward, done)`` tuples.
    - The invalid cell has empty lists for every action.
    - NOTE(review): the goal cells are not absorbing in ``P``; transitions out of them are
      generated like for any other cell, and termination is only signalled by the ``done``
      flag of the transition that enters a goal.

    Rendering:
    - Supports 'human' rendering mode, which prints the grid with agent ('A'), positive goal ('G'),
      negative goal ('R'), and invalid position ('X').

    Usage:
    - Instantiate the environment and call reset() to put the agent on the start cell.
    - Call step(action); it returns ``(state, reward, terminated, truncated, info)`` where
      ``truncated`` is always False and ``info`` is an empty dict.
    """
    metadata = {'render_modes': ['human'], 'render_fps': 4}

    def __init__(self, render_mode=None):
        self.ncol = 4
        self.nrow = 3
        self.size = (self.nrow, self.ncol)

        # Define action and observation space
        self.action_space = spaces.Discrete(4)  # Up, Right, Down, Left
        self.observation_space = spaces.Discrete(self.nrow * self.ncol)

        # Special cells, stored as (row, col)
        self.start_position = np.array([2, 0])
        self.positive_goal = np.array([0, 3])
        self.negative_goal = np.array([1, 3])
        self.invalid_position = np.array([1, 1])

        # Place the agent on the start cell right away so that step() and
        # render() have a valid position even if reset() has not been called.
        self.agent_position = self.start_position.copy()
        self.state = self._get_observation()

        self.render_mode = render_mode

        # Create transition matrix P
        self.P = self._create_transition_matrix()

    def _create_transition_matrix(self):
        """Build ``P[state][action] -> [(prob, next_state, reward, done), ...]``."""
        n_states = self.observation_space.n
        P = {s: {a: [] for a in range(4)} for s in range(n_states)}
        invalid = tuple(self.invalid_position)
        positive = tuple(self.positive_goal)
        negative = tuple(self.negative_goal)

        for s in range(n_states):
            row, col = divmod(s, self.ncol)

            if (row, col) == invalid:
                continue  # The invalid cell can never be occupied

            for a in range(4):  # Up, Right, Down, Left
                outcomes = self._get_action_outcomes(row, col, a)

                for next_state, prob in outcomes.items():
                    next_cell = divmod(next_state, self.ncol)

                    # Reward and termination depend only on the cell being entered
                    if next_cell == positive:
                        reward, done = 1.0, True
                    elif next_cell == negative:
                        reward, done = -1.0, True
                    else:
                        reward, done = -0.04, False

                    P[s][a].append((prob, next_state, reward, done))

        return P

    def _get_action_outcomes(self, row, col, action):
        """Return ``{next_state: probability}`` for taking ``action`` in cell (row, col)."""
        slip = 0.2 / 3  # share of the 0.2 slip probability per non-intended direction
        outcomes = {self._get_next_state(row, col, action): 0.8}

        for other in (0, 1, 2, 3):
            if other == action:
                continue
            landing = self._get_next_state(row, col, other)
            # Several directions can end in the same cell (walls, invalid cell),
            # so probabilities are accumulated per resulting state.
            outcomes[landing] = outcomes.get(landing, 0.0) + slip

        return outcomes

    def _get_next_state(self, row, col, action):
        """Return the state index reached by moving once from (row, col); blocked moves stay put."""
        next_row, next_col = row, col

        if action == 0:  # Up
            next_row = max(0, row - 1)
        elif action == 1:  # Right
            next_col = min(self.ncol - 1, col + 1)
        elif action == 2:  # Down
            next_row = min(self.nrow - 1, row + 1)
        elif action == 3:  # Left
            next_col = max(0, col - 1)

        # Check if next position is invalid
        if (next_row, next_col) == tuple(self.invalid_position):
            next_row, next_col = row, col  # Stay in current position

        return next_row * self.ncol + next_col

    def cell_id(self, coord, width=None):
        """Convert a (row, col) coordinate to a state index.

        Args:
            coord: Indexable pair ``(row, col)``.
            width: Number of columns to use; defaults to this grid's ``ncol``.
        """
        if width is None:
            width = self.ncol
        return coord[0] * width + coord[1]

    def reset(self, seed=None, options=None):
        """Put the agent back on the start cell and return ``(state, info)``."""
        super().reset(seed=seed)

        # step() and render() read agent_position, so it has to be reset too
        self.agent_position = self.start_position.copy()
        self.state = self._get_observation()

        return self.state, {}

    def _get_observation(self):
        """Convert agent position to state number"""
        return int(self.agent_position[0] * self.ncol + self.agent_position[1])

    def step(self, action):
        """Sample one transition from ``P`` for the current cell and ``action``.

        Returns:
            ``(state, reward, terminated, truncated, info)``; ``truncated`` is
            always False and ``info`` is always an empty dict.
        """
        current_state = self._get_observation()

        # Randomly choose an outcome based on probabilities
        outcomes = self.P[current_state][action]
        probs = [outcome[0] for outcome in outcomes]
        chosen_outcome = self.np_random.choice(len(outcomes), p=probs)

        _, next_state, reward, done = outcomes[chosen_outcome]

        # Update agent position
        self.agent_position = np.array(divmod(next_state, self.ncol))

        # Update the state
        self.state = self._get_observation()

        if self.render_mode == 'human':
            self.render()

        return self.state, reward, done, False, {}

    def render(self):
        """Print the grid to stdout when ``render_mode`` is 'human'; otherwise do nothing."""
        if self.render_mode != 'human':
            return

        for i in range(self.nrow):
            for j in range(self.ncol):
                # The agent marker takes precedence over the static cell markers
                if np.array_equal([i, j], self.agent_position):
                    print('A', end=' ')
                elif np.array_equal([i, j], self.positive_goal):
                    print('G', end=' ')
                elif np.array_equal([i, j], self.negative_goal):
                    print('R', end=' ')
                elif np.array_equal([i, j], self.invalid_position):
                    print('X', end=' ')
                else:
                    print('.', end=' ')
            print()
        print()

    def close(self):
        """Nothing to release."""
        pass

In [4]:
# Supporting functions
def print_policy(env, policy):
    """Print the greedy action of ``policy`` for every grid cell, one row per line.

    Args:
        env: Object exposing ``nrow``, ``ncol`` and ``invalid_position``.
        policy: Per-state action probabilities, indexed as ``policy[state]``.

    Each cell shows the arrow of the action ``np.argmax`` selects for that
    state; the invalid cell is shown as ``X``.
    """
    print("\nValue Optimal Policy Format (3x4):")
    print("-" * 50)

    arrows = ('↑', '→', '↓', '←')

    for r in range(env.nrow):
        for c in range(env.ncol):
            if (r, c) == tuple(env.invalid_position):
                symbol = 'X'
            else:
                symbol = arrows[np.argmax(policy[r * env.ncol + c])]
            print(symbol, end=' ')
        print()
        
    
def print_value_function(V, env):
    """Print the state values as a grid, one row per line.

    Args:
        V: State values indexed by state number (``row * env.ncol + col``).
        env: Object exposing ``nrow``, ``ncol`` and ``invalid_position``.

    Values are formatted with three decimals; the invalid cell is shown as ``X``.
    """
    print("\nValue Function Format (3x4):")
    print("-" * 50)

    for r in range(env.nrow):
        cells = []
        for c in range(env.ncol):
            if np.array_equal([r, c], env.invalid_position):
                cells.append('  X  ')
            else:
                cells.append(f'{V[r * env.ncol + c]:5.3f}')
        # every cell, including the last one, is followed by a single space
        print(''.join(cell + ' ' for cell in cells))

In [5]:
# Policy Evaluation Activity (Algorithm 1)
# Policy Improvement (Algorithm 2)

def policy_evaluation(env, policy, theta=1e-8, gamma=1.0):
    """Iteratively compute the state values of a fixed policy.

    States are swept in index order and updated in place, so values written
    earlier in a sweep are already used by later states of the same sweep.
    Sweeping stops once the largest change in a sweep is below ``theta``.

    Args:
        env: Environment exposing ``observation_space.n``, ``ncol``,
            ``invalid_position`` and the transition table ``P``.
        policy: Array of shape [n_states, n_actions] with action probabilities.
        theta: Convergence criterion on the largest per-sweep value change.
        gamma: Discount factor.

    Returns:
        Array of shape [n_states]; the invalid state keeps the value 0.
    """
    n_states = env.observation_space.n
    values = np.zeros(n_states)
    blocked = env.invalid_position[0] * env.ncol + env.invalid_position[1]
    evaluable = [s for s in range(n_states) if s != blocked]

    converged = False
    while not converged:
        largest_change = 0
        for state in evaluable:
            previous = values[state]

            # Expected one-step return under the policy's action distribution
            expected = 0
            for action, pi_sa in enumerate(policy[state]):
                for p, successor, r, terminal in env.P[state][action]:
                    # No bootstrapping from the successor once the episode has ended
                    bootstrap = gamma * values[successor] if not terminal else 0
                    expected += pi_sa * p * (r + bootstrap)

            values[state] = expected
            largest_change = max(largest_change, np.abs(previous - expected))

        converged = largest_change < theta

    return values

def policy_improvement(env, V, gamma=1.0):
    """Build the policy that is greedy with respect to a value function.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``,
            ``ncol``, ``invalid_position`` and the transition table ``P``.
        V: Array of shape [n_states] representing the value function.
        gamma: Discount factor.

    Returns:
        Array of shape [n_states, n_actions]. Each row puts equal probability
        on every action whose Q-value equals the row maximum; the row of the
        invalid state is left at all zeros.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    greedy = np.zeros([n_states, n_actions])
    blocked = env.invalid_position[0] * env.ncol + env.invalid_position[1]

    for state in range(n_states):
        if state == blocked:
            continue

        # One-step lookahead: expected return of each action under V
        action_values = np.zeros(n_actions)
        for action in range(n_actions):
            for p, successor, r, terminal in env.P[state][action]:
                bootstrap = 0 if terminal else gamma * V[successor]
                action_values[action] += p * (r + bootstrap)

        # Exact ties share the probability mass equally
        winners = np.flatnonzero(action_values == action_values.max())
        greedy[state, winners] = 1.0 / winners.size

    return greedy

In [6]:
def policy_iteration(env, theta=1e-8, gamma=1.0, max_iterations=1000):
    """Computes optimal policy using policy iteration.

    Starts from the uniform random policy and alternates policy evaluation
    and greedy policy improvement until the policy no longer changes or
    ``max_iterations`` improvement steps have been performed.

    Args:
        env: The environment
        theta: Convergence criterion (used both for policy evaluation and for
            comparing consecutive policies)
        gamma: Discount factor
        max_iterations: Maximum number of iterations

    Returns:
        Tuple ``(policy, V)``. ``policy`` has shape [n_states, n_actions].
        ``V`` is the value function from the last policy evaluation; if
        ``max_iterations`` is 0 no evaluation runs and ``V`` is all zeros.
        When the iteration cap is hit without convergence, ``V`` belongs to
        the policy evaluated last, i.e. the one before the returned policy.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    # Initialize uniform random policy
    policy = np.ones([n_states, n_actions]) / n_actions
    # Defined up front so the return value exists even if the loop never runs
    V = np.zeros(n_states)
    iterations = 0
    converged = False

    while iterations < max_iterations:
        # Policy Evaluation
        V = policy_evaluation(env, policy, theta, gamma)

        # Policy Improvement
        new_policy = policy_improvement(env, V, gamma)

        # Check convergence
        if np.all(np.abs(policy - new_policy) < theta):
            converged = True
            break

        policy = new_policy
        iterations += 1

    if converged:
        print(f'Policy iteration converged after {iterations} iterations')
    else:
        # Do not claim convergence when the loop only ran out of iterations
        print(f'Policy iteration stopped after {iterations} iterations without converging')
    return policy, V

In [7]:
# Value Iteration LOOP Activity 2
def value_iteration(env, theta=1e-8, gamma=1.0, max_iterations=1000):
    """Computes optimal policy using value iteration.

    Repeatedly sweeps over the states, replacing each value in place by the
    best one-step lookahead value, until the largest change in a sweep is
    below ``theta`` or ``max_iterations`` sweeps have been performed. The
    returned policy is the greedy policy for the final value function.

    Args:
        env: The environment
        theta: Convergence criterion
        gamma: Discount factor
        max_iterations: Maximum number of iterations

    Returns:
        Tuple ``(policy, V)`` with ``policy`` of shape [n_states, n_actions]
        and ``V`` of shape [n_states].
    """
    V = np.zeros(env.observation_space.n)
    invalid_state = env.invalid_position[0] * env.ncol + env.invalid_position[1]
    iterations = 0
    converged = False

    while iterations < max_iterations:
        delta = 0

        for s in range(env.observation_space.n):                           # for every state
            if s == invalid_state:
                continue

            old_v = V[s]

            # Compute Q-values for all actions
            q_values = np.zeros(env.action_space.n)                       # one entry per action
            for a in range(env.action_space.n):                           # for every action
                for prob, next_state, reward, done in env.P[s][a]:        # sum of prob * (reward + gamma * V')
                    # No future value once the episode has ended
                    future_value = 0 if done else gamma * V[next_state]
                    q_values[a] += prob * (reward + future_value)

            V[s] = np.max(q_values)
            delta = max(delta, np.abs(old_v - V[s]))

        if delta < theta:
            converged = True
            break

        iterations += 1

    if converged:
        print(f'Value iteration converged after {iterations} iterations')
    else:
        # Do not claim convergence when the loop only ran out of iterations
        print(f'Value iteration stopped after {iterations} iterations without converging')
    policy = policy_improvement(env, V, gamma)
    return policy, V


In [8]:
# Solve the grid world with both dynamic-programming methods and print the results.
env = RussellsGrid()

# Run policy iteration
# (theta and gamma are passed explicitly here rather than taken from the
# module-level `theta` / `gamma` defined in the parameters cell)
pi_policy, pi_value = policy_iteration(env, theta=1e-8, gamma=0.9)
print("\nPolicy from Policy Iteration:")
print_policy(env, pi_policy)


# Run value iteration
# Same theta / gamma as the policy-iteration run above.
vi_policy, vi_value = value_iteration(env, theta=1e-8, gamma=0.9)
print("\nPolicy from Value Iteration:")
print_policy(env, vi_policy)
# Only the value-iteration values are printed; pi_value is kept but not shown.
print_value_function(vi_value, env)


Policy iteration converged after 3 iterations

Policy from Policy Iteration:

Value Optimal Policy Format (3x4):
--------------------------------------------------
→ → → ↑ 
↑ X ↑ ↑ 
↑ → ↑ ← 
Value iteration converged after 20 iterations

Policy from Value Iteration:

Value Optimal Policy Format (3x4):
--------------------------------------------------
→ → → ↑ 
↑ X ↑ ↑ 
↑ → ↑ ← 

Value Function Format (3x4):
--------------------------------------------------
0.607 0.758 0.931 0.853 
0.477   X   0.634 0.783 
0.370 0.376 0.485 0.278 


In [9]:
# Record the package, Python and OS versions used for this run (plain-text output).
session_info.show(html=False)

-----
gymnasium           1.0.0
numpy               1.26.4
session_info        1.0.0
-----
IPython             8.26.0
jupyter_client      8.6.2
jupyter_core        5.7.2
-----
Python 3.12.3 (main, Sep 11 2024, 14:17:37) [GCC 13.2.0]
Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.39
-----
Session information updated at 2024-11-02 18:42


In [10]:
# Build a fresh environment and display its transition table:
# P[state][action] -> list of (probability, next_state, reward, done) tuples.
env = RussellsGrid()
env.P

{0: {0: [(0.8666666666666667, 0, -0.04, False),
   (0.06666666666666667, 1, -0.04, False),
   (0.06666666666666667, 4, -0.04, False)],
  1: [(0.8, 1, -0.04, False),
   (0.13333333333333333, 0, -0.04, False),
   (0.06666666666666667, 4, -0.04, False)],
  2: [(0.8, 4, -0.04, False),
   (0.13333333333333333, 0, -0.04, False),
   (0.06666666666666667, 1, -0.04, False)],
  3: [(0.8666666666666667, 0, -0.04, False),
   (0.06666666666666667, 1, -0.04, False),
   (0.06666666666666667, 4, -0.04, False)]},
 1: {0: [(0.8666666666666667, 1, -0.04, False),
   (0.06666666666666667, 2, -0.04, False),
   (0.06666666666666667, 0, -0.04, False)],
  1: [(0.8, 2, -0.04, False),
   (0.13333333333333333, 1, -0.04, False),
   (0.06666666666666667, 0, -0.04, False)],
  2: [(0.8666666666666667, 1, -0.04, False),
   (0.06666666666666667, 2, -0.04, False),
   (0.06666666666666667, 0, -0.04, False)],
  3: [(0.8, 0, -0.04, False),
   (0.13333333333333333, 1, -0.04, False),
   (0.06666666666666667, 2, -0.04, False)