-
Notifications
You must be signed in to change notification settings - Fork 80
/
Copy pathtest_gym_wrapper.py
141 lines (124 loc) · 4.52 KB
/
test_gym_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (c) 2024.
# ProrokLab (https://www.proroklab.org/)
# All rights reserved.
import gym
import numpy as np
import pytest
from torch import Tensor
from vmas import make_env
from vmas.simulator.environment import Environment
# Scenario names used to parametrize test_gym_wrapper; each entry is passed
# to make_env as the ``scenario`` argument.
TEST_SCENARIOS = [
    "balance",
    "discovery",
    "give_way",
    "joint_passage",
    "navigation",
    "passage",
    "transport",
    "waterfall",
    "simple_world_comm",
]
def _check_obs_type(obss, obs_shapes, dict_space, return_numpy):
if dict_space:
assert isinstance(
obss, dict
), f"Expected dictionary of observations, got {type(obss)}"
for k, obs in obss.items():
obs_shape = obs_shapes[k]
assert (
obs.shape == obs_shape
), f"Expected shape {obs_shape}, got {obs.shape}"
if return_numpy:
assert isinstance(
obs, np.ndarray
), f"Expected numpy array, got {type(obs)}"
else:
assert isinstance(
obs, Tensor
), f"Expected torch tensor, got {type(obs)}"
else:
assert isinstance(
obss, list
), f"Expected list of observations, got {type(obss)}"
for obs, shape in zip(obss, obs_shapes):
assert obs.shape == shape, f"Expected shape {shape}, got {obs.shape}"
if return_numpy:
assert isinstance(
obs, np.ndarray
), f"Expected numpy array, got {type(obs)}"
else:
assert isinstance(
obs, Tensor
), f"Expected torch tensor, got {type(obs)}"
@pytest.mark.parametrize("scenario", TEST_SCENARIOS)
@pytest.mark.parametrize("return_numpy", [True, False])
@pytest.mark.parametrize("continuous_actions", [True, False])
@pytest.mark.parametrize("dict_space", [True, False])
def test_gym_wrapper(
    scenario, return_numpy, continuous_actions, dict_space, max_steps=10
):
    """Check the spaces and step outputs of an env built with ``wrapper="gym"``.

    A single-environment CPU env is created for ``scenario``; the test then
    verifies the observation/action space containers, resets the env, and
    steps it ``max_steps`` times with random actions while checking the
    types of observations, rewards, ``done`` and ``info``.

    Args:
        scenario: Scenario name forwarded to ``make_env``.
        return_numpy: Forwarded to the wrapper via ``wrapper_kwargs``; selects
            whether observations are expected as numpy arrays or tensors.
        continuous_actions: Forwarded to ``make_env``.
        dict_space: Forwarded to ``make_env`` as ``dict_spaces``; selects
            Dict vs. Tuple spaces and dict vs. list observations/rewards.
        max_steps: Episode horizon given to ``make_env`` and number of steps
            executed by the test.
    """
    env = make_env(
        scenario=scenario,
        num_envs=1,
        device="cpu",
        continuous_actions=continuous_actions,
        dict_spaces=dict_space,
        wrapper="gym",
        wrapper_kwargs={"return_numpy": return_numpy},
        max_steps=max_steps,
    )
    n_agents = env.unwrapped.n_agents

    assert len(env.observation_space) == n_agents, "Expected one observation per agent"
    assert len(env.action_space) == n_agents, "Expected one action per agent"

    if dict_space:
        assert isinstance(
            env.observation_space, gym.spaces.Dict
        ), "Expected Dict observation space"
        assert isinstance(
            env.action_space, gym.spaces.Dict
        ), "Expected Dict action space"
        obs_shapes = {
            k: obs_space.shape for k, obs_space in env.observation_space.spaces.items()
        }
    else:
        assert isinstance(
            env.observation_space, gym.spaces.Tuple
        ), "Expected Tuple observation space"
        assert isinstance(
            env.action_space, gym.spaces.Tuple
        ), "Expected Tuple action space"
        obs_shapes = [obs_space.shape for obs_space in env.observation_space.spaces]

    assert isinstance(
        env.unwrapped, Environment
    ), "The unwrapped attribute of the Gym wrapper should be a VMAS Environment"

    obss = env.reset()
    _check_obs_type(obss, obs_shapes, dict_space, return_numpy=return_numpy)

    # Pre-bind so that max_steps == 0 trips the final assertion with a clear
    # message instead of raising NameError.
    done = False
    for _ in range(max_steps):
        actions = [
            env.unwrapped.get_random_action(agent).numpy()
            for agent in env.unwrapped.agents
        ]
        obss, rews, done, info = env.step(actions)
        _check_obs_type(obss, obs_shapes, dict_space, return_numpy=return_numpy)

        assert len(rews) == env.unwrapped.n_agents, "Expected one reward per agent"
        if not dict_space:
            assert isinstance(
                rews, list
            ), f"Expected list of rewards but got {type(rews)}"
            rew_values = rews
        else:
            assert isinstance(
                rews, dict
            ), f"Expected dictionary of rewards but got {type(rews)}"
            rew_values = list(rews.values())
        assert all(
            isinstance(rew, float) for rew in rew_values
        ), f"Expected float rewards but got {type(rew_values[0])}"

        assert isinstance(done, bool), f"Expected bool for done but got {type(done)}"
        assert isinstance(
            info, dict
        ), f"Expected info to be a dictionary but got {type(info)}"

    # The env was created with the same horizon, so the last step must end
    # the episode. Report the actual horizon rather than a hard-coded number.
    assert done, f"Expected done to be True after {max_steps} steps"