/
model.py
65 lines (56 loc) · 1.91 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import numpy as np
from dislib.data.array import Array
from pycompss.api.api import compss_wait_on
try:
import cbor2
except ImportError:
cbor2 = None
def encoder_helper(obj):
    """Convert NumPy / dislib objects into plain, serializable values.

    Parameters
    ----------
    obj : object
        Value to encode.

    Returns
    -------
    object
        * NumPy scalar (``np.generic``): the result of ``obj.item()``.
        * ``np.ndarray``: a dict tagged ``"ndarray"`` holding the dtype as a
          string, the contents as nested lists (``obj.tolist()``) and a
          ``"dtype_list"`` flag telling whether the dtype is structured.
        * ``Array``: a dict tagged ``"dsarray"`` merged with the instance
          attributes found in ``obj.__dict__``.
        * ``np.random.RandomState``: a dict tagged ``"RandomState"`` holding
          ``obj.get_state()``.
        * Anything else: ``None``.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return {
            "class_name": "ndarray",
            # ``dtype.names`` is None only for unstructured dtypes. Counting
            # the entries of ``dtype.descr`` is not reliable here: a
            # structured dtype with a single field has a one-entry ``descr``,
            # exactly like a plain dtype, and would be flagged as
            # unstructured.
            "dtype_list": obj.dtype.names is not None,
            "dtype": str(obj.dtype),
            "items": obj.tolist(),
        }
    if isinstance(obj, Array):
        return {"class_name": "dsarray", **obj.__dict__}
    if isinstance(obj, np.random.RandomState):
        return {"class_name": "RandomState", "items": obj.get_state()}
    return None
def decoder_helper(class_name, obj):
    """Rebuild an object from its tagged dictionary representation.

    Parameters
    ----------
    class_name : str
        Tag identifying the kind of object stored in `obj`.
    obj : dict
        Encoded payload. For ``"ndarray"`` it must provide ``"dtype_list"``,
        ``"dtype"`` and ``"items"``; for ``"dsarray"`` it must provide
        ``"_blocks"``, ``"_top_left_shape"``, ``"_reg_shape"``, ``"_shape"``,
        ``"_sparse"`` and ``"_delete"``.

    Returns
    -------
    object or None
        A NumPy record array (structured dtype), a NumPy array (plain dtype),
        an ``Array``, or ``None`` when `class_name` is not handled here.

    Raises
    ------
    ValueError, SyntaxError
        If the ``"dtype"`` string of a structured array is not a plain
        Python literal.
    """
    if class_name == "ndarray":
        if obj["dtype_list"]:
            import ast

            # Every record is converted to a tuple before being handed to
            # ``fromrecords``.
            items = list(map(tuple, obj["items"]))
            # SECURITY: the dtype description comes from the serialized
            # payload, so it must never be passed to ``eval`` (that would run
            # arbitrary code embedded in a model file). ``literal_eval`` only
            # accepts Python literals, which is what ``str(dtype)`` yields
            # for a structured dtype, e.g. "[('a', '<i4'), ('b', '<f8')]".
            dtype = ast.literal_eval(obj["dtype"])
            return np.rec.fromrecords(items, dtype=dtype)
        return np.array(obj["items"], dtype=obj["dtype"])
    if class_name == "dsarray":
        return Array(
            blocks=obj["_blocks"],
            top_left_shape=obj["_top_left_shape"],
            reg_shape=obj["_reg_shape"],
            shape=obj["_shape"],
            sparse=obj["_sparse"],
            delete=obj["_delete"],
        )
    return None
def sync_obj(obj):
    """Resolve, in place, every Future held inside a dict or list.

    Nested dicts and lists are walked recursively. Any other value is
    replaced by the result of `compss_wait_on(value)`; when that result
    carries a dict `__dict__`, its attributes are synchronized as well.

    Raises a TypeError when `obj` is neither a dict nor a list.
    """
    if not isinstance(obj, (dict, list)):
        raise TypeError("Expected dict or list and received %s." % type(obj))

    # Dicts are addressed by key, lists by position.
    entries = obj.items() if isinstance(obj, dict) else enumerate(obj)

    for slot, item in entries:
        if isinstance(item, (dict, list)):
            # Containers are synchronized in place; no reassignment needed.
            sync_obj(item)
            continue

        resolved = compss_wait_on(item)
        obj[slot] = resolved

        # Objects exposing a plain attribute dict may hold Futures too.
        attributes = getattr(resolved, "__dict__", None)
        if isinstance(attributes, dict):
            sync_obj(attributes)