forked from dask/dask
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_core.py
234 lines (175 loc) · 6.21 KB
/
test_core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
from collections import namedtuple
import pytest
import pickle
from dask.utils_test import GetFunctionTestMixin, inc, add
from dask import core
from dask.core import (istask, get_dependencies, get_deps, flatten, subs,
preorder_traversal, literal, quote, has_tasks)
def contains(a, b):
    """Return True if every key/value pair of ``b`` is also present in ``a``.

    A key that is missing from ``a`` never counts as a match, even when the
    value requested in ``b`` is ``None``.

    >>> contains({'x': 1, 'y': 2}, {'x': 1})
    True
    >>> contains({'x': 1, 'y': 2}, {'z': 3})
    False
    >>> contains({'x': 1}, {'y': None})
    False
    """
    # Check membership explicitly: ``a.get(k)`` returns None for a missing
    # key, which would wrongly compare equal to a requested value of None.
    return all(k in a and a[k] == v for k, v in b.items())
def test_istask():
    """Check ``istask`` on a task tuple, a scalar, a plain tuple and a namedtuple."""
    assert istask((inc, 1))
    for non_task in (1, (1, 2)):
        assert not istask(non_task)

    # A namedtuple instance is rejected even with a callable in its first field.
    record_type = namedtuple('f', ['x', 'y'])
    assert not istask(record_type(sum, 2))
def test_has_tasks():
    """Exercise ``has_tasks`` against every value of a small graph."""
    graph = {
        'a': [1, 2, 3],
        'b': 'a',
        'c': [1, (inc, 1)],
        'd': [(sum, 'a')],
        'e': ['a', 'b'],
        'f': [['a', 'b'], 2, 3],
    }

    # Only the plain list of literals stored under 'a' should report no tasks.
    assert not has_tasks(graph, graph['a'])
    for key in ('b', 'c', 'd', 'e', 'f'):
        assert has_tasks(graph, graph[key])
def test_preorder_traversal():
    """Compare ``preorder_traversal`` output with hand-written sequences."""
    cases = [
        ((add, 1, 2), [add, 1, 2]),
        ((add, (add, 1, 2), (add, 3, 4)), [add, add, 1, 2, add, 3, 4]),
        # A list argument is expected to show up as ``list`` before its items.
        ((add, (sum, [1, 2]), 3), [add, sum, list, 1, 2, 3]),
    ]
    for task, expected in cases:
        assert list(preorder_traversal(task)) == expected
class TestGet(GetFunctionTestMixin):
    """Run the ``GetFunctionTestMixin`` checks against ``core.get``."""

    get = staticmethod(core.get)
class TestRecursiveGet(GetFunctionTestMixin):
    """Run the ``GetFunctionTestMixin`` checks with ``core.get(..., recursive=True)``."""

    @staticmethod
    def get(d, k):
        return core.get(d, k, recursive=True)

    def test_get_stack_limit(self):
        # Deliberately a no-op: this check would blow the stack in recursive
        # mode (presumably it replaces a same-named test from the mixin).
        pass
def test_GetFunctionTestMixin_class():
    """The mixin's ``test_get`` must fail for a bogus getter and pass for ``core.get``."""
    class TestCustomGetFail(GetFunctionTestMixin):
        # Always answers 1, whatever graph and key it is given.
        get = staticmethod(lambda x, y: 1)

    failing = TestCustomGetFail()
    with pytest.raises(AssertionError):
        failing.test_get()

    class TestCustomGetPass(GetFunctionTestMixin):
        get = staticmethod(core.get)

    passing = TestCustomGetPass()
    passing.test_get()
def test_get_dependencies_nested():
    """Keys buried in nested lists inside a task are still reported."""
    graph = {
        'x': 1,
        'y': 2,
        'z': (add, (inc, [['x']]), 'y'),
    }
    expected = ['x', 'y']

    assert get_dependencies(graph, 'z') == set(expected)
    # The list form is sorted before comparing; this test does not pin its order.
    listed = get_dependencies(graph, 'z', as_list=True)
    assert sorted(listed) == expected
def test_get_dependencies_empty():
    """A task with no arguments yields an empty set, or an empty list with ``as_list``."""
    graph = {'x': (inc,)}

    as_set = get_dependencies(graph, 'x')
    assert as_set == set()

    listed = get_dependencies(graph, 'x', as_list=True)
    assert listed == []
def test_get_dependencies_list():
    """Dependencies are found when the graph value is a list with a nested task."""
    graph = {
        'x': 1,
        'y': 2,
        'z': ['x', [(inc, 'y')]],
    }
    expected = ['x', 'y']

    assert get_dependencies(graph, 'z') == set(expected)
    listed = get_dependencies(graph, 'z', as_list=True)
    assert sorted(listed) == expected
def test_get_dependencies_task():
    """Dependencies can be computed for a task passed directly via ``task=``."""
    graph = {'x': 1, 'y': 2, 'z': ['x', [(inc, 'y')]]}
    task = (inc, 'x')

    assert get_dependencies(graph, task=task) == {'x'}
    assert get_dependencies(graph, task=task, as_list=True) == ['x']
def test_get_dependencies_nothing():
    """Calling ``get_dependencies`` with only a graph raises ``ValueError``."""
    pytest.raises(ValueError, get_dependencies, {})
