In [18]:
import torch
import copy
import numpy as np
import numpy.random as npr

In [None]:
# Define the grid world environment
class GridWorldEnvironment:
    def __init__(self, size=5, agents_num=4):
        self.size = size
        self.agents_num = agents_num
        self.agents_positions = {}  # agent position
        self.agents_reached_A = {}  # if agents get item
        self.A_position = None
        self.B_position = (size - 1, size - 1)  # fixed location of B
        self.directions = ["up", "down", "left", "right"]
        self.total_collisions = 0
        self.total_steps = 0
        self.agents_idx = list(range(agents_num))
        self._reset()

    def _reset(self):
        """
        Reset the environment to its initial state.
        """
        # initialize A position
        self.A_position = (
            npr.randint(0, self.size - 1),
            npr.randint(0, self.size - 1),
        )
        # ensure A and B are not in the same position
        while self.A_position == self.B_position:
            self.A_position = (
                npr.randint(0, self.size - 1),
                npr.randint(0, self.size - 1),
            )

        # initialize agents' positions and reached_A status
        self.agents_positions = {}
        self.agents_reached_A = {}
        for idx in self.agents_idx:
            if npr.rand() < 0.5:
                self.agents_positions[idx] = self.A_position
                self.agents_reached_A[idx] = True
            else:
                self.agents_positions[idx] = self.B_position
                self.agents_reached_A[idx] = False

        self.total_collisions = 0
        self.total_steps = 0

    def _get_destination(self, agent_idx):
        """
        Get the destination position(A or B)
        """
        return "B" if self.agents_reached_A[agent_idx] else "A"

    def _find_nearby_collision_agents(self, agent_id):
        """
        Find nearby agents that might collide.
        """
        y, x = self.agents_positions[agent_id]
        destination_cur = self._get_destination(agent_id)
        nearby_agents = [
            (-1, -1),
            (-1, 0),
            (-1, 1),
            (0, -1),
            (0, 1),
            (1, -1),
            (1, 0),
            (1, 1),
        ]
        collision_status = []
        for dy, dx in nearby_agents:
            new_y, new_x = y + dy, x + dx
            # Check if new position is valid
            if 0 <= new_y < self.size and 0 <= new_x < self.size:
                collision = False
                for other_agent_id in self.agents_idx:
                    if (
                        other_agent_id != agent_id
                        and self.agents_positions[other_agent_id] == (new_y, new_x)
                        and self._get_destination(other_agent_id) == destination_cur
                    ):  # agents are going to the same destination would cause collision
                        collision = 1
                collision_status.append(collision)
            else:
                collision_status.append(0)

    def get_state(self, agent_idx):
        """
        Get the state of the environment for a specific agent.
        """
        position = self.agents_positions[agent_idx]
        reached_A = self.agents_reached_A[agent_idx]
        manhattan_distance_to_A = (self.A_position[0] - position[0]) + (
            self.A_position[1] - position[1]
        )
        manhattan_distance_to_B = (self.B_position[0] - position[0]) + (
            self.B_position[1] - position[1]
        )
        collision_agents = self._find_nearby_collision_agents(agent_idx)

        return np.array(
            [
                position[0],
                position[1],
                self.A_position[0],
                self.A_position[1],
                self.B_position[0],
                self.B_position[1],
                manhattan_distance_to_A[0],
                manhattan_distance_to_A[1],
                manhattan_distance_to_B[0],
                manhattan_distance_to_B[1],
                reached_A,
                *collision_agents,
            ]
        )

    def _check_done(self, agent_idx):
        """
        Check if the agent has reached its destination.
        """
        if (
            self.agents_positions[agent_idx] == self.B_position
            and self.agents_reached_A[agent_idx]
        ):
            return True
        else:
            return False

    def take_action(self, action_dict):
        """
        Take an action in the environment and return the next state, reward and collosions.
        """
        planned_actions = {}  # {action_idx: action}
        wall_collisions = []  # number of hitting wall

        for agent_idx, action in action_dict.items():
            y, x = self.agents_positions[agent_idx]
            if self.agents_positions[agent_idx] == "up":
                new_y, new_x = y - 1, x
            elif self.agents_positions[agent_idx] == "down":
                new_y, new_x = y + 1, x
            elif self.agents_positions[agent_idx] == "left":
                new_y, new_x = y, x - 1
            elif self.agents_positions[agent_idx] == "right":
                new_y, new_x = y, x + 1

            # check valid
            if 0 <= new_y < self.size and 0 <= new_x < self.size:
                planned_actions[agent_idx] = (new_y, new_x)  # move
                wall_collisions.append(False)
            else:
                planned_actions[agent_idx] = (y, x)  # not move
                wall_collisions.append(True)

        # check collision
        next_positions = copy.deepcopy(self.agents_positions)
        for idx in self.agents_idx:
            next_positions[idx] = planned_actions[idx]

        collisions = 0  # number of head-on collisions
        position_to_agents = {}
        for agent_idx, pos in next_positions.items():
            if pos not in position_to_agents:
                position_to_agents[pos] = []
            position_to_agents[pos].append(agent_idx)

        agents_collisions = set()
        for pos, agents_cur in position_to_agents.items():
            if len(agents_cur) > 1:
                dirs = [self._get_destination(agent_idx) for agent_idx in agents_cur]
                if "B" in dirs and "A" in dirs:
                    collisions += 1
                    agents_collisions.update(agents_cur)

        # update agents' positions
        self.agents_positions = next_positions

        # calculate rewards
        rewards = {}
        for agent_idx in self.agents_idx:
            reward = 0
            if wall_collisions[agent_idx]:
                reward -= 5  # hitting wall penalty
            else:
                reward -= 1  # step cost

            location = self.agents_positions[agent_idx]
            if self.agents_reached_A[agent_idx]:
                if location == self.B_position:
                    reward += 100  # delivery success
                    self.agents_reached_A[agent_idx] = False
            else:
                if location == self.A_position:
                    reward += 50  # pickup success
                    self.agents_reached_A[agent_idx] = True

        rewards[agent_idx] = reward  # store reward

        # accumulate total collisions and steps
        self.total_collisions += collisions
        self.total_steps += 1

        # format next state
        next_states = {}
        for agent_idx in self.agents_idx:
            next_states[agent_idx] = self.get_state(agent_idx)

        return next_states, rewards, collisions

