diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index d7c6301b6ef4a..3e21a603316b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -37,18 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
       ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())
 
-    case c @ CreateTableAsSelectStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
-      CreateTableAsSelect(
-        catalog.asTableCatalog,
-        tbl.asIdentifier,
-        // convert the bucket spec and add it as a transform
-        c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        c.asSelect,
-        convertTableProperties(c),
-        writeOptions = c.writeOptions,
-        ignoreIfExists = c.ifNotExists)
-
     case c @ ReplaceTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       ReplaceTable(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f7d96f85e1198..235f6a641bd10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan.
+   * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan.
    *
    * Expected format:
    * {{{
@@ -3456,6 +3456,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }
 
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
+    val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
+      serdeInfo, external)
 
     Option(ctx.query).map(plan) match {
       case Some(_) if columns.nonEmpty =>
@@ -3470,15 +3472,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
           ctx)
 
       case Some(query) =>
-        CreateTableAsSelectStatement(
-          table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
-          writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
+        CreateTableAsSelect(
+          UnresolvedDBObjectName(table, isNamespace = false),
+          partitioning, query, tableSpec, Map.empty, ifNotExists)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
        // with data type.
-        val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
-          serdeInfo, external)
         val schema = StructType(columns ++ partCols)
         CreateTable(
           UnresolvedDBObjectName(table, isNamespace = false),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 70c6f1529062b..20d6894c9b423 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -123,29 +123,6 @@ object SerdeInfo {
   }
 }
 
-/**
- * A CREATE TABLE AS SELECT command, as parsed from SQL.
- */
-case class CreateTableAsSelectStatement(
-    tableName: Seq[String],
-    asSelect: LogicalPlan,
-    partitioning: Seq[Transform],
-    bucketSpec: Option[BucketSpec],
-    properties: Map[String, String],
-    provider: Option[String],
-    options: Map[String, String],
-    location: Option[String],
-    comment: Option[String],
-    writeOptions: Map[String, String],
-    serde: Option[SerdeInfo],
-    external: Boolean,
-    ifNotExists: Boolean) extends UnaryParsedStatement {
-
-  override def child: LogicalPlan = asSelect
-  override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelectStatement =
-    copy(asSelect = newChild)
-}
-
 /**
  * A REPLACE TABLE command, as parsed from SQL.
  *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d9e5dfe16b51b..7428da3b2b4c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -220,16 +220,22 @@ case class CreateTable(
 * Create a new table from a select query with a v2 catalog.
 */
case class CreateTableAsSelect(
-    catalog: TableCatalog,
-    tableName: Identifier,
+    name: LogicalPlan,
     partitioning: Seq[Transform],
     query: LogicalPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
+    ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
   override def tableSchema: StructType = query.schema
-  override def child: LogicalPlan = query
+  override def left: LogicalPlan = name
+  override def right: LogicalPlan = query
+
+  override def tableName: Identifier = {
+    assert(left.resolved)
+    left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+  }
 
   override lazy val resolved: Boolean = childrenResolved && {
     // the table schema is created from the query schema, so the only resolution needed is to check
@@ -242,8 +248,11 @@ case class CreateTableAsSelect(
     this.copy(partitioning = rewritten)
   }
 
-  override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelect =
-    copy(query = newChild)
+  override protected def withNewChildrenInternal(
+    newLeft: LogicalPlan,
+    newRight: LogicalPlan
+  ): CreateTableAsSelect =
+    copy(name = newLeft, query = newRight)
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 44e57f22ae0f5..4c5ccc2d6d640 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,7 +23,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,11 +305,6 @@ private[sql] object CatalogV2Util {
     catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
   }
 
-  def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = {
-    convertTableProperties(
-      c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
-  }
-
   def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = {
     convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index aee8e31f3ebf8..41b22bc019014 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -20,22 +20,22 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec}
+import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CreateTablePartitioningValidationSuite extends AnalysisTest {
-  import CreateTablePartitioningValidationSuite._
 
   test("CreateTableAsSelect: fail missing top-level column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "does_not_exist") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
@@ -46,12 +46,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail missing top-level column nested reference") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "does_not_exist.z") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
@@ -62,12 +63,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail missing nested column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "point.z") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
@@ -78,12 +80,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail with multiple errors") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
@@ -95,12 +98,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success with top-level column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "id") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
@@ -108,12 +112,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success using nested column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "point.x") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
@@ -121,12 +126,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success using complex column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "point") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 182f028b5c703..ba0a70ac9ed79 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -719,8 +719,8 @@ class DDLParserSuite extends AnalysisTest {
     parsedPlan match {
       case create: CreateTable if newTableToken == "CREATE" =>
         assert(create.ignoreIfExists == expectedIfNotExists)
-      case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
-        assert(ctas.ifNotExists == expectedIfNotExists)
+      case ctas: CreateTableAsSelect if newTableToken == "CREATE" =>
+        assert(ctas.ignoreIfExists == expectedIfNotExists)
       case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
       case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
       case other =>
@@ -2310,19 +2310,19 @@ class DDLParserSuite extends AnalysisTest {
          replace.location,
          replace.comment,
          replace.serde)
-      case ctas: CreateTableAsSelectStatement =>
+      case ctas: CreateTableAsSelect =>
        TableSpec(
-          ctas.tableName,
-          Some(ctas.asSelect).filter(_.resolved).map(_.schema),
+          ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+          Some(ctas.query).filter(_.resolved).map(_.schema),
          ctas.partitioning,
-          ctas.bucketSpec,
-          ctas.properties,
-          ctas.provider,
-          ctas.options,
-          ctas.location,
-          ctas.comment,
-          ctas.serde,
-          ctas.external)
+          ctas.tableSpec.bucketSpec,
+          ctas.tableSpec.properties,
+          ctas.tableSpec.provider,
+          ctas.tableSpec.options,
+          ctas.tableSpec.location,
+          ctas.tableSpec.comment,
+          ctas.tableSpec.serde,
+          ctas.tableSpec.external)
       case rtas: ReplaceTableAsSelect =>
        TableSpec(
          rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d5d814d177042..8e2f9cbccdd2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -326,15 +326,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val catalog = CatalogV2Util.getTableProviderCatalog(
           supportsExtract, catalogManager, dsOptions)
 
-        val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _)
-
+        val tableSpec = TableSpec(
+          bucketSpec = None,
+          properties = Map.empty,
+          provider = Some(source),
+          options = Map.empty,
+          location = extraOptions.get("path"),
+          comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+          serde = None,
+          external = false)
         runCommand(df.sparkSession) {
           CreateTableAsSelect(
-            catalog,
-            ident,
+            UnresolvedDBObjectName(
+              catalog.name +: ident.namespace.toSeq :+ ident.name,
+              isNamespace = false
+            ),
             partitioningAsV2,
             df.queryExecution.analyzed,
-            Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
+            tableSpec,
             finalOptions,
             ignoreIfExists = createMode == SaveMode.Ignore)
         }
@@ -607,20 +616,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           // We have a potential race condition here in AppendMode, if the table suddenly gets
           // created between our existence check and physical execution, but this can't be helped
           // in any case.
-          CreateTableAsSelectStatement(
-            nameParts,
-            df.queryExecution.analyzed,
+          val tableSpec = TableSpec(
+            bucketSpec = None,
+            properties = Map.empty,
+            provider = Some(source),
+            options = Map.empty,
+            location = extraOptions.get("path"),
+            comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+            serde = None,
+            external = false)
+
+          CreateTableAsSelect(
+            UnresolvedDBObjectName(nameParts, isNamespace = false),
             partitioningAsV2,
-            None,
-            Map.empty,
-            Some(source),
+            df.queryExecution.analyzed,
+            tableSpec,
             Map.empty,
-            extraOptions.get("path"),
-            extraOptions.get(TableCatalog.PROP_COMMENT),
-            extraOptions.toMap,
-            None,
-            ifNotExists = other == SaveMode.Ignore,
-            external = false)
+            other == SaveMode.Ignore)
         }
 
         runCommand(df.sparkSession) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index b99195de13d05..22b2eb978d917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.IntegerType
@@ -107,21 +107,23 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   override def create(): Unit = {
+    val tableSpec = TableSpec(
+      bucketSpec = None,
+      properties = properties.toMap,
+      provider = provider,
+      options = Map.empty,
+      location = None,
+      comment = None,
+      serde = None,
+      external = false)
     runCommand(
-      CreateTableAsSelectStatement(
-        tableName,
-        logicalPlan,
+      CreateTableAsSelect(
+        UnresolvedDBObjectName(tableName, isNamespace = false),
         partitioning.getOrElse(Seq.empty),
-        None,
-        properties.toMap,
-        provider,
-        Map.empty,
-        None,
-        None,
+        logicalPlan,
+        tableSpec,
         options.toMap,
-        None,
-        ifNotExists = false,
-        external = false))
+        false))
   }
 
   override def replace(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 6f41497ddb6f2..d0c9de7cbb05e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -163,26 +163,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
          tableSpec = newTableSpec)
      }
 
-    case c @ CreateTableAsSelectStatement(
-        SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
+    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
+        if isSessionCatalog(catalog) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.provider, c.options, c.location, c.serde, ctas = true)
+        c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
+        ctas = true)
       if (!isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
-          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
-          c.comment, storageFormat, c.external)
-        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTableV1(tableDesc, mode, Some(c.asSelect))
+        val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType,
+          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
+          c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external)
+        val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+        CreateTableV1(tableDesc, mode, Some(c.query))
       } else {
-        CreateTableAsSelect(
-          catalog.asTableCatalog,
-          tbl.asIdentifier,
-          // convert the bucket spec and add it as a transform
-          c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          c.asSelect,
-          convertTableProperties(c),
-          writeOptions = c.writeOptions,
-          ignoreIfExists = c.ifNotExists)
+        val newTableSpec = c.tableSpec.copy(bucketSpec = None)
+        c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
+          tableSpec = newTableSpec)
       }
 
     case RefreshTable(ResolvedV1TableIdentifier(ident)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 8a82f36f4aa9d..3355403a4ef27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -169,16 +169,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
         partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil
 
-    case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
-      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
+    case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
+        options, ifNotExists) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
-            propsWithOwner, writeOptions, ifNotExists) :: Nil
+          AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query),
+            tableSpec, writeOptions, ifNotExists) :: Nil
         case _ =>
-          CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
-            propsWithOwner, writeOptions, ifNotExists) :: Nil
+          CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query,
+            planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil
       }
 
     case RefreshTable(r: ResolvedTable) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c61ef56eaf4e8..65c49283dd763 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -71,10 +71,12 @@ case class CreateTableAsSelectExec(
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: CaseInsensitiveStringMap,
     ifNotExists: Boolean) extends TableWriteExecHelper {
 
+  val properties = CatalogV2Util.convertTableProperties(tableSpec)
+
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       if (ifNotExists) {
@@ -109,10 +111,12 @@ case class AtomicCreateTableAsSelectExec(
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: CaseInsensitiveStringMap,
     ifNotExists: Boolean) extends TableWriteExecHelper {
 
+  val properties = CatalogV2Util.convertTableProperties(tableSpec)
+
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       if (ifNotExists) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 8e8eb85063ac3..15a25c2680722 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -46,12 +46,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
+          val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+            None, None, None, false)
           val plan = CreateTableAsSelect(
-            catalog,
-            Identifier.of(Array(), "table_name"),
+            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
             Expressions.identity(ref) :: Nil,
             TestRelation2,
-            Map.empty,
+            tableSpec,
             Map.empty,
             ignoreIfExists = false)
@@ -69,12 +70,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
+          val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+            None, None, None, false)
           val plan = CreateTableAsSelect(
-            catalog,
-            Identifier.of(Array(), "table_name"),
+            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
             Expressions.bucket(4, ref) :: Nil,
             TestRelation2,
-            Map.empty,
+            tableSpec,
             Map.empty,
             ignoreIfExists = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index a6b979a3fd521..5862acff70ab1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -563,20 +563,12 @@ class PlanResolutionSuite extends AnalysisTest {
        |AS SELECT * FROM src
      """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "option.other" -> "20",
-      "provider" -> "parquet",
-      "location" -> "s3://bucket/path/to/data",
-      "comment" -> "table comment",
-      "other" -> "20")
-
     parseAndResolve(sql) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.catalog.name == "testcat")
-        assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
-        assert(ctas.properties == expectedProperties)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(
+          ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+        )
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
@@ -598,20 +590,12 @@ class PlanResolutionSuite extends AnalysisTest {
        |AS SELECT * FROM src
      """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "option.other" -> "20",
-      "provider" -> "parquet",
-      "location" -> "s3://bucket/path/to/data",
-      "comment" -> "table comment",
-      "other" -> "20")
-
     parseAndResolve(sql, withDefault = true) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.catalog.name == "testcat")
-        assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
-        assert(ctas.properties == expectedProperties)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(
+          ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+        )
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
@@ -633,18 +617,12 @@ class PlanResolutionSuite extends AnalysisTest {
        |AS SELECT * FROM src
      """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "provider" -> v2Format,
-      "location" -> "/user/external/page_view",
-      "comment" -> "This is the staging page view table")
-
     parseAndResolve(sql) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.catalog.name == CatalogManager.SESSION_CATALOG_NAME)
-        assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view"))
-        assert(ctas.properties == expectedProperties)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+          CatalogManager.SESSION_CATALOG_NAME)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+          "mydb.page_view")
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
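For reference, a minimal sketch (not part of the patch) of how a caller constructs the refactored `CreateTableAsSelect` node after this change: the table name now travels as an `UnresolvedDBObjectName` child plan that `ResolveCatalogs`/`ResolveSessionCatalog` turn into a `ResolvedDBObjectName`, and the table metadata moves into `TableSpec` instead of a flattened property map. The catalog, table, and property values below are illustrative only; the field order mirrors the signatures shown in this diff.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}

// Hypothetical helper: builds the new-style CTAS plan for an already-analyzed query.
def ctasPlan(sourcePlan: LogicalPlan): CreateTableAsSelect = {
  // Metadata that was previously packed into a Map[String, String] now lives in TableSpec.
  val tableSpec = TableSpec(
    bucketSpec = None,
    properties = Map("p1" -> "v1"), // illustrative table property
    provider = Some("parquet"),
    options = Map.empty,
    location = None,
    comment = None,
    serde = None,
    external = false)

  CreateTableAsSelect(
    // The name is an unresolved child plan; after resolution, tableName is derived
    // from the resulting ResolvedDBObjectName.
    name = UnresolvedDBObjectName(Seq("testcat", "mydb", "table_name"), isNamespace = false),
    partitioning = Seq.empty,
    query = sourcePlan,
    tableSpec = tableSpec,
    writeOptions = Map.empty,
    ignoreIfExists = false)
}
```

On the physical side, `CreateTableAsSelectExec` and `AtomicCreateTableAsSelectExec` recover the legacy property map via `CatalogV2Util.convertTableProperties(tableSpec)`, as shown in the `WriteToDataSourceV2Exec` hunk above.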