In [None]:
# Ensure that the root directory is in Python's path. This is to make importing
# custom library easier.
from pathlib import Path
import sys
root = Path('.').absolute().parent.parent
if str(root) not in sys.path:
    sys.path.append(str(root))

# built-in packages
from collections import defaultdict
import json
from typing import List, Dict, Tuple, Callable
import math

# external packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import scipy
import dill
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# matplotlib param
# https://stackoverflow.com/a/55188780/9723036
# SMALL_SIZE = 10
MEDIUM_SIZE = 15
BIGGER_SIZE = 17
TEXT_BOTTOM = 0.9

plt.rc('font', size=MEDIUM_SIZE)          # controls default text sizes
# plt.rc('axes', titlesize=SMALL_SIZE)     # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE)    # fontsize of the x and y labels
plt.rc('xtick', labelsize=MEDIUM_SIZE)    # fontsize of the tick labels
plt.rc('ytick', labelsize=MEDIUM_SIZE)    # fontsize of the tick labels
plt.rc('legend', fontsize=MEDIUM_SIZE)    # legend fontsize
# plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title

np.set_printoptions(formatter={'float': lambda x: "{0:0.5f}".format(x)})

# Initialize Parameters

In [None]:
ACTIONS = [[-1, 0], [0, 1], [1, 0], [0, -1]]  # N, E, S, W
M = 10  # number of rows
N = 10  # number of columns
NUM_BLK = 15  # number of blocks
GAMMA = 0.9  # discount rate
ACTOR_OPTIMIZER = keras.optimizers.SGD(learning_rate=0.01)
CRITIC_OPTIMIZER = keras.optimizers.SGD(learning_rate=0.01)
OPTIMAL_STEPS = 18  # empirically acquired
RANDOM_SEED = 42

# TensorFlow Models

In [None]:
INIT_WEIGHT = keras.initializers.RandomNormal(seed=RANDOM_SEED)


def state_value_func(hidden_node: int, num_layers: int):
    """Neural network model simulating the state value function.

    Note that a state value function takes a two dimensional input which represents
    the current position of the cell in the grid world. For instance, given a cell
    (i, j), the cell's state value is

    state_value_model(np.array[[i, j]])

    The return value is a tensor like this [[state_val]]. To access the value itself,
    we can do:

    state_value_model(np.array[[i, j]])[0, 0]

    :param hidden_node: The number of nodes in each hidden layer.
    :param num_layers: The number of hidden layers.
    :return: A keras NN model.
    """
    inputs = keras.Input(shape=(2,))
    x = layers.Dense(hidden_node, activation='relu', kernel_initializer=INIT_WEIGHT)(inputs)
    for _ in range(num_layers - 1):
        x = layers.Dense(hidden_node, activation='relu', kernel_initializer=INIT_WEIGHT)(x)
    outputs = -layers.Dense(1, activation='relu', kernel_initializer=INIT_WEIGHT)(x)
    return keras.Model(inputs=inputs, outputs=outputs)


def action_value_func(hidden_node: int, num_layers: int):
    """Neural network model simulating the state value function.

    Note that a state value function takes a two dimensional input which represents
    the current position of the cell in the grid world. For instance, given a cell
    (i, j), the cell's state value is

    state_value_model(np.array[[i, j]])

    The return value is a tensor like this [[state_val]]. To access the value itself,
    we can do:

    state_value_model(np.array[[i, j]])[0, 0]

    :param hidden_node: The number of nodes in each hidden layer.
    :param num_layers: The number of hidden layers.
    :return: A keras NN model.
    """
    inputs = keras.Input(shape=(3,))
    x = layers.Dense(hidden_node, activation='relu', kernel_initializer=INIT_WEIGHT)(inputs)
    for _ in range(num_layers - 1):
        x = layers.Dense(hidden_node, activation='relu', kernel_initializer=INIT_WEIGHT)(x)
    outputs = -layers.Dense(1, activation='relu', kernel_initializer=INIT_WEIGHT)(x)
    return keras.Model(inputs=inputs, outputs=outputs)


def policy_func(hidden_node: int, num_layers: int, num_actions: int):
    """Neural network model simulating the policy function.

    Note that a policy function takes a two dimensional input which represents the
    current position of the cell in the grid world. For instance, given a cell
    (i, j), to obtain the policy distribution, we can call
    
    policy_model(np.array([[i, j]]))

    The return value of the above call is a tensor like this [[prob_0, prob_1, prob_2, prob_3]].
    To access the probability of an action with index a, we can d0:

    policy_model(np.array([[i, j]]))[0, a]

    :param hidden_node: The number of nodes in each hidden layer.
    :param num_layers: The number of hidden layers.
    :return: A keras NN model.
    """
    inputs = keras.Input(shape=(2,))
    x = layers.Dense(hidden_node, activation='relu', kernel_initializer=INIT_WEIGHT)(inputs)
    for _ in range(num_layers - 1):
        x = layers.Dense(hidden_node, activation='relu', kernel_initializer=INIT_WEIGHT)(x)
    outputs = layers.Dense(num_actions, activation='softmax', kernel_initializer=INIT_WEIGHT)(x)
    return keras.Model(inputs=inputs, outputs=outputs)

# Plot Related

In [None]:
def plot_step_to_go(ax, steps, optimal_steps: int, max_episodes, title: str):
    """Plot step-to-go vs. episode

    :param ax: An axis object of matplotlib.
    :param steps: Number of expected steps to reach the target for each episode.
    :param optimal_steps: The optimal number of steps to reach the target.
    :param max_episodes: The maximum number of episodes.
    :param title: Title for the step-to-go plot
    """
    ax.plot(np.arange(len(steps)), steps, marker='o', color='C0', label='Steps to Go')
    ax.axhline(optimal_steps, color='C2', label='Optimal Steps')
    ax.set_xlabel('Episodes')
    ax.set_ylabel('Steps')
    ax.set_xlim(left=0, right=max_episodes)
    ax.set_title(title)
    ax.legend()


def plot_state_value_heatmap(ax, V, fig, title: str, color_vmin: int = -15, color_vmax: int = 0):
    """Plot state value as a heatmap.

    :param ax: An axis object of matplotlib.
    :param V: The final estimated state values.
    :param fig: The fig parameter, acquired when calling plt.subplots().
    :param title: Title for the state value heatmap plot.
    :param color_vmin: Min value for the color map indicator, default to -15.
    :param color_vmax: Max value for the color map indicator, default to 0.
    """
    hm = ax.imshow(
        V,
        cmap='hot',
        interpolation='nearest',
        vmin=color_vmin,  # colorbar min
        vmax=color_vmax,  # colorbar max
    )
    # cbar = fig.colorbar(hm)
    # cbar.set_label('State Value')

    m, n = V.shape
    for i in range(m):  # add V value to each heatmap cell
        for j in range(n):
            val = '-inf' if V[i, j] <= -1000.0 else f'{V[i, j]:.2f}'
            ax.annotate(val, xy=(j - 0.5, i + 0.1), fontsize=10)
    ax.set_title(title)


def plot_path(
    ax,
    actor,
    start: Tuple[int, int],
    end: Tuple[int, int],
    title: str,
    max_steps: int,
    rng,
    grid,
):
    """Plot a route from start to end on the grid world.

    Yellow is start; green is end; brown is obstacle.

    :param ax: An axis object of matplotlib.
    :param actor: The policy function NN simulation.
    :param start: The start state in the format of (row_idx, col_idx)
    :param end: The end state in the format of (row_idx, col_idx)
    :param title: Title for the path plot.
    :param max_steps: Max number of steps allowed when drawing a path. If
        max steps are reached, the path is left as is.
    :param rng: A random number generator.
    :param grid: The grid world. Default to GRID.
    """
    m, n = grid.shape
    ax.grid(True)
    ax.set_xticks(np.arange(n))
    ax.set_xlim(left=0, right=n)
    ax.set_yticks(np.arange(m))
    ax.set_ylim(top=m - 1, bottom=-1)
    ax.invert_yaxis()  # invert y axis such that 0 is at the top
    i, j = start
    step = 0  # note that each invalid action counts as a step as well.
    while (i, j) != end and step < max_steps:
        a, _ = next_action(i, j, actor, rng=rng)
        di, dj = ACTIONS[a]
        ni, nj = i + di, j + dj
        if is_valid_state(ni, nj, grid=grid):  
            ax.plot(  # jiggle on each path plot to visualize paths taken multiple times vs. once
                [j + 0.5 + (rng.random() - 0.5) / 10, nj + 0.5 + (rng.random() - 0.5) / 10],
                [i - 0.5 + (rng.random() - 0.5) / 10, ni - 0.5 + (rng.random() - 0.5) / 10],
                color='red',
                lw=2,
            )
            i, j = ni, nj
        step += 1
    # label start, end and blocks
    ax.add_patch(Rectangle((0, -1), 1, 1, fill=True, color='yellow'))
    ax.add_patch(Rectangle((n - 1, m - 2), 1, 1, fill=True, color='green'))
    for i, j in zip(*np.where(grid == 1)):
        ax.add_patch(Rectangle((j, i - 1), 1, 1, fill=True, color='brown'))
    ax.set_title(title)


def plot_loss(ax, critic_losses: List, actor_losses: List, title: str, max_episodes: int):
    """Plot critic and actor loss in a single graph.

    Critic loss takes the left y-axis. Actor loss takes the right.

    :param ax: An axis object of matplotlib.
    :param critic_losses: A list of critic loss per episode.
    :param actor_losses: A list of actor loss per episode.
    :param title: Title for the path plot.
    :param max_episodes: The maximum number of episodes.
    """
    num_loss = len(critic_losses)
    twin = ax.twinx()  # put actor loss on a separate y axis
    critic_plot, = ax.plot(np.arange(num_loss), critic_losses, color='C0', label='Critic')
    actor_plot, = twin.plot(np.arange(num_loss), actor_losses, color='C1', label='Actor')
    ax.set_xlim(left=0, right=max_episodes)
    # configure critic loss axis
    ax.set_ylabel('Critic Loss')
    ax.yaxis.label.set_color(critic_plot.get_color())
    ax.tick_params(axis='y', colors=critic_plot.get_color())
    # configure actor loss axis
    twin.set_ylabel('Actor Loss')
    twin.yaxis.label.set_color(actor_plot.get_color())
    twin.tick_params(axis='y', colors=actor_plot.get_color())
    ax.set_xlabel('Episodes')
    ax.set_title(title)
    ax.legend(handles=[critic_plot, actor_plot])


