From c5c0882fa538ab7ea8b252b5195e05497fe096f8 Mon Sep 17 00:00:00 2001 From: Daniel Imberman Date: Tue, 27 Aug 2019 08:48:11 -0500 Subject: [PATCH 1/2] TaskInstance now only overwrites executor_config when explicitly told to do so --- airflow/models/taskinstance.py | 8 ++++++-- tests/models/test_taskinstance.py | 27 +++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index f92f9b551d48b..d6ef4097a129f 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -425,10 +425,13 @@ def error(self, session=None): session.commit() @provide_session - def refresh_from_db(self, session=None, lock_for_update=False): + def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_config=False): """ Refreshes the task instance from the database based on the primary key + :param refresh_executor_config: if True, revert executor config to + result from DB. Often, however, we will want to keep the newest + version :param lock_for_update: if True, indicates that the database should lock the TaskInstance (issuing a FOR UPDATE clause) until the session is committed. @@ -454,7 +457,8 @@ def refresh_from_db(self, session=None, lock_for_update=False): self.max_tries = ti.max_tries self.hostname = ti.hostname self.pid = ti.pid - self.executor_config = ti.executor_config + if refresh_executor_config: + self.executor_config = ti.executor_config else: self.state = None diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index c08318af65998..3369af35bc021 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -43,6 +43,7 @@ from airflow.utils.state import State from tests.models import DEFAULT_DATE from tests.test_utils import db +from airflow.utils.db import provide_session class TestTaskInstance(unittest.TestCase): @@ -343,6 +344,32 @@ def test_run_pooling_task(self): db.clear_db_pools() self.assertEqual(ti.state, State.SUCCESS) + @provide_session + def test_ti_updates_with_task(self, session=None): + """ + test that updating the executor_config propogates to the TaskInstance DB + """ + dag = models.DAG(dag_id='test_run_pooling_task') + task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow', + executor_config={'foo': 'bar'}, + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) + ti = TI( + task=task, execution_date=timezone.utcnow()) + + ti.run(session=session) + tis = dag.get_task_instances() + self.assertEqual({'foo': 'bar'}, tis[0].executor_config) + + task2 = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow', + executor_config={'bar': 'baz'}, + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) + + ti = TI( + task=task2, execution_date=timezone.utcnow()) + ti.run(session=session) + tis = dag.get_task_instances() + self.assertEqual({'bar': 'baz'}, tis[1].executor_config) + def test_run_pooling_task_with_mark_success(self): """ test that running task in an existing pool with mark_success param From aac1f38e95daf1048d5ad0b9a8a7221658958b08 Mon Sep 17 00:00:00 2001 From: Daniel Imberman Date: Tue, 27 Aug 2019 09:43:09 -0500 Subject: [PATCH 2/2] flake8 --- tests/models/test_taskinstance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 3369af35bc021..16b57190795aa 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -361,8 +361,8 @@ def test_ti_updates_with_task(self, session=None): self.assertEqual({'foo': 'bar'}, tis[0].executor_config) task2 = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow', - executor_config={'bar': 'baz'}, - start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) + executor_config={'bar': 'baz'}, + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( task=task2, execution_date=timezone.utcnow())