-
Notifications
You must be signed in to change notification settings - Fork 756
/
run_tests.py
428 lines (378 loc) · 13.5 KB
/
run_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
import glob
import importlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import uuid
from multiprocessing import Pool
from metaflow._vendor import click
from metaflow.cli import run, start
# Whether the "api" executor (Runner-based) must be skipped: the Runner /
# click_api surface is only available in sufficiently recent metaflow builds.
skip_api_executor = False
try:
    from metaflow import Runner
    from metaflow.runner.click_api import (
        MetaflowAPI,
        click_to_python_types,
        extract_all_params,
    )
except ImportError:
    # Older metaflow without the Runner API: tests that request the "api"
    # executor are filtered out later (see the executor loop below).
    skip_api_executor = True
from metaflow_test import MetaflowTest
from metaflow_test.formatter import FlowFormatter
def iter_graphs():
root = os.path.join(os.path.dirname(__file__), "graphs")
for graphfile in os.listdir(root):
if graphfile.endswith(".json") and not graphfile[0] == ".":
with open(os.path.join(root, graphfile)) as f:
yield json.load(f)
def iter_tests():
    """Yield an instance of every MetaflowTest subclass found under tests/.

    The tests/ directory is prepended to sys.path so each test module can be
    imported by its bare file name.
    """
    test_root = os.path.join(os.path.dirname(__file__), "tests")
    sys.path.insert(0, test_root)
    for fname in os.listdir(test_root):
        if not fname.endswith(".py") or fname.startswith("."):
            continue
        mod = importlib.import_module(fname[:-3], "metaflow_test")
        for attr_name in dir(mod):
            candidate = getattr(mod, attr_name)
            is_test_class = (
                attr_name != "MetaflowTest"
                and isinstance(candidate, type)
                and issubclass(candidate, MetaflowTest)
            )
            if is_test_class:
                yield candidate()
def log(msg, formatter=None, context=None, real_bad=False, real_good=False):
    """Echo a color-coded, pid-prefixed status line for a test run.

    formatter/context, when given, are woven into the banner; real_bad
    renders red, real_good green, everything else white.
    """
    context_part = " in context '%s'" % context["name"] if context else ""
    formatter_part = " %s" % formatter if formatter else ""
    if formatter_part or context_part:
        line = "###%s%s: %s ###" % (formatter_part, context_part, msg)
    else:
        line = "### %s ###" % msg
    if real_bad:
        color = "red"
    elif real_good:
        color = "green"
    else:
        color = "white"
    styled = click.style(line, fg=color, bold=True)
    click.echo("[pid %s] %s" % (os.getpid(), styled))
