In [None]:
# Import the necessary libraries
import numpy as np
import random

from IPython import display
import matplotlib.pyplot as plt
from matplotlib import animation
from seaborn import heatmap
from scipy.ndimage import gaussian_filter1d

# Import the Environment class from the envi module
from envi import Environment

# Map each action name to its integer index (used as the column index of the Q-table)
ACTIONS = {'UP': 0, 'LEFT': 1, 'DOWN': 2, 'RIGHT': 3}

# Size of the gridworld; the Q-table allocates MAP_SIZE ** 2 states
# (presumably a square MAP_SIZE x MAP_SIZE grid -- confirm against envi.Environment)
MAP_SIZE = 50

# Number of episodes to train for
EPISODES = 10_000



In [None]:
class Agent:
    """
    Tabular Q-learning agent.

    The agent keeps a Q-table of shape (n_states, n_actions) and updates it
    one transition at a time with the Q-learning rule.

    Parameters
    ----------
    n_states : int, optional
        Number of rows of the Q-table. Defaults to ``MAP_SIZE ** 2``.
    n_actions : int, optional
        Number of columns of the Q-table. Defaults to ``len(ACTIONS)``.
    learning_rate : float, optional
        Step size applied to the temporal difference (default 0.05).
    discount : float, optional
        Discount factor applied to the estimated future value (default 0.99).
    """

    def __init__(self, n_states=None, n_actions=None, learning_rate=0.05, discount=0.99):
        # Resolve the defaults at call time so the module-level constants are
        # only needed when the caller does not pass explicit sizes.
        if n_states is None:
            n_states = MAP_SIZE ** 2
        if n_actions is None:
            n_actions = len(ACTIONS)

        self.learning_rate = learning_rate
        self.discount = discount

        # Initialize the Q Table for the agent with zeros
        self.q_table = np.zeros((n_states, n_actions))

    def greedy_action(self, state):
        """
        Return the index of the highest-valued action for ``state``.

        Ties are resolved by ``np.argmax``, i.e. the lowest action index wins
        (so a row that is still all zeros always yields action 0).
        """
        return np.argmax(self.q_table[state])

    def update_q_table(self, new_state, state, action, reward):
        """
        Apply one Q-learning update for the transition
        (state, action) -> (reward, new_state).

        Returns
        -------
        The absolute temporal difference, which callers can log as a loss.
        """
        # Estimate the optimal future value of the new state
        estimate_of_optimal_future_value = np.max(self.q_table[new_state])

        # Target value for the (state, action) pair
        new_value = reward + self.discount * estimate_of_optimal_future_value

        # Temporal difference between the target and the current estimate
        temporal_difference = new_value - self.q_table[state, action]

        # Move the current estimate a fraction of the way towards the target
        self.q_table[state, action] += self.learning_rate * temporal_difference

        return abs(temporal_difference)


This code defines the Agent class, which represents an agent that interacts with an environment and updates its Q-table based on its experiences. The Agent class has three methods:

**`__init__`**: This method initializes the Agent and sets the q_table to a 2D numpy array of zeros with dimensions MAP_SIZE ** 2 by len(ACTIONS).\
**`greedy_action`**: This method picks the action with the highest value in the Q-table for a given state.\
**`update_q_table`**: This method updates the Q-table for a given state and action, based on the reward obtained and the estimated optimal future value of the new state. It also returns the absolute value of the temporal difference between the old and new values.

In [None]:
# Create an environment and an agent
from collections import deque


env = Environment(MAP_SIZE, ACTIONS)
agent = Agent()

# Training-loop settings
MAX_STEPS_PER_EPISODE = 1_000  # hard cap so an episode always terminates
EPSILON_DECAY = 0.995          # multiplicative decay, applied once per episode
EPSILON_MIN = 0.05             # floor for the exploration rate
LOG_EVERY = 1_000              # print a progress line every LOG_EVERY episodes

# Rolling window of the most recent episode returns (used for the progress log)
recent_rewards = deque(maxlen=1_000)
# Per-episode total reward and mean absolute temporal difference
train_rewards = []
train_loss = []

# Initialize the exploration rate (start fully random)
epsilon = 1.0

# Iterate over the number of episodes
for episode in range(EPISODES):
    # Reset the environment to get the initial state
    state = env.reset()

    # Rewards and losses collected during this episode
    episode_reward = []
    episode_loss = []

    # Iterate over the time steps in the episode
    for _ in range(MAX_STEPS_PER_EPISODE):
        # Epsilon-greedy action selection
        if random.random() > epsilon:
            # Exploit: choose the greedy action
            action = agent.greedy_action(state)
        else:
            # Explore: choose a random action
            action = random.randint(0, len(ACTIONS) - 1)

        # Interact with the environment to get the new state, reward, and done flag
        new_state, reward, done = env.step(action)
        episode_reward.append(reward)

        # Update the Q-table and record the absolute temporal difference as the loss
        loss = agent.update_q_table(new_state, state, action, reward)
        episode_loss.append(loss)

        # Set the new state as the current state
        state = new_state

        # Truthiness check rather than `is True`, so a numpy.bool_ (or any other
        # truthy flag) returned by the environment also ends the episode.
        if done:
            break

    # Decay the exploration rate once per episode. Decaying on every step would
    # hit the floor after ~600 steps, i.e. before the first episode is over.
    epsilon = max(epsilon * EPSILON_DECAY, EPSILON_MIN)

    # Log the rewards and losses for this episode
    train_rewards.append(np.sum(episode_reward))
    recent_rewards.append(train_rewards[-1])
    train_loss.append(np.mean(episode_loss))

    # Print a progress line every LOG_EVERY episodes
    if episode % LOG_EVERY == 0:
        print(f"Episode {episode:>6}: \tR:{np.mean(recent_rewards):>6.3f}\t Epsilon:{epsilon:>6.3f}\t State:{state:>6}")

