# ECEN743 Spring 2025 - Assignment 1
## Tabular RL Algorithms

In this assignment, you will solve the FrozenLake-v0 environment from [Gymnasium](https://gymnasium.farama.org/). You will be using this helper file to answer questions in your assignment.

**Note that you do not need to start from the scratch. Only write your code between the following lines. Do not modify other parts.**  
"### YOUR CODE HERE"  
"### END OF YOUR CODE"

## Introduction of the FrozenLake Environment

Frozen lake involves crossing a frozen lake from start to goal without falling into any holes by walking over the frozen lake. The player may not always move in the intended direction due to the slippery nature of the frozen lake. The game starts with the player at location [0,0] of the frozen lake grid world with the goal located at far extent of the world e.g. [3,3] for the 4x4 environment. Holes in the ice are distributed in set locations using a pre-determined map, and the player makes moves until they reach the goal or fall in a hole. The map is given below for your reference

        SFFF
        FHFH
        FFFH
        HFFG
    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located
    
    
### Action Space
The player/agent can take 4 discrete actions, in the range {0,3}
* 0: Move left
* 1: Move down
* 2: Move right
* 3: Move up


### State Space
The environment consists of 16 states. The state is a value representing the player’s current position as current_row * nrows + current_col (where both the row and col start at 0).
For example, the goal position in the 4x4 map can be calculated as follows: 3 * 4 + 3 = 15.


### Starting State
The episode starts with the player in state [0] (location [0, 0]).


### Rewards

* Reach goal: +1
* Reach hole: 0
* Reach frozen: 0

### Episode End
The episode ends if the following happens:
#### 1.Termination:
* The player moves into a hole.
* The player reaches the goal at max(nrow) * max(ncol) - 1 (location [max(nrow)-1, max(ncol)-1]).

#### 2.Truncation:
* The length of the episode is 100 for 4x4 environment.

For more info refer to source: https://gymnasium.farama.org/environments/toy_text/frozen_lake/

### The Environment Parameters
* Use discount factor, $\gamma = 0.9$
* The environment is slippery, ie., the transition kernel is stochastic.
* The transition kernel P is a dictionary.
* P[state][action] is tuples with (probability, nextstate, reward, terminal)

**Run the following initializer. Make sure you can execute it without any error.**

If you wish to finish this assignment using Google Colab. Uncomment the following commands and run them.

In [None]:
!pip install swig
!pip install "gymnasium[box2d]"




In [None]:
import gymnasium as gym
%matplotlib inline
import seaborn
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import numpy as np
from matplotlib.patches import Rectangle, Polygon
import matplotlib.pyplot as plt
from IPython.display import display, Image
from PIL import Image as PILImage
import io

env = gym.make('FrozenLake-v1', render_mode="rgb_array", desc=None, map_name="4x4", is_slippery=True)

while hasattr(env, 'env'):
    env = env.env

class FrozenLakeDisplay:
    def __init__(self, size=4):
        self.grid_size = size
        self.map = [
            ['S', 'F', 'F', 'F'],
            ['F', 'H', 'F', 'H'],
            ['F', 'F', 'F', 'H'],
            ['H', 'F', 'F', 'G']
        ]

    def create_triangles(self, x, y):
        center = (x + 0.5, y + 0.5)
        nw = (x, y + 1)
        ne = (x + 1, y + 1)
        se = (x + 1, y)
        sw = (x, y)
        
        triangles = {
            'up': [center, nw, ne],
            'right': [center, ne, se],
            'down': [center, sw, se],
            'left': [center, nw, sw]
        }
        return triangles

    def get_text_positions(self, x, y):
        positions = {
            'up': (x + 0.5, y + 0.8),
            'right': (x + 0.8, y + 0.5),
            'down': (x + 0.5, y + 0.2),
            'left': (x + 0.2, y + 0.5)
        }
        return positions
    

    def get_state_color(self, state_type):
        colors = {
            'S': (0, 0.8, 0, 1),    # Start - Green
            'F': (0.15, 0.15, 0.15, 1),  # Frozen - Dark gray
            'H': (0.3, 0.3, 0.3, 1),    # Hole - Gray
            'G': (0, 0.8, 0, 1)     # Goal - Green
        }
        return colors.get(state_type, (0.15, 0.15, 0.15, 1))

    def display_q_values(self, q_values):
        fig, ax = plt.subplots(figsize=(12, 12))
        ax.set_aspect('equal')

        q_values = convert_q_array_to_dict(q_values)

        for i in range(self.grid_size):
            for j in range(self.grid_size):
                state_type = self.map[i][j]
                cell = Rectangle((j, self.grid_size-1-i), 1, 1, facecolor=self.get_state_color(state_type), edgecolor='white', linewidth=1)
                ax.add_patch(cell)

                if state_type in ['H', 'G']:
                    # Add state label
                    ax.text(j+0.5, self.grid_size-1-i+0.5,  state_type, ha='center', va='center', color='white', fontsize=10)
                    continue

                triangles = self.create_triangles(j, self.grid_size-1-i)
                text_pos  = self.get_text_positions(j, self.grid_size-1-i)
                max_value = float('-inf')
                for direction in ['left','down', 'right', 'up']:
                    max_value = max(max_value, q_values[(i, j)][direction])
                
                for direction in ['left','down', 'right', 'up']:
                    value = q_values[(i, j)][direction]
                    if value == max_value:
                        color = (0, 0.8, 0, 1)
                    else:
                        color = (0.15, 0.15, 0.15, 1)   
                    triangle = Polygon(triangles[direction],  facecolor=color,  edgecolor='white',  linewidth=1)
                    ax.add_patch(triangle)
                    tx, ty = text_pos[direction]
                    ax.text(tx, ty, f"{value:.3f}",  ha='center', va='center', color='white', fontsize=8)

        ax.set_xlim(-0.1, self.grid_size + 0.1)
        ax.set_ylim(-0.1, self.grid_size + 0.1)
        ax.axis('off')
        plt.title('CURRENT Q-VALUES', pad=20, color='white')
        ax.set_facecolor('black')
        fig.patch.set_facecolor('black')
        plt.show()

def convert_q_array_to_dict(q_array):
    q_dict = {}
    actions = ['left','down', 'right', 'up']
    size = 4
    
    for i in range(size):
        for j in range(size):
            idx = i * size + j
            q_values = q_array[idx]
            q_dict[(i, j)] = dict(zip(actions, q_values))
    return q_dict


display_q = FrozenLakeDisplay()

## 1. Q-value iteration

### Explanation of the parameters
* `Q_prev` is the Q-value function from the previous iteration.
* `Q_curr` is the Q-value function of current iteration. In each iteration, you need to use model, reward, and the Q-value function from previous iteration (`Q_prev`) to compute `Q_curr`.
* You can use
    * `n = np.array([n for (p,n,r,t) in env.P[s][a]])` to access the information of the possible next states if you pick action `a` in state `s`. **Note that for conciseness, `env.P[s][a]` omits entries that are not reachable from `(s,a)`;**
    * `p = np.array([p for (p,n,r,t) in env.P[s][a]])` to access the transition probabilities at any state-action pair `(s,a)`. For example, in state `4`, if you choose to move right (`a=2`), then `env.P[4][2]` outputs:
        ```
        [(0.3333333333333333, 8, 0.0, False),
         (0.3333333333333333, 5, 0.0, True),
         (0.3333333333333333, 0, 0.0, False)]
        ```
        That is, there is a one third chance that we end up in either state `8`, `5`, or `0`. **This is very import since the indexing of this array is totally irrelavant to the indexing of your Q-value function array.** In this particular example, you need to update `Q_curr[4]` using `p[0]` and `Q_prev[8]`, `p[1]` and `Q_prev[5]`, `p[2]` and `Q_prev[0]` (Why?);
    * `r = np.array([r for (p,n,r,t) in env.P[s][a]])` to access the reward.

### Your Task
1. Complete the Bellman update in Task 1 below. In particular, calculate the current Q-value function `Q_curr` using the previous Q-value function `Q_prev`.
2. Compute the optimal value function `Vopt` and the optimal policy `Policyopt`.

In [None]:
no_of_actions = env.action_space.n
no_of_states = env.observation_space.n
epsilon = 1e-5
gamma  = 0.9
Q_prev = np.array([[0.0]*no_of_actions for i in range(no_of_states)])
Q_delta = []
delta = 1


while(delta > epsilon):
    Q_curr = np.array([[0.0]*no_of_actions for i in range(no_of_states)])
    ### YOUR CODE HERE
    # Update Q - Bellman Equation
    # Task 1
    ### END OF YOUR CODE
    Q_prev = Q_curr.copy()

# Task 2
### YOUR CODE HERE

V_opt      = []
Policy_opt  = []

### END OF YOUR CODE

### 1a. Optimal Policy and Value function

You do not need to modify the code below but you have to run it before submission.

In [None]:
print("Optimal Value function: ")
print(V_opt)
print("Optimal Policy")
print(Policy_opt)

### 1b. Plot $||Q_k - Q_{k-1}||$

You do not need to modify the code below but you have to run it before submission.

In [None]:
plt.plot(range(len(Q_delta)),Q_delta)
plt.title("Q-value Iteration")
plt.xlabel("Number of iterations")
plt.ylabel("$||Q_k - Q_{k-1}||$")
plt.show()

### 1c. Heat map

You do not need to modify the code below but you have to run it before submission.

In [None]:
display_q.display_q_values(Q_curr)

## 2. Policy Evaluation  

### 2a. linear system of equations

### Hint: one way to do this
1. Compute `P_opt` which is a $|\mathcal{S}|\times|\mathcal{S}|$ matrix where the entry in the $i$-th row, $j$-th column represents the probability of going from state `i` to state `j` by executing the optimal policy obtained by QVI in Problem 1.

2. Compute `r_opt` which is a $|\mathcal{S}|$-dimensional vector whose $i$-th element is $$\mathbb{E}_{a\sim \pi^*(\cdot\mid s)}\mathbb{E}_{s'\sim P(\cdot\mid s,a)}[r(s,a,s')] \stackrel{(a)}{=} \mathbb{E}_{s'\sim P(\cdot\mid s,\pi^*(s))}[r(s,\pi^*(s),s')]$$, where $(a)$ is because $\pi^*$ is deterministic.

