127 changes: 110 additions & 17 deletions gym-unity/gym_unity/envs/__init__.py
@@ -1,7 +1,7 @@
import logging
import itertools
import numpy as np
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union, Set

import gym
from gym import error, spaces
@@ -57,17 +57,26 @@ def __init__(
:param no_graphics: Whether to run the Unity simulator in no-graphics mode
:param allow_multiple_visual_obs: If True, return a list of visual observations instead of only one.
"""
base_port = 5005
if environment_filename is None:
base_port = UnityEnvironment.DEFAULT_EDITOR_PORT

self._env = UnityEnvironment(
environment_filename, worker_id, no_graphics=no_graphics
environment_filename,
worker_id,
base_port=base_port,
no_graphics=no_graphics,
)

# Take a single step so that the brain information will be sent over
if not self._env.get_agent_groups():
self._env.step()

self.visual_obs = None
self._current_state = None
self._n_agents = None
self._n_agents = -1
self._done_agents: Set[int] = set()
# Save the step result from the last time all Agents requested decisions.
self._previous_step_result: BatchedStepResult = None
self._multiagent = multiagent
self._flattener = None
# Hidden flag used by Atari environments to determine if the game is over
@@ -111,6 +120,7 @@ def __init__(
self._env.reset()
step_result = self._env.get_step_result(self.brain_name)
self._check_agents(step_result.n_agents())
self._previous_step_result = step_result

# Set observation and action spaces
if self.group_spec.is_action_discrete():
@@ -153,16 +163,15 @@ def reset(self) -> Union[List[np.ndarray], np.ndarray]:
Returns: observation (object/list): the initial observation of the
space.
"""
self._env.reset()
info = self._env.get_step_result(self.brain_name)
n_agents = info.n_agents()
step_result = self._step(True)
n_agents = step_result.n_agents()
self._check_agents(n_agents)
self.game_over = False

if not self._multiagent:
res: GymStepResult = self._single_step(info)
res: GymStepResult = self._single_step(step_result)
else:
res = self._multi_step(info)
res = self._multi_step(step_result)
return res[0]

def step(self, action: List[Any]) -> GymStepResult:
@@ -204,19 +213,20 @@ def step(self, action: List[Any]) -> GymStepResult:

spec = self.group_spec
action = np.array(action).reshape((self._n_agents, spec.action_size))
action = self._sanitize_action(action)
self._env.set_actions(self.brain_name, action)
self._env.step()
info = self._env.get_step_result(self.brain_name)
n_agents = info.n_agents()

step_result = self._step()

n_agents = step_result.n_agents()
self._check_agents(n_agents)
self._current_state = info

if not self._multiagent:
single_res = self._single_step(info)
single_res = self._single_step(step_result)
self.game_over = single_res[2]
return single_res
else:
multi_res = self._multi_step(info)
multi_res = self._multi_step(step_result)
self.game_over = all(multi_res[2])
return multi_res

@@ -233,8 +243,13 @@ def _single_step(self, info: BatchedStepResult) -> GymSingleStepResult:
self.visual_obs = self._preprocess_single(visual_obs[0][0])

default_observation = self.visual_obs
else:
elif self._get_vec_obs_size() > 0:
default_observation = self._get_vector_obs(info)[0, :]
else:
raise UnityGymException(
"The Agent does not have vector observations and the environment was not setup"
+ "to use visual observations."
)

return (
default_observation,
@@ -335,7 +350,7 @@ def _check_agents(self, n_agents: int) -> None:
"The environment was launched as a mutli-agent environment, however"
"there is only one agent in the scene."
)
if self._n_agents is None:
if self._n_agents == -1:
self._n_agents = n_agents
logger.info("{} agents within environment.".format(n_agents))
elif self._n_agents != n_agents:
@@ -344,6 +359,84 @@ def _check_agents(self, n_agents: int) -> None:
"initialization. This is not supported."
)

def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
n_extra_agents = step_result.n_agents() - self._n_agents
if n_extra_agents < 0 or n_extra_agents > self._n_agents:
# In this case, some Agents did not request a decision when expected
# or too many requested a decision
raise UnityGymException(
"The number of agents in the scene does not match the expected number."
)

# remove the done Agents
indices_to_keep: List[int] = []
for index, is_done in enumerate(step_result.done):
if not is_done:
indices_to_keep.append(index)

# Set the new AgentDone flags to True
# Note that the corresponding agent_id that gets marked done will be different
# than the original agent that was done, but this is OK since the gym interface
# only cares about the ordering.
for index, agent_id in enumerate(step_result.agent_id):
if not self._previous_step_result.contains_agent(agent_id):
step_result.done[index] = True
if agent_id in self._done_agents:
step_result.done[index] = True
self._done_agents = set()
self._previous_step_result = step_result # store the new original

_mask: Optional[List[np.array]] = None
if step_result.action_mask is not None:
_mask = []
for mask_index in range(len(step_result.action_mask)):
_mask.append(step_result.action_mask[mask_index][indices_to_keep])
new_obs: List[np.array] = []
for obs_index in range(len(step_result.obs)):
new_obs.append(step_result.obs[obs_index][indices_to_keep])
return BatchedStepResult(
obs=new_obs,
reward=step_result.reward[indices_to_keep],
done=step_result.done[indices_to_keep],
max_step=step_result.max_step[indices_to_keep],
agent_id=step_result.agent_id[indices_to_keep],
action_mask=_mask,
)

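# A minimal sketch, not part of this diff, of the index filtering _sanitize_info
# performs, using plain numpy arrays with illustrative values. Suppose three
# agents reported this step and the first one was done (it respawned), so only
# the rows belonging to the other two are kept for the gym caller:
import numpy as np
done = np.array([True, False, False])
reward = np.array([1.0, 0.5, 0.0])
indices_to_keep = [index for index, is_done in enumerate(done) if not is_done]
assert indices_to_keep == [1, 2]
assert reward[indices_to_keep].tolist() == [0.5, 0.0]
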
def _sanitize_action(self, action: np.array) -> np.array:
if self._previous_step_result.n_agents() == self._n_agents:
return action
sanitized_action = np.zeros(
(self._previous_step_result.n_agents(), self.group_spec.action_size)
)
input_index = 0
for index in range(self._previous_step_result.n_agents()):
if not self._previous_step_result.done[index]:
sanitized_action[index, :] = action[input_index, :]
input_index = input_index + 1
return sanitized_action

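# A minimal sketch, not part of this diff, of the zero-padding _sanitize_action
# applies: the Unity environment expects one action row per agent in the previous
# step result, so rows belonging to done agents are zero-filled. Values are
# illustrative only.
import numpy as np
previous_done = np.array([False, True, False])  # 3 agents last step, 1 was done
action = np.array([[0.1], [0.2]])               # the gym caller supplies 2 rows
sanitized_action = np.zeros((3, 1))
input_index = 0
for index in range(3):
    if not previous_done[index]:
        sanitized_action[index, :] = action[input_index, :]
        input_index += 1
assert sanitized_action.tolist() == [[0.1], [0.0], [0.2]]
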
def _step(self, needs_reset: bool = False) -> BatchedStepResult:
if needs_reset:
self._env.reset()
else:
self._env.step()
info = self._env.get_step_result(self.brain_name)
# Two possible cases here:
# 1) all agents requested decisions (some of which might be done)
# 2) some Agents were marked Done in between steps.
# In case 2, we re-request decisions until all agents request a real decision.
while info.n_agents() - sum(info.done) < self._n_agents:
if not info.done.all():
raise UnityGymException(
"The environment does not have the expected amount of agents."
+ "Some agents did not request decisions at the same time."
)

Contributor: Need a more detailed exception here - something like "Agents didn't make decisions at the same time"

Contributor Author: Added to the error

self._done_agents.update(list(info.agent_id))
self._env.step()
info = self._env.get_step_result(self.brain_name)
return self._sanitize_info(info)
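
# A minimal sketch, not part of this diff, of the loop condition above with
# illustrative numbers: two agents are expected, but only one (already done)
# agent arrived in this step result, so the environment is stepped again until
# every agent requests a real decision.
import numpy as np
expected_n_agents = 2
done = np.array([True])                      # one terminal-only step arrived
live_requests = len(done) - int(sum(done))   # n_agents() - sum(done) == 0
assert live_requests < expected_n_agents     # so _step() calls env.step() again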

@property
def metadata(self):
return {"render.modes": ["rgb_array"]}
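A minimal usage sketch, not from this PR, of the wrapper after these changes; parameter names other than environment_filename, worker_id, and multiagent are assumptions based on the surrounding code. Passing environment_filename=None now connects to the Unity Editor on UnityEnvironment.DEFAULT_EDITOR_PORT instead of the hard-coded 5005:

from gym_unity.envs import UnityEnv

env = UnityEnv(environment_filename=None, worker_id=0, multiagent=False)
initial_obs = env.reset()
obs, reward, done, info = env.step(env.action_space.sample())
env.close()
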
2 changes: 1 addition & 1 deletion gym-unity/gym_unity/tests/test_gym.py
@@ -122,7 +122,7 @@ def create_mock_vector_step_result(num_agents=1, number_visual_observations=0):

:int num_agents: Number of "agents" to imitate in your BatchedStepResult values.
"""
obs = [np.array([num_agents * [1, 2, 3]])]
obs = [np.array([num_agents * [1, 2, 3]]).reshape(num_agents, 3)]
if number_visual_observations:
obs += [np.zeros(shape=(num_agents, 8, 8, 3), dtype=np.float32)]
rewards = np.array(num_agents * [1.0])
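A short check, not part of this diff, of why the added reshape matters: without it the mocked vector observation has shape (1, 3 * num_agents), while the wrapper expects one row of three values per agent.

import numpy as np

num_agents = 2
obs = np.array([num_agents * [1, 2, 3]])
assert obs.shape == (1, 6)
obs = obs.reshape(num_agents, 3)
assert obs.shape == (2, 3)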