Python: Parallelize IO #6645
Conversation
Alternative for apache#6590. This uses the ThreadPool approach instead of ThreadPoolExecutor.

The ThreadPoolExecutor is more flexible and works well with heterogeneous tasks: it allows the user to handle exceptions per task and to cancel individual tasks. But the ThreadPoolExecutor also has some limitations, such as not being able to forcefully terminate all the tasks. For reading tasks I think the ThreadPool might be more appropriate, but for writing the ThreadPoolExecutor might be more applicable. A very nice writeup of the differences is available in this blog: https://superfastpython.com/threadpool-vs-threadpoolexecutor/

Before:

```
➜  python git:(fd-threadpool) time python3 /tmp/test.py
python3 /tmp/test.py  3.45s user 2.84s system 2% cpu 3:34.19 total
```

After:

```
➜  python git:(fd-threadpool) ✗ time python3 /tmp/test.py
python3 /tmp/test.py  3.13s user 2.83s system 19% cpu 31.369 total
➜  python git:(fd-threadpool) ✗ time python3 /tmp/test.py
python3 /tmp/test.py  2.94s user 3.08s system 18% cpu 32.538 total
➜  python git:(fd-threadpool) ✗ time python3 /tmp/test.py
python3 /tmp/test.py  2.84s user 3.14s system 20% cpu 29.033 total
```

The requests are long-lining from the EU to the USA, which might skew the absolute numbers a bit, but it makes IO more dominant.
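To make the trade-off concrete, here is a minimal, self-contained sketch contrasting the two stdlib APIs discussed above. The `fetch` function and the task list are illustrative stand-ins for IO-bound file reads, not code from this PR.

```python
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor

def fetch(task):
    # Stand-in for an IO-bound read of a single file task
    return task * 2

tasks = list(range(8))

# ThreadPool: one map call over homogeneous tasks; results come back
# in task order, but there is no per-task error handling or cancellation
with ThreadPool(processes=4) as pool:
    pool_results = pool.map(fetch, tasks)

# ThreadPoolExecutor: one Future per task, so each task's exception can
# be caught individually and pending futures can be cancelled
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fetch, t) for t in tasks]
    executor_results = [f.result() for f in futures]

print(pool_results == executor_results)  # True
```

For a homogeneous batch of read tasks where only the combined result matters, the single `pool.map` call is the simpler fit; the per-future control of the executor pays off when tasks can fail or be cancelled independently.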
```python
# Prune the stuff that we don't need anyway
file_project_schema_arrow = schema_to_pyarrow(file_project_schema)

arrow_table = ds.dataset(
```
When I was running tests, I noticed that reading into PyArrow became the bottleneck. I think we will probably want to configure the format at least with pre_buffer. That doesn't need to be done here though.
Nice, let me add that! Looking at the docs that makes a lot of sense.
What's also on my list is to test reading tables instead of datasets: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html#pyarrow-parquet-read-table
A dataset is a high-level construct for reading data in a lazy fashion. Since we create a dataset per file, this might not be optimal, and a table read might be more efficient. I haven't done this yet because the dataset allows you to pass in an expression, while for the table you need to pass the filter in DNF format, but we have that conversion available now.
```python
    return boolean_expression_visit(expr, _ConvertToArrowExpression())


def _file_to_table(
```
The reason why I passed row_filter and table to this method in my branch was so that we could make the method public. I think we are going to need that so that tasks can be distributed to other nodes or processes and read independently. Having a method that reads a task and another that reads an iterable of tasks in a threadpool seems like a good API to provide.
Shall we do that in a separate PR? I also gave multiprocessing a shot, but I ran into a lot of serialization issues. Keeping this private until we fix all of that means we won't have to change public APIs later on.
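The API shape being discussed (one public function that reads a single task, another that fans an iterable of tasks out over a thread pool) could be sketched as below. The names `read_task`/`read_tasks` and all the logic are hypothetical placeholders, not the PR's actual code.

```python
from multiprocessing.pool import ThreadPool
from typing import Any, Iterable, List

def read_task(task: Any) -> List[Any]:
    # Hypothetical: read one file scan task into an in-memory chunk.
    # A real implementation would open the file and return a table.
    return [task]

def read_tasks(tasks: Iterable[Any], max_workers: int = 4) -> List[Any]:
    # Fan the tasks out over a thread pool; map preserves task order
    with ThreadPool(processes=max_workers) as pool:
        chunks = pool.map(read_task, list(tasks))
    # Concatenate the per-task chunks into one result
    return [row for chunk in chunks for row in chunk]

print(read_tasks(range(3)))  # [0, 1, 2]
```

Splitting the API this way would let a distributed engine call `read_task` per node while local readers keep the convenience of `read_tasks`.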
Thanks for the review @rdblue 🙌🏻