82 changes: 63 additions & 19 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -124,51 +124,64 @@ static Result<SchemaManifest> GetSchemaManifest(
   return manifest;
 }
 
-static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
+static std::shared_ptr<StructScalar> MakeMinMaxScalar(std::shared_ptr<Scalar> min,
+                                                      std::shared_ptr<Scalar> max) {
+  DCHECK(min->type->Equals(max->type));
+  return std::make_shared<StructScalar>(ScalarVector{min, max},
+                                        struct_({
+                                            field("min", min->type),
+                                            field("max", max->type),
+                                        }));
+}
+
+static std::shared_ptr<StructScalar> ColumnChunkStatisticsAsStructScalar(
     const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
   // For the remainder of this function, failures to extract/parse statistics
-  // are ignored by returning the `true` scalar. The goal is twofold. First,
-  // avoid that an optimization breaks the computation. Second, allow the
+  // are ignored by returning nullptr. The goal is twofold. First, avoid
+  // an optimization which breaks the computation. Second, allow the
   // following columns to maybe succeed in extracting column statistics.
 
   // For now, only leaf (primitive) types are supported.
   if (!schema_field.is_leaf()) {
-    return scalar(true);
+    return nullptr;
   }
 
   auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
   auto statistics = column_metadata->statistics();
   if (statistics == nullptr) {
-    return scalar(true);
+    return nullptr;
   }
 
   const auto& field = schema_field.field;
-  auto field_expr = field_ref(field->name());
 
   // Optimize for corner case where all values are nulls
   if (statistics->num_values() == statistics->null_count()) {
-    return equal(field_expr, scalar(MakeNullScalar(field->type())));
+    auto null = MakeNullScalar(field->type());
+    return MakeMinMaxScalar(null, null);
   }
 
   std::shared_ptr<Scalar> min, max;
   if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
-    return scalar(true);
+    return nullptr;
   }
 
-  return and_(greater_equal(field_expr, scalar(min)),
-              less_equal(field_expr, scalar(max)));
+  return MakeMinMaxScalar(std::move(min), std::move(max));
 }
 
-static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+static std::shared_ptr<StructScalar> RowGroupStatisticsAsStructScalar(
     const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
-  const auto& fields = manifest.schema_fields;
-  ExpressionVector expressions;
-  expressions.reserve(fields.size());
-  for (const auto& field : fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
+  FieldVector fields;
+  ScalarVector statistics;
+  for (const auto& schema_field : manifest.schema_fields) {
+    if (auto min_max = ColumnChunkStatisticsAsStructScalar(schema_field, metadata)) {
+      fields.push_back(field(schema_field.field->name(), min_max->type));
+      statistics.push_back(std::move(min_max));
+    }
   }
 
-  return expressions.empty() ? scalar(true) : and_(expressions);
+  return std::make_shared<StructScalar>(std::move(statistics),
+                                        struct_(std::move(fields)));
 }
 
 class ParquetScanTaskIterator {
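For orientation: the per-column min/max scalars built above are collected into one nested StructScalar per row group. A minimal sketch of the equivalent layout from Python, using pyarrow scalars (the column names and values here are hypothetical, not from the diff):

```python
import pyarrow as pa

# One {"min", "max"} struct per column with usable statistics; columns whose
# statistics are missing or unparseable are simply omitted from the struct.
minmax = pa.struct([pa.field("min", pa.int64()), pa.field("max", pa.int64())])
row_group_stats = pa.scalar(
    {"f1": {"min": 0, "max": 1}, "f2": {"min": 1, "max": 1}},
    type=pa.struct([pa.field("f1", minmax), pa.field("f2", minmax)]),
)
```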
@@ -349,7 +362,7 @@ static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
     if (!info.HasStatistics() && info.id() < num_row_groups) {
       auto row_group = metadata->RowGroup(info.id());
       info.set_num_rows(row_group->num_rows());
-      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
     }
   };
   std::for_each(row_groups.begin(), row_groups.end(), augment);
@@ -444,8 +457,39 @@ std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
   return result;
 }
 
+void RowGroupInfo::SetStatisticsExpression() {
+  if (!HasStatistics()) {
+    statistics_expression_ = nullptr;
+    return;
+  }
+
+  if (statistics_->value.empty()) {
+    statistics_expression_ = scalar(true);
+    return;
+  }
+
+  ExpressionVector expressions{statistics_->value.size()};
+
+  for (size_t i = 0; i < expressions.size(); ++i) {
+    const auto& col_stats =
+        internal::checked_cast<const StructScalar&>(*statistics_->value[i]);
+    auto field_expr = field_ref(statistics_->type->field(static_cast<int>(i))->name());
+
+    DCHECK_EQ(col_stats.value.size(), 2);
+    const auto& min = col_stats.value[0];
+    const auto& max = col_stats.value[1];
+
+    DCHECK_EQ(min->is_valid, max->is_valid);
+    expressions[i] = min->is_valid ? and_(greater_equal(field_expr, scalar(min)),
+                                          less_equal(field_expr, scalar(max)))
+                                   : equal(std::move(field_expr), scalar(min));
+  }
+
+  statistics_expression_ = and_(std::move(expressions));
+}
+
 bool RowGroupInfo::Satisfy(const Expression& predicate) const {
-  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_expression_);
 }
 
 ///
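In other words, each column's {min, max} pair becomes a range constraint, an all-null column (null min/max) becomes an equality against a null scalar, and Satisfy prunes the row group when the scan predicate cannot hold under that conjunction. A rough sketch of the resulting filter in terms of pyarrow's Python dataset expressions (field name and bounds are made up for illustration):

```python
import pyarrow.dataset as ds

# Column f1 had valid min/max statistics (0 and 1): a range constraint.
f1_bounds = (ds.field("f1") >= 0) & (ds.field("f1") <= 1)

# A scan predicate contradicting those bounds lets the row group be skipped.
predicate = ds.field("f1") > 5  # unsatisfiable together with f1_bounds
```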
@@ -622,7 +666,7 @@ ParquetDatasetFactory::CollectParquetFragments(
     auto row_group = metadata.RowGroup(i);
     ARROW_ASSIGN_OR_RAISE(auto path,
                           FileFromRowGroup(filesystem_.get(), base_path_, *row_group));
-    auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+    auto stats = RowGroupStatisticsAsStructScalar(*row_group, manifest);
     auto num_rows = row_group->num_rows();
 
     // Insert the path, or increase the count of row groups. It will be
20 changes: 14 additions & 6 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -134,8 +134,10 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
   explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
 
   /// \brief Construct a RowGroup from an identifier with statistics.
-  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
-      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<StructScalar> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {
+    SetStatisticsExpression();
+  }
 
   /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
   static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
@@ -150,10 +152,13 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
   int64_t num_rows() const { return num_rows_; }
   void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
 
-  /// \brief Return the RowGroup's statistics
-  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
-  void set_statistics(std::shared_ptr<Expression> statistics) {
+  /// \brief Return the RowGroup's statistics as a StructScalar with a field for
+  /// each column with statistics.
+  /// Each field will also be a StructScalar with "min" and "max" fields.
+  const std::shared_ptr<StructScalar>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<StructScalar> statistics) {
     statistics_ = std::move(statistics);
+    SetStatisticsExpression();
   }
 
   /// \brief Indicate if statistics are set.
@@ -169,9 +174,12 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
   bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
 
  private:
+  void SetStatisticsExpression();
+
   int id_;
   int64_t num_rows_;
-  std::shared_ptr<Expression> statistics_;
+  std::shared_ptr<Expression> statistics_expression_;
+  std::shared_ptr<StructScalar> statistics_;
 };
 
 /// \brief A FileFragment with parquet logic.
22 changes: 22 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -845,6 +845,28 @@ cdef class RowGroupInfo:
     def num_rows(self):
         return self.info.num_rows()
 
+    @property
+    def statistics(self):
+        if not self.info.HasStatistics():
+            return None
+
+        cdef:
+            CStructScalar* c_statistics
+            CStructScalar* c_minmax
+
+        statistics = dict()
+        c_statistics = self.info.statistics().get()
+        for i in range(c_statistics.value.size()):
+            name = frombytes(c_statistics.type.get().field(i).get().name())
+            c_minmax = <CStructScalar*> c_statistics.value[i].get()
+
+            statistics[name] = {
+                'min': pyarrow_wrap_scalar(c_minmax.value[0]).as_py(),
+                'max': pyarrow_wrap_scalar(c_minmax.value[1]).as_py(),
+            }
+
+        return statistics
+

Member: Do we want to expose the statistics_expression as well? (not fully sure if it would have a use case, so maybe we should only do that if we have one)

Member (Author): If that's desired it can wait for a follow up.
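A quick usage sketch of the new property (the file path here is hypothetical; the schema mirrors the test further below):

```python
import pyarrow.dataset as ds

dataset = ds.dataset("/path/to/data.parquet", format="parquet")
fragment = next(iter(dataset.get_fragments()))

# Each row-group fragment carries per-column min/max statistics, or None
# when the parquet metadata had no usable statistics.
for piece in fragment.split_by_row_group():
    info = piece.row_groups[0]
    print(info.num_rows, info.statistics)
    # e.g. 2 {'f1': {'min': 0, 'max': 1}, 'f2': {'min': 1, 'max': 1}}
```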

     def __eq__(self, other):
         if not isinstance(other, RowGroupInfo):
             return False
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -886,6 +886,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass CStringScalar" arrow::StringScalar"(CScalar):
         shared_ptr[CBuffer] value
 
+    cdef cppclass CStructScalar" arrow::StructScalar"(CScalar):
+        vector[shared_ptr[CScalar]] value
+
     shared_ptr[CScalar] MakeScalar[Value](Value value)
     shared_ptr[CScalar] MakeStringScalar" arrow::MakeScalar"(c_string value)
 
4 changes: 2 additions & 2 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -216,11 +216,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo":
         CRowGroupInfo()
         CRowGroupInfo(int id)
-        CRowGroupInfo(
-            int id, int64_t n_rows, shared_ptr[CExpression] statistics)
         int id() const
         int64_t num_rows() const
         bint Equals(const CRowGroupInfo& other)
+        c_bool HasStatistics() const
+        shared_ptr[CStructScalar] statistics() const
 
     cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
         CFileFragment):
6 changes: 6 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -715,6 +715,12 @@ def test_fragments_parquet_row_groups(tempdir):
     assert len(result) == 2
     assert result.equals(table.slice(0, 2))
 
+    assert row_group_fragments[0].row_groups is not None
+    assert row_group_fragments[0].row_groups[0].statistics == {
+        'f1': {'min': 0, 'max': 1},
+        'f2': {'min': 1, 'max': 1},
+    }
+
     fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0]
     row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1))
     assert len(row_group_fragments) == 1