Skip to content

Commit

Permalink
ARROW-7916: [C++] Project IPC batches to materialized fields only
Browse files Browse the repository at this point in the history
This is an optimization where if both a filter and a projection are given, we apply the -pre-projection before the filter such that columns not referenced by the final projection are not copied.

Closes #6474 from bkietz/7916-Dataset-Project-IPC-recor and squashes the following commits:

f8f1194 <Benjamin Kietzman> revert to unique-ing materialized fields in IpcScanTask
a1b3362 <Benjamin Kietzman> refactor file format tests for clarity
c268e9b <Benjamin Kietzman> remove unique-ing of materialized fields
6240363 <Benjamin Kietzman> ARROW-7916:  Project IPC batches to materialized fields only

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
bkietz committed Feb 27, 2020
1 parent 34340c6 commit 946beaa
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 219 deletions.
17 changes: 11 additions & 6 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,29 @@ class ARROW_DS_EXPORT Fragment {
public:
/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this Fragment.
///
/// Note that batches yielded using this method will not be filtered and may not align
/// with the Fragment's schema. In particular, note that columns referenced by the
/// filter may be present in yielded batches even if they are not projected (so that
/// they are available when a filter is applied). Additionally, explicitly projected
/// columns may be absent if they were not present in this fragment.
///
/// To receive a record batch stream which is fully filtered and projected, use Scanner.
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) = 0;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
/// \brief Return true if the fragment can benefit from parallel scanning.
virtual bool splittable() const = 0;

/// \brief Filtering, schema reconciliation, and partition options to use when
/// scanning this fragment. May be nullptr, which indicates that no filtering
/// or schema reconciliation will be performed and all partitions will be
/// scanned.
/// scanning this fragment.
const std::shared_ptr<ScanOptions>& scan_options() const { return scan_options_; }

const std::shared_ptr<Schema>& schema() const;

virtual ~Fragment() = default;

/// \brief An expression which evaluates to true for all data viewed by this
/// Fragment. May be null, which indicates no information is available.
/// Fragment.
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ inline std::shared_ptr<Schema> SchemaFromColumnNames(
const std::shared_ptr<Schema>& input, const std::vector<std::string>& column_names) {
std::vector<std::shared_ptr<Field>> columns;
for (const auto& name : column_names) {
auto field = input->GetFieldByName(name);
if (field != nullptr) {
if (auto field = input->GetFieldByName(name)) {
columns.push_back(std::move(field));
}
}

return std::make_shared<Schema>(columns);
return schema(std::move(columns));
}

} // namespace dataset
Expand Down
31 changes: 25 additions & 6 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/dataset/file_ipc.h"

#include <algorithm>
#include <memory>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -57,22 +58,40 @@ class IpcScanTask : public ScanTask {
: ScanTask(std::move(options), std::move(context)), source_(std::move(source)) {}

Result<RecordBatchIterator> Execute() override {
struct {
struct Impl {
static Result<Impl> Make(const FileSource& source,
const std::vector<std::string>& materialized_fields,
MemoryPool* pool) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
auto materialized_schema =
SchemaFromColumnNames(reader->schema(), materialized_fields);
return Impl{std::move(reader),
RecordBatchProjector(std::move(materialized_schema)), pool, 0};
}

Result<std::shared_ptr<RecordBatch>> Next() {
if (i_ == reader_->num_record_batches()) {
return nullptr;
}

std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader_->ReadRecordBatch(i_++, &batch));
return batch;
return projector_.Project(*batch, pool_);
}

std::shared_ptr<ipc::RecordBatchFileReader> reader_;
int i_ = 0;
} batch_it;

ARROW_ASSIGN_OR_RAISE(batch_it.reader_, OpenReader(source_));
RecordBatchProjector projector_;
MemoryPool* pool_;
int i_;
};

// get names of fields explicitly projected or referenced by filter
auto fields = options_->MaterializedFields();
std::sort(fields.begin(), fields.end());
auto unique_end = std::unique(fields.begin(), fields.end());
fields.erase(unique_end, fields.end());

ARROW_ASSIGN_OR_RAISE(auto batch_it, Impl::Make(source_, fields, context_->pool));

return RecordBatchIterator(std::move(batch_it));
}
Expand Down
141 changes: 98 additions & 43 deletions cpp/src/arrow/dataset/file_ipc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "arrow/io/memory.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"

Expand All @@ -40,20 +39,16 @@ constexpr int64_t kNumRows = kBatchSize * kBatchRepetitions;

class ArrowIpcWriterMixin : public ::testing::Test {
public:
std::shared_ptr<Buffer> Write(std::vector<RecordBatchReader*> readers) {
std::shared_ptr<Buffer> Write(RecordBatchReader* reader) {
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());
auto writer_schema = readers[0]->schema();

EXPECT_OK_AND_ASSIGN(auto writer,
ipc::RecordBatchFileWriter::Open(sink.get(), writer_schema));

for (auto reader : readers) {
std::vector<std::shared_ptr<RecordBatch>> batches;
ARROW_EXPECT_OK(reader->ReadAll(&batches));
for (auto batch : batches) {
AssertSchemaEqual(batch->schema(), writer_schema);
ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch));
}
ipc::RecordBatchFileWriter::Open(sink.get(), reader->schema()));

std::vector<std::shared_ptr<RecordBatch>> batches;
ARROW_EXPECT_OK(reader->ReadAll(&batches));
for (auto batch : batches) {
ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch));
}

ARROW_EXPECT_OK(writer->Close());
Expand All @@ -62,10 +57,6 @@ class ArrowIpcWriterMixin : public ::testing::Test {
return out;
}

