# Add Memory Replay to the DeepQLearning Agent from previous Unit

Add a queue `from collections import deque` to the agent to save the actions and use it to train the neural network in minibatchs

## Environment from previous unit

In [1]:
import numpy as np
from io import BytesIO
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

class MazeGameEnv():

    def __init__(self, render_mode=None):
        self.render_mode = render_mode
        self.max_steps = 100
        self.step_count = 0
        self.board = [
            ['😊', '😺', ' '],
            [' ', ' ', ' '],
            ['😺', ' ', '😍']
        ]
        self.player_pos = (0, 0)
        self.goal_pos = (2, 2)
        self.board_history = []
        self.action_space = 4  # Up, Down, Left, Right
        self.observation_space = (3, 3)


    def reset(self):
        self.step_count = 0
        self.player_pos = (0, 0)
        self.board[0][0] = '😊'
        self.board[2][2] = '😍'
        self.board_history = []

        self.render()

        observation = self.player_pos
        return observation

    def step(self, action):
        self.step_count += 1
        x, y = self.player_pos
        if action == 0 and x > 0:  # Up
            x -= 1
        elif action == 1 and x < 2:  # Down
            x += 1
        elif action == 2 and y > 0:  # Left
            y -= 1
        elif action == 3 and y < 2:  # Right
            y += 1

        self.board[self.player_pos[0]][self.player_pos[1]] = ' '
        self.player_pos = (x, y)
        self.board[x][y] = '😊'

        terminated = self.player_pos == self.goal_pos
        truncated = self.step_count >= self.max_steps
        reward = 1 if terminated else -1

        self.render()

        observation = self.player_pos
        return observation, reward, terminated, truncated


    def render(self):
      if self.render_mode == "human" or self.render_mode == "rgb_array":
        self._render_frame()

    def _render_frame(self):
      fig, ax = plt.subplots()
      ax.set_xticks(np.arange(0, 4, 1))
      ax.set_yticks(np.arange(0, 4, 1))
      ax.grid(True, color='black')

      if self.player_pos == self.goal_pos:
          self.board[self.goal_pos[0]][self.goal_pos[1]] = '😊😍'

      [[ax.text(j + 0.5, 2.5 - i, self.board[i][j], ha='center', va='center', fontsize=50, color='blue') for j in range(3)] for i in range(3)]

      buf = BytesIO()
      fig.savefig(buf, format='png')
      buf.seek(0)
      img = mpimg.imread(buf)
      self.board_history.append(img)
      buf.close()
      if self.render_mode != "human":
          plt.close(fig)
      return img

    def close(self):
        plt.close()

## Agent to implement

In [2]:
from collections import deque
import numpy as np
import matplotlib.pyplot as plt
import random
from sklearn.neural_network import MLPRegressor

class DQNAgent():
    def __init__(self, env, learning_rate=0.001, discount_factor=0.9, exploration_rate=0.9, buffer_size=1000):
        self.env = env
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.q_network = MLPRegressor(hidden_layer_sizes=(64, 64), activation='relu', solver='adam', learning_rate_init=learning_rate)
        # Initialize the neural network with random weights
        self.q_network.fit(np.random.rand(100, 2), np.random.rand(100, 4))
        self.replay_buffer = deque(maxlen=buffer_size)

    def act(self, observation):
        if random.uniform(0, 1) < self.exploration_rate:
            return random.randint(0, 3)  # Explore
        else:
            return self.predict(observation) # Exploit

    def learn(self, observation, action, reward, next_observation, done):
        self.replay_buffer.append((observation, action, reward, next_observation, done))

        # Sample a minibatch from the replay buffer
        minibatch_size = 32 # You can adjust this
        if len(self.replay_buffer) >= minibatch_size:
            minibatch = random.sample(self.replay_buffer, minibatch_size)

            for obs, act, rew, next_obs, dn in minibatch:
                obs = np.array(obs).reshape(1, -1)
                next_obs = np.array(next_obs).reshape(1, -1)
                q_values = self.q_network.predict(obs)
                next_q_values = self.q_network.predict(next_obs)

                if dn:
                    target = rew
                else:
                    target = rew + self.discount_factor * np.max(next_q_values)
                q_values[0, act] = target
                self.q_network.partial_fit(obs, q_values)

    def predict(self, observation):
        observation = np.array(observation).reshape(1, -1)
        q_values = self.q_network.predict(observation)
        return np.argmax(q_values)

## Agent training

In [3]:
from tqdm import tqdm

# Hyperparameters
learning_rate = 0.001
discount_factor = 0.9
exploration_rate = 0.9
num_episodes = 100

# Initialize environment and agent
env = MazeGameEnv()
agent = DQNAgent(env, learning_rate, discount_factor, exploration_rate)

# Training loop
for episode in tqdm(range(num_episodes)):
    observation = env.reset()  # Reset environment at the start of each episode
    terminated, truncated = False, False
    while not terminated:
        action = agent.act(observation)
        next_observation, reward, terminated, truncated = env.step(action)
        agent.learn(observation, action, reward, next_observation, terminated)
        observation = next_observation

print("Training finished.")

100%|██████████| 100/100 [01:46<00:00,  1.06s/it]

Training finished.





## Agent Evaluation

In [4]:
# Evaluate the trained agent
def evaluate_agent(agent, env, num_episodes=100):
    total_reward = 0
    for _ in tqdm(range(num_episodes)):
        observation = env.reset()
        terminated, truncated = False, False
        while not terminated and not truncated:
            action = agent.predict(observation)  # Use predict for evaluation
            observation, reward, terminated, truncated = env.step(action)
            total_reward += reward
    return total_reward / num_episodes

env = MazeGameEnv()
average_reward = evaluate_agent(agent, env)
print(f"Average reward over 100 episodes: {average_reward}")

100%|██████████| 100/100 [00:00<00:00, 1166.29it/s]

Average reward over 100 episodes: -2.0





## Trained Policy

In [5]:
# Print the trained policy in a 3 by 3 dataframe with an up, down, left or right arrow icon

import pandas as pd

# Create a dictionary to map action indices to arrow symbols
action_mapping = {
    0: "↑",  # Up
    1: "↓",  # Down
    2: "←",  # Left
    3: "→"   # Right
}

# Create the policy DataFrame
policy_data = np.empty((3, 3), dtype=str)
for r in range(3):
    for c in range(3):
        action = agent.predict((r, c))
        policy_data[r,c] = action_mapping[action]

policy_df = pd.DataFrame(policy_data)
policy_df

Unnamed: 0,0,1,2
0,↓,↓,↓
1,↓,↓,↓
2,→,→,↓


## Sample Episode

In [6]:
env = MazeGameEnv(render_mode="rgb_array")
observation = env.reset()
terminated, truncated = False, False
total_reward = 0
while not terminated and not truncated:
    action = agent.predict(observation)  # Use predict for evaluation
    observation, reward, terminated, truncated = env.step(action)
    total_reward += reward

print(f"Total reward: {total_reward}")

Total reward: -2


## Animation function from previous unit

In [7]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from matplotlib import rc
rc('animation', html='jshtml')

def animate_board_history(board_history):
    fig, ax = plt.subplots()
    img_plot = ax.imshow(board_history[0])  # Initial plot

    def update(frame):
        img_plot.set_data(board_history[frame])
        return img_plot,

    ani = FuncAnimation(fig, update, frames=len(board_history), interval=500, blit=True)
    plt.close(fig)
    return ani

animate_board_history(env.board_history)