Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 47 additions & 38 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,7 @@ def _change_state_for_tasks_failed_to_execute(self, session):
# set TIs to queued state
for task_instance in tis_to_set_to_scheduled:
task_instance.state = State.SCHEDULED
self.executor.queued_tasks.pop(task_instance.key)

task_instance_str = "\n\t".join(
[repr(x) for x in tis_to_set_to_scheduled])
Expand Down Expand Up @@ -1488,6 +1489,9 @@ def processor_factory(file_path, zombies):
self.processor_agent.end()
self.log.info("Exited execute loop")

def _get_simple_dags(self):
    """Return the SimpleDAGs harvested by the DAG-processor agent."""
    harvested_dags = self.processor_agent.harvest_simple_dags()
    return harvested_dags

def _execute_helper(self):
"""
The actual scheduler loop. The main steps in the loop are:
Expand Down Expand Up @@ -1532,49 +1536,14 @@ def _execute_helper(self):
self.processor_agent.wait_until_finished()

self.log.debug("Harvesting DAG parsing results")
simple_dags = self.processor_agent.harvest_simple_dags()
simple_dags = self._get_simple_dags()
self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
if len(simple_dags) > 0:
try:
simple_dag_bag = SimpleDagBag(simple_dags)

# Handle cases where a DAG run state is set (perhaps manually) to
# a non-running state. Handle task instances that belong to
# DAG runs in those states

# If a task instance is up for retry but the corresponding DAG run
# isn't running, mark the task instance as FAILED so we don't try
# to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.UP_FOR_RETRY],
State.FAILED)
# If a task instance is scheduled or queued or up for reschedule,
# but the corresponding DAG run isn't running, set the state to
# NONE so we don't try to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.QUEUED,
State.SCHEDULED,
State.UP_FOR_RESCHEDULE],
State.NONE)

self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED,))
except Exception as e:
self.log.error("Error queuing tasks")
self.log.exception(e)
continue

# Call heartbeats
self.log.debug("Heartbeating the executor")
self.executor.heartbeat()

self._change_state_for_tasks_failed_to_execute()

# Process events from the executor
self._process_executor_events(simple_dag_bag)
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
continue

# Heartbeat the scheduler periodically
time_since_last_heartbeat = (timezone.utcnow() -
Expand Down Expand Up @@ -1624,6 +1593,46 @@ def _execute_helper(self):

settings.Session.remove()

def _validate_and_run_task_instances(self, simple_dag_bag):
    """
    Queue runnable task instances from *simple_dag_bag*, then drive the
    executor: heartbeat it, reset tasks it failed to pick up, and fold its
    results back into the metadata database.

    :param simple_dag_bag: bag of SimpleDAGs harvested from the processor agent
    :return: False when queuing raised and the rest of the loop iteration
        should be skipped, True otherwise
    """
    if simple_dag_bag.simple_dags:
        try:
            self._process_and_execute_tasks(simple_dag_bag)
        except Exception as e:
            self.log.error("Error queuing tasks")
            self.log.exception(e)
            return False

    # Heartbeat so the executor actually launches what was queued.
    self.log.debug("Heartbeating the executor")
    self.executor.heartbeat()

    # Anything the executor could not run goes back to SCHEDULED.
    self._change_state_for_tasks_failed_to_execute()

    # Reconcile executor-reported task outcomes with the database.
    self._process_executor_events(simple_dag_bag)
    return True

def _process_and_execute_tasks(self, simple_dag_bag):
    """
    Reset task instances whose DAG run is no longer running, then hand the
    remaining SCHEDULED task instances to the executor.

    :param simple_dag_bag: bag of SimpleDAGs whose task instances are examined
    """
    # A DAG run's state may have been set (perhaps manually) to a
    # non-running state; task instances belonging to such runs must not be
    # re-run.
    #
    # UP_FOR_RETRY with no running DAG run -> FAILED, so no retry is attempted.
    self._change_state_for_tis_without_dagrun(
        simple_dag_bag, [State.UP_FOR_RETRY], State.FAILED)
    # QUEUED / SCHEDULED / UP_FOR_RESCHEDULE with no running DAG run -> NONE,
    # so the scheduler stops trying to run them.
    self._change_state_for_tis_without_dagrun(
        simple_dag_bag,
        [State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE],
        State.NONE)
    # Queue whatever is still legitimately SCHEDULED.
    self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))

@provide_session
def heartbeat_callback(self, session=None):
Stats.incr('scheduler_heartbeat', 1, 1)
2 changes: 1 addition & 1 deletion tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def test_backfill_respect_default_pool_limit(self, mock_log):
if len(running_task_instances) == default_pool_slots:
default_pool_task_slot_count_reached_at_least_once = True

self.assertEquals(8, num_running_task_instances)
self.assertEqual(8, num_running_task_instances)
self.assertTrue(default_pool_task_slot_count_reached_at_least_once)

times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
Expand Down
144 changes: 102 additions & 42 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import os
import shutil
import unittest
from datetime import timedelta
from tempfile import mkdtemp

import psutil
Expand Down Expand Up @@ -75,6 +76,21 @@ def setUp(self):
# enqueue!
self.null_exec = MockExecutor()

def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs):
    """
    Build, clear, and register a minimal DAG for scheduler tests.

    :param start_date: start of the DAG's schedule window
    :param end_date: end of the schedule window; defaults to one hour after
        the start so only a single DAG Run is created
    :param kwargs: extra keyword arguments forwarded to the ``DAG``
        constructor (previously accepted but silently discarded)
    :return: the freshly created, cleared DAG
    """
    dag = DAG(
        dag_id='test_scheduler_reschedule',
        start_date=start_date,
        # Make sure it only creates a single DAG Run
        end_date=end_date,
        **kwargs)
    dag.clear()
    dag.is_subdag = False
    # Register the DAG (unpaused) in the metadata DB so the scheduler sees it.
    with create_session() as session:
        orm_dag = DagModel(dag_id=dag.dag_id)
        orm_dag.is_paused = False
        session.merge(orm_dag)
        session.commit()
    return dag

@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()
Expand Down Expand Up @@ -152,12 +168,59 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self):
session.merge(SlaMiss(task_id='dummy',
dag_id='test_sla_miss',
execution_date=test_start_date))

dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag_file_processor.manage_slas(dag=dag, session=session)

sla_callback.assert_not_called()

def test_scheduler_executor_overflow(self):
    """
    Test that tasks are set back to SCHEDULED and removed from the executor
    queue when the executor's parallelism limit is exceeded.
    """
    # Parallelism of 3 against 9 scheduled TIs forces an overflow.
    executor = MockExecutor(do_update=True, parallelism=3)

    with create_session() as session:
        dagbag = DagBag(executor=executor, dag_folder=os.path.join(settings.DAGS_FOLDER,
                                                                   "no_dags.py"))
        dag = self.create_test_dag()
        dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
        # NOTE(review): create_test_dag() is called a second time here, so the
        # task below is attached to a different DAG object than the one bagged
        # above (same dag_id) — confirm this duplication is intentional.
        dag = self.create_test_dag()
        task = DummyOperator(
            task_id='dummy',
            dag=dag,
            owner='airflow')
        # Seed 9 SCHEDULED task instances, one per day after DEFAULT_DATE.
        tis = []
        for i in range(1, 10):
            ti = TI(task, DEFAULT_DATE + timedelta(days=i))
            ti.state = State.SCHEDULED
            tis.append(ti)
            session.merge(ti)

    @mock.patch('airflow.models.DagBag', return_value=dagbag)
    @mock.patch('airflow.models.DagBag.collect_dags')
    @mock.patch('airflow.jobs.scheduler_job.SchedulerJob._change_state_for_tis_without_dagrun')
    def do_schedule(mock_dagbag, mock_collect_dags, mock_change_state_for_tis_without_dagrun):
        # Use an empty file since the above mock will return the
        # expected DAGs. Also specify only a single file so that it doesn't
        # try to schedule the above DAG repeatedly.
        scheduler = SchedulerJob(num_runs=1,
                                 executor=executor,
                                 subdir=os.path.join(settings.DAGS_FOLDER,
                                                     "no_dags.py"))
        scheduler.heartrate = 0
        scheduler.run()

    do_schedule()
    for ti in tis:
        ti.refresh_from_db()
    # Overflowed tasks must have been popped from the executor queue.
    self.assertEqual(len(executor.queued_tasks), 0)

    # With parallelism 3: exactly 3 run to SUCCESS, the other 6 are put
    # back to SCHEDULED instead of lingering in the queue.
    successful_tasks = [ti for ti in tis if ti.state == State.SUCCESS]
    scheduled_tasks = [ti for ti in tis if ti.state == State.SCHEDULED]
    self.assertEqual(3, len(successful_tasks))
    self.assertEqual(6, len(scheduled_tasks))

def test_dag_file_processor_sla_miss_callback_sent_notification(self):
"""
Test that the dag file processor does not call the sla_miss_callback when a
Expand Down Expand Up @@ -2195,26 +2258,26 @@ def test_scheduler_reschedule(self):
"""
executor = MockExecutor(do_update=False)

dagbag = DagBag(executor=executor)
dagbag = DagBag(executor=executor, dag_folder=os.path.join(settings.DAGS_FOLDER,
"no_dags.py"))
dagbag.dags.clear()
dagbag.executor = executor

dag = DAG(
dag_id='test_scheduler_reschedule',
start_date=DEFAULT_DATE)
DummyOperator(
dummy_task = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

dag.clear()
dag.is_subdag = False

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
orm_dag.is_paused = False
session.merge(orm_dag)
session.commit()
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
orm_dag.is_paused = False
session.merge(orm_dag)

dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)

Expand All @@ -2232,11 +2295,17 @@ def do_schedule(mock_dagbag, mock_collect_dags):
scheduler.run()

do_schedule()
self.assertEqual(1, len(executor.queued_tasks))
executor.queued_tasks.clear()
with create_session() as session:
ti = session.query(TI).filter(TI.dag_id == dag.dag_id,
TI.task_id == dummy_task.task_id).first()
self.assertEqual(0, len(executor.queued_tasks))
self.assertEqual(State.SCHEDULED, ti.state)

executor.do_update = True
do_schedule()
self.assertEqual(2, len(executor.queued_tasks))
self.assertEqual(0, len(executor.queued_tasks))
ti.refresh_from_db()
self.assertEqual(State.SUCCESS, ti.state)

def test_retry_still_in_executor(self):
"""
Expand All @@ -2263,11 +2332,10 @@ def test_retry_still_in_executor(self):
dag.clear()
dag.is_subdag = False

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
orm_dag.is_paused = False
session.merge(orm_dag)
session.commit()
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
orm_dag.is_paused = False
session.merge(orm_dag)

dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)

Expand All @@ -2285,19 +2353,21 @@ def do_schedule(mock_dagbag, mock_collect_dags):
scheduler.run()

do_schedule()
self.assertEqual(1, len(executor.queued_tasks))
with create_session() as session:
ti = session.query(TI).filter(TI.dag_id == 'test_retry_still_in_executor',
TI.task_id == 'test_retry_handling_op').first()
ti.task = dag_task1

# Nothing should be left in the queued_tasks as we don't do update in MockExecutor yet,
# and the queued_tasks will be cleared by scheduler job.
self.assertEqual(0, len(executor.queued_tasks))

def run_with_error(task, ignore_ti_state=False):
try:
task.run(ignore_ti_state=ignore_ti_state)
except AirflowException:
pass

ti_tuple = next(iter(executor.queued_tasks.values()))
(_, _, _, simple_ti) = ti_tuple
ti = simple_ti.construct_task_instance()
ti.task = dag_task1

self.assertEqual(ti.try_number, 1)
# At this point, scheduler has tried to schedule the task once and
# heartbeated the executor once, which moved the state of the task from
Expand All @@ -2308,33 +2378,23 @@ def run_with_error(task, ignore_ti_state=False):
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti.try_number, 2)

ti.refresh_from_db(lock_for_update=True, session=session)
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()

