-
Notifications
You must be signed in to change notification settings - Fork 240
/
test_thread.py
337 lines (264 loc) · 8.19 KB
/
test_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# test_thread.py
import pytest
from curio import *
from curio.thread import AWAIT, async_thread, spawn_thread
from curio.file import aopen
import time
import pytest
def simple_func(x, y):
    """Wait half a second through ``AWAIT(sleep(...))``, then return ``x + y``.

    The tests in this module run this helper via ``spawn_thread``.
    """
    AWAIT(sleep(0.5))   # Execute a blocking operation
    total = x + y
    return total
@async_thread
def simple_func2(x, y):
    """Block with ``time.sleep`` for half a second, then return ``x + y``."""
    time.sleep(0.5)
    total = x + y
    return total
async def simple_coro(x, y):
    """Coroutine helper: sleep for half a second, then return ``x + y``."""
    await sleep(0.5)
    total = x + y
    return total
def test_good_result(kernel):
    """Joining a thread started with ``spawn_thread`` returns the function's result."""
    async def main():
        worker = await spawn_thread(simple_func, 2, 3)
        outcome = await worker.join()
        assert outcome == 5

    kernel.run(main)
def test_bad_result(kernel):
    """Joining a thread whose function failed raises ``TaskError`` with the error as cause."""
    async def main():
        worker = await spawn_thread(simple_func, 2, '3')
        try:
            await worker.join()
        except TaskError as err:
            # ``2 + '3'`` inside simple_func is the expected TypeError.
            assert isinstance(err.__cause__, TypeError)
        else:
            # join() returning normally means the failure was lost.
            assert False

    kernel.run(main)
def test_cancel_result(kernel):
    """Joining a cancelled thread raises ``TaskError`` caused by ``TaskCancelled``."""
    async def main():
        worker = await spawn_thread(simple_func, 2, 3)
        # Cancel while simple_func is still inside its 0.5 second wait.
        await sleep(0.25)
        await worker.cancel()
        try:
            await worker.join()
        except TaskError as err:
            assert isinstance(err.__cause__, TaskCancelled)
        else:
            # join() returning normally means the cancellation was lost.
            assert False

    kernel.run(main)
def test_thread_good_result(kernel):
    """An ``@async_thread`` function can ``AWAIT`` a coroutine and return its value."""
    @async_thread
    def coro():
        return AWAIT(simple_coro(2, 3))

    async def main():
        outcome = await coro()
        assert outcome == 5

    kernel.run(main)
def test_thread_bad_result(kernel):
    """An error raised by an ``AWAIT``-ed coroutine surfaces inside the thread function."""
    @async_thread
    def coro():
        # simple_coro(2, '3') fails on ``2 + '3'``; the TypeError must reach this code.
        with pytest.raises(TypeError):
            AWAIT(simple_coro(2, '3'))

    async def main():
        await coro()

    kernel.run(main)
def test_thread_cancel_result(kernel):
    """Cancelling a thread is expected to raise ``TaskCancelled`` inside its pending ``AWAIT``."""
    def func():
        with pytest.raises(TaskCancelled):
            AWAIT(simple_coro(2, 3))

    async def main():
        worker = await spawn_thread(func)
        # Cancel while simple_coro is still inside its 0.5 second sleep.
        await sleep(0.25)
        await worker.cancel()
        # NOTE(review): the thread is never joined, so a failure raised inside
        # func() may not propagate to pytest -- confirm.

    kernel.run(main)
def test_thread_sync(kernel):
    """A ``Lock`` held by the main task makes the thread's ``with lock:`` wait its turn."""
    results = []

    def func(lock):
        # Plain (synchronous) ``with`` on the lock from thread code.
        with lock:
            results.append('func')

    async def main():
        lock = Lock()
        async with lock:
            results.append('main')
            worker = await spawn_thread(func, lock)
            await sleep(0.5)
            results.append('main done')
        # The lock is released here, so the thread can acquire it and finish.
        await worker.join()

    kernel.run(main())
    expected = ['main', 'main done', 'func']
    assert results == expected
def test_thread_timeout(kernel):
    """``timeout_after`` used as a plain ``with`` block in an async thread raises ``TaskTimeout``."""
    @async_thread
    def func():
        # A 2 second sleep under a 1 second timeout.
        with pytest.raises(TaskTimeout), timeout_after(1):
            AWAIT(sleep(2))

    async def main():
        await func()

    kernel.run(main)
def test_thread_disable_cancellation(kernel):
    """Exercise ``disable_cancellation``/``enable_cancellation`` as plain ``with`` blocks in a thread."""
    def func():
        with disable_cancellation():
            # main() cancels after 0.5s, i.e. during this 1 second sleep.
            AWAIT(sleep(1))
            assert True

            with enable_cancellation():
                AWAIT(sleep(2))

            # NOTE(review): main() uses cancel(), yet the checks below expect
            # TaskTimeout -- confirm which exception type is intended.
            assert isinstance(AWAIT(check_cancellation()), TaskTimeout)

        with pytest.raises(TaskTimeout):
            AWAIT(sleep(2))

    async def main():
        worker = await spawn_thread(func)
        await sleep(0.5)
        await worker.cancel()
        # NOTE(review): the thread is never joined, so a failed assertion inside
        # func() may not propagate to pytest -- confirm.

    kernel.run(main)
import os
# Fixture data lives next to this test module.
dirname = os.path.split(__file__)[0]
testinput = os.path.join(dirname, 'testdata.txt')
def test_thread_read(kernel):
    """Read the fixture file from thread code using ``aopen`` in a plain ``with`` block."""
    def func():
        with aopen(testinput, 'r') as f:
            contents = AWAIT(f.read())
            # The file must still be open while inside the block.
            assert not f.closed

        assert contents == 'line 1\nline 2\nline 3\n'

    async def main():
        worker = await spawn_thread(func)
        await worker.join()

    kernel.run(main)
import curio.subprocess as subprocess
import sys
@pytest.mark.skipif(sys.platform.startswith('win'),
                    reason='Broken on windows')
def test_subprocess_popen(kernel):
    """Run a child Python process from thread code and read its stdout via ``AWAIT``."""
    def func():
        command = [sys.executable, '-c', 'print("hello")']
        with subprocess.Popen(command, stdout=subprocess.PIPE) as p:
            output = AWAIT(p.stdout.read())
            assert output == b'hello\n'

    async def main():
        worker = await spawn_thread(func)
        await worker.join()

    kernel.run(main)
def test_task_group_thread(kernel):
    """Use a ``TaskGroup`` with plain ``with``/``for`` from thread code and collect results."""
    results = []

    async def add(x, y):
        return x + y

    def task():
        # Spawn add(1, 1), add(2, 2) and add(3, 3), in that order.
        spawned = [AWAIT(spawn(add, n, n)) for n in (1, 2, 3)]
        group = TaskGroup(spawned)
        with group:
            for member in group:
                results.append(AWAIT(member.join()))

    async def main():
        worker = await spawn_thread(task)
        await worker.join()

    kernel.run(main)
    assert results == [2, 4, 6]
# Tests the use of the spawn function on async thread functions.
# They should run as proper coroutines.
def test_spawn_async_thread(kernel):
    """``spawn`` accepts an async-thread function either already called or as callable plus args."""
    results = []

    async def main():
        # Form 1: pass the result of calling the decorated function.
        first = await spawn(simple_func2(2, 3))
        results.append(1)
        results.append(await first.join())

        # Form 2: pass the function and its arguments separately.
        second = await spawn(simple_func2, 2, 3)
        results.append(2)
        results.append(await second.join())

    kernel.run(main)
    assert results == [1, 5, 2, 5]
@pytest.mark.skip
def test_thread_context(kernel):
    """Skipped: uses ``async with spawn_thread():`` as a thread-context block."""
    results = []

    async def func(x, y):
        async with spawn_thread():
            time.sleep(0.5)
            value = AWAIT(simple_coro(x, y))
            results.append(value)

    async def main():
        worker = await spawn(func, 2, 3)
        # main records its marker while func is still in time.sleep().
        await sleep(0.1)
        results.append(1)
        await worker.join()

    kernel.run(main)
    assert results == [1, 5]
# This test is aimed at testing cancellation/timeouts in async threads
@pytest.mark.skip
def test_async_thread_timeout(kernel):
    """Check that ``timeout_after`` interrupts thread-backed workers waiting on an event.

    Renamed from ``test_thread_timeout``: an earlier test in this module
    already uses that name, and this second ``def`` rebound it, so the earlier
    (non-skipped) test was never collected by pytest.
    """
    evt = Event()
    results = []

    @async_thread
    def worker1():
        AWAIT(evt.wait)
        results.append('worker1')

    async def worker2():
        async with spawn_thread():
            AWAIT(evt.wait)
            results.append('worker2')

    async def setter():
        # Sets the event after 1 second, well past the 0.1 second timeouts below.
        await sleep(1)
        await evt.set()

    async def main():
        t = await spawn(setter)
        try:
            await timeout_after(0.1, worker1)
            results.append('bad')
        except TaskTimeout:
            results.append('timeout1')
        await t.join()

        # Reset the event so the second worker has to wait as well.
        evt.clear()
        t = await spawn(setter)
        try:
            await timeout_after(0.1, worker2)
            results.append('bad')
        except TaskTimeout:
            results.append('timeout2')

    kernel.run(main)
    assert results == ['timeout1', 'timeout2']
import threading
@pytest.mark.skip
def test_async_thread_call(kernel):
    """Skipped: direct calls between async-thread functions should stay in one thread.

    Uses ``threading.current_thread()`` rather than the deprecated
    ``currentThread()`` alias.
    """
    # Tests calling between async thread objects
    results = []

    @async_thread
    def func1(t):
        # Record whether the caller's thread is the thread running func1.
        results.append('func1')
        results.append(t == threading.current_thread())

    @async_thread
    def func2():
        results.append('func2')
        # Calling an async_thread function from another async_thread function should
        # work like a normal synchronous function call
        func1(threading.current_thread())

    async def coro1():
        results.append('coro1')
        async with spawn_thread():
            # Calling an async_thread function from a thread context block should
            # work like a normal sync function call
            func1(threading.current_thread())

    async def main():
        await func2()
        await coro1()

    kernel.run(main)
    assert results == ['func2', 'func1', True, 'coro1', 'func1', True]
@pytest.mark.skip
def test_async_thread_async_thread_call(kernel):
    """Skipped: ``AWAIT`` on an async-thread function should stay in the calling thread.

    Uses ``threading.current_thread()`` rather than the deprecated
    ``currentThread()`` alias.
    """
    # Tests awaiting one async thread function from another
    results = []

    @async_thread
    def func1(t):
        # Record whether the caller's thread is the thread running func1.
        results.append('func1')
        results.append(t == threading.current_thread())

    @async_thread
    def func2():
        results.append('func2')
        # Awaiting on an async-thread function should work, but it should stay in the same thread
        AWAIT(func1, threading.current_thread())

    async def coro1():
        results.append('coro1')
        async with spawn_thread():
            # Calling an async_thread function from a thread context block should
            # work like a normal sync function call
            func1(threading.current_thread())

    async def main():
        await func2()
        await coro1()

    kernel.run(main)
    assert results == ['func2', 'func1', True, 'coro1', 'func1', True]