From af13d8f59aa630cee92c19d41962401b42080fdc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 22 Feb 2015 16:03:18 +0800 Subject: [PATCH] Make dataFrameEagerAnalysis cover the commands and queries with side effects. --- .../org/apache/spark/sql/DataFrame.scala | 23 +++++++++++-------- .../org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 810f7c77477bb..f730d1ecd6a69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -123,18 +123,23 @@ class DataFrame protected[sql]( }) } - @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match { + @transient protected[sql] val logicalPlan: LogicalPlan = // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. - case _: Command | - _: InsertIntoTable | - _: CreateTableAsSelect[_] | - _: CreateTableUsingAsSelect | - _: WriteToFile => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) - case _ => + if (sqlContext.conf.dataFrameEagerAnalysis) { + queryExecution.logical match { + case _: Command | + _: InsertIntoTable | + _: CreateTableAsSelect[_] | + _: CreateTableUsingAsSelect | + _: WriteToFile => + LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) + case _ => + queryExecution.logical + } + } else { queryExecution.logical - } + } /** * An implicit conversion function internal to this class for us to avoid doing diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 6b9b3a8425964..e1d664b0ddd83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -45,11 +45,18 @@ class DataFrameSuite extends QueryTest { intercept[Exception] { testData.groupBy($"abcd").agg(Map("key" -> "sum")) } - + + testData.registerTempTable("testDataTemp") + intercept[Exception] { + sql("INSERT OVERWRITE TABLE testDataTemp SELECT nonExistentName, value FROM testData") + } + // No more eager analysis once the flag is turned off TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") testData.select('nonExistentName) + sql("INSERT OVERWRITE TABLE testDataTemp SELECT nonExistentName, value FROM testData") + // Set the flag back to original value before this test. TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) }