3. Recall the Bellman consistency equation, for any policy $\pi$, we have $V^{\pi} = (I-\gamma P^{\pi})^{-1} r^{\pi}$. Rearrange the terms, we can look at the system of linear equations, $(I-\gamma P^{\pi}) V^{\pi} = r^{\pi}$, and solve for $V^\pi$.

4. For $\pi_{\mathrm{unif}}$, repeat the steps above but be careful since the policy is no longer deterministic. You need to do extra work in step 2.


In [None]:
# Optimal policy
I = np.identity(no_of_states)
P_opt = np.zeros((no_of_states,no_of_states))
r_opt = np.zeros(no_of_states)

### YOUR CODE HERE
     
### END OF YOUR CODE

# Uniform policy
I = np.identity(no_of_states)
P_unif = np.zeros((no_of_states,no_of_states))
r_unif = np.zeros(no_of_states)

### YOUR CODE HERE

### END OF YOUR CODE

print("Value function under optimal policy: ")
### YOUR CODE HERE
# print your value function for the optimal policy
### END OF YOUR CODE

print("Value function under uniformly random policy:")
### YOUR CODE HERE
# print your value function for the uniform policy
### END OF YOUR CODE

### 2b. Iterative Method

Recall the Bellman consistency equation that for any policy $\pi$, we have
$$
V^\pi(s) = \mathbb{E}_{a\sim \pi(\cdot\mid s)}\mathbb{E}_{s'\sim P(\cdot\mid s,a)}[r(s,a,s') + \gamma V^\pi(s')].
$$

Please keep `epsilon` unchanged. Remember to update the `delta` in the while loop to reflect the current convergency of the contraction mapping.

In [None]:
epsilon = 1e-10
#optimal policy
V_prev = np.array([0.0]*no_of_states)
delta = 1
while(delta > epsilon):
    ### YOUR CODE HERE
    # V_curr = ?
    # delta = compare V_prev and V_curr
    ### END OF YOUR CODE
    V_prev = V_curr

print("Value function under optimal policy: ")
print(V_prev)

#uniform policy
V_prev = np.array([0.0]*no_of_states)
delta = 1
while(delta > epsilon):
    ### YOUR CODE HERE
    # V_curr = ?
    # delta = compare V_prev and V_curr
    ### END OF YOUR CODE
    V_prev = V_curr
    
print("Value function under uniform policy: ")
print(V_prev)

### 2c. Comparison

**Write your answer below.**  
Answer:




## 3. Policy Iteration

### Hint: one way to do this
1. In the policy evaluation step, calculate `P_pol` matrix. This should be identical to Problem **2a**.
2. Calculate `r_pol` which is a $|\mathcal{S}|$-dimensional vector. You should have
$r_\mathrm{pol}[s] = \mathbb{E}_{a\sim\pi_{\mathrm{prev}}(\cdot\mid s)} \mathbb{E}_{s'\sim P(\cdot\mid s,a)} [r(s,a,s')]. $
Note that this should be just one line. Refer to **Explanation of the parameters** in **Problem 1** for `p`, `n`, and `r`.  
3. Compute `V_curr` using `P_pol` and `r_pol`. It represents the value function of the current policy $\pi_k$.
4. Compute new policy $\pi_{k+1}$ using the value function of $\pi_k$. In particular, you need to use `V_pol_prev` to compute `pi_curr`.