def run_test(formatter, context, debug, checks, env_base, executor):
    """Run a single test flow in a temp directory and verify its results.

    Writes the generated flow/check scripts into a fresh tempdir, runs the
    flow via the chosen executor ("cli" subprocess or "api" Runner), handles
    expected-failure and resume scenarios, then runs every configured checker
    against the recorded run id.

    Returns a (return_code, flow_path) tuple; non-zero means failure.
    """

    def run_cmd(mode, args=None):
        # Build the CLI command line: python -B test_flow.py <top opts>
        # <mode> [args] --run-id-file run-id <run opts>
        cmd = [context["python"], "-B", "test_flow.py"]
        cmd.extend(context["top_options"])
        cmd.append(mode)
        if args:
            cmd.extend(args)
        cmd.extend(("--run-id-file", "run-id"))
        cmd.extend(context["run_options"])
        return cmd

    def construct_arg_dict(params_opts, cli_options):
        # Translate raw CLI option strings (e.g. "--tag=x" or bare flags)
        # into a kwargs dict keyed by click parameter name, coercing values
        # through the click-to-python type mapping.
        result_dict = {}
        has_value = False
        secondary_supplied = False
        for arg in cli_options:
            if "=" in arg:
                given_opt, val = arg.split("=", 1)
                has_value = True
            else:
                given_opt = arg
            for key, each_param in params_opts.items():
                py_type = click_to_python_types[type(each_param.type)]
                if given_opt in each_param.opts:
                    secondary_supplied = False
                elif given_opt in each_param.secondary_opts:
                    # Secondary opts are the "--no-x" style negations.
                    secondary_supplied = True
                else:
                    continue
                if has_value:
                    value = val
                else:
                    # A bare flag: True for the primary form, False for the
                    # negated (secondary) form.
                    if secondary_supplied:
                        value = False
                    else:
                        value = True
                if each_param.multiple:
                    if key not in result_dict:
                        result_dict[key] = [py_type(value)]
                    else:
                        result_dict[key].append(py_type(value))
                else:
                    result_dict[key] = py_type(value)
            # Reset per-argument state before processing the next option.
            has_value = False
            secondary_supplied = False
        return result_dict

    def construct_arg_dicts_from_click_api():
        # Derive Runner kwargs for both the top-level and the run-level
        # commands from the context's option strings.
        _, _, param_opts, _, _ = extract_all_params(start)
        top_level_options = context["top_options"]
        top_level_dict = construct_arg_dict(param_opts, top_level_options)
        _, _, param_opts, _, _ = extract_all_params(run)
        run_level_options = context["run_options"]
        run_level_dict = construct_arg_dict(param_opts, run_level_options)
        run_level_dict["run_id_file"] = "run-id"
        return top_level_dict, run_level_dict

    cwd = os.getcwd()
    tempdir = tempfile.mkdtemp("_metaflow_test")
    package = os.path.dirname(os.path.abspath(__file__))
    try:
        # write scripts
        os.chdir(tempdir)
        with open("test_flow.py", "w") as f:
            f.write(formatter.flow_code)
        with open("check_flow.py", "w") as f:
            f.write(formatter.check_code)
        shutil.copytree(
            os.path.join(cwd, "metaflow_test"), os.path.join(tempdir, "metaflow_test")
        )
        path = os.path.join(tempdir, "test_flow.py")
        env = {}
        env.update(env_base)
        # expand environment variables
        # nonce can be used to insert entropy in env vars.
        # This is useful e.g. for separating S3 paths of
        # runs, which may have clashing run_ids
        env.update(
            dict(
                (k, v.format(nonce=str(uuid.uuid4())))
                for k, v in context["env"].items()
            )
        )
        pythonpath = os.environ.get("PYTHONPATH", ".")
        env.update(
            {
                "LANG": "en_US.UTF-8",
                "LC_ALL": "en_US.UTF-8",
                "PATH": os.environ.get("PATH", "."),
                "PYTHONIOENCODING": "utf_8",
                # Make this package importable by the subprocesses.
                "PYTHONPATH": "%s:%s" % (package, pythonpath),
            }
        )
        if "pre_command" in context:
            # Optional setup command; "metaflow_command" means it is a
            # subcommand of the generated flow script itself.
            if context["pre_command"].get("metaflow_command"):
                cmd = [context["python"], "test_flow.py"]
                cmd.extend(context["top_options"])
                cmd.extend(context["pre_command"]["command"])
            else:
                cmd = context["pre_command"]["command"]
            pre_ret = subprocess.call(cmd, env=env)
            if pre_ret and not context["pre_command"].get("ignore_errors", False):
                log("pre-command failed", formatter, context)
                return pre_ret, path
        # run flow
        # NOTE(review): flow_ret stays unbound if executor is neither "cli"
        # nor "api" — presumably contexts.json only ever lists these two.
        if executor == "cli":
            flow_ret = subprocess.call(run_cmd("run"), env=env)
        elif executor == "api":
            top_level_dict, run_level_dict = construct_arg_dicts_from_click_api()
            runner = Runner("test_flow.py", show_output=True, env=env, **top_level_dict)
            result = runner.run(**run_level_dict)
            flow_ret = result.command_obj.process.returncode
        if flow_ret:
            if formatter.should_fail:
                # The test expects the flow to fail; proceed to the checkers.
                log("Flow failed as expected.")
            elif formatter.should_resume:
                # The test expects a failure followed by a resume.
                log("Resuming flow", formatter, context)
                if executor == "cli":
                    flow_ret = subprocess.call(
                        run_cmd(
                            "resume",
                            [formatter.resume_step] if formatter.resume_step else [],
                        ),
                        env=env,
                    )
                elif executor == "api":
                    _, resume_level_dict = construct_arg_dicts_from_click_api()
                    if formatter.resume_step:
                        resume_level_dict["step_to_rerun"] = formatter.resume_step
                    result = runner.resume(**resume_level_dict)
                    flow_ret = result.command_obj.process.returncode
            else:
                log("flow failed", formatter, context)
                return flow_ret, path
        elif formatter.should_fail:
            log("The flow should have failed but it didn't. Error!", formatter, context)
            return 1, path
        # check results
        # NOTE(review): file handle is never closed explicitly; harmless in
        # CPython but a `with` block would be cleaner.
        run_id = open("run-id").read()
        ret = 0
        # Run every checker listed by the context; the last failing checker's
        # return code wins, but all checkers are always executed.
        for check_name in context["checks"]:
            check = checks[check_name]
            python = check["python"]
            cmd = [python, "check_flow.py", check["class"], run_id]
            cmd.extend(context["top_options"])
            check_ret = subprocess.call(cmd, env=env)
            if check_ret:
                log(
                    "checker '%s' says that results failed" % check_name,
                    formatter,
                    context,
                )
                ret = check_ret
            else:
                log(
                    "checker '%s' says that results are ok" % check_name,
                    formatter,
                    context,
                )
        return ret, path
    finally:
        # Always restore the caller's cwd; keep the tempdir around in debug
        # mode for post-mortem inspection.
        os.chdir(cwd)
        if not debug:
            shutil.rmtree(tempdir)
