-
Notifications
You must be signed in to change notification settings - Fork 12
/
test_resource_cache.py
181 lines (140 loc) · 4.9 KB
/
test_resource_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
'''
Async context manager cache api testing: ``trionics.maybe_open_context():``
'''
from contextlib import asynccontextmanager as acm
import platform
from typing import Awaitable
import pytest
import trio
import tractor
_resource: int = 0
@acm
async def maybe_increment_counter(task_name: str):
global _resource
_resource += 1
await trio.lowlevel.checkpoint()
yield _resource
await trio.lowlevel.checkpoint()
_resource -= 1
@pytest.mark.parametrize(
'key_on',
['key_value', 'kwargs'],
ids="key_on={}".format,
)
def test_resource_only_entered_once(key_on):
global _resource
_resource = 0
key = None
if key_on == 'key_value':
key = 'some_common_key'
async def main():
cache_active: bool = False
async def enter_cached_mngr(name: str):
nonlocal cache_active
if key_on == 'kwargs':
# make a common kwargs input to key on it
kwargs = {'task_name': 'same_task_name'}
assert key is None
else:
# different task names per task will be used
kwargs = {'task_name': name}
async with tractor.trionics.maybe_open_context(
maybe_increment_counter,
kwargs=kwargs,
key=key,
) as (cache_hit, resource):
if cache_hit:
try:
cache_active = True
assert resource == 1
await trio.sleep_forever()
finally:
cache_active = False
else:
assert resource == 1
await trio.sleep_forever()
with trio.move_on_after(0.5):
async with (
tractor.open_root_actor(),
trio.open_nursery() as n,
):
for i in range(10):
n.start_soon(enter_cached_mngr, f'task_{i}')
await trio.sleep(0.001)
trio.run(main)
@tractor.context
async def streamer(
ctx: tractor.Context,
seq: list[int] = list(range(1000)),
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
for val in seq:
await stream.send(val)
await trio.sleep(0.001)
print('producer finished')
@acm
async def open_stream() -> Awaitable[tractor.MsgStream]:
async with tractor.open_nursery() as tn:
portal = await tn.start_actor('streamer', enable_modules=[__name__])
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
await portal.cancel_actor()
print('CANCELLED STREAMER')
@acm
async def maybe_open_stream(taskname: str):
async with tractor.trionics.maybe_open_context(
# NOTE: all secondary tasks should cache hit on the same key
acm_func=open_stream,
) as (cache_hit, stream):
if cache_hit:
print(f'{taskname} loaded from cache')
# add a new broadcast subscription for the quote stream
# if this feed is already allocated by the first
# task that entereed
async with stream.subscribe() as bstream:
yield bstream
else:
# yield the actual stream
yield stream
def test_open_local_sub_to_stream():
'''
Verify a single inter-actor stream can can be fanned-out shared to
N local tasks using ``trionics.maybe_open_context():``.
'''
timeout: float = 3.6 if platform.system() != "Windows" else 10
async def main():
full = list(range(1000))
async def get_sub_and_pull(taskname: str):
async with (
maybe_open_stream(taskname) as stream,
):
if '0' in taskname:
assert isinstance(stream, tractor.MsgStream)
else:
assert isinstance(
stream,
tractor.trionics.BroadcastReceiver
)
first = await stream.receive()
print(f'{taskname} started with value {first}')
seq = []
async for msg in stream:
seq.append(msg)
assert set(seq).issubset(set(full))
print(f'{taskname} finished')
with trio.fail_after(timeout):
# TODO: turns out this isn't multi-task entrant XD
# We probably need an indepotent entry semantic?
async with tractor.open_root_actor():
async with (
trio.open_nursery() as nurse,
):
for i in range(10):
nurse.start_soon(get_sub_and_pull, f'task_{i}')
await trio.sleep(0.001)
print('all consumer tasks finished')
trio.run(main)