From c55a1f95491b10208ccd2cdf5910e6ec813c3522 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 19 Jan 2017 22:04:44 +0800 Subject: [PATCH 1/5] add post-hoc resolution --- .../sql/catalyst/analysis/Analyzer.scala | 8 +++++ .../datasources/DataSourceStrategy.scala | 25 ++-------------- .../sql/execution/datasources/rules.scala | 4 +-- .../spark/sql/internal/SessionState.scala | 8 +++-- .../spark/sql/hive/HiveSessionState.scala | 10 ++++--- .../spark/sql/hive/HiveStrategies.scala | 30 +++++-------------- 6 files changed, 30 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 98851cb8557a3..cb56e94c0a77a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -106,6 +106,13 @@ class Analyzer( */ val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil + /** + * Override to provide rules to do post-hoc resolution. Note that these rules will be executed + * in an individual batch. This batch is to run right after the normal resolution batch and + * execute its rules in one pass. + */ + val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil + lazy val batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, CTESubstitution, @@ -139,6 +146,7 @@ class Analyzer( ResolveInlineTables :: TypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), + Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), Batch("View", Once, AliasViewChild(conf)), Batch("Nondeterministic", Once, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 21b07ee85adc8..19db293132f54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -44,6 +44,8 @@ import org.apache.spark.unsafe.types.UTF8String /** * Replaces generic operations with specific variants that are designed to work with Spark * SQL Data Sources. + * + * Note that, this rule must be run after [[PreprocessTableInsertion]]. */ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { @@ -127,30 +129,9 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { projectList } - /** - * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule - * [[PreprocessTableInsertion]]. It is important that this rule([[DataSourceAnalysis]]) has to - * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and - * fix the schema mismatch by adding Cast. 
- */ - private def hasBeenPreprocessed( - tableOutput: Seq[Attribute], - partSchema: StructType, - partSpec: Map[String, Option[String]], - query: LogicalPlan): Boolean = { - val partColNames = partSchema.map(_.name).toSet - query.resolved && partSpec.keys.forall(partColNames.contains) && { - val staticPartCols = partSpec.filter(_._2.isDefined).keySet - val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name)) - expectedColumns.toStructType.sameType(query.schema) - } - } - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) - if hasBeenPreprocessed(l.output, t.partitionSchema, parts, query) => - + l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and // the user has specified static partitions, we add a Project operator on top of the query // to include those constant column values in the query result. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index beacb08994430..87e7017aee3a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources -import scala.util.control.NonFatal - import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ @@ -385,7 +383,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(table, partition, child, _, _) if table.resolved && child.resolved => + case i @ InsertIntoTable(table, _, child, _, _) if table.resolved && child.resolved => table match { case relation: CatalogRelation => val metadata = relation.catalogTable diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 64ec62f41d1f8..68b774b52fd7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -114,12 +114,14 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = - AnalyzeCreateTable(sparkSession) :: - PreprocessTableInsertion(conf) :: new FindDataSourceTable(sparkSession) :: - DataSourceAnalysis(conf) :: new ResolveDataSource(sparkSession) :: Nil + override val postHocResolutionRules = + AnalyzeCreateTable(sparkSession) :: + PreprocessTableInsertion(conf) :: + DataSourceAnalysis(conf) :: Nil + override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d3cef6e0cb0cf..9fd03ef8ba037 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -62,15 +62,17 @@ private[hive] class 
HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = catalog.ParquetConversions :: catalog.OrcConversions :: - AnalyzeCreateTable(sparkSession) :: - PreprocessTableInsertion(conf) :: - DataSourceAnalysis(conf) :: new DetermineHiveSerde(conf) :: - new HiveAnalysis(sparkSession) :: new FindDataSourceTable(sparkSession) :: new FindHiveSerdeTable(sparkSession) :: new ResolveDataSource(sparkSession) :: Nil + override val postHocResolutionRules = + AnalyzeCreateTable(sparkSession) :: + PreprocessTableInsertion(conf) :: + DataSourceAnalysis(conf) :: + new HiveAnalysis(sparkSession) :: Nil + override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 838e6f4008108..6cde783c5ae32 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -25,10 +25,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.CreateTable +import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} -import org.apache.spark.sql.types.StructType /** @@ -78,10 +77,14 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] { } } +/** + * Replaces generic operations with specific variants that are designed to work with Hive. + * + * Note that, this rule must be run after [[PreprocessTableInsertion]]. + */ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) - if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) => + case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) => InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => @@ -98,25 +101,6 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { query, mode == SaveMode.Ignore) } - - /** - * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule - * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to - * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and - * fix the schema mismatch by adding Cast. 
- */ - private def hasBeenPreprocessed( - tableOutput: Seq[Attribute], - partSchema: StructType, - partSpec: Map[String, Option[String]], - query: LogicalPlan): Boolean = { - val partColNames = partSchema.map(_.name).toSet - query.resolved && partSpec.keys.forall(partColNames.contains) && { - val staticPartCols = partSpec.filter(_._2.isDefined).keySet - val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name)) - expectedColumns.toStructType.sameType(query.schema) - } - } } /** From 3336699461c532fe30f97aa32f715e3772dee817 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 23 Jan 2017 17:04:14 +0800 Subject: [PATCH 2/5] simplify data source analysis --- .../sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 26 +--- .../plans/logical/basicLogicalOperators.scala | 15 +- .../apache/spark/sql/DataFrameWriter.scala | 2 +- .../spark/sql/execution/SparkPlanner.scala | 1 - .../spark/sql/execution/SparkStrategies.scala | 28 ---- .../spark/sql/execution/command/tables.scala | 6 +- .../datasources/DataSourceStrategy.scala | 72 +++++----- .../spark/sql/execution/datasources/ddl.scala | 18 ++- .../sql/execution/datasources/rules.scala | 133 +++++++----------- .../spark/sql/internal/SessionState.scala | 5 +- .../sql/execution/command/DDLSuite.scala | 4 +- .../spark/sql/sources/InsertSuite.scala | 2 +- .../spark/sql/hive/HiveSessionState.scala | 17 +-- .../spark/sql/hive/HiveStrategies.scala | 5 +- 15 files changed, 130 insertions(+), 208 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index cb56e94c0a77a..2190b324be50b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -601,8 +601,8 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => - i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case i @ InsertIntoTable(table: SubqueryAlias, _, _, _, _) => + i.copy(table = EliminateSubqueryAliases(table)) case u: UnresolvedRelation => resolveRelation(u) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 5f7609fa13e6d..18d78b9bf3888 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -370,28 +370,6 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) - case InsertIntoTable(t, _, _, _, _) - if !t.isInstanceOf[LeafNode] || - t.isInstanceOf[Range] || - t == OneRowRelation || - t.isInstanceOf[LocalRelation] => - failAnalysis(s"Inserting into an RDD-based table is not allowed.") - - case i @ InsertIntoTable(table, partitions, query, _, _) => - val numStaticPartitions = partitions.values.count(_.isDefined) - if (table.output.size != (query.output.size + numStaticPartitions)) { - failAnalysis( - s"$table requires that the data to be inserted have the same number of " + - s"columns as the target table: target table has ${table.output.size} " + - s"column(s) but the inserted data 
has " + - s"${query.output.size + numStaticPartitions} column(s), including " + - s"$numStaticPartitions partition column(s) having constant value(s).") - } - - case o if !o.resolved => - failAnalysis( - s"unresolved operator ${operator.simpleString}") - case o if o.expressions.exists(!_.deterministic) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] => @@ -407,6 +385,10 @@ trait CheckAnalysis extends PredicateHelper { } } extendedCheckRules.foreach(_(plan)) + plan.foreachUp { + case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}") + case _ => + } plan.foreach(_.setAnalyzed()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 432097d6218d0..a883df0679d31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -363,7 +363,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { } /** - * Insert some data into a table. + * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the + * concrete implementations during analysis. * * @param table the logical plan representing the table. In the future this should be a * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables @@ -374,25 +375,23 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { * Map('a' -> Some('1'), 'b' -> Some('2')), * and `INSERT INTO tbl PARTITION (a=1, b) AS ...` * would have Map('a' -> Some('1'), 'b' -> None). - * @param child the logical plan representing data to write to. + * @param query the logical plan representing data to write to. * @param overwrite overwrite existing table or partitions. * @param ifNotExists If true, only write if the table or partition does not exist. 
*/ case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], - child: LogicalPlan, + query: LogicalPlan, overwrite: Boolean, ifNotExists: Boolean) extends LogicalPlan { - - override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = Seq.empty - assert(overwrite || !ifNotExists) assert(partition.values.forall(_.nonEmpty) || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && table.resolved + override def children: Seq[LogicalPlan] = table :: query :: Nil + override def output: Seq[Attribute] = Seq.empty + override lazy val resolved: Boolean = false } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ff1f0177e8ba0..81657d9e47fe7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -265,7 +265,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { InsertIntoTable( table = UnresolvedRelation(tableIdent), partition = Map.empty[String, Option[String]], - child = df.logicalPlan, + query = df.logicalPlan, overwrite = mode == SaveMode.Overwrite, ifNotExists = false)).toRdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 73e2ffdf007d3..678241656c011 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -36,7 +36,6 @@ class SparkPlanner( extraStrategies ++ ( FileSourceStrategy :: DataSourceStrategy :: - DDLStrategy :: SpecialLimits :: Aggregation :: JoinSelection :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index fafb91967086f..e3ec343479a03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -405,32 +405,4 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => Nil } } - - object DDLStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => - val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore) - ExecutedCommandExec(cmd) :: Nil - - case CreateTable(tableDesc, mode, None) => - val cmd = - CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) - ExecutedCommandExec(cmd) :: Nil - - // CREATE TABLE ... AS SELECT ... 
for hive serde table is handled in hive module, by rule - // `CreateTables` - - case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) => - val cmd = - CreateDataSourceTableAsSelectCommand( - tableDesc, - mode, - query) - ExecutedCommandExec(cmd) :: Nil - - case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil - - case _ => Nil - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 1b596c97a1c4e..6f71ae057b165 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -111,10 +111,12 @@ case class CreateTableLikeCommand( * [AS select_statement]; * }}} */ -case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { +case class CreateTableCommand( + table: CatalogTable, + ignoreIfExists: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.sessionState.catalog.createTable(table, ifNotExists) + sparkSession.sessionState.catalog.createTable(table, ignoreIfExists) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 19db293132f54..a9c95b834e234 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -130,6 +130,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => + CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) + + case CreateTable(tableDesc, mode, Some(query)) + if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => + CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) + + case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _), + parts, query, overwrite, false) if parts.isEmpty => + InsertIntoDataSourceCommand(l, query, overwrite) + case InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and @@ -199,40 +210,33 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(table: CatalogTable): LogicalPlan = { - val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) - val cache = sparkSession.sessionState.catalog.tableRelationCache - val withHiveSupport = - sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" - - cache.get(qualifiedTableName, new Callable[LogicalPlan]() { - override def call(): LogicalPlan = { - val pathOption = table.storage.locationUri.map("path" -> _) - val dataSource = - DataSource( - sparkSession, - // In older version(prior to 2.1) of Spark, the table schema can be empty and should be - // inferred at runtime. 
We should still support it. - userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), - partitionColumns = table.partitionColumnNames, - bucketSpec = table.bucketSpec, - className = table.provider.get, - options = table.storage.properties ++ pathOption, - // TODO: improve `InMemoryCatalog` and remove this limitation. - catalogTable = if (withHiveSupport) Some(table) else None) - - LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) - } - }) - } - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) - if DDLUtils.isDatasourceTable(s.metadata) => - i.copy(table = readDataSourceTable(s.metadata)) - case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - readDataSourceTable(s.metadata) + val table = s.metadata + val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) + val cache = sparkSession.sessionState.catalog.tableRelationCache + val withHiveSupport = + sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" + + cache.get(qualifiedTableName, new Callable[LogicalPlan]() { + override def call(): LogicalPlan = { + val pathOption = table.storage.locationUri.map("path" -> _) + val dataSource = + DataSource( + sparkSession, + // In older version(prior to 2.1) of Spark, the table schema can be empty and should + // be inferred at runtime. We should still support it. + userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), + partitionColumns = table.partitionColumnNames, + bucketSpec = table.bucketSpec, + className = table.provider.get, + options = table.storage.properties ++ pathOption, + // TODO: improve `InMemoryCatalog` and remove this limitation. + catalogTable = if (withHiveSupport) Some(table) else None) + + LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) + } + }) } } @@ -273,10 +277,6 @@ object DataSourceStrategy extends Strategy with Logging { Map.empty, None) :: Nil - case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), - part, query, overwrite, false) if part.isEmpty => - ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil - case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 695ba1234d458..a77cc70ef3b27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -20,15 +20,23 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.types._ +/** + * Create a table and optionally insert some data into it. Note that this plan is unresolved and + * has to be replaced by the concrete implementations during analyse. + * + * @param tableDesc the metadata of the table to be created. 
+ * @param mode the data writing mode + * @param query an optional logical plan representing data to write into the created table. + */ case class CreateTable( tableDesc: CatalogTable, mode: SaveMode, - query: Option[LogicalPlan]) extends Command { + query: Option[LogicalPlan]) extends LogicalPlan { assert(tableDesc.provider.isDefined, "The table to be created must have a provider.") if (query.isEmpty) { @@ -37,7 +45,9 @@ case class CreateTable( "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.") } - override def innerChildren: Seq[QueryPlan[_]] = query.toSeq + override def children: Seq[LogicalPlan] = query.toSeq + override def output: Seq[Attribute] = Seq.empty + override lazy val resolved: Boolean = false } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 87e7017aee3a4..3b23b4640643b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -65,10 +65,10 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { } /** - * Analyze [[CreateTable]] and do some normalization and checking. - * For CREATE TABLE AS SELECT, the SELECT query is also analyzed. + * Preprocess [[CreateTable]], to do some normalization and checking. */ -case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { +case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private val catalog = sparkSession.sessionState.catalog def apply(plan: LogicalPlan): LogicalPlan = plan transform { // When we CREATE TABLE without specifying the table schema, we should fail the query if @@ -91,16 +91,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl // bucket spec, etc. match the existing table, and adjust the columns order of the given query // if necessary. case c @ CreateTable(tableDesc, SaveMode.Append, Some(query)) - if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) => + if query.resolved && catalog.tableExists(tableDesc.identifier) => // This is guaranteed by the parser and `DataFrameWriter` assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined) - // Analyze the query in CTAS and then we can do the normalization and checking. - val qe = sparkSession.sessionState.executePlan(query) - qe.assertAnalyzed() - val analyzedQuery = qe.analyzed - - val catalog = sparkSession.sessionState.catalog val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase) val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db)) val tableName = tableIdentWithDB.unquotedString @@ -126,7 +120,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl s"`${specifiedProvider.getSimpleName}`.") } - if (analyzedQuery.schema.length != existingTable.schema.length) { + if (query.schema.length != existingTable.schema.length) { throw new AnalysisException( s"The column number of the existing table $tableName" + s"(${existingTable.schema.catalogString}) doesn't match the data schema" + @@ -140,8 +134,8 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl // adjust the column order of the given dataframe according to it, or throw exception // if the column names do not match. 
val adjustedColumns = tableCols.map { col => - analyzedQuery.resolve(Seq(col), resolver).getOrElse { - val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ") + query.resolve(Seq(col), resolver).getOrElse { + val inputColumns = query.schema.map(_.name).mkString(", ") throw new AnalysisException( s"cannot resolve '$col' given input columns: [$inputColumns]") } @@ -177,10 +171,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl """.stripMargin) } - val newQuery = if (adjustedColumns != analyzedQuery.output) { - Project(adjustedColumns, analyzedQuery) + val newQuery = if (adjustedColumns != query.output) { + Project(adjustedColumns, query) } else { - analyzedQuery + query } c.copy( @@ -198,15 +192,12 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl // * partition columns' type must be AtomicType. // * sort columns' type must be orderable. // * reorder table schema or output of query plan, to put partition columns at the end. - case c @ CreateTable(tableDesc, _, query) => + case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) => if (query.isDefined) { assert(tableDesc.schema.isEmpty, "Schema may not be specified in a Create Table As Select (CTAS) statement") - val qe = sparkSession.sessionState.executePlan(query.get) - qe.assertAnalyzed() - val analyzedQuery = qe.analyzed - + val analyzedQuery = query.get val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc) val output = analyzedQuery.output @@ -326,21 +317,20 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec( insert.partition, partColNames, tblName, conf.resolver) - val expectedColumns = { - val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet - insert.table.output.filterNot(a => staticPartCols.contains(a.name)) - } + val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet + val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name)) - if (expectedColumns.length != insert.child.schema.length) { - throw new AnalysisException( - s"Cannot insert into table $tblName because the number of columns are different: " + - s"need ${expectedColumns.length} columns, " + - s"but query has ${insert.child.schema.length} columns.") + if (expectedColumns.length != insert.query.schema.length) { + failAnalysis( + s"$tblName requires that the data to be inserted have the same number of columns as the " + + s"target table: target table has ${insert.table.output.size} column(s) but the " + + s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " + + s"including ${staticPartCols.size} partition column(s) having constant value(s).") } if (normalizedPartSpec.nonEmpty) { if (normalizedPartSpec.size != partColNames.length) { - throw new AnalysisException( + failAnalysis( s""" |Requested partitioning does not match the table $tblName: |Requested partitions: ${normalizedPartSpec.keys.mkString(",")} @@ -360,7 +350,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { private def castAndRenameChildOutput( insert: InsertIntoTable, expectedOutput: Seq[Attribute]): InsertIntoTable = { - val newChildOutput = expectedOutput.zip(insert.child.output).map { + val newChildOutput = expectedOutput.zip(insert.query.output).map { case (expected, actual) => if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name && @@ -375,15 
+365,17 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } } - if (newChildOutput == insert.child.output) { + if (newChildOutput == insert.query.output) { insert } else { - insert.copy(child = Project(newChildOutput, insert.child)) + insert.copy(query = Project(newChildOutput, insert.query)) } } + private def failAnalysis(msg: String) = throw new AnalysisException(msg) + def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(table, _, child, _, _) if table.resolved && child.resolved => + case i @ InsertIntoTable(table, _, query, _, _) if i.childrenResolved => table match { case relation: CatalogRelation => val metadata = relation.catalogTable @@ -394,7 +386,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { case LogicalRelation(_: InsertableRelation, _, catalogTable) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") preprocess(i, tblName, Nil) - case other => i + case _ => i } } } @@ -405,10 +397,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) => - throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT") case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => - throw new AnalysisException("Hive support is required to CREATE Hive TABLE") + throw new AnalysisException("Hive support is required to CREATE Hive TABLE (AS SELECT)") case _ => // OK } } @@ -417,63 +407,40 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) { /** * A rule to do various checks before inserting into or writing to a data source table. */ -case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) - extends (LogicalPlan => Unit) { +object PreWriteCheck extends (LogicalPlan => Unit) { def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): Unit = { plan.foreach { - case logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) => - // Right now, we do not support insert into a data source table with partition specs. - if (partition.nonEmpty) { - failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") - } else { - // Get all input data source relations of the query. - val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation, _, _) => src - } - if (srcRelations.contains(t)) { - failAnalysis( - "Cannot insert overwrite into table that is also being read from.") - } else { - // OK - } + case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) => + // Get all input data source relations of the query. + val srcRelations = query.collect { + case LogicalRelation(src, _, _) => src } - - case logical.InsertIntoTable( - LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) => - // We need to make sure the partition columns specified by users do match partition - // columns of the relation. - val existingPartitionColumns = r.partitionSchema.fieldNames.toSet - val specifiedPartitionColumns = part.keySet - if (existingPartitionColumns != specifiedPartitionColumns) { - failAnalysis("Specified partition columns " + - s"(${specifiedPartitionColumns.mkString(", ")}) " + - "do not match the partition columns of the table. 
Please use " + - s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.") + if (srcRelations.contains(relation)) { + failAnalysis("Cannot insert into table that is also being read from.") } else { // OK } - PartitioningUtils.validatePartitionColumn( - r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) + relation match { + case _: HadoopFsRelation => // OK - // Get all input data source relations of the query. - val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation, _, _) => src - } - if (srcRelations.contains(r)) { - failAnalysis( - "Cannot insert overwrite into table that is also being read from.") - } else { - // OK + // Right now, we do not support insert into a non-file-based data source table with + // partition specs. + case _: InsertableRelation if partition.nonEmpty => + failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") + + case _ => failAnalysis(s"$relation does not allow insertion.") } - case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => - // The relation in l is not an InsertableRelation. - failAnalysis(s"$l does not allow insertion.") + case InsertIntoTable(t, _, _, _, _) + if !t.isInstanceOf[LeafNode] || + t.isInstanceOf[Range] || + t == OneRowRelation || + t.isInstanceOf[LocalRelation] => + failAnalysis(s"Inserting into an RDD-based table is not allowed.") case _ => // OK } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 68b774b52fd7f..79c45cffa182c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -118,12 +118,11 @@ private[sql] class SessionState(sparkSession: SparkSession) { new ResolveDataSource(sparkSession) :: Nil override val postHocResolutionRules = - AnalyzeCreateTable(sparkSession) :: + PreprocessTableCreation(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: Nil - override val extendedCheckRules = - Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck) + override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 51f5946c19e00..42fc52b480a7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1540,13 +1540,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { var e = intercept[AnalysisException] { sql("CREATE TABLE t SELECT 1 as a, 1 as b") }.getMessage - assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT")) + assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)")) spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1") e = intercept[AnalysisException] { sql("CREATE TABLE t SELECT a, b from t1") }.getMessage - assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT")) + assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 13284ba649abd..5b215ca07f0d4 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -113,7 +113,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { |INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt """.stripMargin) }.getMessage - assert(message.contains("the number of columns are different") + assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)") ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 9fd03ef8ba037..c929f83164ac0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -68,12 +68,12 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) new ResolveDataSource(sparkSession) :: Nil override val postHocResolutionRules = - AnalyzeCreateTable(sparkSession) :: + PreprocessTableCreation(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: new HiveAnalysis(sparkSession) :: Nil - override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) + override val extendedCheckRules = Seq(PreWriteCheck) } } @@ -86,18 +86,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val sparkSession: SparkSession = self.sparkSession override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies ++ Seq( - FileSourceStrategy, - DataSourceStrategy, - DDLStrategy, - SpecialLimits, - InMemoryScans, - HiveTableScans, - Scripts, - Aggregation, - JoinSelection, - BasicOperators - ) + super.strategies ++ Seq(HiveTableScans, Scripts) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 6cde783c5ae32..8c5794e745d86 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -87,6 +87,9 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) => InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) + case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => + CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde // tables yet. 
From dcea3a61f3670e1080ab3b863d17fd2ccfba902a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 24 Jan 2017 00:51:52 +0800 Subject: [PATCH 3/5] fix test --- .../sql/catalyst/analysis/Analyzer.scala | 4 +- .../plans/logical/basicLogicalOperators.scala | 3 +- .../datasources/DataSourceStrategy.scala | 57 +++++++++++-------- .../sql/execution/datasources/rules.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 24 +++----- .../spark/sql/hive/HiveSessionState.scala | 12 +++- .../spark/sql/hive/HiveDDLCommandSuite.scala | 2 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 3 +- .../apache/spark/sql/hive/parquetSuites.scala | 16 ++---- 9 files changed, 66 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2190b324be50b..cb56e94c0a77a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -601,8 +601,8 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(table: SubqueryAlias, _, _, _, _) => - i.copy(table = EliminateSubqueryAliases(table)) + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => + i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) case u: UnresolvedRelation => resolveRelation(u) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index a883df0679d31..8d7a6bc4b573d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -389,7 +389,8 @@ case class InsertIntoTable( assert(overwrite || !ifNotExists) assert(partition.values.forall(_.nonEmpty) || !ifNotExists) - override def children: Seq[LogicalPlan] = table :: query :: Nil + // We don't want `table` in children as sometimes we don't want to transform it. + override def children: Seq[LogicalPlan] = query :: Nil override def output: Seq[Attribute] = Seq.empty override lazy val resolved: Boolean = false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index a9c95b834e234..4dbbbc23fbc25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -210,33 +210,40 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive. 
*/ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def readDataSourceTable(table: CatalogTable): LogicalPlan = { + val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) + val cache = sparkSession.sessionState.catalog.tableRelationCache + val withHiveSupport = + sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" + + cache.get(qualifiedTableName, new Callable[LogicalPlan]() { + override def call(): LogicalPlan = { + val pathOption = table.storage.locationUri.map("path" -> _) + val dataSource = + DataSource( + sparkSession, + // In older version(prior to 2.1) of Spark, the table schema can be empty and should be + // inferred at runtime. We should still support it. + userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), + partitionColumns = table.partitionColumnNames, + bucketSpec = table.bucketSpec, + className = table.provider.get, + options = table.storage.properties ++ pathOption, + // TODO: improve `InMemoryCatalog` and remove this limitation. + catalogTable = if (withHiveSupport) Some(table) else None) + + LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) + } + }) + } + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) + if DDLUtils.isDatasourceTable(s.metadata) => + i.copy(table = readDataSourceTable(s.metadata)) + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - val table = s.metadata - val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) - val cache = sparkSession.sessionState.catalog.tableRelationCache - val withHiveSupport = - sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" - - cache.get(qualifiedTableName, new Callable[LogicalPlan]() { - override def call(): LogicalPlan = { - val pathOption = table.storage.locationUri.map("path" -> _) - val dataSource = - DataSource( - sparkSession, - // In older version(prior to 2.1) of Spark, the table schema can be empty and should - // be inferred at runtime. We should still support it. - userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), - partitionColumns = table.partitionColumnNames, - bucketSpec = table.bucketSpec, - className = table.provider.get, - options = table.storage.properties ++ pathOption, - // TODO: improve `InMemoryCatalog` and remove this limitation. 
- catalogTable = if (withHiveSupport) Some(table) else None) - - LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) - } - }) + readDataSourceTable(s.metadata) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 52408973081a9..23150b1ae26ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -319,7 +319,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name)) if (expectedColumns.length != insert.query.schema.length) { - failAnalysis( + throw new AnalysisException( s"$tblName requires that the data to be inserted have the same number of columns as the " + s"target table: target table has ${insert.table.output.size} column(s) but the " + s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " + @@ -328,7 +328,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { if (normalizedPartSpec.nonEmpty) { if (normalizedPartSpec.size != partColNames.length) { - failAnalysis( + throw new AnalysisException( s""" |Requested partitioning does not match the table $tblName: |Requested partitions: ${normalizedPartSpec.keys.mkString(",")} @@ -370,8 +370,6 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } } - private def failAnalysis(msg: String) = throw new AnalysisException(msg) - def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ InsertIntoTable(table, _, query, _, _) if i.childrenResolved => table match { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index faa76b73fde4b..677da0dbdc654 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -247,16 +247,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.resolved || plan.analyzed) { - return plan - } - plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - // Inserting into partitioned table is not supported in Parquet data source (yet). - if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => - InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists) + case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Parquet data source (yet). 
+ if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists) // Read path case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => @@ -285,16 +281,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.resolved || plan.analyzed) { - return plan - } - plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - // Inserting into partitioned table is not supported in Orc data source (yet). - if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => - InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists) + case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Orc data source (yet). + if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists) // Read path case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index c929f83164ac0..47d6880185890 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -86,7 +86,17 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val sparkSession: SparkSession = self.sparkSession override def strategies: Seq[Strategy] = { - super.strategies ++ Seq(HiveTableScans, Scripts) + experimentalMethods.extraStrategies ++ Seq( + FileSourceStrategy, + DataSourceStrategy, + SpecialLimits, + InMemoryScans, + HiveTableScans, + Scripts, + Aggregation, + JoinSelection, + BasicOperators + ) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index b67e5f6fe57a1..7008352009f8c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -52,7 +52,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle private def analyzeCreateTable(sql: String): CatalogTable = { TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect { - case CreateTable(tableDesc, mode, _) => tableDesc + case CreateTableCommand(tableDesc, _) => tableDesc }.head } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index e3ddaf725424d..71ce5a7c4a15a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -376,7 +376,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef val e = intercept[AnalysisException] { sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3") } - assert(e.message.contains("the number of columns are different")) + assert(e.message.contains( + "target table has 4 column(s) but the 
inserted data has 5 column(s)")) } testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index e9239ea56f1fb..1a1b2571b67b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -307,13 +307,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") - df.queryExecution.sparkPlan match { - case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) => + df.queryExecution.analyzed match { + case cmd: InsertIntoHadoopFsRelationCommand => assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet")) case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[HadoopFsRelation ].getCanonicalName} and " + - s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " + - s"However, found a ${o.toString} ") + s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}") } checkAnswer( @@ -338,13 +336,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") - df.queryExecution.sparkPlan match { - case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) => + df.queryExecution.analyzed match { + case cmd: InsertIntoHadoopFsRelationCommand => assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet")) case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[HadoopFsRelation ].getCanonicalName} and " + - s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." + - s"However, found a ${o.toString} ") + s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}") } checkAnswer( From 4b68c168b0e16071b91c93fc7f2be8fabda46fbe Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 24 Jan 2017 12:30:30 +0800 Subject: [PATCH 4/5] update --- .../spark/sql/execution/datasources/DataSourceStrategy.scala | 3 ++- .../org/apache/spark/sql/execution/datasources/rules.scala | 5 ++--- .../scala/org/apache/spark/sql/hive/HiveSessionState.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 5 +++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4dbbbc23fbc25..41ae41306525c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -45,7 +45,8 @@ import org.apache.spark.unsafe.types.UTF8String * Replaces generic operations with specific variants that are designed to work with Spark * SQL Data Sources. * - * Note that, this rule must be run after [[PreprocessTableInsertion]]. + * Note that, this rule must be run after [[PreprocessTableCreation]] and + * [[PreprocessTableInsertion]]. 
*/ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 23150b1ae26ac..690d6909c7b98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -21,12 +21,11 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering} -import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} +import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{AtomicType, StructType} /** @@ -371,7 +370,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(table, _, query, _, _) if i.childrenResolved => + case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved => table match { case relation: CatalogRelation => val metadata = relation.catalogTable diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 47d6880185890..6d88784b4d7d9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -60,14 +60,14 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override lazy val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = - catalog.ParquetConversions :: - catalog.OrcConversions :: new DetermineHiveSerde(conf) :: new FindDataSourceTable(sparkSession) :: new FindHiveSerdeTable(sparkSession) :: new ResolveDataSource(sparkSession) :: Nil override val postHocResolutionRules = + catalog.ParquetConversions :: + catalog.OrcConversions :: PreprocessTableCreation(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 8c5794e745d86..e400209e9dcf0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} -import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion} +import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableCreation, PreprocessTableInsertion} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -80,7 +80,8 @@ class DetermineHiveSerde(conf: SQLConf) 
extends Rule[LogicalPlan] { /** * Replaces generic operations with specific variants that are designed to work with Hive. * - * Note that, this rule must be run after [[PreprocessTableInsertion]]. + * Note that, this rule must be run after [[PreprocessTableCreation]] and + * [[PreprocessTableInsertion]]. */ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { From 9742f78412026b8bd83a3cd2f58c682362318d69 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 6 Feb 2017 20:51:32 +0800 Subject: [PATCH 5/5] address comments --- .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 3 --- .../scala/org/apache/spark/sql/execution/datasources/ddl.scala | 2 +- .../scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 2 ++ 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index f4d016cb96711..e4fd737b35eb1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -111,9 +111,6 @@ object UnsupportedOperationChecker { throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") - case _: InsertIntoTable => - throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") - case Join(left, right, joinType, _) => joinType match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 382b04512317b..110d503f91cf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types._ /** * Create a table and optionally insert some data into it. Note that this plan is unresolved and - * has to be replaced by the concrete implementations during analyse. + * has to be replaced by the concrete implementations during analysis. * * @param tableDesc the metadata of the table to be created. * @param mode the data writing mode diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 44ef5cce2ee05..c9be1b9d100b0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -68,6 +68,8 @@ private[sql] class HiveSessionCatalog( // and HiveCatalog. We should still do it at some point... private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession) + // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e. + // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`. val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
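
A sketch of how the ordering comment above applies to a downstream extension: any additional rule placed in `postHocResolutionRules` should come after the preprocessing and conversion rules it depends on, since the rules in that list are applied in the order they are given. The `AuditInserts` rule below is a hypothetical illustration, not part of this patch; it only relies on the `Rule` and `InsertIntoTable` APIs that already appear in the diffs above.

import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical post-hoc rule: it only inspects resolved inserts, so it can safely sit at
// the end of postHocResolutionRules, after the table preprocessing and conversion rules.
object AuditInserts extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
      logInfo(s"Insert into ${table.simpleString} writing ${query.output.size} column(s)")
      i
  }
}

// Hypothetical wiring in a custom session state (append after the existing rules):
//   override val postHocResolutionRules = <existing rules> :+ AuditInserts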