Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 55 additions & 52 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,40 +1255,40 @@ def setUp(self) -> None:
# pylint: disable=bad-whitespace
# expected, dag_count, task_count, start_ago, schedule_interval, shape
# One DAG with one task per DAG file
( 1, 1, 1, "1d", "None", "no_structure"), # noqa
( 1, 1, 1, "1d", "None", "linear"), # noqa
( 9, 1, 1, "1d", "@once", "no_structure"), # noqa
( 9, 1, 1, "1d", "@once", "linear"), # noqa
( 9, 1, 1, "1d", "30m", "no_structure"), # noqa
( 9, 1, 1, "1d", "30m", "linear"), # noqa
( 9, 1, 1, "1d", "30m", "binary_tree"), # noqa
( 9, 1, 1, "1d", "30m", "star"), # noqa
( 9, 1, 1, "1d", "30m", "grid"), # noqa
([ 1, 1, 1, 1], 1, 1, "1d", "None", "no_structure"), # noqa
([ 1, 1, 1, 1], 1, 1, "1d", "None", "linear"), # noqa
([ 9, 5, 5, 5], 1, 1, "1d", "@once", "no_structure"), # noqa
([ 9, 5, 5, 5], 1, 1, "1d", "@once", "linear"), # noqa
([ 9, 12, 15, 18], 1, 1, "1d", "30m", "no_structure"), # noqa
([ 9, 12, 15, 18], 1, 1, "1d", "30m", "linear"), # noqa
([ 9, 12, 15, 18], 1, 1, "1d", "30m", "binary_tree"), # noqa
([ 9, 12, 15, 18], 1, 1, "1d", "30m", "star"), # noqa
([ 9, 12, 15, 18], 1, 1, "1d", "30m", "grid"), # noqa
# One DAG with five tasks per DAG file
( 1, 1, 5, "1d", "None", "no_structure"), # noqa
( 1, 1, 5, "1d", "None", "linear"), # noqa
( 9, 1, 5, "1d", "@once", "no_structure"), # noqa
(10, 1, 5, "1d", "@once", "linear"), # noqa
( 9, 1, 5, "1d", "30m", "no_structure"), # noqa
(10, 1, 5, "1d", "30m", "linear"), # noqa
(10, 1, 5, "1d", "30m", "binary_tree"), # noqa
(10, 1, 5, "1d", "30m", "star"), # noqa
(10, 1, 5, "1d", "30m", "grid"), # noqa
([ 1, 1, 1, 1], 1, 5, "1d", "None", "no_structure"), # noqa
([ 1, 1, 1, 1], 1, 5, "1d", "None", "linear"), # noqa
([ 9, 5, 5, 5], 1, 5, "1d", "@once", "no_structure"), # noqa
([10, 6, 6, 6], 1, 5, "1d", "@once", "linear"), # noqa
([ 9, 12, 15, 18], 1, 5, "1d", "30m", "no_structure"), # noqa
([10, 14, 18, 22], 1, 5, "1d", "30m", "linear"), # noqa
([10, 14, 18, 22], 1, 5, "1d", "30m", "binary_tree"), # noqa
([10, 14, 18, 22], 1, 5, "1d", "30m", "star"), # noqa
([10, 14, 18, 22], 1, 5, "1d", "30m", "grid"), # noqa
# 10 DAGs with 10 tasks per DAG file
( 1, 10, 10, "1d", "None", "no_structure"), # noqa
( 1, 10, 10, "1d", "None", "linear"), # noqa
(81, 10, 10, "1d", "@once", "no_structure"), # noqa
(91, 10, 10, "1d", "@once", "linear"), # noqa
(81, 10, 10, "1d", "30m", "no_structure"), # noqa
(91, 10, 10, "1d", "30m", "linear"), # noqa
(91, 10, 10, "1d", "30m", "binary_tree"), # noqa
(91, 10, 10, "1d", "30m", "star"), # noqa
(91, 10, 10, "1d", "30m", "grid"), # noqa
([ 1, 1, 1, 1], 10, 10, "1d", "None", "no_structure"), # noqa
([ 1, 1, 1, 1], 10, 10, "1d", "None", "linear"), # noqa
([81, 41, 41, 41], 10, 10, "1d", "@once", "no_structure"), # noqa
([91, 51, 51, 51], 10, 10, "1d", "@once", "linear"), # noqa
([81, 111, 111, 111], 10, 10, "1d", "30m", "no_structure"), # noqa
([91, 131, 131, 131], 10, 10, "1d", "30m", "linear"), # noqa
([91, 131, 131, 131], 10, 10, "1d", "30m", "binary_tree"), # noqa
([91, 131, 131, 131], 10, 10, "1d", "30m", "star"), # noqa
([91, 131, 131, 131], 10, 10, "1d", "30m", "grid"), # noqa
# pylint: enable=bad-whitespace
]
)
def test_process_dags_queries_count(
self, expected_query_count, dag_count, task_count, start_ago, schedule_interval, shape
self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
):
with mock.patch.dict("os.environ", {
"PERF_DAGS_COUNT": str(dag_count),
Expand All @@ -1300,42 +1300,43 @@ def test_process_dags_queries_count(
('scheduler', 'use_job_schedule'): 'True',
}):
dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
with assert_queries_count(expected_query_count):
processor = DagFileProcessor([], mock.MagicMock())
processor._process_dags(dagbag.dags.values())
processor = DagFileProcessor([], mock.MagicMock())
for expected_query_count in expected_query_counts:
with assert_queries_count(expected_query_count):
processor._process_dags(dagbag.dags.values())

@parameterized.expand(
[
# pylint: disable=bad-whitespace
# expected, dag_count, task_count, start_ago, schedule_interval, shape
# One DAG with two tasks per DAG file
( 5, 1, 1, "1d", "None", "no_structure"), # noqa
( 5, 1, 1, "1d", "None", "linear"), # noqa
(15, 1, 1, "1d", "@once", "no_structure"), # noqa
(15, 1, 1, "1d", "@once", "linear"), # noqa
(15, 1, 1, "1d", "30m", "no_structure"), # noqa
(15, 1, 1, "1d", "30m", "linear"), # noqa
([ 5, 5, 5, 5], 1, 1, "1d", "None", "no_structure"), # noqa
([ 5, 5, 5, 5], 1, 1, "1d", "None", "linear"), # noqa
([15, 9, 9, 9], 1, 1, "1d", "@once", "no_structure"), # noqa
([15, 9, 9, 9], 1, 1, "1d", "@once", "linear"), # noqa
([15, 18, 21, 24], 1, 1, "1d", "30m", "no_structure"), # noqa
([15, 18, 21, 24], 1, 1, "1d", "30m", "linear"), # noqa
# One DAG with five tasks per DAG file
( 5, 1, 5, "1d", "None", "no_structure"), # noqa
( 5, 1, 5, "1d", "None", "linear"), # noqa
(15, 1, 5, "1d", "@once", "no_structure"), # noqa
(16, 1, 5, "1d", "@once", "linear"), # noqa
(15, 1, 5, "1d", "30m", "no_structure"), # noqa
(16, 1, 5, "1d", "30m", "linear"), # noqa
([ 5, 5, 5, 5], 1, 5, "1d", "None", "no_structure"), # noqa
([ 5, 5, 5, 5], 1, 5, "1d", "None", "linear"), # noqa
([15, 9, 9, 9], 1, 5, "1d", "@once", "no_structure"), # noqa
([16, 10, 10, 10], 1, 5, "1d", "@once", "linear"), # noqa
([15, 18, 21, 24], 1, 5, "1d", "30m", "no_structure"), # noqa
([16, 20, 24, 28], 1, 5, "1d", "30m", "linear"), # noqa
# 10 DAGs with 10 tasks per DAG file
( 5, 10, 10, "1d", "None", "no_structure"), # noqa
( 5, 10, 10, "1d", "None", "linear"), # noqa
(87, 10, 10, "1d", "@once", "no_structure"), # noqa
(97, 10, 10, "1d", "@once", "linear"), # noqa
(87, 10, 10, "1d", "30m", "no_structure"), # noqa
(97, 10, 10, "1d", "30m", "linear"), # noqa
([ 5, 5, 5, 5], 10, 10, "1d", "None", "no_structure"), # noqa
([ 5, 5, 5, 5], 10, 10, "1d", "None", "linear"), # noqa
([87, 45, 45, 45], 10, 10, "1d", "@once", "no_structure"), # noqa
([97, 55, 55, 55], 10, 10, "1d", "@once", "linear"), # noqa
([87, 117, 117, 117], 10, 10, "1d", "30m", "no_structure"), # noqa
([97, 137, 137, 137], 10, 10, "1d", "30m", "linear"), # noqa
# pylint: enable=bad-whitespace
]
)
def test_process_file_queries_count(
self, expected_query_count, dag_count, task_count, start_ago, schedule_interval, shape
self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
):
with assert_queries_count(expected_query_count), mock.patch.dict("os.environ", {
with mock.patch.dict("os.environ", {
"PERF_DAGS_COUNT": str(dag_count),
"PERF_TASKS_COUNT": str(task_count),
"PERF_START_AGO": start_ago,
Expand All @@ -1345,7 +1346,9 @@ def test_process_file_queries_count(
('scheduler', 'use_job_schedule'): 'True'
}):
processor = DagFileProcessor([], mock.MagicMock())
processor.process_file(ELASTIC_DAG_FILE, [])
for expected_query_count in expected_query_counts:
with assert_queries_count(expected_query_count):
processor.process_file(ELASTIC_DAG_FILE, [])


@pytest.mark.usefixtures("disable_load_example")
Expand Down