In [None]:
!pip install gym
!pip install torch



In [None]:
from enum import Enum

class Action(Enum):
  MOVE_UP = 0
  MOVE_DOWN = 1
  MOVE_LEFT = 2
  MOVE_RIGHT = 3
  STAY = 4

class Index:
  APPLE = 0
  AGENT = 1

In [None]:
from gym import Env, spaces
import numpy as np
import random
from collections import defaultdict

class OrchardEnv(Env):
  def __init__(self, grid_size=(5, 5), spawn_prob=0.2, num_agents=1,
               max_apples_per_cell=100, max_steps=1000):
    """
    Constructor to create an orchard environment for multiple agents to
    work with.
    """
    # environment configuration
    super(OrchardEnv, self).__init__()
    self.grid_side = grid_size
    self.spawn_prob = spawn_prob
    self.num_agents = num_agents
    self.max_apples_per_cell = max_apples_per_cell
    self.current_step = 0
    self.max_steps = max_steps
    self.grid_size = grid_size

    # action space: 4 directions and staying
    self.action_space = spaces.Discrete(5)

    # observation space: 3D tensor
    # First 2 indices specify row and column.
    # Last index is indexed using Index.APPLE or Index.AGENT.
    self.observation_space = spaces.Box(
      low=np.zeros((grid_size[0], grid_size[1], 2), dtype=np.float32),
      high=np.stack([np.full((grid_size[0], grid_size[1]), max_apples_per_cell),
                   np.full((grid_size[0], grid_size[1]), num_agents)], axis=-1),
      shape=(grid_size[0], grid_size[1], 2),
      dtype=np.float32
    )

    # Initialize grids to keep track of apple and agent states
    # Agents are randomly placed in the orchard to begin with
    self.apples = np.zeros((grid_size[0], grid_size[1]))
    self.agent_positions = (np.random.choice(grid_size[0], num_agents),
                        np.random.choice(grid_size[1], num_agents))

  def _is_done(self):
    """
    Returns whether we have completed max_steps many steps.
    """
    return self.current_step >= self.max_steps

  def _get_observations(self):
    """
    Returns the observation generated from the orchard with dimensions
    grid_size[0] X grid_size[1] X 2.
    """
    # Initialize empty observation matrix.
    observation = np.zeros((self.grid_size[0], self.grid_size[1], 2),
                            dtype=np.float32)

    # Populate observation of number of apples.
    observation[:, :, Index.APPLE] = self.apples

    # Populate observation of number of agents
    for agent_row, agent_col in zip(*self.agent_positions):
      observation[agent_row, agent_col, Index.AGENT] += 1

    return observation

  def reset(self):
    # reset steps to 0.
    self.current_step = 0

    # reset the apple and agent positions
    self.apples = np.zeros((self.grid_size[0], self.grid_size[1]))
    self.agent_positions = (np.random.choice(self.grid_size[0], self.num_agents),
                        np.random.choice(self.grid_size[1], self.num_agents))


    observation = np.zeros((self.grid_size[0], self.grid_size[1], 2), dtype=np.float32)

    # Populate observation of the number of apples.
    observation[:, :, Index.APPLE] = self.apples

    # Populate observation of the number of agents
    for agent_row, agent_col in zip(*self.agent_positions):
      observation[agent_row, agent_col, Index.AGENT] += 1

    return observation

  def _move_agent(self, agent_id, action):
    """
    Update the position of an agent.
    """
    current_row = self.agent_positions[0][agent_id]
    current_col = self.agent_positions[1][agent_id]

    if action == Action.MOVE_UP:
      new_row = max(0, current_row - 1)
      self.agent_positions[0][agent_id] = new_row
    elif action == Action.MOVE_DOWN:
      new_row = min(self.grid_size[0] - 1, current_row + 1)
      self.agent_positions[0][agent_id] = new_row
    elif action == Action.MOVE_LEFT:
      new_col = max(0, current_col - 1)
      self.agent_positions[1][agent_id] = new_col
    elif action == Action.MOVE_RIGHT:
      new_col = min(self.grid_size[1] - 1, current_col + 1)
      self.agent_positions[1][agent_id] = new_col

  def _move_agents(self, actions):
    """
    Update the positions of all agents.
    """
    for agent_id, action in enumerate(actions):
      self._move_agent(agent_id, action)

  def _pickup_apples(self):
      """
      Pick up apples in each cell based on the number of agents
      and apples present.

      Returns a set of agents that have picked up an apple.
      """
      # Group agents by their locations
      agent_locations = defaultdict(list)
      for agent_id, (agent_row, agent_col) in enumerate(zip(*self.agent_positions)):
        agent_locations[(agent_row, agent_col)].append(agent_id)


      # Distribute apples in each cell.
      rewarded_agents = []
      for (row, col), agents_in_cell in agent_locations.items():
        num_apples_in_cell = int(self.apples[row, col])
        num_agents_in_cell = len(agents_in_cell)

        if num_agents_in_cell > num_apples_in_cell:
          self.apples[row, col] = 0
          agents_to_pick_apples = random.sample(agents_in_cell, num_apples_in_cell)
          rewarded_agents.extend(agents_to_pick_apples)
        else:
          self.apples[row, col] -= num_agents_in_cell
          rewarded_agents.extend(agents_in_cell)

      return set(rewarded_agents)

  def _spawn_apples(self):
    """
    Spawn apples in the orchard based on the spawn probability.
    """
    for row in range(self.grid_size[0]):
      for col in range(self.grid_size[1]):
        new_apple_can_spawn = random.random() < self.spawn_prob
        num_apples_below_max = self.apples[row, col] < self.max_apples_per_cell
        if new_apple_can_spawn and num_apples_below_max:
          self.apples[row, col] += 1

  def step(self, actions):
    """
    Performs 1 step in the orchard environment.
    """
    self._move_agents(actions)
    rewarded_agents = self._pickup_apples()
    self._spawn_apples()

    self.current_step += 1

    global_reward = len(rewarded_agents)
    done = self._is_done()
    observations = self._get_observations()
    info = {
      "rewarded agents": rewarded_agents
    }


    return observations, global_reward, done, info

  def render(self):
    for row in range(self.grid_size[0]):
      for col in range(self.grid_size[1]):
        cell_info = f"[apples: {int(self.apples[row, col])}, agents: {int(np.sum(self.agent_positions[0] == row) & (self.agent_positions[1] == col))}]"
        print(cell_info, end=" ")
      print()

