From a5c7090ffafbfb4a8f312251b9ce8f25f71497cb Mon Sep 17 00:00:00 2001 From: herman Date: Fri, 31 Jan 2020 16:14:07 +0900 Subject: [PATCH] [SPARK-30671][SQL] emptyDataFrame should use a LocalRelation ### What changes were proposed in this pull request? This PR makes `SparkSession.emptyDataFrame` use an empty local relation instead of an empty RDD. This allows the optimizer to recognize this as an empty relation, and creates the opportunity to do some more aggressive optimizations. ### Why are the changes needed? It allows us to optimize empty dataframes better. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added a test case to `DataFrameSuite`. Closes #27400 from hvanhovell/SPARK-30671. Authored-by: herman Signed-off-by: HyukjinKwon --- .../scala/org/apache/spark/sql/SparkSession.scala | 4 +--- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 13 ++++++++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index bd2bc1c0ad5d7..0ce514fb0f83c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -274,9 +274,7 @@ class SparkSession private( * @since 2.0.0 */ @transient - lazy val emptyDataFrame: DataFrame = { - createDataFrame(sparkContext.emptyRDD[Row].setName("empty"), StructType(Nil)) - } + lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation()) /** * Creates a new [[Dataset]] of type T containing zero elements. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 763f92230cdc3..d2d58a83ded5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.Uuid import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, OneRowRelation, Union} import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -2287,6 +2287,17 @@ class DataFrameSuite extends QueryTest } assert(err.getMessage.contains("cannot resolve '`d`'")) } + + test("emptyDataFrame should be foldable") { + val emptyDf = spark.emptyDataFrame.withColumn("id", lit(1L)) + val joined = spark.range(10).join(emptyDf, "id") + joined.queryExecution.optimizedPlan match { + case LocalRelation(Seq(id), Nil, _) => + assert(id.name == "id") + case _ => + fail("emptyDataFrame should be foldable") + } + } } case class GroupByKey(a: Int, b: Int)