Skip to content

Commit

Permalink
ARROW-7979: [C++] Add experimental buffer compression to IPC write pa…
Browse files Browse the repository at this point in the history
…th. Add "field" selection to read path. Migrate some APIs to Result<T>. Read/write Message metadata

I apologize for the complexity of this patch, there was some necessary refactoring and then as I needed to add a `IpcReadOptions` struct, it made sense to port some APIs from Status to Result.

Summary of what's here

* Add IpcReadOptions struct
* Rename IpcOptions to IpcWriteOptions
* Serialize and write custom_metadata field in Message Flatbuffer struct
* Move `MemoryPool*` arguments in IPC functions to options structs
* Adds EXPERIMENTAL compression option to IpcOptions (which should be renamed IpcWriteOptions) which causes each buffer in a record batch body to be separately compressed with e.g. ZSTD. This is intended to be used internally in Feather V2 and not exported for general use. Based on the mailing list discussion, if we were to adopt this in the IPC protocol then this can be promoted to non-experimental
* Add "included_fields" option to select a subset of fields to load from a record batch when doing an IPC load. The motivation for this was to avoid unnecessary decompression when reading a subset of columns from an IPC stream
* Migrate most IPC read APIs to use Result, deprecate old methods
* Deprecate Status-returning IPC write method

Some other small changes:

* Add `check_metadata` option to `RecordBatch::Equals`
* Add `Codec::GetCompressionType` method for looking up `Compression::type` given a codec name

I don't have size/perf benchmarks about how the compression helps with file sizes or read performance, so I'll do that next in the course of completing ARROW-5510 ("Feather V2" file format)

Closes #6638 from wesm/ARROW-7979

