In [86]:
import importlib

import Wrapper.src.wrapper_api.wrappers.communication_wrapper as cw
import Wrapper.src.wrapper_api.models.active_communication as ac
import Wrapper.src.wrapper_api.models.communication_model as cm

# Re-execute the project modules so on-disk edits are picked up without a
# kernel restart. importlib.reload() does not rebind names that other modules
# already imported from the reloaded module, so dependencies have to be
# reloaded before the modules that import them.
# NOTE(review): assumes communication_wrapper imports from the two model
# modules (and not the reverse) -- confirm against the package layout.
importlib.reload(ac)
importlib.reload(cm)
importlib.reload(cw)

# Bind the public names only after reloading so they refer to the freshly
# created classes rather than the pre-reload ones.
from Wrapper.src.wrapper_api.models.communication_model import ProbabilisticModel, DistanceModel, MatrixModel
from Wrapper.src.wrapper_api.models.active_communication import ActiveCommunication
from Wrapper.src.wrapper_api.wrappers.communication_wrapper import CommunicationWrapper

## Dummy Env

In [87]:
from pettingzoo.utils import AECEnv
from gymnasium import spaces
import numpy as np

class DummyEnv(AECEnv):
    """Static three-agent stand-in environment for demoing the communication wrapper.

    Every observation is the same hard-coded dict, ``reset`` and ``step`` are
    no-ops, and all termination/truncation flags are initialised to ``False``.
    """

    def __init__(self):
        super().__init__()
        self.possible_agents = ["agent_0", "agent_1", "agent_2"]
        # Initial turn holder; callers may overwrite this attribute directly.
        self.agent_selection = "agent_2"

        self.terminations = {agent: False for agent in self.possible_agents}
        self.truncations = {agent: False for agent in self.possible_agents}

    def observe(self, agent):
        """Return the fixed observation dict; ``agent`` is accepted but not used."""
        return {
            "self": np.array([1.0, 1.0]),
            "agent_0": np.array([0.5, 0.5]),
            "agent_1": np.array([0.8, 0.3]),
        }

    def last(self, observe=True):
        """Return ``(observation, reward, terminated, truncated, info)`` for the selected agent.

        Args:
            observe: When false, ``observe`` is not called and the observation
                slot is ``None`` (the flag used to be silently ignored).

        Returns:
            A 5-tuple whose reward is always ``0.0``, whose two flags are
            always ``False`` and whose info is a new empty dict.
        """
        observation = self.observe(self.agent_selection) if observe else None
        return observation, 0.0, False, False, {}

    def observation_space(self, agent):
        """Return a ``Dict`` space with one 2-element float32 ``Box`` per observation key."""
        return spaces.Dict({
            key: spaces.Box(low=0, high=1.0, shape=(2,), dtype=np.float32)
            for key in ("self", "agent_0", "agent_1")
        })

    def action_space(self, agent):
        """Return the three-way discrete action space shared by every agent."""
        return spaces.Discrete(3)

    def reset(self, seed=None, options=None):
        """Do nothing; the stub carries no episode state to reset."""

    def step(self, action):
        """Do nothing; the stub ignores the action."""

### Probabilistic Model

In [88]:
# Wrap a fresh dummy environment with a single probabilistic failure model.
env = DummyEnv()

# failure_prob / seed are passed straight through to the model
# (presumably a per-link drop probability and an RNG seed -- confirm
# against ProbabilisticModel).
model = ProbabilisticModel(agent_ids=env.possible_agents, failure_prob=0.6, seed=42)

wrapper = CommunicationWrapper(
    env,
    failure_models=[model],
)

In [89]:
# 1) Communication state before any step has been taken.
print("Initial Communication:")
state_before_step = wrapper.get_communication_state()
print(state_before_step)

# 2) Let agent_0 act once; the second printout below shows the state after
#    that step.
acting_agent = "agent_0"
wrapper.env.agent_selection = acting_agent
wrapper.step(action=2)

print("\nUpdated Communication Matrix:")
state_after_step = wrapper.get_communication_state()
print(state_after_step)

