In [None]:
%matplotlib widget
import numpy as np
import random


class Agent:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    The Q-table holds one row per encoded state and one column per action.
    State objects passed in are converted to a row index through
    ``state.extract_state(idx)`` and queried for ``state.has_item()``.
    """

    def __init__(self, idx, all_states, actions):
        """Create an agent.

        Args:
            idx: Index identifying this agent inside a state object.
            all_states: Number of rows to allocate in the Q-table.
            actions: Sequence of actions; its order fixes the Q-table columns.
        """
        self.idx = idx
        self.actions = actions  # TODO: encode different action for different state. How to initialize Q-Table

        # Agent property (for illustration purposes)
        self.is_having_item = False

        # Every state-action value starts at 0.
        self.Q = np.zeros((all_states, len(actions)))

        # Learning hyper-parameters.
        self.alpha = 0.1  # learning rate
        self.gamma = 0.8  # discount factor
        self.epsilon = 1  # exploration probability
        self.epsilon_decay = 0.995
        # NOTE(review): with a floor of -1 the max() in update_learn never
        # clamps, so epsilon simply decays towards 0 -- confirm this is intended.
        self.epsilon_min = -1

    # ----- Core Functions ----- #
    def choose_action(self, state, explore=True):
        """Return an action for ``state``.

        With ``explore`` enabled a uniformly random action is returned with
        probability ``epsilon``; otherwise the action with the highest
        Q-value for the encoded state is returned.
        """
        exploring = explore and np.random.rand() < self.epsilon
        if exploring:
            return random.choice(self.actions)

        q_row = self.Q[self.massage(state)]
        return self.actions[np.argmax(q_row)]

    def update_learn(self, state, action, reward, next_state, is_terminal, learn=True):
        """Sync the agent with ``next_state`` and apply one Q-learning update.

        When ``learn`` is False only the item flag is refreshed. ``is_terminal``
        is accepted but not read here.
        """
        self.update(next_state)

        # Encode both states into Q-table row indices.
        state_i = self.massage(state)
        nxt_state_i = self.massage(next_state)

        if not learn:
            return

        # No terminal-state branch: the update below always bootstraps from
        # the next state's current Q-values, which start at 0.
        column = self.actions.index(action)
        q_row = self.Q[state_i]
        target = reward + self.gamma * np.max(self.Q[nxt_state_i])
        q_row[column] += self.alpha * (target - q_row[column])

        # Epsilon decay (applied once per learning step).
        decayed = self.epsilon * self.epsilon_decay
        self.epsilon = max(self.epsilon_min, decayed)

    # ----- Public Functions ----- #
    def has_item(self):
        """Return the item flag last recorded for this agent."""
        return self.is_having_item

    def update(self, state):
        """Copy the item flag reported by ``state`` onto the agent."""
        self.is_having_item = state.has_item()

    def reset(self):
        """Clear the item flag (the Q-table and epsilon are kept)."""
        self.is_having_item = False

    # ----- Private Functions ----- #
    def massage(self, state):
        """Return the encoded form of ``state`` for this agent."""
        return state.extract_state(self.idx)

    def get_q_table(self):
        """Return the Q-table array itself (not a copy)."""
        return self.Q


In [None]:
class Empty:
    """Plain grid cell; entering it costs one point."""

    def __init__(self, pos):
        self.x, self.y = pos

    def interact(self, other: Agent):
        """Return ``(reward, position)`` for an agent stepping onto this cell."""
        landing = (self.x, self.y)
        return -1, landing

    def __copy__(self):
        position = (self.x, self.y)
        return Empty(position)

    def __deepcopy__(self, memo):
        # The cell holds only plain coordinates, so a shallow copy is enough.
        return self.__copy__()


class Goal(Empty):
    """Delivery cell; pays out once, to the first agent arriving with the item."""

    def __init__(self, pos):
        self.x, self.y = pos
        self.reached = False

    def interact(self, other: Agent):
        """Return ``(reward, position)``: 50 on first delivery, otherwise -1."""
        delivering = other.has_item() and not self.reached
        here = (self.x, self.y)
        if not delivering:
            return -1, here

        self.reached = True
        return 50, here

    def has_reached(self):
        """Return True once a delivery has been made on this cell."""
        return self.reached

    def __copy__(self):
        duplicate = Goal((self.x, self.y))
        duplicate.reached = self.reached
        return duplicate

    def __deepcopy__(self, memo):
        # Only plain values are stored, so a shallow copy is enough.
        return self.__copy__()


class Item(Empty):
    """Cell holding the item; pays out once, when an empty-handed agent arrives."""

    def __init__(self, pos):
        self.x, self.y = pos
        self.taken = False

    def interact(self, other: Agent):
        """Return ``(reward, position)``: 50 on pickup, otherwise -1."""
        picking_up = not self.taken and not other.has_item()
        here = (self.x, self.y)
        if picking_up:
            self.taken = True
            return 50, here
        return -1, here

    def get_pos(self):
        """Return the cell coordinates as an ``(x, y)`` tuple."""
        return self.x, self.y

    def __copy__(self):
        duplicate = Item((self.x, self.y))
        duplicate.taken = self.taken
        return duplicate

    def __deepcopy__(self, memo):
        # Only plain values are stored, so a shallow copy is enough.
        return self.__copy__()


class Wall(Empty):
    """Border cell: entering it costs 10 points and bounces the agent back.

    The bounce target is the wall's own position clamped into the
    ``width`` x ``height`` playable area.
    """

    def __init__(self, pos, dimensions):
        """Create a wall.

        Args:
            pos: ``(x, y)`` of the wall cell (may lie outside the grid).
            dimensions: ``(width, height)`` of the playable grid.
        """
        x, y = pos
        self.x = x
        self.y = y

        width, height = dimensions
        # Kept so the wall can be copied (the constructor needs it).
        self.dimensions = (width, height)
        self.new_x = min(width - 1, max(0, x))
        self.new_y = min(height - 1, max(0, y))

    def interact(self, other: Agent):
        """Return ``(reward, position)``: -10 and the clamped in-grid cell."""
        return -10, (self.new_x, self.new_y)

    def __copy__(self):
        # Fix: the original omitted ``dimensions`` and therefore raised
        # TypeError whenever a Wall was copied.
        return Wall((self.x, self.y), self.dimensions)

    def __deepcopy__(self, memo):
        # Only plain values are stored, so a shallow copy is enough.
        return self.__copy__()


In [None]:
from multiprocessing import Array


class Controller(object):
    """Drives a game for training, testing and step-by-step playback.

    Metrics are stored in fixed-size ``multiprocessing.Array`` buffers of
    length ``max_itr``; once full, training results wrap around to slot 0.
    """

    # Iterate by number of games
    def __init__(self, game, max_itr):
        """Create a controller.

        Args:
            game: Object providing ``reset``, ``step``, ``has_ended``,
                ``train_one_game`` and the getters used by ``get_info``.
            max_itr: Capacity of every metric buffer.
        """
        self.game = game
        self.timeout = 0.5
        self.auto_reset = True
        self.itr = 0  # next slot written by train()
        self.max_itr = max_itr

        self.iterations = Array("i", range(max_itr))
        self.losses = Array("i", max_itr)
        self.epsilon = Array("f", max_itr)

        self.test_loss = Array("f", max_itr)

    def get_info(self):
        """Return ``(agent_info, untaken_items, total_reward, max_reward)``."""
        info = self.game.get_agent_info()
        items = self.game.get_untaken_items()
        tot_reward = self.game.get_total_reward()
        max_reward = self.game.get_max_reward()
        return info, items, tot_reward, max_reward

    def set_timeout(self, timeout):
        """Store a new timeout value."""
        self.timeout = timeout

    def toggle_auto_reset(self):
        """Flip the auto-reset flag and return its new value."""
        self.auto_reset = not self.auto_reset
        return self.auto_reset

    def next(self):
        """Advance the game by one non-learning step and return ``get_info()``.

        A finished game is reset first when auto-reset is enabled.
        """
        if self.game.has_ended() and self.auto_reset:
            self.game.reset()
        self.game.step(learn=False)
        return self.get_info()

    def train(self, itr=1):
        """Play ``itr`` learning games, recording loss and epsilon per game."""
        self.game.reset()
        for _ in range(itr):
            loss, _reward, epsilon = self.game.train_one_game()
            if self.itr >= self.max_itr:
                self.itr = 0
            self.losses[self.itr] = loss
            self.epsilon[self.itr] = epsilon
            self.itr += 1

    def test(self, itr=1):
        """Play ``itr`` non-learning games, recording each loss in ``test_loss``.

        ``test_loss`` is zeroed first and then filled from slot 0, wrapping
        at ``max_itr``. A local index is used so the training cursor
        ``self.itr`` is left untouched (the original shared it, which wrote
        test results in the middle of the freshly cleared buffer and moved
        the position of subsequent training writes).
        """
        self.game.reset()
        for i in range(self.max_itr):
            self.test_loss[i] = 0

        slot = 0
        for _ in range(itr):
            loss, _reward, _epsilon = self.game.train_one_game(learn=False)
            if slot >= self.max_itr:
                slot = 0
            self.test_loss[slot] = loss
            slot += 1

    def get_metrics(self):
        """Return ``(iterations, losses, epsilon)`` buffers."""
        return self.iterations, self.losses, self.epsilon


In [None]:
import itertools
from copy import deepcopy

class Action:
    """Single-letter codes for the four grid moves."""

    NORTH, WEST, EAST, SOUTH = "N", "W", "E", "S"


class State:
    """Snapshot of the grid: agent positions plus a copy of the interactive tiles."""

    def __init__(self, agent_positions, lookup):
        """Create a snapshot.

        Args:
            agent_positions: Sequence of ``(x, y)`` positions, one per agent.
            lookup: Collection of interactive tiles; it is deep-copied so
                later changes to the live tiles do not alter this snapshot.
        """
        self.agent_positions = agent_positions
        self.lookup = deepcopy(lookup)

    @staticmethod
    def get_possible_actions():
        """Return the list of actions available to an agent."""
        return [Action.NORTH, Action.SOUTH, Action.EAST, Action.WEST]

    @staticmethod
    def get_possible_states(width, height):
        """Return the number of Q-table rows needed for ``extract_state``.

        ``width`` and ``height`` are currently ignored: ``extract_state``
        uses a fixed base-5 encoding, so the count is always ``5**5``.
        """
        return 5**5

    # ----- Private Functions ----- #
    def get_goal(self):
        """Return the first ``Goal`` tile in the snapshot, or None if absent."""
        return next((x for x in self.lookup if isinstance(x, Goal)), None)

    def get_items(self):
        """Return all ``Item`` tiles in the snapshot."""
        return [x for x in self.lookup if isinstance(x, Item)]

    def has_item(self):
        """Return whether the (first) item has been taken."""
        item = next((x for x in self.lookup if isinstance(x, Item)), None)
        return item.taken

    def extract_state(self, idx):
        """Encode agent ``idx``'s view of the snapshot as a single integer.

        The agent position, the first item's position and the taken flag are
        packed as base-5 digits: ``x, y, item_x, item_y, flag``.
        """
        x, y = self.agent_positions[idx]
        x2, y2 = self.get_item_positions()[0]
        # TODO: remove hardcoded item_pos indices
        return (
            x * (5**4)
            + y * (5**3)
            + x2 * (5**2)
            + y2 * (5)
            + (1 if self.has_item() else 0)
        )

    # ----- Information Extraction ----- #
    def get_agent_positions(self):
        """Return the agent positions given at construction."""
        return self.agent_positions

    def get_goal_positions(self):
        """Return the goal's ``(x, y)``."""
        goal = self.get_goal()
        return goal.x, goal.y

    def get_item_positions(self):
        """Return the ``(x, y)`` of every item, taken or not."""
        return [item.get_pos() for item in self.get_items()]

    def is_terminal(self):
        """Return True once the goal reports it has been reached."""
        goal = self.get_goal()
        return goal.has_reached()

    def get_untaken_item_pos(self):
        """Return the ``(x, y)`` of every item that has not been taken."""
        return [item.get_pos() for item in self.get_items() if not item.taken]


In [None]:
import matplotlib.pyplot as plt

from typing import List, Tuple

class Game:
    """Single-agent pickup-and-deliver game played on a 5x5 grid."""

    def __init__(self):
        # Board size
        self.width = 5
        self.height = 5
        # Reward accumulated so far in the current game
        self.total_reward = 0

        # Agents (currently exactly one)
        self.agent = []
        for idx in range(1):
            n_states = State.get_possible_states(self.width, self.height)
            moves = State.get_possible_actions()
            self.agent.append(Agent(idx, n_states, moves))

        # Grid
        self.grid = Grid(self.width, self.height)
        self.grid.add_agents(self.agent)
        self.reset()

    def train_one_game(self, learn=True):
        """Play one game from a fresh reset.

        The game stops when the goal is reached or after 10000 steps
        (100 when ``learn`` is False).

        Returns:
            ``(loss, total_reward, epsilon)`` where ``loss`` is the best
            achievable reward minus the reward obtained and ``epsilon`` is
            the first agent's current exploration rate.
        """
        self.reset()
        self.total_reward = 0
        best_possible = GridUtil.calculate_max_reward(self.grid)

        step_limit = 10000 if learn else 100
        for _ in range(step_limit):
            if self.grid.get_state().is_terminal():
                break
            self.step(learn)

        return (
            best_possible - self.total_reward,
            self.total_reward,
            self.agent[0].epsilon,
        )

    # ---- Public Getter Functions (For Visualisation) ----- #

    def get_agent_info(self) -> List[Tuple[Tuple[int, int], bool]]:
        """
        Output: List of
                - Tuple of:
                    - coordinate: (int, int)
                    - has_item: bool
        """
        positions = self.grid.get_state().get_agent_positions()
        return [
            (position, agent.has_item())
            for position, agent in zip(positions, self.agent)
        ]

    def get_untaken_items(self):
        """Return positions of items not yet picked up."""
        snapshot = self.grid.get_state()
        return snapshot.get_untaken_item_pos()

    def get_max_reward(self):
        """Return the best achievable reward computed at the last reset."""
        return self.max_reward

    def get_size(self):
        """Return ``(width, height)`` of the board."""
        return self.width, self.height

    def get_target_location(self):
        """Return the goal position."""
        snapshot = self.grid.get_state()
        return snapshot.get_goal_positions()

    def has_ended(self):
        """Return True once the current game is in a terminal state."""
        snapshot = self.grid.get_state()
        return snapshot.is_terminal()

    def get_total_reward(self):
        """Return the reward accumulated in the current game."""
        return self.total_reward

    # ---- Public Control Functions ----- #
    def reset(self):
        """Start a new game: fresh grid layout, cleared agents and reward."""
        self.total_reward = 0
        self.grid.reset()
        for player in self.agent:
            player.reset()
        self.max_reward = GridUtil.calculate_max_reward(self.grid)

    def step(self, learn=True):
        """Let every agent act once; does nothing if the game is over.

        With ``learn`` the agents explore and update their Q-tables;
        otherwise they act greedily and only refresh their item flag.
        """
        state = self.grid.get_state()
        if state.is_terminal():
            return

        actions = [player.choose_action(state, explore=learn) for player in self.agent]
        results = self.grid.move(actions)

        for player, action, outcome in zip(self.agent, actions, results):
            reward, next_state, terminal = outcome
            self.total_reward += reward
            if not learn:
                player.update(next_state)
                continue
            player.update_learn(state, action, reward, next_state, terminal)


In [None]:
import matplotlib.animation as animation
import matplotlib.pyplot as plt

from multiprocessing import Process


class Graph:
    """Live plot of training loss and epsilon read from a controller."""

    def __init__(self, controller, fig, axs):
        """Start the animation and show the figure.

        Args:
            controller: Object exposing ``iterations``, ``losses`` and ``epsilon``.
            fig: Figure to animate.
            axs: Pair of axes ``(loss_axes, epsilon_axes)``.
        """
        self.controller = controller
        self.fig = fig
        self.ax1, self.ax2 = axs

        # Keep a reference to the animation on the instance.
        self.ani = animation.FuncAnimation(
            self.fig, self.draw, frames=self.frames, interval=100, save_count=100
        )

        plt.show()

    def frames(self):
        """Endless frame source; the frame value itself is unused."""
        while True:
            yield None

    def draw(self, args):
        """Redraw both plots from the controller's current buffers."""
        self.plot_losses(
            self.ax1,
            self.controller.iterations,
            self.controller.losses,
        )
        self.plot_epsilon(
            self.ax2,
            self.controller.iterations,
            self.controller.epsilon,
        )

    def plot_losses(self, ax, iterations, loss):
        """Plot loss against iteration on ``ax``, replacing the previous frame."""
        # Clear first: this runs every frame, and without it each call would
        # stack one more line on the axes.
        ax.clear()
        ax.plot(iterations, loss, color="blue", label="Loss")
        ax.set_title("Loss")
        ax.set_xlabel("Iteration")
        ax.set_ylabel("Loss")

    def plot_epsilon(self, ax, iterations, epsilon):
        """Plot epsilon against iteration on ``ax``, replacing the previous frame."""
        ax.clear()
        ax.plot(iterations, epsilon, color="blue", label="Epsilon")
        ax.set_title("Epsilon")
        ax.set_xlabel("Iteration")
        ax.set_ylabel("Epsilon")


class TestGraph:
    """Live plot of the test loss read from a controller."""

    def __init__(self, controller, fig, ax):
        """Start the animation and show the figure.

        Args:
            controller: Object exposing ``iterations`` and ``test_loss``.
            fig: Figure to animate.
            ax: Axes to draw the loss curve on.
        """
        self.controller = controller
        self.fig = fig
        self.ax = ax

        # Keep a reference to the animation on the instance.
        self.ani = animation.FuncAnimation(
            self.fig, self.draw, frames=self.frames, interval=100, save_count=100
        )

        plt.show()

    def frames(self):
        """Endless frame source; the frame value itself is unused."""
        while True:
            yield None

    def draw(self, args):
        """Redraw the plot from the controller's current test-loss buffer."""
        self.plot_losses(
            self.ax,
            self.controller.iterations,
            self.controller.test_loss,
        )

    def plot_losses(self, ax, iterations, loss):
        """Plot loss against iteration on ``ax``, replacing the previous frame."""
        # Clear first: this runs every frame, and without it each call would
        # stack one more line on the axes.
        ax.clear()
        ax.plot(iterations, loss, color="blue", label="Loss")
        ax.set_title("Loss")
        ax.set_xlabel("Iteration")
        ax.set_ylabel("Loss")


In [None]:
import random


class GridFactory:
    """Helpers for placing things on a grid."""

    @staticmethod
    def get_random_pos(width, height, exclude=None):
        """Return a uniformly random free cell of a ``width`` x ``height`` grid.

        Args:
            width: Number of columns.
            height: Number of rows.
            exclude: Optional collection of ``(x, y)`` cells that must not be
                returned.

        Raises:
            ValueError: If no cell is free (the original looped forever).
        """
        excluded = () if exclude is None else exclude
        free_cells = [
            (x, y)
            for x in range(width)
            for y in range(height)
            if (x, y) not in excluded
        ]
        if not free_cells:
            raise ValueError("no free position available on the grid")
        return random.choice(free_cells)


class Grid:
    """Game board: a map of cells plus the agents standing on it.

    ``state`` maps every ``(x, y)`` from -1 to width/height (inclusive) to a
    tile, so the playable area is surrounded by a ring of ``Wall`` tiles.
    """

    def __init__(self, width=5, height=5):
        self.width = width
        self.height = height

        self.state = {}  # TODO: multiple entities in one cell
        self.lookup = set()  # Interactive tiles
        self.agents = []
        self.agent_positions = []

        self.init_environment()

    # ----- Init Functions ----- #
    def init_environment(self):
        """Fill the board with ``Empty`` cells and a surrounding ring of walls."""
        dimensions = (self.width, self.height)
        for x in range(-1, self.width + 1):
            for y in range(-1, self.height + 1):
                inside = 0 <= x < self.width and 0 <= y < self.height
                if inside:
                    self.state[(x, y)] = Empty((x, y))
                else:
                    self.state[(x, y)] = Wall((x, y), dimensions)

    # ----- Core Functions ----- #
    def move(self, actions):  # List of actions, in the same order as self.agents
        """Apply one action per agent and return the outcomes.

        Returns:
            A list of ``(reward, next_state, is_terminal)`` tuples in agent
            order; empty when there is nothing to move.
        """
        # Tentative target cell for every agent.
        temp_positions = [
            self.process_action(action, agent_pos)
            for action, agent_pos in zip(actions, self.agent_positions)
        ]

        # The tile at the target decides the reward and where the agent ends up.
        outcomes = [
            self.state[target].interact(agent)
            for agent, target in zip(self.agents, temp_positions)
        ]
        if not outcomes:
            return []
        rewards, new_positions = zip(*outcomes)

        # Keep positions as a list, like everywhere else they are assigned.
        self.agent_positions = list(new_positions)

        # One snapshot per agent, reused for the terminal check so the
        # interactive tiles are not deep-copied twice per result.
        results = []
        for reward in rewards:
            snapshot = self.get_state()
            results.append((reward, snapshot, snapshot.is_terminal()))
        return results

    # ----- Private Functions ----- #
    def process_action(self, action, agent_position):
        """Return the cell reached from ``agent_position`` by ``action``."""
        x, y = agent_position
        dx, dy = self.interpret_action(action)
        return x + dx, y + dy

    def interpret_action(self, action):
        """Translate an action into a ``(dx, dy)`` offset (y grows southwards).

        Raises:
            ValueError: For an unknown action (previously ``None`` was
                returned, which failed later with an unrelated TypeError).
        """
        offsets = {
            Action.NORTH: (0, -1),
            Action.SOUTH: (0, 1),
            Action.EAST: (1, 0),
            Action.WEST: (-1, 0),
        }
        try:
            return offsets[action]
        except KeyError:
            raise ValueError(f"unknown action: {action!r}") from None

    def set_interactive_tiles(self):
        """Place the goal, one item and all agents for a new game."""
        self.lookup.clear()
        used_pos = []

        # TODO: extract repeated code

        # The goal always sits in the bottom-right corner.
        goal_pos = (self.width - 1, self.height - 1)
        goal = Goal(goal_pos)
        self.state[goal_pos] = goal
        self.lookup.add(goal)
        used_pos.append(goal_pos)

        # The item goes to a random cell among the remaining ones.
        item_pos = GridFactory.get_random_pos(self.width, self.height, used_pos)
        item = Item(item_pos)
        self.state[item_pos] = item
        self.lookup.add(item)
        used_pos.append(item_pos)

        # Agents get distinct random cells that are still unused.
        self.agent_positions = []
        for _ in self.agents:
            agent_pos = GridFactory.get_random_pos(self.width, self.height, used_pos)
            used_pos.append(agent_pos)
            self.agent_positions.append(agent_pos)

        # Future proofing: update agents in case they spawned on an item
        for agent in self.agents:
            agent.update(State(self.agent_positions, self.lookup))

    # ----- Public Functions ----- #
    def reset(self):
        """Rebuild the board and place goal, item and agents afresh."""
        self.init_environment()
        self.set_interactive_tiles()

    def add_agents(self, agents):
        """Register the agents that play on this grid."""
        self.agents = agents

    def get_state(self):
        """Return a fresh ``State`` snapshot of positions and interactive tiles."""
        return State(self.agent_positions, self.lookup)


class GridUtil:
    """Stateless helpers operating on a grid."""

    @staticmethod
    def calculate_max_reward(grid):
        """Return the best total reward achievable from the grid's current layout.

        Args:
            grid: Object whose ``get_state()`` exposes agent, item and goal
                positions.

        Returns:
            ``102 - (agent->item distance + item->goal distance)`` using
            Manhattan distances.
        """
        # TODO: can only work with one agent and one item ATM
        # Take a single snapshot instead of rebuilding the state per lookup.
        state = grid.get_state()
        x1, y1 = state.get_agent_positions()[0]
        x2, y2 = state.get_item_positions()[0]
        x3, y3 = state.get_goal_positions()

        # Manhattan distance from agent to item and from item to goal
        dist_to_obj = abs(x1 - x2) + abs(y1 - y2)
        dist_to_goal = abs(x2 - x3) + abs(y2 - y3)

        # +100 for the two rewards, +2 because the steps onto the item and
        # onto the goal do not incur the usual -1 step cost.
        return (dist_to_obj + dist_to_goal) * -1 + 102


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# import plotly
from matplotlib.widgets import Button, Slider
import matplotlib.animation as animation
from typing import Tuple, TypeAlias, TYPE_CHECKING
import json

# Rectangle passed to matplotlib when placing a widget axes; presumably
# (left, bottom, width, height) in figure fractions -- TODO confirm.
Coordinates: TypeAlias = Tuple[float, float, float, float]

# plotly.offline.init_notebook_mode(connected=True)

import numpy as np
from multiprocessing import Process, Queue, shared_memory, Pipe


class Visualization:
    """Interactive matplotlib view of a game with control buttons.

    The board is redrawn by a ``FuncAnimation`` whose frames come from
    ``controller.next()``; buttons allow stepping, resetting, training and
    testing.
    """

    def __init__(self, game: "Game", controller, fig, ax):
        """Build the UI on ``fig``/``ax``, start the animation and show it."""
        self.game = game
        self.is_stopping = False
        self.timer = None
        self.game.reset()
        self.speed = 1
        self.fig = fig
        self.ax = ax

        self.add_ui_elements()
        self.controller = controller
        self.fig.canvas.mpl_connect("close_event", self.on_close)
        self.ani = animation.FuncAnimation(
            self.fig, self.draw, frames=self.frames, interval=200, save_count=100
        )

        self.animating = True

        plt.show()

    def frames(self):
        """Endless frame source: each frame advances the game by one step."""
        while True:
            yield self.controller.next()

    def draw(self, args):
        """Redraw the board from ``(info, items, tot_reward, max_reward)``."""
        info, items, tot_reward, max_reward = args

        self.ax.clear()
        self.draw_grid()
        self.draw_agent(info)
        self.draw_item(items)

        self.reward.set_text(f"Reward: {tot_reward}")
        self.max_reward.set_text(f"Max Reward: {max_reward}")

        # Check if the environment is terminal
        if self.game.has_ended():
            self.draw_complete()
        # When the animation is paused nothing else triggers a repaint.
        if not self.animating:
            self.fig.canvas.draw()

    def draw_grid(self):
        """Draw the empty cells and the green goal cell."""
        width, height = self.game.get_size()
        for x in range(width):
            for y in range(height):
                rect = patches.Rectangle(
                    (x, y), 1, 1, linewidth=1, edgecolor="black", facecolor="white"
                )
                self.ax.add_patch(rect)
        self.ax.set_xlim(0, width)
        # Inverted y-limits put row 0 at the top.
        self.ax.set_ylim(height, 0)
        self.ax.set_aspect("equal")

        # Move x-axis labels to the top
        self.ax.xaxis.set_label_position("top")
        self.ax.xaxis.tick_top()

        # Draw target
        tx, ty = self.game.get_target_location()
        target_patch = patches.Rectangle(
            (tx, ty), 1, 1, linewidth=1, edgecolor="black", facecolor="green"
        )
        self.ax.add_patch(target_patch)

    def draw_agent(self, info):
        """Draw each agent: blue when empty-handed, orange when carrying."""
        for pos, has_item in info:
            ax, ay = pos
            agent_color = "blue" if not has_item else "orange"
            agent_patch = patches.Circle((ax + 0.5, ay + 0.5), 0.3, color=agent_color)
            self.ax.add_patch(agent_patch)

    def draw_item(self, items):
        """Draw every untaken item as a small red circle."""
        for item in items:
            ix, iy = item
            item_patch = patches.Circle((ix + 0.5, iy + 0.5), 0.2, color="red")
            self.ax.add_patch(item_patch)

    def draw_complete(self):
        """Overlay the "Complete" banner in the middle of the board."""
        self.ax.text(
            0.5,
            0.5,
            "Complete",
            horizontalalignment="center",
            verticalalignment="center",
            transform=self.ax.transAxes,
            fontsize=20,
            color="red",
        )

    # ----- ----- ----- ----- Render UI Element  ----- ----- ----- ----- #

    def add_ui_elements(self):
        """Create all buttons and text boxes."""
        self.init_buttons()
        self.init_text()

    def init_buttons(self):
        """Create the column of control buttons."""
        # Add button for next step
        self.next_step_btn = self.add_button(
            [0.85, 0.01, 0.12, 0.075], "Next Step", self.on_next
        )
        # Add button for reset
        self.reset_btn = self.add_button(
            [0.85, 0.11, 0.12, 0.075], "Reset", self.on_reset
        )
        # Add button for animation on/off
        self.toggle_anim_btn = self.add_button(
            [0.85, 0.21, 0.12, 0.075], "Anim\nOn", self.on_toggle_anim
        )
        # Add button for auto reset on/off
        self.toggle_auto_reset_btn = self.add_button(
            [0.85, 0.31, 0.12, 0.075], "Auto Reset\nOn", self.on_auto_reset
        )
        # Add button for training in worker processes
        self.train_1000_btn = self.add_button(
            [0.85, 0.41, 0.12, 0.075], "Train 1000", self.on_train_1000
        )
        # Add button for in-process training
        self.train_15000_btn = self.add_button(
            [0.85, 0.51, 0.12, 0.075], "Train 15000", self.on_train_15000
        )
        # Add button for testing
        self.test_button = self.add_button(
            [0.85, 0.61, 0.12, 0.075], "Test", self.on_test
        )

    def init_text(self):
        """Create the reward and max-reward text boxes."""
        # Add text box for cumulative reward
        self.reward = self.add_text(
            [0.01, 0.01, 0.2, 0.075], f"Reward: {self.game.total_reward}"
        )

        # Add text box for max reward
        self.max_reward = self.add_text(
            [0.25, 0.01, 0.2, 0.075],
            f"Max Reward: {self.game.get_max_reward()}",
        )

    def add_button(self, coordinates: Coordinates, text, on_click):
        """Add a button at ``coordinates`` on this figure and return it."""
        # Use this object's figure explicitly rather than whichever figure
        # pyplot currently considers active.
        axis = self.fig.add_axes(coordinates)
        button = Button(axis, text)
        button.on_clicked(on_click)

        return button

    def add_text(self, coordinates: Coordinates, text):
        """Add a centred, frameless text box on this figure and return it."""
        axis = self.fig.add_axes(coordinates)
        textbox = axis.text(
            0.5,
            0.5,
            text,
            horizontalalignment="center",
            verticalalignment="center",
            transform=axis.transAxes,
            fontsize=12,
        )
        axis.axis("off")
        return textbox

    # ----- ----- ----- ----- Render Main Board  ----- ----- ----- ----- #
    def one_step(self):
        """Advance the game by one step and redraw."""
        self.draw(self.controller.next())

    def stop_anim(self):
        pass

    def start_anim(self):
        pass

    # ----- ----- ----- ----- Event Handlers  ----- ----- ----- ----- #

    def on_toggle_anim(self, event):
        """Pause or resume the animation and update the button label."""
        if self.animating:
            self.ani.pause()
            self.toggle_anim_btn.label.set_text("Anim\nOff")
        else:
            self.ani.resume()
            self.toggle_anim_btn.label.set_text("Anim\nOn")

        self.animating = not self.animating
        plt.show()

    def on_auto_reset(self, event):
        """Toggle the controller's auto-reset and update the button label."""
        auto_reset_is_on = self.controller.toggle_auto_reset()
        if auto_reset_is_on:
            self.toggle_auto_reset_btn.label.set_text("Auto Reset\nOn")
        else:
            self.toggle_auto_reset_btn.label.set_text("Auto Reset\nOff")
        plt.show()

    def on_reset(self, event):
        """Reset the game and redraw."""
        self.game.reset()
        self.draw(self.controller.get_info())

    def on_next(self, e):
        """Advance the game by one step and redraw."""
        self.draw(self.controller.next())

    def on_train_1000(self, e):
        """Train in worker processes, then adopt the Q-table they produced."""
        self.before_auto_train()

        s = self.auto_train()

        self.game.agent[0].Q = self.get_np_from_name(s)

        self.after_auto_train()

    def before_auto_train(self):
        """Pause the animation and reset the game before a long-running job."""
        self.ani.pause()
        self.animating = False
        self.controller.game.reset()

        self.toggle_anim_btn.label.set_text("Anim\nOff")
        self.draw(self.controller.get_info())

    def auto_train(self):
        """Run the graph and training processes; return what the trainer sent."""
        gp, tp, conn1 = get_process(self.game, self.controller)
        gp.start()
        tp.start()
        gp.join()
        tp.join()
        return conn1.recv()

    def after_auto_train(self):
        """Resume the animation and reset the game after a long-running job."""
        self.ani.resume()
        self.animating = True
        self.controller.game.reset()

        self.toggle_anim_btn.label.set_text("Anim\nOn")
        self.draw(self.controller.get_info())

    def get_np_from_name(self, name):
        """Copy a Q-table out of the shared-memory block ``name`` and free it.

        The shape and dtype are taken from the current agent's Q-table
        instead of being hard-coded.
        """
        template = self.game.agent[0].get_q_table()
        existing_shm = shared_memory.SharedMemory(name=name)
        q = np.ndarray(template.shape, dtype=template.dtype, buffer=existing_shm.buf)
        s = np.copy(q)
        # Drop the view onto the shared buffer before closing the mapping.
        del q
        existing_shm.close()
        existing_shm.unlink()
        return s

    def on_train_15000(self, e):
        """Train for 15000 games in this process."""
        self.before_auto_train()
        self.controller.train(15000)
        self.after_auto_train()

    def np_to_name(self, np):
        pass

    def on_close(self, e):
        pass

    def on_test(self, e):
        """Run the test-graph and test processes and wait for both."""
        self.before_auto_train()
        gp, tp = get_test_process(self.controller)
        gp.start()
        tp.start()
        gp.join()
        tp.join()
        self.after_auto_train()

    # ----- ----- ----- ----- Plot Metrics  ----- ----- ----- ----- #
    @staticmethod
    def plot_training(results):
        """Plot loss and epsilon against iteration, side by side.

        Args:
            results: ``(iterations, losses, epsilons)`` sequences of equal
                length, in the order returned by ``Controller.get_metrics``.
        """
        iterations, losses, epsilons = results
        # Create a figure with 1 row and 2 columns of subplots
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # Plotting the loss in the first subplot
        ax1.plot(iterations, losses, marker="o", label="Loss")
        ax1.set_title("Iteration vs Loss")
        ax1.set_xlabel("Iteration Number")
        ax1.set_ylabel("Loss")

        # Plotting epsilon in the second subplot
        ax2.plot(iterations, epsilons, label="Epsilon", color="orange", marker="o")
        ax2.set_title("Epsilon decay across iteration")
        ax2.set_xlabel("Iteration Number")
        ax2.set_ylabel("Epsilon")

        # Display the plots
        plt.tight_layout()
        plt.show()


def draw_graphs(game, controller):
    """Open a two-panel figure and attach a live ``Graph`` to it.

    ``game`` is accepted but not used here.
    """
    figure, axes = plt.subplots(1, 2, figsize=(12, 5))
    # Graph's constructor starts the animation and calls plt.show().
    graph = Graph(controller, figure, axes)


def train(controller, connection, ep):
    """Train for ``ep`` games and publish the first agent's Q-table.

    The table is copied into a newly created shared-memory block whose name
    is sent over ``connection``; the block is closed here but not unlinked.
    """
    controller.train(ep)
    q_table = controller.game.agent[0].get_q_table()

    segment = shared_memory.SharedMemory(create=True, size=q_table.nbytes)
    mirror = np.ndarray(q_table.shape, dtype=q_table.dtype, buffer=segment.buf)
    mirror[:] = q_table[:]
    connection.send(segment.name)
    segment.close()


def get_process(game, controller):
    """Build (without starting) the graph and training worker processes.

    Returns:
        ``(graph_process, train_process, receiving_connection)``; the
        training process sends its result on the other end of the pipe.
    """
    receiving_end, sending_end = Pipe()
    plot_worker = Process(target=draw_graphs, args=[game, controller])
    train_worker = Process(target=train, args=[controller, sending_end, 1000])
    return plot_worker, train_worker, receiving_end


def test(controller, ep):
    """Delegate to ``controller.test(ep)``; used as a worker-process target."""
    controller.test(ep)


def draw_test_graph(controller):
    """Open a single-panel figure and attach a live ``TestGraph`` to it."""
    figure, axes = plt.subplots()
    # TestGraph's constructor starts the animation and calls plt.show().
    graph = TestGraph(controller, figure, axes)


def get_test_process(controller):
    """Build (without starting) the test-graph and test worker processes.

    Returns:
        ``(graph_process, test_process)``.
    """
    plot_worker = Process(target=draw_test_graph, args=[controller])
    test_worker = Process(target=test, args=[controller, 1000])
    return plot_worker, test_worker



# **Single Agent Object-Pickup Problem**

## **Problem Description**

This task involves an agent navigating a 5x5 grid world to pick up an item located at a random position `A` and delivering it to a fixed destination `B`, located at the bottom-right corner of the grid. The agent must learn to complete this task as efficiently as possible, regardless of its starting position.

## **Methodology**

The agent uses Q-learning to learn an optimal policy. The algorithm updates the Q-values stored in a Q-table based on the agent's interactions with the environment, gradually improving its strategy over time.
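The Q-table is initialised to zero for every state–action pair.
The learning rate (alpha) is set to 0.1 and the discount factor (gamma) to 0.8. The exploration rate (epsilon) starts at 1 and is multiplied by 0.995 after every learning step.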

### **State Space**

The state space is defined by the agent’s position on the grid, the location of the item (A), and whether the item has been picked up. This can be represented as a tuple `((agent_x, agent_y), (item_x, item_y), has_item)`, where `has_item` is a boolean variable indicating whether the agent has picked up the item.

### **Action Space**

The agent can perform one of four actions at any given time:

- **Move North**
- **Move South**
- **Move West**
- **Move East**

These actions move the agent one step in the corresponding direction unless the movement would result in the agent hitting a wall, in which case the agent remains in the same position.

### **Reward Structure**

The reward structure is designed to guide the agent toward efficiently solving the task:

|    **Event**    | **Reward** |
|:---------------|-----------:|
| Picking up the item at `A` | +50 |
| Delivering the item to `B` | +50 |
| Moving to an empty grid | -1 |
| Hitting a wall | -10 |

This reward system incentivizes the agent to quickly locate and pick up the item and then deliver it to the goal while penalizing unnecessary movements and collisions with walls.

## **Performance Metrics**

### **Loss-against-Iteration Graph**

To evaluate the agent’s learning progress, we track the loss against the number of iterations.

#### **Maximum Reward Calculation**

For each episode, the maximum possible reward is calculated by determining the optimal route:

1. **Agent → Item**: Calculate the Manhattan distance between the agent’s starting position and the item's location.
2. **Item → Goal**: Calculate the Manhattan distance between the item's location and the goal at `B`.

The maximum possible reward is computed by subtracting the sum of these distances from 102 (which includes the reward for picking up the item, delivering it to the goal, and a 2-point compensation for the optimal path).

#### **Loss Calculation**

The loss for each iteration is calculated as the difference between the maximum possible reward and the actual reward obtained by the agent in that iteration. This loss is then plotted against the number of iterations to visualize the agent’s learning progress.

### **Epsilon Decay Graph**

The Epsilon decay graph illustrates how the exploration rate (`ε`) changes over time. Initially, the agent explores more (`high ε`), but as learning progresses, `ε` decays, leading the agent to exploit known information more often. This graph provides insight into the balance between exploration and exploitation throughout the training process.


## <u>Visualisation</u>

- Press Reset

We initialize a new game and train the agent for 500 episodes.

In [None]:
# Fresh game and agent, trained for 500 games, then shown interactively.
game = Game()
times = 500
controller = Controller(game, times)
controller.train(times)
fig1, ax1 = plt.subplots()
vis = Visualization(game, controller, fig1, ax1)

After 500 training episodes, the agent only occasionally completes the task. More often it oscillates between two cells or gets stuck in a corner or next to a wall without finishing. The performance metrics are shown below:

In [None]:
# Plot loss and epsilon per training game for the 500-game run.
Visualization.plot_training(controller.get_metrics())

As seen above, the loss at the 500th iteration has not converged to 0, so there is still room for improvement for our agent.

Now, we create a fresh game and train a new agent for 2000 episodes.

In [None]:
# Fresh game and agent, trained for 2000 games, then shown interactively.
game2 = Game()
times = 2000
controller2 = Controller(game2, times)
controller2.train(times)
fig2, ax2 = plt.subplots()
vis2 = Visualization(game2, controller2, fig2, ax2)

In [None]:

# Plot loss and epsilon per training game for the 2000-game run.
Visualization.plot_training(controller2.get_metrics())

The metrics graph is much better than before, but the loss still has not converged. Let's train a new agent for 4000 episodes.

In [None]:
# Fresh game and agent, trained for 4000 games, then shown interactively.
game3 = Game()
times = 4000
controller3 = Controller(game3, times)
controller3.train(times)
fig3, ax3 = plt.subplots()
vis3 = Visualization(game3, controller3, fig3, ax3)


In [None]:

# Plot loss and epsilon per training game for the 4000-game run.
Visualization.plot_training(controller3.get_metrics())


If we observe the final few hundred iterations of the training, we can see that the loss is almost consistently 0. We can therefore say that training has converged and the agent has learnt to solve the task.

# Conclusion
....