ARROW-12522: [C++] Add ReadRangeCache::WaitFor #10145
Conversation
CC @westonpace if you'd like to take a look. There's an example of it being used to adapt the IPC reader here: lidavidm@24f3bb1#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR1380-R1412
I wonder if, given a bunch of small record batches, we might sometimes want to coalesce across record batches. I think the current design preempts that. Although I think there would be more challenges than just this tool to tackle that problem.
For example, AWS likes 8 MB/16 MB reads (source) for S3 and prefers 256 KB reads (source), and Linux doesn't really care (it does its own read coalescing in the kernel, although in theory maybe this would apply to the EBS case as well).
However, if the filesystem was waiting for 8MB to fill up before issuing a read and batch readahead was low we may never fill up enough data to trigger the read. I don't think this is something we should solve right now but maybe something to think about going forwards.
I suppose we could update batch readahead to be bytes instead of # of batches (makes more sense anyways given the unpredictable nature of batch sizes). Then the read cache could be given a max "buffering" size and then it could issue reads whenever the cache gets bigger than either the "buffering limit" or the "ideal read size" of the filesystem.
cpp/src/arrow/io/caching.cc
Outdated
}

RangeCacheEntry Cache(const ReadRange& range) override {
  return {range, Future<std::shared_ptr<Buffer>>()};
What fills this future?
It's a little unclear, my bad. What happens is: the user calls Cache(vector<Range>), which coalesces the ranges and calls Cache(Range) for each coalesced range to make a cache entry. I'll rename the functions and clarify inline.
cpp/src/arrow/io/caching.cc
Outdated
if (next.offset >= entry.range.offset &&
    next.offset + next.length <= entry.range.offset + entry.range.length) {
  include = true;
  ranges.pop_back();
Is there no case where a range will span two entries?
You are expected to give the ranges up front at the granularity you expect to read them, so no. In principle it could be supported, and if we wanted to split large ranges to take advantage of I/O parallelism, we'd have to do that.
Status Cache(std::vector<ReadRange> ranges) override {
  std::unique_lock<std::mutex> guard(entry_mutex);
  return ReadRangeCache::Impl::Cache(std::move(ranges));
The current file->ReadAsync has some leeway in it which allows the method to be synchronous if needed. If that is the case, this could end up holding onto the lock for a while. Actually, it looks like you have guards on the Wait/WaitFor methods as well, so perhaps this isn't intended to be consumed by multiple threads?
Could you maybe add a short comment explaining how you expect this class to be used? (E.g. first a thread does a bunch of cache calls and then a bunch of read calls? Or maybe there are multiple threads calling cache or read?)
Adding to this, could you create a simple test case around whatever type of multithreading you expect to guard against with these mutexes?
Hmm. I feel that if ReadAsync is synchronous, that's because it's also very fast (e.g. in-memory copy), in which case it's not a concern. I'll document the usage pattern and both variants.
Or put another way, when lazy == true this 'passes through' the synchronicity of ReadAsync (an oxymoron if there ever was one), which is the intent.
So overall, the use pattern for this class is: a single thread calls Cache with all the byte ranges up front, then Wait/WaitFor, then Read.
Since all the byte ranges are given up front, you do get coalescing across record batches/column chunks.
cpp/src/arrow/io/memory_test.cc
Outdated
TEST(RangeReadCache, Basics) {
  std::string data = "abcdefghijklmnopqrstuvwxyz";

-  auto file = std::make_shared<BufferReader>(Buffer(data));
+  auto file = std::make_shared<CountingBufferReader>(Buffer(data));
Should you test both lazy and non-lazy versions here?
cpp/src/arrow/io/caching.cc
Outdated
[](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });

std::vector<Future<>> futures;
for (auto& entry : entries) {
This algorithm looks a bit unexpected to me. Basically, you're iterating over all known entries in the hope that they might match a requested range? That will be costly if the number of entries is much larger than the number of requested ranges, since you may iterate over all entries.
Why not do the converse? For each requested range, try to find it in the existing entries. That is doable using bisection (see Read above), and you shouldn't need to sort the requested ranges.
Thanks, I've changed the implementation. This is definitely better (it avoids a sort, and since #ranges is likely << #entries, each lookup is ~O(log #entries) instead of ~O(#entries)).
cpp/src/arrow/io/caching.cc
Outdated
}

// Make a cache entry for a range
virtual RangeCacheEntry MakeCacheEntry(const ReadRange& range) {
You may make this std::vector<RangeCacheEntry> MakeCacheEntries(const std::vector<ReadRange>&) instead, and you will issue one virtual call instead of N.
Ah, I should backport a fix from ba7ba9e as well. (The coalescer didn't handle completely-overlapping ranges, which the Parquet reader can generate when reading a 0-row file.)
This should be ready again; I've incorporated the feedback and added a fix for ranges that are completely identical (which the Parquet reader can generate if there are 0 rows).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM
This was split out of ARROW-11883 since it may also be useful to test with ARROW-11772.

This adds a method to get a Future<> from a ReadRangeCache so it can be easily used in an async context. Also, it adds a config flag to make the cache not perform readahead so that readahead can be handled at a different layer of the stack.

Closes apache#10145 from lidavidm/async-cache

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>