def test_get_dependencies_many():
    """A list of graph values passed as ``task`` yields their combined dependencies."""
    graph = {
        'a': [1, 2, 3],
        'b': 'a',
        'c': [1, (inc, 1)],
        'd': [(sum, 'c')],
        'e': ['a', 'b', 'zzz'],
        'f': [['a', 'b'], 2, 3],
    }
    picked = [graph['d'], graph['f']]
    expected = ['a', 'b', 'c']

    assert get_dependencies(graph, task=picked) == set(expected)
    listed = get_dependencies(graph, task=picked, as_list=True)
    assert sorted(listed) == expected

    # An empty task list has no dependencies in either return form.
    assert get_dependencies(graph, task=[]) == set()
    assert get_dependencies(graph, task=[], as_list=True) == []
def test_get_deps():
    """Check the ``(dependencies, dependents)`` pair returned by ``get_deps``.

    The example compares with ``==`` so that it depends neither on how sets
    are printed nor on the order in which dict items are displayed:

    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> dependencies == {'a': set(), 'b': {'a'}, 'c': {'b'}}
    True
    >>> dependents == {'a': {'b'}, 'b': {'c'}, 'c': set()}
    True
    """
    dsk = {'a': [1, 2, 3],
           'b': 'a',
           'c': [1, (inc, 1)],
           'd': [(sum, 'c')],
           # 'zzz' is not a key of the graph and 'b' is repeated; the
           # expectation below lists only {'b'} for 'e'.
           'e': ['b', 'zzz', 'b'],
           'f': [['a', 'b'], 2, 3]}
    dependencies, dependents = get_deps(dsk)
    assert dependencies == {'a': set(),
                            'b': {'a'},
                            'c': set(),
                            'd': {'c'},
                            'e': {'b'},
                            'f': {'a', 'b'},
                            }
    # ``dependents`` is the reverse mapping: key -> keys that depend on it.
    assert dependents == {'a': {'b', 'f'},
                          'b': {'e', 'f'},
                          'c': {'d'},
                          'd': set(),
                          'e': set(),
                          'f': set(),
                          }
def test_flatten():
    """``flatten`` yields nothing for an empty tuple and keeps a string whole."""
    cases = [
        ((), []),
        ('foo', ['foo']),
    ]
    for arg, expected in cases:
        assert list(flatten(arg)) == expected
def test_subs():
    """``subs`` replaces a key inside list arguments, including nested lists."""
    cases = [
        ((sum, [1, 'x']), (sum, [1, 2])),
        ((sum, [1, ['x']]), (sum, [1, [2]])),
    ]
    for before, after in cases:
        assert subs(before, 'x', 2) == after
class MutateOnEq(object):
    """Helper that counts how often it is compared with ``==``.

    Every comparison reports "not equal" and bumps ``hit_eq`` on the
    instance that was compared.
    """

    # Class-level default; the first comparison creates a per-instance counter.
    hit_eq = 0

    def __eq__(self, other):
        self.hit_eq = self.hit_eq + 1
        return False
def test_subs_no_key_data_eq():
    # Numpy throws a deprecation warning on bool(array == scalar), which
    # pollutes the terminal. This test checks that `subs` never tries to
    # compare keys (scalars) with values (which could be arrays).
    probe = MutateOnEq()

    # First as a bare value, then as an argument inside a task tuple.
    for task in (probe, (add, probe, 'x')):
        subs(task, 'x', 1)
        assert probe.hit_eq == 0
def test_subs_with_unfriendly_eq():
    """``subs`` must cope with values whose ``==`` is awkward or raises.

    The numpy part runs only when numpy can be imported. The second part
    needs no optional dependency and therefore always runs.
    """
    try:
        import numpy as np
    except ImportError:
        # Only a missing numpy is tolerated; any other error should surface.
        np = None

    if np is not None:
        task = (np.sum, np.array([1, 2]))
        # Substituting a key that does not occur must leave the task equal
        # to itself.
        assert (subs(task, (4, 5), 1) == task) is True

    class MyException(Exception):
        pass

    class F:
        def __eq__(self, other):
            raise MyException()

    # ``subs`` must hand back the very same object without ever triggering
    # its raising ``__eq__``.
    task = F()
    assert subs(task, 1, 2) is task
def test_subs_with_surprisingly_friendly_eq():
    """``subs`` returns the same object when given a pandas DataFrame.

    The test is reported as skipped (rather than silently passing) when
    pandas is not installed.
    """
    pd = pytest.importorskip("pandas")

    df = pd.DataFrame()
    assert subs(df, 'x', 1) is df
def test_quote():
    """Each quoted sample must come back unchanged from ``core.get``."""
    samples = [
        [1, 2, 3],
        (add, 1, 2),
        [1, [2, 3]],
        (add, 1, (add, 2, 3)),
    ]
    for sample in samples:
        graph = {'x': quote(sample)}
        assert core.get(graph, 'x') == sample
def test_literal_serializable():
    """A ``literal`` keeps its ``data`` through a pickle round trip."""
    payload = (add, 1, 2)
    wrapped = literal(payload)
    restored = pickle.loads(pickle.dumps(wrapped))
    assert restored.data == payload