Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix external_executor_id not being set for manually run jobs. #17207

Merged
merged 1 commit into from
Sep 15, 2021

Conversation

Jorricks
Copy link
Contributor

Summary of the problem:
Currently the celery task_id is being stored only after a scheduler launched a task in its executor.
Then the celery task_id is being put on the event_buffer and the scheduler periodically reads out the event_buffer and stores the external_executor_id.
Because manually trigger tasks enter the adoption flow -- as their executor instances are only there for the launching of that one specific tasks -- and the external_executor_id is not set, they won't get adopted. Instead they get killed. Meaning, any manually triggered task that doesn't have an external_executor_id from a previous scheduled run before being launched, might get killed if the adoption routines kicks in while the task is still running.

Solution:
I could imagine two solution:

  1. After every manually triggered task, we read the event_buffer and store that in the task instances.
  2. Every task that is triggered for Celery Executors automatically stores its external_executor_id at the start up of the task.

I implemented both but found the second version nicer.
I am looking for some feedback so please provide me with any you can think of :)

Opened issues that are related:
related: #16023

@boring-cyborg boring-cyborg bot added area:CLI area:Scheduler Scheduler or dag parsing Issues labels Jul 25, 2021
@Jorricks
Copy link
Contributor Author

Jorricks commented Jul 25, 2021

The other idea:

After every manually triggered task, we read the event_buffer and store that in the task instances.

Would look something like adding this to the def run() of class TaskInstanceModelView:

            event_buffer = executor.get_event_buffer()
            for ti_key, (state, external_executor_id) in event_buffer.items():
                if ti.key == ti_key and state == State.QUEUED:
                    session.merge(ti)
                    ti.external_executor_id = external_executor_id
                    session.commit()
                    updated = True

if "external_executor_id" in args:
return args.external_executor_id
elif "external_executor_id" in os.environ:
return args.external_executor_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we read env variable here?

Copy link
Contributor Author

@Jorricks Jorricks Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Will change that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What did you think of the overal implementation @mik-laj ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the reference.

@Jorricks
Copy link
Contributor Author

Jorricks commented Aug 1, 2021

Rebased on latest main.

Copy link
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should not make this change but dig more to reliably reproduce the linked issue.
On my first try, I was able to see Scheduler marked as failed but on several tries now, I can't reproduce it. I ended up having task stuck in running which is related to another issue(Trying to find how I got to that)

airflow/cli/commands/task_command.py Outdated Show resolved Hide resolved
@Jorricks
Copy link
Contributor Author

Jorricks commented Aug 19, 2021

I feel we should not make this change but dig more to reliably reproduce the linked issue.
On my first try, I was able to see Scheduler marked as failed but on several tries now, I can't reproduce it. I ended up having task stuck in running which is related to another issue(Trying to find how I got to that)

First of all, thanks for taking a look at the PR!

There are a couple things that need to hold for you to be able to reproduce it:

  • Before starting the task, the task should not have run before or at least the external_executor_id must be None.
  • The task must still be running when the adoption flow of one of the schedulers kicks in.

I am able to reliable reproduce this issue when these two things are held.

However, I am not really sure what you mean with dig deeper. Unless the whole executor part is rewritten, this is the best I could come up with. Could you please be a bit more explicit?

@ephraimbuddy
Copy link
Contributor

I feel we should not make this change but dig more to reliably reproduce the linked issue.
On my first try, I was able to see Scheduler marked as failed but on several tries now, I can't reproduce it. I ended up having task stuck in running which is related to another issue(Trying to find how I got to that)

First of all, thanks for taking a look at the PR!

There are a couple things that need to hold for you to be able to reproduce it:

  • Before starting the task, the task should not have run before or at least the external_executor_id must be None.
  • The task must still be running when the adoption flow of one of the schedulers kicks in.

I am able to reliable reproduce this issue when these two things are held.

However, I am not really sure what you mean with dig deeper. Unless the whole executor part is rewritten, this is the best I could come up with. Could you please be a bit more explicit?

I did reproduce it initially but can't reproduce it again. Have done airflow db reset in breeze but can't reproduce it. Well, I do not fully understand this area, let's hear from others

@Jorricks
Copy link
Contributor Author

@ephraimbuddy please make sure you are using a form of a Celery Executor, as this ticket is only for celery setups.

Copy link
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now makes sense to me and will make it possible for CeleryExecutors to adopt tasks from failed SchedulerJobs when tasks are run from the UI with Ignore All deps.

