This repository has been archived by the owner on Nov 3, 2023. It is now read-only.
/
world_runner.py
196 lines (166 loc) · 6.93 KB
/
world_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import time
import datetime
from concurrent import futures
import parlai.chat_service.utils.logging as log_utils
import parlai.chat_service.utils.misc as utils
class ChatServiceWorldRunner:
    """
    World Runner.

    Launches worlds, overworlds, etc. Helper for ChatServiceManager.

    Worlds are executed on a thread pool; each launched world is tracked in
    ``self.tasks`` (task name -> TaskState) so it can be shut down later.
    """

    def __init__(self, opt, world_path, max_workers, manager, is_debug=False):
        """
        Load the world module and kick off its optional initialization.

        :param opt:
            options passed to every generated world. Note that the key
            ``"is_debug"`` is written into it.
        :param world_path:
            passed to ``utils.get_world_module`` to locate the world module.
        :param max_workers:
            int. Size of the thread pool that runs worlds.
        :param manager:
            the chat service manager this runner helps.
        :param is_debug:
            bool. When True, ``_log`` prints debug messages.
        """
        self._world_module = utils.get_world_module(world_path)
        self.executor = futures.ThreadPoolExecutor(max_workers=max_workers)
        self.debug = is_debug
        self._log("Found world module: {}".format(self._world_module))
        opt["is_debug"] = is_debug
        self.manager = manager
        self.system_done = False
        self.opt = opt
        self.tasks = {}  # task ID to task
        self.initialized = False

        def _is_done_initializing(fut):
            e = fut.exception()
            if e is not None:
                self._log('`module_initialize` returned with error {}'.format(repr(e)))
                # Re-raise regardless of debug mode: exceptions raised inside
                # a done-callback are logged (and otherwise ignored) by
                # concurrent.futures, which is the only place the failure
                # surfaces when debug logging is off. `initialized` stays
                # False so callers never see a half-initialized module.
                raise e
            result = fut.result()
            if result:
                print(result)
            if self.debug:
                print("DEBUG: Call to `module_initialize` has completed...")
            self.initialized = True

        if hasattr(self._world_module, "module_initialize"):
            self._log("Initializing world module...")
            # perform any module initialization steps
            init_fn = self._world_module.module_initialize
            self.init_fut = self.executor.submit(init_fn, opt, manager)
            self.init_fut.add_done_callback(_is_done_initializing)
        else:
            self._log("World module does not have `module initialize` function")
            self.initialized = True

    def _log(self, text):
        """
        Print a timestamped debug message; no-op unless debug mode is on.
        """
        if self.debug:
            # Named `timestamp` so the imported `time` module is not shadowed.
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("{} DEBUG: {}".format(timestamp, text))

    def is_initialized(self):
        """
        Return True once module initialization has completed (or was not needed).
        """
        return self.initialized

    def shutdown(self):
        """
        Shutdown the world runner.

        Shuts down every tracked world, signals running loops to stop, and
        then shuts down the executor.
        """
        for task in self.tasks.values():
            if task.world is not None:
                task.world.shutdown()

        self.system_done = True  # this forces worlds to stop executing parley
        self._log("Executor shutting down.")
        self.executor.shutdown()
        self._log("Shutdown complete.")

    def _parley_until_done(self, world):
        """
        Parley a world until its episode is done or the runner is stopping.

        The world is always shut down afterwards, even if ``parley`` or
        ``episode_done`` raises; the exception then propagates to the caller.

        :param world:
            the world to run.

        :return:
            last output of the world's parley function, or None if the world
            never parleyed.
        """
        ret_val = None
        try:
            while not world.episode_done() and not self.system_done:
                ret_val = world.parley()
                time.sleep(0.3)
        finally:
            world.shutdown()
        return ret_val

    def _run_world(self, task, world_name, agents):
        """
        Run a world until completion.

        :param task:
            TaskState. State of the given task.
        :param world_name:
            string. The name of the world in the module file
        :param agents:
            list. A list of agents that should be in the world.

        :return:
            ret_val: last output of world's parley function. None if the
            world never parleyed. Errors raised by the world are not caught
            here; they propagate after the world has been shut down.
            world_data: data attribute of world, if it has one (else ``{}``)
        """
        world_generator = utils.get_world_fn_attr(
            self._world_module, world_name, "generate_world"
        )
        world = world_generator(self.opt, agents)
        # Expose the world on the task so `shutdown` can reach it.
        task.world = world

        ret_val = self._parley_until_done(world)
        world_data = getattr(world, "data", {})
        return ret_val, world_data

    def launch_task_world(self, task_name, world_name, agents):
        """
        Launch a task world.

        Return the job's future.

        :param task_name:
            string. the name of the job thread
        :param world_name:
            string. the name of the task world in the module file
        :param agents:
            list. the list of agents to install in the world

        :return:
            the Futures object corresponding to this launched task
        """
        task = utils.TaskState(task_name, world_name, agents)
        self.tasks[task_name] = task

        def _world_fn():
            log_utils.print_and_log(
                logging.INFO, 'Starting task {}...'.format(task_name)
            )
            return self._run_world(task, world_name, agents)

        fut = self.executor.submit(_world_fn)
        task.future = fut
        return fut

    def launch_overworld(self, task_name, overworld_name, onboard_map, overworld_agent):
        """
        Launch an overworld and a subsequent onboarding world.

        Return the job's future

        :param task_name:
            string. the name of the job thread
        :param overworld_name:
            string. the name of the overworld in the module file
        :param onboard_map:
            map. a mapping of overworld return values to the names
            of onboarding worlds in the module file.
        :param overworld_agent:
            The agent to run the overworld with

        :return:
            the Futures object corresponding to running the overworld. Its
            result is the last world type returned by the overworld, or None
            if the overworld never produced one.
        """
        task = utils.TaskState(
            task_name,
            overworld_name,
            [overworld_agent],
            is_overworld=True,
            world_type=None,
        )
        self.tasks[task_name] = task
        agent_state = self.manager.get_agent_state(overworld_agent.id)

        def _world_function():
            world_generator = utils.get_world_fn_attr(
                self._world_module, overworld_name, "generate_world"
            )
            overworld = world_generator(self.opt, [overworld_agent])
            # Bound up front so the final `return` is safe even when the loop
            # body never runs (overworld already done, or runner stopping).
            world_type = None
            while not overworld.episode_done() and not self.system_done:
                world_type = overworld.parley()
                if world_type is None:
                    # Nothing chosen yet; wait before polling again.
                    time.sleep(0.5)
                    continue

                if world_type == self.manager.EXIT_STR:
                    self.manager._remove_agent(overworld_agent.id)
                    return world_type

                # perform onboarding
                onboard_type = onboard_map.get(world_type)
                if onboard_type:
                    onboard_id = 'onboard-{}-{}'.format(overworld_agent.id, time.time())
                    agent = self.manager._create_agent(onboard_id, overworld_agent.id)
                    agent.data = overworld_agent.data
                    agent_state.set_active_agent(agent)
                    agent_state.assign_agent_to_task(agent, onboard_id)
                    _, onboard_data = self._run_world(task, onboard_type, [agent])
                    agent_state.onboard_data = onboard_data
                    agent_state.data = agent.data
                self.manager.add_agent_to_pool(agent_state, world_type)
                log_utils.print_and_log(logging.INFO, 'onboarding/overworld complete')

            return world_type

        fut = self.executor.submit(_world_function)
        task.future = fut
        return fut