<a href="https://colab.research.google.com/github/Skeletonman59/CSCI166-public/blob/main/QLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Frozen Lake w/ Value Iteration & Direct Evaluation

## Frozen Lake Domain Description

Frozen Lake is a simple grid-world environment where an agent navigates a frozen lake to reach a goal while avoiding falling into holes. The environment is represented as a grid, with each cell being one of the following:

* **S**: Starting position of the agent
* **F**: Frozen surface, safe to walk on
* **H**: Hole, falling into one ends the episode with a reward of 0
* **G**: Goal, reaching it ends the episode with a reward of 1

The agent can take four actions:

* **0: Left**
* **1: Down**
* **2: Right**
* **3: Up**

However, because the ice is slippery, the agent does not always move in the intended direction. In the default `FrozenLake-v1` configuration it moves in the intended direction with probability 1/3 and slips to each of the two perpendicular directions with probability 1/3.




In [None]:
import gym

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Take a few actions and observe the results
for _ in range(5):
    action = env.action_space.sample()  # Choose a random action
    observation, reward, done, info = env.step(action)
    # Render the environment to see the agent's movement (text-based)
    if done:
        observation = env.reset()
    else:
        rendered = env.render()
        if len(rendered) > 1:  # Check if there's a second element
            print(rendered[1])  # Print the second element
# Close the environment
env.close()

  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG



The transition model for the Frozen Lake world describes how the agent's actions affect its movement and the resulting state transitions. Here's a breakdown of the key components:

**Actions:**

* The agent can choose from four actions:
    * 0: Left
    * 1: Down
    * 2: Right
    * 3: Up

**State Transitions:**

* **Intended Movement:** Ideally, the agent moves one cell in the chosen direction.
* **Slippery Ice:** Because the ice is slippery, the agent may move perpendicular to the intended direction. The exact probabilities depend on the environment configuration; in the default slippery `FrozenLake-v1` used here:
    * **Successful Move:** The agent moves in the intended direction with probability 1/3.
    * **Perpendicular Move:** The agent moves 90 degrees to the left or to the right of the intended direction with probability 1/3 each.
* **Boundaries:** If the intended movement would take the agent outside the grid boundaries, it remains in its current position.
* **Holes:** If the agent lands on a hole ("H"), the episode ends, and it receives a reward of 0.
* **Goal:** If the agent reaches the goal ("G"), the episode ends, and it receives a reward of 1.
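
The rules above can be sketched without Gym. This is a minimal illustration, assuming the default 4x4 map and equal 1/3 probabilities for the intended and the two perpendicular directions; `MAP`, `MOVES`, and `slippery_transitions` are hypothetical names introduced here, not part of the Gym API.

```python
# Hypothetical helper; assumes the default 4x4 map and slippery dynamics
# (1/3 for the intended direction, 1/3 for each perpendicular direction).
MAP = ["SFFF", "FHFH", "FFFH", "HFFG"]
N = 4
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3
MOVES = {LEFT: (0, -1), DOWN: (1, 0), RIGHT: (0, 1), UP: (-1, 0)}

def slippery_transitions(state, action):
    """Return a list of (prob, next_state, reward, done) for one state-action pair."""
    row, col = divmod(state, N)
    if MAP[row][col] in "HG":
        # Terminal cells (holes and the goal) only loop back to themselves.
        return [(1.0, state, 0, True)]
    outcomes = []
    # With this action numbering, the neighbours (action - 1) and (action + 1)
    # are the two directions perpendicular to the intended one.
    for actual in [(action - 1) % 4, action, (action + 1) % 4]:
        d_row, d_col = MOVES[actual]
        # Clamp to the grid so a move off the edge leaves the agent in place.
        new_row = min(max(row + d_row, 0), N - 1)
        new_col = min(max(col + d_col, 0), N - 1)
        cell = MAP[new_row][new_col]
        reward = 1.0 if cell == "G" else 0.0
        outcomes.append((1 / 3, new_row * N + new_col, reward, cell in "HG"))
    return outcomes

print(slippery_transitions(14, RIGHT))
```

Each tuple has the form (probability, next state, reward, done), so the printed list for state 14 going Right contains one entry that stays in 14, one that reaches the goal in 15 with reward 1, and one that slips up to 10.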




In [None]:
import gym

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Take a few actions and observe the results
for _ in range(5):
    action = env.action_space.sample()  # Choose a random action
    observation, reward, done, info = env.step(action)
    # Render the environment to see the agent's movement (text-based)
    if done:
        observation = env.reset()
    else:
        rendered = env.render()
        if len(rendered) > 1:  # Check if there's a second element
            print(rendered[1])  # Print the second element
# Close the environment
env.close()
# Each entry of env.P[state][action] is a (prob, next_state, reward, done) tuple
print("State 14 Going Right: (prob, next_state, reward, done)", env.P[14][2])

# added code
print("State 4 Going Down: (prob, next_state, reward, done)", env.P[4][1])
print("State 11 Going Right: (prob, next_state, reward, done)", env.P[11][2])
print("State 9 Going Right: (prob, next_state, reward, done)", env.P[9][2])

  (Down)
S[41mF[0mFF
FHFH
FFFH
HFFG

  (Down)
S[41mF[0mFF
FHFH
FFFH
HFFG

State 14 Going Right: (prob, next_state, reward, done) [(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
State 4 Going Down: (prob, next_state, reward, done) [(0.3333333333333333, 4, 0.0, False), (0.3333333333333333, 8, 0.0, False), (0.3333333333333333, 5, 0.0, True)]
State 11 Going Right: (prob, next_state, reward, done) [(1.0, 11, 0, True)]
State 9 Going Right: (prob, next_state, reward, done) [(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 5, 0.0, True)]


# Value Iteration
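
For reference, the update that value iteration repeats on every sweep is the Bellman optimality backup. The notation is an assumption made here: $P$ stands for the transition probabilities stored in `env.P`, $R$ for the reward, and $\gamma$ for the discount factor `gamma`.

$$V_{k+1}(s) = \max_{a} \sum_{s'} P(s' \mid s, a)\,\bigl[R(s, a, s') + \gamma\, V_k(s')\bigr]$$

The policy then picks the maximizing action in each state:

$$\pi(s) = \arg\max_{a} \sum_{s'} P(s' \mid s, a)\,\bigl[R(s, a, s') + \gamma\, V_k(s')\bigr]$$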

In [None]:
import gym
import numpy as np

def value_iteration(env, gamma=0.9, num_iterations=1000, theta=0.0001, early_stop=False):
    """
    Implements the Value Iteration algorithm.

    Args:
        env: The OpenAI Gym environment.
        gamma: Discount factor.
        num_iterations: Maximum number of sweeps over the state space.
        theta: Convergence threshold on the largest per-sweep change in V.
        early_stop: If True, stop once the largest change falls below theta.
            Defaults to False, so all num_iterations sweeps run.

    Returns:
        The optimal value function and policy.
    """
    # Initialize value function and policy
    V = np.zeros(env.observation_space.n)
    policy = np.zeros(env.observation_space.n, dtype=int)

    for _ in range(num_iterations):
        delta = 0
        V_prev = V.copy()  # Values from the previous sweep (synchronous update)
        for s in range(env.observation_space.n):
            action_values = []  # Store Q values for all actions in this state

            for a in range(env.action_space.n):  # Every action: Left, Down, Right, Up
                q_value = 0  # Reset the Q value for this action
                # Expected return over every possible outcome of (s, a)
                for prob, next_state, reward, done in env.P[s][a]:
                    q_value += prob * (reward + gamma * V_prev[next_state])
                action_values.append(q_value)

            # Update V[s] with the max Q value
            V[s] = max(action_values)
            # Update policy[s] with the action that maximizes the Q value
            # (e.g. for Q values [0.3, 0.4, 0.7, 0.6], action 2, Right, is saved)
            policy[s] = np.argmax(action_values)

            # Track the largest absolute change between the old and new value
            delta = max(delta, abs(V_prev[s] - V[s]))
        # delta is the convergence measure; theta is the threshold
        if early_stop and delta < theta:
            break

    return V, policy

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Apply student's Value Iteration
optimal_V, optimal_policy = value_iteration(env)
print (f"optimal policy= \n{optimal_policy.reshape((4,4))}\n optimal_V=\n{np.round(optimal_V.reshape((4,4)), 2)}")

# Evaluate student's solution (Optional)
def evaluate_policy(env, policy, num_episodes=100):
    total_reward = 0
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        while not done:
            action = policy[state]
            state, reward, done, _ =  env.step(action)
            total_reward += reward
    return total_reward / num_episodes

average_reward = evaluate_policy(env, optimal_policy)
print("Average Reward:", average_reward)

optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 optimal_V=
[[0.07 0.06 0.07 0.06]
 [0.09 0.   0.11 0.  ]
 [0.15 0.25 0.3  0.  ]
 [0.   0.38 0.64 0.  ]]
Average Reward: 0.75


# Q-Learning (Lecture Slides variant, broken; see below for the Gemini-assisted Q-Learning)

In [None]:
import gym
import numpy as np

# Create the environment
env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

# Reset the environment to the initial state
observation = env.reset()

# Take a few actions and observe the results
while True:
    step = global_step.eval()
    if step >= 10000:
        break
    iteration +=1
    if done:
      observation = env.reset()
      for skip in range(skip_start):
        action = env.action_space.sample()  # Choose a random action
        observation, reward, done, info = env.step(action)
        state = preprocess_observation(obs)

    #Online DQN evaluates what to do
    Qval = online_q_values.eval(feed_dict = {X_state: [state]})
    action = epsilon_greedy(Qval, step)

    #Online DQN plays
    obs, reward, done, info = env.step(action)
    next_state = preprocess_observation(obs)
    replay_memory.append((state, action, reward, next_state, 1.0 - done))
    state = next_state

    if iteration < training_start or iteration % training_interval != 0:
        continue
    X_state_val, X_action_val, rewards, X_next_state_val, continues = (
        sample_memories(batch_size))
    next_q_values = target_q_values.eval(
        feed_dict={X_state: X_next_state_val})
    max_next_q_values = np.max(next_q_values, axis=1, keepdims=True)
    y_val = rewards + continues * discount_rate * max_next_q_values

    #train the online DQN
    training_op.run(feed_dict={
        X_state: X_state_val,
        X_action: X_action_val,
        y: y_val
    })

    #regularly copy the online DQN to the target DQN
    if step % copy_steps == 0:
        copy_online_to_target.run()

    #and save regularly
    if step % save_steps == 0:
        online_saver.save(sess, checkpoint_path)


# Close the environment
env.close()



NameError: name 'global_step' is not defined

# Q-Learning, with Gemini/GPT guidance
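
For reference, the tabular Q-learning update used in this section can be written as follows. The symbols are assumed to match the code's names: $\alpha$ is `alpha`, $\gamma$ is `gamma`, $r$ is `reward`, and $s'$ is `next_state`.

$$Q(s, a) \leftarrow Q(s, a) + \alpha\,\bigl[r + \gamma \max_{a'} Q(s', a') - Q(s, a)\bigr]$$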

In [None]:
import gym
import numpy as np

env = gym.make('FrozenLake-v1', render_mode='ansi')  # 'ansi' mode for text-based rendering

Q = np.zeros((env.observation_space.n, env.action_space.n))  # Initialize Q to all zeroes

def choose_action(state, epsilon):
    # Epsilon-greedy: explore with probability epsilon, otherwise act greedily
    if np.random.random() < epsilon:
        return env.action_space.sample()
    else:
        return np.argmax(Q[state, :])

def update_q(Q, state, action, reward, next_state, alpha, gamma):
    # Equivalent to:
    # Q[s, a] = Q[s, a] + alpha * (reward + gamma * max(Q[s', :]) - Q[s, a])
    best_next_action = np.argmax(Q[next_state, :])
    td_target = reward + gamma * Q[next_state, best_next_action]
    Q[state, action] += alpha * (td_target - Q[state, action])

def q_learning(env, gamma=0.99, alpha=0.1, epsilon=0.3, num_episodes=100000, epsilon_decay=3.0):
    # Approach with exploration + decay; updates the module-level Q table in place
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        while not done:
            action = choose_action(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            update_q(Q, state, action, reward, next_state, alpha, gamma)
            state = next_state

            # The decay step only runs if epsilon_decay is greater than 0.
            # epsilon is multiplied by epsilon_decay after every step: factors
            # below 1 shrink it toward the 0.01 floor, factors above 1 grow it,
            # so it is capped at 1.0 (every action random).
            if epsilon_decay > 0.0:
                epsilon = min(max(epsilon * epsilon_decay, 0.01), 1.0)

    policy = np.argmax(Q, axis=1)
    return Q, policy

Q, policy = q_learning(env)
print(f"optimal policy= \n{policy.reshape((4,4))}\n Q=\n{Q}")


optimal policy= 
[[0 3 3 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 3 0]]
 Q=
[[0.58489513 0.56611893 0.56903952 0.56051144]
 [0.28049902 0.42195987 0.38289544 0.52768371]
 [0.48065491 0.47177845 0.44961395 0.50653456]
 [0.39474642 0.31685229 0.35052083 0.48948051]
 [0.60078008 0.32006419 0.44309373 0.33466441]
 [0.         0.         0.         0.        ]
 [0.4105011  0.24254459 0.30611058 0.21298301]
 [0.         0.         0.         0.        ]
 [0.37236299 0.51190684 0.32529949 0.62811769]
 [0.27975067 0.65512567 0.51355667 0.41075268]
 [0.63528945 0.53127802 0.43279036 0.32535564]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.36248122 0.4217308  0.7401224  0.47272466]
 [0.73192736 0.85982725 0.8461551  0.87315733]
 [0.         0.         0.         0.        ]]


# Interesting Notes about this code box:

1. When the number of episodes (run cycles) is 1,000 and the epsilon decay is 0 (which skips the decay step, so epsilon stays at its initial value), we get a working policy with the grid:

[[1 3 0 0]

 [0 0 2 0]

 [1 2 1 0]

 [0 3 1 0]]

However, when we increase the decay, our Q-values and policy collapse to all zeroes. This happens even with really small values like 0.001. Interestingly, increasing the epsilon decay to 1 gives us a policy of:

[[0 3 2 3]

 [0 0 2 0]

 [3 1 2 0]

 [0 1 3 0]]

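I'll have to ask what is going on here specifically. A likely explanation from reading the code: epsilon is multiplied by the decay after every step, with a floor of 0.01. A decay below 1 therefore drives epsilon to the floor almost immediately, so the agent acts nearly greedily on an all-zero Q table, rarely if ever reaches the goal, and the Q-values stay at zero. A decay of exactly 1 leaves epsilon unchanged, and a decay above 1 pushes epsilon to 1 or beyond, which makes every action random.

Final Note: Actually, increasing the decay by whole values seems to affect both the policy and the Q-values equally.

2. Having an epsilon of 0.3 and a decay of 0 also breaks the code.
However, this is not the case when the number of episodes is increased from 1,000 to 10,000: with the same epsilon and decay, we get a good policy and Q-value table printed out. Setting epsilon and decay equal to each other still breaks the code, though. This most likely shows that the value we have for decay makes the Q-values converge too soon to terrible values.

Further testing required.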
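
The decay settings mentioned in these notes can be checked in isolation. This is a minimal sketch, assuming epsilon is updated once per step by multiplying it by `epsilon_decay` with a floor of 0.01 and only when `epsilon_decay` is positive, as in `q_learning`; `epsilon_schedule` is a hypothetical helper introduced here.

```python
def epsilon_schedule(epsilon, epsilon_decay, num_steps):
    """Hypothetical helper: list the value of epsilon after each of num_steps steps."""
    history = []
    for _ in range(num_steps):
        # The decay step is skipped entirely when the factor is 0.
        if epsilon_decay > 0.0:
            epsilon = max(epsilon * epsilon_decay, 0.01)
        history.append(epsilon)
    return history

for decay in (0.0, 0.001, 1.0, 3.0):
    print(decay, epsilon_schedule(0.3, decay, 3))
```

Under these assumptions a factor of 0 or 1 keeps epsilon at 0.3, a factor of 0.001 drops it to the 0.01 floor after a single step, and a factor of 3 pushes it above 1 by the second step, at which point the `np.random.random() < epsilon` test always succeeds.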



Saved Runs:

epsilon = 0.5, num_episodes = 100,000, epsilon_decay = 1.0

epsilon = 0.3, num_episodes = 100,000, epsilon_decay = 3.0


# Policy Extraction & Policy Evaluation

In [None]:
def policy_extraction(env, V, gamma=0.9):
    """Return the greedy policy with respect to the value function V."""
    policy = np.zeros(env.observation_space.n, dtype=int)
    for s in range(env.observation_space.n):
        action_values = []  # Store Q values for all actions in this state

        for a in range(env.action_space.n):
            q_value = 0
            for prob, next_state, reward, done in env.P[s][a]:
                q_value += prob * (reward + gamma * V[next_state])
            action_values.append(q_value)

        # Update policy[s] with the action that maximizes Q value
        policy[s] = np.argmax(action_values)
    return policy

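def policy_evaluation(env, policy, gamma=0.9, num_iterations=1000, theta=0.0001, early_stop=False):
    """
    Implements iterative Policy Evaluation for a fixed policy.

    Args:
        env: The OpenAI Gym environment.
        policy: Array mapping each state to the action taken in that state.
        gamma: Discount factor.
        num_iterations: Maximum number of sweeps over the state space.
        theta: Convergence threshold on the largest per-sweep change in V.
        early_stop: If True, stop once the largest change falls below theta.
            Defaults to False, so all num_iterations sweeps run.

    Returns:
        The value function of the given policy.
    """
    # Initialize value function
    V = np.zeros(env.observation_space.n)
    for _ in range(num_iterations):
        delta = 0
        V_prev = V.copy()  # Values from the previous sweep (synchronous update)
        for s in range(env.observation_space.n):
            a = policy[s]  # Only the policy's action is evaluated (no max over actions)
            q_value = 0
            for prob, next_state, reward, done in env.P[s][a]:
                q_value += prob * (reward + gamma * V_prev[next_state])
            # Update V[s] with the Q value of the policy's action
            V[s] = q_value

            delta = max(delta, abs(V_prev[s] - V[s]))
        if early_stop and delta < theta:
            break
    return V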

In [None]:
V_policy_evaluation = policy_evaluation(env, optimal_policy)
print(f"V_policy_evaluation=\n{V_policy_evaluation}")
extracted_policy = policy_extraction(env, V_policy_evaluation)
print(f"optimal policy= \n{optimal_policy.reshape((4,4))}\n extracted_policy=\n{extracted_policy.reshape((4,4))}")

V_policy_evaluation=
[0.0688909  0.06141457 0.07440976 0.05580732 0.09185454 0.
 0.11220821 0.         0.14543635 0.24749695 0.29961759 0.
 0.         0.3799359  0.63902015 0.        ]
optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 extracted_policy=
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]


# Reinforcement Learning

In [None]:
env.reset(seed=42)
init_policy = np.array([0, 3, 0, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0])
print (f"optimal policy= \n{init_policy.reshape((4,4))}")

optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]


In [None]:
def GenerateEpisodes(env, policy, num_episodes=5):
    """Roll out the policy and record each episode as a list of
    (state, action, next_state, reward) tuples."""
    episodes = []
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        episode = []
        while not done:
            action = policy[state]
            newstate, reward, done, _ = env.step(action)
            episode.append((int(state), action, newstate, reward))
            state = newstate
        episodes.append(episode)
    return episodes

training_episodes = GenerateEpisodes(env, init_policy)
for i, e in enumerate(training_episodes):
  print (f"training_episode=({i=}):\n{e}")

training_episodes2 = GenerateEpisodes(env, init_policy, 1000)

training_episode=(i=0):
[(0, 0, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 9, 0.0), (9, 1, 10, 0.0), (10, 0, 14, 0.0), (14, 1, 15, 1.0)]
training_episode=(i=1):
[(0, 0, 0, 0.0), (0, 0, 0, 0.0), (0, 0, 4, 0.0), (4, 0, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 0, 0.0), (0, 0, 0, 0.0), (0, 0, 0, 0.0), (0, 0, 4, 0.0), (4, 0, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 9, 0.0), (9, 1, 13, 0.0), (13, 2, 13, 0.0), (13, 2, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 10, 0.0), (10, 0, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 14, 0.0), (14, 1, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 13, 0.0), (13, 2, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 10, 0.0), (10, 0, 6, 0.0), (6, 0, 10, 0.0), (10, 0, 14, 0.0), (14, 1, 14, 0.0), (14, 1, 13, 0.0), (13, 2, 9, 0.0), (9, 1, 8, 0.0), (8, 3, 9, 0.0), (9, 1, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 8, 0.0), (8, 3, 8, 0.0), (8, 3, 4, 0.0), (4, 0, 4, 0.0), (4, 0, 0, 0.0), (0, 0, 0, 0.0

# Direct Evaluation

## Evaluate Single Episode

In [None]:
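def EvaluateEpisode(env, e, V_DE, V_Counts, gamma=0.9):
    """Add the discounted return of every visited state in episode e to V_DE
    and increment that state's visit count in V_Counts (both updated in place)."""
    future_reward = 0
    for t in reversed(e):  # Iterate in reverse order; t = (state, action, next_state, reward)
        future_reward = t[3] + gamma * future_reward  # Discounted return from this step onward
        V_DE[t[0]] += future_reward
        V_Counts[t[0]] += 1
    return V_DE, V_Counts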

## Evaluate Episode 1



In [None]:
V_DE = np.zeros((env.observation_space.n))
V_Counts = np.zeros((env.observation_space.n))
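V_DE, V_Counts = EvaluateEpisode(env, training_episodes[0], V_DE, V_Counts, 0.9)
# Average the returns; unvisited states (count 0) keep a value of 0 without dividing by zero
V = np.divide(V_DE, V_Counts, out=np.zeros_like(V_DE), where=V_Counts != 0)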
print (f"V_DE=\n{V_DE.reshape((4,4))}")
print (f"V_Counts=\n{V_Counts.reshape((4,4))}")
print (f"V=\n{V.reshape((4,4))}")

V_DE=
[[0.59049 0.      0.      0.     ]
 [0.6561  0.      0.      0.     ]
 [0.729   0.81    0.9     0.     ]
 [0.      0.      1.      0.     ]]
V_Counts=
[[1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 1. 1. 0.]
 [0. 0. 1. 0.]]
V=
[[0.59049 0.      0.      0.     ]
 [0.6561  0.      0.      0.     ]
 [0.729   0.81    0.9     0.     ]
 [0.      0.      1.      0.     ]]
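
The values in this output are the discounted returns of the first training episode, which visits states 0, 4, 8, 9, 10, and 14 once each before reaching the goal. A minimal sketch of that arithmetic, assuming `gamma = 0.9` and a single reward of 1 on the final step (`discounted_returns` is a hypothetical helper, not part of the notebook):

```python
def discounted_returns(rewards, gamma=0.9):
    """Hypothetical helper: return the discounted return G_t for every step t."""
    returns = []
    future_reward = 0.0
    # Walk the episode backwards so each return reuses the one after it.
    for reward in reversed(rewards):
        future_reward = reward + gamma * future_reward
        returns.append(future_reward)
    returns.reverse()
    return returns

# Rewards of the six-step episode 0 -> 4 -> 8 -> 9 -> 10 -> 14 -> 15
episode_rewards = [0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
print([round(g, 5) for g in discounted_returns(episode_rewards)])
```

The first entry is 0.9 raised to the fifth power, 0.59049, which is the value recorded for state 0; each later state is one step closer to the goal and is discounted one time fewer.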




## Evaluate All Episodes

In [None]:
V_DE = np.zeros((env.observation_space.n))
V_Counts = np.zeros((env.observation_space.n))
for e in training_episodes2:
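    V_DE, V_Counts = EvaluateEpisode(env, e, V_DE, V_Counts, 0.9)
# Average the returns; unvisited states (count 0) keep a value of 0 without dividing by zero
V = np.divide(V_DE, V_Counts, out=np.zeros_like(V_DE), where=V_Counts != 0)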
print (f"V_DE=\n{V_DE.reshape((4,4))}")
print (f"V_Counts=\n{V_Counts.reshape((4,4))}")
print (f"V_DirectEvaluation=\n{np.round(V.reshape((4,4)),2)}")
print (f"optimal policy= \n{optimal_policy.reshape((4,4))}\n optimal_V=\n{np.round(optimal_V.reshape((4,4)), 2)}")

V_DE=
[[ 879.44773993   12.02033939   32.45370905    0.        ]
 [ 910.67996731    0.           63.889207      0.        ]
 [ 943.61931212  887.58499323  418.52594351    0.        ]
 [   0.         1065.13267976 1369.46029196    0.        ]]
V_Counts=
[[12819.   227.   503.     0.]
 [ 9779.     0.   646.     0.]
 [ 6632.  3600.  1432.     0.]
 [    0.  2876.  2159.     0.]]
V_DirectEvaluation=
[[0.07 0.05 0.06 0.  ]
 [0.09 0.   0.1  0.  ]
 [0.14 0.25 0.29 0.  ]
 [0.   0.37 0.63 0.  ]]
optimal policy= 
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
 optimal_V=
[[0.07 0.06 0.07 0.06]
 [0.09 0.   0.11 0.  ]
 [0.15 0.25 0.3  0.  ]
 [0.   0.38 0.64 0.  ]]
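
To see how close the two estimates are, the printed tables can be compared directly. This is a small sketch using the rounded values copied from the output above; the array names are introduced here for illustration.

```python
import numpy as np

# Values copied from the printed output above (rounded to two decimals).
V_direct = np.array([0.07, 0.05, 0.06, 0.00,
                     0.09, 0.00, 0.10, 0.00,
                     0.14, 0.25, 0.29, 0.00,
                     0.00, 0.37, 0.63, 0.00])
V_optimal = np.array([0.07, 0.06, 0.07, 0.06,
                      0.09, 0.00, 0.11, 0.00,
                      0.15, 0.25, 0.30, 0.00,
                      0.00, 0.38, 0.64, 0.00])

abs_diff = np.abs(V_optimal - V_direct)
worst_state = int(np.argmax(abs_diff))  # Index of the largest gap
print(f"max |difference| = {abs_diff.max():.2f} at state {worst_state}")
```

The largest gap is at the top-right cell, which has a count of 0 in `V_Counts`: the sampled episodes never visited it, so direct evaluation has no estimate for it. Every other state agrees with value iteration to within about 0.01.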