Lead-authored-by: Wes McKinney <wesm+git@apache.org>
Co-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
3 people committed Mar 25, 2020
1 parent 50c5daf commit fcde1eb
Show file tree
Hide file tree
Showing 64 changed files with 1,759 additions and 1,295 deletions.
21 changes: 9 additions & 12 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,12 @@ GArrowRecordBatchStreamReader *
garrow_record_batch_stream_reader_new(GArrowInputStream *stream,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchReader;
using ReaderType = arrow::ipc::RecordBatchStreamReader;

auto arrow_input_stream = garrow_input_stream_get_raw(stream);
std::shared_ptr<BaseType> arrow_reader;
auto status = ReaderType::Open(arrow_input_stream, &arrow_reader);
if (garrow_error_check(error, status, "[record-batch-stream-reader][open]")) {
auto subtype = std::dynamic_pointer_cast<ReaderType>(arrow_reader);
auto arrow_reader = ReaderType::Open(arrow_input_stream);
if (garrow::check(error, arrow_reader, "[record-batch-stream-reader][open]")) {
auto subtype = std::dynamic_pointer_cast<ReaderType>(*arrow_reader);
return garrow_record_batch_stream_reader_new_raw(&subtype);
} else {
return NULL;
Expand Down Expand Up @@ -411,14 +409,13 @@ GArrowRecordBatchFileReader *
garrow_record_batch_file_reader_new(GArrowSeekableInputStream *file,
GError **error)
{
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(file);
using ReaderType = arrow::ipc::RecordBatchFileReader;

std::shared_ptr<arrow::ipc::RecordBatchFileReader> arrow_reader;
auto status =
arrow::ipc::RecordBatchFileReader::Open(arrow_random_access_file,
&arrow_reader);
if (garrow_error_check(error, status, "[record-batch-file-reader][open]")) {
return garrow_record_batch_file_reader_new_raw(&arrow_reader);
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(file);
auto arrow_reader = ReaderType::Open(arrow_random_access_file);
if (garrow::check(error, arrow_reader, "[record-batch-file-reader][open]")) {
auto subtype = std::dynamic_pointer_cast<ReaderType>(*arrow_reader);
return garrow_record_batch_file_reader_new_raw(&subtype);
} else {
return NULL;
}
Expand Down
43 changes: 20 additions & 23 deletions c_glib/arrow-glib/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,15 @@ garrow_record_batch_stream_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchStreamWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink).get();
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink,
garrow_schema_get_raw(schema),
&arrow_writer);
if (garrow_error_check(error, status, "[record-batch-stream-writer][open]")) {
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
return garrow_record_batch_stream_writer_new_raw(&subtype);
auto arrow_schema = garrow_schema_get_raw(schema);
auto arrow_writer_result =
arrow::ipc::NewStreamWriter(arrow_sink, arrow_schema);
if (garrow::check(error,
arrow_writer_result,
"[record-batch-stream-writer][open]")) {
auto arrow_writer = *arrow_writer_result;
return garrow_record_batch_stream_writer_new_raw(&arrow_writer);
} else {
return NULL;
}
Expand Down Expand Up @@ -285,17 +283,16 @@ garrow_record_batch_file_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchFileWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink);
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink.get(),
garrow_schema_get_raw(schema),
&arrow_writer);
if (garrow_error_check(error, status, "[record-batch-file-writer][open]")) {
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
return garrow_record_batch_file_writer_new_raw(&subtype);
auto arrow_sink = garrow_output_stream_get_raw(sink).get();
auto arrow_schema = garrow_schema_get_raw(schema);
std::shared_ptr<arrow::ipc::RecordBatchWriter> arrow_writer;
auto arrow_writer_result =
arrow::ipc::NewFileWriter(arrow_sink, arrow_schema);
if (garrow::check(error,
arrow_writer_result,
"[record-batch-file-writer][open]")) {
auto arrow_writer = *arrow_writer_result;
return garrow_record_batch_file_writer_new_raw(&arrow_writer);
} else {
return NULL;
}
Expand Down Expand Up @@ -529,7 +526,7 @@ garrow_record_batch_writer_get_raw(GArrowRecordBatchWriter *writer)
}

GArrowRecordBatchStreamWriter *
garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_writer)
garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer)
{
auto writer =
GARROW_RECORD_BATCH_STREAM_WRITER(
Expand All @@ -540,7 +537,7 @@ garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatc
}

GArrowRecordBatchFileWriter *
garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_writer)
garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer)
{
auto writer =
GARROW_RECORD_BATCH_FILE_WRITER(
Expand Down
4 changes: 2 additions & 2 deletions c_glib/arrow-glib/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
GArrowRecordBatchWriter *garrow_record_batch_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer);
std::shared_ptr<arrow::ipc::RecordBatchWriter> garrow_record_batch_writer_get_raw(GArrowRecordBatchWriter *writer);

GArrowRecordBatchStreamWriter *garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_writer);
GArrowRecordBatchStreamWriter *garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer);

GArrowRecordBatchFileWriter *garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_writer);
GArrowRecordBatchFileWriter *garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer);

GArrowFeatherFileWriter *garrow_feather_file_writer_new_raw(arrow::ipc::feather::TableWriter *arrow_writer);
arrow::ipc::feather::TableWriter *garrow_feather_file_writer_get_raw(GArrowFeatherFileWriter *writer);
2 changes: 1 addition & 1 deletion cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <atomic>
#include <atomic> // IWYU pragma: export
#include <cstdint>
#include <iosfwd>
#include <memory>
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,24 @@ class ARROW_EXPORT BinaryKernel : public OpKernel {
Datum* out) = 0;
};

// TODO doxygen 1.8.16 does not like the following code
///@cond INTERNAL

static inline bool CollectionEquals(const std::vector<Datum>& left,
const std::vector<Datum>& right) {
if (left.size() != right.size()) return false;

for (size_t i = 0; i < left.size(); i++)
if (!left[i].Equals(right[i])) return false;
if (left.size() != right.size()) {
return false;
}

for (size_t i = 0; i < left.size(); i++) {
if (!left[i].Equals(right[i])) {
return false;
}
}
return true;
}

///@endcond

} // namespace compute
} // namespace arrow
12 changes: 4 additions & 8 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