def plot(
    title: str,
    critic,
    actor,
    critic_losses: List,
    actor_losses: List,
    steps: List,
    start: Tuple[int, int],
    end: Tuple[int, int],
    max_steps: int,
    max_episodes: int,
    grid,
    V,
    optimal_steps: int = OPTIMAL_STEPS,
):
    """One-stop shop to plot all performance metrics during training.

    :param title: The suptitle of the entire figure.
    :param critic: The state value function NN simulation.
    :param actor: The policy function NN simulation.
    :param critic_losses: A list of critic losses per episode.
    :param actor_losses: A list of actor loss per episode.
    :param steps: A list of steps taken per episode. A failed episode has the max
        number of steps allowed.
    :param start: The start state in the format of (row_idx, col_idx)
    :param end: The end state in the format of (row_idx, col_idx)
    :param max_steps: Maximum number of steps per episode.
    :param max_episodes: Maximum number of episodes.
    :param grid: The grid world. Default to GRID.
    :param V: The final estimated state values.
    :param optimal_steps: The optimal number of steps to reach the target.
    """
    rng=np.random.default_rng()
    plt.rc('grid', linestyle="-", color='black')
    fig, axes = plt.subplots(2, 2, figsize=(14, 13))
    axes = axes.flatten()
    # First three plots
    plot_step_to_go(axes[0], steps, optimal_steps, max_episodes, '(A)')
    plot_loss(axes[1], critic_losses, actor_losses, '(B)', max_episodes)
    plot_path(axes[2], actor, start, end, '(C)', max_steps, rng, grid=grid)
    # Compute V table
    for i in range(grid.shape[0]):
        for j in range(grid.shape[1]):
            V[i, j] = -1000 if grid[i, j] else critic(state(i, j))[0, 0]
    V[grid.shape[0] - 1, grid.shape[1] - 1] = 0
    plot_state_value_heatmap(axes[3], V, fig, '(D)', color_vmin=-15, color_vmax=-3)
    
    fig.suptitle(title)
    plt.tight_layout()
    filename = '_'.join(title.split())
    if filename:
        plt.savefig(filename + '.png')
        plt.ioff()
    else:
        plt.show()

# Initialize Grid and State Value Table

In [None]:
def initialize_grid(num_rows: int, num_cols: int, num_blocks: int, random_state: int = RANDOM_SEED):
    """Initialize a grid world, with obstacles.

    :param num_rows: Number of rows in the grid world.
    :param num_cols: Number of columns in the grid world.
    :param num_blocks: Number of RANDOM blocks that agent cannot cross. The final grid world
        could include more blocks as additional  a manually 
    :param random_state: For reproducing random values.
    :return: The grid itself and the tabulation for state values.
    """
    # Create a grid
    grid = np.array([[0] * num_cols for _ in range(num_rows)])
    V = np.array([[0.0] * num_cols for _ in range(num_rows)])
    # randomly generate blocks
    rng = np.random.default_rng(random_state)
    block_i = rng.integers(low=0, high=num_rows, size=num_blocks)
    block_j = rng.integers(low=0, high=num_cols, size=num_blocks)
    
    ###################################################
    # hardcodede additional blocks, such that we can  #
    # limit the possible number of ways to reach the  #
    # terminal state                                  #
    ###################################################
    block_i = np.append(block_i, [8, 8, 8])
    block_j = np.append(block_j, [4, 5, 6])
    
    # Update grid and state values by identifying the blocks
    grid[block_i, block_j] = 1
    return grid, V


GRID, V = initialize_grid(M, N, NUM_BLK, random_state=10)

# Training Helper Functions

In [None]:
def next_state(i: int, j: int, a: int, grid=GRID) -> Tuple[int, int]:
    """Obtain the next state given the current state and action.

    The peculiarity of this is that if the next state is invalid, i.e.
    it is easier outside the grid world or in a blockage, we return the
    original state, signifying that the result of the illegal action
    results in the agent being stuck at the same location.

    :param i: Row index of the agent on the grid world.
    :param j: Column index of the agent on the grid world.
    :param a: Index of ACTIONS, signifying the specific action to take.
    :param grid: The grid world. Default to GRID.
    :return: The next cell's row and column index.
    """
    di, dj = ACTIONS[a]
    ni, nj = i + di, j + dj
    if is_valid_state(ni, nj, grid=grid):
        return ni, nj
    return i, j


def next_action(i: int, j: int, actor, rng) -> Tuple[float, float]:
    """Obtain the next action from the publicy function simulation.

    Note that we also obtain the probability of the action as reported
    by the actor.

    :param i: Row index of the agent on the grid world.
    :param j: Column index of the agent on the grid world.
    :param actor: The policy function simulation.
    :returns: The action and its associated probability.
    """
    a = rng.choice(len(ACTIONS), p=np.squeeze(actor(state(i, j))))
    return a, actor(state(i, j))[0, a]


