Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/dataset/file_parquet.h"

#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -121,6 +122,14 @@ class RowGroupSkipper {
int64_t rows_skipped_;
};

template <typename M>
static Result<SchemaManifest> GetSchemaManifest(const M& metadata) {
SchemaManifest manifest;
RETURN_NOT_OK(SchemaManifest::Make(
metadata.schema(), nullptr, parquet::default_arrow_reader_properties(), &manifest));
return manifest;
}

class ParquetScanTaskIterator {
public:
static Result<ScanTaskIterator> Make(
Expand Down Expand Up @@ -155,46 +164,44 @@ class ParquetScanTaskIterator {
// Compute the column projection out of an optional arrow::Schema
static std::vector<int> InferColumnProjection(const parquet::FileMetaData& metadata,
const ScanOptionsPtr& options) {
SchemaManifest manifest;
if (!SchemaManifest::Make(metadata.schema(), nullptr,
parquet::default_arrow_reader_properties(), &manifest)
.ok()) {
auto maybe_manifest = GetSchemaManifest(metadata);
if (!maybe_manifest.ok()) {
return internal::Iota(metadata.num_columns());
}

// get column indices
auto filter_fields = FieldsInExpression(options->filter);

std::vector<int> column_projection;

auto manifest = std::move(maybe_manifest).ValueOrDie();

// Checks if the field is needed in either the projection or the filter.
auto fields_name = options->MaterializedFields();
std::unordered_set<std::string> materialized_fields{fields_name.cbegin(),
fields_name.cend()};
auto should_materialize_column = [&materialized_fields](const std::string& f) {
return materialized_fields.find(f) != materialized_fields.end();
};

std::vector<int> columns_selection;
// Note that the loop is using the file's schema to iterate instead of the
// materialized fields of the ScanOptions. This ensures that missing
// fields in the file (but present in the ScanOptions) will be ignored. The
// scanner's projector will take care of padding the column with the proper
// values.
for (const auto& schema_field : manifest.schema_fields) {
auto field_name = schema_field.field->name();

if (options->projector.schema()->GetFieldIndex(field_name) != -1) {
// add explicitly projected field
AddColumnIndices(schema_field, &column_projection);
continue;
}

if (std::find(filter_fields.begin(), filter_fields.end(), field_name) !=
filter_fields.end()) {
// add field referenced by filter
AddColumnIndices(schema_field, &column_projection);
if (should_materialize_column(schema_field.field->name())) {
AddColumnIndices(schema_field, &columns_selection);
}
}

return column_projection;
return columns_selection;
}

static void AddColumnIndices(const SchemaField& schema_field,
std::vector<int>* column_projection) {
if (schema_field.column_index != -1) {
if (schema_field.is_leaf()) {
column_projection->push_back(schema_field.column_index);
return;
}

for (const auto& child : schema_field.children) {
AddColumnIndices(child, column_projection);
} else {
// The following ensure that complex types, e.g. struct, are materialized.
for (const auto& child : schema_field.children) {
AddColumnIndices(child, column_projection);
}
}
}

Expand Down Expand Up @@ -310,9 +317,7 @@ static ExpressionPtr ColumnChunkStatisticsAsExpression(

Result<ExpressionPtr> RowGroupStatisticsAsExpression(
const parquet::RowGroupMetaData& metadata) {
SchemaManifest manifest;
RETURN_NOT_OK(SchemaManifest::Make(
metadata.schema(), nullptr, parquet::default_arrow_reader_properties(), &manifest));
ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata));

ExpressionVector expressions;
for (const auto& schema_field : manifest.schema_fields) {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ ScanOptionsPtr ScanOptions::ReplaceSchema(std::shared_ptr<Schema> schema) const
return copy;
}

std::vector<std::string> ScanOptions::MaterializedFields() const {
std::vector<std::string> fields;

for (const auto& f : schema()->fields()) {
fields.push_back(f->name());
}

for (auto&& name : FieldsInExpression(filter)) {
fields.push_back(std::move(name));
}

return fields;
}

Result<RecordBatchIterator> SimpleScanTask::Execute() {
return MakeVectorIterator(record_batches_);
}
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -73,6 +74,22 @@ class ARROW_DS_EXPORT ScanOptions {
// Projector for reconciling the final RecordBatch to the requested schema.
RecordBatchProjector projector;

// Return a vector of fields that requires materialization.
//
// This is usually the union of the fields referenced in the projection and the
// filter expression. Examples:
//
// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"]
//
// This is needed for expression where a field may not be directly
// used in the final projection but is still required to evaluate the
// expression.
//
// This is used by DataFragments implementation to apply the column
// sub-selection optimization.
std::vector<std::string> MaterializedFields() const;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non blocking: Maybe eventually this should be an accessor for a (maybe lazily initialized) shared_ptr<unordered_set<string>>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's not compatible with being a non-member though

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this method does. Can you add a docstring?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what's the point of having this return an unordered_set? Do you plan to implement unions or intersections of those containers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's used as a lookup structure. The ordering and the number of repeats is not important (and should not be exposed). I can still convert it to std::vector.

private:
explicit ScanOptions(std::shared_ptr<Schema> schema);
};
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,31 @@ TEST_F(TestScannerBuilder, TestFilter) {
builder.Filter("i64"_ == int64_t(10) || "not_a_column"_ == true));
}

using testing::ElementsAre;
using testing::IsEmpty;

TEST(ScanOptions, TestMaterializedFields) {
auto i32 = field("i32", int32());
auto i64 = field("i64", int64());

auto opts = ScanOptions::Make(schema({}));
EXPECT_THAT(opts->MaterializedFields(), IsEmpty());

opts->filter = ("i32"_ == 10).Copy();
EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32"));

opts = ScanOptions::Make(schema({i32, i64}));
EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64"));

opts = opts->ReplaceSchema(schema({i32}));
EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32"));

opts->filter = ("i32"_ == 10).Copy();
EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i32"));

opts->filter = ("i64"_ == 10).Copy();
EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64"));
}

} // namespace dataset
} // namespace arrow
10 changes: 5 additions & 5 deletions cpp/src/arrow/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,10 @@ class Result : public util::EqualityComparable<Result<T>> {
arrow::util::variant<T, Status, const char*> variant_;
};

#define ARROW_ASSIGN_OR_RAISE_IMPL(status_name, lhs, rexpr) \
auto status_name = (rexpr); \
ARROW_RETURN_NOT_OK(status_name.status()); \
lhs = std::move(status_name).ValueOrDie();
#define ARROW_ASSIGN_OR_RAISE_IMPL(result_name, lhs, rexpr) \
auto result_name = (rexpr); \
ARROW_RETURN_NOT_OK((result_name).status()); \
lhs = std::move(result_name).ValueOrDie();

#define ARROW_ASSIGN_OR_RAISE_NAME(x, y) ARROW_CONCAT(x, y)

Expand All @@ -405,7 +405,7 @@ class Result : public util::EqualityComparable<Result<T>> {
// ValueType value;
// ARROW_ASSIGN_OR_RAISE(value, MaybeGetValue(arg));
//
// WARNING: ASSIGN_OR_RAISE expands into multiple statements; it cannot be used
// WARNING: ARROW_ASSIGN_OR_RAISE expands into multiple statements; it cannot be used
// in a single statement (e.g. as the body of an if statement without {})!
#define ARROW_ASSIGN_OR_RAISE(lhs, rexpr) \
ARROW_ASSIGN_OR_RAISE_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \
Expand Down