
27010: Add new files to parsing queue on every loop of dag processing #27060

Merged (15 commits) Nov 14, 2022

Conversation

@pavansharma36 (Contributor) commented Oct 14, 2022

closes: #27010

File paths are refreshed on the dag_dir_list interval. With this change, if any new files were added, they are put at the front of the parsing queue so they can be parsed immediately, instead of waiting for the existing parsing queue to finish.
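The gist of the change can be sketched as follows; the function and variable names here are illustrative, not Airflow's actual identifiers:

```python
from collections import deque

def refresh_file_queue(known_files, file_queue, discovered):
    """Prepend newly discovered DAG files so they are parsed before
    the existing backlog (illustrative sketch, not Airflow's code)."""
    new_files = [f for f in discovered if f not in known_files]
    known_files.update(new_files)
    # extendleft reverses its input, so reverse first to keep discovery order
    file_queue.extendleft(reversed(new_files))

queue = deque(["a.py", "b.py"])   # existing parsing backlog
known = {"a.py", "b.py"}
refresh_file_queue(known, queue, ["a.py", "c.py"])
print(list(queue))  # the new file "c.py" is now at the front
```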



@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Oct 14, 2022
@boring-cyborg bot commented Oct 14, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide, and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@ephraimbuddy (Contributor) left a comment:

We need tests to prove that this works. Also, benchmarking may be required.

cc: @ashb

(Review thread on airflow/dag_processing/manager.py: outdated, resolved)
@potiuk (Member) commented Oct 23, 2022

I saw quite a few users complaining recently about similar behaviour, so it would be great to review and merge it before 2.4.3.

Deque should be fine when it comes to performance (at least all the comparisons and micro-benchmarking I saw confirm that). But I think we should review this one carefully, also taking into account the processing order the user can choose.
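For context, the performance point can be illustrated with a quick micro-benchmark (this is a sketch, not a benchmark from the PR): prepending to a deque is O(1), while inserting at the front of a plain list shifts every element and is O(n).

```python
import timeit
from collections import deque

# Illustrative micro-benchmark (not from the PR):
# deque.appendleft is O(1); list.insert(0, x) is O(n).
d = deque(range(10_000))
lst = list(range(10_000))

d_time = timeit.timeit(lambda: d.appendleft(0), number=1_000)
l_time = timeit.timeit(lambda: lst.insert(0, 0), number=1_000)

print(f"deque.appendleft: {d_time:.6f}s, list.insert(0): {l_time:.6f}s")
```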

@potiuk potiuk added this to the Airflow 2.4.3 milestone Oct 23, 2022
@potiuk (Member) commented Oct 31, 2022

Conflicts need resolving and some benchmarks would be cool. Also cc: @ashb for comments.

@potiuk (Member) left a comment:

LGTM, but it needs another committer as this is a very core change.

@ashb (Member) commented Nov 9, 2022

I'll look at the code in a moment, but how is this different from setting the existing dag dir refresh interval config to 0?

@pavansharma36 (Contributor, Author) commented:

@ashb

The existing implementation adds new files to the parsing queue only if the queue is empty.
Consider setting the refresh interval to 5 seconds with 10k existing DAG files that take around 5 minutes to parse: even though file paths are refreshed every 5 seconds, new files are not added to the parsing queue until all 10k files are parsed and the queue becomes empty.

This change makes sure that when file paths are refreshed on the list_interval config, new files are added to the parsing queue without waiting for existing file parsing to finish.
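The before/after difference described above can be illustrated with a small sketch (a hypothetical simplification; names and structure are illustrative only, not the actual manager.py code):

```python
from collections import deque

def refresh_before(queue, known, discovered):
    # Before: new files enter the queue only once it has fully drained.
    if not queue:
        queue.extend(f for f in discovered if f not in known)
        known.update(discovered)

def refresh_after(queue, known, discovered):
    # After: new files are queued (at the front) on every refresh.
    new = [f for f in discovered if f not in known]
    known.update(new)
    queue.extendleft(reversed(new))

q_before, q_after = deque(["dag1.py"]), deque(["dag1.py"])
refresh_before(q_before, {"dag1.py"}, ["dag1.py", "dag2.py"])
refresh_after(q_after, {"dag1.py"}, ["dag1.py", "dag2.py"])
print(list(q_before))  # dag2.py still waits for the backlog to drain
print(list(q_after))   # dag2.py will be parsed next
```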

@ashb (Member) commented Nov 9, 2022

Got it thanks! (I was on my phone and it was difficult to read the code fully there)

Feature LGTM!

@potiuk (Member) left a comment:

LGTM too. Nice and simple solution.

@potiuk potiuk merged commit 65b78b7 into apache:main Nov 14, 2022
potiuk added a commit to potiuk/airflow referencing this pull request on Nov 14, 2022: apache#27060 was built before normalisation was applied, and merging it caused a static-check failure.
@ephraimbuddy ephraimbuddy added the type:improvement Changelog: Improvements label Nov 16, 2022
Labels
area:Scheduler including HA (high availability) scheduler type:improvement Changelog: Improvements

Successfully merging this pull request may close these issues.

DagProcessor doesnt pick new files until queued file parsing completes
5 participants