# Practical 11 : Q-Learning & Value-Based Methods (MC vs TD)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import math
import sys, os
from collections import defaultdict
from typing import Dict
from collections import namedtuple, defaultdict

from Practical11_Support.gym_simple_gridworlds.helper import *
from Practical11_Support.gym_simple_gridworlds.envs.grid_env import GridEnv
from Practical11_Support.gym_simple_gridworlds.envs.grid_2dplot import *

np.set_printoptions(precision=3, suppress=True)

## After-Class Exercise (Student): Implement MC & TD(0) Prediction

### The Grid World environment

Recall the grid in which our robot lives

![GridWorldExample.png](https://i.postimg.cc/5tMM5vqf/Grid-World-Example.png)

- The states $s \in \mathcal{S}$ correspond to locations in the grid. Each location has also a cell index associated to it, e.g., cell index 4 is associated to location (row=1,col=0)
- The robot can move up, down, left, or right. Actions correspond to unit increments or decrements in the specified direction.
    - Up : (-1,0)
    - Down: (1,0)
    - Left: (0,-1)
    - Right: (0, 1)
- Each action is represented by a number. Action (Up) is represented by 0, (Down) by 1, (Left) by 2 and, finally, (Right) by 3. No actions are available at a terminal state
- Discount factor $\gamma = 0.99$ (class attribute ``gamma=0.99``)
- Stochastic transition matrix (class attribute ``noise=0.2``)
- Every non-terminal step yields the living reward (class attribute ``living_reward=-0.04``); all other rewards are only obtained at terminal states

### Generate episode
Let's first define the helper function `generate_episode(...)`. It samples an episode, i.e., a sequence of $(s, a, r, s')$ tuples, by following a given policy.

In [None]:
Sample = namedtuple('Sample', ['state', 'action', 'reward', 'next_state'])

def generate_episode(grid_env, policy):
    """
    Roll out one complete episode in the environment by following a policy
    :param grid_env (GridEnv): Environment to interact with
    :param policy (dict): Maps each state to a dict {action: probability}

    :return List(Sample): Transitions in the order they were experienced
    """
    trajectory = []

    # reset() supplies the state the episode starts from
    current = grid_env.reset()
    finished = False

    while not finished:
        action_probs = policy[current]
        candidates = list(action_probs.keys())
        weights = np.array(list(action_probs.values()))
        # Draw a single action according to the policy's distribution
        chosen = np.random.choice(candidates, size=1, p=weights)[0]

        # step() returns (next_state, reward, done, info); info is not used
        following, reward, finished, _ = grid_env.step(chosen)
        trajectory.append(
            Sample(state=current, action=chosen, reward=reward, next_state=following)
        )
        current = following

    return trajectory

Use the same fixed policy for both MC and TD(0) prediction (e.g., uniform random).

In [None]:
# Uniform-random stochastic policy: state-id → {action: prob}
def make_uniform_random_policy(env):
    """
    Build a policy that picks every available action with equal probability
    :param env (GridEnv): Environment providing get_actions(), get_states(),
                          grid and is_terminal_state(row, col)

    :return dict: state id -> {action: probability}; terminal states map to {}
    """
    actions = env.get_actions()
    uniform = 1.0 / len(actions)

    def _is_terminal(state_id):
        # Locate the cell holding this state id, then ask the environment about it
        row, col = np.argwhere(env.grid == state_id)[0]
        return env.is_terminal_state(row, col)

    return {
        state_id: {} if _is_terminal(state_id) else {a: uniform for a in actions}
        for state_id in env.get_states()
    }
    
# Build the environment with the settings listed in the description above and
# derive the uniform-random policy from it; `pi_random` is the policy used
# later on to sample episodes.
print("building uniform random policy.")
grid_world = GridEnv(gamma=0.99, noise=0.2, living_reward=-0.04)
pi_random = make_uniform_random_policy(grid_world)

### Implement **MC prediction** (first-visit)  
**Monte-Carlo Method**