Can you add some tests?

airflow/cli/commands/task_command.py Show resolved Hide resolved
@Jorricks
Copy link
Contributor Author

I tried to add some tests on the CLI task part. That is pretty much done.
However, I had quite some trouble wrapping my head around a decent test approach on the celery_executor part.
There is currently not really a test for any of the functions I modified which makes me wonder if I should add them.
If so, do you have any remarks on how I could best do that?

@Jorricks Jorricks changed the title WIP: Fix external_executor_id not being set for manually run jobs. Fix external_executor_id not being set for manually run jobs. Aug 23, 2021
@Jorricks
Copy link
Contributor Author

CI/CD failed on transient errors, not related to this PR :)

Copy link
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to add some tests on the CLI task part. That is pretty much done.
However, I had quite some trouble wrapping my head around a decent test approach on the celery_executor part.
There is currently not really a test for any of the functions I modified which makes me wonder if I should add them.
If so, do you have any remarks on how I could best do that?

For the methods modified in CeleryExecutor, you can mock them and assert they were called with a celeryID.
The app can be mocked to return an ID. See

patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)

You can provide appID there.

airflow/executors/celery_executor.py Outdated Show resolved Hide resolved
@Jorricks
Copy link
Contributor Author

I tried to add some tests on the CLI task part. That is pretty much done.
However, I had quite some trouble wrapping my head around a decent test approach on the celery_executor part.
There is currently not really a test for any of the functions I modified which makes me wonder if I should add them.
If so, do you have any remarks on how I could best do that?

For the methods modified in CeleryExecutor, you can mock them and assert they were called with a celeryID.
The app can be mocked to return an ID. See

patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)

You can provide appID there.

I will do this after I am back from my vacation in about two weeks 👍.

@Jorricks
Copy link
Contributor Author

Jorricks commented Sep 7, 2021

I tried to add some tests on the CLI task part. That is pretty much done.
However, I had quite some trouble wrapping my head around a decent test approach on the celery_executor part.
There is currently not really a test for any of the functions I modified which makes me wonder if I should add them.
If so, do you have any remarks on how I could best do that?

For the methods modified in CeleryExecutor, you can mock them and assert they were called with a celeryID.
The app can be mocked to return an ID. See

patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)

You can provide appID there.

I modified the existing test to add this behaviour.

@ephraimbuddy
Copy link
Contributor

You have some conflicts :)

@Jorricks
Copy link
Contributor Author

You have some conflicts :)

Fixed.

tests/cli/commands/test_task_command.py Outdated Show resolved Hide resolved
tests/executors/test_celery_executor.py Outdated Show resolved Hide resolved
tests/executors/test_celery_executor.py Outdated Show resolved Hide resolved
@Jorricks
Copy link
Contributor Author

So @ephraimbuddy, do you think it's ready to be merged now or?

@ephraimbuddy
Copy link
Contributor

So @ephraimbuddy, do you think it's ready to be merged now or?

Looks like you have test failure

@Jorricks
Copy link
Contributor Author

Tests should be fixed now 👍

@Jorricks
Copy link
Contributor Author

Can someone please re-trigger the pipeline :)?

@ephraimbuddy
Copy link
Contributor

You can actually retrigger it by closing and opening the PR

@Jorricks
Copy link
Contributor Author

You can actually retrigger it by closing and opening the PR

Didn't know I could re-open it myself. Thank you!

@Jorricks
Copy link
Contributor Author

AFAIK, the failures in the tests are not related to this PR.

@Jorricks
Copy link
Contributor Author

Rebased on latest main again to see if that makes it better.

@Jorricks
Copy link
Contributor Author

Yes the remaining failure is not related to this PR.

@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Sep 15, 2021
@ephraimbuddy
Copy link
Contributor

cc: @ashb @kaxil

@ashb
Copy link
Member

ashb commented Sep 15, 2021

Should be fixed on main now, so re-triggering build :)

@ashb ashb closed this Sep 15, 2021
@ashb ashb reopened this Sep 15, 2021
@ashb ashb merged commit e7925d8 into apache:main Sep 15, 2021
@kaxil kaxil added this to the Airflow 2.2.0 milestone Sep 20, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:CLI area:Scheduler Scheduler or dag parsing Issues full tests needed We need to run full set of tests for this PR to merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants