# Getting started with reinforcement learning: TD learning


In [None]:
import random
import time

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

from matplotlib.colors import ListedColormap

## Markov Decision Processes (MDP)

A MDP consists of

* An agent, or decision maker, who observes the "state" ($S_t$) and each period chooses an action ($A_t$)
* There is an environment that takes the current state and the action taken by the decision maker and produces a new state ($S_{t+1}$). The state follows the Markov property, i.e. $p(S_{t+1} | S^t, A_t) = p(S_{t+1} | S_t, A_t)$.
* After selecting an action, the decision maker earns and a reward in the next period ($R_{t+1}$).
* Agent seeks to maximize their "lifetime" (whether finite or infinite) discounted rewards.

Macro models often fit into the category of a MDP.

### Policy and value functions

Two objects of interest that we can recall 

**Policy function**

A policy function, $\pi(s)$ is a mapping from states to a probability distribution over actions.

If an agent is following $\pi$ then $\pi(a | s)$ is the probability that the agent selection action $a$ when it observes state $s$.

**Value function**

A value function, $v_\pi(s)$, is the value of being in state $s$ and following the specified policy function, $\pi(s)$, into the future.

$$v_\pi(s) = E \left[ \sum_{\tau=0}^\infty \gamma^{\tau} R^\pi_{t+\tau} \right]$$


### Action-value function

We introduce a new version of the value function called the _action-value function_. The action-value function representes the value of being in state $s$ and choosing to take action $a$ today and then following $\pi(s)$ into the future

$$q_\pi(s, a) = E \left[ R_{t+1}(a, s) + \sum_{\tau=1}^\infty \gamma^{\tau} R^\pi_{t+\tau} \right]$$

We can think about the value function in terms of the action-value function by writing:

$$v_\pi(s) = \arg \max_a q_\pi(s, a)$$

### What is the solution?

The solution to a MDP is an optimal policy function ($\pi^*$) and an optimal value function ($v^*$).

The solutions satisfy the Bellman equation given by

