Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4401] SynchronizedQueue used where empty() is used. #5199

Closed
wants to merge 1 commit into from

Conversation

potiuk
Copy link
Member

@potiuk potiuk commented Apr 28, 2019

Make sure you have checked all steps below.

Jira

  • My PR addresses the following Airflow Jira issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR"
    • https://issues.apache.org/jira/browse/AIRFLOW-4401
    • In case you are fixing a typo in the documentation you can prepend your commit with [AIRFLOW-XXX], code changes always need a Jira issue.
    • In case you are proposing a fundamental code change, you need to create an Airflow Improvement Proposal (AIP).
    • In case you are adding a dependency, check if the license complies with the ASF 3rd Party License Policy.

Description

  • Here are some details about my PR, including screenshots of any UI changes:

It is a known problem https://bugs.python.org/issue23582 that
multiprocessing.Queue empty() method is not reliable - sometimes it might
return True even if another process already put something in the queue.

This resulted in some of the tasks not picked up when sync() methods
were called (in AirflowKubernetesScheduler, LocalExecutor,
DagFileProcessor). This was less of a problem if the method was called in sync()

  • as the remaining jobs/files could be processed in next pass but it was a problem
    in tests and when graceful shutdown was executed (some tasks could be still
    unprocessed while the shutdown occured).

All the cases impacted follow the same pattern now:

while not queue.empty():
res = queue.get()
....

This loop runs always in single (main) process so it is safe to run it this way -
there is no risk that some other process will retrieve the data from the queue in
between empty() and get().

Note that unlike in the standard multiprocessing.Queue, you cannot rely
on data being immediately available after empty() is False. You should be
prepared that subsequent get_nowait() raises Empty, or (better) use get()
to retrieve the data.

In all these cases overhead for inter-processing locking is negligible
comparing to the action executed (Parsing DAG, executing job)
so it appears it should be safe to merge the change.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

No need. Lots of tests for that already (flaky ones).

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does
    • If you implement backwards incompatible changes, please leave a note in the Updating.md so we can assign it to a appropriate release

Code Quality

  • Passes flake8

It is a known problem https://bugs.python.org/issue23582 that
multiprocessing.Queue empty() method is not reliable - sometimes it might
return True even if another process already put something in the queue.

This resulted in some of the tasks not picked up when sync() methods
were called (in AirflowKubernetesScheduler, LocalExecutor,
DagFileProcessor). This was less of a problem if the method was called in sync()
- as the remaining jobs/files could be processed in next pass but it was a problem
in tests and when graceful shutdown was executed (some tasks could be still
unprocessed while the shutdown occured).

All the cases impacted follow the same pattern now:

while not queue.empty():
   res = queue.get()
   ....

This loop runs always in single (main) process so it is safe to run it this way -
there is no risk that some other process will retrieve the data from the queue in
between empty() and get().

Note that unlike in the standard multiprocessing.Queue, you cannot rely
on data being immediately available after empty() is False. You should be
prepared that subsequent get_nowait() raises Empty, or (better) use get()
to retrieve the data.

In all these cases overhead for inter-processing locking is negligible
comparing to the action executed (Parsing DAG, executing job)
so it appears it should be safe to merge the change.
@codecov-io
Copy link

codecov-io commented Apr 28, 2019

Codecov Report

Merging #5199 into master will increase coverage by <.01%.
The diff coverage is 81.48%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #5199      +/-   ##
==========================================
+ Coverage   78.54%   78.55%   +<.01%     
==========================================
  Files         469      470       +1     
  Lines       29896    29932      +36     
==========================================
+ Hits        23483    23514      +31     
- Misses       6413     6418       +5
Impacted Files Coverage Δ
airflow/executors/base_executor.py 95.58% <ø> (-0.07%) ⬇️
airflow/utils/dag_processing.py 59.22% <100%> (+0.1%) ⬆️
airflow/contrib/executors/kubernetes_executor.py 63.27% <100%> (ø) ⬆️
airflow/executors/local_executor.py 80.41% <40%> (-0.65%) ⬇️
airflow/jobs.py 78.53% <50%> (+0.01%) ⬆️
airflow/utils/synchronized_queue.py 88.57% <88.57%> (ø)
airflow/models/taskinstance.py 92.42% <0%> (-0.18%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9daad7e...462d7cd. Read the comment docs.

@potiuk
Copy link
Member Author

potiuk commented Apr 28, 2019

Seems that #5200 solves it in a simpler way. Let's wait for the build to finish, but it looks good from local testing.

@potiuk
Copy link
Member Author

potiuk commented Apr 30, 2019

Closing in favour of #5200 . Managed Queue uses more standard approach with less synchronisation code to maintain.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants