Skip to content

[SPARK-46487][SQL] Push down part of filter through aggregate with nondeterministic field#44460

Closed
zml1206 wants to merge 6 commits intoapache:masterfrom
zml1206:limit_pushdown
Closed

[SPARK-46487][SQL] Push down part of filter through aggregate with nondeterministic field#44460
zml1206 wants to merge 6 commits intoapache:masterfrom
zml1206:limit_pushdown

Conversation

@zml1206
Copy link
Contributor

@zml1206 zml1206 commented Dec 22, 2023

What changes were proposed in this pull request?

Push down part of filter which is deterministic and references are subset of aggregate's child through aggregate with nondeterministic field.
For example

testRelation
      .groupBy($"a")($"a", Rand(10).as("rand"))
      .where($"a" > 5 && $"rand" > 5)

We can push down $"a" > 5 and do not push down $"rand" > 5. Because $"rand" > 5 is non-deterministic, push down it will change the evaluation result in aggregate.

Why are the changes needed?

Filter earlier to improve performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Dec 22, 2023
@zml1206 zml1206 closed this Dec 25, 2023
@zml1206 zml1206 deleted the limit_pushdown branch December 25, 2023 00:39
@zml1206 zml1206 restored the limit_pushdown branch December 25, 2023 00:39
@zml1206 zml1206 reopened this Dec 25, 2023
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic)
Copy link
Contributor

@beliefer beliefer Dec 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you investigate the issue https://issues.apache.org/jira/browse/SPARK-13473 ?
It seems we can't release the restrictions.

Copy link
Contributor Author

@zml1206 zml1206 Dec 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I investigated before,
Similar ones are https://issues.apache.org/jira/browse/SPARK-20246.
At that time, it was because it was impossible to determine whether the filter expression was deterministic for example $"rand" > 5, therefore, it will incorrectly push down $"rand" > 5.
However, after the filter expression is replaced, it can be judged whether it is a deterministic expression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, how about keep
if aggregate.aggregateExpressions.exists(_.deterministic)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

Copy link
Contributor

@beliefer beliefer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM cc @cloud-fan

// implies that, for a given input row, the output are determined by the expression's initial
// state and all the input rows processed before. In another word, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
// This also applies to Aggregate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rationale for not applying this to Aggregate?

Copy link
Contributor Author

@zml1206 zml1206 Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When "Aggregate" contains both non-deterministic and deterministic expression fields, it can push down some deterministic filters after replaced. Because if the condition after replaced is deterministic, it certainly has no association with the non-deterministic expression field.

Copy link
Contributor Author

@zml1206 zml1206 Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Aggregate, only filter which reference to deterministic groupExpression is pushed down, so it will only filter the group. I can't think of what kind of Non-deterministic expressions will be affected by this push down.

// Push `Filter` operators through `Aggregate` operators. Parts of the predicates that can
// be beneath must satisfy the following conditions:
// 1. Grouping expressions are not empty.
// 2. Predicate expression is deterministic.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean de-aliased predicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will change the description to make it clearer.

val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.deterministic && !cond.throwable &&
replaced.deterministic && !cond.throwable &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we also use replaced.throwable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think can push down throwable filter through aggregate, it seems does not affect exception thrown
. What do you think? @cloud-fan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make a PR later.

@cloud-fan
Copy link
Contributor

I checked the aggregate execution implementation. Basically, it has an embedded Project operator and applies the result projection for aggregated values. That said, I think it has the same property as Project and we shouldn't relax the requirement.

On the other hand, the aggregate operator itself is already non-deterministic, as the output row order is different between hash and sort aggregate. But we shouldn't make it worse.

@zml1206
Copy link
Contributor Author

zml1206 commented Jan 31, 2024

I checked the aggregate execution implementation. Basically, it has an embedded Project operator and applies the result projection for aggregated values. That said, I think it has the same property as Project and we shouldn't relax the requirement.

On the other hand, the aggregate operator itself is already non-deterministic, as the output row order is different between hash and sort aggregate. But we shouldn't make it worse.

I understand, thank you, for example

Seq((1, 1), (1, 2), (2, 1), (3, 1)).toDF("a", "b").createOrReplaceTempView("t1")
sql("select a,monotonically_increasing_id() as c from t1 group by a").filter("a = 2").show()

Push down will make the results wrong, correct: (2, 2), error(2, 0).

@zml1206 zml1206 closed this Jan 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants