Skip to content

Commit

Permalink
[SPARK-30671][SQL] emptyDataFrame should use a LocalRelation
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR makes `SparkSession.emptyDataFrame` use an empty local relation instead of an empty RDD. This allows the optimizer to recognize it as an empty relation, which creates the opportunity for more aggressive optimizations.

### Why are the changes needed?
It allows us to optimize empty dataframes better.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a test case to `DataFrameSuite`.

Closes #27400 from hvanhovell/SPARK-30671.

Authored-by: herman <herman@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
hvanhovell authored and HyukjinKwon committed Jan 31, 2020
1 parent 05be81d commit a5c7090
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Expand Up @@ -274,9 +274,7 @@ class SparkSession private(
* @since 2.0.0
*/
@transient
// Removed by this commit: the previous definition, which built the DataFrame from an
// empty RDD[Row] named "empty" together with an empty schema (StructType(Nil)).
lazy val emptyDataFrame: DataFrame = {
createDataFrame(sparkContext.emptyRDD[Row].setName("empty"), StructType(Nil))
}
// Added by this commit: the replacement definition, which wraps an empty LocalRelation
// so the optimizer can recognize the plan as an empty relation.
lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation())

/**
* Creates a new [[Dataset]] of type T containing zero elements.
Expand Down
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.Uuid
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, OneRowRelation, Union}
import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
Expand Down Expand Up @@ -2287,6 +2287,17 @@ class DataFrameSuite extends QueryTest
}
assert(err.getMessage.contains("cannot resolve '`d`'"))
}

test("emptyDataFrame should be foldable") {
  // Give the empty DataFrame an "id" column so it can be joined against a non-empty range.
  val emptyWithId = spark.emptyDataFrame.withColumn("id", lit(1L))
  val optimized = spark.range(10).join(emptyWithId, "id").queryExecution.optimizedPlan

  // The optimized plan is expected to be a LocalRelation with a single output
  // attribute named "id" and no rows.
  optimized match {
    case LocalRelation(Seq(attr), Nil, _) =>
      assert(attr.name == "id")
    case _ =>
      fail("emptyDataFrame should be foldable")
  }
}
}

case class GroupByKey(a: Int, b: Int)

0 comments on commit a5c7090

Please sign in to comment.