[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/abdn-cs3033-ai/practicals/blob/main/week10/tutorial9-rl.ipynb)

# CS3033: Artificial Intelligence

## Tutorial 09: Reinforcement Learning

#### Prof. Felipe Meneguzzi

Adapted from code in the [AIMA-Python](https://github.com/aimacode/aima-python) public repository.

To run this tutorial, you need to download the auxiliary files from GitHub into your notebook, which we do with Jupyter's shell commands (if you downloaded the entire repo, the cell below is not necessary).

In [None]:
try:
  import google.colab
  print("We are in Google colab, we need to clone the repo")
  !pip3 install seaborn --user
  !git clone https://github.com/abdn-cs3033-ai/practicals.git
  %cd practicals/week10
except ImportError:
  print("Not in colab")

- [Overview](#overview)
- [Passive RL](#passive-reinforcement-learning)
- [Active RL](#active-reinforcement-learning)

## Overview

**Reinforcement Learning** is a family of machine learning techniques concerned with learning how to act in an environment where the agent does not fully know its dynamics. In reinforcement learning, an agent learns how to act in an environment by trial and error, interacting with the environment over repeated trials or episodes in which the agent tries different actions and observes a reward signal. Unlike supervised learning, reinforcement learning provides no explicit *right answer*, but rather a potentially sparse reward signal, as the agent acts in the environment.

### MDPs

The key assumption behind most Reinforcement Learning algorithms is that the environment behaves like an MDP, even if the agent is not aware of all its parameters. Recall the elements of an MDP are the following:

- A finite set of states $\mathcal{S}$ (known by the agent)
- A finite set of actions $\mathcal{A}$ (known by the agent)
- A *Markovian* transition model $T(s,a,s') = \mathbb{P}(S_{t+1}=s' \mid S_t = s, A_t = a)$ (possibly known by the agent)
- A reward function $R(s)$, alternatively $R(s,a) = \mathbb{E}[ R_{t+1} \mid S_t = s, A_t = a]$ (unknown by the agent)
- A discount factor $\gamma$ (known by the agent)
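
As a rough illustration of the elements listed above, the sketch below writes out a toy two-state MDP as plain Python dictionaries. The state names, action names, probabilities, and the helper `is_valid_transition_model` are all made up for this example and are not part of `mdp4e`.

```python
# Toy MDP written as plain data (all names and numbers are hypothetical).
toy_states = ["s0", "s1"]
toy_actions = ["stay", "go"]

# toy_T[(s, a)] maps each successor state s' to P(s' | s, a).
toy_T = {
    ("s0", "stay"): {"s0": 1.0},
    ("s0", "go"): {"s0": 0.2, "s1": 0.8},
    ("s1", "stay"): {"s1": 1.0},
    ("s1", "go"): {"s0": 0.8, "s1": 0.2},
}

toy_R = {"s0": -0.04, "s1": 1.0}  # reward R(s) for each state
toy_gamma = 0.9                   # discount factor


def is_valid_transition_model(T, tol=1e-9):
    """Check that every (state, action) row is a probability distribution."""
    return all(abs(sum(dist.values()) - 1.0) <= tol for dist in T.values())


print(is_valid_transition_model(toy_T))
```

A reinforcement learning agent would typically see only `toy_states`, `toy_actions`, and `toy_gamma` up front, and would have to discover the effect of `toy_T` and `toy_R` through interaction.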

We will use an implementation of an MDP (and its algorithms), as an oracle against which we can compare our reinforcement learning approaches. Recall our 

In [None]:
import random
from collections import defaultdict
import numpy as np
from utils4e import vector_add, orientations, turn_right, turn_left
from notebook import psource, pseudocode
from mdp4e import MDP, value_iteration, GridMDP

from tqdm.notebook import tqdm




## Passive Reinforcement Learning

In passive Reinforcement Learning the agent follows a fixed policy $\pi$. Passive learning attempts to evaluate the given policy $\pi$ without any knowledge of the reward function $R(s)$ or the transition model $P(s' \mid s, a)$.

This is usually done by some method of **utility estimation** or **prediction**. The agent attempts to directly learn the utility of each state that would result from following the policy. Note that at each step, it has to *perceive* the reward and the state, since it has no global knowledge of these. Thus, if in a certain state the entire set of actions offers a very low probability of reaching some state $s_+$, the agent may never perceive the reward $R(s_+)$.

Consider a situation where an agent is given a policy to follow. Thus, at any point it knows only its current state and current reward, and the action it must take next. This action may lead it to more than one state, with different probabilities.

For the sequence of states visited while following $\pi$, the utility being estimated is:
$$U^{\pi}(s) = E\left[\sum_{t=0}^{\infty} \gamma^{t} R(s_{t})\right]$$
That is, the expected value of the summed discounted rewards until termination, starting from $s_0 = s$.
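
To make the sum concrete, here is a minimal sketch that computes the discounted return of a single finite trajectory. The function name `discounted_return` and the reward sequence are assumptions chosen for illustration; $U^{\pi}(s)$ is the expectation of this quantity over many such trajectories.

```python
def discounted_return(rewards, gamma):
    """Sum of gamma**t * rewards[t] over one finite trajectory."""
    total = 0.0
    for t, reward in enumerate(rewards):
        total += (gamma ** t) * reward
    return total


# Hypothetical trajectory: three steps costing -0.04, then a +1 terminal reward.
sample_rewards = [-0.04, -0.04, -0.04, 1.0]
sample_return = discounted_return(sample_rewards, gamma=0.9)
print(round(sample_return, 4))  # 0.6206
```

The terminal reward of 1 is scaled by $0.9^3 = 0.729$, and the three step costs contribute $-0.04 \times (1 + 0.9 + 0.81) = -0.1084$.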

In this tutorial, we implement the method of utility estimation called Temporal Difference (TD) Learning. Instead of explicitly building the transition model $P$, TD learning exploits the fact that the utilities of two consecutive states $s$ and $s'$ are expected to be consistent with each other.
For an observed transition from $s$ to $s'$, we update the utility of state $s$ using the following formula:
$$U^{\pi}(s) \gets U^{\pi}(s) + \alpha \left( R(s) + \gamma U^{\pi}(s') - U^{\pi}(s) \right)$$
This update implicitly incorporates the transition probabilities, because each successor state $s'$ contributes to the estimate in proportion to how often it is actually reached from $s$. Thus, with a suitably decaying learning rate $\alpha$, the estimates converge over many trials towards the utilities that satisfy the Bellman equations for $\pi$.
The advantage of TD learning is its relatively simple computation at each step, with no need to keep track of transition counts.
By contrast, a model-based approach such as adaptive dynamic programming (ADP), which learns the transition model from counts, would have $n_s \times n_a$ numbers $N_{sa}$ and $n_s^2 \times n_a$ numbers $N_{s' \mid sa}$ to keep track of for $n_s$ states and $n_a$ actions. The TD model must only keep track of a utility $U(s)$ for each state.
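
The update rule can be tried out on a single transition in isolation. The sketch below is not the agent you are asked to write; `td_update`, the state labels, and the numbers are hypothetical, and a fixed $\alpha$ is used for simplicity.

```python
def td_update(U, s, r, s_next, alpha, gamma):
    """Apply one temporal-difference update to U[s] in place; return the new value."""
    td_error = r + gamma * U[s_next] - U[s]
    U[s] += alpha * td_error
    return U[s]


# Made-up utilities for two states "A" and "B".
U_demo = {"A": 0.0, "B": 0.5}
first_value = td_update(U_demo, s="A", r=-0.04, s_next="B", alpha=0.1, gamma=0.9)
print(round(first_value, 4))  # 0.041

# Repeating the same transition moves U["A"] towards r + gamma * U["B"] = 0.41.
for _ in range(200):
    td_update(U_demo, s="A", r=-0.04, s_next="B", alpha=0.1, gamma=0.9)
print(round(U_demo["A"], 4))  # 0.41
```

With a single deterministic successor the estimate simply approaches the target. When several successors are possible, each observed transition pulls the estimate towards its own target, and the result averages them by frequency.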

Our first implementation will be of the Passive TD Agent we saw in the lecture. 

In [None]:
pseudocode("Passive-TD-Agent")

In the cell below, you will implement the passive TD algorithm above within the `__call__` method, which allows us to use this class as a function in the subsequent tests. 

In [None]:
from mdp4e import MDP, policy_evaluation

# 21.2.3 Temporal-difference learning


class PassiveTDAgent:
    """
    [Figure 21.4]
    The abstract class for a Passive (non-learning) agent that uses
    temporal differences to learn utility estimates. Override update_state
    method to convert percept to state and reward. The mdp being provided
    should be an instance of a subclass of the MDP Class.

    import sys
    from mdp import sequential_decision_environment
    north = (0, 1)
    south = (0,-1)
    west = (-1, 0)
    east = (1, 0)
    policy = {(0, 2): east, (1, 2): east, (2, 2): east, (3, 2): None, (0, 1): north, (2, 1): north,
              (3, 1): None, (0, 0): north, (1, 0): west, (2, 0): west, (3, 0): west,}
    agent = PassiveTDAgent(policy, sequential_decision_environment, alpha=lambda n: 60./(59+n))
    for i in range(200):
        run_single_trial(agent,sequential_decision_environment)

    agent.U[(0, 0)] > 0.2
    True
    agent.U[(0, 1)] > 0.2
    True
    """

    def __init__(self, pi, mdp, alpha=None):

        self.pi = pi
        self.U = {s: 0. for s in mdp.states}
        self.Ns = {s: 0 for s in mdp.states}
        self.s = None
        self.a = None
        self.r = None
        self.gamma = mdp.gamma
        self.terminals = mdp.terminals

        if alpha:
            self.alpha = alpha
        else:
            self.alpha = lambda n: 1 / (1 + n)  # udacity video

    def __call__(self, percept):
        s1, r1 = self.update_state(percept)
        pi, U, Ns, s, r = self.pi, self.U, self.Ns, self.s, self.r
        alpha, gamma, terminals = self.alpha, self.gamma, self.terminals
        #### YOUR CODE HERE ####
        






        
        #########################
        return self.a

    def update_state(self, percept):
        """To be overridden in most cases. The default case
        assumes the percept to be of type (state, reward)."""
        return percept

### Demonstrating Passive TD-Learning

To demonstrate these agents, we make use of the `GridMDP` class from the `mdp4e` module. `sequential_decision_environment` is similar to that used for the `MDP` notebook but has discounting with $\gamma = 0.9$.

Recall that we have a class for grids, such as the one we saw in the lecture, reproduced below.

![Grid World](img/mdp-bare.svg "Grid World MDP illustration")

We instantiate the object **`mdp`** of the class using a list of lists for both the transition and the sensor model. The code below instantiates the Grid World shown above.

The `Agent-Program` can be obtained by creating an instance of the relevant `Agent-Class`. The `__call__` method allows the `Agent-Class` to be called as a function. The class needs to be instantiated with a policy ($\pi$) and an `MDP` whose utility of states will be estimated.



In [None]:
from mdp4e import sequential_decision_environment

The `sequential_decision_environment` is a GridMDP object as shown below. The rewards are **+1** and **-1** in the terminal states, and **-0.04** in the rest. 

![Grid World](img/rl-tutorial.svg "Grid World MDP illustration")

Now we define actions and a policy similar to **Fig 21.1** in the book.

In [None]:
# Action Directions
north = (0, 1)
south = (0,-1)
west = (-1, 0)
east = (1, 0)

policy = {
    (0, 2): east,  (1, 2): east,  (2, 2): east,   (3, 2): None,
    (0, 1): north,                (2, 1): north,  (3, 1): None,
    (0, 0): north, (1, 0): west,  (2, 0): west,   (3, 0): west, 
}

`PassiveTDAgent` uses temporal differences to learn utility estimates. We learn the difference between the utilities of successive states and back up the values to previous states. Let us look into the source before we see some usage examples. In creating the `PassiveTDAgent`, we use the **same learning rate** $\alpha$ as given in the footnote of the book for Figure 22.5.
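
To get a feel for this learning rate, the short sketch below evaluates the schedule $\alpha(n) = 60/(59+n)$ for a few visit counts $n$. The name `alpha_schedule` is only used here; it computes the same expression as the `lambda` passed to the agent.

```python
def alpha_schedule(n):
    """Learning rate as a function of the number of visits n to a state."""
    return 60.0 / (59 + n)


for n in (1, 10, 100, 1000):
    print(n, round(alpha_schedule(n), 3))
```

The rate starts at 1 on the first visit and decays roughly like $1/n$ afterwards, so early observations move the estimate a lot and later ones only fine-tune it.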

In [None]:
TDagent = PassiveTDAgent(policy, sequential_decision_environment, alpha = lambda n: 60./(59+n))

def run_single_trial(agent_program, mdp):
    """Execute trial for given agent_program
    and mdp. mdp should be an instance of subclass
    of mdp.MDP """

    def take_single_action(mdp, s, a):
        """
        Select outcome of taking action a
        in state s. Weighted Sampling.
        """
        x = random.uniform(0, 1)
        cumulative_probability = 0.0
        for probability_state in mdp.T(s, a):
            probability, state = probability_state
            cumulative_probability += probability
            if x < cumulative_probability:
                break
        return state

    current_state = mdp.init
    while True:
        current_reward = mdp.R(current_state)
        percept = (current_state, current_reward)
        next_action = agent_program(percept)
        if next_action is None:
            break
        current_state = take_single_action(mdp, current_state, next_action)
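
The weighted sampling inside `take_single_action` walks the cumulative distribution until it passes a uniform random draw. As a sanity check of that idea, the standalone sketch below samples many times from a made-up outcome list and reports the empirical frequencies. `sample_outcome`, the outcome labels, and the 0.8/0.1/0.1 split are assumptions for illustration only.

```python
import random
from collections import Counter


def sample_outcome(outcomes, rng):
    """Pick a state from (probability, state) pairs via the cumulative distribution."""
    x = rng.uniform(0, 1)
    cumulative = 0.0
    for probability, state in outcomes:
        cumulative += probability
        if x < cumulative:
            return state
    # Rounding can leave the total slightly below 1; fall back to the last state.
    return outcomes[-1][1]


rng = random.Random(0)  # fixed seed so the sketch is repeatable
toy_outcomes = [(0.8, "intended"), (0.1, "slip_left"), (0.1, "slip_right")]
counts = Counter(sample_outcome(toy_outcomes, rng) for _ in range(10_000))
frequencies = {state: n / 10_000 for state, n in counts.items()}
print(frequencies)
```

The printed frequencies should land close to the listed probabilities, which is what lets a learning agent absorb the transition model implicitly from experience.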

Now we run **200 trials** for the agent to estimate utilities.

In [None]:
for i in tqdm(range(200)):
    run_single_trial(TDagent,sequential_decision_environment)

The calculated utilities are:

In [None]:
print('\n'.join([str(k)+':'+str(v) for k, v in TDagent.U.items()]))

### Comparison with value iteration

We can also compare the utility estimates learned by our agent to those obtained via **value iteration**.

**Note that value iteration has a priori knowledge of the transition table $P$, the rewards $R$, and all the states $s$.**

In [None]:
U = value_iteration(sequential_decision_environment)
print('\n'.join([str(k)+':'+str(v) for k, v in U.items()]))
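
Comparing two long printouts by eye is error-prone, so one option is to summarise the gap in a single number. The sketch below defines a hypothetical helper `rms_error` (not part of `mdp4e`) and runs it on made-up dictionaries; in the notebook you could pass `TDagent.U` and `U` instead. Keep in mind that value iteration returns the utilities of the optimal policy, so the two agree only to the extent that the fixed policy $\pi$ is itself optimal.

```python
import math


def rms_error(estimate, reference):
    """Root-mean-square difference over the states present in both dictionaries."""
    shared = [s for s in reference if s in estimate]
    if not shared:
        raise ValueError("the two utility tables have no states in common")
    squared = sum((estimate[s] - reference[s]) ** 2 for s in shared)
    return math.sqrt(squared / len(shared))


# Made-up utility tables keyed by grid coordinates.
U_td_demo = {(0, 0): 0.30, (1, 0): 0.20}
U_vi_demo = {(0, 0): 0.40, (1, 0): 0.20}
error_demo = rms_error(U_td_demo, U_vi_demo)
print(round(error_demo, 4))  # 0.0707
```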

### Evolution of utility estimates over iterations

We can explore how these estimates vary with time by using plots similar to **Fig 22.5a**. We will first enable `matplotlib` using the inline backend. We also define a function to collect the values of utilities at each iteration.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

def graph_utility_estimates(agent_program, mdp, no_of_iterations, states_to_graph):
    graphs = {state:[] for state in states_to_graph}
    for iteration in range(1,no_of_iterations+1):
        run_single_trial(agent_program, mdp)
        for state in states_to_graph:
            graphs[state].append((iteration, agent_program.U[state]))
    for state, value in graphs.items():
        state_x, state_y = zip(*value)
        plt.plot(state_x, state_y, label=str(state))
    plt.ylim([0,1.2])
    plt.legend(loc='lower right')
    plt.xlabel('Iterations')
    plt.ylabel('U')

Here is a plot of state $(2,2)$.

In [None]:
agent = PassiveTDAgent(policy, sequential_decision_environment, alpha=lambda n: 60./(59+n))
graph_utility_estimates(agent, sequential_decision_environment, 500, [(2,2)])

It is also possible to plot multiple states on the same plot. As expected, the utility of the terminal state $(3,2)$ stays constant and is equal to $R((3,2)) = 1$.

In [None]:
graph_utility_estimates(agent, sequential_decision_environment, 500, [(2,2), (3,2)])

## Active Reinforcement Learning

Unlike in Passive Reinforcement Learning, in Active Reinforcement Learning we are not bound by a policy $\pi$, and we need to select our own actions. In other words, the agent needs to learn an optimal policy. The fundamental tradeoff the agent needs to face is that of exploration vs. exploitation.

### Q-Learning Agent

The `QLearningAgent` class implements the Agent Program described in **Fig 22.8** of the AIMA Book. In Q-Learning the agent learns an action-value function $Q$, which gives the utility of taking a given action in a particular state. Q-Learning does not require a transition model and hence is a model-free method. Let us look into its pseudocode before we implement our own version of this algorithm.

In [None]:
pseudocode("Q-Learning-Agent")
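
For reference, the standard Q-learning update for an observed transition from $s$ to $s'$ under action $a$, written in the same notation as the TD update used for the passive agent, is commonly stated as:

$$Q(s,a) \gets Q(s,a) + \alpha \left( R(s) + \gamma \max_{a'} Q(s',a') - Q(s,a) \right)$$

Compared with the passive TD rule, the utility of the successor state is replaced by the best action value available there, $\max_{a'} Q(s',a')$. In the agent below, $\alpha$ is assumed to be a function of the visit count of the pair $(s,a)$, as suggested by the `alpha=lambda n: ...` argument.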

The Agent Program can be obtained by creating an instance of the class with the appropriate parameters. Because of the `__call__` method, the object that is created behaves like a callable and returns an appropriate action, as most Agent Programs do. To instantiate the object we need an MDP, just as for the `PassiveTDAgent`. Let us use the same `GridMDP` object we used above.

The `QLearningAgent` class also implements an exploration function **`f`**, which returns a fixed `Rplus` ($R^{+}$) until the agent has tried a given state-action pair **`Ne`** ($N_{e}$) times. This is the same as the one defined for our optimistic initialisation. The method **`actions_in_state`** returns the actions possible in a given state. It is useful when applying `max` and `argmax` operations.
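
The effect of the exploration function on action selection can be seen in a small standalone sketch. The function `exploration_value` mirrors the behaviour described for `f`, while the action labels, Q-values, and counts are made up for this example.

```python
def exploration_value(u, n, Ne=5, Rplus=2):
    """Return the optimistic value Rplus until a pair has been tried Ne times."""
    return Rplus if n < Ne else u


def greedy_with_exploration(q_values, counts):
    """Pick the action with the highest exploration-adjusted value."""
    return max(q_values, key=lambda a: exploration_value(q_values[a], counts[a]))


# Hypothetical estimates for one state: "north" looks better but "east" is under-explored.
q_demo = {"north": 0.7, "east": 0.3}
counts_demo = {"north": 5, "east": 2}
print(greedy_with_exploration(q_demo, counts_demo))  # east

# Once "east" has been tried Ne times, the real estimates decide.
counts_demo["east"] = 5
print(greedy_with_exploration(q_demo, counts_demo))  # north
```

Setting `Rplus` above the largest reachable utility is what forces every action to be tried at least `Ne` times before the learned estimates take over.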


In [None]:
class QLearningAgent:
    """
    [Figure 21.8]
    An exploratory Q-learning agent. It avoids having to learn the transition
    model because the Q-value of a state can be related directly to those of
    its neighbors.

    import sys
    from mdp import sequential_decision_environment
    north = (0, 1)
    south = (0,-1)
    west = (-1, 0)
    east = (1, 0)
    policy = {(0, 2): east, (1, 2): east, (2, 2): east, (3, 2): None, (0, 1): north, (2, 1): north,
              (3, 1): None, (0, 0): north, (1, 0): west, (2, 0): west, (3, 0): west,}
    q_agent = QLearningAgent(sequential_decision_environment, Ne=5, Rplus=2, alpha=lambda n: 60./(59+n))
    for i in range(200):
        run_single_trial(q_agent,sequential_decision_environment)

    q_agent.Q[((0, 1), (0, 1))] >= -0.5
    True
    q_agent.Q[((1, 0), (0, -1))] <= 0.5
    True
    """

    def __init__(self, mdp, Ne, Rplus, alpha=None):

        self.gamma = mdp.gamma
        self.terminals = mdp.terminals
        self.all_act = mdp.actlist
        self.Ne = Ne  # iteration limit in exploration function
        self.Rplus = Rplus  # large value to assign before iteration limit
        self.Q = defaultdict(float)
        self.Nsa = defaultdict(float)
        self.s = None
        self.a = None
        self.r = None

        if alpha:
            self.alpha = alpha
        else:
            self.alpha = lambda n: 1. / (1 + n)  # udacity video

    def f(self, u, n):
        """Exploration function. Returns fixed Rplus until
        agent has visited state, action a Ne number of times.
        Same as ADP agent in book."""
        if n < self.Ne:
            return self.Rplus
        else:
            return u

    def actions_in_state(self, state):
        """Return actions possible in given state.
        Useful for max and argmax."""
        if state in self.terminals:
            return [None]
        else:
            return self.all_act

    def __call__(self, percept):
        s1, r1 = self.update_state(percept)
        Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r
        alpha, gamma, terminals = self.alpha, self.gamma, self.terminals
        actions_in_state = self.actions_in_state

        #### YOUR CODE HERE ####
        








        
        #########################
        return self.a

    def update_state(self, percept):
        """To be overridden in most cases. The default case
        assumes the percept to be of type (state, reward)."""
        return percept

Let us create our object now. We use the **same alpha** as before, together with **`Rplus = 2`** and **`Ne = 5`** as defined for **Fig 22.7**.

In [None]:
q_agent = QLearningAgent(sequential_decision_environment, Ne=5, Rplus=2, 
                         alpha=lambda n: 60./(59+n))

To try out `q_agent` we use the **`run_single_trial`** function (which was also used above). Let us run **200** trials.

In [None]:
for i in tqdm(range(200)):
    run_single_trial(q_agent,sequential_decision_environment)

Now let us see the Q-values. The keys are state-action pairs, where the actions are encoded as follows:

north = (0, 1)
south = (0,-1)
west = (-1, 0)
east = (1, 0)

In [None]:
print('\n'.join([str(k)+':'+str(v) for k, v in q_agent.Q.items()]))

The Utility **U** of each state is related to **Q** by the following equation.

$U(s) = \max_{a}Q(s, a)$

Let us convert the Q Values above into U estimates.


In [None]:
U = defaultdict(lambda: -1000.) # Very Large Negative Value for Comparison see below.
for state_action, value in q_agent.Q.items():
    state, action = state_action
    if U[state] < value:
        U[state] = value
print('\n'.join([str(k)+':'+str(v) for k, v in U.items()]))

Let us finally compare these estimates to `value_iteration` results.

In [None]:
Uvi = value_iteration(sequential_decision_environment)
print('\n'.join([str(k)+':'+str(v) for k, v in Uvi.items()]))

And finally, let us compare the resulting policies.

In [None]:
from mdp4e import best_policy


mdp = sequential_decision_environment

mdp.display_policy(best_policy(mdp,U))
mdp.display_policy(best_policy(mdp,Uvi))