In [20]:
def movingAverage(arr, window_size):
    i = 0
    moving_averages = []
    while i < len(arr) - window_size + 1:
        window_average = round(np.sum(arr[i : i + window_size]) / window_size, 2)
        moving_averages.append(window_average)
        i += 1
    return moving_averages

In [21]:
from time import process_time

t = process_time()

# Define the learning parameters
learning_rate = 0.3
discount_factor = 0.98
num_episodes = 50000
max_steps = 100  # this was never used - 100 is good
epsilon_initial = 1.0
epsilon_final = 0.1
epsilon_decay = 0.99995
# Define the grid world dimensions
grid_rows = 5
grid_cols = 5
# Define the number of actions (up, down, left, right)
num_actions = 4

# Create the Q-table agent and grid world environment
agent = QTableAgent()
environment = GridWorldEnvironment(grid_rows, grid_cols)

reward_total = []
epsilon = epsilon_initial
# Implement the Q-learning algorithm
for episode in range(num_episodes + 1):
    environment._reset()
    number_of_steps = 0
    reward_per_episode = 0
    while number_of_steps <= max_steps and (
        environment.agent_position != environment.nest_location
        or environment.reached_A != True
    ):  # Continue until reaching location B
        # why does this never seem to run into an infinite loop? number_of_steps wasn't tested by Yue
        # Presumably the high and everlasting epsilon lets everyone walk into one of the goals sooner or later
        state = environment.get_state()
        action = agent.choose_action(state, epsilon)
        reward = environment.take_action(action)
        next_state = environment.get_state()
        reward_per_episode += reward
        done = environment.check_done()
        agent.update_q_table(
            state, action, next_state, reward, learning_rate, discount_factor, done
        )
        number_of_steps += 1
    reward_total.append(reward_per_episode)
    epsilon = np.max([epsilon_final, epsilon * epsilon_decay])
    if episode % 5000 == 0:
        print(episode, "episodes")

elapsed_time = process_time() - t

import matplotlib.pyplot as plt

plt.plot(movingAverage(reward_total, 100))
# Add x and y axis labels
plt.xlabel("#episodes")
plt.ylabel("rewards")
plt.title("Q table Agent")
# Show the plot
plt.show()

agent.q_table.astype("float32").tofile("qTable single agent fast.dat")

print("Finished in ", elapsed_time, " seconds.")

AttributeError: 'GridWorldEnvironment' object has no attribute 'agent_position'

You need to instantiate the constant for the size of the state space below. This will be used as the size of the input tensor for your Q-network

In [None]:
statespace_size = 7

The function "prepare_torch" needs to be called once and only once at the start of your program to initialise PyTorch and generate the two Q-networks. It returns the target model (for testing).

In [None]:
def prepare_torch():
    global statespace_size
    global model, model2
    global optimizer
    global loss_fn
    l1 = statespace_size
    l2 = 24
    l3 = 24
    l4 = 4
    model = torch.nn.Sequential(
        torch.nn.Linear(l1, l2),
        torch.nn.ReLU(),
        torch.nn.Linear(l2, l3),
        torch.nn.ReLU(),
        torch.nn.Linear(l3, l4),
    )
    model2 = copy.deepcopy(model)
    model2.load_state_dict(model.state_dict())
    loss_fn = torch.nn.MSELoss()
    learning_rate = 1e-3
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    return model2

The function "update_target" copies the state of the prediction network to the target network. You need to use this in regular intervals.

In [None]:
def update_target():
    global model, model2
    model2.load_state_dict(model.state_dict())

The function "get_qvals" returns a numpy list of qvals for the state given by the argument _based on the prediction network_.

In [None]:
def get_qvals(state):
    state1 = torch.from_numpy(state).float()
    qvals_torch = model(state1)
    qvals = qvals_torch.data.numpy()
    return qvals

The function "get_maxQ" returns the maximum q-value for the state given by the argument _based on the target network_.

In [None]:
def get_maxQ(s):
    return torch.max(model2(torch.from_numpy(s).float())).float()

The function "train_one_step_new" performs a single training step. It returns the current loss (only needed for debugging purposes). Its parameters are three parallel lists: a minibatch of states, a minibatch of actions, a minibatch of the corresponding TD targets and the discount factor.

In [None]:
def train_one_step(states, actions, targets, gamma):
    # pass to this function: state1_batch, action_batch, TD_batch
    global model, model2
    state1_batch = torch.cat([torch.from_numpy(s).float() for s in states])
    action_batch = torch.Tensor(actions)
    Q1 = model(state1_batch)
    X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze()
    Y = torch.tensor(targets)
    loss = loss_fn(X, Y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()