feat: add aggregate expressions and evaluator #335
7a646ab to 0281987
wgtmac
left a comment
Thanks for your contribution! I just scanned through the design, and it does not seem consistent with either the existing C++ code or the Java implementation. What I have in mind is something like the following:
template <typename T>
concept TermType = std::derived_from<T, Term>;

template <TermType T>
class ICEBERG_EXPORT Aggregate : public virtual Expression {
 public:
  Expression::Operation op() const override { return operation_; }
  const std::shared_ptr<T>& term() const { return term_; }

 protected:
  Expression::Operation operation_;
  std::shared_ptr<T> term_;
};

class ICEBERG_EXPORT UnboundAggregate : public virtual Expression,
                                        public Unbound<Expression> {
 public:
  Result<std::shared_ptr<Expression>> Bind(const Schema& schema,
                                           bool case_sensitive) const override = 0;
  bool is_unbound_aggregate() const override { return true; }
};

template <typename B>
class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate,
                                            public Aggregate<UnboundTerm<B>> {
  using BASE = Aggregate<UnboundTerm<B>>;

 public:
  std::shared_ptr<NamedReference> reference() override {
    return BASE::term() ? BASE::term()->reference() : nullptr;
  }
  Result<std::shared_ptr<Expression>> Bind(const Schema& schema,
                                           bool case_sensitive) const override;
};

class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound {
 public:
  using Aggregate<BoundTerm>::op;
  using Aggregate<BoundTerm>::term;

  std::shared_ptr<BoundReference> reference() override {
    return term_ ? term_->reference() : nullptr;
  }
  Result<Literal> Evaluate(const StructLike& data) const override;
  bool is_bound_aggregate() const override { return true; }

  enum class Kind : int8_t {
    // Count aggregates (COUNT, COUNT_STAR, COUNT_NULL)
    kCount = 0,
    // Value aggregates (MIN, MAX)
    kValue,
  };
  virtual Kind kind() const = 0;
};

class ICEBERG_EXPORT CountAggregate : public BoundAggregate {
 public:
  Result<Literal> Evaluate(const StructLike& data) const override;
  Kind kind() const override { return Kind::kCount; }
};

class ICEBERG_EXPORT ValueAggregate : public BoundAggregate {
 public:
  Result<Literal> Evaluate(const StructLike& data) const override;
  Kind kind() const override { return Kind::kValue; }
};
src/iceberg/expression/aggregate.h
Outdated
};

/// \brief COUNT aggregate variants.
class ICEBERG_EXPORT CountAggregate : public UnboundAggregate {
We don't need to define subclass for unbound aggregates. Please check the current design of Predicate in both C++ and Java implementations.
Gotcha! In progress.
Co-authored-by: Gang Wu <ustcwg@gmail.com>
I've updated the design to align with the current Predicate pattern and the Java implementation:
Jinchul81
left a comment
- If this follows the Iceberg Java implementation, please add some context that points to it.
- In the header files, please add a comment for each function and member variable.
- Unit test coverage for this change looks insufficient. I am not sure whether you are already working on more tests; please add more test cases.
src/iceberg/expression/aggregate.cc
Outdated
namespace {

std::string OperationToPrefix(Expression::Operation op) {
Why don't we use std::string_view instead of std::string if the return values should be constant string literals?
> Why don't we use std::string_view instead of std::string if the return values should be constant string literals?
Good point — these return values are all string literals with static lifetime, so std::string_view is more appropriate here. I'll update the signature accordingly.
Please make the return type constexpr std::string_view.
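To illustrate the suggestion, here is a minimal sketch of a constexpr helper that returns std::string_view. The `Operation` enum is a hypothetical stand-in for Expression::Operation, and the prefix strings are assumed from the aggregate names in the PR description, not taken from the actual patch.

```cpp
#include <string_view>

// Hypothetical stand-in for Expression::Operation (aggregate ops only).
enum class Operation { kCount, kCountNull, kCountStar, kMax, kMin };

// Every branch returns a string literal. Literals have static storage
// duration, so the returned view never dangles and nothing is allocated.
constexpr std::string_view OperationToPrefix(Operation op) {
  switch (op) {
    case Operation::kCount:
      return "count";
    case Operation::kCountNull:
      return "count_null";
    case Operation::kCountStar:
      return "count_star";
    case Operation::kMax:
      return "max";
    case Operation::kMin:
      return "min";
  }
  return {};  // Only reached for a value outside the enumerators.
}

// Because the function is constexpr, the mapping can be checked at compile time.
static_assert(OperationToPrefix(Operation::kMax) == "max");
```

A caller that needs an owning string can still write `std::string(OperationToPrefix(op))` at the point of use.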
// Aggregates

std::shared_ptr<CountAggregate> Expressions::Count(std::string name) {
You can change the parameter to std::string&& to avoid an unnecessary copy and clearly express that the function intends to take ownership of the string. Using an rvalue reference makes the API’s intent clearer and can reduce overhead in performance-critical paths.
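For comparison, a small sketch of the two sink-parameter styles under discussion. `NamedRef`, `MakeByValue`, and `MakeByRvalue` are hypothetical names, not part of the PR. Note that a by-value parameter copies only when the caller passes an lvalue; for an rvalue argument it costs one extra move compared with `std::string&&`.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical holder standing in for the aggregate's named reference.
struct NamedRef {
  std::string name;
};

// By-value sink: an lvalue argument is copied into `name`, an rvalue is
// moved into it; one further move transfers it into the member.
inline NamedRef MakeByValue(std::string name) { return NamedRef{std::move(name)}; }

// Rvalue-reference sink: accepts only temporaries or std::move(x), so the
// ownership transfer is visible at every call site.
inline NamedRef MakeByRvalue(std::string&& name) { return NamedRef{std::move(name)}; }

// Usage: the lvalue survives the by-value call and must be moved
// explicitly to reach the && overload.
inline void Demo() {
  std::string column = "id";
  NamedRef a = MakeByValue(column);              // copies `column`
  NamedRef b = MakeByRvalue(std::move(column));  // `column` is moved from
  assert(a.name == "id" && b.name == "id");
}
```

The by-value form keeps call sites such as `MakeByValue(existing_name)` compiling, while the `&&` form rejects them unless the caller opts in with `std::move`.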
  return std::shared_ptr<CountAggregate>(std::move(agg));
}

std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) {
Same as above.
  return std::shared_ptr<CountAggregate>(std::move(agg));
}

std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) {
Same as above.
  return std::shared_ptr<CountAggregate>(std::move(agg));
}

std::shared_ptr<ValueAggregate> Expressions::Max(std::string name) {
Same as above.
src/iceberg/expression/aggregate.cc
Outdated
    default:
      break;
  }
  return "aggregate";
I don't think this is the right return value. Why do we have to map the others to aggregate?
src/iceberg/expression/aggregate.cc
Outdated
      Expression::Operation::kCount, Mode::kNull, std::move(term), std::move(ref)));
}

std::unique_ptr<CountAggregate> CountAggregate::CountStar() {
Why does it return std::unique_ptr<CountAggregate> instead of Result<std::unique_ptr<CountAggregate>>?
src/iceberg/expression/aggregate.cc
Outdated
    : BoundAggregate(op, std::move(term)), mode_(mode) {}

std::string BoundCountAggregate::ToString() const {
  if (mode_ == CountAggregate::Mode::kStar) {
Why did you add special handling for kStar?
> Why did you add special handling for kStar?
Good catch — mapping all remaining operations to "aggregate" is not really correct.
In the Java implementation, Aggregate.toString() handles each aggregate operation explicitly and the default branch throws an UnsupportedOperationException("Invalid aggregate: " + op()), so there is no generic "aggregate" fallback.
I’ll update the C++ helper to mirror that behavior: only the supported aggregate operations will be handled explicitly, and for anything else we’ll treat it as invalid/unreachable instead of returning "aggregate". This way we don’t silently hide unsupported operations and stay consistent with the Java implementation.
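One way to express "no silent fallback" is sketched below. It is an assumption-laden illustration, not the code in the PR: `Operation` is a hypothetical subset of Expression::Operation, and std::optional stands in for whatever error type the project would actually return.

```cpp
#include <optional>
#include <string_view>

// Hypothetical subset of Expression::Operation; kEq represents any
// operation that is not an aggregate.
enum class Operation { kEq, kCount, kCountNull, kCountStar, kMax, kMin };

// Returns the aggregate prefix, or std::nullopt for a non-aggregate
// operation, so callers must handle the invalid case explicitly instead
// of printing a generic "aggregate" label.
constexpr std::optional<std::string_view> AggregatePrefix(Operation op) {
  switch (op) {
    case Operation::kCount:
      return "count";
    case Operation::kCountNull:
      return "count_null";
    case Operation::kCountStar:
      return "count_star";
    case Operation::kMax:
      return "max";
    case Operation::kMin:
      return "min";
    default:
      return std::nullopt;  // Unsupported operation, reported to the caller.
  }
}
```

A caller can then turn the empty result into an error at the point where it has enough context to describe the failure.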
Let's set aside the idea of copying the Java implementation. Can you please clarify why we cannot make it as the generic approach?
> Let's set aside the idea of copying the Java implementation. Can you please clarify why we cannot make it as the generic approach?
Got it, thanks for clarifying.
I understand your point. The goal here is not to blindly mirror the Java implementation, but to design a more generic and idiomatic C++ solution.
I'll take some time to revisit the current design and see how kStar can be modeled in a more generic way (e.g. reducing special-casing in ToString() and improving consistency with the Predicate style), while still preserving the same external semantics.
I’ll follow up with an updated implementation once I’ve reworked this part.
Please file an issue for the follow-up item, and add a code comment that explains why the special handling is needed, tagged with TODO(#XYZ).
src/iceberg/expression/aggregate.h
Outdated
                                           bool case_sensitive) const override;

 private:
  CountAggregate(Expression::Operation op, Mode mode,
Please add a comment explaining why a public constructor is not allowed.
src/iceberg/expression/aggregate.cc
Outdated
Result<std::shared_ptr<Expression>> CountAggregate::Bind(const Schema& schema,
                                                         bool case_sensitive) const {
  std::shared_ptr<BoundTerm> bound_term;
  if (term_ != nullptr) {
Please write the code like if (term_ != nullptr) [[unlikely]] { to help branch prediction.
src/iceberg/expression/aggregate.cc
Outdated
  ICEBERG_DCHECK(aggregate != nullptr, "Aggregate cannot be null");

  if (auto count = std::dynamic_pointer_cast<BoundCountAggregate>(aggregate)) {
    if (count->mode() != CountAggregate::Mode::kStar && !count->term()) {
Please write the code like if (count->mode() != CountAggregate::Mode::kStar && !count->term()) [[unlikely]] { to help branch prediction.
  explicit VectorStructLike(std::vector<Scalar> fields) : fields_(std::move(fields)) {}

  Result<Scalar> GetField(size_t pos) const override {
    if (pos >= fields_.size()) {
Please write the code like if (pos >= fields_.size()) [[unlikely]] { to help branch prediction.
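A minimal sketch of the requested pattern, assuming a C++20 compiler. `IntRow` is a hypothetical simplification of the test helper: it stores plain ints and returns std::optional instead of Result<Scalar>. The `[[unlikely]]` attribute is a hint that lets the optimizer lay out the rare error path away from the hot path; it does not change behavior.

```cpp
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical row type; the real VectorStructLike returns Result<Scalar>.
class IntRow {
 public:
  explicit IntRow(std::vector<int> fields) : fields_(std::move(fields)) {}

  // The out-of-range branch is the rare error path, so it is marked
  // [[unlikely]]; in-range reads fall through to the plain return.
  std::optional<int> GetField(std::size_t pos) const {
    if (pos >= fields_.size()) [[unlikely]] {
      return std::nullopt;
    }
    return fields_[pos];
  }

 private:
  std::vector<int> fields_;
};
```

The attribute belongs on branches that are rare by construction, such as bounds or null checks; a mislabelled branch can make the generated code slightly worse.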
@wgtmac @Jinchul81 Thanks for the review, I'll take some time to address these changes and follow up shortly.
Co-authored-by: Gang Wu <ustcwg@gmail.com>
Co-authored-by: Gang Wu <ustcwg@gmail.com>
This PR refactors the C++ aggregate implementation to better align with the Java Iceberg design and existing Predicate patterns. The changes introduce a dedicated Aggregator hierarchy, simplify COUNT handling by splitting it into distinct classes, and improve evaluator extensibility for future input types (e.g. DataFile).
This change refactors the aggregate framework to better match the Java implementation and improves API clarity, performance, and test coverage.
4db99fa to 94853ee
wgtmac
left a comment
Thanks for working on this, @SuKi2cn! This looks good to me.
This reverts commit dbcbdf2.
This PR addresses issue #330 by introducing aggregate expressions & execution support.
- Add aggregate expression family (count / count_null / count_star / max / min) with bound/unbound types, visitor and binder support.
- Add AggregateEvaluator for count/max/min execution over StructLike rows.
- Expose aggregate factories in Expressions and wire into CMake/Meson builds with new aggregate tests.
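The row-by-row evaluation described above can be pictured with the following sketch. It is not the AggregateEvaluator from this PR: `Cell`, `AggregateState`, and `Evaluate` are hypothetical names, a single nullable int64 column stands in for a StructLike row, and the counting semantics (count skips NULLs, count_null counts only NULLs, count_star counts every row) are assumed.

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical single-column stand-in for a row: nullopt means NULL.
using Cell = std::optional<int64_t>;

// Running state for all five aggregates over one column.
struct AggregateState {
  int64_t count = 0;       // non-null values
  int64_t count_null = 0;  // NULL values
  int64_t count_star = 0;  // all rows
  std::optional<int64_t> max_value;  // empty until a non-null value is seen
  std::optional<int64_t> min_value;

  // Folds one row into the state.
  void Update(const Cell& cell) {
    ++count_star;
    if (!cell) {
      ++count_null;
      return;
    }
    ++count;
    max_value = max_value ? std::max(*max_value, *cell) : *cell;
    min_value = min_value ? std::min(*min_value, *cell) : *cell;
  }
};

// Evaluates the aggregates over every row in order.
inline AggregateState Evaluate(const std::vector<Cell>& rows) {
  AggregateState state;
  for (const Cell& cell : rows) state.Update(cell);
  return state;
}
```

Keeping max and min as optionals means a column that contains only NULLs yields no value rather than a made-up sentinel.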