-
Notifications
You must be signed in to change notification settings - Fork 6
/
convert.py
271 lines (242 loc) · 9.41 KB
/
convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
'''Useful functions for converting between different types (dicts, lists, tuples, etc.)
'''
from copy import deepcopy
from uuid import uuid4
from vflow.vset import PREV_KEY
from vflow.vfunc import VfuncPromise
from vflow.subkey import Subkey
import pandas as pd
from pandas import DataFrame
def init_args(args_tuple: tuple, names=None):
    '''Wrap each positional argument in its own single-entry data dict

    Params
    ------
    args_tuple: tuple
        the raw arguments to wrap
    names: optional, list-like
        gives names for each of the arguments in the tuple; when omitted
        every argument is named 'start'

    Returns
    -------
    list of dicts, one per argument, each holding the argument under the
    one-element key (Subkey(name, 'init'),) plus a PREV_KEY entry set to
    ('init',)
    '''
    if names is not None:
        assert len(names) == len(args_tuple), 'names should be same length as args_tuple'
    else:
        names = ['start'] * len(args_tuple)
    return [
        {(Subkey(names[idx], 'init'),): arg, PREV_KEY: ('init',)}
        for idx, arg in enumerate(args_tuple)
    ]
def s(x):
    '''Gets shape of a list/tuple/ndarray

    Params
    ------
    x: list, tuple, or any object exposing a ``shape`` attribute

    Returns
    -------
    len(x) for lists and tuples (including subclasses such as namedtuples),
    otherwise x.shape
    '''
    # isinstance rather than an exact type match: list/tuple subclasses have
    # no ``.shape`` and would otherwise fail with AttributeError
    if isinstance(x, (list, tuple)):
        return len(x)
    return x.shape
def init_step(idx, cols):
    '''Build the label for an 'init' column from the next non-init column

    Scans cols from position idx onward and returns 'init-' followed by the
    first entry that is not 'init'. Returns None when no such entry exists.
    '''
    later_cols = (cols[pos] for pos in range(idx, len(cols)))
    return next(('init-' + col for col in later_cols if col != 'init'), None)
def dict_to_df(d: dict):
    '''Converts a dictionary with tuple keys
    into a pandas DataFrame

    Params
    ------
    d: dict
        maps tuples of subkeys (objects exposing ``value`` and ``origin``)
        to outputs; an entry stored under PREV_KEY is ignored

    Returns
    -------
    DataFrame with one row per entry of d: the subkey values followed by the
    stored output. For non-empty input the columns are named after the
    subkey origins of the first key and the output column is named 'out';
    columns whose origin is 'init' are renamed through init_step.
    '''
    d_copy = {tuple(sk.value for sk in k): v for k, v in d.items() if k != PREV_KEY}
    df = pd.Series(d_copy).reset_index()
    if d_copy:
        # column names come from the first real key; all keys are assumed to
        # share the same subkey layout
        first_key = next(k for k in d if k != PREV_KEY)
        cols = [sk.origin for sk in first_key] + ['out']
        # set each init col to init-{next_module_set}
        cols = [c if c != 'init' else init_step(idx, cols) for idx, c in enumerate(cols)]
        # assign the labels directly: DataFrame.set_axis(..., inplace=True)
        # is no longer accepted by pandas >= 2.0
        df.columns = cols
    return df
def compute_interval(df: DataFrame, d_label, wrt_label, accum: list=['std']):
    '''Aggregate the d_label column within each group of the wrt_label column

    Params
    ------
    df: DataFrame
        input data; it is not modified (the cast below returns a new frame)
    d_label
        column whose values are aggregated
    wrt_label
        column to group by; its values are cast to str before grouping
    accum: list
        aggregations handed to ``agg`` (standard deviation by default)

    Returns
    -------
    DataFrame indexed by the stringified wrt_label values, with the
    aggregated d_label statistics as columns
    '''
    stringified = df.astype({wrt_label: str})
    relevant = stringified[[wrt_label, d_label]]
    grouped = relevant.groupby(wrt_label)
    return grouped.agg(accum)
