[AIRFLOW-4401] Use managers for Queue synchronization #5200
Conversation
Force-pushed from c293256 to 6b09440 (Compare)

Force-pushed from 6b09440 to a53341d (Compare)
Codecov Report

    @@            Coverage Diff             @@
    ##           master    #5200      +/-   ##
    ==========================================
    + Coverage   78.28%   78.54%    +0.25%
    ==========================================
      Files         469      469
      Lines       29912    29952       +40
    ==========================================
    + Hits        23418    23527      +109
    + Misses       6494     6425       -69

Continue to review full report at Codecov.
tests/test_jobs.py (Outdated)

    @@ -206,7 +206,7 @@ def test_trigger_controller_dag(self):
             scheduler = SchedulerJob()
             queue = Mock()
    -        scheduler._process_task_instances(target_dag, queue=queue)
    +        scheduler._process_task_instances(target_dag, task_instances_list=queue)
I will rename all the queues later when we discuss which solution to choose.
Done.
Looks good though
Force-pushed from a53341d to ae5ad4d (Compare)
commented on 2 tiny nits
From a logic perspective it looks okay to me, although I'm not very experienced with the multiprocessing library. If the tests succeed and don't hang, I'm okay with merging this to fix the current issues.
Force-pushed from b1d8793 to 6ae1ac6 (Compare)
    @@ -769,3 +793,7 @@ def _change_state(self, key, state, pod_id):
         def end(self):
             self.log.info('Shutting down Kubernetes executor')
             self.task_queue.join()
I think we had a problem here before: task_queue.join() was used but task_done() was never called, so I think we could have hangs when trying to stop the KubernetesScheduler gracefully.
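To make the hang concrete, here is a minimal standalone snippet (not the Airflow code) showing how a missing task_done() makes join() block forever:

```python
from multiprocessing import Manager

if __name__ == "__main__":
    q = Manager().Queue()   # joinable: tracks an "unfinished tasks" counter
    q.put("task")
    item = q.get()          # item retrieved, but task_done() is never called
    q.join()                # blocks forever: the unfinished-task counter never reaches zero
```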
@BasPH @Fokko @ashb -> I made some amendments now, as I realised that Manager().Queue() is always a joinable queue (unlike multiprocessing.Queue). I updated the code everywhere to follow the general pattern below (also described in the commit message). Since the managed queue is proxied to the shared one anyway, get_nowait() will always return the message if it has already been put to the queue by another process. BTW, get_nowait() on a managed queue is not really "instant" as it is with a standard multiprocessing.Queue - it polls the manager process - but that's exactly why it is good for us :). It also seems there was a mistake in KubernetesExecutor. See the comment above (#5200 (comment)) - if we've heard about an inability to gracefully shut down the KubernetesExecutor, that could be it.
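A minimal sketch of that general pattern (the `drain` and `process_task` names are illustrative, not the exact Airflow code):

```python
from multiprocessing import Manager
from queue import Empty

def drain(managed_queue, process_task):
    """Drain everything already put into the managed (joinable) queue."""
    while True:
        try:
            task = managed_queue.get_nowait()   # polls the manager process; raises Empty when drained
            try:
                process_task(task)              # ... do some processing
            finally:
                managed_queue.task_done()       # lets a later join() return once everything is processed
        except Empty:
            break

if __name__ == "__main__":
    queue = Manager().Queue()
    queue.put("work-item")
    drain(queue, print)
    queue.join()   # returns immediately: every retrieved item was marked done
```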
Force-pushed from 471df47 to 384af0f (Compare)
                self.process_watcher_task(task)
            finally:
                self.watcher_queue.task_done()
        except queue.Empty:
Suggested change:

    -        except queue.Empty:
    +        except Empty:
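For context, `Empty` and `queue.Empty` name the same exception class; the suggestion just uses the directly imported name (assuming the module already does `from queue import Empty`):

```python
import queue
from queue import Empty

assert Empty is queue.Empty   # same exception class, just imported directly
```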
Thanks for the fix. LGTM.
It is a known problem (https://bugs.python.org/issue23582) that the multiprocessing.Queue empty() method is not reliable - sometimes it might return True even if another process has already put something in the queue. This resulted in some tasks not being picked up when sync() methods were called (in AirflowKubernetesScheduler, LocalExecutor, DagFileProcessor). This was less of a problem when the method was called in sync() - the remaining jobs/files could be processed in the next pass - but it was a problem in tests and when a graceful shutdown was executed (some tasks could still be unprocessed while the shutdown occurred).

We switched to Manager()-managed queues to handle that - the queue in this case runs in a separate subprocess and each process using it accesses the shared queue through a proxy. Additionally, all queues returned by managers are joinable, so we should run task_done() after all processing, and we can now perform join() in termination/end code to wait until all tasks are actually processed, not only retrieved from the queue. That makes shutdown more graceful.

All the impacted cases follow the same general pattern now:

    while True:
        try:
            res = queue.get_nowait()
            try:
                ....  # do some processing
            finally:
                queue.task_done()
        except Empty:
            break

In all these cases the overhead of inter-process locking is negligible compared to the action executed (parsing a DAG, executing a job), so it appears it should be safe for concurrency as well.
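A self-contained sketch of the overall setup described above, including the graceful-shutdown join; `worker`, `handle`, and the `None` sentinel are illustrative, not the Airflow executor code:

```python
from multiprocessing import Manager, Process

def handle(task):
    print("processing", task)

def worker(task_queue):
    # The worker receives a proxy; gets and puts go through the manager's server process,
    # so what it sees is consistent with what other processes have already put.
    while True:
        task = task_queue.get()
        try:
            if task is None:          # sentinel: stop processing
                break
            handle(task)
        finally:
            task_queue.task_done()

if __name__ == "__main__":
    manager = Manager()
    task_queue = manager.Queue()      # joinable: supports task_done() and join()
    p = Process(target=worker, args=(task_queue,))
    p.start()
    task_queue.put("work-item")
    task_queue.put(None)
    task_queue.join()                 # graceful shutdown: wait until every item is processed
    p.join()
```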
Force-pushed from 384af0f to aa16653 (Compare)
One last build and I think we are good to go :)
Make sure you have checked all steps below.
Jira
Description
It is a known problem (https://bugs.python.org/issue23582) that the multiprocessing.Queue empty() method is not reliable - sometimes it might return True even if another process has already put something in the queue. This resulted in some tasks not being picked up when sync() methods were called (in AirflowKubernetesScheduler, LocalExecutor, DagFileProcessor). This was less of a problem when the method was called in sync() - the remaining jobs/files could be processed in the next pass - but it was a problem in tests and when a graceful shutdown was executed (some tasks could still be unprocessed while the shutdown occurred).

We switched to Manager()-managed queues to handle that - the queue in this case runs in a separate subprocess and each process using it accesses the shared queue through a proxy.

All the impacted cases follow the same pattern now:

    while not queue.empty():
        res = queue.get()
        ....

This loop always runs in a single (main) process, so it is safe to run it this way - there is no risk that some other process will retrieve the data from the queue between empty() and get().

In all these cases the overhead of inter-process locking is negligible compared to the action executed (parsing a DAG, executing a job), so it appears safe to merge the change.
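A minimal sketch of that single-consumer drain loop, assuming a Manager-backed queue (`result_queue` and `handle_result` are illustrative names):

```python
from multiprocessing import Manager

def drain(result_queue, handle_result):
    # Safe only because this loop is the sole consumer (it runs in the main process):
    # nothing can steal an item between the empty() check and the get().
    while not result_queue.empty():
        handle_result(result_queue.get())

if __name__ == "__main__":
    result_queue = Manager().Queue()
    result_queue.put("finished-task")
    drain(result_queue, print)
```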
Tests
Tests were there (but flaky)
Commits
Documentation
Code Quality
flake8