Import Libraries

In [1]:
import numpy as np
import matplotlib
matplotlib.use('Agg')  # Use a non-interactive backend
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.special import softmax

ModuleNotFoundError: No module named 'scipy'

Short corridor environment (Example 13.1).

States: 0, 1, 2, 3 (terminal)
Actions: go_right (True) or go_left (False)
Transitions:
  - From state 0 or 2:
      If go_right: state += 1
      Else: state = max(0, state - 1)
  - From state 1 (the tricky one):
      If go_right: state -= 1
      Else: state += 1

Reward:
  - On every step that does not reach the terminal state: -1
  - On the step that reaches the terminal state (state == 3): 0

In [ ]:
class ShortCorridor:
    """Short-corridor environment (Example 13.1).

    The agent starts in state 0 and the episode ends when state 3 is
    reached. In states 0 and 2 the actions behave normally; in any other
    state (state 1 during a regular episode) the effect of the two actions
    is swapped.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Put the agent back into the start state (state 0)."""
        self.state = 0

    def step(self, go_right):
        """
        Advance the environment by one action.

        Args:
            go_right (bool): Truthy to attempt a move to the right, falsy to
                attempt a move to the left.

        Returns:
            (reward, done): ``(0, True)`` when the move lands on state 3,
            otherwise ``(-1, False)``.
        """
        if self.state in (0, 2):
            # Regular states: move in the requested direction, but never
            # fall off the left end of the corridor.
            offset = 1 if go_right else -1
            self.state = max(0, self.state + offset)
        else:
            # Reversed state: "right" moves left and "left" moves right.
            self.state += -1 if go_right else 1

        reached_terminal = self.state == 3
        return (0, True) if reached_terminal else (-1, False)

REINFORCE agent with no baseline.

Attributes:
    theta (np.array): Policy parameter vector. With the features in x, theta[0] is the preference for 'right' and theta[1] is the preference for 'left'.
    alpha (float): Step-size parameter for updating theta.
    gamma (float): Discount factor.
    x (np.array): Feature representation for the two actions.
    rewards (list): Reward trajectory for the current episode.
    actions (list): Actions taken during the current episode.

In [ ]:
class ReinforceAgent:
    """Monte-Carlo policy-gradient (REINFORCE) agent without a baseline.

    The policy is a softmax over two action preferences ``theta . x``; the
    resulting probabilities are ordered ``[left, right]``. Rewards and
    actions are buffered during an episode and consumed by ``episode_end``.
    """

    def __init__(self, alpha, gamma):
        # With the features below the preferences are [theta[1], theta[0]],
        # so this start gives roughly 0.95 for "left" and 0.05 for "right".
        self.theta = np.array([-1.47, 1.47])
        self.alpha = alpha
        self.gamma = gamma
        # Column 0 holds the features of "left", column 1 those of "right".
        self.x = np.array([[0, 1],
                           [1, 0]])
        self.rewards = []
        self.actions = []

    def get_pi(self):
        """
        Return the current action probabilities as ``[left, right]``.

        The softmax is evaluated on max-shifted preferences. If the smaller
        probability falls below 0.05 the distribution is replaced by
        0.05 for that action and 0.95 for the other one, so the policy
        never becomes deterministic.
        """
        floor = 0.05

        preferences = self.theta.dot(self.x)
        shifted_exp = np.exp(preferences - preferences.max())
        probs = shifted_exp / shifted_exp.sum()

        rarest = probs.argmin()
        if probs[rarest] < floor:
            probs.fill(1 - floor)
            probs[rarest] = floor

        return probs

    def get_p_right(self):
        """
        Return the probability the current policy assigns to 'right'.
        """
        return self.get_pi()[1]

    def choose_action(self, reward):
        """
        Record the previous reward (if any) and sample the next action.

        Args:
            reward (float or None): Reward produced by the previous step;
                ``None`` means there is nothing to record.

        Returns:
            go_right (bool): True for 'right', False for 'left'.
        """
        if reward is not None:
            self.rewards.append(reward)

        go_right = np.random.uniform() <= self.get_p_right()
        self.actions.append(go_right)
        return go_right

    def episode_end(self, last_reward):
        """
        Apply the REINFORCE update for the finished episode.

        Args:
            last_reward (float): Reward received on the final step of the
                episode.
        """
        self.rewards.append(last_reward)

        # Discounted returns, accumulated from the last step backwards.
        horizon = len(self.rewards)
        returns = np.zeros(horizon)
        tail = 0.0
        for t in reversed(range(horizon)):
            tail = self.gamma * tail + self.rewards[t]
            returns[t] = tail

        # One gradient step per time step; the policy is re-evaluated each
        # time because theta changes inside the loop.
        discount = 1
        for t, g_t in enumerate(returns):
            taken = int(bool(self.actions[t]))
            probs = self.get_pi()
            # Softmax score function: x(a) - sum_b pi(b) x(b).
            score = self.x[:, taken] - self.x.dot(probs)
            self.theta += self.alpha * discount * g_t * score
            discount *= self.gamma

        # Start the next episode with empty buffers.
        self.rewards, self.actions = [], []

REINFORCE agent with a baseline (state-value estimator).

Attributes:
    alpha_w (float): Step-size parameter for updating the baseline w.
    w (float): Scalar baseline: a single value estimate shared by every state, updated toward the return G from each time step of an episode.

In [ ]:
class ReinforceBaselineAgent(ReinforceAgent):
    """REINFORCE agent that subtracts a learned scalar baseline.

    The baseline ``w`` is a single number used for every time step; it is
    moved toward the observed returns with step size ``alpha_w``.
    """

    def __init__(self, alpha, gamma, alpha_w):
        super().__init__(alpha, gamma)
        self.alpha_w = alpha_w
        self.w = 0

    def episode_end(self, last_reward):
        """
        Called at the end of an episode to perform the policy parameter update using REINFORCE with baseline.

        For every time step the error ``delta = G_t - w`` is computed once,
        from the baseline value *before* it is updated, and that same delta
        drives both the baseline update and the policy update.

        Args:
            last_reward (float): The final reward of the episode (from the terminal state).
        """
        self.rewards.append(last_reward)

        # Compute the discounted return G_t for every time step, working
        # backwards from the end of the episode.
        G = np.zeros(len(self.rewards))
        G[-1] = self.rewards[-1]
        for i in range(2, len(G) + 1):
            G[-i] = self.gamma * G[-i + 1] + self.rewards[-i]

        gamma_pow = 1
        for i in range(len(G)):
            # Error relative to the current baseline. It must be taken
            # before w changes; otherwise the policy step would see an
            # error already shrunk by the baseline update.
            delta = G[i] - self.w

            # Update baseline
            self.w += self.alpha_w * gamma_pow * delta

            # Update policy parameters; the policy is re-evaluated each
            # step because theta changes inside the loop.
            action_idx = 1 if self.actions[i] else 0
            pmf = self.get_pi()
            grad_ln_pi = self.x[:, action_idx] - np.dot(self.x, pmf)
            self.theta += self.alpha * gamma_pow * delta * grad_ln_pi

            gamma_pow *= self.gamma

        # Reset episode data
        self.rewards = []
        self.actions = []

Run a trial of multiple episodes using a given agent generator.

Args:
    num_episodes (int): Number of episodes to run.
    agent_generator (callable): A function that returns a new agent instance.

Returns:
    rewards (np.array): Array of total rewards obtained in each episode.

In [ ]:
def trial(num_episodes, agent_generator):
    """
    Run one independent learning run of ``num_episodes`` episodes.

    A fresh environment and a fresh agent (from ``agent_generator``) are
    created; the same agent then keeps learning across all episodes.

    Args:
        num_episodes (int): Number of episodes to run.
        agent_generator (callable): Zero-argument callable returning a new agent.

    Returns:
        np.array: Sum of rewards collected in each episode.
    """
    corridor = ShortCorridor()
    learner = agent_generator()
    episode_totals = np.zeros(num_episodes)

    for ep in range(num_episodes):
        corridor.reset()
        total = 0
        last_reward = None  # nothing to report before the first action
        finished = False

        while not finished:
            action = learner.choose_action(last_reward)
            last_reward, finished = corridor.step(action)
            total += last_reward

        # The final reward is handed over separately so the agent can
        # run its end-of-episode update.
        learner.episode_end(last_reward)
        episode_totals[ep] = total

    return episode_totals

