Skip to content

Commit

Permalink
[AIRFLOW-1142] Do not reset orphaned state for backfills
Browse files Browse the repository at this point in the history
The scheduler could interfere with backfills when it resets the state of
tasks that were considered orphaned. This patch prevents the scheduler from
doing so and adds a guard in the backfill.

Closes #2260 from bolkedebruin/AIRFLOW-1142
  • Loading branch information
bolkedebruin committed Apr 27, 2017
1 parent c2472ff commit 4e79b83
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
10 changes: 9 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,8 @@ def _execute_helper(self, processor_manager):
active_runs = DagRun.find(
state=State.RUNNING,
external_trigger=False,
session=session
session=session,
no_backfills=True,
)
for dr in active_runs:
self.logger.info("Resetting {} {}".format(dr.dag_id,
Expand Down Expand Up @@ -1855,6 +1856,13 @@ def _execute(self):
self.logger.debug("Task instance to run {} state {}"
.format(ti, ti.state))

# Guard against externally modified task instances, or the case where
# max concurrency was reached at task runtime: a task instance whose
# state is None is put back into SCHEDULED.
if ti.state == State.NONE:
    # The message carries a "{}" placeholder, so it must be formatted
    # with the task instance; otherwise the log shows a literal "{}".
    self.logger.warning("FIXME: task instance {} state was set to "
                        "None externally. This should not happen"
                        .format(ti))
    ti.set_state(State.SCHEDULED, session=session)

# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if ti.state == State.SUCCESS:
Expand Down
10 changes: 9 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4037,7 +4037,8 @@ def refresh_from_db(self, session=None):
@staticmethod
@provide_session
def find(dag_id=None, run_id=None, execution_date=None,
state=None, external_trigger=None, session=None):
state=None, external_trigger=None, no_backfills=False,
session=None):
"""
Returns a set of dag runs for the given search criteria.
:param dag_id: the dag_id to find dag runs for
Expand All @@ -4050,6 +4051,9 @@ def find(dag_id=None, run_id=None, execution_date=None,
:type state: State
:param external_trigger: whether this dag run is externally triggered
:type external_trigger: bool
:param no_backfills: return no backfills (True), return all (False).
Defaults to False
:type no_backfills: bool
:param session: database session
:type session: Session
"""
Expand All @@ -4069,6 +4073,10 @@ def find(dag_id=None, run_id=None, execution_date=None,
qry = qry.filter(DR.state == state)
if external_trigger is not None:
qry = qry.filter(DR.external_trigger == external_trigger)
if no_backfills:
# in order to prevent a circular dependency
from airflow.jobs import BackfillJob
qry = qry.filter(DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'))

dr = qry.order_by(DR.execution_date).all()

Expand Down
42 changes: 42 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,48 @@ def test_concurrency(self):

session.close()

def test_execute_helper_reset_orphaned_tasks(self):
    """
    Check that the scheduler's orphaned-task reset skips backfill dag runs.

    Two RUNNING dag runs are created for the same DAG: one whose run_id is
    ``DagRun.ID_PREFIX`` and one whose run_id is ``BackfillJob.ID_PREFIX``.
    Each gets a task instance in the SCHEDULED state. After invoking
    ``SchedulerJob._execute_helper`` with a mocked processor manager, the
    task instance of the first run is expected to be ``State.NONE`` while
    the one belonging to the backfill run is expected to stay SCHEDULED.
    """
    session = settings.Session()
    try:
        dag = DAG(
            'test_execute_helper_reset_orphaned_tasks',
            start_date=DEFAULT_DATE,
            default_args={'owner': 'owner1'})

        with dag:
            op1 = DummyOperator(task_id='op1')

        dag.clear()
        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
                               state=State.RUNNING,
                               execution_date=DEFAULT_DATE,
                               start_date=DEFAULT_DATE,
                               session=session)
        dr2 = dag.create_dagrun(run_id=BackfillJob.ID_PREFIX,
                                state=State.RUNNING,
                                execution_date=DEFAULT_DATE + datetime.timedelta(1),
                                start_date=DEFAULT_DATE,
                                session=session)
        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
        ti.state = State.SCHEDULED
        ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
        ti2.state = State.SCHEDULED
        session.commit()

        processor = mock.MagicMock()
        processor.get_last_finish_time.return_value = None

        scheduler = SchedulerJob(num_runs=0, run_duration=0)
        executor = TestExecutor()
        scheduler.executor = executor

        scheduler._execute_helper(processor_manager=processor)

        # Re-read both task instances to observe what the scheduler did.
        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
        self.assertEqual(ti.state, State.NONE)

        ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
        self.assertEqual(ti2.state, State.SCHEDULED)
    finally:
        # Release the session even when an assertion fails, matching the
        # explicit session.close() used by the neighbouring test.
        session.close()

@provide_session
def evaluate_dagrun(
self,
Expand Down

0 comments on commit 4e79b83

Please sign in to comment.