def to_tuple(lists: list):
    '''Transpose a list of per-module lists into a tuple of per-slot lists

    Ex. [[x1, y1], [x2, y2], [x3, y3]] -> ([x1, x2, x3], [y1, y2, y3])
    Ex. [m1, m2, m3] -> [m1, m2, m3]   (first element is not a list)
    Ex. [[x1, y1]] -> [[x1, y1]]       (fewer than two elements: unchanged)
    Ex. [] -> []

    The number of output slots is taken from the first inner list.
    Allows us to write X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])
    '''
    # nothing to transpose: too few elements, or the elements are not lists
    if len(lists) <= 1 or not type(lists[0]) == list:
        return lists
    width = len(lists[0])
    return tuple([row[col] for row in lists] for col in range(width))
def to_list(tup: tuple):
    '''Convert from tuple to packed list

    When the first element is a list, the tuple is transposed:
    Ex. ([x1, x2, x3], [y1, y2, y3]) -> [[x1, y1], [x2, y2], [x3, y3]]
    Ex. ([x1], [y1]) -> [[x1, y1]]
    Ex. ([x1, x2, x3],) -> [[x1], [x2], [x3]]

    Otherwise the elements are treated as raw data, with the first half
    paired against the second half:
    Ex. (x1,) -> [x1]
    Ex. (x1, y1) -> [[x1, y1]]
    Ex. (x1, x2, x3, y1, y2, y3) -> [[x1, y1], [x2, y2], [x3, y3]]
    Ex. (x1, x2, x3, y1, y2) -> ValueError

    Ex. () -> []
    Allows us to call function with arguments in a loop
    '''
    n_args = len(tup)
    if n_args == 0:
        return []

    if isinstance(tup[0], list):
        if n_args == 1:
            return [[item] for item in tup[0]]
        # one packed row per position of the first list
        n_mods = len(tup[0])
        return [[column[row] for column in tup] for row in range(n_mods)]

    # the first element is data
    if n_args == 1:
        return list(tup)
    if n_args % 2 != 0:
        raise ValueError('Don\'t know how to handle uneven number of args '
                         'without a list. Please wrap your args in a list.')
    # assume first half of args is input and second half is outcome
    half = n_args // 2
    return [list(pair) for pair in zip(tup[:half], tup[half:])]
def sep_dicts(d: dict, n_out: int = 1, keys: list = []):
    '''Split a dictionary whose values are iterables into n_out dictionaries
    Assumes every value has same length n_out

    Params
    ------
    d: {k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p}
    n_out: the number of dictionaries to separate d into
    keys: optional list of length n_out
        when given, the i-th output prefixes every key with a new Subkey
        whose value is keys[i] and whose origin is the origin of the key's
        last subkey followed by '-i'; when empty, the i-th output keeps
        position i of the original key followed by the subkeys after the
        first n_out positions

    Returns
    -------
    sep_dicts: [{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, '__prev__': p}]
        d itself is returned unchanged when n_out <= 1

    Raises
    ------
    ValueError if keys is non-empty and its length differs from n_out
    '''
    if len(keys) > 0 and len(keys) != n_out:
        raise ValueError(f'keys should be empty or have length n_out={n_out}')
    # nothing to separate
    if n_out <= 1:
        return d

    # one id shared by every key produced in this call
    batch_id = str(uuid4())  # w/ high prob, uuid4 is unique
    separated = [{} for _ in range(n_out)]
    for key, value in d.items():
        if key == PREV_KEY:
            continue
        for i, target in enumerate(separated):
            if len(keys) == 0:
                # assumes the correct sub-key for item i is in the i-th position
                new_key = (key[i],) + key[n_out:]
            else:
                new_sub = Subkey(value=keys[i], origin=key[-1].origin + '-' + str(i))
                new_key = (new_sub,) + key
            # tag the last subkey with the id of this separation
            new_key[-1]._sep_dicts_id = batch_id
            if isinstance(value, VfuncPromise):
                # return a promise to get the value at index i of the
                # original promise
                target[new_key] = VfuncPromise(lambda v, i: v[i], value, i)
            else:
                target[new_key] = value[i]

    # add back prev
    prev = d[PREV_KEY]
    for target in separated:
        target[PREV_KEY] = prev
    return separated
