<a href="https://colab.research.google.com/github/everestso/Fall24Spring25/blob/main/FrozenLake_DirectEvaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Frozen Lake w/ Value Iteration & Direct Evaluation

## Frozen Lake Domain Description

Frozen Lake is a simple grid-world environment where an agent navigates a frozen lake to reach a goal while avoiding falling into holes. The environment is represented as a grid, with each cell being one of the following:

* **S**: Starting position of the agent
* **F**: Frozen surface, safe to walk on
* **H**: Hole, falling into one ends the episode with a reward of 0
* **G**: Goal, reaching it ends the episode with a reward of 1

The agent can take four actions:

* **0: Left**
* **1: Down**
* **2: Right**
* **3: Up**

However, due to the slippery nature of the ice, the agent might not always move in the intended direction. There's a chance it moves perpendicular to the intended direction.




In [2]:
import gym

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Take a few actions and observe the results
for _ in range(5):
    action = env.action_space.sample()  # Choose a random action
    observation, reward, done, info = env.step(action)
    # Render the environment to see the agent's movement (text-based)
    if done:
        observation= env.reset()
    else:
      rendered = env.render()
      if len(rendered) > 1:  # Check if there's a second element
         print(rendered[1])  # Print the second element
# Close the environment
env.close()

  (Down)
