From ce6a71e013c403d0a3690cf823934530ce0ea5ef Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 20 Sep 2017 09:00:43 -0700 Subject: [PATCH] [SPARK-22076][SQL] Expand.projections should not be a Stream ## What changes were proposed in this pull request? Spark with Scala 2.10 fails with a group by cube: ``` spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug") spark.sql("select 1 from rollup_bug group by rollup ()").show ``` It can be traced back to https://github.com/apache/spark/pull/15484 , which made `Expand.projections` a lazy `Stream` for group by cube. In Scala 2.10 `Stream` captures a lot of stuff, and in this case it captures the entire query plan which has some un-serializable parts. This change is also good for the master branch, to reduce the serialized size of `Expand.projections`. ## How was this patch tested? Manually verified with Spark with Scala 2.10. Author: Wenchen Fan Closes #19289 from cloud-fan/bug. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index db276fbc9d53a..4535176a14d89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -279,9 +279,15 @@ class Analyzer( * We need to get all of its subsets for a given GROUPBY expression, the subsets are * represented as sequence of expressions. */ - def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match { + def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = { + // `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to + // materialize it and avoid serialization problems later on.
+ cubeExprs0(exprs).toIndexedSeq + } + + def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match { case x :: xs => - val initial = cubeExprs(xs) + val initial = cubeExprs0(xs) initial.map(x +: _) ++ initial case Nil => Seq(Seq.empty)