Separate ArrayReader::next_batch with read_records and consume_batch #2237
Conversation
```diff
@@ -80,8 +80,15 @@ where
     /// Reads at most `batch_size` records into array.
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+        let size = self.read_records(batch_size)?;
```
IMO, I think we can keep `next_batch`; it can just call `read_records` and `consume_batch` in order (see the sketch below).
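A minimal sketch of that suggestion, assuming the signatures this PR introduces (`read_records(&mut self, batch_size: usize) -> Result<usize>` and `consume_batch(&mut self) -> Result<ArrayRef>`), not the PR's exact code:

```rust
// Inside an ArrayReader implementation: next_batch kept as a thin
// wrapper that calls the two new methods in order.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
    // Buffer up to `batch_size` records...
    self.read_records(batch_size)?;
    // ...then materialize everything buffered so far into one array.
    self.consume_batch()
}
```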
```diff
@@ -349,30 +398,32 @@ mod tests {
     let mut accu_len: usize = 0;

-    let array = array_reader.next_batch(values_per_page / 2).unwrap();
-    assert_eq!(array.len(), values_per_page / 2);
+    let len = array_reader.read_records(values_per_page / 2).unwrap();
```
Now, after `consume_batch`, we clean up the buffers that have been used. So we should now call `get_def_levels` after `read_records` and before `consume_batch` (see the ordering sketch below).
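To make that ordering concrete, a hypothetical test fragment (not the PR's exact test code; `array_reader` and `values_per_page` as in the surrounding test):

```rust
// Read half a page worth of records into the reader's buffer.
let read = array_reader.read_records(values_per_page / 2).unwrap();

// Inspect the definition levels while they are still buffered...
let def_levels = array_reader.get_def_levels().map(|l| l.to_vec());

// ...because consume_batch cleans up the used buffers afterwards.
let array = array_reader.consume_batch().unwrap();
assert_eq!(array.len(), read);
```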
Again, I think removing `batch_size` from `consume_batch` allows preserving the existing behaviour.
Codecov Report

```
@@            Coverage Diff             @@
##           master    #2237      +/-  ##
==========================================
- Coverage   82.53%   82.31%   -0.22%
==========================================
  Files         239      241        +2
  Lines       62304    62500      +196
==========================================
+ Hits        51422    51447       +25
- Misses      10882    11053      +171
```
@tustvold PTAL 😊
Looking good, mostly minor nits, but I would remove `batch_size` from `consume_batch`. This is for two major reasons:
- It adds a fair amount of complexity
- We want to avoid `split_off` as much as possible, as it is slow and has non-trivial implications for dictionary preservation (see the sketch after this list) - https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/record_reader/mod.rs#L151
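To illustrate the `split_off` concern, a standalone sketch (not the `record_reader` code itself): consuming only a prefix of a buffer forces a copy of the tail, while consuming everything is a cheap swap.

```rust
/// Consuming a prefix: split_off copies the tail into a new allocation
/// (and, in record_reader, also complicates keeping dictionaries intact).
fn consume_prefix(buffer: &mut Vec<u8>, batch_size: usize) -> Vec<u8> {
    let tail = buffer.split_off(batch_size); // copies buffer[batch_size..]
    std::mem::replace(buffer, tail)          // hand back the prefix
}

/// Consuming everything: no copy, the buffer is simply taken and reset.
fn consume_all(buffer: &mut Vec<u8>) -> Vec<u8> {
    std::mem::take(buffer)
}
```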
I think there is still an issue with ComplexObjectArrayReader. I think it needs to keep the levels data around for longer to avoid breaking the parent ArrayReader. Otherwise looking very nice 😃
Edit: I think ComplexObjectArrayReader might just be broken. I'll experiment when I get to a computer, and potentially accelerate plans to just remove it.
```diff
@@ -160,6 +181,10 @@ where
         array = arrow::compute::cast(&array, &self.data_type)?;
     }

+    self.data_buffer = vec![];
+    self.def_levels_buffer = None;
+    self.rep_levels_buffer = None;
```
I think this will break, as the parent ArrayReader assumes the definition levels live until the next call to `consume_batch`?
I'm not sure we actually have test coverage of, say, a nullable StructArray containing a DecimalArray 🤔
You mean a StructArray that contains one child using the `complex_object_array` reader? 🤔 Why does the parent need the child's levels?
If the StructArray reader is itself nullable, it needs to read the definition levels read by its child in order to work out where its NULLs are located. It's a similar story for ListArrayReader. (A minimal sketch of this dependency follows below.)
I'll try to rustle up some tests this evening, so that we can be confident this PR won't break anything. Longer term I want to remove ComplexObjectArrayReader as it is slow, complicated and largely replaced by the newer generics.
Edit: ran out of time today, will look into first thing tomorrow
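To make the dependency concrete, a minimal hypothetical sketch (not the actual StructArrayReader code): a nullable struct slot is valid only where the child's definition level reaches the struct's own definition level.

```rust
/// Derive a parent struct's validity mask from its child's definition
/// levels. `struct_def_level` is the level at which the struct itself
/// is defined (non-null).
fn struct_null_mask(child_def_levels: &[i16], struct_def_level: i16) -> Vec<bool> {
    child_def_levels
        .iter()
        .map(|&def| def >= struct_def_level) // true = struct slot is valid
        .collect()
}
```

If the child clears its level buffers on `consume_batch`, the parent has nothing left to derive this mask from.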
As I suspected, the test added in #2254 is now failing. I think it should be simple enough to fix, but let me know if you get stuck 😄
@tustvold Sorry for the late reply, fixed in the "fix new ut" commit.
I took the liberty of merging in master to get #2254
```diff
@@ -1161,7 +1161,7 @@ mod tests {
         Some(props),
     )
     .expect("Unable to write file");
-    writer.write(&expected_batch).unwrap();
+    writer.write(expected_batch).unwrap();
```
Not related, but I got a clippy error locally.
Some minor nits, looking good
```diff
@@ -206,11 +213,19 @@ where
     }

     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_deref()
+        if self.before_consume {
```
I think this can just return `self.def_levels_buffer.as_deref()`; if you look at PrimitiveArrayReader, it will only make the data available after the call to `consume_batch`.
Without this, the following tests fail:

failures:
arrow::array_reader::complex_object_array::tests::test_complex_array_reader_def_and_rep_levels
arrow::array_reader::complex_object_array::tests::test_complex_array_reader_dict_enc_string

This is caused by callers fetching the levels before consuming `complex_object_array` (the common situation, like the other readers), but `complex_object_array` sometimes (when it is itself nullable) needs the levels after consuming, so I think we should keep this check (sketch below).
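A sketch of the check being defended, completing the truncated diff above; `before_consume` comes from the diff, while `in_progress_def_levels` is a hypothetical name standing in for the PR's actual pre-consume buffer field:

```rust
fn get_def_levels(&self) -> Option<&[i16]> {
    if self.before_consume {
        // Levels buffered by read_records but not yet consumed: what the
        // existing complex_object_array tests read.
        self.in_progress_def_levels.as_deref() // hypothetical field name
    } else {
        // Levels captured at the last consume_batch: what a nullable
        // parent reader needs after consuming its child.
        self.def_levels_buffer.as_deref()
    }
}
```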
Benchmark runs are scheduled for baseline = 6b2c757 and contender = 1f9973c. 1f9973c is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #2236. Related: #2197.
Rationale for this change
Add separate `read_records` and `consume_batch` methods to `ArrayReader`, so we can call `read_records` multiple times into the buffer and consume once (avoiding small batches when skipping reads). A usage sketch follows at the end of this description.
What changes are included in this PR?
Are there any user-facing changes?
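A usage sketch of the split API under the rationale above (hypothetical helper; the `ArrayReader` trait and its visibility may vary by version):

```rust
fn read_in_steps(
    reader: &mut dyn ArrayReader,
    target_rows: usize,
    step: usize,
) -> Result<ArrayRef> {
    let mut total = 0;
    while total < target_rows {
        // Buffer up to `step` records per call; repeated calls accumulate
        // into the same in-progress batch.
        let read = reader.read_records(step)?;
        if read == 0 {
            break; // column chunk exhausted
        }
        total += read;
    }
    // Materialize everything buffered above as one array, instead of one
    // small array per read (e.g. around skipped records).
    reader.consume_batch()
}
```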