-
Notifications
You must be signed in to change notification settings - Fork 233
/
code.py
339 lines (253 loc) · 10.7 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import inspect
import re
from inspect import signature
from pprint import pformat
from typing import List, Optional, Union

from multipledispatch.dispatcher import MethodDispatcher

from optimus.engines.base.basedataframe import BaseDataFrame
from optimus.engines.base.columns import BaseColumns
from optimus.engines.base.create import BaseCreate
from optimus.engines.base.engine import BaseEngine
from optimus.engines.base.stringclustering import Clusters
def engine_accessors():
    """
    Map every engine-level accessor name to the base class that implements it.

    The classes are imported inside the function, so their modules are only loaded when
    the mapping is requested.

    :return: dict with the keys "create", "load", "save" and "connect".
    """
    from optimus.engines.base.create import BaseCreate
    from optimus.engines.base.io.load import BaseLoad
    from optimus.engines.base.io.save import BaseSave
    from optimus.engines.base.io.connect import Connect

    names = ("create", "load", "save", "connect")
    classes = (BaseCreate, BaseLoad, BaseSave, Connect)
    return dict(zip(names, classes))
def dataframe_accessors():
    """
    Map every dataframe-level accessor name to the class that implements it.

    The classes are imported inside the function, so their modules are only loaded when
    the mapping is requested.

    :return: dict with the keys "cols", "rows", "set", "mask", "ml", "plot", "outliers"
        and "profile".
    """
    from optimus.engines.base.columns import BaseColumns
    from optimus.engines.base.rows import BaseRows
    from optimus.engines.base.set import BaseSet
    from optimus.engines.base.mask import Mask
    from optimus.engines.base.ml.models import BaseML
    from optimus.plots.plots import Plot
    from optimus.outliers.outliers import Outliers
    from optimus.engines.base.profile import BaseProfile

    names = ("cols", "rows", "set", "mask", "ml", "plot", "outliers", "profile")
    classes = (BaseColumns, BaseRows, BaseSet, Mask, BaseML, Plot, Outliers, BaseProfile)
    return dict(zip(names, classes))
def accessors():
    """
    Return a single mapping with the engine accessors followed by the dataframe accessors.

    :return: dict of accessor name -> class; on a name clash the dataframe entry wins.
    """
    merged = dict(engine_accessors())
    merged.update(dataframe_accessors())
    return merged
def _create_new_variable(base_name: str, names: List[str]):
while base_name in names:
base_name = _increment_variable_name(base_name)
return base_name
def _increment_variable_name(variable_name: str):
match = re.search(r'\d+$', variable_name)
if match:
variable_name = variable_name[0:match.start()] + str(int(variable_name[match.start():match.endpos]) + 1)
else:
variable_name = variable_name + "2"
return variable_name
def _arguments(args: dict, args_properties: Optional[dict] = None):
args_list = []
# It doesn't find properties or allows kwargs
if not args_properties or "kwargs" in args_properties:
args_list = list(args.keys())
for key in ["source", "target", "operation", "operation_options"]:
if key in args_list:
args_list.remove(key)
# Properties from declaration
else:
args_list = args_properties.keys()
# Formating
for arg in args_list:
# Variable names of list of variable names
if arg in args:
if arg in args_properties and is_any_optimus_type(args_properties[arg].get("type", None)):
if is_list(args[arg]):
args[arg] = f"[{', '.join(args[arg])}]"
else:
args[arg] = args[arg]
# Native types
elif is_str(arg) and arg in args_properties and args_properties[arg].get("type", None) == dict:
pass
else:
args[arg] = pformat(args[arg])
return ", ".join([f'{arg}={args[arg]}' for arg in args_list if arg in args])
def _generate_code_target(body: dict, properties: dict, target: str):
    """
    Build the line of code for one operation, optionally assigning it to `target`.

    :param body: Operation body; "operation" is required, "source" is optional.
    :param properties: Method properties; its "arguments" entry is used to render the call
        arguments.
    :param target: Name to assign the result to. When falsy no assignment is emitted.
    :return: Tuple of the generated code and the list of assigned variable names
        (`[target]`, or an empty list when there is no target).
    """
    rendered_args = _arguments(body, properties["arguments"])

    assignment = f'{target} = ' if target else ''
    receiver = f'{body["source"]}.' if body.get("source", None) else ''
    call = f'{body["operation"]}({rendered_args})'

    assigned = [target] if target else []
    return assignment + receiver + call, assigned
def _generate_code_dataframe_transformation(body: dict, properties: dict, variables):
    """
    Generate code for a dataframe operation that returns a dataframe.

    The result is assigned to body["target"], falling back to body["source"] when no target
    is given. If body["operation_options"]["creates_new"] is truthy, a fresh name based on
    "df" is used instead.

    :param body: Operation body.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Names already in use.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    explicit_target = body.get("target")
    target = body["source"] if explicit_target is None else explicit_target

    options = body.get("operation_options", None)
    creates_new = bool(options) and options.get("creates_new", False)
    if creates_new:
        target = available_variable("df", variables)

    return _generate_code_target(body, properties, target)
def _generate_code_dataframe_mask(body, properties, variables):
    """
    Generate code for a dataframe operation annotated as returning a mask.

    :param body: Operation body; its "target" entry, when present, names the result.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Names already in use.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    # The fallback name is computed up front, whether or not the body carries a target
    fallback = available_variable("mask", variables)
    return _generate_code_target(body, properties, body.get("target", fallback))
def _generate_code_dataframe_clusters(body, properties, variables):
    """
    Generate code for a dataframe operation annotated as returning clusters.

    :param body: Operation body; its "target" entry, when present, names the result.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Names already in use.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    # The fallback name is computed up front, whether or not the body carries a target
    fallback = available_variable("clusters", variables)
    return _generate_code_target(body, properties, body.get("target", fallback))
def _generate_code_output(body, properties, variables):
    """
    Generate code for an operation whose result is a plain output value.

    :param body: Operation body; the result is assigned to body["target"], or to "result"
        when the body has no "target" key.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Unused; kept so every generator shares the same signature.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    target = body["target"] if "target" in body else "result"
    return _generate_code_target(body, properties, target)
def _generate_code_engine_dataframe(body, properties, variables):
    """
    Generate code for an engine operation annotated as returning a dataframe.

    :param body: Operation body; its "target" entry, when present, names the result.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Names already in use.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    # The fallback name is computed up front, whether or not the body carries a target
    fallback = available_variable("df", variables)
    return _generate_code_target(body, properties, body.get("target", fallback))
def _generate_code_engine_connection(body, properties, variables):
    """
    Generate code for an engine operation annotated as returning a connection.

    :param body: Operation body; its "target" entry, when present, names the result.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Names already in use.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    # The fallback name is computed up front, whether or not the body carries a target
    fallback = available_variable("conn", variables)
    return _generate_code_target(body, properties, body.get("target", fallback))