**Note, for this problem, you can remove everything below and start from the scratch. **

**However, you have to save your optimal policy as `piopt_politer` and your optimal value function as `Vopt_politer`.**

In [None]:
pi_prev = np.random.randint(0,no_of_actions,size=no_of_states)
V_pi_delta = []
V_pol_prev = np.array([0.0]*no_of_states)

epsilon_o = 1e-5
epsilon_i = 1e-10

delta_o = 1
while(delta_o > 0):
    ### YOUR CODE HERE
    # some prep work for policy evaluation
    ###
    delta_i = 1
    while(delta_i > epsilon_i):
        ### YOUR CODE HERE
        # V_curr = r_pol + gamma*(P_pol @ V_pol_prev_i)
        ### END OF YOUR CODE
        # delta_i = compare V_curr and V_pol_prev_i
        V_pol_prev_i = V_curr  # update while loop
        
    V_pol_prev = V_pol_prev_i
    
    pi_curr = np.zeros((no_of_states,), dtype=int)
    ### YOUR CODE HERE
    # design your own policy improvement steps
    ### END OF YOUR CODE
    # delta_o = compare pi_curr and pi_prev
    #print(delta_o)
    pi_prev = pi_curr  # update while loop

plt.plot(range(len(V_pi_delta)),V_pi_delta)
plt.title("Policy Iteration")
plt.xlabel("Number of iterations")
plt.ylabel("$||V_{\pi_k} - V_{\pi_{k-1}}||$")
plt.show()

