Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncate IPC record batch #2040

Merged
merged 11 commits into from Jul 14, 2022
Merged

Conversation

viirya
Copy link
Member

@viirya viirya commented Jul 11, 2022

Which issue does this PR close?

Closes #1528.
Closes #208

Rationale for this change

When serialize RecordBatch through IPC, we should truncate it to only serialize the necessary part if the batch is sliced.

C++ implementation also truncates it:
https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/writer.cc#L351
https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/writer.cc#L327

What changes are included in this PR?

Truncate arrays of record batch when serializing it through IPC. Following C++, we truncate arrays of numeric type, temporal types, fixed size binary type and base binary types (binary, string).

Are there any user-facing changes?

@github-actions github-actions bot added the arrow Changes to the arrow crate label Jul 11, 2022
@codecov-commenter
Copy link

codecov-commenter commented Jul 11, 2022

Codecov Report

Merging #2040 (50c4eb7) into master (330505c) will increase coverage by 0.05%.
The diff coverage is 93.50%.

@@            Coverage Diff             @@
##           master    #2040      +/-   ##
==========================================
+ Coverage   83.55%   83.60%   +0.05%     
==========================================
  Files         222      223       +1     
  Lines       58230    58539     +309     
==========================================
+ Hits        48656    48944     +288     
- Misses       9574     9595      +21     
Impacted Files Coverage Δ
arrow/src/array/mod.rs 100.00% <ø> (ø)
arrow/src/datatypes/datatype.rs 64.05% <30.00%> (-1.63%) ⬇️
arrow/src/ipc/writer.rs 85.05% <97.91%> (+3.26%) ⬆️
parquet/src/file/serialized_reader.rs 93.58% <0.00%> (-1.76%) ⬇️
arrow/src/array/builder/generic_list_builder.rs 95.09% <0.00%> (-1.61%) ⬇️
...row/src/array/builder/string_dictionary_builder.rs 90.64% <0.00%> (-0.72%) ⬇️
parquet/src/file/metadata.rs 94.70% <0.00%> (-0.69%) ⬇️
parquet_derive/src/parquet_field.rs 65.75% <0.00%> (-0.46%) ⬇️
parquet/src/encodings/encoding.rs 93.43% <0.00%> (-0.20%) ⬇️
... and 12 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 330505c...50c4eb7. Read the comment docs.

Copy link
Contributor

@HaoYang670 HaoYang670 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very familiar with IPC, so just give some small nits I find.

arrow/src/ipc/writer.rs Show resolved Hide resolved
arrow/src/ipc/writer.rs Show resolved Hide resolved
arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, I think there is an issue with the way bitmaps are currently handled but I think this should just be a case of using Buffer::bit_slice or similar

arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
arrow/src/ipc/writer.rs Show resolved Hide resolved
arrow/src/ipc/writer.rs Show resolved Hide resolved
arrow/src/ipc/writer.rs Show resolved Hide resolved
arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
) {
// Rebase offsets and truncate values
let new_offsets = get_zero_based_value_offsets(array_data);
offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely something that could be left to another PR, but I do wonder if there might be some way to write the new offsets directly without buffering them first. I'm not very familiar with the IPC code so not sure how feasible this may be

@alamb
Copy link
Contributor

alamb commented Jul 12, 2022

❤️ -- thank you @viirya

Note we have some code in IOx to workaround the lack of this feature (described in https://github.com/influxdata/influxdb_iox/issues/1133):
https://github.com/influxdata/influxdb_iox/blob/3592aa52d815e861ac14031186ba557c9290dfaf/arrow_util/src/optimize.rs#L97-L141

I wonder if this PR also closes #208?

@viirya
Copy link
Member Author

viirya commented Jul 12, 2022

Yes, flight data also uses IPC to encode record batch. This also closes #208 too.

@alamb
Copy link
Contributor

alamb commented Jul 12, 2022

Yes, flight data also uses IPC to encode record batch. This also closes #208 too.

Updated the PR description to match

arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
@tustvold
Copy link
Contributor

tustvold commented Jul 14, 2022

Apologies I'm trying to review this, but I'm extremely confused as to how this has been and is working, I can't find where the IPC writer makes use of the ArrayData offset, and am therefore at a loss as to how slicing has historically been being preserved. Could you possibly point me at the relevant code, as far as I can tell it previously wrote the entirety of the buffers, but I can't work out how the reader then knows to ignore most of this?

Edit: It would appear that the previous logic was and still is simply broken...

);

Copy link
Contributor

@tustvold tustvold Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert_eq!(
deserialize(serialize_without_truncate(&record_batch_slice)),
record_batch_slice
);

Or something, basically I think the non-sliced logic was and still is wrong.

arrow/src/ipc/writer.rs Outdated Show resolved Hide resolved
@viirya
Copy link
Member Author

viirya commented Jul 14, 2022

I was looking for how the reader knows how to ignore it. But I didn't find it. Then I realized that previously the writer wrote the entire buffers. For the reader, it sets offsets from zero and reads the buffers from incorrect positions.

So I think it is a bug that sliced record batch cannot be read correctly after IPC. We can reproduce it:

fn create_batch() -> RecordBatch {
  let schema = Schema::new(vec![
    Field::new("a", DataType::Int32, true),
    Field::new("b", DataType::Utf8, true),
  ]);

  let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
  let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);

  RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
    .unwrap()
}

let big_record_batch = create_batch();

let offset = 2;
let record_batch_slice = big_record_batch.slice(offset, 3);

Original record batches:

RecordBatch { schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: fal
se, metadata: None }], metadata: {} }, columns: [PrimitiveArray<Int32>
[                                                                                                                  
  1,
  2,                                                                                                               
  3,
  4,                                                                                                               
  5,
], StringArray 
[
  "a",                                                                                                             
  "b",
  "c",                                                                                                             
  "d",
  "e",         
]], row_count: 5 }
RecordBatch { schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: fal
se, metadata: None }], metadata: {} }, columns: [PrimitiveArray<Int32>
[                                                                                                                  
  3,
  4,
  5,
], StringArray
[
  "c",
  "d",
  "e",
]], row_count: 3 }

Record batches after IPC serialization/deserialzation:

RecordBatch { schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: fal
se, metadata: None }], metadata: {} }, columns: [PrimitiveArray<Int32>                       
[
  1,                                                                                                               
  2,
  3,           
  4,
  5,                                                                                                               
], StringArray
[                                                                                                                                                                                                                                      
  "a",
  "b",         
  "c",
  "d",                                                                                                             
  "e",
]], row_count: 5 }                                                                                                                                                                                                                     
RecordBatch { schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: fal
se, metadata: None }], metadata: {} }, columns: [PrimitiveArray<Int32>
[
  null,                                                                                                            
  null,
  5,                                                                                                               
], StringArray
[              
  "a",
  "b",                                                                                                             
  "c",
]], row_count: 3 } 

@viirya
Copy link
Member Author

viirya commented Jul 14, 2022

That's said that we need to truncate it without the option. I don't find C++ implementation has an option for that actually.

@tustvold
Copy link
Contributor

Perhaps we should make it non-optional then, I had thought the offset was separately preserved, given it isn't, there doesn't seem to be a reason to have the option

@viirya
Copy link
Member Author

viirya commented Jul 14, 2022

Unrelated error on Windows CI:

error: error: Invalid value for '<+toolchain>': Toolchain overrides must begin with '+'

@tustvold tustvold merged commit 86543a4 into apache:master Jul 14, 2022
@tustvold
Copy link
Contributor

I think there is more to be done in this vein, e.g. ListArray, StructArray, etc... but this is a good step forward. Thank you 🙂

@viirya
Copy link
Member Author

viirya commented Jul 14, 2022

Thanks @alamb @tustvold @JasonLi-cn @HaoYang670

@viirya
Copy link
Member Author

viirya commented Jul 14, 2022

I don't find C++ implementation does truncation for ListArray, StructArray, but maybe I miss it when reading the code. I will re-check this.

@ursabot
Copy link

ursabot commented Jul 14, 2022

Benchmark runs are scheduled for baseline = 5e520bb and contender = 86543a4. 86543a4 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@alamb
Copy link
Contributor

alamb commented Jul 15, 2022

I think there is more to be done in this vein, e.g. ListArray, StructArray, etc... but this is a good step forward. Thank you 🙂

@tustvold is the remaining work recorded anywhere? I am happy to write up a ticket but I don't understand the issue well enough to do so at this moment

@tustvold
Copy link
Contributor

Filed #2080 to track follow up work, I personally think we should accelerate efforts to remove offset from ArrayData, as opposed to continuing to work around it in various places

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate
Projects
None yet
7 participants