[SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning and ordering when aliasing output attributes #17175
Conversation
…to ensure no unnecessary shuffle/sort in Datasets.
Test build #73989 has finished for PR 17175 at commit
@@ -43,6 +43,12 @@ object Canonicalize extends {
    case _ => e
  }

  /** Remove some unnecessary parameters. */
  private[expressions] def ignoreParameters(e: Expression): Expression = e match {
We can just put this logic in ignoreNamesTypes and rename it.
ok. will update.
@@ -78,9 +78,42 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
    }
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
Sorry, I don't get it. Why was it wrong?
Take the plan in the PR description as an example. In the Project

Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]

its output is _1#105, but its outputPartitioning is _1#83. The parent operator of the Project requires partitioning on _1#105._1. Since _1#83.semanticEquals(_1#105._1) is false, an additional ShuffleExchange will be added. outputOrdering has the same issue.
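To make the mismatch concrete, here is a minimal, self-contained sketch. The case classes, names, and ids below are toy stand-ins mirroring the plan above, not Spark's Catalyst types, and semanticEquals is approximated by structural equality:

```scala
// Toy stand-ins for Catalyst expressions (not Spark's actual API).
case class Attr(name: String, exprId: Int)             // e.g. _1#83
case class GetStructField(struct: Attr, field: String) // e.g. _1#105._1

// Stand-in for semanticEquals: structural comparison of the toy trees.
def semanticEquals(a: Any, b: Any): Boolean = a == b

val childPartitioning = Attr("_1", 83)                        // Project's outputPartitioning
val requiredByParent  = GetStructField(Attr("_1", 105), "_1") // parent's required partitioning

// The check fails, so the planner would insert an extra ShuffleExchange
// even though both expressions refer to the same underlying data.
println(semanticEquals(childPartitioning, requiredByParent)) // false
```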
Well, if we wanna support this, we should support all the cases, e.g.

Project [(a#1 + b#2) as c#3, b#2]

where the parent operator requires partitioning on c#3 - b#2. Since (c#3 - b#2).semanticEquals(a#1) is false, an additional ShuffleExchange will be added. This is not trivial, and we should spend more time on a holistic solution.
My original thought was to support the cases where requiredChildDistribution refers to nested fields created and aliased in a Project, as in the example I showed. The example you give is a different kind of case. To support it, if we want to, we at least need to improve expression canonicalization and partitioning/distribution matching.
A nested field is nothing special; it is just CreateStruct and GetStructField expressions. What we should handle is symmetric expressions, e.g. CreateStruct and GetStructField, Add and Subtract, etc. It doesn't make sense to handle only CreateStruct and GetStructField specially and leave the others behind.
Yeah, I see. To support the general cases, I am going to first improve expression canonicalization, so we can check semantic equality between (a + b) - b and a.
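As a hedged sketch of that canonicalization idea (a toy expression tree with a hypothetical simplification rule, not the actual Catalyst Canonicalize code), recognizing the Add/Subtract symmetry is what lets (a + b) - b collapse to a:

```scala
// Toy expression tree, not Catalyst.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class Sub(l: Expr, r: Expr) extends Expr

// Hypothetical canonicalization rule: exploit the Add/Subtract symmetry so
// that (x + y) - y and (x + y) - x simplify to x and y respectively.
def canonicalize(e: Expr): Expr = e match {
  case Sub(Add(x, y), z) if canonicalize(y) == canonicalize(z) => canonicalize(x)
  case Sub(Add(x, y), z) if canonicalize(x) == canonicalize(z) => canonicalize(y)
  case Sub(l, r) => Sub(canonicalize(l), canonicalize(r))
  case Add(l, r) => Add(canonicalize(l), canonicalize(r))
  case other => other
}

val a = Attr("a")
val b = Attr("b")
println(canonicalize(Sub(Add(a, b), b)) == a) // true: (a + b) - b is semantically a
```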
#17242 is a WIP PR for that.
@@ -42,10 +42,34 @@ case class InMemoryTableScanExec(
   override def output: Seq[Attribute] = attributes

   // The cached version does not change the outputPartitioning of the original SparkPlan.
-  override def outputPartitioning: Partitioning = relation.child.outputPartitioning
+  // But the cached version could alias output, so we need to replace output.
+  override def outputPartitioning: Partitioning = {
I think this is valid, shall we only keep this and merge this PR first?
Sorry, do you mean keeping only the change to outputPartitioning without outputOrdering?
Oh. Do you mean keeping only the change to InMemoryTableScanExec?
I suppose you mean to keep the change to InMemoryTableScanExec, with the other changes removed. Can you take a look? Thanks.
      relation.child.output.zip(output)
    )
    relation.child.outputPartitioning match {
      case HashPartitioning(expressions, numPartitions) =>
HashPartitioning is an expression, so we can simplify it to

case h: HashPartitioning => h.transformExpression {
  ...
}.asInstanceOf[HashPartitioning]
OK. Will update.
      relation.child.output.zip(output)
    )
    relation.child.outputOrdering.map { sortOrder =>
      val newSortExpr = sortOrder.child.transform {
Same here, SortOrder is an expression.
We can even have a method def updateAttribute(expr: Expression): Expression to do this.
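A minimal sketch of such a helper on a toy expression tree (not Spark's Expression; the attribute map mimics AttributeMap(relation.child.output.zip(output)) from the PR, and the ids mirror the _1#83 to _1#105 aliasing discussed above):

```scala
// Toy model of Expression.transform on a tiny tree, not Catalyst.
sealed trait Expr {
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val recursed = this match {
      case Add(l, r) => Add(l.transform(rule), r.transform(rule))
      case leaf      => leaf
    }
    rule.applyOrElse(recursed, identity[Expr])
  }
}
case class Attr(name: String, exprId: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// Child output attribute -> the scan's aliased output attribute.
val attrMap: Map[Attr, Attr] = Map(Attr("_1", 83) -> Attr("_1", 105))

// Replace any attribute found in the map, leaving everything else untouched.
def updateAttribute(expr: Expr): Expr = expr.transform {
  case a: Attr => attrMap.getOrElse(a, a)
}

println(updateAttribute(Add(Attr("_1", 83), Attr("_2", 84))))
// Add(Attr(_1,105),Attr(_2,84))
```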
Test build #74419 has finished for PR 17175 at commit
Test build #74424 has started for PR 17175 at commit
retest this please.
Test build #74438 has finished for PR 17175 at commit
@@ -41,11 +41,31 @@ case class InMemoryTableScanExec(

  override def output: Seq[Attribute] = attributes

  private def updateAttribute(expr: Expression, attrMap: AttributeMap[Attribute]): Expression =
We can create the attrMap in this method.
Then when processing outputOrdering, we would create the attrMap many times.
outputOrdering should not be very long, so this may be OK. I will update it.
Test build #74496 has finished for PR 17175 at commit
retest this please.
      relation.child.output.zip(output)
    )
    expr.transform {
      case attr: Attribute if attrMap.contains(attr) => attrMap.get(attr).get
nit: case attr: Attribute => attrMap.getOrElse(attr, attr)
Done.
Test build #74511 has finished for PR 17175 at commit
Test build #74518 has finished for PR 17175 at commit
      .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
    val ds2 = Seq((0, 0), (1, 1)).toDS
      .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
    val joined = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
Somehow my comment got lost, let me type it again: why do we need to test a join here? The test logic below seems to have nothing to do with the join.
The join is there to force one underlying relation to alias the output.
I think this only happens for self-join?
Because the two cached datasets have the same logical plan, it is actually a self-join.
Can we use DataFrame? The code here only uses DataFrame features. We should also add some comments to explain why we join here.
ok. I updated it.
@@ -41,11 +41,28 @@ case class InMemoryTableScanExec(

  override def output: Seq[Attribute] = attributes

  private def updateAttribute(expr: Expression): Expression = {
    val attrMap = AttributeMap(
      relation.child.output.zip(output)
Can these be on one line?
LGTM, pending tests
Test build #74587 has finished for PR 17175 at commit
Test build #74591 has finished for PR 17175 at commit
Thanks, merging to master
What changes were proposed in this pull request?

Now InMemoryTableScanExec simply takes the outputPartitioning and outputOrdering from the associated InMemoryRelation's child.outputPartitioning and outputOrdering.

However, InMemoryTableScanExec can alias the output attributes. In this case, its outputPartitioning and outputOrdering are not correct, and its parent operators can't correctly determine its data distribution.

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.