# TP2 INFO8003
The idea behind this notebook is to get familiar with the state-action value function $Q(s,a)$ and learning this function using model-based and model-free approaches for small state-action spaces.

## Environment description 

Jack is searching for treasures on an island represented by a 5x5 grid, where each cell corresponds to a state. Jack starts at the bottom-left corner, and his goal is to navigate the grid to collect treasures. There are two types of treasures: a small one at (2, 2) that rewards him with 1 gold each time he collects it, and a large one at (4, 4) that rewards him with 10 gold each time. Walls are present at (1, 1), (2, 1), and (2, 3), and Jack cannot pass through these cells (they are not considered as state therefore).

Beware of cliffs at (4, 1), (4, 2), and (4, 3), as falling off a cliff will cost Jack 100 gold. Additionally, some cells are slippery (the blue ones shown in the figure below), increasing the likelihood of falling into a cliff.

Since Jack has consumed some rum, he has an 80% chance of performing the intended action and a 20% chance of performing a random action. He can move right, left, up, or down. The environment will display Jack’s position and the layout of the grid.

<p align="center"> 
    <img src="GridEnv.png" alt="GridEnv visualization">
    </p>

In [None]:
import gymnasium as gym 
from gymnasium import spaces
import numpy as np

class GridEnv(gym.Env):
    def __init__(self):
        super(GridEnv, self).__init__()
        self.grid_size = 5
        self.start_state = (4, 0)
        self.small_treasure_state = (2, 2)
        self.big_treasure_state = (4, 4)
        self.wall_states = [(1, 1), (2, 1), (2, 3)]
        self.cliff_states = [(4, 1), (4, 2), (4, 3)]
        self.slippery_states = [(3, 1), (3, 2), (3, 3)]
        
        # Actions: 0=Right, 1=Left, 2=Up, 3=Down
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Discrete(self.grid_size * self.grid_size-len(self.wall_states))
        
        self.state = self.encode_state(self.start_state)

    def encode_state(self, state_tuple):
        x, y = state_tuple
        return x  * self.grid_size + y 

    def decode_state(self, state_int):
        x = state_int // self.grid_size
        y = state_int % self.grid_size
        return (x, y)

    def step(self, action):
        x, y = self.decode_state(self.state)
        new_x, new_y = x, y

        # Check if Jack takes an action at random or not
        w = np.random.uniform()
        if w < 0.2:
            action = self.action_space.sample()

        if (x, y) in self.slippery_states:
            if w > 0.8:
                action = 3

        if action == 0:  # right
            new_y = min(y + 1, self.grid_size - 1)
        elif action == 1:  # left
            new_y = max(y - 1, 0)
        elif action == 2:  # up
            new_x = max(x - 1, 0)
        elif action == 3:  # down
            new_x = min(x + 1, self.grid_size - 1)

        # Check if the new position is a wall
        if (new_x, new_y) in self.wall_states:
            self.state = self.encode_state((x, y))
        else:
            self.state = self.encode_state((new_x, new_y))
        
        if (new_x, new_y) == self.big_treasure_state:
            reward = 10
        elif (new_x, new_y) in self.cliff_states:
            reward = -100
        elif (new_x, new_y) == self.small_treasure_state:
            reward = 1
        else:
            reward = 0
        
        done = False
        truncated = False
        
        return self.state, reward, done, truncated, {}

    def reset(self):
        self.state = self.encode_state(self.start_state)
        return self.state, {}

    def render(self, mode='human'):
        grid = np.zeros((self.grid_size, self.grid_size), dtype=str)
        grid[:] = '.'
        grid[self.big_treasure_state] = 'T'
        grid[self.small_treasure_state] = 't'
        for cliff in self.cliff_states:
            grid[cliff] = 'C'
        for wall in self.wall_states:
            grid[wall] = 'W'
        x, y = self.decode_state(self.state)
        grid[x, y] = 'J'
        print("\n".join(["".join(row) for row in grid]))
        print()

env = GridEnv()
state = env.reset()
env.render()

for _ in range(10):
    action = env.action_space.sample()  # Random action
    state, reward, done, truncated, _ = env.step(action)
    print(f"Action: {action}, Reward: {reward}")
    env.render()

In [None]:
import numpy as np
env = GridEnv()

def simulate_policy(env, policy, steps=10):
    """
    Simulate a policy in the given environment over a specified number of steps.

    Parameters:
    env (Env): The environment in which the policy is to be simulated.
    policy (function): A function that takes a state as input and returns an action.
    steps (int): The number of steps to simulate the policy for.

    Returns:
    list: A list representing the trajectory, i.e. (s_0, a_0, r_0, s_1, a_1, r_1,..., a_{t-1}, r_{t-1}, s_t).
    """
    state, _ = env.reset()
    trajectory = [state]
    for _ in range(steps):
        action = policy(state)
        next_state, reward, _, _, _ = env.step(action)
        trajectory.append(action)
        trajectory.append(reward)
        trajectory.append(next_state)
        state = next_state

    return trajectory

def random_policy(state):
    """
    Policy taking an action at random in [0, 4[.

    Parameters:
    state (int): the current state.

    Returns:
    action (int): a random action.
    """
    action = np.random.randint(4)
    return action

trajectory =  simulate_policy(env, random_policy, 100000)
print("trajectory:", trajectory)

## Question 1
Implement a routine which estimates $r(s,a)$ and $p(s'|s,a)$ from a given trajectory $h_t = (s_0, a_0, r_0, s_1, a_1, r_1, \ldots, a_{t-1}, r_{t-1}, s_t)$ produced by a random policy. Justify why your routine converges towards the true values for a growing trajectory ? 

Advice: take a trajectory large enough to estimate as close as possible the true mdp structure.

In [None]:
import numpy as np
from collections import defaultdict

def estimate_mdp_from_trajectory(trajectory):
    """
    Estimate the reward function r(s, a) and transition probabilities p(s'|s, a) from a given trajectory.
    Hint: use defaultdict structure.

    Parameters:
    trajectory (list): A list representing the trajectory, i.e. (s_0, a_0, r_0, s_1, a_1, r_1,..., a_{t-1}, r_{t-1}, s_t).
    
    Returns:
    estimated_rewards (dict): Estimated reward function with the structure {(s, a): mean_reward, ...}
    estimated_transitions (dict): Estimated transition probabilities with the strcuture {(s, a): {s_prime: probability, ...}, ...}.
    """
    

    # Implementation goes here

    estimated_rewards = dict(sorted(estimated_rewards.items()))
    estimated_transitions = dict(sorted(estimated_transitions.items()))

    return estimated_rewards, estimated_transitions

steps = #...
estimate_mdp_from_trajectory(simulate_policy(env, random_policy, steps))


## Question 2
Compute $\widehat{Q}$ by using $\widehat{r}(s,a)$ and $\widehat{p}(s'|s,a)$ that estimate the MDP structure computed in question 1.  Explain the influence of the length of the trajectory on the quality of the approximation $\hat{Q}$. Derive $\widehat{\mu}^*$ from $\widehat{Q}$.

Display $V_N^{\widehat{\mu}^*}$ for each state $s$. Supposing you estimate the true $Q_N$ functions, determine the smallest N such that the bound $V^{\mu^*}-V^{\mu^*_N}$ is smaller than 0.1 ? Justify.

In [None]:
def estimated_q_n(estimated_rewards, estimated_transitions, gamma=0.95, N=500):
    """
    Compute the Q-values using the estimated rewards and transition probabilities.

    Parameters:
    estimated_rewards (dict): Estimated reward function.
    estimated_transitions (dict): Estimated transition probabilities.
    gamma (float): Discount factor.
    iterations (int): Number of iterations for convergence.

    Returns:
    dict: Estimated Q_N-values with the structure  {(s, a): value, ...}.
    """
    estimated_q_n = defaultdict(lambda: 0)

    # Implementation goes here

    return dict(estimated_q_n)

def derive_optimal_policy(q_values):
    """
    Derive the optimal policy from Q-values.

    Parameters:
    q_values (dict): Q-values.

    Returns:
    dict: Optimal policy mapping states to actions with the structure  {s: a, ...}.
    """
    policy = {}
    
    # Implementation goes here

    return policy


def compute_value_function(q_values, policy):
    """
    Compute the value function from Q-values and a policy.

    Parameters:
    q_values (dict): Q-values.
    policy (dict): Policy mapping states to actions.

    Returns:
    dict: Value function for each state with the structure  {s: value, ...}.
    """
    value_function = {}
    
    # Implementation goes here

    return value_function

estimated_rewards, estimated_transitions = estimate_mdp_from_trajectory(simulate_policy(env, random_policy, 1000000))
q_values = estimated_q_n(estimated_rewards, estimated_transitions)

optimal_policy = derive_optimal_policy(q_values)
value_function = compute_value_function(q_values, optimal_policy)

print("Estimated Q-values:", q_values)
print("Optimal Policy:", optimal_policy)
print("Value Function:", value_function)
print("Bound:", )



Display your optimal policy using the following function. Describe your result, is it coherent ?

In [None]:
from IPython.display import display, Math

def display_policy_as_latex_table(policy, grid_size):
    """
    Display the optimal policy as a LaTeX table with arrows.

    Parameters:
    policy (dict): Optimal policy mapping states to actions.
    grid_size (int): The size of the grid (assuming a square grid).

    Returns:
    None: Displays the LaTeX table in the notebook.
    """
    action_symbols = {
        0: r'\rightarrow',
        1: r'\leftarrow',
        2: r'\uparrow',
        3: r'\downarrow'
    }

    latex_table = "\\begin{array}{" + "c" * grid_size + "}\n"

    for i in range(grid_size):
        row = []
        for j in range(grid_size):
            state = i * grid_size + j
            action = policy.get(state, None)
            if action is not None:
                row.append(action_symbols[action])
            else:
                row.append("")

        latex_table += " & ".join(row) + " \\\\\n"

    latex_table += "\\end{array}"

    display(Math(latex_table))

display_policy_as_latex_table(optimal_policy, grid_size=5)

## Question 3

Sample a trajectory from a random uniform policy with a finite, large enough time horizon $T$. Motivate your choice of $T$. Implement a routine which iteratively updates $\widehat{Q}(s_k,a_k)$ using the Q-learning algorithm from this trajectory. Run your routine with a constant learning rate $\alpha = 0.05$. Derive directly $\widehat{\mu}^*$ from $\widehat{Q}$ and display this optimal policy. Display $V_N^{\widehat{\mu}^*}$ for each initial state $s$.

In [None]:
def q_learning_update(trajectory, alpha=0.05, gamma=0.95):
    """
    Compute the Q-values using the Q-learning algorithm.

    Parameters:
    estimated_rewards (dict): Estimated reward function.
    estimated_transitions (dict): Estimated transition probabilities.
    gamma (float): Discount factor.
    iterations (int): Number of iterations for convergence.

    Returns:
    dict: Estimated Q-values with the structure  {(s, a): value, ...}.
    """
    q_values = defaultdict(lambda: 0)

    # Implementation goes here
    
    return q_values


env = GridEnv()
T = #...
trajectory = simulate_policy(env, random_policy, T)
q_values = q_learning_update(trajectory)

optimal_policy = derive_optimal_policy(q_values)

display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)


## Question 4

Implement an intelligent agent which learns $Q$ with Q-learning using an $\epsilon$-greedy policy, i.e. a policy that is greedy with a probability of $(1-\epsilon)$ and random otherwise.

Run the three following experimental protocols and display for each of them the derived optimal policy and their value function. Discuss the results.

The first experimental protocol is the following. The agent trains over $100$ episodes having $1000$ transitions. An episode always starts from the initial state. The learning rate $\alpha$ is equal to $0.05$ and the exploration rate $\epsilon$ is equal to $0.5$. The values of $\alpha$ and $\epsilon$ are both constant over time. The function $\hat{Q}$ is updated after every transition. The transitions are used only once for updating $\hat{Q}$ i.e. there are not stored in a replay buffer.

In [None]:
def epsilon_greedy_policy(q_values, state, epsilon):
    """
    Selects an action using the epsilon-greedy policy.

    Parameters:
    q_values (dict): A dictionary where keys are tuples of (state, action) and values are the estimated Q-values.
    state (tuple): The current state for which the action is to be selected.
    epsilon (float): The probability of selecting a random action (exploration). Should be between 0 and 1.

    Returns:
    int: The action selected according to the epsilon-greedy policy. With probability epsilon, a random action is chosen.
         With probability 1-epsilon, the action with the highest Q-value for the given state is chosen.
    """
    if np.random.rand() < epsilon:
        return np.random.randint(4)
    else:
        return np.argmax(q_values[(state, a_prime)] for a_prime in range(4))

def q_learning(env, episodes=100, alpha=0.05, epsilon=0.5, gamma=0.95):
    """
    Perform Q-learning to learn the optimal Q-values with the first protocol.

    Parameters:
    env (Env): The environment in which Q-learning is performed.
    episodes (int): The number of episodes to run Q-learning.
    alpha (float): The learning rate for updating Q-values.
    epsilon (float): The probability of selecting a random action (exploration) in epsilon-greedy policy.
    gamma (float): The discount factor for future rewards.

    Returns:
    dict: A dictionary containing the learned Q-values for state-action pairs.
    """
    q_values = defaultdict(lambda: 0)
    
    # Implementation goes here

    return q_values

env = GridEnv()
q_values = q_learning(env)
optimal_policy = derive_optimal_policy(q_values)
display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)


