# Imports 

In [1]:
import os

import random
import numpy as np
import pygame as pg
import constants as const
import matplotlib.pyplot as plt

import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from utils import message, progress_bar
from gen_maze import random_maze_generator

pygame 2.1.0 (SDL 2.0.16, Python 3.8.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


  from .autonotebook import tqdm as notebook_tqdm


# Constants 

In [2]:
vec = pg.math.Vector2

N = 12
M = 12
P0 = (0, 0)
# P1 = (N-1, M-1)
P1 = (4, 0)
maze = random_maze_generator(N, M, P0, P1)

# Configuration
CONFIGURATION = np.array(maze)

# Main window
TITLE = "Maz3-DQN"
BLOCK_SIZE = 40
INFOS_WIDTH = 330
INFOS_HEIGHT = CONFIGURATION.shape[0] * BLOCK_SIZE
PLAY_WIDTH = CONFIGURATION.shape[1] * BLOCK_SIZE
PLAY_HEIGHT = INFOS_HEIGHT
TOTAL_WIDTH = INFOS_WIDTH + PLAY_WIDTH
TOTAL_HEIGHT = INFO_HEIGHT = PLAY_HEIGHT
FPS = 40

# Colors
GRID_COLOR = (40, 40, 40)
BACKGROUND_COLOR = (30, 30, 30)
FREE_CELL_COLOR = (220, 220, 220)
OCCUPIED_CELL_COLOR = (70, 70, 70)
VISITED_CELL_COLOR = pg.Color("Yellow")
PLAYER_COLOR = pg.Color("Red")
TARGET_COLOR = pg.Color("Green")
SEP_LINE_COLOR = (60, 60, 60)
INFOS_COLOR = (255, 255, 255)
PROGRESS_BAR_BACKGROUND = (25, 25, 25)
PROGRESS_BAR_THRESH_FOREGROUND = (186, 3, 252)
PROGRESS_BAR_EXPLO_FOREGROUND = (248, 165, 0)
PROGRESS_BAR_EXPLOIT_FOREGROUND = (0, 211, 225)

# Miscs
INFOS_SIZE = 20
Y_OFFSET_INFOS = 25
PROGRESS_BAR_WIDTH = 25

# DeepQNetwork

In [3]:
class DeepQNetwork(nn.Module):
    def __init__(self, lr, input_dims, fc1_dims, fc2_dims, n_actions) -> None:
        super(DeepQNetwork, self).__init__()
        self.input_dims = input_dims
        self.fc1_dims = fc1_dims
        self.fc2_dims = fc2_dims
        self.n_actions = n_actions

        # layers
        self.fc1 = nn.Linear(*self.input_dims, self.fc1_dims)  # inputs
        self.fc2 = nn.Linear(self.fc1_dims, self.fc2_dims)  # hidden
        self.fc3 = nn.Linear(self.fc2_dims, self.n_actions)  # outputs

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
        self.to(self.device)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        actions = self.fc3(x)

        return actions

# Agent 

In [4]:
class Agent:
    def __init__(
        self,
        gamma,
        epsilon,
        lr,
        input_dims,
        batch_size,
        n_actions,
        min_epsilon=0.01,
        epsilon_decay=1e-4,
        max_mem_size=100_000,
    ) -> None:
        
        self.gamma = gamma
        self.epsilon = epsilon
        self.max_epsilon = epsilon
        self.min_epsilon = min_epsilon
        self.epsilon_decay = epsilon_decay
        self.lr = lr
        self.batch_size = batch_size
        self.mem_cntr = 0
        self.mem_size = max_mem_size
        self.epsilon_decay = epsilon_decay
        self.action_space = [i for i in range(n_actions)]
        
        self.current_decision = None
        self.last_decision = None
        self.n_exploration = 0
        self.n_exploitation = 0

        self.Q_eval = DeepQNetwork(
            self.lr,
            n_actions=n_actions,
            input_dims=input_dims,
            fc1_dims=256,
            fc2_dims=256,
        )
        
        self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool)

    def store_transitions(self, state, action, reward, new_state, terminal):
        index = self.mem_cntr % self.mem_size  # wrapping around
        
        self.state_memory[index] = state
        self.new_state_memory[index] = new_state
        self.reward_memory[index] = reward
        self.action_memory[index] = action
        self.terminal_memory[index] = terminal

        self.mem_cntr += 1

    def choose_action(self, observation):
        if np.random.random() > self.epsilon:
            self.n_exploitation += 1
            self.current_decision = "exploitation"
            state = T.tensor([observation]).to(self.Q_eval.device)
            actions = self.Q_eval.forward(state)
            action = T.argmax(actions).item()
        else:
            self.n_exploration += 1
            self.current_decision = "exploration"
            action = np.random.choice(self.action_space)

        return action

    def learn(self):
        if self.mem_cntr < self.batch_size:
            return None

        self.Q_eval.optimizer.zero_grad()

        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # np array -> pytorch tensor
        state_batch = T.tensor(self.state_memory[batch]).to(self.Q_eval.device)
        new_state_batch = T.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)
        reward_batch = T.tensor(self.reward_memory[batch]).to(self.Q_eval.device)
        terminal_batch = T.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)

        action_batch = self.action_memory[batch]

        q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]
        q_next = self.Q_eval.forward(new_state_batch)
        q_next[terminal_batch] = 0.0

        q_target = reward_batch + self.gamma * T.max(q_next, dim=1)[0]

        loss = self.Q_eval.loss(q_target, q_eval).to(self.Q_eval.device)
        loss.backward()
        self.Q_eval.optimizer.step()

        if self.epsilon > self.min_epsilon:
            self.epsilon = self.epsilon - self.epsilon_decay
        else:
            self.epsilon = self.min_epsilon