Now, under the assumption that $\mathcal{T}(s,a,s')$ and $\mathcal{R}(s,a)$ are unknown, let's use the algorithm shown below to get an estimate $\hat{v}_\pi(s)$ of the true state-value function $v_\pi(s)$

![MCPolicyEvaluation.png](https://i.postimg.cc/6pXj5P6D/MCPolicy-Evaluation.png)

In [None]:
def monte_carlo_first_visit_policy_evaluation(grid_env, episodes):
    """
    Estimate the state-value function of a policy with first-visit Monte Carlo

    For every episode, the discounted return that follows the *first*
    occurrence of each state is added to that state's running total; the
    estimate of a state is the mean of the returns collected for it so far.

    :param grid_env (GridEnv): Environment; provides get_states() and the
                               discount factor ``gamma``
    :param episodes (List(List(Sample))): Episodes sampled under the policy
                                          being evaluated

    :return dict(float): Predicted state-value function
    :return dict(int): Number of episodes in which each state was visited
    :return dict(float): Cumulative first-visit return of each state
    :return dict(bool): States visited in the last processed episode
                        (all False when ``episodes`` is empty)
    """
    all_states = grid_env.get_states()
    gamma = grid_env.gamma

    # Counter of visits for all states
    state_visits = {s: 0 for s in all_states}
    # Cumulative return for each state
    state_returns = {s: 0 for s in all_states}
    # Predicted state-value function
    pred_v = {s: 0 for s in all_states}
    # Defined up front so the function still returns cleanly with no episodes
    visited = {s: False for s in all_states}

    for episode in episodes:
        # Tracks which states have been credited in the current episode
        visited = {s: False for s in all_states}

        # Time step at which each state occurs for the first time. Needed
        # because the backward sweep below meets the LAST occurrence of a
        # state first; crediting that one would bias the estimate.
        first_visit = {}
        for t, obs in enumerate(episode):
            first_visit.setdefault(obs.state, t)

        # Return accumulated so far, built from the end of the episode
        g = 0
        for t in range(len(episode) - 1, -1, -1):
            obs = episode[t]
            s = obs.state
            # G_t = r_t + gamma * G_{t+1}
            g = obs.reward + gamma * g
            if first_visit[s] == t:
                state_visits[s] += 1
                state_returns[s] += g
                pred_v[s] = state_returns[s] / state_visits[s]
                visited[s] = True

    return pred_v, state_visits, state_returns, visited

### Implement **TD(0) prediction**  

**Temporal Difference Method**

Estimate $\hat{v}_\pi(s)$ of the true state-value function $v_\pi(s)$

![TDPolicyEvaluation.png](https://i.postimg.cc/c4yywX4c/TDPolicy-Evaluation.png)

In [None]:
def temporal_learning_policy_evaluation(grid_env, episodes, alpha=0.1):
    """
    Estimate the state-value function of a policy with TD(0)

    Each sampled transition (s, r, s') moves v(s) a fraction ``alpha`` of the
    way towards the bootstrapped target r + gamma * v(s').

    :param grid_env (GridEnv): Environment; provides get_states() and the
                               discount factor ``gamma``
    :param episodes (List(List(Sample))): Episodes sampled under the policy
                                          being evaluated
    :param alpha (float): step-size

    :return dict(float): Predicted state-value function
    """
    all_states = grid_env.get_states()
    gamma = grid_env.gamma
    # Predicted state-value function. States that never occur as obs.state
    # keep this initial value of 0.
    pred_v = {s: 0 for s in all_states}

    for episode in episodes:
        # Transitions are processed in the order they were experienced
        for obs in episode:
            # Get visited state
            s = obs.state
            # Bootstrapped one-step target using the current estimate of v(s')
            td_target = obs.reward + gamma * pred_v[obs.next_state]
            pred_v[s] += alpha * (td_target - pred_v[s])
    return pred_v

In [None]:
# Autograder cell: point otter at the test directory shipped with the practical
# and run the checks against the functions defined above.
import otter
grader = otter.Notebook(tests_dir = "Practical11_Support/tests")
grader.check_all()

## 4) Visualization & Comparison

In [None]:
# Sample one shared batch of episodes under the uniform-random policy so that
# both estimators are evaluated on exactly the same experience.
grid_env = GridEnv(gamma=0.99, noise=0.2, living_reward=-0.04)
n_episodes = 1000
episodes = [generate_episode(grid_env, pi_random) for _ in range(n_episodes)]

# Only the value estimate of the MC routine is needed here; its other return
# values are discarded.
V_mc, _, _, _ = monte_carlo_first_visit_policy_evaluation(grid_env, episodes)
V_td = temporal_learning_policy_evaluation(grid_env, episodes, alpha=0.1)

In [None]:
# Environment instance passed to the plotting helpers below.
grid_env = GridEnv(gamma=0.99, noise=0.2, living_reward=-0.04)
# Value estimates produced by the two prediction methods, one figure each.
print("MC Value Function")
plot_value_function(grid_env, V_mc)
plt.show()
print("TD(0) Value Function")
plot_value_function(grid_env, V_td)
plt.show()
# Reference policy for visual comparison, one action code per grid cell
# (0=up, 1=down, 2=left, 3=right, as listed in the environment description).
# NOTE(review): -1 presumably marks cells without an action (terminal or
# blocked cells) — confirm against plot_policy.
print("One of optimal policy")
plot_policy(grid_env, np.array([[ 3,  3,  3, -1], [ 0, -1,  0, -1], [ 0,  3,  0,  2]]))
plt.show()