In [2]:
import sys
sys.path.append('Hexagon_env')

In [3]:

import functools

import gymnasium
import numpy as np
from gymnasium.spaces import Discrete

from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector, wrappers
import pygame


In [4]:
# Scratch cell: build one dict that maps every id (agents first, then prey) to 0.
agent = [1, 2, 3]
prey = [4, 5, 6]
a = dict.fromkeys(agent + prey, 0)
print(a)

{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}


In [5]:
# Number of prey agents; raw_env names them "prey_0" .. "prey_<N-1>".
NO_OF_PREY = 5
# Number of predator agents; raw_env names them "predator_0" .. "predator_<N-1>".
NO_OF_PREDATOR = 5

def env(render_mode=None):
    """
    Create a ``raw_env`` and wrap it in the usual PettingZoo helper wrappers.

    ``render_mode="ansi"`` is handled specially: the underlying environment
    is created in "human" mode and its terminal output is captured by
    ``CaptureStdoutWrapper``. Any other value is passed through unchanged.

    Returns the wrapped environment.
    """
    wants_ansi = render_mode == "ansi"
    wrapped = raw_env(render_mode="human" if wants_ansi else render_mode)

    # Only environments that print their results to the terminal need this.
    if wants_ansi:
        wrapped = wrappers.CaptureStdoutWrapper(wrapped)

    # Helps with error handling for discrete action spaces.
    wrapped = wrappers.AssertOutOfBoundsWrapper(wrapped)

    # Provides a wide variety of helpful user errors; strongly recommended.
    return wrappers.OrderEnforcingWrapper(wrapped)