$$v^*(s) = E \left[ R_{t+1}(s, \pi^*(s)) + \gamma v^*(s') \right]$$

### Finding the solution with dynamic programming

We saw with John some ways that we can find the solutions $v^*$ and $\pi^*$ -- The key thing I want to remind us of is that the algorithm went something like:

1. Take an initial guess at $v^*$
2. Iterate through all of the states and find the optimal action -- Use this to update your guess of $v^*$.
3. Check whether $v^*$ is roughly the same and, if it is, then you've found a solution

There are variations on this algorithm but they roughly 

**How reinforcement learning is going to differ**

Reinforcement learning is going to slightly differ on two dimensions:

1. We assume that the decision maker understands all of the probability distributions that govern the model in dynamic programming but, in reinforcement learning, the only information that the decision maker receives is the rewards that they get.
2. For each "update pass" that we make, we are not going to iterate through "all of the possible states" -- The agent simply is going to "interact with the environment.

_Note_: There are some clever papers that work to reduce the number of states that you iterate through in dynamic programming as well.

## Temporal difference (TD) learning

In the spirit of "walk before you run", we are going to start by learning about a discrete setting where:

* The set of states $\mathcal{S}$ is discrete.
* The set of actions $\mathcal{A}(S_t)$ is discrete.

This means that our policy function and our value function are effectively just "look-up" tables.

Let's rewrite our action-state value function in the optimal format and do some rearranging:

\begin{align*}
  Q(s_t, a_t) = E \left[ R_{t+1} + \gamma \max_{a_{t+1}} Q(s_{t+1}, a_{t+1}) | s_t, a_t \right] \\
\end{align*}

Now suppose that we had some tuple of $(s_t, a_t, r_{t+1}, s_{t+1}, a_{t+1})$ that came from "somewhere".

We could define $\delta_t$ as

\begin{align*}
  \delta_t = R_{t+1} + (\gamma Q(s_{t+1}, a_{t+1}) - Q(s_t, a_t))
\end{align*}

Notice that this is a _sample value_ not an evaluation of the expectation -- It gives us some sense of whether $Q(s_t, a_t)$ was too high or too low but we shouldn't expect $\delta_t$ to be 0... We just expect that if we did this lots of times then it would be 0 in expectation.

TD learning methods are going to use $\delta_t$ as a way to update our beliefs about the action-state value function. More specifically, once we have $(s_t, a_t, r_{t+1}, s_{t+1}, a_{t+1})$ and $\delta_t$ we are going to update our action-state value function by doing a TD(0) update:

\begin{align*}
  Q(s_t, a_t) = Q(s_t, a_t) + \alpha \delta_t
\end{align*}

where $\alpha$ is something like a "learning rate" like you would have in an algorithm like gradient descent. It ensures that steps aren't too big and that we don't get oscillatory behavior.

> There are extensions to the temporal difference that allow for multiple time periods. The 0 in $TD(0)$ indicates that this is *one-step* TD learning. See Chapters 7 and 12 of Sutton/Barto for more info.

## Cliff-walking environment

In [None]:
class CliffWalkingEnvironment:
    def __init__(self, height=4, width=12):
        self.height = height
        self.width = width

        self.start_loc = (0, 0)
        self.end_loc = (width-1, 0)

        self.cliff = [(i, 0) for i in range(1, width-1)]

        self.loc = self.start_loc
        self.total_cost = 0

    def reset(self):
        self.loc = self.start_loc
        self.total_cost = 0

    def is_inbounds(self, loc):
        x, y = loc

        inbounds = True
        if x < 0:
            inbounds = False
        if x >= self.width:
            inbounds = False
        if y < 0:
            inbounds = False
        if y >= self.height:
            inbounds = False

        return inbounds

    def loc_update(self, loc, action):
        x, y = loc

        # Left
        if action == 0:
            x = x - 1
        # Right
        elif action == 1:
            x = x + 1
        # Up
        elif action == 2:
            y = y + 1
        # Down
        else:
            y = y - 1

        return (x, y)

    def enumerate_options(self, loc):
        actions = []

        for action in [0, 1, 2, 3]:
            if self.is_inbounds(self.loc_update(loc, action)):
                actions.append(action)

        return actions

    def step(self, action):
        x, y = self.loc

        # Update location
        xu, yu = self.loc_update((x, y), action)

        # Force inbounds -- If action would move you out of bounds
        # then leave the agent where they were
        inbounds = self.is_inbounds((xu, yu))
        if not inbounds:
            xu, yu = x, y

        # Figure out the penalty
        penalty = 1
        if (xu, yu) in self.cliff:
            penalty = 100

            # If you fall off the cliff, go back to the
            # starting position
            xu, yu = self.start_loc
        self.total_cost += penalty

        # Check if we're finished
        self.loc = (xu, yu)
        terminated = False
        if (xu, yu) == self.end_loc:
            terminated = True

        return (xu, yu), penalty, terminated

## Representing $Q(s_t, a_t)$

Again, as stated above, everything in our model is going to initially be discrete. Discrete states. Discrete actions.

We will represent Q with a matrix -- It can be something along the lines of `(n_states, n_actions)`.

### Sarsa Algorithm

- The Sarsa algorithm is one version of a TD algorithm
- The algorithm is summarized by Barto and Sutton as follows (section 6.4)

![sarsa_barto_sutton.png](https://compsosci-resources.s3.amazonaws.com/images/sarsa_barto_sutton.png)

In [None]:
class SARSAAgent:
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=0.1):
        """
        Initialize a SARSA agent.
        
        Args:
            env: The environment the agent will interact with
            alpha: Learning rate
            gamma: Discount factor
            epsilon: Exploration rate
        """
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        
        # Initialize Q-table with zeros
        # State is (x, y) and actions are 0, 1, 2, 3 (left, right, up, down)
        self.Q = np.zeros((env.width, env.height, 4))
        
        # For tracking learning progress
        self.episode_rewards = []
        self.episode_lengths = []
    
    def choose_action(self, state, explore=True):
        """
        Choose an action using epsilon-greedy policy.
        
        Args:
            state: Current state (x, y)
            explore: Whether to use exploration or not
            
        Returns:
            The chosen action
        """
        x, y = state
        
        # Get valid actions from the environment
        valid_actions = self.env.enumerate_options(state)
        
        if explore and random.random() < self.epsilon:
            # Exploration: choose a random valid action
            action = random.choice(valid_actions)
        else:
            # Exploitation: choose the best action (with random tie-breaking)
            q_values = [self.Q[x, y, a] if a in valid_actions else -np.inf for a in range(4)]
            max_q = max([q_values[a] for a in valid_actions])
            best_actions = [a for a in valid_actions if q_values[a] == max_q]
            action = random.choice(best_actions)

        return action
    
    def train(self, num_episodes=500, verbose=False):
        """
        Train the agent using SARSA algorithm.
        
        Args:
            num_episodes: Number of episodes to train for
        """
        for episode in range(num_episodes):
            # Reset environment and get initial state
            self.env.reset()
            state = self.env.loc
            
            # Choose initial action
            action = self.choose_action(state)
            
            total_reward = 0
            steps = 0
            terminated = False
            
            while not terminated:
                # Take action, observe reward and next state
                next_state, reward, terminated = self.env.step(action)

                # Negative reward (we want to minimize penalties)
                reward = -reward  
                total_reward += reward
                steps += 1
                
                if not terminated:
                    # Choose next action (SARSA is on-policy)
                    next_action = self.choose_action(next_state)
                    
                    # Update Q-value using SARSA update rule
                    x, y = state
                    nx, ny = next_state
                    
                    # Q(s,a) = Q(s,a) + alpha * [r + gamma * Q(s',a') - Q(s,a)]
                    self.Q[x, y, action] += self.alpha * (
                        reward + self.gamma * self.Q[nx, ny, next_action] - self.Q[x, y, action]
                    )
                    
                    # Move to next state and action
                    state = next_state
                    action = next_action
                else:
                    # Terminal state update
                    x, y = state
                    self.Q[x, y, action] += self.alpha * (reward - self.Q[x, y, action])
            
            # Track progress
            self.episode_rewards.append(total_reward)
            self.episode_lengths.append(steps)
            
            # Optionally reduce epsilon over time
            # self.epsilon = max(0.01, self.epsilon * 0.995)
            
            if verbose and (episode + 1) % 100 == 0:
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {np.mean(self.episode_rewards[-100:]):.2f}")
            elif episode == num_episodes - 1:
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {np.mean(self.episode_rewards[-100:]):.2f}")

In [None]:
cw = CliffWalkingEnvironment()
sarsa_agent = SARSAAgent(cw)

In [None]:
sarsa_agent.train(1)

In [None]:
sarsa_agent.train(5)

In [None]:
sarsa_agent.train(25)

In [None]:
sarsa_agent.train(50)

In [None]:
sarsa_agent.train(150)

In [None]:
sarsa_agent.train(250)

In [None]:
sarsa_agent.train(500)

In [None]:
sarsa_agent.train(5000)

**Extra Simulation and visualization code**

In [None]:
def test_run(agent, render=False):
    """
    Test the trained agent on one episode.
    
    Args:
        render: Whether to visualize the episode
    
    Returns:
        Total reward and steps taken
    """
    agent.env.reset()
    state = agent.env.loc
    total_reward = 0
    steps = 0
    terminated = False
    
    path = [state]  # Track the agent's path
    
    while not terminated:
        # Choose action (no exploration)
        action = agent.choose_action(state, explore=False)
        
        # Take action
        next_state, reward, terminated = agent.env.step(action)
        reward = -reward  # Convert penalty to reward
        
        total_reward += reward
        steps += 1
        state = next_state
        
        path.append(state)  # Add to path
        
    return path, total_reward, steps

def visualize_policy(agent, path=None):
    """
    Visualize the learned policy and the environment.
    
    Args:
        path: Optional list of states visited during an episode
    """
    policy_map = np.zeros((agent.env.height, agent.env.width), dtype=int)
    value_map = np.zeros((agent.env.height, agent.env.width))
    
    # Create policy and value maps
    for y in range(agent.env.height):
        for x in range(agent.env.width):
            state = (x, y)
            valid_actions = agent.env.enumerate_options(state)
            if valid_actions:
                q_values = [agent.Q[x, y, a] if a in valid_actions else -np.inf for a in range(4)]
                policy_map[agent.env.height - 1 - y, x] = np.argmax(q_values)
                value_map[agent.env.height - 1 - y, x] = max([q_values[a] for a in valid_actions])
    
    # Create a grid
    fig, ax = plt.subplots(figsize=(12, 5))
    
    # Define colors for different cell types
    colors = ['white', 'lightgrey', 'lightgreen', 'red']
    cmap = ListedColormap(colors)
    
    # Create grid
    grid = np.zeros((agent.env.height, agent.env.width))
    
    # Mark cliff cells
    for x, y in agent.env.cliff:
        grid[agent.env.height - 1 - y, x] = 3  # Red
    
    # Mark start cell
    start_x, start_y = agent.env.start_loc
    grid[agent.env.height - 1 - start_y, start_x] = 1  # Light grey
    
    # Mark goal cell
    goal_x, goal_y = agent.env.end_loc
    grid[agent.env.height - 1 - goal_y, goal_x] = 2  # Light green
    
    # Plot grid
    ax.imshow(grid, cmap=cmap)
    
    # Plot policy arrows
    for y in range(agent.env.height):
        for x in range(agent.env.width):
            if grid[y, x] != 3:  # Skip cliff cells
                action = policy_map[y, x]
                if action == 0:  # Left
                    ax.arrow(x, y, -0.3, 0, head_width=0.2, head_length=0.1, fc='black', ec='black')
                elif action == 1:  # Right
                    ax.arrow(x, y, 0.3, 0, head_width=0.2, head_length=0.1, fc='black', ec='black')
                elif action == 2:  # Up
                    ax.arrow(x, y, 0, -0.3, head_width=0.2, head_length=0.1, fc='black', ec='black')
                elif action == 3:  # Down
                    ax.arrow(x, y, 0, 0.3, head_width=0.2, head_length=0.1, fc='black', ec='black')
    
    # Plot path if provided
    if path:
        path_x = [x for x, y in path]
        path_y = [agent.env.height - 1 - y for x, y in path]
        ax.plot(path_x, path_y, 'b-', linewidth=2, alpha=0.5)
        ax.plot(path_x[0], path_y[0], 'go', markersize=10)  # Start
        ax.plot(path_x[-1], path_y[-1], 'ro', markersize=10)  # End
    
    # Add grid lines
    ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=1)
    ax.set_xticks(np.arange(-0.5, agent.env.width, 1), minor=True)
    ax.set_yticks(np.arange(-0.5, agent.env.height, 1), minor=True)
    ax.grid(which='minor', color='k', linestyle='-', linewidth=1)
    ax.set_xticks(np.arange(0, agent.env.width, 1))
    ax.set_yticks(np.arange(0, agent.env.height, 1))
    
    # Create legend
    start_patch = mpatches.Patch(color='lightgrey', label='Start')
    goal_patch = mpatches.Patch(color='lightgreen', label='Goal')
    cliff_patch = mpatches.Patch(color='red', label='Cliff')
    path_line = plt.Line2D([0], [0], color='blue', linewidth=2, alpha=0.5, label='Path')
    
    ax.legend(handles=[start_patch, goal_patch, cliff_patch, path_line], 
              loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=4)
    
    plt.title('SARSA Policy for Cliff Walking')
    plt.tight_layout()
    plt.show()

In [None]:
path, penalty, steps = test_run(sarsa_agent)

In [None]:
penalty

In [None]:
visualize_policy(sarsa_agent, path)

## Q-learning algorithm

Sarsa is known as an "on-policy" learning method because it chooses actions and updates from the same $Q$ function.

Q-learning is what is known as an "off-policy" learning method because it will have the ability to use a different policy for proposing new actions but use the greedy policy when computing the temporal difference. This allows the algorithm to make use of $S, A, R, S'$ transitions obtained from *any* source, and still learn an approximation $Q$ that converges to $q^*$ with probability 1... Convergence requires some conditions, most importantly that the transitions $S, A, R, S'$ *cover* the action space of $q^*$

Self-driving is a nice example of how these two strategies can differ:

- Sarsa method:
    - Give control of vehicle over to Sarsa, so it can choose $A$ and observe implied $R$, $S'$ transitions
- Off-policy:
    - Let human expert driver drive vehicle in intended way
    - Record $S, A, R, S'$ transitions visited by human driver
    - Train RL agent based on data generated from human experience

![q-learning_barto_sutton.png](https://compsosci-resources.s3.amazonaws.com/images/q-learning_barto_sutton.png)

The main difference is in the update step where there's an explicit max operation in the update equation.

In [None]:
class QLearningAgent:
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=0.1):
        """
        Initialize a Q-learning agent.
        
        Args:
            env: The environment the agent will interact with
            alpha: Learning rate
            gamma: Discount factor
            epsilon: Exploration rate
        """
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        
        # Initialize Q-table with zeros
        # State is (x, y) and actions are 0, 1, 2, 3 (left, right, up, down)
        self.Q = np.zeros((env.width, env.height, 4))
        
        # For tracking learning progress
        self.episode_rewards = []
        self.episode_lengths = []
    
    def choose_action(self, state, explore=True):
        """
        Choose an action using epsilon-greedy policy.
        
        Args:
            state: Current state (x, y)
            explore: Whether to use exploration or not
            
        Returns:
            The chosen action
        """
        x, y = state
        
        # Get valid actions from the environment
        valid_actions = self.env.enumerate_options(state)
        
        if explore and random.random() < self.epsilon:
            # Exploration: choose a random valid action
            action = random.choice(valid_actions)
        else:
            # Exploitation: choose the best action (with random tie-breaking)
            q_values = [self.Q[x, y, a] if a in valid_actions else -np.inf for a in range(4)]
            max_q = max([q_values[a] for a in valid_actions])
            best_actions = [a for a in valid_actions if q_values[a] == max_q]
            action = random.choice(best_actions)
            
        return action
    
    def train(self, num_episodes=500, verbose=False):
        """
        Train the agent using Q-learning algorithm.
        
        Args:
            num_episodes: Number of episodes to train for
        """
        for episode in range(num_episodes):
            # Reset environment and get initial state
            self.env.reset()
            state = self.env.loc
            
            total_reward = 0
            steps = 0
            terminated = False
            
            while not terminated:
                # Choose action using epsilon-greedy policy
                action = self.choose_action(state)
                
                # Take action, observe reward and next state
                next_state, reward, terminated = self.env.step(action)
                # Negative reward (we want to minimize penalties)
                reward = -reward  
                total_reward += reward
                steps += 1
                
                # Update Q-value using Q-learning update rule
                x, y = state
                nx, ny = next_state
                
                if not terminated:
                    # Q(s,a) = Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
                    # Get valid actions for next state
                    next_valid_actions = self.env.enumerate_options(next_state)
                    # Find maximum Q-value for next state
                    next_q_values = [self.Q[nx, ny, a] if a in next_valid_actions else -np.inf for a in range(4)]
                    max_next_q = max([next_q_values[a] for a in next_valid_actions])
                    
                    # Update current Q-value
                    self.Q[x, y, action] += self.alpha * (
                        reward + self.gamma * max_next_q - self.Q[x, y, action]
                    )
                else:
                    # Terminal state update (no future rewards)
                    self.Q[x, y, action] += self.alpha * (reward - self.Q[x, y, action])
                
                # Move to next state
                state = next_state
            
            # Track progress
            self.episode_rewards.append(total_reward)
            self.episode_lengths.append(steps)
            
            # Optionally reduce epsilon over time for better exploitation
            # self.epsilon = max(0.01, self.epsilon * 0.995)
            
            if verbose and (episode + 1) % 100 == 0:
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {np.mean(self.episode_rewards[-100:]):.2f}")
            elif episode == num_episodes - 1:
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {np.mean(self.episode_rewards[-100:]):.2f}")

In [None]:
cw = CliffWalkingEnvironment()
q_agent = QLearningAgent(cw)

In [None]:
q_agent.train(1)

In [None]:
q_agent.train(5)

In [None]:
q_agent.train(25)

In [None]:
q_agent.train(50)

In [None]:
q_agent.train(150)

In [None]:
q_agent.train(250)

In [None]:
q_agent.train(500)

In [None]:
q_agent.train(5000)

**Extra Simulation and visualization code**

In [None]:
path, penalty, steps = test_run(q_agent)

In [None]:
penalty

In [None]:
visualize_policy(q_agent, path)