def _generate_code_engine(body, properties, variables):
    """
    Generate code for an "Optimus" operation (the generator chosen for the "optimus" root).

    :param body: Operation body; its "target" entry, when present, names the result.
    :param properties: Method properties, as returned by `method_properties`.
    :param variables: Names already in use.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    # The fallback name is computed up front, whether or not the body carries a target
    fallback = available_variable("op", variables)
    return _generate_code_target(body, properties, body.get("target", fallback))
def _get_generator(func_properties, method_root_type):
if method_root_type == "dataframe":
# If the method returns a dataframe, it's a transformation
if 'DataFrameType' == func_properties["return_annotation"]:
return _generate_code_dataframe_transformation
# TO-DO: should mask functions be treated as transformations? (MaskDataFrameType)
if 'MaskDataFrameType' == func_properties["return_annotation"]:
return _generate_code_dataframe_mask
# If the method returns anything else, it's an output
elif 'ClustersType' == func_properties["return_annotation"]:
return _generate_code_dataframe_clusters
else:
return _generate_code_output
elif method_root_type == "engine":
if 'DataFrameType' == func_properties["return_annotation"]:
return _generate_code_engine_dataframe
elif 'ConnectionType' == func_properties["return_annotation"]:
return _generate_code_engine_connection
else:
return _generate_code_output
elif method_root_type == "optimus":
return _generate_code_engine
elif method_root_type == "clusters":
return _generate_code_output
def _get_method_root_type(accessor):
    """
    Tell whether an accessor name belongs to the engine or to the dataframe.

    :param accessor: Accessor name, e.g. "load" or "cols".
    :return: "engine" or "dataframe"; None when the name is in neither mapping. The engine
        mapping is checked first.
    """
    lookups = (("engine", engine_accessors), ("dataframe", dataframe_accessors))
    for root_type, lookup in lookups:
        if accessor in lookup():
            return root_type
    return None
def _init_methods(engine):
method = ""
if engine == "pandas":
from optimus.engines.pandas.engine import PandasEngine
method = PandasEngine
if engine == "vaex":
from optimus.engines.vaex.engine import VaexEngine
method = VaexEngine
if engine == "spark":
from optimus.engines.spark.engine import SparkEngine
method = SparkEngine
if engine == "dask":
from optimus.engines.dask.engine import DaskEngine
method = DaskEngine
if engine == "ibis":
from optimus.engines.ibis.engine import IbisEngine
method = IbisEngine
if engine == "cudf":
from optimus.engines.cudf.engine import CUDFEngine
method = CUDFEngine
if engine == "dask_cudf":
from optimus.engines.dask_cudf.engine import DaskCUDFEngine
method = DaskCUDFEngine
return method
def _generate_code(body=None, variables=[], **kwargs):
    """
    Generate the code for a single operation.

    The dotted body["operation"] is resolved to a callable: "Optimus" selects the engine
    class named by body["engine"]; a known accessor name selects its class and the remaining
    parts are looked up on it; otherwise the first part is searched, in order, on
    BaseDataFrame, BaseEngine and Clusters. There is no explicit handling for an operation
    that matches none of these.

    :param body: Operation body. When falsy, the keyword arguments are used as the body.
    :param variables: Names already in use.
    :param kwargs: Alternative way of passing the body.
    :return: Tuple of the generated code and the list of assigned variable names.
    """
    body = body or kwargs

    path = body["operation"].split(".")
    head = path[0]
    known_accessors = accessors()

    owner = None
    root_type = None

    if head == "Optimus":
        owner = _init_methods(body["engine"])
        root_type = "optimus"
        path = []
    elif head in known_accessors:
        owner = known_accessors[head]
        root_type = _get_method_root_type(head)
        path = path[1:]
    else:
        # First class exposing a truthy attribute named like the operation wins
        candidates = ((BaseDataFrame, "dataframe"), (BaseEngine, "engine"), (Clusters, "clusters"))
        for candidate, candidate_type in candidates:
            if getattr(candidate, head, None):
                owner = candidate
                root_type = candidate_type
                break

    # Walk the remaining attribute path down to the method itself
    for attribute in path:
        owner = getattr(owner, attribute)

    properties = method_properties(owner, root_type)
    generated, assigned = properties["generator"](body, properties, variables)

    return generated, assigned
def optimus_variables():
    """
    Collect what the `list_*` helpers of `optimus.helpers.functions` report.

    :return: A list with the results of list_engines, list_dataframes, list_clusters and
        list_connections, concatenated in that order.
    """
    from optimus.helpers.functions import list_engines, list_dataframes, list_clusters, list_connections

    found = []
    for lister in (list_engines, list_dataframes, list_clusters, list_connections):
        found.extend(lister())
    return found
def available_variable(name: str, variables: dict):
    """
    Return a variable name based on `name` that is not taken yet.

    :param name: Preferred name.
    :param variables: Names already in use; only iterated, so any iterable of names works.
    :return: `name`, or a numbered variant that clashes neither with `variables` nor with
        the names reported by `optimus_variables`.
    """
    taken = list(variables)
    taken.extend(optimus_variables())
    return _create_new_variable(name, taken)
def method_properties(func: Union[callable], method_root_type):
    """
    Describe the arguments and return annotation of a callable and pick its code generator.

    :param func: Callable to inspect. When `inspect.signature` rejects it with ValueError,
        its `funcs` attribute (presumably the overload registry of a multipledispatch
        dispatcher - confirm against callers) is used: the parameters of every overload are
        merged and the last annotated return type is kept.
    :param method_root_type: Root type handed to `_get_generator`.
    :return: dict with "arguments" (name -> dict with optional "type" and "value"),
        "returns" (the return annotation) and "generator" (the code generator function).
    :raises ValueError: When no signature can be found and `func` has no usable `funcs`.
    """
    empty = inspect.Parameter.empty

    try:
        fp = signature(func)
        func_properties = {"parameters": fp.parameters, "return_annotation": fp.return_annotation}
    except ValueError:
        # A default is required here: without it a callable lacking `funcs` raises
        # AttributeError and hides the original ValueError
        overloads = getattr(func, "funcs", None)
        if not overloads:
            raise

        func_properties = {"parameters": {}, "return_annotation": None}
        for f in overloads.values():
            fp = signature(f)
            func_properties["parameters"].update(fp.parameters)
            # The "no annotation" sentinel is a truthy class, so it has to be excluded
            # explicitly or an unannotated overload would replace a real annotation
            if fp.return_annotation is not empty and fp.return_annotation:
                func_properties["return_annotation"] = fp.return_annotation

    # A lone 'root' parameter means the signature found is a constructor taking the parent
    # object; the arguments of interest are those of __call__
    if list(func_properties["parameters"].keys()) == ['root'] and hasattr(func, "__call__"):
        fp = signature(func.__call__)
        func_properties = {"parameters": fp.parameters, "return_annotation": fp.return_annotation}

    arguments = {}
    for arg in func_properties["parameters"].values():
        if arg.name in ('self', 'cls'):
            continue

        details = {}
        if arg.annotation is not empty:
            details["type"] = arg.annotation
        if arg.default is not empty:
            details["value"] = arg.default
        arguments[arg.name] = details

    return {
        "arguments": arguments,
        "returns": func_properties["return_annotation"],
        "generator": _get_generator(func_properties, method_root_type)
    }
def generate_code(body: Optional[dict] = None, variables: List[str] = [], get_updated: bool = False, **kwargs):
    """
    Generate the code for one operation or for a list of operations.

    :param body: Operation body or list of bodies. When falsy, the keyword arguments are
        used as a single body.
    :param variables: Names already in use. Names assigned by earlier operations of the
        same call are also treated as in use by the later ones.
    :param get_updated: When True, also return the assigned variable names.
    :param kwargs: Alternative way of passing a single body.
    :return: The generated code, one line per operation joined with newlines. When
        `get_updated` is True, a tuple of that code and the assigned names passed through
        `one_list_to_val`.
    """
    operations = val_to_list(body if body else kwargs)

    assigned = []
    lines = []
    for raw_operation in operations:
        prepared = prepare(raw_operation)
        line, new_names = _generate_code(prepared, [*assigned, *variables])
        assigned += new_names
        lines.append(line)

    script = "\n".join(lines)
    if not get_updated:
        return script
    return script, one_list_to_val(assigned)