-
Notifications
You must be signed in to change notification settings - Fork 42
/
env.py
324 lines (274 loc) · 10.6 KB
/
env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
from contextlib import contextmanager
import zmq
from .constants import DEFAULT_TIMEOUTMS
from .launcher import BlenderLauncher
from .env_rendering import create_renderer
from . import colors
class RemoteEnv:
    '''Communicate with a remote Blender environment.

    This sets up a communication channel with a remote Blender environment.
    Its counterpart on Blender is usually a `btb.RemoteControlledAgent`.
    `RemoteEnv` already provides the usual `step()` and `reset()` methods
    that block the caller until the remote call returns. However, it does
    not manage launching the remote Environment. For this reason we provide
    `launch_env` below.

    To provide OpenAI gym compatible environments, one usually inherits
    from `btb.env.OpenAIRemoteEnv`.

    By default, the simulation time of the remote environment only advances
    when the agent issues a command (step, reset). However, one may configure
    the remote environment in real-time mode, in which case the simulation time
    advances independently of the agent's commands.

    Params
    ------
    address: str
        ZMQ endpoint to connect to.
    timeoutms: int
        Receive timeout before raising an error.
    '''

    def __init__(self, address, timeoutms=DEFAULT_TIMEOUTMS):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)
        # LINGER=0: drop unsent messages immediately on close instead of
        # blocking process shutdown.
        self.socket.setsockopt(zmq.LINGER, 0)
        # Sending gets a generous 10x budget compared to receiving.
        self.socket.setsockopt(zmq.SNDTIMEO, timeoutms*10)
        self.socket.setsockopt(zmq.RCVTIMEO, timeoutms)
        # Relaxed/correlated REQ allows resending after a timeout without
        # violating the strict REQ/REP send-recv alternation.
        self.socket.setsockopt(zmq.REQ_RELAXED, 1)
        self.socket.setsockopt(zmq.REQ_CORRELATE, 1)
        self.socket.connect(address)
        self.env_time = None      # last simulation time reported by the remote
        self.rgb_array = None     # last render image received, if any
        self.viewer = None        # lazily created image viewer for render()

    def reset(self):
        '''Reset the remote environment.

        Returns
        -------
        obs: object
            Initial observation
        info: dict
            Addition information provided by the remote
            environment.
        '''
        ddict = self._reqrep(cmd='reset')
        self.rgb_array = ddict.pop('rgb_array', None)
        return ddict.pop('obs'), ddict

    def step(self, action):
        '''Advance the remote environment by providing an action.

        Params
        ------
        action: object
            Action to apply

        Returns
        -------
        obs: object
            New observation
        reward: float
            Received reward
        done: bool
            Whether or not the environment simulation finished
        info: dict
            Additional information provided by the environment.
        '''
        ddict = self._reqrep(cmd='step', action=action)
        obs = ddict.pop('obs')
        r = ddict.pop('reward')
        done = ddict.pop('done')
        self.rgb_array = ddict.pop('rgb_array', None)
        return obs, r, done, ddict

    def render(self, mode='human', backend=None, gamma_coeff=2.2):
        '''Render the current remote environment state.

        We consider Blender itself the visualization of the environment
        state. By calling this method a 2D render image of the environment
        will be shown, if the remote environment configured a suitable renderer.

        Params
        ------
        mode: str
            Either 'human' or 'rgb_array'
        backend: str, None
            Which backend to use to visualize the image. When None,
            automatically chosen by blendtorch.
        gamma_coeff: scalar
            Gamma correction coeff before visualizing image. Does not
            affect the returned rgb array when mode is `rgb_array` which
            remains in linear color space. Defaults to 2.2
        '''
        # 'rgb_array' mode returns the raw linear-space image (may be None
        # if the remote did not send one); 'human' mode displays it.
        if mode == 'rgb_array' or self.rgb_array is None:
            return self.rgb_array

        if self.viewer is None:
            self.viewer = create_renderer(backend)
        self.viewer.imshow(colors.gamma(self.rgb_array, gamma_coeff))

    def _reqrep(self, **send_kwargs):
        '''Convenience request-reply method.

        Sends `send_kwargs` (plus the current `env_time`) as a pickled
        dict and returns the remote's reply dict, updating `env_time`
        from the reply. Raises ValueError on send/receive timeout.
        '''
        try:
            ext = {**send_kwargs, 'time': self.env_time}
            self.socket.send_pyobj(ext)
        except zmq.error.Again:
            raise ValueError('Failed to send to remote environment') from None

        try:
            ddict = self.socket.recv_pyobj()
            self.env_time = ddict['time']
            return ddict
        except zmq.error.Again:
            raise ValueError(
                'Failed to receive from remote environment') from None

    def close(self):
        '''Close the environment and release all ZMQ resources.'''
        if self.viewer:
            self.viewer.close()
            self.viewer = None
        if self.socket:
            self.socket.close()
            self.socket = None
        if self.context:
            # Bug fix: the context was previously never terminated, leaving
            # ZMQ background I/O threads alive after close().
            self.context.term()
            self.context = None
@contextmanager
def launch_env(scene, script, background=False, **kwargs):
    '''Launch a remote environment wrapped in a context manager.

    Params
    ------
    scene: path, str
        Blender scene file
    script: path, str
        Python script containing environment implementation.
    background: bool
        Whether or not this environment can run in Blender background mode.
        Defaults to False.
    kwargs: dict
        Any other arguments passed as command-line arguments
        to the remote environment. Note by default a <key,value>
        entry will be converted to `--key str(value)`. Boolean values
        will be converted to switches as follows `--key` or `--no-key`.
        Note that underlines will be converted to dashes as usual with
        command-line arguments and argparse.

    Yields
    ------
    env: `btt.RemoteEnv`
        Remote environement to interact with.
    '''
    env = None
    try:
        # Translate keyword arguments into argparse-style CLI arguments
        # for the remote environment script.
        cli_args = []
        for key, value in kwargs.items():
            flag = key.replace('_', '-')
            if isinstance(value, bool):
                # Booleans become on/off switches rather than key-value pairs.
                cli_args.append(f'--{flag}' if value else f'--no-{flag}')
            else:
                cli_args += [f'--{flag}', str(value)]

        with BlenderLauncher(
                scene=scene,
                script=script,
                num_instances=1,
                named_sockets=['GYM'],
                instance_args=[cli_args],
                background=background) as bl:
            env = RemoteEnv(bl.launch_info.addresses['GYM'][0])
            yield env
    finally:
        # Ensure the ZMQ channel is torn down even if the caller raised.
        if env:
            env.close()
try:
    # OpenAI gym support is optional; the class below is only defined
    # when `gym` is importable.
    import gym
    from contextlib import ExitStack

    class OpenAIRemoteEnv(gym.Env):
        '''Base class for remote OpenAI gym compatible environments.

        By inherting from this class you can provide almost all of the
        code necessary to register a remote Blender environment to
        OpenAI gym.

        See the `examples/control/cartpole_gym` for details.

        Params
        ------
        version : str
            Version of this environment.
        '''
        metadata = {'render.modes': ['rgb_array', 'human']}

        def __init__(self, version='0.0.1'):
            self.__version__ = version
            # ExitStack keeps the launch_env context alive between
            # launch() and close().
            self._es = ExitStack()
            self._env = None

        def launch(self, scene, script, background=False, **kwargs):
            '''Launch the remote environment.

            Params
            ------
            scene: path, str
                Blender scene file
            script: path, str
                Python script containing environment implementation.
            background: bool
                Whether or not this environment can run in Blender background mode.
            kwargs: dict
                Any keyword arguments passes as command-line arguments
                to the remote environment. See `btt.env.launch_env` for
                details.
            '''
            assert not self._env, 'Environment already running.'
            self._env = self._es.enter_context(
                launch_env(
                    scene=scene,
                    script=script,
                    background=background,
                    **kwargs
                )
            )

        def step(self, action):
            '''Run one timestep of the environment's dynamics. When end of
            episode is reached, you are responsible for calling `reset()`
            to reset this environment's state.
            Accepts an action and returns a tuple (observation, reward, done, info).

            Note, this methods documentation is a 1:1 copy of OpenAI `gym.Env`.

            Params
            ------
            action: object
                An action provided by the agent

            Returns
            -------
            observation: object
                Agent's observation of the current environment
            reward: float
                Amount of reward returned after previous action
            done: bool
                Whether the episode has ended, in which case further step() calls will return undefined results
            info: (dict)
                Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
            '''
            assert self._env, 'Environment not running.'
            obs, reward, done, info = self._env.step(action)
            return obs, reward, done, info

        def reset(self):
            '''Resets the state of the environment and returns an initial observation.

            Note, this methods documentation is a 1:1 copy of OpenAI `gym.Env`.

            Returns
            -------
            observation: object
                The initial observation.
            '''
            assert self._env, 'Environment not running.'
            # The remote also returns an info dict; the gym reset()
            # contract of this gym version returns the observation only.
            obs, info = self._env.reset()
            return obs

        def seed(self, seed):
            '''Sets the seed for this env's random number generator(s).'''
            raise NotImplementedError()

        def render(self, mode='human'):
            '''Renders the environment.

            Note, we consider Blender itself the main vehicle to view
            and manipulate the current environment state. Calling
            this method will usually render a specific camera view
            in Blender, transmit its image and visualize it. This will
            only work, if the remote environment supports such an operation.
            '''
            assert self._env, 'Environment not running.'
            return self._env.render(mode=mode)

        @property
        def env_time(self):
            '''Returns the remote environment time.'''
            return self._env.env_time

        def close(self):
            '''Close the environment.'''
            if self._es:
                self._es.close()
                self._es = None
                self._env = None

        def __del__(self):
            # Best-effort cleanup if the user forgot to call close().
            self.close()

except ImportError:
    # Bug fix: previously bound the exception to an unused variable `e`.
    # gym is not installed; OpenAIRemoteEnv is simply not provided.
    pass