Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning and ordering when aliasing output attributes #17175

Closed
wants to merge 8 commits into from
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
Expand All @@ -42,10 +42,34 @@ case class InMemoryTableScanExec(
override def output: Seq[Attribute] = attributes

// The cached version does not change the outputPartitioning of the original SparkPlan.
override def outputPartitioning: Partitioning = relation.child.outputPartitioning
// But the cached version could alias output, so we need to replace output.
override def outputPartitioning: Partitioning = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is valid, shall we only keep this and merge this PR first?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, do you mean only keeping change of outputPartitioning without outputOrdering?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Do you mean only keeping the change of InMemoryTableScanExec?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose you mean to keep the change of InMemoryTableScanExec. Other changes are removed. Can you take a look? Thanks.

val attrMap = AttributeMap(
relation.child.output.zip(output)
)
relation.child.outputPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
Copy link
Contributor

@cloud-fan cloud-fan Mar 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashPartitioning is an expression, we can simplify it to

case h: HashPartitioning => h.transformExpression {
    ...
  }.asInstanceOf[HashPartitioning]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Will update.

val newExprs = expressions.map(_.transform {
case attr: Attribute if attrMap.contains(attr) => attrMap.get(attr).get
})
HashPartitioning(newExprs, numPartitions)
case _ => relation.child.outputPartitioning
}
}

// The cached version does not change the outputOrdering of the original SparkPlan.
override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
// But the cached version could alias output, so we need to replace output.
override def outputOrdering: Seq[SortOrder] = {
val attrMap = AttributeMap(
relation.child.output.zip(output)
)
relation.child.outputOrdering.map { sortOrder =>
val newSortExpr = sortOrder.child.transform {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, SortOrder is an expression

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can even have a method def updateAttribute(expr: Expression): Expression to do this.

case attr: Attribute if attrMap.contains(attr) => attrMap.get(attr).get
}
SortOrder(newSortExpr, sortOrder.direction, sortOrder.nullOrdering)
}
}

private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)

Expand Down
Expand Up @@ -21,6 +21,9 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
Expand Down Expand Up @@ -390,4 +393,23 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("InMemoryTableScanExec should return currect output ordering and partitioning") {
val ds1 = Seq((0, 0), (1, 1)).toDS
.repartition(col("_1")).sortWithinPartitions(col("_1")).persist
val ds2 = Seq((0, 0), (1, 1)).toDS
.repartition(col("_1")).sortWithinPartitions(col("_1")).persist
val joined = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somehow my comment is lost, let me type it again: why we need to test join here? The test logic below seems have nothing to do with join.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The join is there to force one underlying relation to alias the output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this only happens for self-join?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the two datasets cached have the same logical plan, it is a self-join actually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use DataFrame? The code here only use DataFrame features. We should also add some comments to explain why we join here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I updated it.


val inMemoryScans = joined.queryExecution.executedPlan.collect {
case m: InMemoryTableScanExec => m
}
inMemoryScans.foreach { inMemoryScan =>
val sortedAttrs = AttributeSet(inMemoryScan.outputOrdering.flatMap(_.references))
assert(sortedAttrs.subsetOf(inMemoryScan.outputSet))

val partitionedAttrs =
inMemoryScan.outputPartitioning.asInstanceOf[HashPartitioning].references
assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet))
}
}
}