#include <algorithm>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/util/iterator.h"

namespace arrow {
Expand All @@ -40,12 +39,11 @@ Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
}

std::shared_ptr<ipc::RecordBatchFileReader> reader;
auto status = ipc::RecordBatchFileReader::Open(std::move(input), &reader);
auto status = ipc::RecordBatchFileReader::Open(std::move(input)).Value(&reader);
if (!status.ok()) {
return status.WithMessage("Could not open IPC input source '", source.path(),
"': ", status.message());
}

return reader;
}

Expand Down Expand Up @@ -161,10 +159,8 @@ Result<std::shared_ptr<WriteTask>> IpcFileFormat::WriteFragment(
RETURN_NOT_OK(CreateDestinationParentDir());

ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable());

ARROW_ASSIGN_OR_RAISE(auto writer, ipc::RecordBatchFileWriter::Open(
out_stream.get(), fragment_->schema()));

ARROW_ASSIGN_OR_RAISE(auto writer,
ipc::NewFileWriter(out_stream.get(), fragment_->schema()));
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment_->Scan(scan_context_));

for (auto maybe_scan_task : scan_task_it) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

#include <memory>
#include <string>
#include <utility>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/result.h"

namespace arrow {
namespace dataset {
Expand Down
7 changes: 2 additions & 5 deletions cpp/src/arrow/dataset/file_ipc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class ArrowIpcWriterMixin : public ::testing::Test {
std::shared_ptr<Buffer> Write(RecordBatchReader* reader) {
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());

EXPECT_OK_AND_ASSIGN(auto writer,
ipc::RecordBatchFileWriter::Open(sink.get(), reader->schema()));
EXPECT_OK_AND_ASSIGN(auto writer, ipc::NewFileWriter(sink.get(), reader->schema()));

std::vector<std::shared_ptr<RecordBatch>> batches;
ARROW_EXPECT_OK(reader->ReadAll(&batches));
Expand All @@ -63,9 +62,7 @@ class ArrowIpcWriterMixin : public ::testing::Test {

std::shared_ptr<Buffer> Write(const Table& table) {
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());

EXPECT_OK_AND_ASSIGN(auto writer,
ipc::RecordBatchFileWriter::Open(sink.get(), table.schema()));
EXPECT_OK_AND_ASSIGN(auto writer, ipc::NewFileWriter(sink.get(), table.schema()));

ARROW_EXPECT_OK(writer->WriteTable(table));

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/extension_type_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ TEST_F(TestExtensionType, ExtensionTypeTest) {
auto RoundtripBatch = [](const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));

ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());

io::BufferReader reader(complete_ipc_stream);
std::shared_ptr<RecordBatchReader> batch_reader;
ASSERT_OK(ipc::RecordBatchStreamReader::Open(&reader, &batch_reader));
ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
ASSERT_OK(batch_reader->ReadNext(out));
};

Expand Down Expand Up @@ -256,7 +256,7 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {
// Write full IPC stream including schema, then unregister type, then read
// and ensure that a plain instance of the storage type is created
ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));

ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());
Expand All @@ -270,7 +270,7 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {

io::BufferReader reader(complete_ipc_stream);
std::shared_ptr<RecordBatchReader> batch_reader;
ASSERT_OK(ipc::RecordBatchStreamReader::Open(&reader, &batch_reader));
ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
std::shared_ptr<RecordBatch> read_batch;
ASSERT_OK(batch_reader->ReadNext(&read_batch));
CompareBatch(*batch_no_ext, *read_batch);
Expand Down
9 changes: 3 additions & 6 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class GrpcStreamReader : public FlightStreamReader {

private:
friend class GrpcIpcMessageReader;
std::unique_ptr<ipc::RecordBatchReader> batch_reader_;
std::shared_ptr<ipc::RecordBatchReader> batch_reader_;
std::shared_ptr<Buffer> last_app_metadata_;
std::shared_ptr<ClientRpc> rpc_;
};
Expand Down Expand Up @@ -327,8 +327,8 @@ Status GrpcStreamReader::Open(std::unique_ptr<ClientRpc> rpc,
out->get()->rpc_ = std::move(rpc);
std::unique_ptr<GrpcIpcMessageReader> message_reader(
new GrpcIpcMessageReader(out->get(), out->get()->rpc_, std::move(stream)));
return ipc::RecordBatchStreamReader::Open(std::move(message_reader),
&(*out)->batch_reader_);
return (ipc::RecordBatchStreamReader::Open(std::move(message_reader))
.Value(&(*out)->batch_reader_));
}

std::shared_ptr<Schema> GrpcStreamReader::schema() const {
Expand Down Expand Up @@ -385,9 +385,6 @@ class GrpcStreamWriter : public FlightStreamWriter {
}
return Status::OK();
}
void set_memory_pool(MemoryPool* pool) override {
batch_writer_->set_memory_pool(pool);
}
Status Close() override { return batch_writer_->Close(); }

private:
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/perf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class PerfDataStream : public FlightDataStream {
} else {
records_sent_ += batch_length_;
}
return ipc::internal::GetRecordBatchPayload(
*batch, ipc_options_, default_memory_pool(), &payload->ipc_message);
return ipc::internal::GetRecordBatchPayload(*batch, ipc_options_,
&payload->ipc_message);
}

private:
Expand All @@ -114,7 +114,7 @@ class PerfDataStream : public FlightDataStream {
int64_t records_sent_;
std::shared_ptr<Schema> schema_;
ipc::DictionaryMemo dictionary_memo_;
ipc::IpcOptions ipc_options_;
ipc::IpcWriteOptions ipc_options_;
std::shared_ptr<RecordBatch> batch_;
ArrayVector arrays_;
};
Expand Down
16 changes: 9 additions & 7 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class FlightMessageReaderImpl : public FlightMessageReader {
Status Init() {
message_reader_ = new FlightIpcMessageReader(reader_, &last_metadata_);
return ipc::RecordBatchStreamReader::Open(
std::unique_ptr<ipc::MessageReader>(message_reader_), &batch_reader_);
std::unique_ptr<ipc::MessageReader>(message_reader_))
.Value(&batch_reader_);
}

const FlightDescriptor& descriptor() const override {
Expand Down Expand Up @@ -804,7 +805,9 @@ class RecordBatchStream::RecordBatchStreamImpl {

RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
MemoryPool* pool)
: pool_(pool), reader_(reader), ipc_options_(ipc::IpcOptions::Defaults()) {}
: reader_(reader), ipc_options_(ipc::IpcWriteOptions::Defaults()) {
ipc_options_.memory_pool = pool;
}

std::shared_ptr<Schema> schema() { return reader_->schema(); }

Expand All @@ -828,7 +831,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
if (stage_ == Stage::DICTIONARY) {
if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
stage_ = Stage::RECORD_BATCH;
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_,
&payload->ipc_message);
} else {
return GetNextDictionary(payload);
Expand All @@ -843,15 +846,15 @@ class RecordBatchStream::RecordBatchStreamImpl {
payload->ipc_message.metadata = nullptr;
return Status::OK();
} else {
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_,
&payload->ipc_message);
}
}

private:
Status GetNextDictionary(FlightPayload* payload) {
const auto& it = dictionaries_[dictionary_index_++];
return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_, pool_,
return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_,
&payload->ipc_message);
}

Expand All @@ -864,10 +867,9 @@ class RecordBatchStream::RecordBatchStreamImpl {
}

Stage stage_ = Stage::NEW;
MemoryPool* pool_;
std::shared_ptr<RecordBatchReader> reader_;
ipc::DictionaryMemo dictionary_memo_;
ipc::IpcOptions ipc_options_;
ipc::IpcWriteOptions ipc_options_;
std::shared_ptr<RecordBatch> current_batch_;
std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;

Expand Down
Loading

0 comments on commit fcde1eb

Please sign in to comment.