Enable python operators in async pipelines #4965
Conversation
Signed-off-by: Rafal Banas <rbanas@nvidia.com>
!build
CI MESSAGE: [9205091]: BUILD STARTED
dali/python/backend_impl.cc (outdated)

```cpp
      })
      // On the pipeline destruction we need to release the GIL to shutdown the executor
      // This way, Python operators that might be still running do not deadlock
      .def("Shutdown", [](Pipeline *p) {
```
I wonder if `Shutdown` shouldn't be an internal method?
I think we can do it a bit more automatically using this approach. The init of the `Pipeline` Python class returns a smart pointer. We can add a custom deleter to it that would release the lock for the time of `p->Shutdown();` and then delete the pipeline. I think it is way less error-prone than exposing part of the teardown sequence to Python.
It should be, although I want to call it from the `__del__` method of the Python pipeline object. I could rename it to `_Shutdown`, though. And it is not exposed through the Python pipeline wrapper.
OK, that sounds like a good approach.
Great change. I'm very happy to see it!
CI MESSAGE: [9205091]: BUILD FAILED
Signed-off-by: Rafal Banas <rbanas@nvidia.com>
```python
@@ -315,13 +305,6 @@ def test_python_operator_invalid_function():
    invalid_pipe.run()


@raises(TypeError, "do not support multiple input sets")
```
So it is supported now, right?
Nah, I just removed too many tests. Reverted it.
dali/python/backend_impl.cc (outdated)

```cpp
  PyPipeline(int batch_size, int num_threads, int device_id, int64_t seed,
             bool pipelined_execution, int prefetch_queue_depth,
             bool async_execution, size_t bytes_per_sample_hint,
             bool set_affinity, int max_num_stream,
             int default_cuda_stream_priority):
      Pipeline(batch_size, num_threads, device_id, seed, pipelined_execution,
               prefetch_queue_depth, async_execution, bytes_per_sample_hint, set_affinity,
               max_num_stream, default_cuda_stream_priority) {}

  PyPipeline(string serialized_pipe, int batch_size, int num_threads, int device_id,
             bool pipelined_execution, int prefetch_queue_depth, bool async_execution,
             size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
             int default_cuda_stream_priority):
      Pipeline(serialized_pipe, batch_size, num_threads, device_id, pipelined_execution,
               prefetch_queue_depth, async_execution, bytes_per_sample_hint, set_affinity,
               max_num_stream, default_cuda_stream_priority) {}
```
Suggested change:

```diff
-  PyPipeline(int batch_size, int num_threads, int device_id, int64_t seed,
-             bool pipelined_execution, int prefetch_queue_depth,
-             bool async_execution, size_t bytes_per_sample_hint,
-             bool set_affinity, int max_num_stream,
-             int default_cuda_stream_priority):
-      Pipeline(batch_size, num_threads, device_id, seed, pipelined_execution,
-               prefetch_queue_depth, async_execution, bytes_per_sample_hint, set_affinity,
-               max_num_stream, default_cuda_stream_priority) {}
-
-  PyPipeline(string serialized_pipe, int batch_size, int num_threads, int device_id,
-             bool pipelined_execution, int prefetch_queue_depth, bool async_execution,
-             size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
-             int default_cuda_stream_priority):
-      Pipeline(serialized_pipe, batch_size, num_threads, device_id, pipelined_execution,
-               prefetch_queue_depth, async_execution, bytes_per_sample_hint, set_affinity,
-               max_num_stream, default_cuda_stream_priority) {}
+  using Pipeline::Pipeline;
```
```cpp
@@ -1588,6 +1588,30 @@ void FeedPipeline(Pipeline *p, const string &name, py::list list, AccessOrder or
  p->SetExternalInput(name, tv, order, sync, use_copy_kernel);
}


struct PyPipeline : public Pipeline {
```
What happened to @JanuszL's idea of using a custom deleter for a smart pointer? Is there really no other way of doing it than inheritance? It seems rather intrusive.
pybind11 doesn't accept it ;) When we define the init method for `py::class_`, it accepted `std::unique_ptr` but didn't accept a `unique_ptr` with a deleter.
I could define `py::class_<Pipeline, std::unique_ptr<Pipeline, Deleter>>`. It would accept such a pointer in init, but I would have to change the way all the methods are exposed, from
`.def("device_id", &Pipeline::device_id)`
to
`.def("device_id", [](Pipeline *p) { return p->device_id(); })`,
and that would be a lot of work.
Signed-off-by: Rafal Banas <rbanas@nvidia.com>
!build
CI MESSAGE: [9267140]: BUILD STARTED
CI MESSAGE: [9267140]: BUILD FAILED
Signed-off-by: Rafal Banas <rbanas@nvidia.com>
!build
CI MESSAGE: [9316674]: BUILD STARTED
CI MESSAGE: [9316674]: BUILD PASSED
Signed-off-by: Rafal Banas <rbanas@nvidia.com>
Category:
New feature
Description:
Enables support for Python operators in pipelines with `exec_async=True` and `exec_pipelined=True`.

Additional information:
Affected modules and functionalities:
The most important fix, which prevents the deadlock that was happening there, is releasing the GIL when waiting for the pipeline run results:

```cpp
{
  py::gil_scoped_release interpreter_unlock{};
  p->Outputs(&ws);
}
```

This eliminated the main problem: when a thread was blocked on the `.Outputs()` call, the Python interpreter had no way of making it release the GIL. At the same time, the executor threads tried to acquire the GIL to run the Python op, and that caused the deadlock.
There were also a bunch of other fixes required:
Key points relevant for the review:
Tests:
`test_dltensor_function`, `test_python_function_operator`, `test_gpu_python_function_operator`
I modified them to run with async pipelines.
Checklist
Documentation
I leave updating the notebooks for another PR.
DALI team only
Requirements
REQ IDs: N/A
JIRA TASK: N/A