From 29f266adc5fd02bff174b09749951ff22a1a08b2 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 1 Aug 2020 12:38:02 -0700 Subject: [PATCH 1/3] prevent DAG callback exception from crashing scheduler --- airflow/models/dag.py | 13 +++++++++---- airflow/models/taskinstance.py | 9 +++++---- tests/models/test_dag.py | 25 +++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index dfb6409c69003..be303333397b8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -47,7 +47,7 @@ from airflow.models.dagcode import DagCode from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import DagRun -from airflow.models.taskinstance import TaskInstance, clear_task_instances +from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances from airflow.utils import timezone from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.file import correct_maybe_zipped @@ -64,6 +64,8 @@ DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times'] ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT'] +DagStateChangeCallback = Callable[[Context], None] + def get_last_dagrun(dag_id, session, include_externally_triggered=False): """ @@ -226,8 +228,8 @@ def __init__( default_view: str = conf.get('webserver', 'dag_default_view').lower(), orientation: str = conf.get('webserver', 'dag_orientation'), catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'), - on_success_callback: Optional[Callable] = None, - on_failure_callback: Optional[Callable] = None, + on_success_callback: Optional[DagStateChangeCallback] = None, + on_failure_callback: Optional[DagStateChangeCallback] = None, doc_md: Optional[str] = None, params: Optional[Dict] = None, access_control: Optional[Dict] = None, @@ -686,7 +688,10 @@ def handle_callback(self, dagrun, success=True, reason=None, session=None): ti.task = 
self.get_task(ti.task_id) context = ti.get_template_context(session=session) context.update({'reason': reason}) - callback(context) + try: + callback(context) + except Exception: + self.log.exception("failed to invoke dag state update callback") def get_active_runs(self): """ diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index a4d2714602d81..d07f91875de62 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -66,13 +66,14 @@ from airflow.utils.timeout import timeout TR = TaskReschedule +Context = Dict[str, Any] -_CURRENT_CONTEXT = [] +_CURRENT_CONTEXT: List[Context] = [] log = logging.getLogger(__name__) @contextlib.contextmanager -def set_current_context(context: Dict[str, Any]): +def set_current_context(context: Context): """ Sets the current execution context to the provided context object. This method should be called once per Task execution, before calling operator.execute. @@ -1391,7 +1392,7 @@ def _safe_date(self, date_attr, fmt): return '' @provide_session - def get_template_context(self, session=None) -> Dict[str, Any]: # pylint: disable=too-many-locals + def get_template_context(self, session=None) -> Context: # pylint: disable=too-many-locals """Return TI Context""" task = self.task from airflow import macros @@ -1583,7 +1584,7 @@ def overwrite_params_with_dag_run_conf(self, params, dag_run): self.log.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf) params.update(dag_run.conf) - def render_templates(self, context: Optional[Dict] = None) -> None: + def render_templates(self, context: Optional[Context] = None) -> None: """Render templates in the operator fields.""" if not context: context = self.get_template_context() diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 8891d56c8e98d..708a986482c5d 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1089,6 +1089,31 @@ def test_schedule_dag_relativedelta(self): dag.clear() 
self._clean_up(dag_id) + def test_dag_handle_callback_crash(self): + """ + Tests avoid crashes from calling dag callbacks exceptions + """ + dag_id = "test_dag_callback_crash" + mock_callback_with_exception = mock.MagicMock + mock_callback_with_exception.side_effect = Exception + dag = DAG( + dag_id=dag_id, + # callback with invalid signature should not cause crashes + on_success_callback=lambda: 1, + on_failure_callback=mock_callback_with_exception) + dag.add_task(BaseOperator( + task_id="faketastic", + owner='Also fake', + start_date=datetime_tz(2015, 1, 2, 0, 0))) + + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + dag_run = dag_file_processor.create_dag_run(dag) + # should not rause any exception + dag.handle_callback(dag_run, success=False) + dag.handle_callback(dag_run, success=True) + dag.clear() + self._clean_up(dag_id) + def test_schedule_dag_fake_scheduled_previous(self): """ Test scheduling a dag where there is a prior DagRun From ef8bf7831cc06ebf3c09e0f7b4e4a8aff7b6db9c Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 2 Aug 2020 16:55:50 -0700 Subject: [PATCH 2/3] document change in UPDATING.md and add dag.callback_exceptions metrics --- UPDATING.md | 9 +++++++++ airflow/models/dag.py | 2 ++ tests/models/test_dag.py | 8 ++++++-- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 54759742c86dd..03bd12e493088 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -136,6 +136,7 @@ with third party services to the ``airflow.providers`` package. All changes made are backward compatible, but if you use the old import paths you will see a deprecation warning. The old import paths can be abandoned in the future. + ### Migration Guide from Experimental API to Stable API v1 In Airflow 2.0, we added the new REST API. Experimental API still works, but support may be dropped in the future. If your application is still using the experimental API, you should consider migrating to the stable API. 
@@ -176,6 +177,14 @@ filters in the query string of this endpoint(``/api/v1/dags/{dag_id}/dagRuns``). reference documentation for more information +### Changes to exception handling for DAG callbacks + +Exceptions raised by DAG callbacks used to crash the scheduler. In order to make the +scheduler more robust, we have changed this behavior to log the exception +instead. On top of that, a new `dag.callback_exceptions` counter metric has +been added to help better monitor callback exceptions. + + ### CLI changes in Airflow 2.0 The Airflow CLI has been organized so that related commands are grouped together as subcommands, diff --git a/airflow/models/dag.py b/airflow/models/dag.py index be303333397b8..2a6c11a7b4458 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -48,6 +48,7 @@ from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import DagRun from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances +from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.file import correct_maybe_zipped @@ -692,6 +693,7 @@ def handle_callback(self, dagrun, success=True, reason=None, session=None): callback(context) except Exception: self.log.exception("failed to invoke dag state update callback") + Stats.incr("dag.callback_exceptions") def get_active_runs(self): """ diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 708a986482c5d..a348b71b3a649 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1089,12 +1089,13 @@ def test_schedule_dag_relativedelta(self): dag.clear() self._clean_up(dag_id) - def test_dag_handle_callback_crash(self): + @patch('airflow.models.dag.Stats') + def test_dag_handle_callback_crash(self, mock_stats): """ Tests avoid crashes from calling dag callbacks exceptions """ dag_id = "test_dag_callback_crash" - mock_callback_with_exception = mock.MagicMock +
mock_callback_with_exception = mock.MagicMock() mock_callback_with_exception.side_effect = Exception dag = DAG( dag_id=dag_id, @@ -1111,6 +1112,9 @@ def test_dag_handle_callback_crash(self): # should not rause any exception dag.handle_callback(dag_run, success=False) dag.handle_callback(dag_run, success=True) + + mock_stats.incr.assert_called_with("dag.callback_exceptions") + dag.clear() self._clean_up(dag_id) From db455946c3dd746d7d26bc88505cf39b0fa312bf Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Wed, 5 Aug 2020 15:16:13 -0700 Subject: [PATCH 3/3] add doc dag.callback_exceptions counter metric --- docs/metrics.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/metrics.rst b/docs/metrics.rst index c31f308f749a4..a91158bad37cc 100644 --- a/docs/metrics.rst +++ b/docs/metrics.rst @@ -88,6 +88,7 @@ Name Description ``sla_email_notification_failure`` Number of failed SLA miss email notification attempts ``ti.start..`` Number of started task in a given dag. Similar to _start but for task ``ti.finish...`` Number of completed task in a given dag. Similar to _end but for task +``dag.callback_exceptions`` Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working. ======================================= ================================================================ Gauges