[SPARK-24373][SQL] Add AnalysisBarrier to RelationalGroupedDataset's child

mgaido91 committed May 25, 2018
1 parent b6c50d7 commit 361fee8
Showing 3 changed files with 16 additions and 5 deletions.
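For context beyond the one-line commit title: before this patch, RelationalGroupedDataset built its Aggregate/Pivot nodes directly on df.logicalPlan, so Dataset.ofRows re-ran the analyzer over an already-analyzed child. Rules that are not idempotent (for example, the null-handling rewrite applied around Scala UDFs) could then rewrite that child into a plan that no longer matched the cached one, defeating df.cache(). A hedged spark-shell repro of the symptom (names such as plusOne and df are illustrative, not from the commit):

```scala
// Hedged repro sketch (Spark 2.3.x era API, run in spark-shell).
// Before this patch, the grouped count below could miss the in-memory
// cache because its child plan was re-analyzed into a different shape.
import org.apache.spark.sql.functions.udf
import spark.implicits._

val plusOne = udf((x: Long) => x + 1)
val df = spark.range(0, 1).toDF("s").select(plusOne($"s"))
df.cache()
df.count()                     // materializes the cache
df.groupBy().count().explain() // with the fix: plan shows InMemoryTableScan
```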
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -196,7 +196,7 @@ class Dataset[T] private[sql](
   }
 
   // Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
-  @transient private val planWithBarrier = AnalysisBarrier(logicalPlan)
+  @transient private[sql] val planWithBarrier = AnalysisBarrier(logicalPlan)
 
   /**
    * Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the
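For readers unfamiliar with the barrier: AnalysisBarrier (added in Spark 2.3) is a logical-plan wrapper that the analyzer treats as a leaf, so resolution rules never descend into the already-analyzed child. A minimal sketch of the idea follows; it is simplified, not the actual Spark source, and compiles only against Spark's catalyst internals:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Simplified sketch: extending LeafNode means the wrapper reports no
// children, so the analyzer's transform* traversals skip `child` entirely.
// The real class carries additional overrides (e.g. for canonicalization).
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
  override def output: Seq[Attribute] = child.output // expose the child's schema
  override def isStreaming: Boolean = child.isStreaming
}
```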
8 changes: 4 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -63,17 +63,17 @@ class RelationalGroupedDataset protected[sql](
     groupType match {
       case RelationalGroupedDataset.GroupByType =>
         Dataset.ofRows(
-          df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
+          df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.planWithBarrier))
       case RelationalGroupedDataset.RollupType =>
         Dataset.ofRows(
-          df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
+          df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.planWithBarrier))
       case RelationalGroupedDataset.CubeType =>
         Dataset.ofRows(
-          df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
+          df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.planWithBarrier))
       case RelationalGroupedDataset.PivotType(pivotCol, values) =>
         val aliasedGrps = groupingExprs.map(alias)
         Dataset.ofRows(
-          df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.logicalPlan))
+          df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.planWithBarrier))
     }
   }
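A note on the mechanics (an observation about the change, not text from the commit): df.planWithBarrier is simply AnalysisBarrier(df.logicalPlan), as the Dataset.scala hunk above shows, so the analyzer pass triggered by Dataset.ofRows now resolves only the freshly built Aggregate or Pivot node. The child below the barrier keeps the exact analyzed shape it had when it was cached, so the plan-equality lookup against the cache still matches.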
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -529,4 +529,15 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-24373: avoid running Analyzer rules twice on RelationalGroupedDataset") {
+    val myUDF = udf((x: Long) => { x + 1 })
+    val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
+    df1.cache()
+    val countDf = df1.groupBy().count()
+    val cachedPlan = countDf.queryExecution.executedPlan.collect {
+      case plan: InMemoryTableScanExec => plan
+    }
+    assert(cachedPlan.nonEmpty)
+  }
 }
