In [1]:
#| default_exp Environments/MultipleObservationsEnv

In [2]:
#| hide
# Imports for the nbdev development environment
from nbdev.showdoc import *
from fastcore.test import *

In [3]:
#| export
from fastcore.utils import *
import numpy as np

In [4]:
#| hide
%load_ext autoreload
%autoreload 2

In [5]:
#| export
class MultipleObservationsEnv(object):
    """Base class for multi-agent environments that expose several observation models.

    Subclasses provide the model through hook methods that `__init__` calls:

    * `transition_tensor()`  -> `self.transitions`; first and last axes have length `n_states`,
      axis 1 has length `n_agent_actions`, and the last axis must sum to 1.
    * `reward_tensor()`      -> `self.rewards`; axis 0 has length `n_agents`, axis 1 and the last
      axis have length `n_states`, every axis in between has length `n_agent_actions`.
    * `generate_final_states()`, `generate_observation_tensors()`, `actions()`, `states()` and
      `generate_observation_labels()` fill the remaining attributes.

    Raises:
        AssertionError: if the shapes of the tensors and label sets are mutually inconsistent, or
            if the transition probabilities do not sum to 1 along the last axis.
    """
    def __init__(self):
        # The sizes are derived as soon as the tensor that defines them exists, so that hook
        # methods called later (which may read n_states / n_agent_actions / n_agents) can rely
        # on them. The order in which the tensors themselves are built is unchanged.
        self.transitions = self.transition_tensor()
        self.n_states = self.transitions.shape[0]
        self.n_agent_actions = self.transitions.shape[1]

        self.final_states = np.array(self.generate_final_states())

        self.rewards = self.reward_tensor()
        self.n_agents = self.rewards.shape[0]

        self.observations_list = self.generate_observation_tensors()

        self.actions_set = self.actions()
        self.states_set = self.states()
        self.observations_set = self.generate_observation_labels()

        # Checks
        assert all(dim == self.n_agent_actions for dim in self.rewards.shape[2:-1]), 'Inconsistent number of actions'
        assert len(self.actions_set) == self.n_agents and all(len(a) == self.n_agent_actions for a in self.actions_set), 'Inconsistent number of actions'
        assert self.transitions.shape[-1] == self.n_states and self.rewards.shape[-1] == self.n_states, 'Inconsistent number of states'
        assert self.rewards.shape[1] == self.n_states, 'Inconsistent number of states'
        assert len(self.final_states) == self.n_states, 'Inconsistent number of states'
        assert len(self.states_set) == self.n_states, 'Inconsistent number of states'
        assert np.allclose(self.transitions.sum(-1), 1), 'Transition model probabilities do not sum to 1'

        # NOTE: the tensors in self.observations_list are not validated here (agent/state axis
        # lengths, probabilities summing to 1); subclasses are responsible for their consistency.

In [6]:
#| export
@patch
def id(self:MultipleObservationsEnv):
    """Identifier string of the environment: the name of the instance's class."""
    env_class = self.__class__
    return f"{env_class.__name__}"

@patch
def __str__(self:MultipleObservationsEnv):
    """Informal string form of the environment; delegates to `id()`."""
    return self.id()

@patch
def __repr__(self:MultipleObservationsEnv):
    """Representation of the environment; delegates to `id()`."""
    return self.id()

@patch
def transition_tensor(self:MultipleObservationsEnv):
    """Hook for the transition model; the base version always raises.

    `__init__` stores the result as `self.transitions`, reads the number of states from axis 0
    and the number of actions from axis 1, and asserts that the last axis has one entry per
    state and sums to 1.

    Raises:
        NotImplementedError: always, until overridden.
    """
    raise NotImplementedError()

@patch
def reward_tensor(self:MultipleObservationsEnv):
    """Hook for the reward model; the base version always raises.

    `__init__` stores the result as `self.rewards`, reads the number of agents from axis 0 and
    asserts that axis 1 and the last axis have one entry per state and that every axis in
    between has one entry per action.

    Raises:
        NotImplementedError: always, until overridden.
    """
    raise NotImplementedError()

