In [46]:
import numpy as np
import gym

In [47]:
def policy_evaluation(env, policy, gamma=0.9, theta=1e-6):
    """
    Perform policy evaluation to estimate the value function V_pi for a given policy.

    Args:
    - env: Gym environment
    - policy: A numpy array representing the policy
    - gamma: Discount factor
    - theta: A small positive number for stopping criterion

    Returns:
    - V: A numpy array representing the value function V_pi
    """
    num_states = env.observation_space.n
    V = np.zeros(num_states)

    num_iterations = 0
    while True:
        delta = 0
        for s in range(num_states):
            v = 0
            a = policy[s]
            for prob, next_state, reward, _ in env.P[s][a]:
              v += prob * (reward + gamma * V[next_state])
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        num_iterations += 1
        if delta < theta:
            break
    return V, num_iterations

In [48]:
def policy_improvement(env, V, gamma=0.9):
    """
    Perform policy improvement to update the policy based on the current value function.

    Args:
    - env: Gym environment
    - V: A numpy array representing the value function
    - gamma: Discount factor

    Returns:
    - new_policy: A numpy array representing the updated policy
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    new_policy = np.zeros(num_states, dtype = int)

    for s in range(num_states):
        q_values = np.zeros(num_actions)
        for a in range(num_actions):
            for prob, next_state, reward, _ in env.P[s][a]:
                q_values[a] += prob * (reward + gamma * V[next_state])
        best_action = np.argmax(q_values)
        new_policy[s] = best_action
    return new_policy

In [49]:
def policy_iteration(env):
    """
    Perform Policy Iteration algorithm to find the optimal policy.

    Args:
    - env: Gym environment
    - gamma: Discount factor
    - max_iterations: Maximum number of policy evaluation and improvement iterations

    Returns:
    - optimal_policy: A numpy array representing the optimal policy
    - V: A numpy array representing the value function for the optimal policy
    - num_iterations: Number of iterations performed
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    policy = np.random.choice(num_actions, size = (num_states))

    num_iterations = 0
    while True:
      value_pi, eval_iterations = policy_evaluation(env, policy)
      new_policy = policy_improvement(env,value_pi)
      if np.array_equal(new_policy, policy):
          break
      policy = new_policy
      num_iterations += 1
    return policy, num_iterations, eval_iterations,value_pi

In [50]:
def value_iteration(env, gamma=0.9, theta=1e-6):
    """
    Perform Value Iteration algorithm to find the optimal policy.

    Args:
    - env: Gym environment
    - gamma: Discount factor
    - theta: A small positive number for stopping criterion

    Returns:
    - optimal_policy: A numpy array representing the optimal policy
    - V: A numpy array representing the value function for the optimal policy
    - num_iterations: Number of iterations performed
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    V = np.zeros(num_states)

    num_iterations = 0
    while True:
        delta = 0
        for s in range(num_states):
            v = V[s]
            q_values = np.zeros(num_actions)
            for a in range(num_actions):
                for prob, next_state, reward, _ in env.P[s][a]:
                    q_values[a] += prob * (reward + gamma * V[next_state])
            V[s] = max(q_values)
            delta = max(delta, abs(v - V[s]))
        num_iterations += 1
        if delta < theta:
            break

    optimal_policy = find_policy(V)
    return optimal_policy, num_iterations, V

In [51]:
def find_policy(V):
  value_matrix = V.reshape(4,4)

  r, c = value_matrix.shape
  directions = np.zeros((r, c), dtype=int)

  for i in range(r):
    for j in range(c):
      value = value_matrix[i][j]
      max = value
      direction = -1

      neighbors = [(i - 1, j), (i + 1, j), (i, j - 1), (i, j + 1)]

      for a, b in neighbors:
        if 0 <= a < r and 0 <= b < c and value_matrix[a, b] > max:
          max = value_matrix[a, b]
          direction = neighbors.index((a, b))

        directions[i, j] = direction

  directions = directions.reshape(-1)
  return directions

In [52]:
# Testing Policy Iteration and Value Iteration on Frozen Lake
if __name__ == "__main__":
    env = gym.make('FrozenLake-v1', is_slippery=True)

    # Value Iteration
    print("\nValue Iteration:")
    policy, iterations, V = value_iteration(env)
    print(f"Optimal Policy:\n{policy}")
    print(f"Number of Iterations: {iterations}")
    print(f"Optimal Value Function:\n{V}")

    # Policy Iteration
    print("Policy Iteration:")
    policy, iterations, eval_iterations, V = policy_iteration(env)
    print(f"Optimal Policy :\n{policy}")
    print(f"Number of Iterations: {eval_iterations}")
    print(f"Number of Iterations: {iterations}")


Value Iteration:
Optimal Policy:
[ 1  3  1  2  1  1  1  2  3  1  1  2  3  3 -1  2]
Number of Iterations: 60
Optimal Value Function:
[0.06888624 0.06141117 0.07440763 0.05580502 0.09185097 0.
 0.11220727 0.         0.14543392 0.24749561 0.29961676 0.
 0.         0.37993504 0.63901974 0.        ]
Policy Iteration:
Optimal Policy :
[0 3 0 3 0 0 0 0 3 1 0 0 0 2 1 0]
Number of Iterations: 61
Number of Iterations: 2
