[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pucrs-automated-planning/fa-rl/blob/main/src/Function-Approximation.ipynb)

# Reinforcement Learning

## Tutorial 03: Function Approximation

#### Prof. Felipe Meneguzzi

### Introduction

In this practical, you will implement one particular type of function approximation for reinforcement learning: [Tile Coding](http://www.incompleteideas.net/book/RLbook2018.pdf#page=239). Using this encoding we will implement function approximation in the [Sarsa](https://en.wikipedia.org/wiki/State–action–reward–state–action) algorithm. Our implementation will be based on a simple [RLAgent](rl/agent.py) class provided by the instructor, and the simplest of the [Gym](https://github.com/openai/gym) environments.

<div class="alert alert-block alert-warning">
You will need to configure your environment following the instructions in the `README.md` file included in this repository.
</div>

In [None]:
try:
  import google.colab
  print("We are in Google colab, we need to clone the repo")
  !git clone https://github.com/pucrs-automated-planning/fa-rl.git
  %cd fa-rl
  %pip install -r requirements.txt
  %cd src
except:
  print("Not in colab")

## The agent interface

In this notebook we will use a basic agent interface called [`RLAgent`](src/rl/agent.py). Open this file and become familiar with its key attributes and methods. 

The key attributes you will need for this notebook are the following:
- `self.env` — a reference to the [environment from gym](https://github.com/openai/gym/blob/58ed658d9b15fd410c50d1fdb25a7cad9acb7fa4/gym/core.py#L8);
- `self.episodes` — the number of episodes to train the agent when invoking [`RLAgent.learn()`](rl/agent.py#L125);
- `self.alpha` — the *learning rate* to be used when computing the RL update;
- `self.gamma` — the *discount factor* for the return computed during training; and
- `self.q_table` — a dictionary of lists of floating point numbers indexed by the state and an action (e.g., `self.q_table[state][action]` should return the q-value of the `state`-`action` pair)
- `self.actions` — an integer representing the size of the action space
- `self.last_state` — the last state visited by the agent during learning
- `self.last_action` — the last action chosen by the agent during learning
- `self._random` — a reference to a Python [Random](https://docs.python.org/3/library/random.html#random.Random) object.

Pay attention to the `self.random` object, and use it instead of the unbound methods from Python's [random library](https://docs.python.org/3/library/random.html), otherwise the tests will not work. 

The key methods you should be aware of are the following:
- [`policy`](rl/agent.py#L90) — returns the greedy policy following the currently learned q-values;
- [`epsilon_greedy_policy`](rl/agent.py#L101) — returns an $\epsilon$-greedy policy;
- [`agent_start`](rl/agent.py#L55) — the method to be called once *before each episode*;
- [`agent_step`](rl/agent.py#L67) — the method to be called once at *each time step* within an episode;
- [`agent_end`](rl/agent.py#L80) — the method to be called once at the *end of each episode* when the agent reaches a *terminal state*; and
- [`learn`](rl/agent.py#L125) — runs the underlying learning algorithm for `self.episodes` episodes

## Mountain Car Environment

In this tutorial, we will use the *Mountain Car Task* introduced in [Section 10.1 of the textbook](http://www.incompleteideas.net/book/RLbook2018.pdf#page=267). 

The task is for an under powered car to make it to the top of a hill:

![Mountain Car](img/mountain-car.svg "Mountain Car")

The car is under-powered, so the agent needs to learn to rock back and forth to get enough momentum to reach the goal. At each time step the agent receives from the environment its current velocity (a float between -0.07 and 0.07), and its current position (a float between -1.2 and 0.5). Since our state is continuous there are a potentially infinite number of states that our agent could be in. We need a function approximation method to help the agent deal with this. In this notebook we will use tile coding. We provide a tile coding implementation for you to use, imported above with tiles3.

In [None]:
import gym
import numpy as np
from rl.agent import RLAgent, Action, State
from random import Random
from gym import Env
from typing import Any, Collection, List, NoReturn, Sequence, Tuple
from numpy.typing import ArrayLike
import rl.tiles3 as tc
import itertools
import matplotlib.pyplot as plt
import time

env = gym.make("MountainCar-v0")

#### Tile Coding Helper Function

To begin we are going to build a tile coding class for our Sarsa agent that will make it easier to make calls to our tile coder. This is based on code we adapt from Rich Sutton. 

The text book introduces Tile coding in [Section 9.5.4 of the textbook](http://www.incompleteideas.net/book/RLbook2018.pdf#page=239) as a way to create features that can both provide good generalization and discrimination. It consists of multiple overlapping tilings, where each tiling is a partitioning of the space into tiles.

![Tile Coding](img/tile-coding-example.svg "Tile Coding")

To help keep our agent code clean we are going to make a function specific for tile coding for our Mountain Car environment. To help we are going to use the Tiles3 library. This is a Python 3 implementation of the tile coder. To start take a look at the documentation: [Tiles3 documentation](http://incompleteideas.net/tiles/tiles3.html)
To get the tile coder working we need to implement a few pieces:
- First: create an index hash table — this is done for you in the ```init``` function using ```tc.IHT```.
- Second is to scale the inputs for the tile coder based on the number of tiles and the range of values each input could take. The tile coder needs to take in a number in range [0, 1], or scaled to be [0, 1] * num_tiles. For more on this refer to the [Tiles3 documentation](http://incompleteideas.net/tiles/tiles3.html).
- Finally we call ```tc.tiles``` to get the active tiles back.

In [None]:
class MountainCarTileCoder:
    def __init__(self, iht_size=4096, num_tilings=8, num_tiles=8):
        """
        Initializes the MountainCar Tile Coder
        Initializers:
        iht_size -- int, the size of the index hash table, typically a power of 2
        num_tilings -- int, the number of tilings
        num_tiles -- int, the number of tiles. Here both the width and height of the
                     tile coder are the same
        Class Variables:
        self.iht -- tc.IHT, the index hash table that the tile coder will use
        self.num_tilings -- int, the number of tilings the tile coder will use
        self.num_tiles -- int, the number of tiles the tile coder will use
        """
        self.iht = tc.IHT(iht_size)
        self.num_tilings = num_tilings
        self.num_tiles = num_tiles
    
    def get_tiles(self, position: float, velocity: float) -> ArrayLike:
        """
        Takes in a position and velocity from the mountaincar environment
        and returns a numpy array of active tiles.
        
        Arguments:
        position -- float, the position of the agent between -1.2 and 0.5
        velocity -- float, the velocity of the agent between -0.07 and 0.07
        returns:
        tiles - np.array, active tiles
        """
        # Use the ranges above and self.num_tiles to scale position and velocity to the range [0, 1]
        # then multiply that range with self.num_tiles so it scales from [0, num_tiles]
        
        position_scaled = 0
        velocity_scaled = 0
        
        # YOUR CODE HERE (2 lines)
        position_scaled = ((position+1.2)/1.7)*self.num_tiles
        velocity_scaled = ((velocity+0.07)/0.14)*self.num_tiles
        # END CODE
        
        # get the tiles using tc.tiles, with self.iht, self.num_tilings and [scaled position, scaled velocity]
        # nothing to implment here
        tiles = tc.tiles(self.iht, self.num_tilings, [position_scaled, velocity_scaled])
        
        return np.array(tiles)

We can now test whether this implementation worked.

In [None]:
# create a range of positions and velocities to test
# then test every element in the cross-product between these lists
pos_tests = np.linspace(-1.2, 0.5, num=5)
vel_tests = np.linspace(-0.07, 0.07, num=5)
tests = list(itertools.product(pos_tests, vel_tests))

mctc = MountainCarTileCoder(iht_size=1024, num_tilings=8, num_tiles=2)

t = []
for test in tests:
    position, velocity = test
    tiles = mctc.get_tiles(position=position, velocity=velocity)
    t.append(tiles)

expected = [
    [0, 1, 2, 3, 4, 5, 6, 7],
    [0, 1, 8, 3, 9, 10, 6, 11],
    [12, 13, 8, 14, 9, 10, 15, 11],
    [12, 13, 16, 14, 17, 18, 15, 19],
    [20, 21, 16, 22, 17, 18, 23, 19],
    [0, 1, 2, 3, 24, 25, 26, 27],
    [0, 1, 8, 3, 28, 29, 26, 30],
    [12, 13, 8, 14, 28, 29, 31, 30],
    [12, 13, 16, 14, 32, 33, 31, 34],
    [20, 21, 16, 22, 32, 33, 35, 34],
    [36, 37, 38, 39, 24, 25, 26, 27],
    [36, 37, 40, 39, 28, 29, 26, 30],
    [41, 42, 40, 43, 28, 29, 31, 30],
    [41, 42, 44, 43, 32, 33, 31, 34],
    [45, 46, 44, 47, 32, 33, 35, 34],
    [36, 37, 38, 39, 48, 49, 50, 51],
    [36, 37, 40, 39, 52, 53, 50, 54],
    [41, 42, 40, 43, 52, 53, 55, 54],
    [41, 42, 44, 43, 56, 57, 55, 58],
    [45, 46, 44, 47, 56, 57, 59, 58],
    [60, 61, 62, 63, 48, 49, 50, 51],
    [60, 61, 64, 63, 52, 53, 50, 54],
    [65, 66, 64, 67, 52, 53, 55, 54],
    [65, 66, 68, 67, 56, 57, 55, 58],
    [69, 70, 68, 71, 56, 57, 59, 58],
]
assert np.all(expected == np.array(t))

## Q-Learning with Function Approximation

**Description** 

As we saw in the lecture, Q-Learning with Function approximation replaces the update on specific states for the update on the model weights. Tile coding lets us do that without the gradients, since we are only updating the affected tiles. So, instead of applying the semi-gradient update:

$$\bm{w} \gets \bm{w} + \alpha \left[ R_{t+1} + \gamma \max_{a'} \hat{q}(S_{t+1}, a', \bm{w}) - \hat{q}(S_{t}, A_{t}, \bm{w}) \right]\nabla\hat{q}(S_{t}, A_{t}, \bm{w})$$

We can simply ignore the gradients:

$$\bm{w} \gets \bm{w} + \alpha \left[ R_{t+1} + \gamma \max_{a'} \hat{q}(S_{t+1}, a', \bm{w}) - \hat{q}(S_{t}, A_{t}, \bm{w}) \right]$$

In the cell below, you should implement the following methods (as noted in the source code):
- `best_action` - You should iterate over possible actions and compute the value using the weights and the unpacked features
- `policy` and `epsilon_greedy_policy` - you can accomplish this by referring to the `best_action` method you just implemented
- `agent_start` - initialize the last state with the initial state given by the environment and choose one initial action
- `agent_step` - this should follow the algorithm in Section 9.3 from Sutton and Barto
- `agent_end` - one last update at the end of the episode, but using just the old value of Q as the TD-error

In [None]:
class FAQLearner(RLAgent):
    """
    A simple tile-coded FA Q-Learning agent.
    """
    def __init__(self,
                 env: Env,
                 episodes: int = 500,
                 decaying_eps: bool = True,
                 eps: float = 1.0,
                 alpha: float = 0.5,
                 decay: float = 0.000002,
                 gamma: float = 1.0,
                 rand: Random = Random(),
                 iht_size: int = 4096,
                 w: ArrayLike = None,
                 num_tilings: int = 8,
                 num_tiles: int = 8,
                 initial_weights: float = 0.0,
                #  num_actions: int = 3,
                 **kwargs):
        super().__init__(env, episodes=episodes, decaying_eps=decaying_eps, eps=eps, alpha=alpha, decay=decay, gamma=gamma, rand=rand)
        # self.actions = env.action_space.n
        self.actions = 3
        # Function approximation parameters
        self.num_tilings = num_tilings
        self.num_tiles = num_tiles
        self.iht_size = iht_size
        self.initial_weights = initial_weights
        # self.num_actions = num_actions # TODO Double check that this is not redundant with self.actions

        # We initialize self.mctc to the mountaincar verions of the 
        # tile coder that we created
        self.tc = MountainCarTileCoder(iht_size=self.iht_size, 
                                         num_tilings=self.num_tilings, 
                                         num_tiles=self.num_tiles)

        # We initialize self.w to three times the iht_size. Recall this is because
        # we need to have one set of weights for each action.
        self.w = np.ones((self.actions, self.iht_size)) * self.initial_weights

        # hyperparameters
        self.episodes = episodes
        self.gamma = gamma
        self.decay = decay
        self.c_eps = eps
        self.base_eps = eps
        if decaying_eps:
            def epsilon():
                self.c_eps = max((self.episodes - self.step)/self.episodes, 0.01)
                return self.c_eps
            self.eps = epsilon
        else:
            self.eps = lambda: eps
        self.decaying_eps = decaying_eps
        # self.alpha = alpha
        self.alpha = alpha / self.num_tilings # To comply with the course from coursera
        self.last_state = None
        self.last_action = None

    def argmax(self, q_values):
        top = float("-inf")
        ties = []

        for i in range(len(q_values)):
            if q_values[i] > top:
                top = q_values[i]
                ties = []

            if q_values[i] == top:
                ties.append(i)

        return np.random.choice(ties)

    def get_active_tiles(self, state: State) -> ArrayLike:
        position, velocity = state
        
        # Use self.tc to set active_tiles using position and velocity
        active_tiles = self.tc.get_tiles(position, velocity)
        return active_tiles

    def best_action(self, tiles: ArrayLike) -> Tuple[Action, float]:
        """Returns the best action for a state, breaking ties randomly

        Args:
            state (State): The state in which to extract the best action

        Returns:
            int: The index of the best action
        """
        action_values = []
        chosen_action = None

        ## YOUR CODE HERE (3 lines)
        
        

        ## END CODE
    
    def get_max_q(self, state: State) -> float:
        tiles = self.get_active_tiles(state)
        return np.max(self.best_action(tiles)[0])
    
    def get_q_value(self, state: State, action: Any) -> float:
        tiles = self.get_active_tiles(state)
        return np.sum(self.w[action][tiles])

    
    def policy(self, tiles: ArrayLike) -> Tuple[Action, float]:
        """Returns the greedy deterministic policy for the specified state

        Args:
            state (State): the state for which we want the action

        Raises:
            InvalidAction: Not sure about this one

        Returns:
            Tuple[Action, float]: The greedy action learned for state and its value
        """
        ## YOUR CODE HERE (1 line)
        
        ## END CODE

    def epsilon_greedy_policy(self, tiles: ArrayLike) -> Tuple[Action, float]:
        """Returns the epsilon-greedy policy

        Args:
            state (State): The state for which to return the epsilon greedy policy

        Returns:
            Tuple[Action, float]: The action to be taken and its value
        """
        action_values = []
        chosen_action = None
        eps = self.eps()
        ## YOUR CODE HERE (6 lines)
        
        




        ## END CODE
        return chosen_action, action_values[chosen_action]
    
    def agent_start(self, state: State) -> Action:
        """The first method called when the experiment starts,
        called after the environment starts.
        Args:
            state (Numpy array): the state from the
                environment's env_start function.
        Returns:
            (int) the first action the agent takes.
        """
        ## YOUR CODE HERE (2 lines)
        

        ## END CODE
        self.last_state = state
        self.previous_tiles = np.copy(active_tiles)
        return self.last_action

    def agent_step(self, reward: float, state: State) -> Action:
        """A step taken by the agent.

        Args:
            reward (float): the reward received for taking the last action taken
            state (Any): the state from the
                environment's step based on where the agent ended up after the
                last step
        Returns:
            (int) The action the agent takes given this state.
        """
        ## YOUR CODE HERE (6 lines)
        
        




        ## END CODE
        
        # action = self.best_action(state)
        self.last_state = state
        self.last_action = action
        self.previous_tiles = np.copy(active_tiles)
        return action

    def agent_end(self, reward: float) -> NoReturn:
        """Called when the agent terminates.

        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        ## YOUR CODE HERE (3 lines)
        
        
        
        ## END CODE

In [None]:
# Test for Q-Learning
## Mini test
np.random.seed(0)


agent = FAQLearner(env, decaying_eps=False, eps=0.1, rand=Random(0))
agent.w = np.array([np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])])

action_distribution = np.zeros(3)
for i in range(1000):
    chosen_action, action_value = agent.epsilon_greedy_policy(np.array([0,1]))
    action_distribution[chosen_action] += 1
    
print("action distribution:", action_distribution)
# notice that the two non-greedy actions are roughly uniformly distributed
assert np.all(action_distribution == [37, 23, 940])

agent = FAQLearner(env, decaying_eps=False, eps=0.0, rand=Random(0))
agent.w = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

chosen_action, action_value = agent.epsilon_greedy_policy(np.array([0,1]))
assert chosen_action == 2
assert action_value == 15

# test update

agent = FAQLearner(env, decaying_eps=False, eps=0.1, rand=Random(0))

agent.agent_start((0.1, 0.3))
agent.agent_step(1, (0.02, 0.1))

assert np.all(agent.w[0,0:8] == 0.0625)
assert np.all(agent.w[1:] == 0)

Now let us test our agent in the Mountain Car environment from Gym

In [None]:
num_runs = 10
num_episodes = 50
all_steps = []
all_rewards = []

agent = FAQLearner
env = gym.make("MountainCar-v0")
env.seed(0)
env._max_episode_steps = 15000
start = time.time()

# q_agent = FAQLearner(env, episodes=num_episodes, gamma=1, eps=0.1, alpha=0.5, decaying_eps=False, rand=Random(42))
# rewards_q, steps_per_episode = q_agent.learn(max_tsteps=15000)

for run in range(num_runs):
    q_agent = agent(env, episodes=num_episodes, gamma=1, eps=0.1, alpha=0.5, decaying_eps=False, rand=Random(42))
    
    if run % 5 == 0:
        print("RUN: {}".format(run))
    rewards_per_episode, steps_per_episode = q_agent.learn(max_tsteps=15000)

    all_rewards.append(rewards_per_episode)
    all_steps.append(steps_per_episode)

print("Run time: {}".format(time.time() - start))

mean_steps = np.mean(all_steps, axis=0)
plt.title("Mean steps")
plt.plot(mean_steps, label='Q-Learning')
plt.show()
print("Mean Steps", mean_steps)
plt.title("Mean Reward")
mean_reward = np.mean(all_rewards, axis=0)
plt.plot(mean_reward, label='Q-Learning')
plt.show()

## Sarsa

Sarsa is an on policy algorithm where the source and target policy are the same. In our implementation, we will use the $\epsilon$-greedy policy we implemented above.

$$\bm{w} \gets \bm{w} + \alpha \left[ R_{t+1} + \gamma \hat{q}(S_{t+1}, A_{t+1}, \bm{w}) - \hat{q}(S_{t}, A_{t}, \bm{w}) \right]$$

Note that we can implement Sarsa by simply subclassing the `FAQLearner` class we just implemented. This time, the only modification we need is in the `agent_step` method. 

In [None]:
class FASarsaLearner(FAQLearner):
    def agent_step(self, reward: float, state: State) -> Action:
        """A step taken by the agent.

        Args:
            reward (float): the reward received for taking the last action taken
            state (Any): the state from the
                environment's step based on where the agent ended up after the
                last step
        Returns:
            (int) The action the agent takes given this state.
        """
        ## YOUR CODE HERE (5 lines)
        
        


        
        ## END CODE

        self.last_state = state
        self.last_action = action
        self.previous_tiles = np.copy(active_tiles)
        return action

In [None]:
# Test for Sarsa
num_runs = 10
num_episodes = 50
all_steps = []
all_rewards = []

agent = FASarsaLearner
env = gym.make("MountainCar-v0")
env.seed(0)
env._max_episode_steps = 15000
start = time.time()

for run in range(num_runs):
    sarsa_agent = agent(env, episodes=num_episodes, gamma=1, eps=0.1, alpha=0.5, decaying_eps=False, rand=Random(42))
    
    if run % 5 == 0:
        print("RUN: {}".format(run))
    rewards_per_episode, steps_per_episode = sarsa_agent.learn(max_tsteps=15000)

    all_rewards.append(rewards_per_episode)
    all_steps.append(steps_per_episode)

print("Run time: {}".format(time.time() - start))

mean_steps = np.mean(all_steps, axis=0)
plt.title("Mean steps")
plt.plot(mean_steps, label='Sarsa')
plt.show()
print("Mean Steps", mean_steps)
plt.title("Mean Reward")
mean_reward = np.mean(all_rewards, axis=0)
plt.plot(mean_reward, label='Sarsa')
plt.show()

# Comparing Learning Methods

Our final experiment compares the accumulated rewards for each of our TD learning algorithms. 
<!-- This experiment replicates exactly example 6.6 (plus Expected Sarsa) in the book by plotting the cumulative rewards per time step of training each of our algorithms achieves in the Cliff Walking environment.  -->

<!-- If your implementation is correct, you should see a graph like the one below:
!["Cumulative Rewards"](figure_example_6_6.svg). -->

In [None]:
num_runs = 10
num_episodes = 50
q_steps = []
q_rewards = []
sarsa_steps = []
sarsa_rewards = []

env = gym.make("MountainCar-v0")
env.seed(0)
env._max_episode_steps = 15000
start = time.time()

# q_agent = FAQLearner(env, episodes=num_episodes, gamma=1, eps=0.1, alpha=0.5, decaying_eps=False, rand=Random(42))
# rewards_q, steps_per_episode = q_agent.learn(max_tsteps=15000)

for run in range(num_runs):
    q_agent = FAQLearner(env, episodes=num_episodes, gamma=1, eps=0.1, alpha=0.5, decaying_eps=False, rand=Random(42))
    sarsa_agent = FASarsaLearner(env, episodes=num_episodes, gamma=1, eps=0.1, alpha=0.5, decaying_eps=False, rand=Random(42))
    
    if run % 5 == 0:
        print("RUN: {}".format(run))
    rewards_per_episode, steps_per_episode = q_agent.learn(max_tsteps=15000)
    q_rewards.append(rewards_per_episode)
    q_steps.append(steps_per_episode)

    rewards_per_episode, steps_per_episode = sarsa_agent.learn(max_tsteps=15000)
    sarsa_rewards.append(rewards_per_episode)
    sarsa_steps.append(steps_per_episode)

print("Run time: {}".format(time.time() - start))

plt.title("Mean steps")
mean_steps = np.mean(q_steps, axis=0)
plt.plot(mean_steps, label='Q-Learning')
mean_steps = np.mean(sarsa_steps, axis=0)
plt.plot(mean_steps, label='Sarsa')
plt.show()
# print("Mean Steps", mean_steps)
plt.title("Mean Reward")
mean_reward = np.mean(q_rewards, axis=0)
plt.plot(mean_reward, label='Q-Learning')
mean_reward = np.mean(sarsa_rewards, axis=0)
plt.plot(mean_reward, label='Sarsa')
plt.show()
