
Properly handle ti state difference between executor and scheduler #17819

Merged: 8 commits merged into apache:main from fix-task-callback-scheduler on Sep 21, 2021

Conversation

@ephraimbuddy (Contributor) commented Aug 24, 2021

When a task fails to start, the executor fails it, and its state in the
scheduler is queued while its state in the executor is failed. Currently
we fail this task without retries to avoid getting stuck.

This PR changes this to only fail the task if the callback cannot be
executed. This ensures the task does not get stuck.

closes: #16625
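
In rough outline, the idea is that when the executor reports a failure for a task the scheduler still considers queued, the scheduler should let the normal retry accounting decide the next state rather than force-failing outright (the merged version, per the commit message further down, only fails the task when no retries are left). The following is a toy sketch only, not the PR's actual diff: the State and TaskInstance classes below are simplified stand-ins for Airflow's, and reconcile_executor_failure is a hypothetical name used for illustration.

# Toy sketch only -- not the PR's actual diff. Simplified stand-ins for
# Airflow's State and TaskInstance; reconcile_executor_failure is a
# hypothetical name used here for illustration.
from dataclasses import dataclass
from enum import Enum


class State(str, Enum):
    QUEUED = "queued"
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"


@dataclass
class TaskInstance:
    task_id: str
    state: State
    try_number: int = 1
    max_tries: int = 3


def reconcile_executor_failure(ti: TaskInstance, executor_state: State) -> None:
    """Executor reports FAILED while the scheduler still sees QUEUED."""
    if executor_state is not State.FAILED or ti.state is not State.QUEUED:
        return  # states agree (or nothing failed); nothing to reconcile
    if ti.try_number <= ti.max_tries:
        # The old behaviour failed the task outright here; the change lets
        # the normal retry machinery pick it up instead.
        ti.state = State.UP_FOR_RETRY
    else:
        ti.state = State.FAILED


ti = TaskInstance(task_id="task_wrong_arg", state=State.QUEUED, try_number=1, max_tries=2)
reconcile_executor_failure(ti, executor_state=State.FAILED)
print(ti.state)  # State.UP_FOR_RETRY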



@boring-cyborg boring-cyborg bot added the "area:Scheduler" (Scheduler or dag parsing Issues) label Aug 24, 2021
@kaxil (Member) left a comment


We don't want to run any user code in Scheduler. That is why callbacks are currently run in the DAG file processor. Long term they should be run in the Worker

@ephraimbuddy (Contributor Author):

> We don't want to run any user code in Scheduler. That is why callbacks are currently run in the DAG file processor. Long term they should be run in the Worker

Alright, let me think of another way...Thanks!

@ephraimbuddy ephraimbuddy deleted the fix-task-callback-scheduler branch August 24, 2021 23:43
@ephraimbuddy ephraimbuddy restored the fix-task-callback-scheduler branch August 25, 2021 11:21
@ephraimbuddy ephraimbuddy reopened this Aug 25, 2021
@ephraimbuddy ephraimbuddy marked this pull request as draft August 25, 2021 11:22
@ephraimbuddy ephraimbuddy changed the title from "Handle task callback inside the scheduler" to "Promptly handle task callback from _process_executor_events" Aug 25, 2021
@ephraimbuddy (Contributor Author):

Hi @kaxil, what do you think about this implementation? still working on tests...

@ephraimbuddy (Contributor Author):

This is how to reproduce the error: run this DAG and confirm that it succeeds. Then uncomment the depend_on_past argument; it is deliberately misspelled (depend_on_past instead of depends_on_past). You will get an import error in the UI. Run the DAG and the task will enter the queued state and retry twice before failing.

import time
from datetime import datetime, timedelta

from airflow import DAG


def on_failure(ctx):
    print('hello world')
    print(ctx)


default_args = {'on_failure_callback': on_failure}

dag = DAG(
    dag_id='Give-wrong-arg',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
    default_args=default_args,
)


# The commented-out kwarg is deliberately misspelled: it should be
# depends_on_past, not depend_on_past. Uncommenting it triggers the bug.
@dag.task(retries=2, retry_delay=timedelta(seconds=20))  # , depend_on_past=False)
def task_wrong_arg():
    time.sleep(5)


@dag.task
def myfunc():
    return 1


task_wrong_arg() >> myfunc()

@ephraimbuddy ephraimbuddy force-pushed the fix-task-callback-scheduler branch 2 times, most recently from 392f0d5 to f363df6 on August 25, 2021 15:42
@ephraimbuddy ephraimbuddy marked this pull request as ready for review August 25, 2021 15:49
@ashb ashb self-assigned this Aug 30, 2021
@ashb (Member) commented Aug 30, 2021

By changing a dag file to have a parse error and then triggering callbacks you've hit a different problem too, so that isn't a good way of triggering the behaviour you are trying to test.

(Because to run the on_failure_callback we need the actual loaded DAG file in many cases.)

@ephraimbuddy ephraimbuddy deleted the fix-task-callback-scheduler branch September 3, 2021 15:31
@ephraimbuddy ephraimbuddy restored the fix-task-callback-scheduler branch September 3, 2021 17:03
@ephraimbuddy ephraimbuddy reopened this Sep 3, 2021
@ephraimbuddy ephraimbuddy changed the title from "Promptly handle task callback from _process_executor_events" to "Properly handle ti state difference between executor and scheduler" Sep 3, 2021
Review comments on airflow/jobs/scheduler_job.py (outdated, resolved)
ephraimbuddy and others added 8 commits September 21, 2021 20:50
When a task fails to start, the executor fails it, and the report says that its state in the scheduler is queued while its state in the executor is failed.

Currently we fail this task without retries to avoid getting stuck.

This change modifies the above to only fail the task if there are no retries left.
@kaxil (Member) commented Sep 21, 2021

All the reviews have been addressed

@kaxil kaxil requested a review from ashb September 21, 2021 19:51
@github-actions (bot):

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the "full tests needed" (We need to run full set of tests for this PR to merge) label Sep 21, 2021
@kaxil kaxil merged commit 44f601e into apache:main Sep 21, 2021
@kaxil kaxil deleted the fix-task-callback-scheduler branch September 21, 2021 20:17
@kaxil (Member) commented Sep 21, 2021

Well done @ephraimbuddy 👏

@taylorfinnell:

@ephraimbuddy @kaxil

We applied this patch directly on top of the 2.1.4 tag and noticed issues almost instantly.

  • Tasks would queue
  • Tasks would throw an exception; in this case the exception came from our DAG code signaling we are not ready to process and should retry later. It was not an internal Airflow exception
  • Task logs would indicate that the state was being set to UP_FOR_RETRY
  • The UI would show the task still RUNNING. The task would have ~16 retries left and would sit on attempt 1
  • Clearing the task would result in the shutdown state
  • To get the task to run again we first had to fail it, then clear it again

We also tried the main branch up to this commit and saw very similar issues.

Here are some logs.

...
[2021-09-22 04:47:15,214] {taskinstance.py:1463} ERROR - Task failed with exception
...
raise FileNotFoundError(f"{file} does not exist")

[2021-09-22 04:47:15,299] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/strategies.py:911 SAWarning: Multiple rows returned with uselist=False for lazily-loaded attribute 'DagRun.task_instances'
[2021-09-22 04:47:15,301] {taskinstance.py:1512} INFO - Marking task as UP_FOR_RETRY. dag_id=foo_dag_split, task_id=foo_task, execution_date=20210921T043000, start_date=20210922T044714, end_date=20210922T044715
[2021-09-22 04:47:15,302] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py:2193 SAWarning: Instance <TaskInstance at 0x7f01f5adc070> is already pending in this Session yet is being merged again; this is probably not what you want to do
[2021-09-22 04:47:15,367] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-22 04:47:15,517] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/strategies.py:911 SAWarning: Multiple rows returned with uselist=False for lazily-loaded attribute 'DagRun.task_instances'
[2021-09-22 04:47:15,517] {taskinstance.py:1512} INFO - Marking task as UP_FOR_RETRY. dag_id=foo_dag_split, task_id=foo_task, execution_date=20210921T043000, start_date=20210922T044714, end_date=20210922T044715
[2021-09-22 04:47:15,518] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py:2193 SAWarning: Instance <TaskInstance at 0x7f01f5ad6580> is already pending in this Session yet is being merged again; this is probably not what you want to do

@ephraimbuddy (Contributor Author) commented Sep 22, 2021

Thanks for checking this @taylorfinnell.
Can you modify your patch to use ti.try_number += 1 instead of ti._try_number += 1 on this line:

self._try_number += 1

If I understand correctly, the exception above being raised is expected and not a bug?

@taylorfinnell:

That's correct; we expect that exception to be raised when data is not in place for the DAG to process. We then rely on the retry mechanism to process the data at a later time, when it is available.

Do you have any suggestions on how we could reproduce this in a test case? If we can get a test that is fixed by your suggestion, I would feel more comfortable trying the change. Unfortunately, we didn't see the issue until we got to production scale.

@ephraimbuddy (Contributor Author) commented Sep 22, 2021

I don't have an idea for a test case other than the one we have in the unit tests for this PR.

To reproduce manually with a DAG, raise AirflowException after this line:

[embedded code permalink]

That is, add raise AirflowException after the linked line and run a DAG with retries.
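
For the "run a DAG with retries" part, any simple DAG with retries set should do. Here is a minimal illustrative example (not taken from the PR; the dag_id, task name and retry settings are arbitrary, and it assumes the AirflowException has been injected into the scheduler/executor path as described above):

# Minimal DAG with retries for the manual reproduction described above.
# Illustrative only: dag_id, task name and retry settings are arbitrary.
from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    dag_id='retry-repro',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 9, 1),
)


@dag.task(retries=2, retry_delay=timedelta(seconds=30))
def simple_task():
    # The task body doesn't matter; the failure is injected on the
    # scheduler/executor side, before the task ever runs.
    return 'done'


simple_task()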

@WattsInABox:

Thanks for that; we might be able to push a DAG like that to our staging environment. Theoretically, we could also write an integration test for this? Is there a good document to follow for integration testing, other than the random blog articles I've found?

@ephraimbuddy (Contributor Author):

There’s currently no community doc about integration testing that I’m aware of.

Let us know what happens when you apply the patch above for try_number.

@ephraimbuddy (Contributor Author):

@WattsInABox, I have tested this in deployment and it works as expected. I think you should create an issue for the behaviour you are seeing.

Labels: area:Scheduler (Scheduler or dag parsing Issues), full tests needed (We need to run full set of tests for this PR to merge)

Successfully merging this pull request may close these issues: Task is not retried when worker pod fails to start

6 participants