forked from dask/dask
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_multiprocessing.py
132 lines (90 loc) · 3.01 KB
/
test_multiprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import multiprocessing
from operator import add
import pickle
import random
import numpy as np
import pytest
from dask import compute, delayed
from dask.context import set_options
from dask.multiprocessing import get, _dumps, _loads, remote_exception
from dask.utils_test import inc
def test_pickle_globals():
    """Round-trip a locally defined function and inspect its globals.

    The inner function ``f`` only refers to ``np``, so after going through
    ``_dumps`` / ``_loads`` its ``__globals__`` is expected to contain
    exactly ``np`` and ``__builtins__``.
    """
    def f(x):
        return np.sin(x) + np.cos(x)

    restored = _loads(_dumps(f))
    expected_names = {'np', '__builtins__'}
    assert expected_names == set(restored.__globals__)
def bad():
    """Task body that always fails with ``ValueError("12345")``."""
    message = "12345"
    raise ValueError(message)
def test_errors_propagate():
    """An exception raised inside a task must surface from ``get``.

    The graph's single task calls ``bad``, which raises
    ``ValueError("12345")``.  The error seen by the caller has to be a
    ``ValueError`` whose text still contains the original message.
    """
    dsk = {'x': (bad,)}

    # pytest.raises makes the test fail when ``get`` raises nothing at all;
    # a plain try/except would pass silently in that situation.
    with pytest.raises(ValueError) as excinfo:
        get(dsk, 'x')

    assert "12345" in str(excinfo.value)
def test_remote_exception():
    """Check the type and message of objects built by ``remote_exception``.

    Wrapping the same ``TypeError`` twice must give objects of the same
    type, that type must still be a ``TypeError``, and the string form must
    include the original message, the word ``Traceback`` and the supplied
    traceback text.
    """
    original = TypeError("hello")
    first = remote_exception(original, 'traceback-body')
    second = remote_exception(original, 'traceback-body')

    assert type(first) is type(second)
    assert isinstance(first, TypeError)

    rendered = str(first)
    for fragment in ('hello', 'Traceback', 'traceback-body'):
        assert fragment in rendered
def make_bad_result():
    """Return an anonymous function that adds one to its argument.

    Used as the task in ``test_unpicklable_results_generate_errors``, so
    that the task's *result* is a lambda.
    """
    return lambda x: x + 1
def test_unpicklable_results_generate_errors():
    """If returning a lambda from a task fails, it fails as a pickling error.

    NOTE(review): the assertion only runs when ``get`` raises.  If the
    lambda result comes back successfully this test passes without
    checking anything -- confirm whether that is intended.
    """
    graph = {'x': (make_bad_result,)}
    acceptable_names = ('PicklingError', 'AttributeError')

    caught = None
    try:
        get(graph, 'x')
    except Exception as err:
        caught = err

    if caught is not None:
        # Compare by class name rather than by type because of the
        # pickle / cPickle distinction.
        assert type(caught).__name__ in acceptable_names
class NotUnpickleable(object):
    """Helper whose state can be captured but never restored.

    ``__getstate__`` hands back an empty tuple, while ``__setstate__``
    raises ``ValueError`` every time it is called.
    """

    def __setstate__(self, state):
        message = "Can't unpickle me"
        raise ValueError(message)

    def __getstate__(self):
        return tuple()
def test_unpicklable_args_generate_errors():
    """Task arguments that cannot be unpickled must make ``get`` raise.

    Two graph layouts are exercised: the ``NotUnpickleable`` instance
    embedded directly in the task tuple, and the instance stored under its
    own key and referenced from the task by name.  In both cases a
    ``ValueError`` (the error raised by ``NotUnpickleable.__setstate__``)
    is expected.
    """
    a = NotUnpickleable()

    def foo(a):
        return 1

    # pytest.raises is used instead of try/except so that the test fails
    # if ``get`` unexpectedly completes without raising.
    dsk = {'x': (foo, a)}
    with pytest.raises(ValueError):
        get(dsk, 'x')

    dsk = {'x': (foo, 'a'),
           'a': a}
    with pytest.raises(ValueError):
        get(dsk, 'x')
def test_reuse_pool():
    """A pool supplied through ``set_options`` can serve several ``get`` calls.

    The same ``multiprocessing.Pool`` is used for two consecutive
    computations, both of which must return the expected value.
    """
    pool = multiprocessing.Pool()
    try:
        with set_options(pool=pool):
            assert get({'x': (inc, 1)}, 'x') == 2
            assert get({'x': (inc, 1)}, 'x') == 2
    finally:
        # The test owns this pool, so shut its worker processes down even
        # when an assertion fails; otherwise they outlive the test.
        pool.close()
        pool.join()
def test_dumps_loads():
    """``get`` works with ``pickle`` set as ``func_dumps`` / ``func_loads``."""
    graph = {'x': 1, 'y': (add, 'x', 2)}
    with set_options(func_dumps=pickle.dumps, func_loads=pickle.loads):
        outcome = get(graph, 'y')
        assert outcome == 3
def test_fuse_doesnt_clobber_intermediates():
    """Asking for an intermediate key and the final key returns both values."""
    graph = {'x': 1, 'y': (inc, 'x'), 'z': (add, 10, 'y')}
    requested = ['y', 'z']
    outcome = get(graph, requested)
    assert outcome == (2, 12)
def test_optimize_graph_false():
    """With ``optimize_graph=False`` the pretask callback fires twice here.

    A ``Callback`` records the key passed to every ``pretask`` call while
    ``z`` is computed; exactly two keys are expected to be recorded.
    """
    from dask.callbacks import Callback

    graph = {'x': 1, 'y': (inc, 'x'), 'z': (add, 10, 'y')}
    seen = []

    def record(key, *args):
        seen.append(key)

    with Callback(pretask=record):
        get(graph, 'z', optimize_graph=False)

    assert len(seen) == 2
@pytest.mark.parametrize('random', [np.random, random])
def test_random_seeds(random):
    """Separate tasks must not produce identical random draws.

    Each of ``N`` impure delayed calls returns a tuple of five random
    integers taken from the parametrized ``random`` module; all ``N``
    tuples have to be distinct.
    """
    def f():
        draws = [random.randint(0, 10000) for _ in range(5)]
        return tuple(draws)

    N = 10
    with set_options(get=get):
        tasks = [delayed(f, pure=False)() for _ in range(N)]
        (results,) = compute(tasks)

    distinct = set(results)
    assert len(distinct) == N