ARROW-8295: [C++][Dataset] Push down projection to IpcReadOptions
Closes #6789 from bkietz/8295-IpcFileFormat-should-expl

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
bkietz authored and wesm committed Apr 13, 2020
1 parent 712b8f2 commit 3725aaa
Showing 7 changed files with 188 additions and 151 deletions.
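
For context, the core of this change is that the dataset IPC reader now resolves the projected columns to top-level field indices and passes them through ipc::IpcReadOptions::included_fields, so the file reader deserializes only the requested columns instead of reading everything and projecting afterwards. Below is a minimal sketch of that option as it is used after this commit; the function name ReadFirstBatchProjected and the file-path handling are illustrative, not part of the diff.

```cpp
// Minimal sketch (not part of the diff): open an IPC file and deserialize only
// the columns whose indices are listed in IpcReadOptions::included_fields.
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/io/file.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::RecordBatch>> ReadFirstBatchProjected(
    const std::string& path, std::vector<int> column_indices) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));

  auto options = arrow::ipc::IpcReadOptions::Defaults();
  options.included_fields = std::move(column_indices);  // empty means "all fields"

  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::ipc::RecordBatchFileReader::Open(input, options));
  return reader->ReadRecordBatch(0);  // batch already restricted to the projection
}
```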
2 changes: 1 addition & 1 deletion c_glib/test/test-feather-file-reader.rb
@@ -65,7 +65,7 @@ def setup_file(table)
setup_file(table) do |reader|
assert_equal(build_table("message" => build_string_array(["Login"]),
"host" => build_string_array(["www"])),
reader.read_names(["host", "message"]))
reader.read_names(["message", "host"]))
end
end
end
65 changes: 37 additions & 28 deletions cpp/src/arrow/dataset/file_ipc.cc
@@ -32,15 +32,18 @@
namespace arrow {
namespace dataset {

static ipc::IpcReadOptions default_read_options() {
auto options = ipc::IpcReadOptions::Defaults();
options.use_threads = false;
return options;
}

Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
const FileSource& source, std::shared_ptr<io::RandomAccessFile> input = nullptr) {
if (input == nullptr) {
ARROW_ASSIGN_OR_RAISE(input, source.Open());
}
const FileSource& source,
const ipc::IpcReadOptions& options = default_read_options()) {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());

std::shared_ptr<ipc::RecordBatchFileReader> reader;
auto options = ipc::IpcReadOptions::Defaults();
options.use_threads = false;

auto status =
ipc::RecordBatchFileReader::Open(std::move(input), options).Value(&reader);
@@ -51,6 +54,20 @@ Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
return reader;
}

Result<std::vector<int>> GetIncludedFields(
const Schema& schema, const std::vector<std::string>& materialized_fields) {
std::vector<int> included_fields;

for (FieldRef ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema));
if (match.indices().empty()) continue;

included_fields.push_back(match.indices()[0]);
}

return included_fields;
}

/// \brief A ScanTask backed by an Ipc file.
class IpcScanTask : public ScanTask {
public:
@@ -60,41 +77,33 @@ class IpcScanTask : public ScanTask {

Result<RecordBatchIterator> Execute() override {
struct Impl {
static Result<Impl> Make(const FileSource& source,
const std::vector<std::string>& materialized_fields,
MemoryPool* pool) {
static Result<RecordBatchIterator> Make(
const FileSource& source, std::vector<std::string> materialized_fields,
MemoryPool* pool) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
auto materialized_schema =
SchemaFromColumnNames(reader->schema(), materialized_fields);
return Impl{std::move(reader),
RecordBatchProjector(std::move(materialized_schema)), pool, 0};

auto options = default_read_options();
ARROW_ASSIGN_OR_RAISE(options.included_fields,
GetIncludedFields(*reader->schema(), materialized_fields));

ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
return RecordBatchIterator(Impl{std::move(reader), pool, 0});
}

Result<std::shared_ptr<RecordBatch>> Next() {
if (i_ == reader_->num_record_batches()) {
return nullptr;
}

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch,
reader_->ReadRecordBatch(i_++));
return projector_.Project(*batch, pool_);
return reader_->ReadRecordBatch(i_++);
}

std::shared_ptr<ipc::RecordBatchFileReader> reader_;
RecordBatchProjector projector_;
MemoryPool* pool_;
int i_;
};

// get names of fields explicitly projected or referenced by filter
auto fields = options_->MaterializedFields();
std::sort(fields.begin(), fields.end());
auto unique_end = std::unique(fields.begin(), fields.end());
fields.erase(unique_end, fields.end());

ARROW_ASSIGN_OR_RAISE(auto batch_it, Impl::Make(source_, fields, context_->pool));

return RecordBatchIterator(std::move(batch_it));
return Impl::Make(source_, options_->MaterializedFields(), context_->pool);
}

private:
@@ -134,8 +143,8 @@ class IpcScanTaskIterator {
};

Result<bool> IpcFileFormat::IsSupported(const FileSource& source) const {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
return OpenReader(source, input).ok();
RETURN_NOT_OK(source.Open().status());
return OpenReader(source).ok();
}

Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source) const {
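
In file_ipc.cc, the new GetIncludedFields helper resolves the scan's materialized column names (columns explicitly projected plus columns referenced by the filter) against the file schema with FieldRef::FindOneOrNone, skipping names the file does not contain, and the resulting indices are handed to IpcReadOptions::included_fields so projection happens inside the reader rather than via RecordBatchProjector. A standalone sketch of that resolution step; the name NamesToFieldIndices is illustrative.

```cpp
// Sketch mirroring GetIncludedFields: resolve top-level column names to field
// indices, silently skipping names the schema does not contain.
#include <string>
#include <vector>

#include "arrow/result.h"
#include "arrow/type.h"

arrow::Result<std::vector<int>> NamesToFieldIndices(
    const arrow::Schema& schema, const std::vector<std::string>& names) {
  std::vector<int> indices;
  for (const auto& name : names) {
    ARROW_ASSIGN_OR_RAISE(auto match, arrow::FieldRef(name).FindOneOrNone(schema));
    if (match.indices().empty()) continue;  // column not present in this file
    indices.push_back(match.indices()[0]);
  }
  return indices;
}
```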
40 changes: 37 additions & 3 deletions cpp/src/arrow/ipc/CMakeLists.txt
@@ -18,10 +18,44 @@
#
# Messaging and interprocess communication

add_custom_target(arrow_ipc)

function(ADD_ARROW_IPC_TEST REL_TEST_NAME)
set(options)
set(one_value_args PREFIX)
set(multi_value_args LABELS)
cmake_parse_arguments(ARG
"${options}"
"${one_value_args}"
"${multi_value_args}"
${ARGN})

if(ARG_PREFIX)
set(PREFIX ${ARG_PREFIX})
else()
set(PREFIX "arrow-ipc")
endif()

if(ARG_LABELS)
set(LABELS ${ARG_LABELS})
else()
set(LABELS "arrow_ipc")
endif()

add_arrow_test(${REL_TEST_NAME}
EXTRA_LINK_LIBS
${ARROW_DATASET_TEST_LINK_LIBS}
PREFIX
${PREFIX}
LABELS
${LABELS}
${ARG_UNPARSED_ARGUMENTS})
endfunction()

add_arrow_test(feather_test)
add_arrow_test(read_write_test PREFIX "arrow-ipc")
add_arrow_test(json_simple_test PREFIX "arrow-ipc")
add_arrow_test(json_test PREFIX "arrow-ipc")
add_arrow_ipc_test(read_write_test)
add_arrow_ipc_test(json_simple_test)
add_arrow_ipc_test(json_test)

# json_integration_test is two things at the same time:
# - an executable that can be called to answer integration test requests
16 changes: 2 additions & 14 deletions cpp/src/arrow/ipc/feather.cc
@@ -716,24 +716,12 @@ class ReaderV2 : public Reader {

Status Read(const IpcReadOptions& options, std::shared_ptr<Table>* out) {
ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_, options));
std::vector<std::shared_ptr<RecordBatch>> batches(reader->num_record_batches());
RecordBatchVector batches(reader->num_record_batches());
for (int i = 0; i < reader->num_record_batches(); ++i) {
ARROW_ASSIGN_OR_RAISE(batches[i], reader->ReadRecordBatch(i));
}

// XXX: Handle included_fields in RecordBatchFileReader::schema
auto out_schema = reader->schema();
if (options.included_fields) {
const auto& indices = *options.included_fields;
std::vector<std::shared_ptr<Field>> fields;
for (int i = 0; i < out_schema->num_fields(); ++i) {
if (std::find(indices.begin(), indices.end(), i) != indices.end()) {
fields.push_back(out_schema->field(i));
}
}
out_schema = ::arrow::schema(fields, out_schema->metadata());
}
return Table::FromRecordBatches(std::move(out_schema), std::move(batches)).Value(out);
return Table::FromRecordBatches(reader->schema(), batches).Value(out);
}

Status Read(std::shared_ptr<Table>* out) override {
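
In feather.cc, the deleted block was a workaround that pruned the output schema by hand because RecordBatchFileReader::schema() previously ignored included_fields (the removed XXX comment). Its removal suggests that after this commit the reader reports the pruned schema itself, so reader->schema() can be passed straight to Table::FromRecordBatches. A hedged sketch under that assumption; ReadProjectedTable is an illustrative name, not part of the commit.

```cpp
// Sketch: read a whole table with a column projection; reader->schema() is
// assumed to already reflect included_fields, matching the simplified Feather
// reader above.
#include <memory>
#include <utility>
#include <vector>

#include "arrow/ipc/reader.h"
#include "arrow/result.h"
#include "arrow/table.h"

arrow::Result<std::shared_ptr<arrow::Table>> ReadProjectedTable(
    std::shared_ptr<arrow::io::RandomAccessFile> input, std::vector<int> columns) {
  auto options = arrow::ipc::IpcReadOptions::Defaults();
  options.included_fields = std::move(columns);

  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::ipc::RecordBatchFileReader::Open(input, options));

  arrow::RecordBatchVector batches(reader->num_record_batches());
  for (int i = 0; i < reader->num_record_batches(); ++i) {
    ARROW_ASSIGN_OR_RAISE(batches[i], reader->ReadRecordBatch(i));
  }
  // The pruned schema comes straight from the reader, as in the diff above.
  return arrow::Table::FromRecordBatches(reader->schema(), batches);
}
```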
5 changes: 2 additions & 3 deletions cpp/src/arrow/ipc/options.h
@@ -22,7 +22,6 @@

#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "arrow/util/optional.h"
#include "arrow/util/visibility.h"

namespace arrow {
@@ -81,8 +80,8 @@ struct ARROW_EXPORT IpcReadOptions {
MemoryPool* memory_pool = default_memory_pool();

/// \brief EXPERIMENTAL: Top-level schema fields to include when
/// deserializing RecordBatch. If null, return all deserialized fields
util::optional<std::vector<int>> included_fields;
/// deserializing RecordBatch. If empty, return all deserialized fields
std::vector<int> included_fields;

/// \brief Use global CPU thread pool to parallelize any computational tasks
/// like decompression