class raw_env(AECEnv):
    """
    Turn-based (AEC) predator/prey environment.

    The environment holds NO_OF_PREY prey agents ("prey_<i>") and
    NO_OF_PREDATOR predator agents ("predator_<i>"). Prey use a Discrete(7)
    action space and a Discrete(36) observation space; predators use
    Discrete(9) and Discrete(15) respectively.

    ``metadata`` holds environment constants: "render_modes" lists the modes
    accepted by render(), and "name" allows the environment to be pretty
    printed.

    NOTE(review): reset(), step() and render() still contain two-player
    template logic and rely on NONE, MOVES, REWARD_MAP and NUM_ITERS, none of
    which is defined in the visible part of this file. They are kept as they
    were and must be replaced with the real game rules.
    """

    metadata = {"render_modes": ["human"], "name": "hex_v1"}

    def __init__(self, render_mode=None):
        """
        Set up the agent lists, the per-agent spaces and (in human mode) pygame.

        Args:
            render_mode: None for no rendering, or "human" to open a pygame
                window.

        These attributes should not be changed after initialization.
        """
        self.possible_prey = ["prey_" + str(r) for r in range(NO_OF_PREY)]
        self.possible_predator = ["predator_" + str(r) for r in range(NO_OF_PREDATOR)]
        self.possible_agents = self.possible_prey + self.possible_predator

        # optional: a mapping between agent name and ID
        self.agent_name_mapping = {
            name: index for index, name in enumerate(self.possible_agents)
        }

        # Per-agent spaces; returned by action_space() / observation_space().
        self._action_spaces = {
            **{agent: Discrete(7) for agent in self.possible_prey},
            **{agent: Discrete(9) for agent in self.possible_predator},
        }
        self._observation_spaces = {
            **{agent: Discrete(36) for agent in self.possible_prey},
            **{agent: Discrete(15) for agent in self.possible_predator},
        }

        self.render_mode = render_mode
        if self.render_mode == "human":
            pygame.init()
            pygame.font.init()
            self.clock = pygame.time.Clock()
            # The window size was previously read from self.worldx/self.worldy
            # before they were ever assigned (AttributeError), and the window
            # was then re-opened at 1200x700. Open it once at that size.
            self.worldx, self.worldy = 1200, 700
            self.world = pygame.display.set_mode((self.worldx, self.worldy))
            screen = self.world
            # NOTE(review): clearGrid, renderAgents, render, hexagons,
            # preyAgents and predatorAgents are not defined or imported in the
            # visible part of this file; presumably they live under
            # 'Hexagon_env' -- confirm they are imported before use.
            clearGrid(hexagons)
            renderAgents(preyAgents, predatorAgents, hexagons)
            render(screen, hexagons)

    # No lru_cache here: the spaces are already built once in __init__, and
    # caching a bound method would keep every environment instance alive.
    def observation_space(self, agent):
        """Return the observation space configured for ``agent``."""
        # gymnasium spaces are defined and documented here: https://gymnasium.farama.org/api/spaces/
        return self._observation_spaces[agent]

    def action_space(self, agent):
        """Return the action space configured for ``agent``."""
        return self._action_spaces[agent]

    def render(self):
        """
        Print a one-line summary of the current state.

        Emits a gymnasium warning and returns if no render mode was chosen.
        """
        if self.render_mode is None:
            gymnasium.logger.warn(
                "You are calling render method without specifying any render mode."
            )
            return

        # NOTE(review): two-agent template logic; MOVES is not defined in the
        # visible part of this file. With the ten agents created in __init__
        # this always prints "Game over".
        if len(self.agents) == 2:
            string = "Current state: Agent1: {} , Agent2: {}".format(
                MOVES[self.state[self.agents[0]]], MOVES[self.state[self.agents[1]]]
            )
        else:
            string = "Game over"
        print(string)

    def observe(self, agent):
        """
        Return the stored observation of ``agent`` as a numpy array.

        Valid at any time after reset() has been called.
        """
        return np.array(self.observations[agent])

    def close(self):
        """
        Release the pygame display and modules opened in human mode.

        Does nothing for other render modes.
        """
        if self.render_mode == "human":
            pygame.quit()

    def reset(self, seed=None, options=None):
        """
        Re-initialise the episode.

        Sets agents, rewards, _cumulative_rewards, terminations, truncations,
        infos, state, observations, num_moves and agent_selection so that
        render(), step() and observe() can be called afterwards.
        ``seed`` and ``options`` are accepted but not used.
        """
        self.agents = self.possible_agents[:]
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        # NOTE(review): NONE is not defined in the visible part of this file.
        self.state = {agent: NONE for agent in self.agents}
        self.observations = {agent: NONE for agent in self.agents}
        self.num_moves = 0

        # The agent_selector utility allows easy cyclic stepping through the
        # agents list.
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()

    def step(self, action):
        """
        Apply ``action`` for the agent named by ``agent_selection``.

        Updates rewards, _cumulative_rewards, terminations, truncations,
        infos and agent_selection (to the next agent), plus any internal
        state used by observe() or render().
        """
        if (
            self.terminations[self.agent_selection]
            or self.truncations[self.agent_selection]
        ):
            # handles stepping an agent which is already dead
            # accepts a None action for the one agent, and moves the agent_selection to
            # the next dead agent,  or if there are no more dead agents, to the next live agent
            self._was_dead_step(action)
            return

        agent = self.agent_selection

        # the agent which stepped last had its _cumulative_rewards accounted for
        # (because it was returned by last()), so the _cumulative_rewards for this
        # agent should start again at 0
        self._cumulative_rewards[agent] = 0

        # stores action of current agent
        self.state[self.agent_selection] = action

        # NOTE(review): everything below is two-player template logic.
        # REWARD_MAP, NUM_ITERS and NONE are not defined in the visible part
        # of this file, and the "1 - agent_name_mapping[...]" index only
        # selects "the other player" when there are exactly two agents.
        # collect reward if it is the last agent to act
        if self._agent_selector.is_last():
            # rewards for all agents are placed in the .rewards dictionary
            self.rewards[self.agents[0]], self.rewards[self.agents[1]] = REWARD_MAP[
                (self.state[self.agents[0]], self.state[self.agents[1]])
            ]

            self.num_moves += 1
            # The truncations dictionary must be updated for all players.
            self.truncations = {
                name: self.num_moves >= NUM_ITERS for name in self.agents
            }

            # observe the current state
            for i in self.agents:
                self.observations[i] = self.state[
                    self.agents[1 - self.agent_name_mapping[i]]
                ]
        else:
            # necessary so that observe() returns a reasonable observation at all times.
            self.state[self.agents[1 - self.agent_name_mapping[agent]]] = NONE
            # no rewards are allocated until both players give an action
            self._clear_rewards()

        # selects the next agent.
        self.agent_selection = self._agent_selector.next()
        # Adds .rewards to ._cumulative_rewards
        self._accumulate_rewards()

        if self.render_mode == "human":
            self.render()

