# ABD2

Wij gaan tijdens deze opdracht een food-collector simulatie bouwen.
In deze simulatie is het doel van de agents om zo veel mogelijk groen eten op te eten, terwijl ze rood eten vermijden.
We gaan deze simulatie grid-based maken. 

We maken gebruik van de gymnasium API: https://gymnasium.farama.org/

Ter visualisatie gebruiken we pygame.


### Simulation properties:
We beginnen in principe met een grid van 128x128, maar dit kan uitgebreid worden. 
We maken de implementatie zo schaalbaar en aanpasbaar mogelijk door hyperparameters aan te maken voor alle belangrijke eigenschappen:

- Number of agents
- Grid size (default 128x128)
- Good food to total square ratio
- Good food spawning pattern (maybe)
- Bad food to total square ratio
- Bad food spawning pattern (maybe)
- Episode duration

### Agent properties:
- Actions: up, down, left, right, wait
- Perception: full information (knows coordinates of other agents, good food, and bad food)
- Agent rules: not yet specified





In [1]:
import numpy as np
import pygame
import gymnasium as gym
from gymnasium import spaces
import random
import time
import tcod
import pandas as pd
import sys
import time
from batch_simulation import AuctionBatchSimulation, NoAuctionSharedMemoryBatchSimulation, NoAuctionNoSharedMemoryBatchSimulation

pygame 2.1.3.dev8 (SDL 2.0.22, Python 3.10.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
class agent():
    """Food-collecting agent that lives on a square grid.

    An agent keeps its own location, score, a short memory of recent steps and
    the observation it plans with (``shared_observation``, which is filled in
    from outside this class). Observations are dicts that map the keys
    ``'agent_locs'``, ``'good_food_locs'`` and ``'bad_food_locs'`` to lists of
    ``[x, y]`` coordinates.
    """

    # Default per-cell costs used by ``create_cost_array``. Kept on the class so
    # the method does not need a mutable default argument.
    _DEFAULT_COST_DICT = {
        "default": 2,
        "bad_food_locs": 15,
        "good_food_locs": 1,
        "agent_locs": 0,
    }

    # Number of most recent steps kept in ``self.memory``.
    _MEMORY_SIZE = 15

    def __init__(self, vision_range, grid_size):
        """Create an agent.

        Args:
            vision_range: Maximum per-axis distance at which objects are seen.
            grid_size: Side length of the square grid the agent moves on.
        """
        self.location = [None, None]
        self.points = 0
        self.vision_range = vision_range
        self.memory = []
        # action id -> (dx, dy); action 0 means "wait".
        self._action_to_direction = {
            0: np.array([0, 0]),
            1: np.array([1, 0]),
            2: np.array([0, 1]),
            3: np.array([-1, 0]),
            4: np.array([0, -1])
        }
        self.path = []
        self.grid_size = grid_size
        self.shared_observation = {}

    def move(self, direction):
        """Move the agent by ``direction`` (a ``(dx, dy)`` offset)."""
        # Convert both operands explicitly: with two plain lists ``+`` would
        # concatenate instead of adding element-wise.
        new_location = np.asarray(self.location) + np.asarray(direction)
        self.location = list(new_location)

    def clip_vision(self, obs):
        """Return a copy of ``obs`` limited to coordinates within ``vision_range``.

        A coordinate is kept when it is at most ``vision_range`` away from the
        agent on both axes (a square window around the agent).
        """
        own_x, own_y = self.location[0], self.location[1]
        reach = self.vision_range

        return {
            key: [loc for loc in locations
                  if abs(own_x - loc[0]) <= reach and abs(own_y - loc[1]) <= reach]
            for key, locations in obs.items()
        }

    def reachable_food_filter(self, food_locations, agent_locations):
        """Drop food locations that some other agent is strictly closer to.

        Distances are Manhattan distances. The agent's own position is taken to
        be ``[vision_range, vision_range]``.
        NOTE(review): that looks like the centre of a vision-relative window
        rather than ``self.location`` - confirm which coordinate frame callers use.

        Args:
            food_locations: Candidate food coordinates.
            agent_locations: Coordinates of the other agents.

        Returns:
            The food coordinates for which no other agent is strictly closer,
            in their original order.
        """
        own_x = own_y = self.vision_range

        filtered_food_locations = []
        for fl in food_locations:
            own_distance = abs(own_x - fl[0]) + abs(own_y - fl[1])
            contested = any(
                abs(al[0] - fl[0]) + abs(al[1] - fl[1]) < own_distance
                for al in agent_locations
            )
            if not contested:
                filtered_food_locations.append(fl)

        return filtered_food_locations

    def create_cost_array(self, obs, cost_dict=None):
        """Build a ``grid_size`` x ``grid_size`` array of per-cell costs.

        Every cell starts at ``cost_dict["default"]``; each coordinate listed in
        ``obs`` is then overwritten with the cost registered for its key. Keys
        are applied in the iteration order of ``obs``, so later keys win.

        Args:
            obs: Observation dict (key -> list of ``[x, y]`` coordinates).
            cost_dict: Optional mapping with a ``"default"`` entry and one entry
                per observation key. Defaults to ``_DEFAULT_COST_DICT``.

        Returns:
            2-D numpy array indexed as ``cost[x][y]``.

        Raises:
            KeyError: If ``obs`` contains coordinates under a key that is
                missing from ``cost_dict``.
        """
        if cost_dict is None:
            cost_dict = self._DEFAULT_COST_DICT

        cost = np.full((self.grid_size, self.grid_size), cost_dict["default"])

        for key, coords in obs.items():
            for coord in coords:
                cost[coord[0]][coord[1]] = cost_dict[key]

        return cost

    def get_action_from_path(self, path):
        """Return the action id that moves the agent onto ``path[0]``.

        Raises:
            ValueError: If ``path[0]`` is not reachable with a single action.
        """
        move = np.asarray(path[0]) - np.asarray(self.location)
        for action, direction in self._action_to_direction.items():
            if np.array_equal(direction, move):
                return action
        raise ValueError(f"{list(move)} is not in list")

    def get_path_to_target(self, target, cost):
        """Return the path from the agent to ``target`` on the cost map ``cost``.

        Only cardinal moves are allowed (``diagonal=0``). The returned list
        excludes the agent's current cell, so it is empty when no step is
        needed or available.
        """
        graph = tcod.path.SimpleGraph(cost=cost, cardinal=1, diagonal=0)
        pf = tcod.path.Pathfinder(graph)
        pf.add_root(self.location)
        pf.resolve()

        # The first entry of the resolved path is the start cell itself.
        return pf.path_to(target)[1:].tolist()

    def get_cost_to_target(self, target, obs):
        """Return the summed cell cost of the planned path to ``target``.

        The cost map is derived from ``obs`` with the default cost settings.
        """
        cost_map = self.create_cost_array(obs)
        path = self.get_path_to_target(target, cost_map)
        return sum([cost_map[step[0]][step[1]] for step in path])

    ###AGENT BEHAVIOUR###
    def collect(self, obs, target):
        """Return the next action on the way to ``target``.

        Falls back to ``explore`` when the planned path is empty.
        """
        cost = self.create_cost_array(obs)
        path = self.get_path_to_target(target, cost)
        if len(path) == 0:
            return self.explore(obs)
        return self.get_action_from_path(path)

    def explore(self, obs):
        """Pick a random exploratory action that respects the movement rules.

        Rules applied to every candidate action:
          1. never move onto a cell occupied by another agent;
          2. never leave the grid;
          3. never step on known bad food;
          4. prefer cells the agent has not recently stood on while seeing no
             good food (based on the remembered steps).

        If the last ten actions alternate between exactly two actions, rule 4
        is skipped for this step to break the oscillation.

        Returns:
            The chosen action id. ``0`` (wait) is returned when every action is
            ruled out, instead of failing inside ``random.choice``.
        """
        here = np.asarray(self.location)
        other_agent_locations = [loc for loc in obs['agent_locs'] if loc != self.location]
        bad_food_locations = obs['bad_food_locs']

        # Compute each destination once and apply rules 1-3 in a single pass.
        candidates = {}
        for action, direction in self._action_to_direction.items():
            destination = (here + direction).tolist()
            if destination in other_agent_locations:
                continue
            if not (0 <= destination[0] < self.grid_size
                    and 0 <= destination[1] < self.grid_size):
                continue
            if destination in bad_food_locations:
                continue
            candidates[action] = destination

        viable_actions = list(candidates)
        if not viable_actions:
            # Nothing is allowed (e.g. stale observation): stay put.
            return 0

        # Rule 4: checks the last 15 coordinates (15 has the lowest average
        # steps for sim completion).
        recently_visited = [m['agent_location'] for m in self.memory[-15:]
                            if len(m['agent_obs']['good_food_locs']) == 0]
        viable_actions_higher_prio = [action for action, destination in candidates.items()
                                      if destination not in recently_visited]

        # Unstick the agent if it gets caught in a rare two-action loop.
        action_history = [m['agent_action'] for m in self.memory][-10:]
        if (len(action_history) >= 10
                and len(set(action_history)) == 2
                and len(set(action_history[0::2])) == 1
                and len(set(action_history[1::2])) == 1):
            return random.choice(viable_actions)

        if viable_actions_higher_prio:
            return random.choice(viable_actions_higher_prio)
        return random.choice(viable_actions)

    def run(self, obs, target=(0, 0)):
        """Choose the next action and record the step in memory.

        Args:
            obs: Full observation; only its vision-clipped part is remembered.
            target: Coordinate to collect food at. A falsy value (``None`` or an
                empty sequence) makes the agent explore instead. Note that the
                default ``(0, 0)`` is truthy and therefore a real target.

        Returns:
            The chosen action id.
        """
        # clip the observation to the agent's vision
        clipped_obs = self.clip_vision(obs)

        # decisions are made on the shared observation, not on ``obs`` itself
        if target:
            action = self.collect(self.shared_observation, target)
        else:
            action = self.explore(self.shared_observation)

        # Store a copy of the location so later changes cannot rewrite history.
        self.memory.append({'agent_action': action,
                            "agent_obs": clipped_obs,
                            "agent_location": list(self.location)})

        # clip the memory to make the simulation faster
        self.memory = self.memory[-self._MEMORY_SIZE:]

        return action




In [3]:
class foodCollectorEnv(gym.Env):
    """Grid-based, multi-agent food collection environment.

    Agents move over a square grid; stepping on good food awards
    ``good_food_points`` and stepping on bad food awards ``bad_food_points``.
    Agents act one at a time: ``step`` takes the index of the acting agent.

    Coordinates are ``[x, y]`` lists. When rendering, ``y`` grows upwards
    (row ``grid_size - 1 - y`` on screen).
    """

    metadata = {"render_modes": ["human", None], "render_fps": 4}

    # Marks "no seed given". A ``random.random()`` default in the signature is
    # evaluated only once, when the class is defined, so every environment
    # created without a seed would silently share the same one.
    _NO_SEED = object()

    def __init__(self, render_mode=None, grid_size=128, number_of_rounds=200, agents=1, good_food_ratio=0.1,
                 bad_food_ratio=0.1, step_sleep=0, agent_vision_range = 5, random_seed = _NO_SEED):
        """Configure the environment; call ``reset`` before stepping.

        Args:
            render_mode: ``"human"`` opens a pygame window; any other value
                disables drawing during ``reset``/``step``.
            grid_size: Side length of the square grid.
            number_of_rounds: Used to derive the step budget
                ``(number_of_rounds - 1) * agents``.
            agents: Number of agents to create.
            good_food_ratio: Good food items as a fraction of all cells.
            bad_food_ratio: Bad food items as a fraction of all cells.
            step_sleep: Seconds to sleep at the end of every step.
            agent_vision_range: Vision range handed to every agent.
            random_seed: Seed passed to ``random.seed`` on every ``reset``.
                When omitted, a fresh random seed is drawn per environment.
        """
        if random_seed is foodCollectorEnv._NO_SEED:
            random_seed = random.random()

        self.random_seed = random_seed
        self.step_sleep = step_sleep
        self.grid_size = grid_size
        self.window_size = 1028
        self.game_duration = (number_of_rounds -1) * agents
        self.current_step = 0
        self.agents = [agent(agent_vision_range, grid_size) for _ in range(agents)]
        self.good_food_ratio = good_food_ratio
        self.bad_food_ratio = bad_food_ratio
        self.good_food_points = 5
        self.bad_food_points = -1
        self.render_mode = render_mode
        self.window = None
        self.grid = np.ones((self.grid_size, self.grid_size))
        self.nr_of_agents = agents

        # MultiDiscrete takes the NUMBER of values per dimension, so coordinates
        # 0 .. grid_size - 1 need ``grid_size`` (not ``grid_size - 1``).
        self.observation_space = spaces.Dict({
            'agent_locs': spaces.Sequence(spaces.MultiDiscrete([self.grid_size, self.grid_size])),
            'good_food_locs': spaces.Sequence(spaces.MultiDiscrete([self.grid_size, self.grid_size])),
            'bad_food_locs': spaces.Sequence(spaces.MultiDiscrete([self.grid_size, self.grid_size])),
        })

        # ``step`` reads ``action[0]``, i.e. an action is a length-1 vector.
        self.action_space = spaces.MultiDiscrete([5])

        # action id -> (dx, dy); action 0 means "wait".
        self._action_to_direction = {
            0: np.array([0, 0]),
            1: np.array([1, 0]),
            2: np.array([0, 1]),
            3: np.array([-1, 0]),
            4: np.array([0, -1])
        }

        # Kept for backwards compatibility; the environment itself works with
        # the ``*_locs`` attributes below.
        self.good_food_locations = []
        self.bad_food_locations = []

        # Initialised here so ``get_obs`` also works before the first ``reset``.
        self.agent_locs = []
        self.good_food_locs = []
        self.bad_food_locs = []

        # (filename, size) -> scaled sprite, so images are read from disk once.
        self._sprite_cache = {}

    def get_obs(self):
        """Return the full observation (the food lists are the live internal lists)."""
        return {
            "agent_locs": [member.location for member in self.agents],
            "good_food_locs": self.good_food_locs,
            "bad_food_locs": self.bad_food_locs
        }

    def get_shared_observation(self):
        """Return the union of everything any agent can see.

        Each agent's vision-clipped observation is merged per key; duplicates
        are dropped and first-seen order is kept.
        """
        observation = self.get_obs()
        shared_observation = {'agent_locs': [], 'good_food_locs': [], 'bad_food_locs': []}
        already_added = {key: set() for key in shared_observation}

        for member in self.agents:
            clipped_obs = member.clip_vision(observation)
            for key, merged in shared_observation.items():
                for loc in clipped_obs[key]:
                    fingerprint = (loc[0], loc[1])
                    if fingerprint not in already_added[key]:
                        already_added[key].add(fingerprint)
                        merged.append(loc)

        return shared_observation

    def _scatter(self, count, occupied):
        """Draw ``count`` random free cells, marking them in ``occupied``.

        Uses rejection sampling with two ``random.randint`` calls per attempt.
        """
        placed = []
        while len(placed) < count:
            loc = [random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)]
            fingerprint = (loc[0], loc[1])
            if fingerprint not in occupied:
                occupied.add(fingerprint)
                placed.append(loc)
        return placed

    def reset(self, seed=None, options=None):
        """Start a new episode with freshly placed agents and food.

        The layout is fully determined by ``self.random_seed``: the ``random``
        module is re-seeded with it on every call.

        Returns:
            ``(observation, info)`` where ``info`` holds the agents' points.

        Raises:
            ValueError: If the grid has too few cells for all agents and food.
        """
        super().reset(seed=seed)

        for member in self.agents:
            member.memory = []
            member.points = 0

        self.current_step = 0

        good_food_count = round(self.good_food_ratio * self.grid_size * self.grid_size)
        bad_food_count = round(self.bad_food_ratio * self.grid_size * self.grid_size)
        needed = len(self.agents) + max(good_food_count, 0) + max(bad_food_count, 0)
        if needed > self.grid_size * self.grid_size:
            # Without this check the placement loops below would never finish.
            raise ValueError(
                f"cannot place {needed} objects on a {self.grid_size}x{self.grid_size} grid")

        random.seed(self.random_seed)
        occupied = set()
        # Order matters for reproducibility: agents, then good food, then bad food.
        self.agent_locs = self._scatter(len(self.agents), occupied)
        self.good_food_locs = self._scatter(good_food_count, occupied)
        self.bad_food_locs = self._scatter(bad_food_count, occupied)

        for member, start in zip(self.agents, self.agent_locs):
            member.location = start

        observation = self.get_obs()

        if self.render_mode == "human":
            self.render_frame()

        info = {"agent_points": [member.points for member in self.agents]}

        return observation, info

    def step(self, action, agent_index):
        """Apply ``action`` for the agent at ``agent_index``.

        Args:
            action: Sequence whose first element is the action id (0-4).
            agent_index: Index into ``self.agents`` of the acting agent.

        Returns:
            ``(observation, reward, terminated, info)``. Note this is a 4-tuple;
            there is no separate ``truncated`` flag. ``reward`` is always 0.
        """
        actor = self.agents[agent_index]
        actor.move(self._action_to_direction[action[0]])

        if actor.location in self.good_food_locs:
            self.good_food_locs.remove(actor.location)
            actor.points += self.good_food_points

        if actor.location in self.bad_food_locs:
            self.bad_food_locs.remove(actor.location)
            actor.points += self.bad_food_points

        # The episode ends when the step budget is used up or all good food is
        # gone (evaluated before the step counter is increased).
        terminated = self.current_step >= self.game_duration or (
                len(self.good_food_locs) == 0 and self.current_step > 0)

        reward = 0  # not relevant for now
        observation = self.get_obs()

        if self.render_mode == "human":
            self.render_frame()
        info = {"agent_points": [member.points for member in self.agents]}

        self.current_step += 1
        time.sleep(self.step_sleep)

        return observation, reward, terminated, info

    def render(self):
        """Return a rendered frame when ``render_mode`` is ``"rgb_array"``."""
        if self.render_mode == "rgb_array":
            return self.render_frame()

    def _load_sprite(self, filename, size):
        """Return the image ``filename`` scaled to ``size`` x ``size`` (cached)."""
        key = (filename, size)
        sprite = self._sprite_cache.get(key)
        if sprite is None:
            image = pygame.image.load(filename).convert_alpha()
            sprite = pygame.transform.scale(image, (size, size))
            self._sprite_cache[key] = sprite
        return sprite

    def render_frame(self):
        """Draw the grid, agents, food and scoreboard.

        In ``"human"`` mode the frame is shown in the pygame window and nothing
        is returned; otherwise the frame is returned as an array with axes
        ``(row, column, channel)``.
        """
        pygame.font.init()
        font = pygame.font.Font(None, 24)
        scoreboard_width = 0.2
        total_width = self.window_size + self.window_size * scoreboard_width

        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode((total_width, self.window_size))

        canvas = pygame.Surface((total_width, self.window_size))
        canvas.fill((204, 255, 229))
        pix_square_size = self.window_size / self.grid_size

        def to_pixels(loc):
            # Grid y grows upwards, screen y grows downwards.
            return loc[0] * pix_square_size, (self.grid_size - 1 - loc[1]) * pix_square_size

        # draw the background for the scoreboard
        scoreboard_bg = pygame.Surface((self.window_size * scoreboard_width, total_width))
        scoreboard_bg.fill((255, 255, 255))
        scoreboard_bg.set_alpha(255)
        canvas.blit(scoreboard_bg, (self.window_size, 0))

        for i, member in enumerate(self.agents):
            cell_x, cell_y = to_pixels(member.location)

            # draw agent picture
            canvas.blit(self._load_sprite("deer.png", pix_square_size), (cell_x, cell_y))

            # translucent square showing the agent's vision range
            fov_side = pix_square_size * (member.vision_range * 2 + 1)
            fov = pygame.Surface((fov_side, fov_side))
            fov.set_alpha(128)
            fov.fill((137, 137, 137))
            fov_offset = pix_square_size * member.vision_range
            canvas.blit(fov, (cell_x - fov_offset, cell_y - fov_offset))

            # scoreboard line for this agent
            score_text = font.render(f'Agent {i} | {member.points}', True, "black")
            canvas.blit(score_text, (self.window_size + 10, 10 + (i * 50)))

            # agent index drawn on top of the agent
            score_text = font.render(f"{i}", True, "white")
            canvas.blit(score_text, (cell_x, cell_y))

            # path to good food
            if len(member.path) > 1:
                for coord in member.path[1:]:
                    path = pygame.Surface((pix_square_size, pix_square_size))
                    path.set_alpha(128)
                    path.fill((255, 0, 0))
                    canvas.blit(path, to_pixels(coord))

        for gfl in self.good_food_locs:
            canvas.blit(self._load_sprite("plant.png", pix_square_size), to_pixels(gfl))

        for bfl in self.bad_food_locs:
            canvas.blit(self._load_sprite("evil_plant.png", pix_square_size), to_pixels(bfl))

        # grid lines
        for x in range(self.grid_size + 1):
            pygame.draw.line(
                canvas,
                color=0,
                start_pos=(0, pix_square_size * x),
                end_pos=(self.window_size, pix_square_size * x),
                width=1,
            )
            pygame.draw.line(
                canvas,
                color=0,
                start_pos=(pix_square_size * x, 0),
                end_pos=(pix_square_size * x, self.window_size),
                width=1,
            )

        if self.render_mode == "human":

            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            #self.clock.tick(self.metadata["render_fps"])
        else:
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )


In [4]:
# Read one seed per line; surrounding whitespace (including the newline) is stripped.
with open('seeds.txt', 'r') as file:
    random_seeds = list(map(str.strip, file))

In [5]:
# Environment used for the batch simulation below: a small 24x24 grid with
# five agents and rendering switched off ('' is not "human").
env = foodCollectorEnv(
    render_mode='',
    grid_size=24,
    number_of_rounds=1000,
    agents=5,
    good_food_ratio=0.15,
    bad_food_ratio=0.25,
    step_sleep=0,
    agent_vision_range=5,
)

In [6]:
%%time
# The list of random seeds needs to be longer than the number of runs.

# Simulation variants imported from batch_simulation:
# AuctionBatchSimulation, NoAuctionSharedMemoryBatchSimulation, NoAuctionNoSharedMemoryBatchSimulation
sim = NoAuctionNoSharedMemoryBatchSimulation(env, runs = 1, random_seeds = random_seeds)
sim.run()

0/1000
1/1000
2/1000
3/1000
4/1000
5/1000
6/1000
7/1000
8/1000
9/1000
10/1000
11/1000
12/1000
13/1000
14/1000
15/1000
16/1000
17/1000
18/1000
19/1000
20/1000
21/1000
22/1000
23/1000
24/1000
25/1000
26/1000
27/1000
28/1000
29/1000
30/1000
31/1000
32/1000
33/1000
34/1000
35/1000
36/1000
37/1000
38/1000
39/1000
40/1000
41/1000
42/1000
43/1000
44/1000
45/1000
46/1000
47/1000
48/1000
49/1000
50/1000
51/1000
52/1000
53/1000
54/1000
55/1000
56/1000
57/1000
58/1000
59/1000
60/1000
61/1000
62/1000
63/1000
64/1000
65/1000
66/1000
67/1000
68/1000
69/1000
70/1000
71/1000
72/1000
73/1000
74/1000
75/1000
76/1000
77/1000
78/1000
79/1000
80/1000
81/1000
82/1000
83/1000
84/1000
85/1000
86/1000
87/1000
88/1000
89/1000
90/1000
91/1000
92/1000
93/1000
94/1000
95/1000
96/1000
97/1000
98/1000
99/1000
100/1000
101/1000
102/1000
103/1000
104/1000
105/1000
106/1000
107/1000
108/1000
109/1000
110/1000
111/1000
112/1000
113/1000
114/1000
115/1000
116/1000
117/1000
118/1000
119/1000
120/1000
121/1000
122/1000
123

923/1000
924/1000
925/1000
926/1000
927/1000
928/1000
929/1000
930/1000
931/1000
932/1000
933/1000
934/1000
935/1000
936/1000
937/1000
938/1000
939/1000
940/1000
941/1000
942/1000
943/1000
944/1000
945/1000
946/1000
947/1000
948/1000
949/1000
950/1000
951/1000
952/1000
953/1000
954/1000
955/1000
956/1000
957/1000
958/1000
959/1000
960/1000
961/1000
962/1000
963/1000
964/1000
965/1000
966/1000
967/1000
968/1000
969/1000
970/1000
971/1000
972/1000
973/1000
974/1000
975/1000
976/1000
977/1000
978/1000
979/1000
980/1000
981/1000
982/1000
983/1000
984/1000
985/1000
986/1000
987/1000
988/1000
989/1000
990/1000
991/1000
992/1000
993/1000
994/1000
995/1000
996/1000
997/1000
998/1000
999/1000
CPU times: total: 11min 18s
Wall time: 11min 19s


In [7]:
#sim.df.to_csv(".csv")

In [8]:
# Column-wise mean of the simulation results (presumably one row per run - verify in batch_simulation).
sim.df.mean()

points                  426.245
steps_at_termination    645.990
dtype: float64