Run the experiment for both:
  - REINFORCE without baseline
  - REINFORCE with baseline

Returns:
    rewards (np.array): Shape (2, num_trials, num_episodes)

In [ ]:
def run_experiment(num_trials, num_episodes, alpha, gamma):
    """
    Run ``num_trials`` independent trials for each of the two agents.

    Agent 0 is plain REINFORCE with step size ``alpha``. Agent 1 is
    REINFORCE with baseline, using ``alpha * 10`` for the policy and
    ``alpha * 100`` for the baseline.

    Args:
        num_trials (int): Independent trials per agent.
        num_episodes (int): Episodes per trial.
        alpha (float): Base step size.
        gamma (float): Discount factor passed to both agents.

    Returns:
        np.array: Per-episode reward sums, shape (2, num_trials, num_episodes).
    """
    def make_plain_agent():
        return ReinforceAgent(alpha=alpha, gamma=gamma)

    def make_baseline_agent():
        return ReinforceBaselineAgent(alpha=alpha * 10, gamma=gamma, alpha_w=alpha * 100)

    agent_generators = [make_plain_agent, make_baseline_agent]
    results = np.zeros((len(agent_generators), num_trials, num_episodes))

    for agent_index, make_agent in enumerate(agent_generators):
        progress = tqdm(range(num_trials), desc=f'Agent {agent_index+1} of {len(agent_generators)}')
        for trial_index in progress:
            results[agent_index, trial_index, :] = trial(num_episodes, make_agent)

    return results

Plot each agent's per-episode reward averaged over trials, together with a reference line at -11.6, and save the figure to figure_13_2.png.

Args:
    rewards (np.array): Shape (num_agents, num_trials, num_episodes)
    num_episodes (int): Number of episodes
    labels (list): Labels for each agent's data    

In [ ]:
def plot_results(rewards, num_episodes, labels):
    """
    Draw the trial-averaged learning curves and save them to 'figure_13_2.png'.

    Args:
        rewards (np.array): Reward sums indexed as (agent, trial, episode).
        num_episodes (int): Number of episodes; the x axis runs from 1 to this value.
        labels (list): One legend label per agent, in the order of ``rewards``.
    """
    # Dashed horizontal reference line at -11.6.
    plt.axhline(y=-11.6, color='red', linestyle='dashed', label='-11.6')

    # Episodes are numbered from 1 on the x axis.
    episode_numbers = np.arange(num_episodes) + 1

    for agent_idx, agent_label in enumerate(labels):
        # Average over the trial axis to get one curve per agent.
        curve = rewards[agent_idx].mean(axis=0)
        plt.plot(episode_numbers, curve, label=agent_label, linestyle='-', linewidth=2.0)

    plt.xlabel('Episode')
    plt.ylabel('Total reward on episode')
    plt.title('Comparison of REINFORCE With and Without Baseline')
    plt.grid(True)
    plt.legend(loc='lower right')

    # Write the figure to disk and close it.
    plt.savefig('figure_13_2.png')
    plt.close()

Parameters for the experiment

In [ ]:
# Experiment size: independent trials per agent, and episodes per trial.
num_trials, num_episodes = 100, 1000

# Base step size handed to run_experiment.
alpha = 2e-4

# Discount factor; 1 means returns are undiscounted.
gamma = 1

Run experiment and collect data

In [ ]:
# Legend labels, one per agent.
labels = [
    'Reinforce without baseline',
    'Reinforce with baseline',
]

# Run every trial for every agent and keep the per-episode reward sums.
rewards = run_experiment(num_trials, num_episodes, alpha, gamma)

Plot results separately, allowing for easy modifications of appearance

In [ ]:
# Draw the averaged learning curves from the collected rewards.
plot_results(rewards=rewards, num_episodes=num_episodes, labels=labels)