From b5c27874bdbd60a34e814559c0e428e16cacd95d Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Tue, 30 Nov 2021 14:45:44 +0700 Subject: [PATCH] [SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command --- .../catalyst/analysis/ResolveCatalogs.scala | 12 ----- .../sql/catalyst/parser/AstBuilder.scala | 10 ++-- .../catalyst/plans/logical/statements.scala | 23 -------- .../catalyst/plans/logical/v2Commands.scala | 23 +++++--- .../sql/connector/catalog/CatalogV2Util.scala | 7 +-- ...eateTablePartitioningValidationSuite.scala | 54 ++++++++++--------- .../sql/catalyst/parser/DDLParserSuite.scala | 26 ++++----- .../apache/spark/sql/DataFrameWriter.scala | 44 ++++++++------- .../apache/spark/sql/DataFrameWriterV2.scala | 23 ++++---- .../analysis/ResolveSessionCatalog.scala | 29 +++++----- .../datasources/v2/DataSourceV2Strategy.scala | 13 ++--- .../v2/WriteToDataSourceV2Exec.scala | 24 +++++++-- .../V2CommandsCaseSensitivitySuite.scala | 18 ++++--- .../command/PlanResolutionSuite.scala | 46 +++++----------- 14 files changed, 160 insertions(+), 192 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 83d7b932a8bc6..c53393e99b262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -37,18 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) => ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name()) - case c @ CreateTableAsSelectStatement( - NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => - CreateTableAsSelect( - catalog.asTableCatalog, - tbl.asIdentifier, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - c.asSelect, - convertTableProperties(c), - writeOptions = c.writeOptions, - ignoreIfExists = c.ifNotExists) - case c @ ReplaceTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => ReplaceTable( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 2c32ba7db5512..8e6d3203ae948 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan. + * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan. 
* * Expected format: * {{{ @@ -3470,9 +3470,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx) case Some(query) => - CreateTableAsSelectStatement( - table, query, partitioning, bucketSpec, properties, provider, options, location, comment, - writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists) + val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, + serdeInfo, external) + CreateTableAsSelect( + UnresolvedDBObjectName(table, isNamespace = false), + partitioning, query, tableSpec, Map.empty, ifNotExists) case _ => // Note: table schema includes both the table columns list and the partition columns diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index a6ed304e7155c..9563753c292f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -123,29 +123,6 @@ object SerdeInfo { } } -/** - * A CREATE TABLE AS SELECT command, as parsed from SQL. - */ -case class CreateTableAsSelectStatement( - tableName: Seq[String], - asSelect: LogicalPlan, - partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: Option[String], - options: Map[String, String], - location: Option[String], - comment: Option[String], - writeOptions: Map[String, String], - serde: Option[SerdeInfo], - external: Boolean, - ifNotExists: Boolean) extends UnaryParsedStatement { - - override def child: LogicalPlan = asSelect - override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelectStatement = - copy(asSelect = newChild) -} - /** * A REPLACE TABLE command, as parsed from SQL. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d39e28865da82..c2ea1af85ec10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -221,16 +221,22 @@ case class CreateTable( * Create a new table from a select query with a v2 catalog. 
*/ case class CreateTableAsSelect( - catalog: TableCatalog, - tableName: Identifier, + name: LogicalPlan, partitioning: Seq[Transform], query: LogicalPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: Map[String, String], - ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { + ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper override def tableSchema: StructType = query.schema - override def child: LogicalPlan = query + override def left: LogicalPlan = name + override def right: LogicalPlan = query + + override def tableName: Identifier = { + assert(left.resolved) + left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier + } override lazy val resolved: Boolean = childrenResolved && { // the table schema is created from the query schema, so the only resolution needed is to check @@ -243,8 +249,11 @@ case class CreateTableAsSelect( this.copy(partitioning = rewritten) } - override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelect = - copy(query = newChild) + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, + newRight: LogicalPlan + ): CreateTableAsSelect = + copy(name = newLeft, query = newRight) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index fabc73f9cf694..9bd5075680b31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -23,7 +23,7 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} +import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -305,11 +305,6 @@ private[sql] object CatalogV2Util { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } - def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = { - convertTableProperties( - c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) - } - def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = { convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala index aee8e31f3ebf8..41b22bc019014 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -20,22 +20,22 @@ package 
org.apache.spark.sql.catalyst.analysis import java.util import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode} -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCapability, TableCatalog} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec} +import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog} import org.apache.spark.sql.connector.expressions.Expressions import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class CreateTablePartitioningValidationSuite extends AnalysisTest { - import CreateTablePartitioningValidationSuite._ test("CreateTableAsSelect: fail missing top-level column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -46,12 +46,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing top-level column nested reference") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -62,12 +63,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing nested column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -78,12 +80,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail with multiple errors") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist", "point.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -95,12 +98,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success with top-level column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "id") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -108,12 +112,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using nested column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + 
None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point.x") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -121,12 +126,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using complex column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index f4ab8076938dc..13a9d1adc94bb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -719,8 +719,8 @@ class DDLParserSuite extends AnalysisTest { parsedPlan match { case create: CreateTable if newTableToken == "CREATE" => assert(create.ignoreIfExists == expectedIfNotExists) - case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => - assert(ctas.ifNotExists == expectedIfNotExists) + case ctas: CreateTableAsSelect if newTableToken == "CREATE" => + assert(ctas.ignoreIfExists == expectedIfNotExists) case replace: ReplaceTableStatement if newTableToken == "REPLACE" => case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" => case other => @@ -2310,19 +2310,19 @@ class DDLParserSuite extends AnalysisTest { replace.location, replace.comment, replace.serde) - case ctas: CreateTableAsSelectStatement => + case ctas: CreateTableAsSelect => TableSpec( - ctas.tableName, - Some(ctas.asSelect).filter(_.resolved).map(_.schema), + ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts, + Some(ctas.query).filter(_.resolved).map(_.schema), ctas.partitioning, - ctas.bucketSpec, - ctas.properties, - ctas.provider, - ctas.options, - ctas.location, - ctas.comment, - ctas.serde, - ctas.external) + ctas.tableSpec.bucketSpec, + ctas.tableSpec.properties, + ctas.tableSpec.provider, + ctas.tableSpec.options, + ctas.tableSpec.location, + ctas.tableSpec.comment, + ctas.tableSpec.serde, + ctas.tableSpec.external) case rtas: ReplaceTableAsSelectStatement => TableSpec( rtas.tableName, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 71d06576bc68e..03d8e41f981bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -23,10 +23,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement, TableSpec} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -323,18 +323,25 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { provider match { case supportsExtract: SupportsCatalogOptions => val ident = supportsExtract.extractIdentifier(dsOptions) - val catalog = CatalogV2Util.getTableProviderCatalog( - supportsExtract, catalogManager, dsOptions) + val catalog = if (ident.namespace().isEmpty) { + Array(dsOptions.get("catalog")) + } else { + ident.namespace() + } val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) - + val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty, + extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT), + None, false) runCommand(df.sparkSession) { CreateTableAsSelect( - catalog, - ident, + UnresolvedDBObjectName( + catalog.toSeq :+ ident.name, + isNamespace = false + ), partitioningAsV2, df.queryExecution.analyzed, - Map(TableCatalog.PROP_PROVIDER -> source) ++ location, + tableSpec, finalOptions, ignoreIfExists = createMode == SaveMode.Ignore) } @@ -604,20 +611,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. 
- CreateTableAsSelectStatement( - nameParts, - df.queryExecution.analyzed, + val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty, + extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT), + None, false) + + CreateTableAsSelect( + UnresolvedDBObjectName(nameParts, isNamespace = false), partitioningAsV2, - None, - Map.empty, - Some(source), + df.queryExecution.analyzed, + tableSpec, Map.empty, - extraOptions.get("path"), - extraOptions.get(TableCatalog.PROP_COMMENT), - extraOptions.toMap, - None, - ifNotExists = other == SaveMode.Ignore, - external = false) + other == SaveMode.Ignore) } runCommand(df.sparkSession) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index bff7ee4323dde..9b84ea681b7b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -21,9 +21,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement, TableSpec} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.IntegerType @@ -107,21 +107,16 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } override def create(): Unit = { + val tableSpec = TableSpec(None, properties.toMap, provider, Map.empty, + None, None, None, false) runCommand( - CreateTableAsSelectStatement( - tableName, - logicalPlan, + CreateTableAsSelect( + UnresolvedDBObjectName(tableName, isNamespace = false), partitioning.getOrElse(Seq.empty), - None, - properties.toMap, - provider, - Map.empty, - None, - None, + logicalPlan, + tableSpec, options.toMap, - None, - ifNotExists = false, - external = false)) + false)) } override def replace(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 0940982f7a3f9..dda6924480cf6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -164,26 +164,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) tableSpec = newTableSpec) } - case c @ CreateTableAsSelectStatement( - SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => + case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) + if 
isSessionCatalog(catalog) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.provider, c.options, c.location, c.serde, ctas = true) + c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde, + ctas = true) if (!isV2Provider(provider)) { - val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType, - c.partitioning, c.bucketSpec, c.properties, provider, c.location, - c.comment, storageFormat, c.external) - val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - CreateTable(tableDesc, mode, Some(c.asSelect)) + val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType, + c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider, + c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external) + val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, Some(c.query)) } else { - CreateTableAsSelect( - catalog.asTableCatalog, - tbl.asIdentifier, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - c.asSelect, - convertTableProperties(c), - writeOptions = c.writeOptions, - ignoreIfExists = c.ifNotExists) + val newTableSpec = c.tableSpec.copy(bucketSpec = None) + c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform), + tableSpec = newTableSpec) } case RefreshTable(ResolvedV1TableIdentifier(ident)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f64c1ee001beb..3c5d79b5bfba4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -170,16 +170,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, partitioning, tableSpec, ifNotExists) :: Nil - case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => - val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) + case CreateTableAsSelect( + ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, options, ifNotExists) => + val propsWithOwner = CatalogV2Util.withDefaultOwnership(tableSpec.properties) val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => - AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query), - propsWithOwner, writeOptions, ifNotExists) :: Nil + AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query), + tableSpec, writeOptions, ifNotExists) :: Nil case _ => - CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query), - propsWithOwner, writeOptions, ifNotExists) :: Nil + CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query, + planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil } case RefreshTable(r: ResolvedTable) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index add698f99080f..39b6604b0cba4 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -28,9 +28,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TableSpec, UnaryNode} import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, V1Write, Write, WriterCommitMessage} @@ -71,10 +71,18 @@ case class CreateTableAsSelectExec( partitioning: Seq[Transform], plan: LogicalPlan, query: SparkPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { + val properties = { + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) + CatalogV2Util.withDefaultOwnership(props) + } + override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { if (ifNotExists) { @@ -109,10 +117,18 @@ case class AtomicCreateTableAsSelectExec( partitioning: Seq[Transform], plan: LogicalPlan, query: SparkPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { + val properties = { + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) + CatalogV2Util.withDefaultOwnership(props) + } + override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { if (ifNotExists) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index f262cf152c332..915bd6ac1df22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.connector -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition} -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, 
UnresolvedDBObjectName, UnresolvedFieldName, UnresolvedFieldPosition} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, TableSpec} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -46,12 +46,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.identity(ref) :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -69,12 +70,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, ref) :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index a6b979a3fd521..5862acff70ab1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -563,20 +563,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert( + ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name" + ) assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) @@ -598,20 +590,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql, withDefault = true) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) + 
assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert( + ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name" + ) assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) @@ -633,18 +617,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> v2Format, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") - parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == CatalogManager.SESSION_CATALOG_NAME) - assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == + CatalogManager.SESSION_CATALOG_NAME) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.page_view") assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists)