New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use concurrent.futures to fix multiprocessing issues #53
Conversation
broker/providers/test_provider.py
Outdated
@@ -37,7 +37,10 @@ def _set_attributes(self, host_inst, broker_args=None): | |||
def construct_host(self, provider_params, host_classes, **kwargs): | |||
host_params = provider_params.copy() | |||
host_params.update(kwargs) | |||
host_inst = host_classes[host_params["host_type"]](**host_params) | |||
host_type = host_classes[host_params["host_type"]] | |||
host_type.connect = MagicMock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have you considered using the MockStub class from helpers here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is actually not needed to mock something here. The connect got called due to a bug in my PR which I have fixed now.
I like to use MagicMock
though. It is in standard library, quite well documented and thus it is likely the other developers will know how to use it. I did use it to make the code raise an exception in broker._act
in other test. This was done trough the pytest-mock that patches the class only for the time the test needs it.
The CF is easier to use and less error prone than using the multiprocessing directly. It also allows easier switch between processes and threads. Another advantage is error and treacebacks reporting to the parent process is better.
This is to work arround issue with TestProvider class attempts to be collected as test class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some small changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ACK
@mshriver do you want to weigh in as well? |
with self.EXECUTOR(max_workers=max_workers_count) as workers: | ||
completed_futures = as_completed(workers.submit(self.func, instance, *args, **kwargs) | ||
for _ in range(count)) | ||
for f in completed_futures: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be code golfing here, but couldn't you directly return here instead of initializing and extending the list?
return [f.result() for f in completed_futures]
I take that back, can't unpack iterables within comprehension to get the extend
behavior.
There are some problems with the inventory.yaml unlink when doing multiple checkouts. I recommend not merging before it is known why this happens. |
Some of the mp tests were failing like this: tests/test_broker.py::test_mp_checkout FAILED [ 50%] tests/test_broker.py::test_mp_checkout_twice PASSED [ 60%] tests/test_broker.py::test_mp_decorator PASSED [ 70%] tests/providers/test_ansible_tower.py::test_exec_workflow PASSED [ 80%] tests/providers/test_ansible_tower.py::test_host_creation PASSED [ 90%] tests/providers/test_ansible_tower.py::test_workflow_lookup_failure PASSED [100%] ========================================================================================== FAILURES ========================================================================================== ______________________________________________________________________________________ test_mp_checkout ______________________________________________________________________________________ concurrent.futures.process._RemoteTraceback: """ Traceback (most recent call last): File "/usr/lib64/python3.9/concurrent/futures/process.py", line 243, in _process_worker r = call_item.fn(*call_item.args, **call_item.kwargs) File "/home/jhenner/work/sat/broker/broker/broker.py", line 116, in _checkout helpers.update_inventory(add=host.to_dict()) File "/home/jhenner/work/sat/broker/broker/helpers.py", line 154, in update_inventory inventory_file.unlink() # This making mp tests to fail. Perhaps a race condition. File "/usr/lib64/python3.9/pathlib.py", line 1340, in unlink self._accessor.unlink(self) FileNotFoundError: [Errno 2] No such file or directory: 'inventory.yaml' """ The above exception was the direct cause of the following exception: def test_mp_checkout(): """Test that broker can checkout multiple hosts using multiprocessing""" broker_inst = broker.VMBroker(nick="test_nick", _count=2) > broker_inst.checkout() tests/test_broker.py:50: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ broker/broker.py:126: in checkout hosts = self._checkout(connect=connect) broker/broker.py:62: in mp_split results.extend(f.result()) /usr/lib64/python3.9/concurrent/futures/_base.py:433: in result return self.__get_result() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <Future at 0x7f4c662e9790 state=finished raised FileNotFoundError> def __get_result(self): if self._exception: > raise self._exception E FileNotFoundError: [Errno 2] No such file or directory: 'inventory.yaml' It seem sto be a run condition when working with the inventory file. Fixing by moving the work with the inventory out of the concurrently executed code.
This makes the tests/test_broker.py::test_mp_checkout fail more often when there is a run condition disovered in #53
I think the problem with race condition between the children of single process is fixed by this PR. We are ready to merge @mshriver @JacobCallahan |
This makes the behavior of broker in case of exception rised in the child much better (without this PR, there is a deadlock and possible no output on the terminal about the exception. With this PR the exception is propagated to the parent process.
There were issues that I think were arising from using a decorator that replaced the decorated method with some nested method (the
wraps
function is commonly used in that case). It seems to me such method cannot be pickled but that seemed necessary for sending it to theconcurrent.futures