std::shared_ptr<Buffer> Write(RecordBatchReader* reader) {
return Write(std::vector<RecordBatchReader*>{reader});
}

std::shared_ptr<Buffer> Write(const Table& table) {
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());

Expand All @@ -81,36 +72,34 @@ class ArrowIpcWriterMixin : public ::testing::Test {
}
};

class IpcBufferFixtureMixin : public ArrowIpcWriterMixin {
class TestIpcFileFormat : public ArrowIpcWriterMixin {
public:
std::unique_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
auto buffer = Write(reader);
return internal::make_unique<FileSource>(std::move(buffer));
}

std::unique_ptr<FileSource> GetFileSource(std::vector<RecordBatchReader*> readers) {
auto buffer = Write(std::move(readers));
return internal::make_unique<FileSource>(std::move(buffer));
std::unique_ptr<RecordBatchReader> GetRecordBatchReader(
std::shared_ptr<Schema> schema = nullptr) {
return MakeGeneratedRecordBatch(schema ? schema : schema_, kBatchSize,
kBatchRepetitions);
}

std::unique_ptr<RecordBatchReader> GetRecordBatchReader() {
auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
int64_t i = 0;
return MakeGeneratedRecordBatch(
batch->schema(), [batch, i](std::shared_ptr<RecordBatch>* out) mutable {
*out = i++ < kBatchRepetitions ? batch : nullptr;
return Status::OK();
});
RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
return MakeFlattenIterator(MakeMaybeMapIterator(
[](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
std::move(scan_task_it)));
}

protected:
std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
};
RecordBatchIterator Batches(Fragment* fragment) {
EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
return Batches(std::move(scan_task_it));
}

class TestIpcFileFormat : public IpcBufferFixtureMixin {
protected:
std::shared_ptr<ScanOptions> opts_;
std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
};

TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
Expand All @@ -120,16 +109,11 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
opts_ = ScanOptions::Make(reader->schema());
auto fragment = std::make_shared<IpcFragment>(*source, opts_);

ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
int64_t row_count = 0;

for (auto maybe_task : scan_task_it) {
ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task));
ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute());
for (auto maybe_batch : rb_it) {
ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
row_count += batch->num_rows();
}
for (auto maybe_batch : Batches(fragment.get())) {
ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
row_count += batch->num_rows();
}

ASSERT_EQ(row_count, kNumRows);
Expand All @@ -151,9 +135,80 @@ TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) {
result.status());
}

// TODO(bkietz) extend IpcFileFormat to support projection pushdown
// TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected)
// TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols)
TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) {
schema_ = schema({field("f64", float64()), field("i64", int64()),
field("f32", float32()), field("i32", int32())});

opts_ = ScanOptions::Make(schema_);
opts_->projector = RecordBatchProjector(SchemaFromColumnNames(schema_, {"f64"}));
opts_->filter = equal(field_ref("i32"), scalar(0));

// NB: projector is applied by the scanner; IpcFragment does not evaluate it so
// we will not drop "i32" even though it is not in the projector's schema
auto expected_schema = schema({field("f64", float64()), field("i32", int32())});

auto reader = GetRecordBatchReader();
auto source = GetFileSource(reader.get());
auto fragment = std::make_shared<IpcFragment>(*source, opts_);

int64_t row_count = 0;

for (auto maybe_batch : Batches(fragment.get())) {
ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
row_count += batch->num_rows();
AssertSchemaEqual(*batch->schema(), *expected_schema,
/*check_metadata=*/false);
}

ASSERT_EQ(row_count, kNumRows);
}

TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
auto reader_without_i32 = GetRecordBatchReader(
schema({field("f64", float64()), field("i64", int64()), field("f32", float32())}));

auto reader_without_f64 = GetRecordBatchReader(
schema({field("i64", int64()), field("f32", float32()), field("i32", int32())}));

auto reader =
GetRecordBatchReader(schema({field("f64", float64()), field("i64", int64()),
field("f32", float32()), field("i32", int32())}));

schema_ = reader->schema();
opts_ = ScanOptions::Make(schema_);
opts_->projector = RecordBatchProjector(SchemaFromColumnNames(schema_, {"f64"}));
opts_->filter = equal(field_ref("i32"), scalar(0));

for (auto reader : {reader.get(), reader_without_i32.get(), reader_without_f64.get()}) {
auto source = GetFileSource(reader);
auto fragment = std::make_shared<IpcFragment>(*source, opts_);

// NB: projector is applied by the scanner; Fragment does not evaluate it.
// We will not drop "i32" even though it is not in the projector's schema.
//
// in the case where a file doesn't contain a referenced field, we won't
// materialize it (the filter/projector will populate it with nulls later)
std::shared_ptr<Schema> expected_schema;
if (reader == reader_without_i32.get()) {
expected_schema = schema({field("f64", float64())});
} else if (reader == reader_without_f64.get()) {
expected_schema = schema({field("i32", int32())});
} else {
expected_schema = schema({field("f64", float64()), field("i32", int32())});
}

int64_t row_count = 0;

for (auto maybe_batch : Batches(fragment.get())) {
ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
row_count += batch->num_rows();
AssertSchemaEqual(*batch->schema(), *expected_schema,
/*check_metadata=*/false);
}

ASSERT_EQ(row_count, kNumRows);
}
}

TEST_F(TestIpcFileFormat, Inspect) {
auto reader = GetRecordBatchReader();
Expand Down
Loading

0 comments on commit 946beaa

Please sign in to comment.