-
Notifications
You must be signed in to change notification settings - Fork 34
/
test_parallel_execution.py
64 lines (50 loc) · 1.88 KB
/
test_parallel_execution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time
import pytest
from airflow import settings
from dbnd import PipelineTask, parameter
from dbnd._core.errors import DatabandConfigError
from dbnd._core.errors.base import DatabandRunError
from dbnd._core.inline import run_cmd_locally, run_task
from dbnd.tasks.basics import SimplestTask
from dbnd.testing import assert_run_task
from dbnd.testing.helpers_pytest import skip_on_windows
class SleepyTask(SimplestTask):
    """SimplestTask variant that can pause before delegating to the parent run."""

    # Value handed to time.sleep() in run(); declared with significant=False.
    sleep_time = parameter.value(0.1, significant=False)

    def run(self):
        """Sleep for ``sleep_time`` when it is truthy, then call the parent ``run``."""
        delay = self.sleep_time
        if delay:
            # A falsy value (e.g. 0) skips the pause entirely.
            time.sleep(delay)
        super(SleepyTask, self).run()
class ParallelTasksPipeline(PipelineTask):
    """Pipeline whose band consists of ``num_of_tasks`` SleepyTask instances."""

    # Number of SleepyTask instances created by band(); defaults to 3.
    num_of_tasks = parameter.value(3)

    def band(self):
        """Return a list with one SleepyTask per index in ``range(num_of_tasks)``.

        Each task gets its index, converted to a string, as ``simplest_param``.
        """
        return [
            SleepyTask(simplest_param=str(index))
            for index in range(self.num_of_tasks)
        ]
class TestTasksParallelExample(object):
    """Runs ParallelTasksPipeline through several execution entry points."""

    def test_parallel_simple_executor(self):
        """Run a two-task pipeline with ``run_task`` and check it reports complete."""
        pipeline = ParallelTasksPipeline(num_of_tasks=2)
        run_task(pipeline)
        assert pipeline._complete()

    # NOTE(review): disabled decorator kept from the original source; presumably
    # an earlier way of configuring this test -- confirm before removing.
    # @with_context(conf={'executor': {'local': 'true'},
    #                     'databand': {'module': ParallelTasksPipeline.__module__}})
    @skip_on_windows
    def test_parallel_local_executor(self):
        """Invoke the pipeline through the command line with ``--parallel``.

        When the configured SQLAlchemy connection string mentions sqlite, the
        run is expected to raise DatabandRunError; otherwise it must simply
        finish without raising.
        """
        cli_args = [
            "-m",
            ParallelTasksPipeline.__module__,
            ParallelTasksPipeline.get_task_family(),
            "--parallel",
            "-r",
            "num_of_tasks=2",
        ]
        uses_sqlite = "sqlite" in settings.SQL_ALCHEMY_CONN
        if not uses_sqlite:
            run_cmd_locally(cli_args)
            return

        # not supported on sqlite
        with pytest.raises(DatabandRunError):
            run_cmd_locally(cli_args)

    def test_parallel_dag_locally(self):
        """Run the default-sized pipeline with SleepyTask.sleep_time overridden to 0."""
        overrides = {SleepyTask.sleep_time: 0}
        pipeline = ParallelTasksPipeline(override=overrides)
        assert_run_task(pipeline)

    # NOTE(review): disabled statements kept from the original source.
    # target = ParallelTasksPipeline(num_of_tasks=2)
    # run_task(target)