def state(i: int, j: int):
    """Generate a numpy array version of the state.

    This is used as the input value for the state and policy function
    NN.

    :param i: Row index of the agent on the grid world.
    :param j: Column index of the agent on the grid world.
    :return: A numpy array of shape (1, 2)
    """
    return np.array([[i, j]])


def state_action(i: int, j: int, a: int):
    """Generate a numpy array version of the state action pair.

    This is used as the input value for the action value function NN.

    :param i: Row index of the agent on the grid world.
    :param j: Column index of the agent on the grid world.
    :param a: Index of ACTIONS, signifying the specific action to take.
    :return: A numpy array of shape (1, 3)
    """
    return np.array([[i, j, a]])


def is_valid_state(i: int, j: int, grid=GRID) -> bool:
    """Check whether the current state is valid.

    A valid state must be within the grid world and NOT on any of the
    blocks.

    :param i: Row index of the agent on the grid world.
    :param j: Column index of the agent on the grid world.
    :param grid: The grid world. Default to GRID.
    :return: A boolean value, true => state is valid, false otherwise.
    """
    m, n = grid.shape
    return 0 <= i < m and 0 <= j < n and grid[i, j] == 0

# Actor Critic Architecture

In [None]:
MEAN_SQUARED_ERROR = tf.keras.losses.MeanSquaredError()

def actor_critic_monte_carlo(
    actor,
    critic,
    grid,
    V,
    start: Tuple[int, int],
    end: Tuple[int, int],
    include_all: bool = True,
    max_episodes: int = 200,
    max_steps: int = 1000,
    actor_opt = ACTOR_OPTIMIZER,
    critic_opt = CRITIC_OPTIMIZER,
    gamma: float = GAMMA,
    random_seed: int = RANDOM_SEED,
):
    """Find the shortest path in the grid world using actor critic architecture with Monte Carlo.

    Since Monte Carlo is used, each episode extends until the terminal state is reached or the
    max number of steps achieved, whichever comes first. Based on the path, we can compute the
    actual discounted reward at each step, and use that to compute the error on critic. This
    error is then used to compute the loss function for both the critic and actor NN for gradient
    descent/ascent optimization.

    :param actor: The policy function NN simulation.
    :param critic: The state value function NN simulation.
    :param grid: The grid world.
    :param V: The tabular state values. For use in the plot only.
    :param start: The grid coordinates of the starting position.
    :param end: The grid coordinates of the end position.
    :param include_all: A boolean flag to determine whether all episodes, successful or failed,
        are included in the training data. Default to True.
    :param max_episodes: Maxinum episodes to use in training the critic and actor. If include_all
        is set to False, significantly more episodes might be needed overall to achieve max_episodes
        number of successful expisodes. Default to 200.
    :param max_steps: Maximum steps to take in the grid world in each episode.
        Default to 1000.
    :param actor_opt: Optimizer for the actor NN. Default to ACTOR_OPTIMIZER.
    :param critic_opt: Optimizer for the critic NN. Default to CRITIC_OPTIMIZER.
    :param gamma: Discount rate. Default to GAMMA.
    :param random_seed: To seed a random number generator.
    """
    rng = np.random.default_rng(random_seed)
    m, n = grid.shape
    all_critic_losses = []
    all_actor_losses = []
    all_steps = []
    ep = 0
    total_ep = 0
    while ep < max_episodes:
        critic_vals = []
        actor_log_probs = []
        rewards = []
        with tf.GradientTape(persistent=True) as tape:
            i, j = start
            step = 0  # record steps taken in the current episode
            while step < max_steps and (i, j) != end:
                a, aprob = next_action(i, j, actor, rng=rng)
                cval = critic(state(i, j))[0, 0]
                critic_vals.append(cval)
                actor_log_probs.append(tf.math.log(aprob))
                rewards.append(-1)  # reward is always -1.
                i, j = next_state(i, j, a, grid=grid)
                step += 1
            # Compute discounted returns
            returns = []
            discounted_return = 0
            for r in rewards[::-1]:
                discounted_return = r + gamma * discounted_return
                returns.append(discounted_return)
            returns = returns[::-1]
            # Convert to Tensors
            critic_vals = tf.convert_to_tensor(critic_vals, dtype=tf.float32)
            actor_log_probs = tf.convert_to_tensor(actor_log_probs, dtype=tf.float32)
            returns = tf.convert_to_tensor(returns, dtype=tf.float32)
            # Compute critic and actor NN loss
            advtange = returns - critic_vals
            actor_loss = -tf.math.reduce_mean(actor_log_probs * advtange)
            critic_loss = MEAN_SQUARED_ERROR(critic_vals, returns)

        # Compute gradient and update NN weights
        # However, NN weights update ONLY if the current episode is successful, or
        # it is instructed to include all expisodes
        if step < max_steps or include_all:
            actor_gradient = tape.gradient(actor_loss, actor.trainable_variables)
            actor_opt.apply_gradients(zip(actor_gradient, actor.trainable_variables))
            critic_gradient = tape.gradient(critic_loss, critic.trainable_variables)
            critic_opt.apply_gradients(zip(critic_gradient, critic.trainable_variables))
            ep += 1
            # Record losses and steps
            all_critic_losses.append(critic_loss)
            all_actor_losses.append(actor_loss)
            all_steps.append(step)
            if ep % 10 == 0:
                print(f'Episode: {ep}, critic loss: {critic_loss}, actor loss: {actor_loss}', end=' ')
                if (i, j) == end:
                    print(f'Terminal State, steps: {step}')
                else:
                    print('Max steps exhausted')
                plot(
                    f'Ep {(ep):03} Summary',
                    critic,
                    actor,
                    all_critic_losses,
                    all_actor_losses,
                    all_steps,
                    start,
                    end,
                    max_steps,
                    max_episodes,
                    grid,
                    V,
                )
        total_ep += 1
    print(f'Total episodes: {total_ep}')