# Remember to save your optimal policy as `piopt_politer` and your optimal value function as `Vopt_politer`.

### 3a. Optimal Policy and Value function

You do not need to modify the code below but you have to run it before submission.

In [None]:
print("Optimal Value function: ")
print(Vopt_politer)
print("Optimal Policy")
print(piopt_politer)

### Run below cell for policy animation

In [None]:
done = False
frames = []
episodes = 0
state, _ = env.reset()
while not done and episodes < 100:
    action = pi_prev[state]
    state, reward, done, truncated, info = env.step(action)
    frame = env.render()
    frames.append(frame)
    episodes += 1

gif_buffer = io.BytesIO()
pil_images = [PILImage.fromarray(frame) for frame in frames] 
pil_images[0].save(gif_buffer, format='GIF', save_all=True, append_images=pil_images[1:], loop=0, duration=500)  
gif_buffer.seek(0)
display(Image(data=gif_buffer.read(), format='png'))
env.close()


### 3b. Compare the convergence of QVI and PI

**Write your answer below.**  
Answer:

### 4a. Run Q-value iteration or Policy Iteration on Cliff walking and visualize the policy 

In [None]:
env = gym.make('CliffWalking-v0', render_mode = 'rgb_array',  is_slippery = 'True')
while hasattr(env, 'env'):
    env = env.env
no_of_actions = env.action_space.n
no_of_states = env.observation_space.n

In [None]:
### YOUR CODE ### 
# Your Choice for Policy learning

### Run below cell for policy animation

In [None]:
done = False
frames = []
episodes = 0
state, _ = env.reset()
while not done and episodes < 500:
    action = pi_prev[state]
    state, reward, done, truncated, info = env.step(action)
    frame = env.render()
    frames.append(frame)
    episodes += 1

gif_buffer = io.BytesIO()
pil_images = [PILImage.fromarray(frame) for frame in frames] 
pil_images[0].save(gif_buffer, format='GIF', save_all=True, append_images=pil_images[1:], loop=0, duration=500)  
gif_buffer.seek(0)
display(Image(data=gif_buffer.read(), format='png'))
env.close()
