From 0f9b34d41aba5e696ac68ac5b8e24b485a22d092 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Thu, 25 Jun 2020 12:20:59 +0200 Subject: [PATCH 1/2] Add query count tests for _run_raw_task --- tests/models/test_taskinstance.py | 56 +++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index bfe4c09ecc740..27180952cc2b7 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -22,6 +22,7 @@ import unittest import urllib from typing import List, Optional, Union, cast +from unittest import mock from unittest.mock import call, mock_open, patch import pendulum @@ -51,7 +52,11 @@ from airflow.utils.types import DagRunType from tests.models import DEFAULT_DATE from tests.test_utils import db +from tests.test_utils.asserts import assert_queries_count from tests.test_utils.config import conf_vars +from tests.test_utils.db import ( + clear_db_dags, clear_db_errors, clear_db_pools, clear_db_runs, clear_db_sla_miss, +) class CallbackWrapper: @@ -1699,3 +1704,54 @@ def test_refresh_from_task(pool_override): assert ti.max_tries == task.retries assert ti.executor_config == task.executor_config assert ti.operator == DummyOperator.__name__ + + +class TestRunRawTaskQueriesCount(unittest.TestCase): + """ + These tests are designed to detect changes in the number of queries for + different DAG files. These tests allow easy detection when a change is + made that affects the performance of the SchedulerJob. + """ + + @staticmethod + def _clean(): + clear_db_runs() + clear_db_pools() + clear_db_dags() + clear_db_sla_miss() + clear_db_errors() + + def setUp(self) -> None: + self._clean() + + def tearDown(self) -> None: + self._clean() + + @parameterized.expand([ + # Expected queries, mark_success + (7, False), + (5, True), + ]) + def test_execute_queries_count(self, expected_query_count, mark_success): + with create_session() as s: + dag = DAG('test_queries', start_date=DEFAULT_DATE) + task = DummyOperator(task_id='op', dag=dag) + ti = TI(task=task, execution_date=datetime.datetime.now()) + ti.state = State.RUNNING + s.merge(ti) + + with assert_queries_count(expected_query_count): + ti._run_raw_task(mark_success=mark_success) + + def test_execute_queries_count_store_serialized(self): + with create_session() as s: + dag = DAG('test_queries', start_date=DEFAULT_DATE) + task = DummyOperator(task_id='op', dag=dag) + ti = TI(task=task, execution_date=datetime.datetime.now()) + ti.state = State.RUNNING + s.merge(ti) + + with assert_queries_count(10), mock.patch( + "airflow.models.taskinstance.STORE_SERIALIZED_DAGS", True + ): + ti._run_raw_task() From 3e11965e4b4e2f2aab42e3c2aaffa619ed494d43 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Thu, 25 Jun 2020 13:30:23 +0200 Subject: [PATCH 2/2] fixup! Add query count tests for _run_raw_task --- tests/models/test_taskinstance.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 27180952cc2b7..6aaf9ef32bbcd 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -22,7 +22,6 @@ import unittest import urllib from typing import List, Optional, Union, cast -from unittest import mock from unittest.mock import call, mock_open, patch import pendulum @@ -1708,9 +1707,8 @@ def test_refresh_from_task(pool_override): class TestRunRawTaskQueriesCount(unittest.TestCase): """ - These tests are designed to detect changes in the number of queries for - different DAG files. These tests allow easy detection when a change is - made that affects the performance of the SchedulerJob. + These tests are designed to detect changes in the number of queries executed + when calling _run_raw_task """ @staticmethod @@ -1733,25 +1731,25 @@ def tearDown(self) -> None: (5, True), ]) def test_execute_queries_count(self, expected_query_count, mark_success): - with create_session() as s: + with create_session() as session: dag = DAG('test_queries', start_date=DEFAULT_DATE) task = DummyOperator(task_id='op', dag=dag) ti = TI(task=task, execution_date=datetime.datetime.now()) ti.state = State.RUNNING - s.merge(ti) + session.merge(ti) with assert_queries_count(expected_query_count): ti._run_raw_task(mark_success=mark_success) def test_execute_queries_count_store_serialized(self): - with create_session() as s: + with create_session() as session: dag = DAG('test_queries', start_date=DEFAULT_DATE) task = DummyOperator(task_id='op', dag=dag) ti = TI(task=task, execution_date=datetime.datetime.now()) ti.state = State.RUNNING - s.merge(ti) + session.merge(ti) - with assert_queries_count(10), mock.patch( + with assert_queries_count(10), patch( "airflow.models.taskinstance.STORE_SERIALIZED_DAGS", True ): ti._run_raw_task()