# Grid World with Two Openings

In [None]:
# Train the actor critic architecture
policy_model_1 = policy_func(128, 1, len(ACTIONS))
state_value_model_1 = state_value_func(128, 1)
actor_critic_monte_carlo(
    policy_model_1,
    state_value_model_1,
    GRID,
    V,
    (0, 0),
    (GRID.shape[0] - 1, GRID.shape[1] - 1),
)
# Save trained models
policy_model_1.save('policy_model_1')
state_value_model_1.save('state_value_model_1')

# Grid World with One Opening

## Top left to bottom right

In [None]:
# Add one more block such that there is only one opening to the terminal state
GRID[7, 7] = 1

# Train the actor critic architecture
policy_model_3 = policy_func(128, 1, len(ACTIONS))
state_value_model_3 = state_value_func(128, 1)
actor_critic_monte_carlo(
    policy_model_3,
    state_value_model_3,
    GRID,
    V,
    (0, 0),
    (GRID.shape[0] - 1, GRID.shape[1] - 1),
)
# Save trained models
policy_model_3.save('policy_model_3')
state_value_model_3.save('state_value_model_3')

# Remove the block
GRID[7, 7] = 0

## Bottom right to top left

In [None]:
# Add one more block such that there is only one opening to the terminal state
GRID[7, 7] = 1

# Train the actor critic architecture
policy_model_4 = policy_func(128, 1, len(ACTIONS))
state_value_model_4 = state_value_func(128, 1)
actor_critic_monte_carlo(
    policy_model_4,
    state_value_model_4,
    GRID,
    V,
    (GRID.shape[0] - 1, GRID.shape[1] - 1),
    (0, 0),
)
# Save trained models
policy_model_4.save('policy_model_4')
state_value_model_4.save('state_value_model_4')

# Remove the block
GRID[7, 7] = 0