/
async_.py
234 lines (180 loc) · 6.52 KB
/
async_.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
Lightweight asynchronous framework.
This module defines the protocol used for asynchronous operations in udiskie.
It is based on ideas from "Twisted" and the "yield from" expression in
python3, but more lightweight (incomplete) and compatible with python2.
"""
import traceback
from functools import partial
from subprocess import CalledProcessError
from gi.repository import GLib
from gi.repository import Gio
from .common import cachedproperty, wraps
__all__ = [
'pack',
'to_coro',
'run_bg',
'Future',
'gather',
'Task',
]
ACTIVE_TASKS = set()
def pack(*values):
"""Unpack a return tuple to a yield expression return value."""
# Schizophrenic returns from asyncs. Inspired by
# gi.overrides.Gio.DBusProxy.
if len(values) == 0:
return None
elif len(values) == 1:
return values[0]
else:
return values
class Future:
"""
Base class for asynchronous operations.
One `Future' object represents an asynchronous operation. It allows for
separate result and error handlers which can be set by appending to the
`callbacks` and `errbacks` lists.
Implementations must conform to the following very lightweight protocol:
The task is started on initialization, but most not finish immediately.
Success/error exit is signaled to the observer by calling exactly one of
`self.set_result(value)` or `self.set_exception(exception)` when the
operation finishes.
For implementations, see :class:`Task` or :class:`Dialog`.
"""
@cachedproperty
def callbacks(self):
"""Functions to be called on successful completion."""
return []
@cachedproperty
def errbacks(self):
"""Functions to be called on error completion."""
return []
def _finish(self, callbacks, *args):
"""Set finished state and invoke specified callbacks [internal]."""
return [fn(*args) for fn in callbacks]
def set_result(self, value):
"""Signal successful completion."""
self._finish(self.callbacks, value)
def set_exception(self, exception):
"""Signal unsuccessful completion."""
was_handled = self._finish(self.errbacks, exception)
if not was_handled:
traceback.print_exception(
type(exception), exception, exception.__traceback__)
def __await__(self):
ACTIVE_TASKS.add(self)
try:
return (yield self)
finally:
ACTIVE_TASKS.remove(self)
def to_coro(func):
@wraps(func)
async def coro(*args, **kwargs):
return func(*args, **kwargs)
return coro
def run_bg(func):
@wraps(func)
def runner(*args, **kwargs):
return ensure_future(func(*args, **kwargs))
return runner
class gather(Future):
"""
Manages a collection of asynchronous tasks.
The callbacks are executed when all of the subtasks have completed.
"""
def __init__(self, *tasks):
"""Create from a list of `Future`-s."""
tasks = list(tasks)
self._done = False
self._results = {}
self._num_tasks = len(tasks)
if not tasks:
run_soon(self.set_result, [])
for idx, task in enumerate(tasks):
task = ensure_future(task)
task.callbacks.append(partial(self._subtask_result, idx))
task.errbacks.append(partial(self._subtask_error, idx))
def _subtask_result(self, idx, value):
"""Receive a result from a single subtask."""
self._results[idx] = value
if len(self._results) == self._num_tasks:
self.set_result([
self._results[i]
for i in range(self._num_tasks)
])
def _subtask_error(self, idx, error):
"""Receive an error from a single subtask."""
self.set_exception(error)
self.errbacks.clear()
def call_func(fn, *args):
"""
Call the function with the specified arguments but return None.
This rather boring helper function is used by run_soon to make sure the
function is executed only once.
"""
# NOTE: Apparently, idle_add does not re-execute its argument if an
# exception is raised. So it's okay to let exceptions propagate.
fn(*args)
def run_soon(fn, *args):
"""Run the function once."""
GLib.idle_add(call_func, fn, *args)
def sleep(seconds):
future = Future()
GLib.timeout_add(int(seconds*1000), future.set_result, True)
return future
def ensure_future(awaitable):
if isinstance(awaitable, Future):
return awaitable
return Task(iter(awaitable.__await__()))
class Task(Future):
"""Turns a generator into a Future."""
def __init__(self, generator):
"""Create and start a ``Task`` from the specified generator."""
self._generator = generator
run_soon(self._resume, next, self._generator)
def _resume(self, func, *args):
"""Resume the coroutine by throwing a value or returning a value from
the ``await`` and handle further awaits."""
try:
value = func(*args)
except StopIteration:
self._generator.close()
self.set_result(None)
except Exception as e:
self._generator.close()
self.set_exception(e)
else:
assert isinstance(value, Future)
value.callbacks.append(partial(self._resume, self._generator.send))
value.errbacks.append(partial(self._resume, self._generator.throw))
def gio_callback(proxy, result, future):
future.set_result(result)
async def exec_subprocess(argv):
"""
An Future task that represents a subprocess. If successful, the task's
result is set to the collected STDOUT of the subprocess.
:raises subprocess.CalledProcessError: if the subprocess returns a non-zero
exit code
"""
future = Future()
process = Gio.Subprocess.new(
argv,
Gio.SubprocessFlags.STDOUT_PIPE |
Gio.SubprocessFlags.STDIN_INHERIT)
stdin_buf = None
cancellable = None
process.communicate_async(
stdin_buf, cancellable, gio_callback, future)
result = await future
success, stdout, stderr = process.communicate_finish(result)
stdout = stdout.get_data() # GLib.Bytes -> bytes
if not success:
raise RuntimeError("Subprocess did not exit normally!")
exit_code = process.get_exit_status()
if exit_code != 0:
raise CalledProcessError(
"Subprocess returned a non-zero exit-status!",
exit_code,
stdout)
return stdout