# Exercise 4: Temporal Difference Learning
Giorgia Ramponi, Kristijan Bundaleski


Grid World : A simple, discrete environment where an agent navigates a
grid (e.g., 5x5) to achieve goals. The agent learns to make decisions based
on states, actions, and rewards.

We have an environment that is a grid, with Start (S) and Finish (F). The agent can choose from four possible actions in each state (grid square): up, right, left, or down. The transitions between states are deterministic, meaning the robot will carry out the selected action with probability one.
If the direction of movement is blocked, the agent remains in the same grid square i.e. if the robot is positioned at the edge of the grid and attempts to take an action that would move it outside the grid boundaries, it will remain in its current location. Upon reaching state F, the robot may earn a terminal reward of your choice, or you may also choose to set the state value function to a constant for any policy, which signifies the successful completion of its task. In all other states, the robot does not receive any rewards.



In [1]:
# Import necessary libraries
from operator import ne
import gym
from gym import spaces
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.colors as mcolors

Define a custom Grid World environment for Reinforcement Learning (RL) using OpenAI Gym.

In [2]:
class GridWorldEnv(gym.Env):
    """
    Custom 2D Grid World environment in Gym.

    States are (row, col) cells of a rectangular grid. The agent starts at
    `start` and the episode ends when it reaches `goal`. Transitions are
    deterministic; an action that would leave the grid keeps the agent in
    its current cell.

    Besides the Gym interface, the environment also stores the quantities
    the learning code in this notebook reads and writes in place:
    `V` (state-value table), `policy` (one action index per cell) and
    `gamma` (discount factor).
    """

    def __init__(self, grid_size=(5, 5), start=(0, 0), goal=(4, 4), gamma = 0.9):
        """
        Args:
            grid_size: (rows, cols) of the grid.
            start: (row, col) cell the agent is placed in on reset.
            goal: (row, col) terminal cell.
            gamma: Discount factor stored for the learning code.
        """
        super().__init__()

        # Normalise to tuples so that comparisons such as
        # `tuple(agent_pos) == self.goal` also work when lists are passed in.
        self.grid_size = tuple(grid_size)
        self.start = tuple(start)
        self.goal = tuple(goal)

        # Set the current position to the start
        self.agent_pos = np.array(self.start)

        # Define action space: 0 = down, 1 = left, 2 = up, 3 = right
        self.action_space = spaces.Discrete(4)

        # Define observation space (agent's position in the grid).
        # Box bounds are inclusive, so the largest valid index along each
        # axis is size - 1 (and rows/cols may differ for non-square grids).
        self.observation_space = spaces.Box(
            low=np.zeros(2, dtype=np.int32),
            high=np.array(self.grid_size, dtype=np.int32) - 1,
            dtype=np.int32,
        )

        # State-value function initialized to 0
        self.V = np.zeros(self.grid_size)

        # Initialize the policy (a random policy for now, can be updated later)
        # Policy: one action per grid cell (0=down, 1=left, 2=up, 3=right)
        self.policy = np.random.choice([0, 1, 2, 3], size=self.grid_size)

        # Discount factor
        self.gamma = gamma

        # (row, col) displacement corresponding to each action
        self.movement = {
            0: np.array([1, 0]),  # down
            1: np.array([0, -1]),  # left
            2: np.array([-1, 0]),  # up
            3: np.array([0, 1])  # right
        }

    def reset(self):
        """
        Reset the environment to the initial state (start position).

        Returns:
            The agent position as a numpy array [row, col].
        """
        self.agent_pos = np.array(self.start)
        return self.agent_pos

    def step(self, action):
        """
        Perform an action and update the environment's state.

        Args:
            action: Action index, a key of `self.movement`.

        Returns:
            (position, reward, done, info): the new agent position, the
            reward for this transition, whether the goal was reached, and an
            empty info dict.
        """
        # Candidate position after the move; a new array, so positions
        # returned by earlier calls are never mutated.
        next_pos = self.agent_pos + self.movement[action]

        # Only move if the new position is within grid bounds
        if self._is_valid(next_pos):
            self.agent_pos = next_pos

        # Check if the agent has reached the goal
        if tuple(self.agent_pos) == self.goal:
            reward = 2 * max(self.grid_size[1], self.grid_size[0])  # Large positive reward for reaching the goal
            # reward = -1 # Treat the goal as any other state (In this case, one should set the state value function to a constant)
            done = True
        else:
            reward = -1  # Small step penalty for each move
            done = False

        return self.agent_pos, reward, done, {}

    def _is_valid(self, pos):
        """
        Check if the new position is valid (within bounds).

        Args:
            pos: Indexable (row, col) position.

        Returns:
            True if both coordinates lie inside the grid, False otherwise.
        """
        in_rows = 0 <= pos[0] < self.grid_size[0]
        in_cols = 0 <= pos[1] < self.grid_size[1]
        return bool(in_rows and in_cols)


Define plotting functions for the policy and the state value function.

In [3]:
def plot_grid_with_policy(env):
    """Draw the grid world and overlay the current policy as arrows.

    Every cell except the goal shows the arrow of the action stored in
    ``env.policy`` for that cell; the goal cell is labelled 'F'.
    Works for grid environments.

    Args:
        env (GridWorldEnv): The environment to be plotted.
    """
    n_rows, n_cols = env.grid_size[0], env.grid_size[1]

    # Arrow glyph shown for each action index
    arrow_for_action = {
        3: '→',   # right
        2: '↑',  # up
        1: '←',  # left
        0: '↓',  # down
    }

    # Background: an all-zero grid drawn with a two-colour map
    # ('white' for free cells)
    palette = mcolors.ListedColormap(['white', 'black'])
    background = np.zeros(env.grid_size)

    fig, ax = plt.subplots()
    ax.matshow(background, cmap=palette)

    # Mark the finish cell; text coordinates are (x=col, y=row)
    goal_row, goal_col = env.goal[0], env.goal[1]
    ax.text(goal_col, goal_row, 'F', ha='center', va='center', color='black', fontsize=20)

    # One arrow per non-goal cell, taken from the policy table
    for row in range(n_rows):
        for col in range(n_cols):
            if (row, col) != env.goal:
                chosen = env.policy[row, col]
                ax.text(col, row, arrow_for_action[chosen], ha='center', va='center', color='blue', fontsize=20)

    # Minor ticks sit on the cell borders so the minor grid separates squares
    col_edges = np.arange(-0.5, n_cols, 1)
    row_edges = np.arange(-0.5, n_rows, 1)
    ax.set_xticks(col_edges, minor=True)
    ax.set_yticks(row_edges, minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=1)

    # Remove the major ticks
    ax.set_xticks([])
    ax.set_yticks([])

    plt.title("Grid World with Policy Arrows")
    plt.show()

def plot_value_function(env):
    """Render ``env.V`` as a heat map with lines separating the grid cells.

    Args:
        env (GridWorldEnv): Environment whose state-value table is plotted.

    Returns:
        The ``matplotlib.pyplot`` module, so the caller can call ``.show()``.
    """
    # Work on a copy of the value table
    values = env.V.copy()
    n_rows, n_cols = env.grid_size[0], env.grid_size[1]

    # New figure holding the heat map and its colour scale
    plt.figure()
    plt.imshow(values, cmap='Purples', interpolation='nearest', aspect='auto')
    plt.colorbar()
    plt.title("Agent's value function")

    # Minor ticks on the cell borders carry the separating grid lines
    axes = plt.gca()
    axes.set_xticks(np.arange(-0.5, n_cols, 1), minor=True)
    axes.set_yticks(np.arange(-0.5, n_rows, 1), minor=True)
    axes.grid(which='minor', color='black', linestyle='-', linewidth=2)

    # Blank out the tick labels for a cleaner plot
    axes.set_xticklabels([])
    axes.set_yticklabels([])

    return plt


In [4]:
# Initialize the environment: a 5x5 grid with the start in the top-left
# cell (0, 0) and the goal in the bottom-right cell (4, 4)
env = GridWorldEnv(grid_size=(5, 5), start=(0, 0), goal=(4, 4))

In [None]:
# Plot the initial policy (drawn at random when the environment was created)
plot_grid_with_policy(env)

### Theory Recap
We would like to find the optimal policy.