# Reset the environment to get the initial state
state = env.reset()



This code creates an Environment object and an Agent object. It then runs a number of episodes, in which the agent interacts with the environment, updates its Q-table, and logs the rewards and losses. The exploration rate is decreased over time to encourage the agent to exploit its knowledge of the environment rather than explore it. After the episodes are run, the environment is reset to its initial state.

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(12, 4))

# gaussian_filter1d returns an array of the input dtype, so integer episode
# returns would be truncated after smoothing; cast to float first.
rewards_smoothed = gaussian_filter1d(np.asarray(train_rewards, dtype=float), sigma=10)
loss_smoothed = gaussian_filter1d(np.asarray(train_loss, dtype=float), sigma=10)

# plotting rewards
ax[0].plot(rewards_smoothed)
ax[0].set_title('Rewards')
ax[0].set_xlabel('Episode')
ax[0].set_ylabel('Total reward (smoothed)')
# plotting loss
ax[1].plot(loss_smoothed, color='red')
ax[1].set_title('Loss')
ax[1].set_xlabel('Episode')
ax[1].set_ylabel('Mean |TD error| (smoothed)')

fig.tight_layout()
# plt.show() works with the inline backend; fig.show() needs a GUI backend
# and may only emit a warning in a notebook.
plt.show()

In [None]:
# Extract the greedy action of every state from the Q-table.
# A row whose entries are all equal carries no preference (e.g. a state that
# was never updated) and is marked with -1 instead of an action index.
q_values = agent.q_table
has_preference = q_values.max(axis=1) != q_values.min(axis=1)
best_actions = np.where(has_preference, q_values.argmax(axis=1), -1)

# One-hot policy matrix: policy[s, a] == 1 iff a is the greedy action of state s
policy = np.zeros_like(q_values)
preferred_states = np.flatnonzero(has_preference)
policy[preferred_states, best_actions[preferred_states]] = 1

# Column labels ordered by action index, so they line up with the Q-table columns
action_labels = sorted(ACTIONS, key=ACTIONS.get)

# Create a figure with two subplots
fig, ax = plt.subplots(1, 2)

# Plot the policy matrix as a heatmap
heatmap(policy, ax=ax[0], xticklabels=action_labels, cbar=False)
ax[0].set_title('Greedy policy')

# Plot the Q-table as a heatmap (annotate cells only for small maps)
heatmap(q_values, ax=ax[1], xticklabels=action_labels, annot=MAP_SIZE < 6)
ax[1].set_title('Q-table')

# plt.show() works with the inline backend; fig.show() needs a GUI backend
plt.show()

This code extracts the optimal actions from the Q-table and uses them to create a matrix representing the policy. It then plots the policy matrix and the Q-table as heatmaps. The policy matrix shows which actions are optimal in which states, while the Q-table shows the values of the actions in each state.

In [None]:
# Tell matplotlib to show the plots inline in the notebook
%matplotlib inline

# Set the path to the FFmpeg binary
plt.rcParams['animation.ffmpeg_path'] = 'ffmpeg'

# Initialize an empty list of frames
frames = []

# Create a figure with a single subplot
fig, ax = plt.subplots()

# Iterate over the time steps in the episode
for i in range(1000):
    # Add the current state of the environment to the list of frames
    frames.append([ax.imshow(env.graphic(), animated=True)])

    # Choose the greedy action for the current state
    action = agent.greedy_action(state)

    # Interact with the environment to get the new state, reward, and done flag
    new_state, reward, done = env.step(action)

    # Set the new state as the current state
    state = new_state

    # If the episode is done, reset the environment and break out of the loop
    if done is True:
        frames.append([ax.imshow(env.graphic(), animated=True)])
        state = env.reset()
        break

# Create an animation from the list of frames
ani = animation.ArtistAnimation(fig, frames, interval=50, blit=True)

# Convert the animation to an HTML5 video
video = ani.to_html5_video()

# Display the video in the notebook
html = display.HTML(video)
display.display(html)


This code creates an animation that shows the agent interacting with the environment while following the greedy policy derived from the learned Q-table. It iterates over the time steps of one episode (at most 1,000), chooses the greedy action for the current state, and steps the environment. When the episode is done, it resets the environment and breaks out of the loop. The collected frames are then turned into an animation, which is converted to an HTML5 video and displayed in the notebook.