S[41mF[0mFF
FHFH
FFFH
HFFG

  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG



The transition model for the Frozen Lake world describes how the agent's actions affect its movement and the resulting state transitions. Here's a breakdown of the key components:

**Actions:**

* The agent can choose from four actions:
    * 0: Left
    * 1: Down
    * 2: Right
    * 3: Up

**State Transitions:**

* **Intended Movement:** Ideally, the agent moves one cell in the chosen direction.
* **Slippery Ice:** Due to the slippery nature of the ice, there's a probability that the agent will move in a perpendicular direction instead of the intended one. The exact probabilities depend on the specific Frozen Lake environment configuration, but typically:
    * **Successful Move:** The agent moves in the intended direction with a high probability.
    * **Perpendicular Move:** The agent moves 90 degrees to the left or right of the intended direction with a lower probability.
* **Boundaries:** If the intended movement would take the agent outside the grid boundaries, it remains in its current position.
* **Holes:** If the agent lands on a hole ("H"), the episode ends, and it receives a reward of 0.
* **Goal:** If the agent reaches the goal ("G"), the episode ends, and it receives a reward of 1.




In [3]:
import gym

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Take a few actions and observe the results
for _ in range(5):
    action = env.action_space.sample()  # Choose a random action
    observation, reward, done, info = env.step(action)
    # Render the environment to see the agent's movement (text-based)
    if done:
        observation= env.reset()
    else:
      rendered = env.render()
      if len(rendered) > 1:  # Check if there's a second element
         print(rendered[1])  # Print the second element
# Close the environment
env.close()
print ("State 14 Going Right: (s, a, r, Done)", env.P[14][2])

  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG

State 14 Going Right: (s, a, r, Done) [(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]


# Value Iteration

In [4]:
import gym
import numpy as np

def value_iteration(env, gamma=0.9, num_iterations=1000, theta=0.0001):
    """
    Implements the Value Iteration algorithm.

    Args:
        env: The OpenAI Gym environment.
        gamma: Discount factor.
        num_iterations: Number of iterations to run.

    Returns:
        The optimal value function and policy.
    """
    # Initialize value function and policy
    V = np.zeros(env.observation_space.n)
    policy = np.zeros(env.observation_space.n, dtype=int)

    for _ in range(num_iterations):
        delta = 0
        V_prev = V.copy()
        for s in range(env.observation_space.n):
            v = V[s]
            action_values = []  # Store Q values for all actions in this state

            for a in range(env.action_space.n):
                q_value = 0
                for prob, next_state, reward, done in env.P[s][a]:
                    q_value += prob * (reward + gamma * V_prev[next_state])
                action_values.append(q_value)

            # Update V[s] with the max Q value
            V[s] = max(action_values)
            # Update policy[s] with the action that maximizes Q value
            policy[s] = np.argmax(action_values)

            delta = max(delta, abs(v - V[s]))
        if delta < theta and 0:
            break
        V_prev = V.copy()

    return V, policy

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Apply student's Value Iteration
optimal_V, optimal_policy = value_iteration(env)
print (f"optimal policy= \n{optimal_policy.reshape((4,4))}\n optimal_V=\n{np.round(optimal_V.reshape((4,4)), 2)}")

# Evaluate student's solution (Optional)
def evaluate_policy(env, policy, num_episodes=100):
    total_reward = 0
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        while not done:
            action = policy[state]
            state, reward, done, _ =  env.step(action)
            total_reward += reward
    return total_reward / num_episodes

average_reward = evaluate_policy(env, optimal_policy)
print("Average Reward:", average_reward)

optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 optimal_V=
[[0.07 0.06 0.07 0.06]
 [0.09 0.   0.11 0.  ]
 [0.15 0.25 0.3  0.  ]
 [0.   0.38 0.64 0.  ]]
Average Reward: 0.74


# Policy Extraction & Policy Evaluation

In [5]:
def policy_extraction(env, V, gamma=0.9):
  policy = np.zeros(env.observation_space.n, dtype=int)
  for s in range(env.observation_space.n):
      v = V[s]
      action_values = []  # Store Q values for all actions in this state

      for a in range(env.action_space.n):
          q_value = 0
          for prob, next_state, reward, done in env.P[s][a]:
              q_value += prob * (reward + gamma * V[next_state])
          action_values.append(q_value)

      # Update policy[s] with the action that maximizes Q value
      policy[s] = np.argmax(action_values)
  return policy

def policy_evaluation(env, policy, gamma=0.9, num_iterations=1000, theta=0.0001):
    """
    Implements the Value Iteration algorithm.

    Args:
        env: The OpenAI Gym environment.
        gamma: Discount factor.
        num_iterations: Number of iterations to run.

    Returns:
        The optimal value function and policy.
    """
    # Initialize value function and policy
    V = np.zeros(env.observation_space.n)
    for _ in range(num_iterations):
        delta = 0
        V_prev = V.copy()
        for s in range(env.observation_space.n):
            v = V[s]
            a = policy[s]
            q_value = 0
            for prob, next_state, reward, done in env.P[s][a]:
                q_value += prob * (reward + gamma * V_prev[next_state])
            # Update V[s] with the max Q value
            V[s] = q_value

            delta = max(delta, abs(v - V[s]))
        if delta < theta and 0:
            break
        V_prev = V.copy()
    return V

In [6]:
V_policy_evalution = policy_evaluation(env, optimal_policy)
print (f"V_policy_evalution=\n{V_policy_evalution}")
V_policy = policy_extraction(env, V_policy_evalution)
print (f"optimal policy= \n{optimal_policy.reshape((4,4))}\n V_policy=\n{V_policy.reshape((4,4))}")

V_policy_evalution=
[0.0688909  0.06141457 0.07440976 0.05580732 0.09185454 0.
 0.11220821 0.         0.14543635 0.24749695 0.29961759 0.
 0.         0.3799359  0.63902015 0.        ]
optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 V_policy=
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]


# Reinforcement Learning

In [7]:
env.reset(seed=42)
init_policy = np.array([0, 3, 0, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0])
print (f"optimal policy= \n{init_policy.reshape((4,4))}")

optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]


In [8]:
def GenerateEpisodes(env, policy, num_episodes=5):
    total_reward = 0
    episodes = []
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        episode = []
        while not done:
            action = policy[state]
            newstate, reward, done, _ = env.step(action)
            episode.append((int(state), action, newstate, reward))
            state = newstate
        episodes.append(episode)
    return episodes

training_episodes = GenerateEpisodes(env, init_policy)
for i, e in enumerate(training_episodes):
  print (f"training_episode=({i=}):\n{e}")

training_episodes2 = GenerateEpisodes(env, init_policy, 1000)

training_episode=(i=0):
[(0, 0, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 9, 0.0), (9, 1, 10, 0.0), (10, 0, 14, 0.0), (14, 1, 15, 1.0)]
training_episode=(i=1):
[(0, 0, 0, 0.0), (0, 0, 0, 0.0), (0, 0, 4, 0.0), (4, 0, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 0, 0.0), (0, 0, 0, 0.0), (0, 0, 0, 0.0), (0, 0, 4, 0.0), (4, 0, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 9, 0.0), (9, 1, 13, 0.0), (13, 2, 13, 0.0), (13, 2, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 10, 0.0), (10, 0, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 14, 0.0), (14, 1, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 13, 0.0), (13, 2, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 10, 0.0), (10, 0, 6, 0.0), (6, 0, 10, 0.0), (10, 0, 14, 0.0), (14, 1, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 8, 0.0), (8, 3, 9, 0.0), (9, 1, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 4, 0.0), (4, 0, 0, 0.0), (0, 0, 0, 0.0

# Direct Evaluation

## Evaluate Single Episode

In [9]:
def EvaluateEpisode(env, e, V_DE, V_Counts, gamma=0.9):
    future_reward = 0
    for t in reversed(e):  # Iterate in reverse order
        future_reward = t[3] + gamma * future_reward
        V_DE[t[0]] = future_reward+V_DE[t[0]]
        V_Counts[t[0]] = V_Counts[t[0]]+1
    return V_DE, V_Counts

## Evaluate Episode 1



In [10]:
V_DE = np.zeros((env.observation_space.n))
V_Counts = np.zeros((env.observation_space.n))
V_DE, V_Count = EvaluateEpisode(env, training_episodes[0], V_DE, V_Counts, 0.9)
V = np.where(V_Counts != 0, V_DE / V_Counts, 0)
print (f"V_DE=\n{V_DE.reshape((4,4))}")
print (f"V_Counts=\n{V_Counts.reshape((4,4))}")
print (f"V=\n{V.reshape((4,4))}")

V_DE=
[[0.59049 0.      0.      0.     ]
 [0.6561  0.      0.      0.     ]
 [0.729   0.81    0.9     0.     ]
 [0.      0.      1.      0.     ]]
V_Counts=
[[1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 1. 1. 0.]
 [0. 0. 1. 0.]]
V=
[[0.59049 0.      0.      0.     ]
 [0.6561  0.      0.      0.     ]
 [0.729   0.81    0.9     0.     ]
 [0.      0.      1.      0.     ]]


  V = np.where(V_Counts != 0, V_DE / V_Counts, 0)


## Evaluate All Episodes

In [11]:
V_DE = np.zeros((env.observation_space.n))
V_Counts = np.zeros((env.observation_space.n))
for e in training_episodes2:
    V_DE, V_Count = EvaluateEpisode(env, e, V_DE, V_Counts, 0.9)
V = np.where(V_Counts != 0, V_DE / V_Counts, 0)
print (f"V_DE=\n{V_DE.reshape((4,4))}")
print (f"V_Counts=\n{V_Counts.reshape((4,4))}")
print (f"V_DirectEvaluation=\n{np.round(V.reshape((4,4)),2)}")
print (f"optimal policy= \n{optimal_policy.reshape((4,4))}\n optimal_V=\n{np.round(optimal_V.reshape((4,4)), 2)}")

V_DE=
[[ 879.44773993   12.02033939   32.45370905    0.        ]
 [ 910.67996731    0.           63.889207      0.        ]
 [ 943.61931212  887.58499323  418.52594351    0.        ]
 [   0.         1065.13267976 1369.46029196    0.        ]]
V_Counts=
[[12819.   227.   503.     0.]
 [ 9779.     0.   646.     0.]
 [ 6632.  3600.  1432.     0.]
 [    0.  2876.  2159.     0.]]
V_DirectEvaluation=
[[0.07 0.05 0.06 0.  ]
 [0.09 0.   0.1  0.  ]
 [0.14 0.25 0.29 0.  ]
 [0.   0.37 0.63 0.  ]]
optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 optimal_V=
[[0.07 0.06 0.07 0.06]
 [0.09 0.   0.11 0.  ]
 [0.15 0.25 0.3  0.  ]
 [0.   0.38 0.64 0.  ]]


  V = np.where(V_Counts != 0, V_DE / V_Counts, 0)


Q Learning






In [32]:
import gym
import numpy as np
import random

def q_learning(env, alpha=0.1, gamma=0.99, epsilon=1.0, epsilon_decay=0.99, num_episodes=10000):
    """
    Implements the Q-Learning algorithm with epsilon-greedy action selection.

    Args:
        env: The OpenAI Gym environment.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Initial exploration rate.
        epsilon_decay: Decay rate for epsilon.
        num_episodes: Number of episodes to run.

    Returns:
        The learned Q-table and the optimal policy.
    """
    Q = np.zeros((env.observation_space.n, env.action_space.n))

    # Decaying epsilon
    min_epsilon = 0.01

    for episode in range(num_episodes):
        state = env.reset()
        done = False

        while not done:
            if random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(Q[state])

            next_state, reward, done, _ = env.step(action)
            best_next_action = np.argmax(Q[next_state])
            Q[state, action] = Q[state, action] + alpha * (reward + gamma * Q[next_state, best_next_action] - Q[state, action])
            # Updates state
            state = next_state
        # Decay epsilon after each episode
        epsilon = max(min_epsilon, epsilon * epsilon_decay)
    # policy from learned Q-table
    policy = np.argmax(Q, axis=1)
    return Q, policy

# Create environment
env = gym.make('FrozenLake-v1', render_mode='ansi')

# Apply Q-Learning
Q, learned_policy = q_learning(env)

# Print learned policy and Q-table
print(f"Learned Policy:\n{learned_policy.reshape((4, 4))}")
print(f"Learned Q-Table:\n{np.round(Q, 2)}")

# Evaluate learned policy
average_reward = evaluate_policy(env, learned_policy)
print("Average Reward:", average_reward)

Learned Policy:
[[0 3 0 1]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
Learned Q-Table:
[[0.48 0.46 0.46 0.45]
 [0.23 0.1  0.16 0.38]
 [0.28 0.16 0.07 0.17]
 [0.   0.08 0.   0.  ]
 [0.51 0.31 0.46 0.39]
 [0.   0.   0.   0.  ]
 [0.3  0.15 0.15 0.07]
 [0.   0.   0.   0.  ]
 [0.38 0.44 0.44 0.55]
 [0.4  0.62 0.46 0.43]
 [0.52 0.43 0.28 0.21]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.43 0.58 0.75 0.5 ]
 [0.73 0.88 0.72 0.74]
 [0.   0.   0.   0.  ]]
Average Reward: 0.72


optimal policy=
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 optimal_V=
[[0.07 0.06 0.07 0.06]
 [0.09 0.   0.11 0.  ]
 [0.15 0.25 0.3  0.  ]
 [0.   0.38 0.64 0.  ]]
Average Reward: 0.74

This is the policy and reward from Value Iteration. With Q-learning, it yields similar results

Learned Policy:
[[0 3 0 0]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
Learned Q-Table:
[[0.52 0.49 0.49 0.51]
 [0.05 0.15 0.18 0.44]
 [0.39 0.14 0.12 0.19]
 [0.1  0.   0.   0.  ]
 [0.54 0.34 0.4  0.4 ]
 [0.   0.   0.   0.  ]
 [0.36 0.18 0.12 0.07]
 [0.   0.   0.   0.  ]
 [0.33 0.36 0.33 0.58]
 [0.47 0.64 0.51 0.46]
 [0.54 0.38 0.28 0.43]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.47 0.53 0.77 0.54]
 [0.72 0.89 0.76 0.78]
 [0.   0.   0.   0.  ]]
Average Reward with Q-Learning: 0.7

It has the same policy and similar reward since both are trying to find the optimal policy.