# do not schedule
do_schedule()
self.assertTrue(executor.has_task(ti))
ti.refresh_from_db()
# removing self.assertEqual(ti.state, State.SCHEDULED)
# as scheduler will move state from SCHEDULED to QUEUED
with create_session() as session:
ti.refresh_from_db(lock_for_update=True, session=session)
ti.state = State.SCHEDULED
session.merge(ti)

# now the executor has cleared and it should be allowed the re-queue,
# but tasks stay in the executor.queued_tasks after executor.heartbeat()
# will be set back to SCHEDULED state
executor.queued_tasks.clear()
# do schedule
do_schedule()
ti.refresh_from_db()

# MockExecutor is not aware of the TI since we don't do update yet
# and no trace of this TI will be left in the executor.
self.assertFalse(executor.has_task(ti))
self.assertEqual(ti.state, State.SCHEDULED)

# To verify that task does get re-queued.
executor.queued_tasks.clear()
executor.do_update = True
do_schedule()
ti.refresh_from_db()
self.assertIn(ti.state, [State.RUNNING, State.SUCCESS])
self.assertEqual(ti.state, State.SUCCESS)

@unittest.skipUnless("INTEGRATION" in os.environ, "Can only run end to end")
def test_retry_handling_job(self):
Expand Down
6 changes: 4 additions & 2 deletions tests/test_utils/mock_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ def sort_by(item):
(_, prio, _, _) = val
# Sort by priority (DESC), then date,task, try
return -prio, date, dag_id, task_id, try_number
sorted_queue = sorted(self.queued_tasks.items(), key=sort_by)

for (key, (_, _, _, simple_ti)) in sorted_queue:
open_slots = self.parallelism - len(self.running)
sorted_queue = sorted(self.queued_tasks.items(), key=sort_by)
for index in range(min((open_slots, len(sorted_queue)))):
(key, (_, _, _, simple_ti)) = sorted_queue[index]
self.queued_tasks.pop(key)
state = self.mock_task_results[key]
ti = simple_ti.construct_task_instance(session=session, lock_for_update=True)
Expand Down