# Environment 

In [5]:
# Manually places the window
os.environ["SDL_VIDEO_WINDOW_POS"] = "%d,%d" % (50, 50)

STATE_SPACE = 2
ACTION_SPACE = 4

REWARD_EXIT = 1.0
REWARD_FREE = 0.5

PENALTY_WANDER = -1
PENALTY_OCCUPIED = -0.75
PENALTY_OUT = -0.8
PENALTY_VISITED = -0.5

THRESHOLD_REWARD = -2 * const.CONFIGURATION.size

CELL_COLORS = {
    0: const.OCCUPIED_CELL_COLOR,
    1: const.FREE_CELL_COLOR,
    2: const.VISITED_CELL_COLOR,
    3: const.PLAYER_COLOR,
    4: const.TARGET_COLOR,
}


class Game:
    def __init__(self, human=False, grid=False, infos=True, progress_bars=True) -> None:
        pg.init()
        self.human = human
        self.grid = grid
        self.infos = infos
        self.progress_bars = progress_bars
        self.screen = pg.display.set_mode([const.TOTAL_WIDTH, const.TOTAL_HEIGHT])
        self.clock = pg.time.Clock()
        self.running = True

        pg.display.set_caption(const.TITLE)

        self.state_space = STATE_SPACE
        self.action_space = ACTION_SPACE

        self.score = 0
        self.n_episode = 0
        self.rewards = [0]

    ####### Methods #######

    def reset(self) -> np.array:
        """Resets the game and return its corresponding state."""
        self.reward_episode = 0
        self.maze = const.CONFIGURATION.copy()
        self.place_player()

        return self.get_state()

    def place_player(self) -> None:
        while True:
            i = np.random.randint(self.maze.shape[0])
            j = np.random.randint(self.maze.shape[1])

            if self.maze[i, j] != 1:
                continue

            self.maze[i, j] = 3
            self.position = (i, j)
            break

    def move(self, action) -> None:
        """
        Moves player according to the action chosen by the model.

        args:
            action (int, required): action chosen by the human/agent to move the player
        """
        self.exit = self.visited = self.out = self.occupied = False
        i, j = self.position
        old_position = i, j

        if action == 0:
            j += 1
        elif action == 1:
            j -= 1
        elif action == 2:
            i -= 1
        elif action == 3:
            i += 1

        # move out of bounds
        if i == self.maze.shape[0] or j == self.maze.shape[1] or j < 0 or i < 0:
            self.out = True
        # move to free/visited cell
        elif self.maze[i, j] in (1, 2):
            if self.maze[i, j] == 2:
                self.visited = True

            self.maze[old_position] = 2
            self.maze[i, j] = 3
            self.position = i, j
        # trying to move to an occupied cell
        elif self.maze[i, j] == 0:
            self.occupied = True
        # move to exit (win)
        elif self.maze[i, j] == 4:
            self.position = i, j
            self.exit = True

    def step(self, action):
        self.events()
        self.move(action)

        reward, done = self.get_reward()
        self.reward_episode += reward

        return self.get_state(), reward, done, False

    def get_state(self) -> np.ndarray:
        """Returns the current state of the game, i.e. player's current position."""
        state = [
            self.position[0],
            self.position[1],
        ]

        return np.array(state, dtype=np.float32)

    def get_reward(self) -> tuple:

        # stops episode if the player does nothing but wonder around
        if self.reward_episode < THRESHOLD_REWARD:
            return PENALTY_WANDER, True
        # player moves out of bounds
        elif self.out:
            return PENALTY_OUT, False
        # player moves to a visited cell
        elif self.visited:
            return PENALTY_VISITED, False
        elif self.occupied:
            return PENALTY_OCCUPIED, False
        # player finds the exit
        elif self.exit:
            self.score += 1
            return REWARD_EXIT, True

        # player moves to a free cell
        return REWARD_FREE, False

    def get_values_neighbours(self):
        offsets = [(1, 0), (-1, 0), (0, 1), (0, -1)]
        values_neighbours = []

        for offset in offsets:
            i, j = self.position[0], self.position[1]
            i, j = i + offset[0], j + offset[1]
            if 0 <= i < self.maze.shape[0] and 0 <= j < self.maze.shape[1]:
                values_neighbours.append(self.maze[i, j])
            else:
                values_neighbours.append(-1)  # out of bounds

        return values_neighbours

    def events(self):
        for event in pg.event.get():
            if (
                event.type == pg.QUIT
                or event.type == pg.KEYDOWN
                and event.key == pg.K_q
            ):
                self.running = False

    def get_data_ratios(self, agent):
        r_exploration = agent.n_exploration / (
            agent.n_exploration + agent.n_exploitation
        )
        r_exploitation = agent.n_exploitation / (
            agent.n_exploration + agent.n_exploitation
        )
        r_threshold = self.reward_episode / THRESHOLD_REWARD

        return r_exploration, r_exploitation, r_threshold

    def render(self, agent):
        """TODO"""

        self.screen.fill(const.BACKGROUND_COLOR)
        data_ratios = self.get_data_ratios(agent)

        for i in range(self.maze.shape[0]):
            for j in range(self.maze.shape[1]):
                value = self.maze[i, j]
                color = CELL_COLORS[value]
                x, y = const.INFOS_WIDTH + j * const.BLOCK_SIZE, i * const.BLOCK_SIZE
                w, h = const.BLOCK_SIZE, const.BLOCK_SIZE

                pg.draw.rect(self.screen, color, (x, y, w, h))

        if self.grid:
            self.draw_grid()
        if self.infos:
            self.draw_infos(agent, *data_ratios)
        if self.progress_bars:
            self.draw_progress_bars(*data_ratios)

        pg.display.flip()
        self.clock.tick(const.FPS)

    def draw_entities(self):
        """TODO"""
        self.player.draw(self.screen)
        self.food.draw(self.screen)

        for enemy in self.enemies:
            enemy.draw(self.screen)

    def draw_grid(self):
        """TODO"""
        for i in range(1, const.PLAY_WIDTH // const.BLOCK_SIZE):
            # vertical lines
            p_v1 = const.INFOS_WIDTH + const.BLOCK_SIZE * i, 0
            p_v2 = const.INFOS_WIDTH + const.BLOCK_SIZE * i, const.PLAY_HEIGHT

            # horizontal lines
            p_h1 = const.INFOS_WIDTH, const.BLOCK_SIZE * i
            p_h2 = const.TOTAL_WIDTH, const.BLOCK_SIZE * i

            pg.draw.line(self.screen, const.GRID_COLOR, p_v1, p_v2)
            pg.draw.line(self.screen, const.GRID_COLOR, p_h1, p_h2)

    def draw_infos(self, agent, r_exploration, r_exploitation, r_threshold):
        """Draws game informations"""

        infos = [
            f"Score: {self.score}",
            f"Episode: {self.n_episode}",
            f"Episode reward: {round(self.reward_episode, 1)}",
            f"Mean reward: {round(np.mean(self.rewards), 1)}",
            f"Initial Epsilon: {agent.max_epsilon}",
            f"Epsilon: {round(agent.epsilon, 4)}",
            f"Epsilon decay: {agent.epsilon_decay}",
            f"Exploration: {round(r_exploration * 100, 3)}%",
            f"Exploitation: {round(r_exploitation * 100, 3)}%",
            f"Last decision: {agent.last_decision}",
            f"Reward threshold: {int(r_threshold * 100)}%",
            f"Time: {int(pg.time.get_ticks() / 1e3)}s",
            f"FPS: {int(self.clock.get_fps())}",
        ]

        # Drawing infos
        for i, info in enumerate(infos):
            message(
                self.screen,
                info,
                const.INFOS_SIZE,
                const.INFOS_COLOR,
                (5, 5 + i * const.Y_OFFSET_INFOS),
            )

    def draw_progress_bars(self, r_exploration, r_exploitation, r_threshold):
        x_thresh, y = const.INFOS_WIDTH - const.PROGRESS_BAR_WIDTH - 6, 0
        x_explo, y = const.INFOS_WIDTH - 2 * const.PROGRESS_BAR_WIDTH - 11, 0
        x_exploit, y = const.INFOS_WIDTH - 3 * const.PROGRESS_BAR_WIDTH - 16, 0

        w_bg, h_bg = const.PROGRESS_BAR_WIDTH, const.TOTAL_HEIGHT

        w_fg, h_fg_thresh = w_bg, r_threshold * h_bg
        w_fg, h_fg_explo = w_bg, r_exploration * h_bg
        w_fg, h_fg_exploit = w_bg, r_exploitation * h_bg

        # reward threshold
        progress_bar(
            self.screen, x_thresh, y, w_bg, h_bg, w_fg, h_fg_thresh,
            const.PROGRESS_BAR_BACKGROUND, const.PROGRESS_BAR_THRESH_FOREGROUND,
        )
        message(
            self.screen, "Reward threshold",
            const.INFOS_SIZE,
            const.PROGRESS_BAR_THRESH_FOREGROUND,
            (x_thresh + const.PROGRESS_BAR_WIDTH // 2 + 1, h_bg // 2),
             anchor="center", rotation=90
        )

        # exploration
        progress_bar(
            self.screen, x_explo, y, w_bg, h_bg, w_fg, h_fg_explo,
            const.PROGRESS_BAR_BACKGROUND, const.PROGRESS_BAR_EXPLO_FOREGROUND,
        )
        message(
            self.screen, "Exploration",
            const.INFOS_SIZE,
            const.PROGRESS_BAR_EXPLO_FOREGROUND,
            (x_explo + const.PROGRESS_BAR_WIDTH // 2 + 1, h_bg // 2),
             anchor="center", rotation=90
        )

        # exploitation
        progress_bar(
            self.screen, x_exploit, y, w_bg, h_bg, w_fg, h_fg_exploit,
            const.PROGRESS_BAR_BACKGROUND, const.PROGRESS_BAR_EXPLOIT_FOREGROUND,
        )
        message(
            self.screen, "Exploitation",
            const.INFOS_SIZE,
            const.PROGRESS_BAR_EXPLOIT_FOREGROUND,
            (x_exploit + const.PROGRESS_BAR_WIDTH // 2 + 1, h_bg // 2),
            anchor="center", rotation=90
        )


def set_global_seed(seed: int) -> None:
    """
    Sets random seed into PyTorch, numpy and random.

    Args:
        seed: random seed
    """

    try:
        import torch
    except ImportError:
        print("Module PyTorch cannot be imported")
        pass
    else:
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    random.seed(seed)
    np.random.seed(seed)

# main 

In [None]:
env = Game(human=True, grid=True, infos=True, progress_bars=True)
agent = Agent(
    gamma=0.9,
    epsilon=1.0,
    batch_size=64,
    n_actions=env.action_space,
    min_epsilon=0.01,
    input_dims=[env.state_space],
    lr=0.001,
)

while env.running:
    score = 0
    done = False
    state = env.reset()

    while not done:
        if not env.running:
            break

        action = agent.choose_action(state)
        new_state, reward, done, info = env.step(action)
        agent.store_transitions(state, action, reward, new_state, done)
        agent.learn()
        state = new_state
        env.render(agent)

    agent.last_decision = agent.current_decision
    agent.n_exploration = 0
    agent.n_exploitation = 0

    env.rewards.append(env.reward_episode)
    env.n_episode += 1

pg.quit()

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool)
  state = T.tensor([observation]).to(self.Q_eval.device)