In [None]:
test_orchard_env = OrchardEnv()
test_orchard_env.render()

[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 1] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 


In [None]:
_, _, _, _ = test_orchard_env.step([Action.MOVE_LEFT])
test_orchard_env.render()

[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 1, agents: 0] 
[apples: 0, agents: 0] [apples: 0, agents: 1] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 1, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 0, agents: 0] [apples: 1, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 1, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 


In [None]:
_, _, _, _ = test_orchard_env.step([Action.MOVE_DOWN])
test_orchard_env.render()

[apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 1, agents: 0] [apples: 0, agents: 0] [apples: 1, agents: 0] 
[apples: 0, agents: 0] [apples: 1, agents: 0] [apples: 1, agents: 0] [apples: 1, agents: 0] [apples: 0, agents: 0] 
[apples: 1, agents: 0] [apples: 0, agents: 1] [apples: 0, agents: 0] [apples: 1, agents: 0] [apples: 0, agents: 0] 
[apples: 1, agents: 0] [apples: 2, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 
[apples: 1, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] [apples: 0, agents: 0] 


In [None]:
obs, reward, done, info = test_orchard_env.step([Action.STAY])

print(reward)
print(info["rewarded agents"])

0
set()


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class QNetwork(nn.Module):
  def __init__(self, input_size, output_size):
    super(QNetwork, self).__init__()
    self.fc1 = nn.Linear(input_size, 128)
    self.fc2 = nn.Linear(128, 64)
    self.fc3 = nn.Linear(64, output_size)

  def forward(self, x):
    x = torch.relu(self.fc1(x))
    x = torch.relu(self.fc2(x))
    x = self.fc3(x)
    return x

In [None]:
class QLearningAgent:
  def __init__(self, observation_space, action_space, learning_rate=0.001, gamma=0.99):
    self.observation_space = observation_space
    self.action_space = action_space
    self.learning_rate = learning_rate
    self.gamma = gamma

    # Q-network
    input_size = np.prod(observation_space.shape)
    output_size = action_space.n
    self.q_network = QNetwork(input_size, output_size)
    self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

    # Epsilon-greedy exploration
    self.epsilon = 1.0
    self.epsilon_decay = 0.995
    self.epsilon_min = 0.01

  def select_action(self, observation):
    if np.random.rand() < self.epsilon:
      return np.random.choice(self.action_space.n)
    else:
      q_values = self.q_network(torch.tensor(observation.flatten(), dtype=torch.float32))
      return torch.argmax(q_values).item()

  def update_q_function(self, state, action, reward, next_state, done):
    state = torch.tensor(state.flatten(), dtype=torch.float32)
    next_state = torch.tensor(next_state.flatten(), dtype=torch.float32)

    q_values = self.q_network(state)
    next_q_values = self.q_network(next_state)

    q_value = q_values[action]
    next_q_value = torch.max(next_q_values).item() if not done else 0

    target = torch.tensor(reward + self.gamma * next_q_value, dtype=torch.float32)
    # print(type(q_value))
    # print(type(target))
    # return
    loss = nn.MSELoss()(q_value, target)

    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    # Decay epsilon for exploration
    self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


In [None]:
env = OrchardEnv()
agent = QLearningAgent(env.observation_space, env.action_space)

# Training loop
num_episodes = 1000
for episode in range(num_episodes):
  observation = env.reset()
  total_reward = 0

  while True:
    action = agent.select_action(observation)
    observation, global_reward, done, info = env.step([action])
    agent_reward = 0 in info["rewarded agents"]
    if global_reward > 0:
      print("here")
    agent.update_q_function(observation, action, agent_reward, observation, done)
    break

    total_reward += agent_reward

    if done:
      break


In [None]:
num_test_episodes = 10
test_total_rewards = []

for episode in range(num_test_episodes):
    observation = env.reset()
    total_reward = 0

    while True:
        action = agent.select_action(observation)
        observation, _, done, info = env.step([action])
        agent_reward = 0 in info["rewarded agents"]

        total_reward += agent_reward

        if done:
            break

    test_total_rewards.append(total_reward)

# Print average test performance
average_test_reward = sum(test_total_rewards) / num_test_episodes
print(f"Average Test Reward: {average_test_reward}")


Average Test Reward: 200.0


In [None]:
import numpy as np

class GreedyApplePickerAgent:
    def __init__(self, observation_space, action_space):
        self.observation_space = observation_space
        self.action_space = action_space

    def select_action(self, observation):
        # Extract relevant information from the observation
        apples = observation[:, :, 0]  # Assuming the first channel represents the number of apples

        # Get the coordinates of the agent
        agent_row, agent_col = np.where(observation[:, :, 1] > 0)

        if len(agent_row) == 0:
            # No agent found in the observation
            return 0  # Move Up as a default action

        agent_row, agent_col = agent_row[0], agent_col[0]

        # Find the coordinates of all apples in the observation
        apple_rows, apple_cols = np.where(apples > 0)

        if len(apple_rows) == 0:
            # No apples found, stay in the same spot
            return 0  # Stay in the same spot

        # Calculate distances to all apples
        distances = np.abs(apple_rows - agent_row) + np.abs(apple_cols - agent_col)

        # Find the index of the closest apple
        closest_apple_idx = np.argmin(distances)

        # Calculate the direction towards the closest apple
        move_up = apple_rows[closest_apple_idx] < agent_row
        move_down = apple_rows[closest_apple_idx] > agent_row
        move_left = apple_cols[closest_apple_idx] < agent_col
        move_right = apple_cols[closest_apple_idx] > agent_col

        # Choose the action based on the direction towards the closest apple
        if move_up:
            return 0  # Move Up
        elif move_down:
            return 1  # Move Down
        elif move_left:
            return 2  # Move Left
        elif move_right:
            return 3  # Move Right
        else:
            return 4  # Stay in the same spot


In [None]:
env = OrchardEnv()
agent = GreedyApplePickerAgent(env.observation_space, env.action_space)

# Testing loop with the GreedyApplePickerAgent
num_test_episodes = 10
test_total_rewards = []

for episode in range(num_test_episodes):
    observation = env.reset()
    total_reward = 0

    while True:
        action = agent.select_action(observation)
        observation, _, done, info = env.step([action])
        agent_reward = 0 in info["rewarded agents"]

        total_reward += agent_reward

        if done:
            break

    test_total_rewards.append(total_reward)

# Print average test performance
average_test_reward = sum(test_total_rewards) / num_test_episodes
print(f"Average Test Reward (Greedy Apple Picker Agent): {average_test_reward}")


Average Test Reward (Greedy Apple Picker Agent): 201.2


In [None]:
env = OrchardEnv(num_agents=5)  # Initialize environment with 5 agents
agents = [QLearningAgent(env.observation_space, env.action_space) for _ in range(env.num_agents)]  # Initialize agents
test_total_rewards = []

num_test_episodes = 10
for episode in range(num_test_episodes):

    observations = env.reset()  # Reset environment and get initial observations
    total_reward = 0
    while True:
        actions = [agent.select_action(observation) for agent, observation in zip(agents, observations)]  # Get action from each agent

        next_observations, rewards, done, info = env.step(actions)  # Environment step based on actions

        for i, agent in enumerate(agents):
            total_reward += 1 if i in info["rewarded agents"] else 0

        observations = next_observations  # Update observations for next step

        if done:
            break
    test_total_rewards.append(total_reward / env.num_agents)


average_test_reward = sum(test_total_rewards) / num_test_episodes
print(f"Average Test Reward (5 QLearningAgents): {average_test_reward}")


Average Test Reward (5 QLearningAgents): 194.45999999999998


In [1]:
env = OrchardEnv(num_agents=5)  # Initialize environment with 5 agents
agents = [GreedyApplePickerAgent(env.observation_space, env.action_space) for _ in range(env.num_agents)]  # Initialize agents
test_total_rewards = []

num_test_episodes = 10
for episode in range(num_test_episodes):

    observations = env.reset()  # Reset environment and get initial observations
    total_reward = 0
    while True:
        actions = [agent.select_action(observation) for agent in agents]  # Get action from each agent

        observations, rewards, done, info = env.step(actions)  # Environment step based on actions

        for i, agent in enumerate(agents):
            total_reward += 1 if i in info["rewarded agents"] else 0

        if done:
            break
    test_total_rewards.append(total_reward / env.num_agents)


average_test_reward = sum(test_total_rewards) / num_test_episodes
print(f"Average Test Reward (5 QLearningAgents): {average_test_reward}")



NameError: name 'OrchardEnv' is not defined

Main Work so Far
1. Created an environment that is built with cleaner code.
   - Adheres to interfaces that are quite popular in the RL communities.
   - Uses `numpy` where applicable. Maybe, I can look into more code to parallelize.
2. Created and tested a single Q-learning agent.
   - Had lots of trouble understanding how to build the Q-network and train the agent.
   - I will probably revisit this again next week.
3. Created and tested a single greedy agent (moves towards nearest apple).
   - This serves as the "top line" for what a single agent should be capable of doing.


Intuitively speaking, the greedy agent should be optimal in solving this problem on an independent basis. Tests show that a greedy agent collects about 204.4 apples (on average) in 1000 steps.

Given the Q-learning agent collects 194.9 apples (on average) in 1000 steps, these are pretty good results and show that the Q-learning setup should be correct.

As next steps, I would like to better understand some of the code that I've written for Q-learning and also extend these tests to a multi-agent environment.