In [1]:
import numpy as np
import random
import time

# --- CONSTANTS ---
GRID_SIZE = 5
NUM_AGENTS = 4
ACTIONS = ['N', 'S', 'W', 'E']
ACTION_TO_DELTA = {'N': (-1, 0), 'S': (1, 0), 'W': (0, -1), 'E': (0, 1)}

In [2]:
# --- HYPERPARAMETERS ---
ALPHA = 0.1        # Learning rate
GAMMA = 0.9        # Discount factor
EPSILON = 0.1      # Exploration rate
MAX_STEPS = 1500000
MAX_COLLISIONS = 4000
MAX_TRAINING_TIME = 600  # 10 mins

In [3]:
# --- ENVIRONMENT SETUP ---
class Agent:
    def __init__(self, idx, start_pos, carrying=False):
        self.id = idx
        self.pos = start_pos
        self.carrying = carrying

    def reset(self, start_pos, carrying=False):
        self.pos = start_pos
        self.carrying = carrying

class Environment:
    def __init__(self):
        self.reset()

    def reset(self):
        # Random positions for A and B
        self.loc_A = (np.random.randint(0, GRID_SIZE), np.random.randint(0, GRID_SIZE))
        while True:
            self.loc_B = (np.random.randint(0, GRID_SIZE), np.random.randint(0, GRID_SIZE))
            if self.loc_B != self.loc_A:
                break

        # Place 4 agents at A or B randomly
        self.agents = []
        for i in range(NUM_AGENTS):
            start = self.loc_A if np.random.rand() < 0.5 else self.loc_B
            carrying = (start == self.loc_A)
            self.agents.append(Agent(i, start, carrying))

    def step(self, actions):
        rewards = [0] * NUM_AGENTS
        collisions = 0
        next_positions = []

        # Step 1: Compute next positions
        for i, agent in enumerate(self.agents):
            dx, dy = ACTION_TO_DELTA[actions[i]]
            nx = max(0, min(GRID_SIZE - 1, agent.pos[0] + dx))
            ny = max(0, min(GRID_SIZE - 1, agent.pos[1] + dy))
            next_positions.append((nx, ny))

        # Step 2: Check for head-on collisions
        for i in range(NUM_AGENTS):
            for j in range(i+1, NUM_AGENTS):
                if next_positions[i] == self.agents[j].pos and next_positions[j] == self.agents[i].pos:
                    # Head-on collision
                    collisions += 1
                    rewards[i] -= 10
                    rewards[j] -= 10
                    next_positions[i] = self.agents[i].pos  # revert
                    next_positions[j] = self.agents[j].pos

        # Step 3: Update agents
        for i, agent in enumerate(self.agents):
            agent.pos = next_positions[i]

            # Check for pickup/drop-off
            if not agent.carrying and agent.pos == self.loc_A:
                agent.carrying = True
            elif agent.carrying and agent.pos == self.loc_B:
                agent.carrying = False
                rewards[i] += 10  # successful delivery

            # Small penalty per move
            rewards[i] -= 1

        return rewards, collisions

In [6]:
# --- STATE ENCODING ---
def encode_state(agent, loc_A, loc_B):
    # (x, y, Ax, Ay, Bx, By, carrying)
    return (
        agent.pos[0], agent.pos[1],
        loc_A[0], loc_A[1],
        loc_B[0], loc_B[1],
        int(agent.carrying)
    )

In [4]:
# --- Q-LEARNING ---
Q = {}  # Q-table: key=(state, action), value=Q-value

def get_Q(state, action):
    return Q.get((state, action), 0.0)

def select_action(state):
    if np.random.rand() < EPSILON:
        return random.choice(ACTIONS)
    qs = [get_Q(state, a) for a in ACTIONS]
    return ACTIONS[np.argmax(qs)]

def update_Q(state, action, reward, next_state):
    max_next = max([get_Q(next_state, a) for a in ACTIONS])
    old_value = get_Q(state, action)
    Q[(state, action)] = old_value + ALPHA * (reward + GAMMA * max_next - old_value)

In [10]:
# --- TRAINING LOOP ---
env = Environment()
start_time = time.time()
total_steps = 0
total_collisions = 0

while total_steps < MAX_STEPS and time.time() - start_time < MAX_TRAINING_TIME and total_collisions < MAX_COLLISIONS:
    for agent_id in range(NUM_AGENTS):  # Central Clock: Round Robin
        agent = env.agents[agent_id]
        state = encode_state(agent, env.loc_A, env.loc_B)
        action = select_action(state)

        # One agent acts, others wait
        actions = ['X'] * NUM_AGENTS
        actions[agent_id] = action
        for j in range(NUM_AGENTS):
            if actions[j] == 'X':
                actions[j] = select_action(encode_state(env.agents[j], env.loc_A, env.loc_B))

        rewards, collisions = env.step(actions)
        total_collisions += collisions

        next_state = encode_state(agent, env.loc_A, env.loc_B)
        update_Q(state, action, rewards[agent_id], next_state)
        total_steps += 1

    # Occasionally reset environment for exploration
    if total_steps % 1000 == 0:
        env.reset()

print(f"Training complete. Total steps: {total_steps}, collisions: {total_collisions}, time: {time.time() - start_time:.2f}s")

Training complete. Total steps: 25056, collisions: 4000, time: 0.97s


In [None]:
Q