-
Notifications
You must be signed in to change notification settings - Fork 240
/
thread.py
297 lines (250 loc) · 9.71 KB
/
thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# curio/thread.py
#
# Support for threads implemented on top of the Curio kernel.
#
# Theory of operation:
# --------------------
# Curio has the ability to safely wait for Futures as defined
# in the concurrent.futures module. A notable feature of coroutines
# is that when called, their evaluation is delayed--instead you get
# a "coroutine" object that must be executed by a kernel or event loop.
#
# A so-called "async thread" uses both of these features together to
# set up an execution pathway for allowing threads to execute
# coroutines. For each thread (a real thread--created by the
# threading module), a backing coroutine is created in Curio. This
# backing coroutine runs on top of the Curio kernel and constantly
# monitors a Future for an incoming request. This request is expected
# to contain an unevaluated coroutine. The unevaluated coroutine is
# evaluated on behalf of the thread by the backing coroutine. Any
# result is the communicated back to the thread which is waiting
# for it on an Event.
#
# The mechanism for making a request within a thread is the AWAIT
# function. Specifically, a call like this:
#
# result = AWAIT(coro, *args, **kwargs)
#
# Makes the thread's backing coroutine execute the following:
#
# result = await coro(*args, **kwargs)
#
# From the standpoint of the thread, it appears to be executing a
# normal synchronous call.
#
# Here is a picture diagram of the parts
#
# ________ ___________ _________
# | | await | | Future | |
# | Curio |<-------| backing |<-------| Thread |
# | Kernel |------->| coroutine |------->| |
# |________| result |___________| Event |_________|
#
__all__ = [ 'AWAIT', 'async_thread', 'spawn_thread' ]
# -- Standard Library
import threading
from concurrent.futures import Future
from functools import wraps
from inspect import iscoroutine, isgenerator
from contextlib import contextmanager
import logging
log = logging.getLogger(__name__)
# -- Curio
from . import sync
from . import queue
from .task import spawn, disable_cancellation, check_cancellation, set_cancellation
from .traps import _future_wait
from . import errors
from . import meta
_locals = threading.local()
class AsyncThread(object):
def __init__(self, target=None, args=(), kwargs={}, daemon=False):
self.target = target
self.args = args
self.kwargs = kwargs
self.daemon = daemon
# The following attributes are provided to make a thread mimic a Task
self.terminated = False
self.cancelled = False
self.taskgroup = None
self.joined = False
# This future is used by a thread to make a request to Curio
self._request = Future()
# This event is used to communicate completion of the request
self._done_evt = threading.Event()
# Event used to signal thread termination
self._terminate_evt = sync.UniversalEvent()
# Information about the coroutine being executed by the thread
self._coro = None
self._coro_result = None
self._coro_exc = None
# Final values produced by the thread before termination
self._final_value = None
self._final_exc = None
# A reference to the associated thread (from threading module)
self._thread = None
# A reference to the associated backing coroutine
self._task = None
async def _coro_runner(self):
while True:
# Wait for a hand-off
await disable_cancellation(_future_wait(self._request))
self._coro = self._request.result()
self._request = Future()
# If no coroutine, we're shutting down
if not self._coro:
break
# Run the the coroutine
try:
self._coro_result = await self._coro
self._coro_exc = None
except BaseException as e:
self._coro_result = None
self._coro_exc = e
# Hand it back to the thread
self._coro = None
self._done_evt.set()
if self.taskgroup:
await self.taskgroup._task_done(self)
self.joined = True
def _func_runner(self):
_locals.thread = self
try:
self._final_result = self.target(*self.args, **self.kwargs)
self._final_exc = None
except BaseException as e:
self._final_result = None
self._final_exc = e
if not isinstance(e, errors.CancelledError):
log.warning("Unexpected exception in cancelled async thread", exc_info=True)
finally:
self._request.set_result(None)
self._terminate_evt.set()
async def start(self):
if self.target is None:
raise RuntimeError("Async thread must be given a target")
# Launch the backing coroutine
self._task = await spawn(self._coro_runner, daemon=True)
# Launch the thread itself
self._thread = threading.Thread(target=self._func_runner)
self._thread.start()
def AWAIT(self, coro):
self._request.set_result(coro)
self._done_evt.wait()
self._done_evt.clear()
if self._coro_exc:
raise self._coro_exc
else:
return self._coro_result
async def join(self):
await self.wait()
self.joined = True
if self.taskgroup:
self.taskgroup._task_discard(self)
if self._final_exc:
raise errors.TaskError() from self._final_exc
else:
return self._final_result
async def wait(self):
await self._terminate_evt.wait()
self.terminated = True
@property
def result(self):
if not self._terminate_evt.is_set():
raise RuntimeError('Thread not terminated')
if self._final_exc:
raise self._final_exc
else:
return self._final_result
@property
def exception(self):
if not self._terminate_evt.is_set():
raise RuntimeError('Thread not terminated')
return self._final_exc
async def cancel(self, *, exc=errors.TaskCancelled, blocking=True):
self.cancelled = True
await self._task.cancel(exc=exc, blocking=blocking)
if blocking:
await self.wait()
def AWAIT(coro, *args, **kwargs):
'''
Await for a coroutine in an asynchronous thread. If coro is
not a proper coroutine, this function acts a no-op, returning coro.
'''
# If the coro is a callable and it's identifiable as a coroutine function,
# wrap it inside a coroutine and pass that.
if callable(coro):
if meta.iscoroutinefunction(coro) and hasattr(_locals, 'thread'):
async def _coro(coro):
return await coro(*args, **kwargs)
coro = _coro(coro)
else:
coro = coro(*args, **kwargs)
if iscoroutine(coro) or isgenerator(coro):
if hasattr(_locals, 'thread'):
return _locals.thread.AWAIT(coro)
else:
# Thought: Do we try to promote the calling thread into an
# "async" thread automatically? Would require a running
# kernel. Would require a task dedicated to spawning the
# coro runner. Would require shutdown. Maybe a context
# manager?
raise errors.AsyncOnlyError('Must be used as async')
else:
return coro
def spawn_thread(func, *args, daemon=False):
'''
Launch an async thread. This mimicks the way a task is normally spawned. For
example:
t = await spawn_thread(func, arg1, arg2)
...
await t.join()
'''
if iscoroutine(func) or meta.iscoroutinefunction(func):
raise TypeError("spawn_thread() can't be used on coroutines")
async def runner(args, daemon):
t = AsyncThread(func, args=args, daemon=daemon)
await t.start()
return t
return runner(args, daemon)
def async_thread(func=None, *, daemon=False):
'''
Decorator that is used to mark a callable as running in an asynchronous thread
'''
if func is None:
return lambda func: async_thread(func, daemon=daemon)
if meta.iscoroutinefunction(func):
raise TypeError("async_thread can't be applied to coroutines.")
@wraps(func)
def wrapper(*args, **kwargs):
if meta._from_coroutine() and not is_async_thread():
async def runner(*args, **kwargs):
# Launching async threads could result in a situation where
# synchronous code gets executed, but there is no opportunity
# for Curio to properly check for cancellation. This next
# call is a sanity check--if there's pending cancellation, don't
# even bother to launch the associated thread.
await check_cancellation()
t = AsyncThread(func, args=args, kwargs=kwargs, daemon=daemon)
await t.start()
try:
return await t.join()
except errors.CancelledError as e:
await t.cancel(exc=e)
await set_cancellation(t._task.cancel_pending)
if t._coro_exc:
raise t._coro_exc from None
else:
return t._coro_result
except errors.TaskError as e:
raise e.__cause__ from None
return runner(*args, **kwargs)
else:
return func(*args, **kwargs)
wrapper._async_thread = True
return wrapper
def is_async_thread():
'''
Returns True if current thread is an async thread.
'''
return hasattr(_locals, 'thread')