def combine_keys(left_key, right_key):
    '''Combine two keys (tuples of subkeys) into a single key

    The shorter key (right_key on ties) supplies the subkeys that must be
    matched, i.e. those whose is_matching() is true; each is compared
    against the subkeys of the other key in order.

    Returns
    -------
    () as soon as a subkey reports a mismatch; left_key followed by the
    subkeys of right_key that are not among the matched subkeys when at
    least one match was found; otherwise left_key + right_key
    '''
    if len(left_key) < len(right_key):
        match_key, compare_key = left_key, right_key
    else:
        match_key, compare_key = right_key, left_key

    to_match = [subkey for subkey in match_key if subkey.is_matching()]
    if not to_match:
        return left_key + right_key

    matched = []
    for subkey in to_match:
        for candidate in compare_key:
            if subkey.matches(candidate):
                matched.append(subkey)
                break
            if subkey.mismatches(candidate):
                # subkeys with same origin but different values are rejected
                return ()

    if not matched:
        return left_key + right_key
    # always filter on right key
    remainder = tuple(subkey for subkey in right_key if subkey not in matched)
    return left_key + remainder
def _combine_pair(left: dict, right: dict, base_case: bool):
    '''Merge two data dicts on every pair of keys that combine_keys accepts

    With base_case the two values are paired into a new tuple; otherwise the
    left value is taken to be a tuple already and the right value is
    appended to it. The PREV_KEY entries of both dicts are merged into one
    de-duplicated, order-preserving tuple.
    '''
    merged = {}
    for k0 in left:
        for k1 in right:
            if k0 == PREV_KEY or k1 == PREV_KEY:
                continue
            combined_key = combine_keys(k0, k1)
            # an empty combined key means the two keys are incompatible
            if len(combined_key) > 0:
                if base_case:
                    merged[combined_key] = (left[k0], right[k1])
                else:
                    merged[combined_key] = left[k0] + (right[k1],)

    prev_tup = ()
    for source in (left, right):
        if PREV_KEY in source:
            for p in source[PREV_KEY]:
                if p not in prev_tup:
                    prev_tup += (p,)
    merged[PREV_KEY] = prev_tup
    return merged


def combine_dicts(*args: dict, base_case=True):
    '''Combines any number of dictionaries into a single dictionary. Dictionaries
    are combined left to right, matching on the subkeys of the arg that has
    fewer matching requirements.

    Params
    ------
    *args: dict
        data dicts keyed by tuples of subkeys, optionally with a PREV_KEY entry
    base_case: bool
        whether the values of the first dict are raw values (True) or already
        tuples of accumulated values that should be extended (False)

    Returns
    -------
    {} for no input; for a single dict, a copy whose values (other than the
    PREV_KEY entry) are wrapped in 1-tuples; otherwise a dict mapping each
    combined key to a flat tuple with one value per input dict, plus a
    PREV_KEY entry merging the inputs' PREV_KEY tuples
    '''
    n_args = len(args)
    if n_args == 0:
        return {}
    if n_args == 1:
        # wrap the dict values in tuples; this is helpful so that when we
        # pass the values to a module fun in we can just use * expansion
        return {k: (v if k == PREV_KEY else (v,)) for k, v in args[0].items()}

    # fold left to right; only the very first pair may hold raw values, every
    # later step extends the accumulated tuple so the result stays flat for
    # any number of dicts
    combined = _combine_pair(args[0], args[1], base_case)
    for extra in args[2:]:
        combined = _combine_pair(combined, extra, False)
    return combined
def apply_modules(modules: dict, data_dict: dict, lazy: bool = False):
    '''Apply every module to every data entry whose key is compatible with it

    Params
    ------
    modules: dict
        maps keys to callables; each callable is deep-copied before use
    data_dict: dict
        maps keys to iterables of arguments; when empty, each module is
        called with no arguments and stored under its own key
    lazy: bool
        if True, store a VfuncPromise wrapping the call instead of calling
        the module

    Returns
    -------
    dict mapping each non-empty combine_keys(data_key, module_key) result to
    the module output (or promise); PREV_KEY entries are skipped when
    pairing modules with data
    '''
    results = {}
    for mod_key in modules:
        if len(data_dict) == 0:
            # no inputs: the module is invoked (or promised) without arguments
            module = deepcopy(modules[mod_key])
            results[mod_key] = VfuncPromise(module) if lazy else module()
        for data_key in data_dict:
            if mod_key == PREV_KEY or data_key == PREV_KEY:
                continue
            out_key = combine_keys(data_key, mod_key)
            if len(out_key) == 0:
                continue
            module = deepcopy(modules[mod_key])
            inputs = data_dict[data_key]
            if lazy:
                # return a promise
                results[out_key] = VfuncPromise(module, *inputs)
                continue
            # eager mode: call any promised inputs to get their values first
            resolved = [arg() if isinstance(arg, VfuncPromise) else arg for arg in inputs]
            results[out_key] = module(*resolved)
    return results