
[C++] How to load an arrow::Table from a buffer of data without intermediate file creation in C++? #37144

Closed
mohabouje opened this issue Aug 14, 2023 · 4 comments · Fixed by #37167
Labels
Component: C++ Type: usage Issue is a user question


mohabouje commented Aug 14, 2023

I have a pipeline of processes, each doing a different job. One of them reads a file and decompresses it into a buffer; the buffer contains an arrow Table. Another component takes this buffer and returns a table with the data.

The code works, but there is a significant performance issue: it has to write the data to a file and then read that file back with the Arrow file stream classes.

I've been searching the documentation for a way to do this without writing the data to a file on disk, but I couldn't make it work with a RecordBatchStreamReader or any of the alternatives I found in the docs.

Would you happen to have any working examples showing how to avoid this disk write? Is this even possible?

Here's the code in question:

auto read(std::span<const char> buffer) -> std::shared_ptr<arrow::Table> {

    // Wasting resources creating a temporary file...
    auto const output = "/tmp/random.arrow";
    auto stream       = std::ofstream(output, std::ios::binary | std::ios::out);
    stream.write(buffer.data(), buffer.size());
    stream.flush();
    stream.close();

    // Read the file and convert it into an arrow::Table
    auto const file_stream = ::arrow::io::ReadableFile::Open(output);
    auto const ipc_reader  = ::arrow::ipc::RecordBatchFileReader::Open(file_stream.ValueUnsafe());
    if (!ipc_reader.ok()) {
        return nullptr;
    }

    auto const reader             = ipc_reader.ValueOrDie();
    auto const num_record_batches = reader->num_record_batches();
    auto batches                  = std::vector<std::shared_ptr<::arrow::RecordBatch>>(num_record_batches);
    for (auto i = 0; i < num_record_batches; ++i) {
        auto const batch = reader->ReadRecordBatch(i);
        if (!batch.ok()) {
            return nullptr;
        }
        batches[i] = batch.ValueOrDie();
    }

    auto const creation = ::arrow::Table::FromRecordBatches(batches);
    if (!creation.ok()) {
        return nullptr;
    }
    return creation.ValueOrDie();
}

Component(s)

C++

mohabouje added the Type: usage (Issue is a user question) label Aug 14, 2023

kou commented Aug 14, 2023

Does this work?

auto read(std::span<const char> buffer) -> std::shared_ptr<arrow::Table> {
    ::arrow::Buffer arrow_buffer(buffer.data(), buffer.size());
    ::arrow::io::BufferReader buffer_reader(arrow_buffer);
    auto const ipc_reader = ::arrow::ipc::RecordBatchFileReader::Open(&buffer_reader);
    if (!ipc_reader.ok()) {
        return nullptr;
    }
    auto const reader = *ipc_reader;
    auto const creation = ::arrow::Table::FromRecordBatcheReader(reader.get());
    if (!creation.ok()) {
        return nullptr;
    }
    return *creation;
}


mohabouje commented Aug 14, 2023

The code does not compile: ::arrow::Table::FromRecordBatcheReader expects a RecordBatchReader, while we are building a RecordBatchFileReader.

Using ::arrow::io::BufferReader to avoid dumping the data into a file definitely helped. The remaining issue is the need to build a vector holding all the ::arrow::RecordBatch objects before creating the ::arrow::Table. Any idea how to avoid this intermediate step?

auto read(std::span<const char> buffer) -> std::shared_ptr<arrow::Table> {

    ::arrow::Buffer arrow_buffer(buffer.data(), buffer.size());
    ::arrow::io::BufferReader buffer_reader(arrow_buffer);
    auto const ipc_reader = ::arrow::ipc::RecordBatchFileReader::Open(&buffer_reader);
    if (!ipc_reader.ok()) {
        return nullptr;
    }

    auto const reader             = ipc_reader.ValueOrDie();
    auto const num_record_batches = reader->num_record_batches();
    auto batches                  = std::vector<std::shared_ptr<::arrow::RecordBatch>>(num_record_batches);
    for (auto i = 0; i < num_record_batches; ++i) {
        auto const batch = reader->ReadRecordBatch(i);
        if (!batch.ok()) {
            return nullptr;
        }
        batches[i] = batch.ValueOrDie();
    }

    auto const creation = ::arrow::Table::FromRecordBatches(batches);
    if (!creation.ok()) {
        return nullptr;
    }
    return creation.ValueOrDie();
}

I've also tried with a RecordBatchStreamReader:

auto read(std::span<const char> buffer) -> std::shared_ptr<arrow::Table> {
    ::arrow::Buffer arrow_buffer(buffer.data(), buffer.size());
    ::arrow::io::BufferReader buffer_reader(arrow_buffer);
    auto const ipc_reader = ::arrow::ipc::RecordBatchStreamReader::Open(&buffer_reader);
    if (!ipc_reader.ok()) {
        return nullptr;
    }
    auto const reader = *ipc_reader;
    auto const creation = ::arrow::Table::FromRecordBatcheReader(reader.get());
    if (!creation.ok()) {
        return nullptr;
    }
    return *creation;
}

But I do get this error:

 Expected to read 1330795073 metadata bytes, but only read 829202182


kou commented Aug 15, 2023

Ah, RecordBatchFileReader::ToTable() will be convenient. I've implemented it: #37167

You can't use RecordBatchStreamReader for IPC file format: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
It's for IPC streaming format: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format

If you change the in-memory data format from IPC file format to IPC streaming format, you can use RecordBatchStreamReader.
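For reference, here is a minimal, untested sketch of the streaming-format variant. It assumes the buffer holds an IPC streaming payload and uses arrow::Table::FromRecordBatchReader (note the spelling; the snippets above misspell it as FromRecordBatcheReader):

```cpp
#include <memory>
#include <span>

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

// Sketch only: read an IPC *streaming* format payload straight from memory.
auto read_stream(std::span<const char> buffer) -> std::shared_ptr<arrow::Table> {
    // Zero-copy view over the caller's bytes; the caller must keep them alive
    // for as long as the returned table is in use.
    auto arrow_buffer = std::make_shared<arrow::Buffer>(
        reinterpret_cast<const uint8_t*>(buffer.data()),
        static_cast<int64_t>(buffer.size()));
    auto buffer_reader = std::make_shared<arrow::io::BufferReader>(arrow_buffer);

    auto maybe_reader = arrow::ipc::RecordBatchStreamReader::Open(buffer_reader);
    if (!maybe_reader.ok()) {
        return nullptr;
    }
    // RecordBatchStreamReader is a RecordBatchReader, so no manual batch loop
    // is needed: FromRecordBatchReader drains the stream into a single table.
    auto maybe_table = arrow::Table::FromRecordBatchReader(maybe_reader->get());
    return maybe_table.ok() ? *maybe_table : nullptr;
}
```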

If you still need to use IPC file format, you need to use #37167.
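With #37167 merged, the file-format path reduces to a short function. A minimal, untested sketch, assuming RecordBatchFileReader::ToTable() is available:

```cpp
#include <memory>
#include <span>

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

// Sketch only: requires RecordBatchFileReader::ToTable() from #37167.
auto read_file(std::span<const char> buffer) -> std::shared_ptr<arrow::Table> {
    // Zero-copy view over the caller's bytes; the caller must keep them alive.
    auto arrow_buffer = std::make_shared<arrow::Buffer>(
        reinterpret_cast<const uint8_t*>(buffer.data()),
        static_cast<int64_t>(buffer.size()));
    auto buffer_reader = std::make_shared<arrow::io::BufferReader>(arrow_buffer);

    auto maybe_reader = arrow::ipc::RecordBatchFileReader::Open(buffer_reader);
    if (!maybe_reader.ok()) {
        return nullptr;
    }
    // ToTable() reads every record batch and stitches them into one table,
    // replacing the manual ReadRecordBatch loop entirely.
    auto maybe_table = (*maybe_reader)->ToTable();
    return maybe_table.ok() ? *maybe_table : nullptr;
}
```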


mohabouje commented Aug 15, 2023

Thanks for the quick PR introducing the feature; RecordBatchFileReader::ToTable is exactly what I was looking for :)

As you mentioned in the PR, Table::FromRecordBatchFileReader() could be added for consistency and simplicity.

kou added a commit that referenced this issue Aug 17, 2023
…37167)

### Rationale for this change

`RecordBatchReader` has them but `RecordBatchFileReader` doesn't. They are convenient.

### What changes are included in this PR?

Add them.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: #37144

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
kou added this to the 14.0.0 milestone Aug 17, 2023
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
…le} (apache#37167)