
Airflow is trying to schedule tasks prior to DAG's start_date #20461

Closed
1 of 2 tasks
a-chernykh opened this issue Dec 22, 2021 · 13 comments
Labels
affected_version:2.2 Issues Reported for 2.2 area:core area:Scheduler Scheduler or dag parsing Issues kind:bug This is a clearly a bug pending-response priority:medium Bug that should be fixed before next release but would not block a release stale Stale PRs per the .github/workflows/stale.yml policy file

Comments

@a-chernykh

Apache Airflow version

2.2.2

What happened

I have a DAG whose start_date was initially 09/01/2021 but was later changed to 11/01/2021. The DAG has some runs prior to 11/01/2021 that never got a chance to finish. I can now see that the scheduler is still trying to schedule the runs prior to 11/01/2021, but none of the tasks in those runs start, presumably because of the start_date check. This maxes out the active runs and blocks any other dates within the DAG's range from being scheduled.
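A minimal sketch of the setup (the actual DAG is not included in the report; the dag_id, schedule and limits below are illustrative):

    # Illustrative reconstruction of the reported setup, not the actual DAG:
    # start_date was originally 2021-09-01, later moved to 2021-11-01 while
    # runs from before 11/01 were still unfinished.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    with DAG(
        dag_id="example_dag",              # illustrative name
        start_date=datetime(2021, 11, 1),  # previously datetime(2021, 9, 1)
        schedule_interval="@daily",
        max_active_runs=16,                # the stuck old runs saturate this cap
        catchup=True,
    ) as dag:
        DummyOperator(task_id="task")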

(Two screenshots attached: Screen Shot 2021-12-22 at 7.56.22 AM and Screen Shot 2021-12-22 at 7.56.29 AM.)

What you expected to happen

The scheduler should not be trying to schedule runs that are prior to the DAG's start_date.

How to reproduce

No response

Operating System

Airflow Docker

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@a-chernykh a-chernykh added area:core kind:bug This is a clearly a bug labels Dec 22, 2021
@raphaelauv
Contributor

The scheduler is not going to delete already-existing DAG runs, so you have to delete them manually. I think it's safer this way.
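One way to do that manual deletion, besides the UI, is Airflow's stable REST API; a sketch follows (the webserver URL, credentials, dag_id and cutoff date are all assumptions, not from the thread):

    # Sketch: delete stale DagRuns via Airflow's stable REST API (2.x).
    # BASE, AUTH, the dag_id and the cutoff date are illustrative assumptions.
    import requests

    BASE = "http://localhost:8080/api/v1"  # assumed webserver URL
    AUTH = ("admin", "admin")              # assumed basic-auth credentials

    # List runs whose logical date precedes the new start_date...
    runs = requests.get(
        f"{BASE}/dags/example_dag/dagRuns",
        params={"execution_date_lte": "2021-11-01T00:00:00Z"},
        auth=AUTH,
    ).json()["dag_runs"]

    # ...and delete each of them.
    for run in runs:
        requests.delete(f"{BASE}/dags/example_dag/dagRuns/{run['dag_run_id']}", auth=AUTH)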

@a-chernykh
Author

a-chernykh commented Dec 27, 2021

This is fine, and it actually makes sense that the scheduler won't delete the old runs. However, I don't think it is the right behavior for the scheduler to continue scheduling runs that are earlier than the actual start date of the DAG.

@potiuk potiuk added this to the Airflow 2.2.4 milestone Dec 28, 2021
@eladkal eladkal added priority:medium Bug that should be fixed before next release but would not block a release affected_version:2.2 Issues Reported for 2.2 area:Scheduler Scheduler or dag parsing Issues labels Jan 14, 2022
@avkirilishin
Contributor

It's very strange, because in 2.2.2 we have strong checks on max_active_runs and we don't create more DagRuns than this value. And I cannot reproduce it on the current main.

When were these runs created? Did you upgrade Airflow afterwards?

@potiuk
Member

potiuk commented Feb 6, 2022

Is this the same as #21011 @uranusjr ? WDYT?

@a-chernykh
Author

@avkirilishin the runs were likely created in 2.2.0 or 2.2.1, and then Airflow was upgraded to 2.2.2.

@avkirilishin
Contributor

@avkirilishin the runs were likely created in 2.2.0 or 2.2.1, and then Airflow was upgraded to 2.2.2.

@andreychernih I think there are two different problems:

  1. The problem is related to the scheduler's logic differing before and after the upgrade. Maybe there are no tasks in the running DAG runs, or something else. Can you show the rows for this DAG run in the dag, dag_run and task_instance tables (see the sketch after this list)?

  2. I agree with you that it is not the right behavior for the scheduler to continue scheduling runs that are earlier than the actual start date of the DAG or task. It can happen, for example, after turning the DAG off and back on. So I made a PR to fix it: Add dependency to the running_deps #21684
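A sketch of the kind of inspection meant in point 1, using Airflow's own ORM models (the dag_id is illustrative):

    # Sketch: inspect the relevant dag, dag_run and task_instance rows with
    # Airflow's models; "example_dag" is an illustrative dag_id.
    from airflow.models import DagModel, DagRun, TaskInstance
    from airflow.utils.session import create_session

    with create_session() as session:
        dm = session.query(DagModel).filter(DagModel.dag_id == "example_dag").one_or_none()
        print(dm.dag_id, dm.is_paused, dm.max_active_runs, dm.next_dagrun)

        for dr in session.query(DagRun).filter(DagRun.dag_id == "example_dag"):
            print(dr.run_id, dr.execution_date, dr.state, dr.run_type)

        for ti in session.query(TaskInstance).filter(TaskInstance.dag_id == "example_dag"):
            print(ti.task_id, ti.run_id, ti.state)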

@uranusjr
Member

(Sorry I missed this.) I think #21011 is different. That one schedules the task (incorrectly) at start_date even if that time does not lie on the schedule, and the fix is to delay the first run to a time after start_date that matches the schedule. The problem described here, however, is that the run is scheduled before start_date.
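A small sketch of the alignment that the #21011 fix performs, using croniter rather than Airflow's own timetable code (the schedule and start_date values are illustrative):

    # Sketch of the behavior #21011 fixes: the first run should land on the
    # first schedule point after start_date, not at start_date itself.
    from datetime import datetime
    from croniter import croniter

    start_date = datetime(2021, 11, 1, 7, 30)  # illustrative off-schedule start_date
    first_run = croniter("0 0 * * *", start_date).get_next(datetime)
    print(first_run)  # 2021-11-02 00:00:00 -- after start_date, on schedule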

@ephraimbuddy
Contributor

@avkirilishin the runs were likely created in 2.2.0 or 2.2.1, and then Airflow was upgraded to 2.2.2.

@andreychernih I think there are two different problems:

  1. The problem is related to the scheduler's logic differing before and after the upgrade. Maybe there are no tasks in the running DAG runs, or something else. Can you show the rows for this DAG run in the dag, dag_run and task_instance tables?
  2. I agree with you that it is not the right behavior for the scheduler to continue scheduling runs that are earlier than the actual start date of the DAG or task. It can happen, for example, after turning the DAG off and back on. So I made a PR to fix it: Add dependency to the running_deps #21684

Since tasks go through the queued state before moving to running, I think that the scheduler has the check here:

    ExecDateAfterStartDateDep(),

?
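For context, that dep lives in airflow/ti_deps/deps/exec_date_after_start_date_dep.py; paraphrased (a simplified sketch, not the verbatim source), it checks roughly the following:

    # Simplified paraphrase of ExecDateAfterStartDateDep, not the verbatim
    # source: the dep fails when the TI's execution date precedes the task's
    # or the DAG's start_date, so such tasks never pass their queued deps.
    def exec_date_after_start_date(ti):
        task = ti.task
        if task.start_date and ti.execution_date < task.start_date:
            return False  # execution date is before the task's start_date
        if task.dag and task.dag.start_date and ti.execution_date < task.dag.start_date:
            return False  # execution date is before the DAG's start_date
        return True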

@avkirilishin
Contributor

Since tasks go through the queued state before moving to running, I think that the scheduler has the check here:

ExecDateAfterStartDateDep(),

?

I think SCHEDULER_QUEUED_DEPS is actually not equivalent to the logic in the scheduler:

# TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution
# in any way and could easily be modified or removed from the scheduler causing
# this dependency to become outdated and incorrect. This coupling should be created
# (e.g. via a dag_deps analog of ti_deps that will be used in the scheduler code,
# or allow batch deps checks) to ensure that the logic here is equivalent to the logic
# in the scheduler.
# Right now there's one discrepancy between this context and how scheduler schedule tasks:
# Scheduler will check if the executor has the task instance--it is not possible
# to check the executor outside scheduler main process.

@ephraimbuddy
Contributor

What I think we should do, though I have not tried either option yet:

  1. Make sure that we don't create DagRuns prior to a DAG's start date,
    or
  2. Exclude such task instances here:

        query = (
            session.query(TI)
            .join(TI.dag_run)
            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
            .join(TI.dag_model)
            .filter(not_(DM.is_paused))
            .filter(TI.state == TaskInstanceState.SCHEDULED)
            .options(selectinload('dag_model'))
            .order_by(-TI.priority_weight, DR.execution_date)
        )

I think the solution you have right now will have the scheduler put those task instances in the queued state and never move them to running, which might not be a very good experience for users. WDYT @avkirilishin?
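A sketch of what option 1 could look like (illustrative only, not an actual patch; the guard would sit wherever the scheduler decides to create the next DagRun):

    # Illustrative guard for option 1, not an actual patch: refuse to create
    # a DagRun whose logical date falls before the DAG's current start_date.
    def should_create_dagrun(dag, logical_date):
        if dag.start_date and logical_date < dag.start_date:
            return False  # run would predate start_date; skip creating it
        return True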

@ashb ashb removed this from the Airflow 2.3.0 milestone Apr 22, 2022
@ashb
Member

ashb commented Apr 22, 2022

Can someone please give me exact reproduction steps for this, including a DAG I can run?

@github-actions

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label May 23, 2022
@github-actions

This issue has been closed because it has not received a response from the issue author.