def run_all(ok_tests, ok_contexts, ok_graphs, debug, num_parallel, inherit_env):
    """Run every selected test, sequentially or in a process pool.

    Args:
        ok_tests/ok_contexts/ok_graphs: lowercase name filters; an empty
            set means "no filter".
        debug: stop at first failure and force sequential execution.
        num_parallel: worker count; None means sequential.
        inherit_env: pass the caller's environment through to test runs.

    Returns a list of (test_id, flow_path) tuples for the failures.
    """
    tests = [
        test
        for test in sorted(iter_tests(), key=lambda x: x.PRIORITY)
        if not ok_tests or test.__class__.__name__.lower() in ok_tests
    ]
    failed = []
    # Child test processes inherit the caller's env only when requested.
    if inherit_env:
        base_env = dict(os.environ)
    else:
        base_env = {}
    if debug or num_parallel is None:
        for test in tests:
            failed.extend(
                run_test_cases((test, ok_contexts, ok_graphs, debug, base_env))
            )
    else:
        args = [(test, ok_contexts, ok_graphs, debug, base_env) for test in tests]
        # Use the pool as a context manager so workers are reliably
        # terminated and reaped (the previous code never closed the Pool).
        with Pool(num_parallel) as pool:
            for fail in pool.imap_unordered(run_test_cases, args):
                failed.extend(fail)
    return failed
def run_test_cases(args):
    """Run one test against every applicable (graph, context, executor) combo.

    args is a single tuple (test, ok_contexts, ok_graphs, debug, base_env) so
    this function can be dispatched through Pool.imap_unordered.

    Returns a list of (test_id, flow_path) tuples for failed combinations;
    in debug mode it returns immediately after the first failure.
    """
    test, ok_contexts, ok_graphs, debug, base_env = args
    # NOTE(review): contexts.json is read relative to the cwd — presumably
    # the runner is always invoked from this directory.
    contexts = json.load(open("contexts.json"))
    graphs = list(iter_graphs())
    test_name = test.__class__.__name__
    log("Loaded test %s" % test_name)
    failed = []
    for graph in graphs:
        if ok_graphs and graph["name"].lower() not in ok_graphs:
            continue
        formatter = FlowFormatter(graph, test)
        if formatter.valid:
            for context in contexts["contexts"]:
                # Skip contexts that cannot run graphs using num_parallel.
                if context.get("disable_parallel", False) and any(
                    "num_parallel" in node for node in graph["graph"].values()
                ):
                    continue
                # An explicit --contexts filter overrides the context's own
                # "disabled" flag; otherwise disabled contexts are skipped.
                if ok_contexts:
                    if context["name"].lower() not in ok_contexts:
                        continue
                elif context.get("disabled", False):
                    continue
                # Per-context test allow/deny lists.
                if test_name in map(str, context.get("disabled_tests", [])):
                    continue
                enabled_tests = context.get("enabled_tests", [])
                if enabled_tests and (test_name not in map(str, enabled_tests)):
                    continue
                for executor in context["executors"]:
                    # "api" executor needs the Runner API (see module top).
                    if executor == "api" and skip_api_executor is True:
                        continue
                    log(
                        "running [using %s executor]" % executor,
                        formatter,
                        context,
                    )
                    ret, path = run_test(
                        formatter,
                        context,
                        debug,
                        contexts["checks"],
                        base_env,
                        executor,
                    )
                    if ret:
                        tstid = "%s in context %s [using %s executor]" % (
                            formatter,
                            context["name"],
                            executor,
                        )
                        failed.append((tstid, path))
                        log(
                            "failed [using %s executor]" % executor,
                            formatter,
                            context,
                            real_bad=True,
                        )
                        # Debug mode: bail out at the first failure.
                        if debug:
                            return failed
                    else:
                        log(
                            "success [using %s executor]" % executor,
                            formatter,
                            context,
                            real_good=True,
                        )
        else:
            log("not a valid combination. Skipped.", formatter)
    return failed
@click.command(help="Run tests")
@click.option(
    "--contexts",
    default="",
    type=str,
    help="A comma-separated list of contexts to include (default: all).",
)
@click.option(
    "--tests",
    default="",
    type=str,
    help="A comma-separated list of tests to include (default: all).",
)
@click.option(
    "--graphs",
    default="",
    type=str,
    help="A comma-separated list of graphs to include (default: all).",
)
@click.option(
    "--debug",
    is_flag=True,
    default=False,
    help="Debug mode: Stop at the first failure, " "don't delete test directory",
)
@click.option(
    "--inherit-env", is_flag=True, default=False, help="Inherit env variables"
)
@click.option(
    "--num-parallel",
    show_default=True,
    default=None,
    type=int,
    help="Number of parallel tests to run. By default, " "tests are run sequentially.",
)
def cli(
    tests=None,
    contexts=None,
    graphs=None,
    num_parallel=None,
    debug=False,
    inherit_env=False,
):
    """Command-line entry point: run the selected tests and exit with 0 on
    full success, 1 if any test failed."""

    def _parse(csv):
        # Comma-separated names, lowercased; empty tokens dropped.
        return {token.lower() for token in csv.split(",") if token}

    failed = run_all(
        _parse(tests),
        _parse(contexts),
        _parse(graphs),
        debug,
        num_parallel,
        inherit_env,
    )
    if not failed:
        log("All tests were successful!", real_good=True)
        sys.exit(0)
    log("The following tests failed:")
    for fail, path in failed:
        if debug:
            log("%s (path %s)" % (fail, path), real_bad=True)
        else:
            log(fail, real_bad=True)
    sys.exit(1)
if __name__ == "__main__":
    # Allow running this module directly as the test-suite entry point.
    cli()