
[C++] Add parallelism to streaming CSV reader #27731

Closed
asfimport opened this issue Mar 6, 2021 · 3 comments
@asfimport
Currently the streaming CSV reader does not allow for much parallelism. It doesn't allow reading more than one segment at once (useful on S3), and it doesn't allow column fan-out for parsing & converting.

It seems both of these options would speed up CSV reading in some scenarios, although it's possible this is mostly mitigated in cases where there are many more files than cores (since per-file parallelism will occupy all the cores anyway).
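To make the first kind of parallelism concrete, here is a minimal sketch (not Arrow code; all names are hypothetical stand-ins) of fetching several byte ranges concurrently, the way an S3-backed reader could issue parallel range requests instead of consuming one sequential stream:

```cpp
#include <cassert>
#include <future>
#include <string>
#include <vector>

// Hypothetical illustration: "fetch" each segment on its own task. With real
// object storage the payoff comes from overlapping network latency across
// concurrent range requests.
std::vector<std::string> ReadSegmentsParallel(
    const std::vector<std::string>& segments) {
  std::vector<std::future<std::string>> futures;
  futures.reserve(segments.size());
  for (const auto& seg : segments) {
    futures.push_back(std::async(std::launch::async, [seg]() { return seg; }));
  }
  // Collect in submission order, since a CSV reader must preserve row order.
  std::vector<std::string> results;
  results.reserve(futures.size());
  for (auto& f : futures) results.push_back(f.get());
  return results;
}
```

The key property is that the futures are collected in submission order, so concurrency in the fetch step does not disturb the byte order the parser sees.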

Reporter: Weston Pace / @westonpace
Assignee: Weston Pace / @westonpace

PRs and other links:

Note: This issue was originally created as ARROW-11889. Please see the migration documentation for further details.

@asfimport

Antoine Pitrou / @pitrou:
I'll add that this probably means making ColumnDecoder async (perhaps turning it into a generator).

It would be nice if the solution could also tackle ARROW-11853 at the same time, since both issues will require significant reworking of the ColumnDecoder internals anyway.
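As a rough sketch of what "making ColumnDecoder async" could look like (the class and method names here are hypothetical, not Arrow's actual internals), the decode step would return a future so each column chunk can be converted off the reader's critical path:

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <string>
#include <vector>

// Hedged sketch of an async decoder interface: Decode returns a future that
// completes when the column chunk has been converted, so the caller can fan
// out one decode task per column instead of converting serially.
class AsyncColumnDecoder {
 public:
  std::future<std::vector<int64_t>> Decode(std::vector<std::string> cells) {
    return std::async(std::launch::async, [cells = std::move(cells)]() {
      std::vector<int64_t> out;
      out.reserve(cells.size());
      for (const auto& c : cells) out.push_back(std::stoll(c));
      return out;
    });
  }
};
```

Turning this into a generator, as suggested above, would mean the caller pulls one such future per incoming chunk rather than calling Decode directly.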

@asfimport

Weston Pace / @westonpace:
It'll probably be at least another day (probably more; I'll target the end of the week) before this is PR-ready, but here are some notes:

  • The approach I'm taking is to create a functor for parsing (CSVBlock -> ParsedBlock) and another for decoding (ParsedBlock -> Array) and then hook the whole thing up as an iterator/generator.
  • Since it was already in place I'll keep the per-column parallelism, but I'm also adding parallel readahead (for conversion/decoding) on the batches themselves, so I don't expect per-column parallelism is strictly necessary for performance.
  • There doesn't seem to be much use case for eagerly blocking (i.e. ThreadedBlockReader), and it seems pretty unlikely we will need a multi-threaded parser, so for the moment I expect I can reuse SerialBlockReader and just ensure it is not pulled async-reentrantly.
  • The column builders & decoders are getting even more similar; I suspect I could combine the two into a single set of types with a boolean "try_reconvert" flag or something. For example, the decoders already had an array of chunks, although I can't see any reason they needed more than a single chunk.
  • In order to address this and ARROW-11853, each ParsedBlock will create its own ThreadedTaskGroup. The future for that parsed block will be completed when all columns have been decoded and any "recode" tasks launched by that parsed block have finished. Finish will be called on each future, so a failure (or a cancellation) should get caught pretty quickly. The stored futures might still hang around for coordination, but they won't be waited on, so we shouldn't deadlock there.
  • The table reader and the streaming readers are becoming more and more similar as well. It may end up that they can be combined too, with the table reader setting "try_reconvert" to true and adding some kind of emplace_into_table step at the end (although this might be a bit tricky with ordering & reconversion).
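The two-functor shape described in the first bullet can be sketched in miniature like this (a toy illustration under assumed names; CSVBlock, ParsedBlock, and the functor names are stand-ins, not the actual Arrow types):

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Toy stand-ins for the pipeline's intermediate types.
using CSVBlock = std::string;
using ParsedBlock = std::vector<std::vector<std::string>>;  // column-major cells
using Int64Array = std::vector<int64_t>;

// First functor: raw CSV block -> parsed (still textual) columns.
struct ParseBlock {
  ParsedBlock operator()(const CSVBlock& block) const {
    ParsedBlock columns;
    std::istringstream lines(block);
    std::string line;
    while (std::getline(lines, line)) {
      std::istringstream fields(line);
      std::string cell;
      size_t col = 0;
      while (std::getline(fields, cell, ',')) {
        if (columns.size() <= col) columns.emplace_back();
        columns[col].push_back(cell);
        ++col;
      }
    }
    return columns;
  }
};

// Second functor: parsed columns -> typed arrays (here, everything int64).
struct DecodeColumns {
  std::vector<Int64Array> operator()(const ParsedBlock& parsed) const {
    std::vector<Int64Array> arrays;
    arrays.reserve(parsed.size());
    for (const auto& column : parsed) {
      Int64Array a;
      a.reserve(column.size());
      for (const auto& cell : column) a.push_back(std::stoll(cell));
      arrays.push_back(std::move(a));
    }
    return arrays;
  }
};
```

Because each stage is a plain callable taking one block and returning one block, the stages compose naturally into a per-block pipeline that an iterator/generator can pull through, and the decode stage is the natural point to fan out per-column tasks.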

@asfimport

Weston Pace / @westonpace:
Issue resolved by pull request #10568

@asfimport asfimport added this to the 5.0.0 milestone Jan 11, 2023