forked from dask/dask
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_cache.py
62 lines (42 loc) · 1.2 KB
/
test_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from dask.cache import Cache
from dask.async import get_sync
from dask.threaded import get
from operator import add
from dask.context import _globals
from time import sleep
import pytest
# Import the optional ``cachey`` dependency; pytest skips this module when it
# cannot be imported.
cachey = pytest.importorskip('cachey')
# Shared call log: ``inc`` appends each argument it is called with, so tests
# can check which tasks were actually executed.
flag = []
def inc(x):
    """Return ``x + 1`` after recording ``x`` in the module-level ``flag`` list.

    The append is a deliberate side effect: it lets the tests see which
    arguments this function was really invoked with.
    """
    flag.append(x)  # record the call before computing the result
    incremented = x + 1
    return incremented
def test_cache():
    """Check that ``Cache`` stores results and that cached keys are not re-run.

    The test runs two graphs under the same ``Cache`` context manager:

    1. A single task ``x = inc(1)``; afterwards the result must be present in
       the underlying cachey store and ``inc`` must have been called once.
    2. A graph that again contains ``x`` plus a new task ``y = inc(2)``; only
       ``inc(2)`` may be recorded in ``flag``, i.e. ``x`` was not recomputed.

    It also checks that the cache's ``starttimes``/``durations`` bookkeeping is
    empty after the context exits and that no callbacks are left registered
    in ``_globals``.
    """
    # Start from an empty call log so the assertions below do not depend on
    # earlier ``inc`` calls (e.g. a previous run of this test in-process).
    del flag[:]

    store = cachey.Cache(10000)
    cc = Cache(store)

    with cc:
        assert get({'x': (inc, 1)}, 'x') == 2

    assert flag == [1]
    assert store.data['x'] == 2

    assert not cc.starttimes
    assert not cc.durations

    # Reset the call log in place; ``inc`` holds a reference to this list.
    del flag[:]
    dsk = {'x': (inc, 1), 'y': (inc, 2), 'z': (add, 'x', 'y')}
    with cc:
        assert get(dsk, 'z') == 5

    assert flag == [2]  # no x present

    assert not _globals['callbacks']
def test_cache_with_number():
    """Check that ``Cache`` given a number builds a ``cachey.Cache`` itself.

    The first positional argument must show up as ``available_bytes`` on the
    created cache and the ``limit`` keyword must be passed through.
    """
    wrapper = Cache(10000, limit=1)
    backing = wrapper.cache

    assert isinstance(backing, cachey.Cache)
    assert backing.available_bytes == 10000
    assert backing.limit == 1
def f(duration, size, *args):
    """Sleep for ``duration`` seconds, then return a list of ``size`` zeros.

    Extra positional arguments are accepted and ignored, so the function can
    be given additional inputs (such as other graph keys) without using them.
    """
    sleep(duration)
    zeros = [0] * size
    return zeros
def test_prefer_cheap_dependent():
    """Check the recorded cost of ``x`` is lower than that of its dependent ``y``.

    ``x`` sleeps for 0.01s and returns 10 items; ``y`` takes ``x`` as an extra
    argument, sleeps for 0.000001s and returns 1 item. After computing ``y``
    synchronously under the cache, the scorer's cost for ``x`` must be
    strictly smaller than its cost for ``y``.
    """
    graph = {
        'x': (f, 0.01, 10),
        'y': (f, 0.000001, 1, 'x'),
    }
    cache = Cache(10000)

    with cache:
        get_sync(graph, 'y')

    costs = cache.cache.scorer.cost
    assert costs['x'] < costs['y']