The second experimental protocol differs from the first one only by the learning rate which is such that $\alpha_0 = 0.05$ and $\forall t > 0, \alpha_t = 0.8\alpha_{t-1}$.

In [None]:
def q_learning_2(env, episodes=100, alpha=0.05, epsilon=0.5, gamma=0.95):
    """
    Perform Q-learning to learn the optimal Q-values with the second protocol.

    Parameters:
    env (Env): The environment in which Q-learning is performed.
    episodes (int): The number of episodes to run Q-learning.
    alpha (float): The learning rate for updating Q-values.
    epsilon (float): The probability of selecting a random action (exploration) in epsilon-greedy policy.
    gamma (float): The discount factor for future rewards.

    Returns:
    dict: A dictionary containing the learned Q-values for state-action pairs.
    """
    q_values = defaultdict(lambda: 0)
    
    # Implementation goes here
    
    return q_values

env = GridEnv()
q_values = q_learning(env)
optimal_policy = derive_optimal_policy(q_values)
display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)

In the following, a replay buffer is defined as a data structure of previously seen transitions i.e. $(s_t, a_t, r_t, s_{t+1})$. The third experimental protocol is identical from the first one except that the one-step system transitions are stored in a replay buffer and that at each time-step, the function $\hat{Q}$ is updated ten times by drawing ten transitions at random from the replay buffer.


In [None]:
def q_learning_3(env, episodes=100, alpha=0.05, epsilon=0.5, gamma=0.95):
    """
    Perform Q-learning to learn the optimal Q-values with the third protocol.

    Parameters:
    env (Env): The environment in which Q-learning is performed.
    episodes (int): The number of episodes to run Q-learning.
    alpha (float): The learning rate for updating Q-values.
    epsilon (float): The probability of selecting a random action (exploration) in epsilon-greedy policy.
    gamma (float): The discount factor for future rewards.

    Returns:
    dict: A dictionary containing the learned Q-values for state-action pairs.
    """
    q_values = defaultdict(lambda: 0)
    
    # Implementation goes here
            
    return q_values

env = GridEnv()
q_values = q_learning(env)
optimal_policy = derive_optimal_policy(q_values)
display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)