Handle and log exceptions raised during task callback #17347

Merged: 3 commits, Aug 6, 2021

SamWheating (Contributor) commented Jul 30, 2021

Currently, an exception raised in a task-level callback is not handled, so any part of the callback that comes after the error (for example, sending a Slack notification after a job failure) is skipped. The traceback also never reaches the task logs, which makes errors in callbacks difficult to identify and fix.

To demonstrate, here's a simple DAG with an error in the callback function:

from airflow.models import DAG
from airflow import utils
from airflow.operators.python import PythonOperator

def message():
    print('Hello, world.')

def broken_callback(context=None):
    print("Callback started")
    print(f"Task {context['taskID']} finished successfully")  # raises keyError
    print("Callback complete.")

with DAG('dag-with-broken-callback', schedule_interval=None, start_date=utils.dates.days_ago(1)) as dag:

    task = PythonOperator(
        task_id='task',
        python_callable=message,
        on_success_callback=broken_callback,
    )

The task execution logs will cut off where the exception is thrown:

[2021-07-30, 16:25:44 UTC] {logging_mixin.py:109} INFO - Hello, world.
[2021-07-30, 16:25:44 UTC] {python.py:151} INFO - Done. Returned value was: None
[2021-07-30, 16:25:44 UTC] {taskinstance.py:1144} INFO - Marking task as SUCCESS. dag_id=dag-with-broken-callback, task_id=task, execution_date=20210730T162540, start_date=20210730T162544, end_date=20210730T162544
[2021-07-30, 16:25:44 UTC] {local_task_job.py:151} INFO - Task exited with return code 0
[2021-07-30, 16:25:44 UTC] {logging_mixin.py:109} INFO - Callback started
<nothing>

And the unhandled exception will show up in the executor logs:

Running <TaskInstance: dag-with-broken-callback.task 2021-07-30T16:25:40.187231+00:00 [queued]> on host 466ea2c2e992
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 33, in <module>
    sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
  File "/opt/airflow/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 256, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 84, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 141, in _run_task_by_local_task_job
    run_job.run()
  File "/opt/airflow/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/opt/airflow/airflow/jobs/local_task_job.py", line 128, in _execute
    self.handle_task_exit(return_code)
  File "/opt/airflow/airflow/jobs/local_task_job.py", line 163, in handle_task_exit
    self.task_instance._run_finished_callback(error=error)
  File "/opt/airflow/airflow/models/taskinstance.py", line 1377, in _run_finished_callback
    task.on_success_callback(context)
  File "/opt/airflow/dags/test_callback.py", line 10, in broken_callback
    print(f"Task {context['taskID']} finished successfully")  # raises keyError
KeyError: 'taskID'
[2021-07-30 16:25:45,078] {sequential_executor.py:66} ERROR - Failed to execute task Command '['airflow', 'tasks', 'run', 'dag-with-broken-callback', 'task', '2021-07-30T16:25:40.187231+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/test_callback.py']' returned non-zero exit status 1..
[2021-07-30 16:25:45,079] {scheduler_job.py:577} INFO - Executor reports execution of dag-with-broken-callback.task execution_date=2021-07-30 16:25:40.187231+00:00 exited with status failed for try_number 1
[2021-07-30 16:25:45,129] {dagrun.py:435} INFO - Marking run <DagRun dag-with-broken-callback @ 2021-07-30 16:25:40.187231+00:00: manual__2021-07-30T16:25:40.187231+00:00, externally triggered: True> successful

By handling this exception and logging it, we can make callback failures much easier to identify and fix. After this change, the task logs look like this:

[2021-07-30, 16:26:45 UTC] {logging_mixin.py:109} INFO - Hello, world.
[2021-07-30, 16:26:45 UTC] {python.py:151} INFO - Done. Returned value was: None
[2021-07-30, 16:26:45 UTC] {taskinstance.py:1144} INFO - Marking task as SUCCESS. dag_id=dag-with-broken-callback, task_id=task, execution_date=20210730T162642, start_date=20210730T162645, end_date=20210730T162645
[2021-07-30, 16:26:45 UTC] {local_task_job.py:151} INFO - Task exited with return code 0
[2021-07-30, 16:26:45 UTC] {logging_mixin.py:109} INFO - Callback started
[2021-07-30, 16:26:45 UTC] {taskinstance.py:1357} ERROR - Failed when executing on_success_callback
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 1355, in _run_finished_callback
    task.on_success_callback(context)
  File "/opt/airflow/dags/test_callback.py", line 10, in broken_callback
    print(f"Task {context['taskID']} finished successfully")  # raises keyError
KeyError: 'taskID'
[2021-07-30, 16:26:45 UTC] {local_task_job.py:258} INFO - 0 downstream tasks scheduled from follow-on schedule check

This removes the stacktrace from the executor logs. If we want to avoid changing this behaviour then we could also re-raise the exception after logging. Thoughts?
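
For illustration, the handling described above can be sketched outside of Airflow roughly as follows. This is a minimal sketch and not the code in this PR: the helper `run_callback_safely`, its `reraise` flag, and the logger name are hypothetical, and only the log message wording is taken from the task log shown earlier.

```python
import logging

# Hypothetical logger name, used only for this sketch.
log = logging.getLogger("callback_sketch")


def run_callback_safely(callback, context, name, reraise=False):
    """Run a callback, logging any exception it raises.

    Returns True if the callback ran without error, False otherwise.
    With reraise=True the exception is logged and then propagated,
    which would keep the stack trace visible to the caller as well.
    """
    if callback is None:
        return False
    try:
        callback(context)
    except Exception:
        # log.exception records the message plus the full traceback.
        log.exception("Failed when executing %s", name)
        if reraise:
            raise
        return False
    return True


def broken_callback(context=None):
    print("Callback started")
    print(f"Task {context['taskID']} finished successfully")  # raises KeyError
    print("Callback complete.")


# The KeyError is logged with its traceback instead of propagating.
ok = run_callback_safely(broken_callback, {"task_id": "task"}, "on_success_callback")
```

Passing `reraise=True` corresponds to the alternative mentioned above: the error is still written to the log, but the caller sees the exception as it did before.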

ephraimbuddy (Contributor) left a comment

Please also add tests for on_failure_callback and on_retry_callback, to guard against regressions.

tests/models/test_taskinstance.py (outdated, resolved)
github-actions bot commented Aug 3, 2021

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

github-actions bot added the "full tests needed" label Aug 3, 2021
ephraimbuddy reopened this Aug 3, 2021
ephraimbuddy added this to the Airflow 2.1.3 milestone Aug 3, 2021
ephraimbuddy reopened this Aug 3, 2021
ephraimbuddy (Contributor) commented

Can you rebase, @SamWheating?

ephraimbuddy (Contributor) commented

Closing and reopening to trigger tests

ephraimbuddy (Contributor) commented

@ashb please take a look

ephraimbuddy reopened this Aug 5, 2021
ephraimbuddy (Contributor) commented

@SamWheating please rebase again

ephraimbuddy merged commit faf9f73 into apache:main Aug 6, 2021
jhtimmins pushed a commit that referenced this pull request Aug 9, 2021
Add missing exception handling in success/retry/failure callbacks

(cherry picked from commit faf9f73)
jhtimmins pushed a commit that referenced this pull request Aug 13, 2021
Add missing exception handling in success/retry/failure callbacks

(cherry picked from commit faf9f73)
kaxil pushed a commit that referenced this pull request Aug 17, 2021
Add missing exception handling in success/retry/failure callbacks

(cherry picked from commit faf9f73)
jhtimmins pushed a commit that referenced this pull request Aug 17, 2021
Add missing exception handling in success/retry/failure callbacks

(cherry picked from commit faf9f73)