# 3) Hand the turn to agent_2 and fetch its observation through the wrapper.
observing_agent = "agent_2"
wrapper.env.agent_selection = observing_agent
obs = wrapper.observe(observing_agent)
print("\nObservation for agent 2 after masking:")
print(obs)

Initial Communication:
[[ True  True  True]
 [ True  True  True]
 [ True  True  True]]

Updated Communication Matrix:
[[ True False False]
 [ True  True  True]
 [ True False  True]]

Observation for agent 2 after masking:
{'self': array([1., 1.]), 'agent_0': array([0., 0.]), 'agent_1': array([0.8, 0.3])}


### Distance-Based Model

In [90]:
def pos_fn():
    """Return a fresh mapping from agent id to its fixed 2-D position array."""
    coordinates = (
        ("agent_0", (0.0, 0.0)),
        ("agent_1", (2.0, 0.0)),
        ("agent_2", (4.0, 1.0)),
    )
    # A new dict and new arrays are built on every call, so callers may mutate them.
    return {agent_id: np.array(xy) for agent_id, xy in coordinates}

In [91]:
# Fresh environment wrapped with a distance-based model fed by pos_fn.
env = DummyEnv()

# failure_prob is 0.0 here, unlike the probabilistic example
# (presumably leaving distance as the only cause of dropped links -- confirm
# against DistanceModel).
model = DistanceModel(
    agent_ids=env.possible_agents,
    distance_threshold=3.0,
    pos_fn=pos_fn,
    failure_prob=0.0,
    seed=42,
)

wrapped_dist = CommunicationWrapper(env, failure_models=[model])


In [92]:
# 1) Communication state before any step.
print("Initial Communication Matrix:")
dist_state_before = wrapped_dist.get_communication_state()
print(dist_state_before)

# 2) agent_0 takes one step, then the state is printed again.
dist_acting_agent = "agent_0"
wrapped_dist.env.agent_selection = dist_acting_agent
wrapped_dist.step(action=2)
print("\nUpdated Communication Matrix:")
dist_state_after = wrapped_dist.get_communication_state()
print(dist_state_after)

# 3) Observation as seen by agent_2, one "key: value" line per entry.
#    Note: agent_selection is still agent_0 at this point.
obs_dist = wrapped_dist.observe("agent_2")
print("\nObservation for agent 2 after masking:")
for k, v in obs_dist.items():
    print(f"{k}: {v}")

Initial Communication Matrix:
[[ True  True  True]
 [ True  True  True]
 [ True  True  True]]

Updated Communication Matrix:
[[ True  True False]
 [ True  True  True]
 [False  True  True]]

Observation for agent 2 after masking:
self: [1. 1.]
agent_0: [0. 0.]
agent_1: [0.8 0.3]


### Matrix Model

In [95]:
# Hand-written 3x3 boolean connectivity pattern handed to MatrixModel below.
# The pattern is asymmetric: entry [1][2] is set while [2][1] is not.
# NOTE(review): which axis is sender vs. receiver is not visible here -- confirm.
man_matrix = np.array(
    [
        [True, False, False],
        [False, True, True],
        [False, False, True],
    ],
    dtype=bool,
)

In [96]:
# Fresh environment wrapped with a model driven by the manual matrix.
env = DummyEnv()
model = MatrixModel(
    agent_ids=env.possible_agents,
    comms_matrix_values=man_matrix,
    failure_prob=0.0,
    seed=42,
)
wrapped_matrix = CommunicationWrapper(env, failure_models=[model])

# Communication state before any step.
print("Initial Communication Matrix:")
matrix_state_before = wrapped_matrix.get_communication_state()
print(matrix_state_before)

# agent_1 takes one step, then the state is printed again.
matrix_acting_agent = "agent_1"
wrapped_matrix.env.agent_selection = matrix_acting_agent
wrapped_matrix.step(action=1)

print("\nUpdated Communication Matrix:")
matrix_state_after = wrapped_matrix.get_communication_state()
print(matrix_state_after)

Initial Communication Matrix:
[[ True  True  True]
 [ True  True  True]
 [ True  True  True]]

Updated Communication Matrix:
[[ True False False]
 [False  True  True]
 [False False  True]]
