
[SPARK-31620][SQL] Fix reference binding failure in case of an final agg contains subquery #28496

Closed
wants to merge 11 commits

Conversation

Ngone51
Member

@Ngone51 Ngone51 commented May 11, 2020

What changes were proposed in this pull request?

Instead of using child.output directly, we should use inputAggBufferAttributes from the current agg expression for Final and PartialMerge aggregates to bind references for their mergeExpression.

Why are the changes needed?

When planning aggregates, the partial aggregate uses agg funcs' inputAggBufferAttributes as its output; see https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L105

For final HashAggregateExec, we need to bind the DeclarativeAggregate.mergeExpressions with the output of the partial aggregate operator, see https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L348

This is usually fine. However, if we copy the agg func somehow after agg planning, like PlanSubqueries, the DeclarativeAggregate will be replaced by a new instance with new inputAggBufferAttributes and mergeExpressions. Then we can't bind the mergeExpressions with the output of the partial aggregate operator, as it uses the inputAggBufferAttributes of the original DeclarativeAggregate before copy.

Note that, ImperativeAggregate doesn't have this problem, as we don't need to bind its mergeExpressions. It has a different mechanism to access buffer values, via mutableAggBufferOffset and inputAggBufferOffset.
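The failure mode above can be illustrated with a small self-contained sketch. This is a toy model, not Spark code: `ExprId`, `Attr`, and `MySum` are hypothetical stand-ins whose names merely mirror Spark's for readability.

```scala
object ExprId { private var c = 0L; def next(): Long = { c += 1; c } }

final case class Attr(name: String, exprId: Long)

// each instance mints its buffer attributes lazily, like DeclarativeAggregate
final case class MySum(child: String) {
  lazy val inputAggBufferAttributes: Seq[Attr] = Seq(Attr("sum", ExprId.next()))
}

object BindDemo extends App {
  val original = MySum("a")
  // the partial aggregate's output is the original function's buffer attributes
  val partialOutput = original.inputAggBufferAttributes

  // a rule such as PlanSubqueries rebuilds the plan, copying the function
  val copied = original.copy()

  // binding the copied function's merge references against the old output fails:
  val boundAgainstOldOutput = copied.inputAggBufferAttributes
    .forall(a => partialOutput.exists(_.exprId == a.exprId))
  println(boundAgainstOldOutput) // false: the copy minted a new exprId

  // the fix: bind against the *current* function's inputAggBufferAttributes
  val boundAgainstCurrent = copied.inputAggBufferAttributes
    .forall(a => copied.inputAggBufferAttributes.exists(_.exprId == a.exprId))
  println(boundAgainstCurrent) // true
}
```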

Does this PR introduce any user-facing change?

Yes. Previously, the query failed with an error; after this change, it runs successfully.

How was this patch tested?

Added a regression test.

@AngersZhuuuu
Contributor

AngersZhuuuu commented May 11, 2020

Is there any material about makeCopy, tags, etc., to better understand TreeNode?

@Ngone51
Member Author

Ngone51 commented May 11, 2020

Is there any material about makeCopy, tags, etc., to better understand TreeNode?

Just check the source code?

fyi, tags could be copied along with makeCopy @AngersZhuuuu

@Ngone51
Member Author

Ngone51 commented May 11, 2020

cc @AngersZhuuuu @cloud-fan @wangyum

@AngersZhuuuu
Contributor

fyi, tags could be copied along with makeCopy @AngersZhuuuu

Thanks a lot. It's not clear to me which parts will be retained and which will be reset in makeCopy; I'll check the source code details.

And for

private lazy val sum = AttributeReference("sum", sumDataType)()

In Sum, it will be changed by PlanSubqueries too, resulting in a different ExprId.
Do we need to handle this?

@Ngone51
Member Author

Ngone51 commented May 11, 2020

Do we need to handle this ?

I think not, because those attributes are not referenced by others.

@SparkQA

SparkQA commented May 11, 2020

Test build #122492 has finished for PR 28496 at commit bc1b7e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 11, 2020

Test build #122506 has finished for PR 28496 at commit cbd891d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rednaxelafx
Contributor

I'm wondering if there could be a solution that doesn't involve tree tags, but instead uses a secondary constructor, like the way AggregateExpression passes resultIds around, and makes concrete derived types of DeclarativeAggregate use the explicitly set Id if defined.
So e.g. for Sum, instead of

private lazy val sum = AttributeReference("sum", sumDataType)() // implicitly makes a new exprId every time

maybe have some supporting utils in DeclarativeAggregate like:

+  val explicitAggBufferAttributeExprIds: Option[Seq[ExprId]] = None
+
+  // TODO... maybe put these in a mixin instead
+  override def otherCopyArgs(): Seq[AnyRef] = explicitAggBufferAttributeExprIds :: Nil
+
+  protected def makeAggBufferAttribute(
+      ordinal: Int,
+      name: String,
+      dataType: DataType,
+      nullable: Boolean = true): AttributeReference = {
+    explicitAggBufferAttributeExprIds match {
+      case Some(exprIds) => AttributeReference(name, dataType, nullable)(exprId = exprIds(ordinal))
+      case _ => AttributeReference(name, dataType, nullable)()
+    }
+  }

and make Sum agg buffer declaration as:

private lazy val sum = makeAggBufferAttribute(0, "sum", sumDataType)

I tried this and unfortunately it requires all the places that create a Sum to go from Sum( ... args ... ) to Sum( ... args ... )() for the secondary ctor. That's quite a few places touched :-(

@Ngone51
Member Author

Ngone51 commented May 12, 2020

Those attributes are mostly lazy. So does that mean we need to generate exprIds eagerly before creating agg functions in this way? @rednaxelafx

@Ngone51 Ngone51 changed the title [SPARK-31620][SQL] Use TreeTagNode to preserve inputAggBufferAttributes for agg function [SPARK-31620][SQL] Fix reference binding failure in case of an final agg contains subquery May 13, 2020
val aggAttrs = aggregateExpressions.map(_.aggregateFunction)
  .flatMap(_.inputAggBufferAttributes)
val distinctAttrs = child.output.filterNot(
  a => (groupingAttributes ++ aggAttrs).exists(_.name == a.name))
Contributor

name matching is fragile, how about

child.output.dropRight(aggAttrs.length) ++ aggAttrs

Member Author

oh yea, this looks better!
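The positional rewrite suggested above can be sketched as follows. This is a hypothetical standalone example (attribute names like `sum#7` are illustrative strings, not real Spark attributes): the partial aggregate's output is grouping attributes followed by agg buffer attributes, so the stale trailing attributes can be swapped out by position rather than by name.

```scala
object InputAttrsDemo extends App {
  val groupingAttributes = Seq("g1#1", "g2#2")
  val staleAggAttrs      = Seq("sum#3")          // minted before the copy
  val childOutput        = groupingAttributes ++ staleAggAttrs

  val aggAttrs = Seq("sum#7")                    // minted after the copy
  val inputAttributes = childOutput.dropRight(aggAttrs.length) ++ aggAttrs
  println(inputAttributes)                       // List(g1#1, g2#2, sum#7)
}
```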

val fastRowKeys = ctx.generateExpressions(
-  bindReferences[Expression](groupingExpressions, child.output))
+  bindReferences[Expression](groupingExpressions, inputAttributes))
Contributor

it's for resolving grouping keys, child.output is fine here.

@@ -973,4 +973,21 @@ class DataFrameAggregateSuite extends QueryTest
assert(error.message.contains("function count_if requires boolean type"))
}
}

Seq(true, false).foreach { value =>
test(s"SPARK-31620: agg with subquery (codegen = $value)") {
Contributor

codegen -> "whole-stage-codegen"

@SparkQA

SparkQA commented May 13, 2020

Test build #122582 has finished for PR 28496 at commit 882467b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 13, 2020

Test build #122583 has finished for PR 28496 at commit 9a0b788.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 13, 2020

Test build #122585 has finished for PR 28496 at commit 3fd39c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// the `inputAggBufferAttributes` of the original `DeclarativeAggregate` before copy. Instead,
// we shall use `inputAggBufferAttributes` after copy to match the new `mergeExpressions`.
val aggAttrs = aggregateExpressions
.filter(a => a.mode == Final || !a.isDistinct).map(_.aggregateFunction)
Contributor

can you add a few comments to explain the isDistinct check?

Member Author

After thinking about it more, I changed it to filter(a => a.mode == Final || a.mode == PartialMerge).

Contributor

How about adding a UT with a distinct aggregate expression?

Member Author

sounds good.

@rednaxelafx
Contributor

Those attributes are mostly lazy. So does that mean we need to generate exprIds eagerly before creating agg functions in this way? @rednaxelafx

I'd say "yes": generating exprIds eagerly shouldn't be a big overhead, and it'll help keep the AttributeReferences stable later on.
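The eager-vs-lazy trade-off discussed above can be sketched with another toy model (again hypothetical stand-ins, not Spark code): eagerly materializing the buffer attribute pins its exprId at construction time, so later copies of the function can be handed the same ids explicitly.

```scala
object EagerIdDemo extends App {
  object ExprId { private var c = 0L; def next(): Long = { c += 1; c } }
  final case class Attr(name: String, exprId: Long = ExprId.next())

  // lazy: the attribute (and its id) is minted per instance, on first access
  class LazySum { lazy val sum: Attr = Attr("sum") }

  // eager: the id is generated up front and can be threaded through copies
  class EagerSum(explicitId: Option[Long] = None) {
    val sum: Attr = explicitId.fold(Attr("sum"))(id => Attr("sum", id))
  }

  val original = new EagerSum()
  val copied = new EagerSum(Some(original.sum.exprId)) // stable across the copy
  println(copied.sum.exprId == original.sum.exprId)    // true
}
```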

@Ngone51
Member Author

Ngone51 commented May 14, 2020

Please note that we've switched to another way to fix the issue in order to avoid using TreeTagNode. @AngersZhuuuu @rednaxelafx

@@ -129,7 +129,7 @@ case class HashAggregateExec(
resultExpressions,
(expressions, inputSchema) =>
MutableProjection.create(expressions, inputSchema),
-      child.output,
+      inputAttributes,
Contributor

I tried to fix it like this before, but I forgot to change the child's output here, so the output columns didn't work well.

LGTM

Member Author

Thanks for confirming.

Contributor

@rednaxelafx rednaxelafx left a comment

LGTM. This looks like it'll work well.

(That said, this is more black magic... it sort of lets the current brittle pairing between partial/final aggs creep into more places. If we ever add another physical Aggregate operator that supports whole-stage codegen, we'll probably painfully rediscover this design issue again.

I'm glad your test case included w/o grouping keys + w/ grouping keys + distinct, that's very good!)

@SparkQA

SparkQA commented May 14, 2020

Test build #122605 has finished for PR 28496 at commit 89ae4bf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 14, 2020

Test build #122604 has finished for PR 28496 at commit 64d13fc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -123,7 +123,7 @@ case class ObjectHashAggregateExec(
resultExpressions,
(expressions, inputSchema) =>
MutableProjection.create(expressions, inputSchema),
-      child.output,
+      inputAttributes,
Member Author

@Ngone51 Ngone51 May 14, 2020

There's actually another child.output at line 118, but it's dead code indeed. Not sure if we should touch it.

Contributor

You can open a new PR to remove dead code

Member Author

ok..

" d, 0))) as csum from t2 group by c"), Row(4) :: Nil)

// test SortAggregateExec
checkAnswer(sql("select max(if(c > (select a from t1), 'str1', 'str2')) as csum from t2"),
Contributor

it's better to check the physical plan and make sure it's sort agg

@SparkQA

SparkQA commented May 14, 2020

Test build #122619 has started for PR 28496 at commit 493157a.

@SparkQA

SparkQA commented May 14, 2020

Test build #122609 has finished for PR 28496 at commit 8aecd57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member Author

Ngone51 commented May 15, 2020

retest this please

@SparkQA

SparkQA commented May 15, 2020

Test build #122642 has finished for PR 28496 at commit 493157a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented May 15, 2020

Test build #122662 has finished for PR 28496 at commit 493157a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in d8b001f May 15, 2020
cloud-fan pushed a commit that referenced this pull request May 15, 2020
…agg contains subquery

### What changes were proposed in this pull request?

Instead of using `child.output` directly, we should use `inputAggBufferAttributes` from the current agg expression  for `Final` and `PartialMerge` aggregates to bind references for their `mergeExpression`.

### Why are the changes needed?

When planning aggregates, the partial aggregate uses agg funcs' `inputAggBufferAttributes` as its output; see https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L105

For final `HashAggregateExec`, we need to bind the `DeclarativeAggregate.mergeExpressions` with the output of the partial aggregate operator, see https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L348

This is usually fine. However, if we copy the agg func somehow after agg planning, like `PlanSubqueries`, the `DeclarativeAggregate` will be replaced by a new instance with new `inputAggBufferAttributes` and `mergeExpressions`. Then we can't bind the `mergeExpressions` with the output of the partial aggregate operator, as it uses the `inputAggBufferAttributes` of the original `DeclarativeAggregate` before copy.

Note that, `ImperativeAggregate` doesn't have this problem, as we don't need to bind its `mergeExpressions`. It has a different mechanism to access buffer values, via `mutableAggBufferOffset` and `inputAggBufferOffset`.

### Does this PR introduce _any_ user-facing change?

Yes. Previously, the query failed with an error; after this change, it runs successfully.

### How was this patch tested?

Added a regression test.

Closes #28496 from Ngone51/spark-31620.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit d8b001f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@Ngone51
Member Author

Ngone51 commented May 15, 2020

thanks all!

Nnicolini pushed a commit to palantir/spark that referenced this pull request Jun 11, 2020
rshkv pushed a commit to palantir/spark that referenced this pull request Dec 8, 2020
rshkv pushed a commit to palantir/spark that referenced this pull request Dec 8, 2020