Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with stale dags not being marked inactive by Scheduler #11462

Closed
wants to merge 1 commit into from

Conversation

msumit
Copy link
Contributor

@msumit msumit commented Oct 12, 2020

As of now, the logic to mark DAGs inactive is out of the _run_scheduler_loop which never runs unless num_runs is configured, so have to move it inside the _run_scheduler_loop method.

Also currently, it relies on Scheduler's loop start time to figure out which all DAGs needed to be marked inactive, something which won't work in a Scheduler HA setup, as each Scheduler would be processing a small number of DAGs only. To solve that, I've introduced a timeout config with a default value of 10mins, so a deleted DAG would be visible up to 10mins on UI, but that's better to wrongly mark some active DAGs as inactive or not marking them at all.

PS: Will write unit tests if the approach looks good.

@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label Oct 12, 2020
@msumit msumit requested review from ashb and kaxil October 12, 2020 09:37
@@ -1409,6 +1399,15 @@ def _run_scheduler_loop(self) -> None:
# usage when "idle"
time.sleep(self._processor_poll_interval)

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been deleted.
if self.processor_agent.all_files_processed:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb do we still need to keep this check? I feel that we don't need to keep it anymore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not needed anymore, correct.

@@ -1374,6 +1363,7 @@ def _run_scheduler_loop(self) -> None:
if not self.processor_agent:
raise ValueError("Processor agent is not started.")
is_unit_test: bool = conf.getboolean('core', 'unit_test_mode')
stale_dag_cleanup_timeout: int = conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
stale_dag_cleanup_timeout: int = conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600)
stale_dag_cleanup_timeout = timedelta(seconds=conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600))

Lets not re-create the timedelta object more often than we need to.

for dag in session.query(
DagModel).filter(DagModel.last_scheduler_run < expiration_date,
DagModel.is_active).all():
query = session.query(DagModel).filter(DagModel.last_scheduler_run < expiration_date,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooh crap, I don't update this column anymore on master, so I don't think this is right. But there also isn't any global setting I do update anymore.

Even SerialzedDag.updated_at might not be right, as if the dag hasn't changed, that value won't be updated anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh is it? Then should it be taken care of while doing serialization itself? Like, mark the dag inactive if it doesn't find it anymore?

msumit added a commit to twitter-forks/airflow that referenced this pull request Oct 15, 2020
@kaxil
Copy link
Member

kaxil commented Nov 4, 2020

Can you please rebase your PR on latest Master since we have applied Black and PyUpgrade on Master.

It will help if your squash your commits into single commit first so that there are less conflicts.

ayushSethi22 pushed a commit to ayushSethi22/airflow that referenced this pull request Nov 23, 2020
Cherry-Pick contains [CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (twitter-forks#61)

CP of 5605d10
& apache#11462
ayushSethi22 pushed a commit to ayushSethi22/airflow that referenced this pull request Dec 21, 2020
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (twitter-forks#61)

CP of 5605d10
& apache#11462
@stale
Copy link

stale bot commented Dec 25, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 25, 2020
@potiuk
Copy link
Member

potiuk commented Dec 25, 2020

Hey @msumit @ashb @kaxil - anything we should merge ?

@stale stale bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 25, 2020
msumit pushed a commit to twitter-forks/airflow that referenced this pull request Jan 8, 2021
* EWT-569 : Initial Commit for migrations

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  76fe7ac from 1.10.4

* CP Contains fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
CP contains [EWT-16]: Airflow fix for manual trigger during version upgrade (#13)

* [EWT-16]: Airflow fix for manual trigger during version upgrade

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)

CP of f757a54

* CP(55bb579) [AIRFLOW-5597] Linkify urls in task instance log (#16)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 94cdcf6
[CP] Contains [AIRFLOW-5597] Linkify urls in task instance log

CP of f757a54

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  4ce8d4c from 1.10.4
CP contains [TWTTR] Fix for rendering code on UI (#34)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  299b4d8 from 1.10.4
CP contains [TWTR] CP from 1.10+twtr (#35)

* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (#21)

* CP 51b1aee: Relax version requiremets (#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (#31)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : f7050fb
CP Contains Experiment API path fix (#37)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  8a689af from 1.10.4
CP Contains Export scheduler env variable into worker pods. (#38)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  5875a15 from 1.10.4
Cp Contains [EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (#39)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  a68e2b3 from 1.10.4
[CX-16591] Fix regex to work with impersonated clusters like airflow_scheduler_ddavydov (#42)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : e9642c2
[CP][EWT-128] Fetch task logs from worker pods (19ac45a) (#43)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d5d0a07
[CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resource for worker pod. (#47)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 9b58c88
[CP][EWT-302]Patch Pool.DEFAULT_POOL_NAME in BaseOperator (apache#8587) (#49)

Open source commit id: b37ce29

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 7b52a71
[CP][AIRFLOW-3121] Define closed property on StreamLogWriter (apache#3955) (#52)

CP of 2d5b8a5

* [EWT-361] Fix broken regex pattern for extracting dataflow job id (#51)

Update the dataflow URL regex as per AIRFLOW-9323

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 4b5b977
EWT-370: Use python3 to launch the dataflow job. (#53)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 596e24f
* [EWT-450] fixing sla miss triggering duplicate alerts every minute (#56)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : b3d7fb4
[CP] Handle IntegrityErrors for trigger dagruns & add Stacktrace when DagFileProcessorManager gets killed (#57)

CP of faaf179 - from master
CP of 2102122 - from 1.10.12

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : bac4acd
[TWTR][EWT-472] Add lifecycle support while launching worker pods (#59)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 6162402
[TWTTR] Don't enqueue tasks again if already queued for K8sExecutor(#60)

Basically reverting commit 87fcc1c  and making changes specifically into the Celery Executor class only.

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 1991419
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (#61)

CP of 5605d10 & apache#11462

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f9
[TWTR][EWT-350] Reverting the last commit partially (#62)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d8c473e
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)

CP of f757a54
@github-actions
Copy link

github-actions bot commented Mar 2, 2021

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 2, 2021
@ashb ashb removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 5, 2021
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 20, 2021
@github-actions github-actions bot closed this May 5, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler Scheduler or dag parsing Issues stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants