RecordBatch: Serialization of sliced record using StreamWriter produces incorrect result #1528

Closed · REASY opened this issue Apr 8, 2022 · 5 comments · Fixed by #2040
Labels: enhancement, help wanted

REASY commented Apr 8, 2022

When you slice a RecordBatch and serialize it with StreamWriter, it produces an incorrect result. I'm using arrow = "11.1.0".

To reproduce, one can use the following test:

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::ipc::writer::StreamWriter;
    use arrow::record_batch::RecordBatch;

    #[test]
    fn it_works() {
        // Serialize a RecordBatch into the Arrow IPC stream format and
        // return the raw bytes.
        fn serialize(record: &RecordBatch) -> Vec<u8> {
            let buffer: Vec<u8> = Vec::new();
            let mut stream_writer = StreamWriter::try_new(buffer, &record.schema()).unwrap();
            stream_writer.write(record).unwrap();
            stream_writer.finish().unwrap();
            stream_writer.into_inner().unwrap()
        }

        // Build a two-column (Int32, Utf8) batch with `rows` identical rows.
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from(vec![1; rows]);
            let b = StringArray::from(vec!["a"; rows]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);
        println!(
            "big_record_batch with dimension ({}, {}) (rows x cols) serialized as Apache Arrow IPC in {} bytes",
            big_record_batch.num_rows(),
            big_record_batch.num_columns(),
            serialize(&big_record_batch).len()
        );

        let length = 5;
        let small_record_batch = create_batch(length);
        println!(
            "small_record_batch with dimension ({}, {}) (rows x cols) serialized as Apache Arrow IPC in {} bytes",
            small_record_batch.num_rows(),
            small_record_batch.num_columns(),
            serialize(&small_record_batch).len()
        );

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        println!(
            "(Sliced): record_batch_slice with dimension ({}, {}) (rows x cols) serialized as Apache Arrow IPC in {} bytes",
            record_batch_slice.num_rows(),
            record_batch_slice.num_columns(),
            serialize(&record_batch_slice).len()
        );
    }
}

As you can see, the sliced batch serializes to almost the same size as big_record_batch, but I would expect it to be about the same size as small_record_batch:

big_record_batch with dimension (65536, 2) (rows x cols) serialized as Apache Arrow IPC in 606608 bytes
small_record_batch with dimension (5, 2) (rows x cols) serialized as Apache Arrow IPC in 464 bytes
(Sliced): record_batch_slice with dimension (5, 2) (rows x cols) serialized as Apache Arrow IPC in 590240 bytes

This may be related to "Add support for writing sliced arrays" (#208) and "flight_data_from_arrow_batch sends too much data".

REASY added the bug label on Apr 8, 2022
tustvold (Contributor) commented

Can you confirm that the issue is just the size of the written file, and not a correctness problem - i.e. the data is larger than it could be, but still round-trips correctly? If so, I think as you've suggested this might be a duplicate of #208.
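
For reference, a round-trip check along those lines could look like the sketch below. It assumes an arrow-rs version where StreamReader::try_new takes an optional column projection, and relies on RecordBatch's PartialEq comparing logical values, so a sliced batch compares equal to its deserialized copy.

use std::io::Cursor;

use arrow::ipc::reader::StreamReader;
use arrow::record_batch::RecordBatch;

// Returns true if `bytes` (an IPC stream produced by `serialize` above)
// decodes back to a batch that is logically equal to `original`.
fn round_trips(original: &RecordBatch, bytes: Vec<u8>) -> bool {
    let mut reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
    let decoded = reader.next().unwrap().unwrap();
    decoded == *original
}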


kaaniboy commented Jul 8, 2022

Is there any plan to resolve this issue? For my use case, I care specifically that I can write multiple smaller IPC messages rather than a single large one. I hoped to achieve this by slicing the large RecordBatch and writing each slice separately. It seems like a similar issue existed in arrow2 but was resolved last year.
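
A minimal sketch of that pattern, assuming the slice compaction discussed here is in place so that each write emits a small message (chunk_size is a hypothetical, caller-chosen parameter):

use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Write one large RecordBatch as a sequence of smaller IPC messages by
// slicing it into fixed-size chunks; each `write` call emits its own
// message in the stream.
fn write_chunked(batch: &RecordBatch, chunk_size: usize) -> Vec<u8> {
    let mut writer = StreamWriter::try_new(Vec::new(), &batch.schema()).unwrap();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = chunk_size.min(batch.num_rows() - offset);
        writer.write(&batch.slice(offset, len)).unwrap();
        offset += len;
    }
    writer.finish().unwrap();
    writer.into_inner().unwrap()
}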

tustvold added the enhancement and help wanted labels and removed the bug label on Jul 9, 2022
tustvold (Contributor) commented Jul 9, 2022

As stated above, this isn't a bug per se, but rather that the IPC format faithfully sends the in-memory representation of the arrays over the wire, even if some portion of the values has been logically sliced away. I think a feature that truncates buffers, rewrites offsets, etc. is definitely possible, as described in #208.

I personally have very limited time to spend on this, but perhaps @nevi-me or @viirya might have some spare cycles?
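
Until such a feature lands, one possible workaround is to deep-copy a slice before writing it. This is only a sketch: it assumes the concat kernel, given a single sliced array, copies the retained values into fresh, compact buffers.

use arrow::compute::concat;
use arrow::record_batch::RecordBatch;

// Rebuild each column of a (possibly sliced) batch into compact buffers
// so the IPC writer only serializes the retained rows.
fn compact(batch: &RecordBatch) -> RecordBatch {
    let columns = batch
        .columns()
        .iter()
        .map(|col| concat(&[col.as_ref()]).unwrap())
        .collect::<Vec<_>>();
    RecordBatch::try_new(batch.schema(), columns).unwrap()
}

In the repro above, serialize(&compact(&record_batch_slice)) should then produce output roughly the size of small_record_batch.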

viirya (Member) commented Jul 9, 2022

I will try to take a look this weekend.

alamb (Contributor) commented Jul 12, 2022

see #2040 from @viirya ❤️