In order to apply Value Iteration or Policy Iteration, it is **necessary to have**:
Model dynamics, specifically:
 * The **transition probabilities** $P(s' \mid s, a)$, which describe the probability of transitioning from state $s$ to state $s'$ after taking action $a$.
 * The **reward function** $r(s, a)$, which specifies the reward received for taking action $a$ in state $s$.




Recall that the **state value function** of a policy $\pi$ at a state $s\in \mathcal{S}$ is given by

$$V^\pi(s)=\sum_{a \in A} \pi(a \mid s)\left[r(s, a)+\sum_{s^{\prime} \in S} P\left(s^{\prime} \mid s, a\right) \gamma V^\pi\left(s^{\prime}\right)\right]$$


Since the **transition probabilities** are unknown, we approximate the state value function from sampled transitions using the TD(0) update with learning rate $\alpha$:

$$ V(S_t) \leftarrow (1 - \alpha)V(S_t) + \alpha \left( R_{t+1} + \gamma V(S_{t+1}) \right) $$

In theory we should visit each state infinitely many times, but in practice the algorithm is usually terminated once some criterion is met (e.g., a small TD error over time, or a decayed learning rate).


In [6]:
def policy_improvement(env, num_episodes=100):
    """
    Perform greedy policy improvement for the GridWorld environment based on
    the learned value function (V).

    For every non-goal cell, each action is evaluated by sampling the
    transition from the environment and averaging the one-step return
    ``reward + gamma * V[next_state]`` (the same target TD(0) uses). The
    policy entry of the cell is set to the action with the highest estimate;
    ties go to the lowest action index.

    Including the transition reward matters: the goal's own value is never
    updated by TD learning, so ranking actions by ``V[next_state]`` alone
    would make cells next to the goal steer away from it. Blocked moves are
    evaluated like any other transition (the agent stays in place).

    Args:
        env: Environment providing ``grid_size``, ``goal``, ``gamma``, ``V``,
            ``policy``, ``action_space.n``, ``agent_pos`` and ``step``.
            ``env.policy`` is updated in place.
        num_episodes: Number of sampled transitions averaged per
            (state, action) pair. One sample is enough for deterministic
            dynamics; more samples only matter for stochastic ones.

    Raises:
        ValueError: If ``num_episodes`` is smaller than 1.
    """
    if num_episodes < 1:
        raise ValueError("num_episodes must be a positive integer")

    # env.step() moves the agent, so remember its position and restore it
    # once all one-step look-aheads are done.
    saved_pos = env.agent_pos
    goal = tuple(env.goal)

    try:
        for i in range(env.grid_size[0]):
            for j in range(env.grid_size[1]):
                # Skip the goal state since no action is required there
                if (i, j) == goal:
                    continue

                # Track the best action and its value for this state
                best_action = None
                best_value = -float('inf')

                # Loop through all possible actions (down, left, up, right)
                for action in range(env.action_space.n):
                    total_return = 0.0

                    for _ in range(num_episodes):
                        # Place the agent in (i, j) and sample one transition
                        env.agent_pos = np.array([i, j])
                        next_state, reward, _, _ = env.step(action)
                        total_return += reward + env.gamma * env.V[tuple(next_state)]

                    action_value = total_return / num_episodes

                    # Strict '>' keeps the first of equally good actions
                    if action_value > best_value:
                        best_value = action_value
                        best_action = action

                # Update the policy at (i, j) to take the best action found
                env.policy[i, j] = best_action
    finally:
        env.agent_pos = saved_pos


def td_learning(env, epsilon=0.5, tol=1e-4, max_episodes=None):
    """
    Perform TD(0) learning of the state-value function, updating ``env.V``
    in place.

    Episodes are generated with an epsilon-greedy behaviour around
    ``env.policy``. After every transition the value of the state that was
    just left is moved towards ``reward + gamma * V[next_state]`` with step
    size ``1 / N(state)``, where ``N(state)`` counts how often that state has
    been updated during this call.

    Args:
        env: Environment providing ``reset``, ``step``, ``action_space``,
            ``policy``, ``V``, ``gamma`` and ``grid_size``.
        epsilon: Probability of taking a random action instead of the
            policy's action.
        tol: Learning stops after the first episode in which no entry of
            ``env.V`` changed by ``tol`` or more.
        max_episodes: Optional upper bound on the number of episodes;
            ``None`` (default) runs until the ``tol`` criterion is met.
    """
    # Number of updates applied to each state during this call
    visit_count = np.zeros(env.grid_size)
    # In case of no reward for reaching the goal, fix the goal's value first:
    # env.V[tuple(env.goal)] = max(env.grid_size[1], env.grid_size[0])

    episodes_run = 0
    while max_episodes is None or episodes_run < max_episodes:
        # Snapshot used to measure how much this episode changed V
        previous_V = env.V.copy()

        state = env.reset()  # Start from the beginning
        done = False

        while not done:
            state_tuple = tuple(state)

            # Epsilon-greedy action selection
            if np.random.rand() < epsilon:
                action = env.action_space.sample()  # Explore
            else:
                action = env.policy[state_tuple]  # Exploit

            # Take the action
            next_state, reward, done, _ = env.step(action)
            next_state_tuple = tuple(next_state)

            # The step size belongs to the state being updated, so count
            # visits of the current state (not of the successor).
            visit_count[state_tuple] += 1
            alpha_s = 1 / visit_count[state_tuple]

            # TD update
            td_error = reward + env.gamma * env.V[next_state_tuple] - env.V[state_tuple]
            env.V[state_tuple] += alpha_s * td_error

            state = next_state

        episodes_run += 1

        # Largest change of any state value during this episode
        delta = np.max(np.abs(previous_V - env.V))
        if delta < tol:
            break

def optimal_policy(env):
    """
    Search for an optimal policy by alternating TD-based policy evaluation
    with policy improvement until the policy stops changing.

    ``env.V`` and ``env.policy`` are updated in place.
    """
    policy_stable = False
    while not policy_stable:
        # Keep the policy as it was before this round for the stopping test
        previous_policy = env.policy.copy()

        # Policy evaluation via TD learning
        td_learning(env)

        # Policy improvement
        policy_improvement(env)

        # Stop once an improvement step leaves the policy unchanged
        policy_stable = np.array_equal(env.policy, previous_policy)

In [7]:
# Calculate the optimal policy by alternating TD evaluation and policy
# improvement; env.V and env.policy are updated in place
optimal_policy(env)

In [None]:
# Plot the state value function V learned for the final policy
# (plot_value_function returns the pyplot module, hence plot.show())
plot = plot_value_function(env)
plot.show()

In [None]:
# Plot the policy found by optimal_policy as arrows on the grid
plot_grid_with_policy(env)