In [None]:
class CustomEnvironment(ParallelEnv):
    """
    Parallel thief-and-police pursuit game played on a graph.

    The graph is read from 'g1.gml'. Each agent occupies one graph node;
    every step each live agent either moves to one of the neighbours of its
    node, stays in place, or submits an out-of-range action. A thief is
    terminated when a police agent ends the step on the same node.
    """

    metadata = {
        "name": "hexagon_environment_v1",
    }

    def __init__(self):
        """Load the graph and set up agents, spaces and starting positions."""
        # this is the graph
        self.g_env = nx.read_graphml('g1.gml')
        self.g_no_node = len(self.g_env.nodes())
        self.node_list = list(self.g_env.nodes())

        # node_dict maps a discrete index to a graph node; node_inv_dict is
        # the reverse mapping (node -> index), used as labels when drawing.
        self.node_dict = dict(enumerate(self.node_list))
        self.node_inv_dict = {value: key for key, value in self.node_dict.items()}

        # Step count at which step() starts reporting truncation.
        self.max_steps = 100
        self.step_now = 0

        self.no_of_thieves = 1
        self.no_of_police = 2
        self.possible_thieves = ['thief_' + str(r) for r in range(self.no_of_thieves)]
        self.possible_police = ['police_' + str(r) for r in range(self.no_of_police)]
        self.possible_agents = self.possible_thieves + self.possible_police
        self.agent_name_mapping = {
            name: index for index, name in enumerate(self.possible_agents)
        }
        self._action_spaces = {agent: Discrete(4) for agent in self.possible_agents}
        self._observation_spaces = {
            agent: Discrete(self.g_no_node ** len(self.possible_agents))
            for agent in self.possible_agents
        }

        self.agents = list(self.possible_agents)
        self.infos = {agent: {} for agent in self.possible_agents}
        self.state = {agent: None for agent in self.possible_agents}

        # not utilized
        self._cumulative_rewards = {agent: 0 for agent in self.possible_agents}

        # Drawing coordinates: every node name is parsed into a tuple of
        # floats by str_to_tuple and used as that node's position.
        self.node_positions = {
            name: self.str_to_tuple(name) for name in self.g_env.nodes()
        }

        # sets the current state of agents
        self.terminations = {agent: False for agent in self.possible_agents}

        for thief in self.possible_thieves:
            self.state[thief] = self.node_dict[2]

        # NOTE(review): police start at node index 12 here but at index 10 in
        # reset() -- confirm which one is intended.
        for police in self.possible_police:
            self.state[police] = self.node_dict[12]

        # forced by testing api
        self.action_spaces = self._action_spaces
        self.observation_spaces = self._observation_spaces

    def reset(self, seed=None, options=None):
        """
        Start a new episode and return the state dict (agent -> node).

        All agents become live again, the step counter and terminations are
        cleared, thieves are placed on node index 2 and police on node
        index 10. ``seed`` and ``options`` are accepted but not used.

        NOTE(review): the returned dict is the environment's own ``state``
        object, not a copy, and no infos dict is returned -- confirm this
        matches the PettingZoo version in use.
        """
        self.agents = list(self.possible_agents)
        self.timestep = None
        self.state = {agent: None for agent in self.possible_agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.step_now = 0
        self.terminations = {agent: False for agent in self.possible_agents}

        for thief in self.possible_thieves:
            self.state[thief] = self.node_dict[2]

        for police in self.possible_police:
            self.state[police] = self.node_dict[10]
        return self.state

    def step(self, actions):
        """
        Advance every live agent by one action.

        For an agent whose current node has k neighbours:
          * action < k   -> move to that neighbour (police -10, thief +10)
          * action == k  -> stay in place          (police -5,  thief +10)
          * action > k   -> invalid, no movement   (police -100, thief +10)
        A police agent sharing a node with a live thief has its reward
        replaced by 10 and the thief is terminated. Every police agent then
        pays a time penalty of 20.

        Args:
            actions: dict mapping each live agent name to an integer action.

        Returns:
            (observations, rewards, terminations, truncations, infos).
            observations and infos cover the agents that were live at the
            start of the step; rewards, terminations and truncations cover
            all possible agents (reward is None for agents that did not act).
            truncations is True for every agent once ``step_now`` reaches
            ``max_steps``.
        """
        rewards = {agent: None for agent in self.possible_agents}

        # movement of the thief and the police according to the action and their rewards
        for agent in self.agents:
            neighbours = list(self.g_env.neighbors(self.state[agent]))
            is_police = agent in self.possible_police
            action = actions[agent]

            if action < len(neighbours):
                rewards[agent] = -10 if is_police else 10
                self.state[agent] = neighbours[action]
            elif action == len(neighbours):
                rewards[agent] = -5 if is_police else 10
            else:
                rewards[agent] = -100 if is_police else 10

        self.step_now += 1

        # Get dummy infos (not used in this example)
        infos = {a: {} for a in self.agents}
        observations = {a: self.state[a] for a in self.agents}
        # Enforce the step limit instead of reporting None for every agent.
        reached_limit = self.step_now >= self.max_steps
        truncations = {a: reached_limit for a in self.possible_agents}
        terminations = {a: False for a in self.possible_agents}

        # termination if on the same place
        for thief in self.possible_thieves:
            if thief in self.agents:
                for police in self.possible_police:
                    if self.state[police] == self.state[thief]:
                        terminations[thief] = True
                        rewards[police] = 10

        # Keep only agents that are still alive. A new list is built because
        # removing items from the list being iterated can skip entries.
        self.agents = [a for a in self.agents if not terminations[a]]
        self.terminations = terminations.copy()

        # negative reward for taking time
        for police in self.possible_police:
            rewards[police] -= 20
        return observations, rewards, terminations, truncations, infos

    def render(self):
        """No-op; use temp_render() to draw the graph."""
        pass

    def temp_render(self, episode):
        """
        Draw the graph and the live agents, save the figure and show it.

        Police are drawn as yellow markers and thieves as red markers. The
        image is written to "images/Multi_large_Env<episode>_<step>.png".

        Args:
            episode: value embedded in the output file name.
        """
        nx.draw(self.g_env, self.node_positions, node_size=200)
        nx.draw_networkx_labels(
            self.g_env,
            self.node_positions,
            labels=self.node_inv_dict,
            font_color='black',
        )

        # drawing the agents
        for agent in self.agents:
            x, y = self.node_positions[str(self.state[agent])]
            if agent in self.possible_police:
                plt.scatter(x, y, s=550, c='yellow')
            elif agent in self.possible_thieves:
                plt.scatter(x, y, s=450, c='red')

        filename = f"images/Multi_large_Env{episode}_{self.step_now}.png"

        plt.savefig(filename)
        plt.show()

    def observation_space(self, agent):
        """Return the observation space of ``agent``."""
        return self.observation_spaces[agent]

    def action_space(self, agent):
        """Return the action space of ``agent``."""
        return self.action_spaces[agent]

    def str_to_tuple(self, string):
        """Parse a string such as "(1.0, 2.0)" into a tuple of floats."""
        return tuple(float(x) for x in string.strip('()').split(','))

    def possible_move_range(self, player):
        """Return the number of neighbours of the node ``player`` stands on."""
        return len(list(self.g_env.neighbors(self.state[player])))