@patch
def generate_observation_tensors(self:MultipleObservationsEnv):
    """
    Default observation models: every state maps to its own observation with probability 1.

    Returns a list with `self.n_agents` entries. Each entry is an independent float array of
    shape `(n_agents, n_states, n_states)` whose slice for every agent is the identity matrix.
    The number of observations is taken to be the number of states; override this method to
    supply other observation models.
    """
    n_obs = self.n_states  # default: as many distinct observations as there are states
    tensors = []
    for _ in range(self.n_agents):
        tensor = np.ones((self.n_agents, self.n_states, n_obs))
        # Broadcast the identity mapping over the leading (agent) axis.
        tensor[:] = np.eye(n_obs)
        tensors.append(tensor)
    return tensors


@patch
def generate_final_states(self:MultipleObservationsEnv):
    """Default final-state indicator: an integer zero vector with one entry per state (no final states)."""
    no_final_states = np.zeros(self.n_states, dtype=int)
    return no_final_states

@patch
def actions(self:MultipleObservationsEnv):
    """Default action labels: for each agent, the strings '0' .. str(n_agent_actions - 1)."""
    labels_per_agent = []
    for _ in range(self.n_agents):
        labels_per_agent.append([str(action_id) for action_id in range(self.n_agent_actions)])
    return labels_per_agent

@patch
def states(self:MultipleObservationsEnv):
    """Default state labels: the strings '0' .. str(n_states - 1)."""
    return list(map(str, range(self.n_states)))

@patch
def generate_observation_labels(self:MultipleObservationsEnv):
    """
    Build string labels for the observations of every tensor in `self.observations_list`.

    The result has one entry per observation tensor. Each entry is a list with one label list
    per agent, and each label list holds the strings '0' .. str(k - 1), where k is the length
    of that tensor's last axis.
    """
    # Read all observation counts up front, before any labels are built.
    observation_counts = [tensor.shape[-1] for tensor in self.observations_list]

    def labels_for(count):
        # One independent label list per agent.
        return [list(map(str, range(count))) for _ in range(self.n_agents)]

    return [labels_for(count) for count in observation_counts]


@patch
def step(self:MultipleObservationsEnv, jA:Iterable) -> tuple:
    """Iterate the environment one step forward.

    Args:
        jA: joint action, i.e. the indices used for the action axes of `self.transitions` and
            `self.rewards` (presumably one action index per agent, in agent order). Any
            iterable is accepted, including one-shot iterators.

    Returns:
        A tuple `(obs, rewards, done, info)`:
        `obs` is the result of `generate_stochastic_observations()` for the new state,
        `rewards` is the float reward array over the agent axis for this transition,
        `done` tells whether the new state is flagged with 1 in `self.final_states`,
        `info` is `{'state': <new state>}`.

    NOTE(review): `self.state` is read here but is not assigned in `__init__`; it must be set
    before the first call.
    """
    # Materialise once: jA is used for two lookups and a one-shot iterator would be empty
    # the second time, silently dropping the action indices from the reward lookup.
    joint_action = tuple(jA)

    # Distribution over next states for the current state and joint action.
    tps = self.transitions[(self.state, *joint_action)].astype(float)
    next_state = np.random.choice(range(len(tps)), p=tps)

    # Rewards of all agents for the transition (state, joint action, next_state).
    rewards = self.rewards[(slice(self.n_agents), self.state, *joint_action, next_state)]

    self.state = next_state
    obs = self.generate_stochastic_observations()
    done = self.state in np.where(self.final_states==1)[0]
    info = {'state': self.state}
    return obs, rewards.astype(float), done, info

@patch
def generate_stochastic_observations(self:MultipleObservationsEnv) -> list:
    """
    Sample one observation per agent from every observation tensor, given the current state.

    For each tensor in `self.observations_list`, the entry `[agent, self.state]` is used as the
    probability distribution over that agent's observations and one observation index is drawn
    from it with `np.random.choice`.

    Returns:
        A list with one integer numpy array per observation tensor; each array has length
        `self.n_agents` and holds the sampled observation index of every agent.
    """
    sampled_per_tensor = []
    for observation_tensor in self.observations_list:
        draws = np.zeros(self.n_agents, dtype=int)
        for agent_index in range(self.n_agents):
            # Observation distribution of this agent in the current state.
            probabilities = observation_tensor[agent_index, self.state]
            draws[agent_index] = np.random.choice(range(len(probabilities)), p=probabilities)
        sampled_per_tensor.append(draws)
    return sampled_per_tensor

In [7]:
#| hide
# Run nbdev's export step so the cells marked `#| export` are written to the library module.
import nbdev; nbdev.nbdev_export()