From a88c5168901dc914ee9dee79bce5261b49a7e35e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 13 Feb 2015 14:57:23 -0800 Subject: [PATCH 1/5] DDLParser will take a parsering function to take care CTAS statements. --- .../scala/org/apache/spark/sql/DataFrameImpl.scala | 2 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 10 +--------- .../scala/org/apache/spark/sql/sources/ddl.scala | 14 ++++---------- .../org/apache/spark/sql/hive/HiveContext.scala | 9 ++++++--- .../org/apache/spark/sql/hive/HiveStrategies.scala | 8 +------- 6 files changed, 14 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index bb5c6226a2217..453262005cf8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -386,7 +386,7 @@ private[sql] class DataFrameImpl protected[sql]( mode: SaveMode, options: Map[String, String]): Unit = { val cmd = - CreateTableUsingAsLogicalPlan( + CreateTableUsingAsSelect( tableName, source, temporary = false, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2165949d32c6f..bfc61d88563bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -101,7 +101,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer @transient - protected[sql] val ddlParser = new DDLParser + protected[sql] val ddlParser = new DDLParser(sqlParser.apply(_)) @transient protected[sql] val sqlParser = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index e915e0e6a0ec1..5281c7502556a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -319,18 +319,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { sys.error("allowExisting should be set to false when creating a temporary table.") case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) => - val logicalPlan = sqlContext.parseSql(query) - val cmd = - CreateTempTableUsingAsSelect(tableName, provider, mode, opts, logicalPlan) - ExecutedCommand(cmd) :: Nil - case c: CreateTableUsingAsSelect if !c.temporary => - sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") - - case CreateTableUsingAsLogicalPlan(tableName, provider, true, mode, opts, query) => val cmd = CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query) ExecutedCommand(cmd) :: Nil - case c: CreateTableUsingAsLogicalPlan if !c.temporary => + case c: CreateTableUsingAsSelect if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") case LogicalDescribeCommand(table, isExtended) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 8cac9c0fdf7fa..35ead7fe003d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -32,7 +32,8 @@ import org.apache.spark.util.Utils /** * A parser for foreign DDL commands. */ -private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { +private[sql] class DDLParser( + parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with Logging { def apply(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = { try { @@ -128,12 +129,13 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { SaveMode.ErrorIfExists } + val queryPlan = parseQuery(query.get) CreateTableUsingAsSelect(tableName, provider, temp.isDefined, mode, options, - query.get) + queryPlan) } else { val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) CreateTableUsing( @@ -347,14 +349,6 @@ private[sql] case class CreateTableUsing( managedIfNoPath: Boolean) extends Command private[sql] case class CreateTableUsingAsSelect( - tableName: String, - provider: String, - temporary: Boolean, - mode: SaveMode, - options: Map[String, String], - query: String) extends Command - -private[sql] case class CreateTableUsingAsLogicalPlan( tableName: String, provider: String, temporary: Boolean, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ddc7b181d4d46..2d081a4df8581 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, Ov import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} -import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy} +import org.apache.spark.sql.sources.{DDLParser, CreateTableUsing, DataSourceStrategy} import org.apache.spark.sql.types._ /** @@ -66,14 +66,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) + @transient + protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_)) + override def sql(sqlText: String): DataFrame = { val substituted = new VariableSubstitution().substitute(hiveconf, sqlText) // TODO: Create a framework for registering parsers instead of just hardcoding if statements. if (conf.dialect == "sql") { super.sql(substituted) } else if (conf.dialect == "hiveql") { - DataFrame(this, - ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted))) + val ddlPlan = ddlParserWithHiveQL(sqlText, exceptionOnError = false) + DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted))) } else { sys.error(s"Unsupported SQL dialect: ${conf.dialect}. 
Try 'sql' or 'hiveql'") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index cb138be90e2e1..5ea1426bfb12d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing} +import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing} import org.apache.spark.sql.types.StringType @@ -223,12 +223,6 @@ private[hive] trait HiveStrategies { tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) => - val logicalPlan = hiveContext.parseSql(query) - val cmd = - CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, logicalPlan) - ExecutedCommand(cmd) :: Nil - - case CreateTableUsingAsLogicalPlan(tableName, provider, false, mode, opts, query) => val cmd = CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query) ExecutedCommand(cmd) :: Nil From 6b7545c4abc4455df07b4508cb192718866c8e21 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 15 Feb 2015 16:21:23 -0800 Subject: [PATCH 2/5] Add a pre write check to the data source API. --- .../sql/catalyst/analysis/Analyzer.scala | 13 +++- .../org/apache/spark/sql/DataFrameImpl.scala | 6 +- .../org/apache/spark/sql/SQLContext.scala | 3 +- .../sql/sources/DataSourceStrategy.scala | 5 +- .../org/apache/spark/sql/sources/ddl.scala | 17 ++++- .../org/apache/spark/sql/sources/rules.scala | 73 ++++++++++++++++++- .../spark/sql/parquet/ParquetQuerySuite.scala | 12 +-- .../sources/CreateTableAsSelectSuite.scala | 28 +++++++ .../spark/sql/sources/DataSourceTest.scala | 3 +- ...nsertIntoSuite.scala => InsertSuite.scala} | 16 +++- .../apache/spark/sql/hive/HiveContext.scala | 3 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 12 files changed, 155 insertions(+), 26 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/sources/{InsertIntoSuite.scala => InsertSuite.scala} (90%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 58a7003977c93..aca53a1ed79b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -50,7 +50,13 @@ class Analyzer(catalog: Catalog, /** * Override to provide additional rules for the "Resolution" batch. */ - val extendedRules: Seq[Rule[LogicalPlan]] = Nil + val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Override to provide additional rules for the "Check Analysis" batch. + * These rules will be evaluated after our built-in check rules. 
+ */ + val extendedCheckRules: Seq[Rule[LogicalPlan]] = Nil lazy val batches: Seq[Batch] = Seq( Batch("Resolution", fixedPoint, @@ -64,9 +70,10 @@ class Analyzer(catalog: Catalog, UnresolvedHavingClauseAttributes :: TrimGroupingAliases :: typeCoercionRules ++ - extendedRules : _*), + extendedResolutionRules : _*), Batch("Check Analysis", Once, - CheckResolution), + CheckResolution +: + extendedCheckRules: _*), Batch("Remove SubQueries", fixedPoint, EliminateSubQueries) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 453262005cf8c..2255daab13254 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -67,7 +67,11 @@ private[sql] class DataFrameImpl protected[sql]( @transient protected[sql] override val logicalPlan: LogicalPlan = queryExecution.logical match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. - case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile => + case _: Command | + _: InsertIntoTable | + _: CreateTableAsSelect[_] | + _: CreateTableUsingAsSelect | + _: WriteToFile => LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) case _ => queryExecution.logical diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index bfc61d88563bd..7eeceba7b0192 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -92,7 +92,8 @@ class SQLContext(@transient val sparkContext: SparkContext) @transient protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = true) { - override val extendedRules = + override val extendedResolutionRules = + sources.PreWriteCheck(catalog) :: sources.PreInsertCastAndRename :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 624369afe87b5..d68d081bd252b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -55,10 +55,7 @@ private[sql] object DataSourceStrategy extends Strategy { execution.PhysicalRDD(l.output, t.buildScan()) :: Nil case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) => - if (partition.nonEmpty) { - sys.error(s"Insert into a partition is not allowed because $l is not partitioned.") - } + l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty => execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case _ => Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 35ead7fe003d5..1da1391f6e897 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.AbstractSparkSQLParser 
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -106,6 +106,7 @@ private[sql] class DDLParser( */ protected lazy val createTable: Parser[LogicalPlan] = ( + // TODO: Support database.table. (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => @@ -336,7 +337,7 @@ private[sql] case class DescribeCommand( * @param temporary * @param options * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown + * If it is false, an exception will be thrown * @param managedIfNoPath */ private[sql] case class CreateTableUsing( @@ -348,13 +349,23 @@ private[sql] case class CreateTableUsing( allowExisting: Boolean, managedIfNoPath: Boolean) extends Command +/** + * A node used to support CTAS statements and saveAsTable for the data source API. + * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer + * can analyze the logical plan that will be used to populate the table. + * So, [[PreWriteCheck]] can detect cases that are not allowed. + */ private[sql] case class CreateTableUsingAsSelect( tableName: String, provider: String, temporary: Boolean, mode: SaveMode, options: Map[String, String], - query: LogicalPlan) extends Command + child: LogicalPlan) extends UnaryNode { + override def output = Seq.empty[Attribute] + // TODO: Override resolved after we support databaseName. + // override lazy val resolved = databaseName != None && childrenResolved +} private [sql] case class CreateTempTableUsing( tableName: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index 4ed22d363da5b..18ac2bef1d00c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.sources +import org.apache.spark.sql.{SaveMode, AnalysisException} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog} import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias} +import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.DataType @@ -26,11 +29,9 @@ import org.apache.spark.sql.types.DataType * A rule to do pre-insert data type casting and field renaming. Before we insert into * an [[InsertableRelation]], we will use this rule to make sure that * the columns to be inserted have the correct data type and fields have the correct names. - * @param resolver The resolver used by the Analyzer. */ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = { - plan.transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved. 
case p: LogicalPlan if !p.childrenResolved => p @@ -46,7 +47,6 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { } castAndRenameChildOutput(i, l.output, child) } - } } /** If necessary, cast data types and rename fields to the expected types and names. */ @@ -74,3 +74,68 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { } } } + +/** + * A rule to do various checks before inserting into or writing to a data source table. + */ +private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan] { + def failAnalysis(msg: String) = { throw new AnalysisException(msg) } + + def apply(plan: LogicalPlan): LogicalPlan = { + plan.foreach { + case i @ logical.InsertIntoTable( + l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) => + // Right now, we do not support insert into a data source table with partition specs. + if (partition.nonEmpty) { + failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") + } else { + // Get all input data source relations of the query. + val srcRelations = query.collect { + case LogicalRelation(src: BaseRelation) => src + } + if (srcRelations.exists(src => src == t)) { + failAnalysis( + s"It is not allowed to insert overwrite the table because it is used as " + + s"an input table.") + } else { + // OK + } + } + + case i @ logical.InsertIntoTable(l: LogicalRelation, partition, query, overwrite) => + // The relation in l is not an InsertableRelation. + failAnalysis(s"$l does not allow insertion.") + + case CreateTableUsingAsSelect(tableName, _, _, SaveMode.Overwrite, _, query) => + // When the SaveMode is Overwrite, we need to check if the table is an input table of + // the query. If so, we will throw an AnalysisException to let users know it is not allowed. + if (catalog.tableExists(Seq(tableName))) { + // Need to remove SubQuery operator. + EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match { + // Only do the check if the table is a data source table + // (the relation is a BaseRelation). + case l @ LogicalRelation(dest: BaseRelation) => + // Get all input data source relations of the query. + val srcRelations = query.collect { + case LogicalRelation(src: BaseRelation) => src + } + if (srcRelations.exists(src => src == dest)) { + failAnalysis( + s"It is not allowed to save to table $tableName because it is used as " + + s"an input table .") + } else { + // OK + } + + case _ => // OK + } + } else { + // OK + } + + case _ => // OK + } + + plan + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index cba06835f9a61..dd78004a1a881 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -37,20 +37,22 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { test(s"$prefix: appending") { val data = (0 until 10).map(i => (i, i.toString)) + createDataFrame(data).toDataFrame("c1", "c2").registerTempTable("tmp") withParquetTable(data, "t") { - sql("INSERT INTO TABLE t SELECT * FROM t") + sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } + catalog.unregisterTable(Seq("tmp")) } - // This test case will trigger the NPE mentioned in - // https://issues.apache.org/jira/browse/PARQUET-151. 
- ignore(s"$prefix: overwriting") { + test(s"$prefix: overwriting") { val data = (0 until 10).map(i => (i, i.toString)) + createDataFrame(data).toDataFrame("c1", "c2").registerTempTable("tmp") withParquetTable(data, "t") { - sql("INSERT OVERWRITE TABLE t SELECT * FROM t") + sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } + catalog.unregisterTable(Seq("tmp")) } test(s"$prefix: self-join") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 29caed9337ff6..cd9f094f91fb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.sources import java.io.File +import org.apache.spark.sql.AnalysisException import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.catalyst.util @@ -157,4 +158,31 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { """.stripMargin) } } + + test("it is not allowed to write to a table while querying it.") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + val message = intercept[AnalysisException] { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jsonTable + """.stripMargin) + }.getMessage + assert( + message.contains("It is not allowed to save to table"), + "Writing to a table while querying it should not be allowed.") + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index 53f5f7426e9e6..0ec6881d7afe6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -29,7 +29,8 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter { @transient override protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = false) { - override val extendedRules = + override val extendedResolutionRules = + PreWriteCheck(catalog) :: PreInsertCastAndRename :: Nil } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala similarity index 90% rename from sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 36e504e759152..4515a1bb66956 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -21,11 +21,11 @@ import java.io.File import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.util import org.apache.spark.util.Utils -class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll { +class InsertSuite extends DataSourceTest with BeforeAndAfterAll { import caseInsensisitiveContext._ @@ -129,6 +129,18 @@ class 
InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll { } } + test("it is not allowed to write to a table while querying it.") { + val message = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jsonTable + """.stripMargin) + }.getMessage + assert( + message.contains("It is not allowed to insert overwrite the table"), + "INSERT OVERWRITE to a table while querying it should not be allowed.") + } + test("Caching") { // Cached Query Execution cacheTable("jsonTable") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 2d081a4df8581..8c69f5516f904 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -246,11 +246,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient override protected[sql] lazy val analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = false) { - override val extendedRules = + override val extendedResolutionRules = catalog.CreateTables :: catalog.PreInsertionCasts :: ExtractPythonUdfs :: ResolveUdtfsAlias :: + sources.PreWriteCheck(catalog) :: sources.PreInsertCastAndRename :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index eb1ee54247bea..ca539cf226f9b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -606,7 +606,7 @@ private[hive] case class MetastoreRelation } object HiveMetastoreTypes { - protected val ddlParser = new DDLParser + protected val ddlParser = new DDLParser(HiveQl.parseSql(_)) def toDataType(metastoreType: String): DataType = synchronized { ddlParser.parseType(metastoreType) From f30bdade7b1861f735c0ec3c720012d7317549d9 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 15 Feb 2015 17:11:20 -0800 Subject: [PATCH 3/5] Use toDF. 
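A minimal sketch (not part of the original commit message) of what this rename means for the tests touched below, assuming the toDF(colNames: String*) helper that replaces toDataFrame in this snapshot:

    // Hedged sketch: build a two-column DataFrame from tuples, rename the columns with
    // the new toDF helper, and register it as a temporary table so the INSERT statements
    // in ParquetQuerySuite read from a table other than the one being written to.
    val data = (0 until 10).map(i => (i, i.toString))
    createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
    sql("INSERT INTO TABLE t SELECT * FROM tmp")
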
--- .../org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index dd78004a1a881..2a4b59959a997 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -37,7 +37,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { test(s"$prefix: appending") { val data = (0 until 10).map(i => (i, i.toString)) - createDataFrame(data).toDataFrame("c1", "c2").registerTempTable("tmp") + createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") withParquetTable(data, "t") { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) @@ -47,7 +47,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { test(s"$prefix: overwriting") { val data = (0 until 10).map(i => (i, i.toString)) - createDataFrame(data).toDataFrame("c1", "c2").registerTempTable("tmp") + createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") withParquetTable(data, "t") { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) From e76e85aa1822285173c17e8f0caa53c902e503a7 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 16 Feb 2015 13:43:34 -0800 Subject: [PATCH 4/5] Address comments. --- .../org/apache/spark/sql/sources/rules.scala | 9 +++--- .../sources/CreateTableAsSelectSuite.scala | 2 +- .../spark/sql/sources/InsertSuite.scala | 32 ++++++++++++++++++- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index 18ac2bef1d00c..36a9c0bdc41e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -95,14 +95,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan } if (srcRelations.exists(src => src == t)) { failAnalysis( - s"It is not allowed to insert overwrite the table because it is used as " + - s"an input table.") + "Cannot insert overwrite into table that is also being read from.") } else { // OK } } - case i @ logical.InsertIntoTable(l: LogicalRelation, partition, query, overwrite) => + case i @ logical.InsertIntoTable( + l: LogicalRelation, partition, query, overwrite) if !l.isInstanceOf[InsertableRelation] => // The relation in l is not an InsertableRelation. 
failAnalysis(s"$l does not allow insertion.") @@ -121,8 +121,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan } if (srcRelations.exists(src => src == dest)) { failAnalysis( - s"It is not allowed to save to table $tableName because it is used as " + - s"an input table .") + s"Cannot overwrite table $tableName that is also being read from.") } else { // OK } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index cd9f094f91fb2..60355414a40fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -182,7 +182,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { """.stripMargin) }.getMessage assert( - message.contains("It is not allowed to save to table"), + message.contains("Cannot overwrite table "), "Writing to a table while querying it should not be allowed.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 4515a1bb66956..5682e5a2bcea9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -137,7 +137,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll { """.stripMargin) }.getMessage assert( - message.contains("It is not allowed to insert overwrite the table"), + message.contains("Cannot insert overwrite into table that is also being read from."), "INSERT OVERWRITE to a table while querying it should not be allowed.") } @@ -185,4 +185,34 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll { uncacheTable("jsonTable") assertCached(sql("SELECT * FROM jsonTable"), 0) } + + test("it's not allowed to insert into a relation that is not an InsertableRelation") { + sql( + """ + |CREATE TEMPORARY TABLE oneToTen + |USING org.apache.spark.sql.sources.SimpleScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM oneToTen"), + (1 to 10).map(Row(_)).toSeq + ) + + val message = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE TABLE oneToTen SELECT a FROM jt + """.stripMargin) + }.getMessage + assert( + message.contains("does not allow insertion."), + "It is not allowed to insert into a table that is not an InsertableRelation." + ) + + dropTempTable("oneToTen") + } } From 8e3019df8d27815746825704d773ff39ed7a9a83 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 16 Feb 2015 14:27:55 -0800 Subject: [PATCH 5/5] Fix compilation error. --- sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 7976fef360440..dd8b3d211be64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -106,7 +106,6 @@ private[sql] class DDLParser( * AS SELECT ... */ protected lazy val createTable: Parser[LogicalPlan] = - ( // TODO: Support database.table. (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? 
^^ {
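
The behavioral core of this series is the PreWriteCheck rule added in PATCH 2 and refined in PATCH 4: before an INSERT OVERWRITE or a CREATE TABLE ... AS SELECT with SaveMode.Overwrite runs, the rule collects every relation the query reads and fails analysis if the destination is among them. A minimal, self-contained sketch of that idea, using plain stand-in classes rather than Spark's Catalyst LogicalPlan and LogicalRelation types (paste into a Scala REPL to try it):

    // Stand-ins only; these are not Spark classes.
    sealed trait Plan { def children: Seq[Plan] }
    case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
    case class Project(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
    case class InsertOverwrite(dest: String, query: Plan) extends Plan {
      def children: Seq[Plan] = Seq(query)
    }

    // Analogue of collecting the LogicalRelation leaves of the query plan.
    def inputTables(p: Plan): Seq[String] = p match {
      case Scan(t) => Seq(t)
      case other   => other.children.flatMap(inputTables)
    }

    // Analogue of PreWriteCheck: refuse a write whose query also reads the destination.
    def preWriteCheck(p: Plan): Unit = p match {
      case InsertOverwrite(dest, query) if inputTables(query).contains(dest) =>
        throw new IllegalArgumentException(
          s"Cannot insert overwrite into table $dest that is also being read from.")
      case _ => () // OK
    }

    preWriteCheck(InsertOverwrite("t", Project(Scan("src"))))   // passes
    // preWriteCheck(InsertOverwrite("t", Project(Scan("t"))))  // would fail the check

In the real rule the traversal is plan.foreach over the analyzed logical plan and the comparison is on BaseRelation instances rather than table names; that is also why CreateTableUsingAsSelect is turned from a Command into a UnaryNode here, so the analyzer resolves the child query and the check can see its actual input relations.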