From 3df03a48742c49cd041d4adb5e1c41981c1b91ae Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Tue, 30 Nov 2021 14:45:44 +0700 Subject: [PATCH 1/9] [SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command --- .../catalyst/analysis/ResolveCatalogs.scala | 12 ----- .../sql/catalyst/parser/AstBuilder.scala | 10 ++-- .../catalyst/plans/logical/statements.scala | 23 -------- .../catalyst/plans/logical/v2Commands.scala | 23 +++++--- .../sql/connector/catalog/CatalogV2Util.scala | 7 +-- ...eateTablePartitioningValidationSuite.scala | 54 ++++++++++--------- .../sql/catalyst/parser/DDLParserSuite.scala | 26 ++++----- .../apache/spark/sql/DataFrameWriter.scala | 42 ++++++++------- .../apache/spark/sql/DataFrameWriterV2.scala | 21 +++----- .../analysis/ResolveSessionCatalog.scala | 29 +++++----- .../datasources/v2/DataSourceV2Strategy.scala | 13 ++--- .../v2/WriteToDataSourceV2Exec.scala | 20 ++++++- .../V2CommandsCaseSensitivitySuite.scala | 14 ++--- .../command/PlanResolutionSuite.scala | 46 +++++----------- 14 files changed, 154 insertions(+), 186 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index d7c6301b6ef4a..3e21a603316b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -37,18 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) => ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name()) - case c @ CreateTableAsSelectStatement( - NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => - CreateTableAsSelect( - catalog.asTableCatalog, - tbl.asIdentifier, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - c.asSelect, - convertTableProperties(c), - writeOptions = c.writeOptions, - ignoreIfExists = c.ifNotExists) - case c @ ReplaceTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => ReplaceTable( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f7d96f85e1198..c4d82d5fc88e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan. + * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan. 
* * Expected format: * {{{ @@ -3470,9 +3470,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx) case Some(query) => - CreateTableAsSelectStatement( - table, query, partitioning, bucketSpec, properties, provider, options, location, comment, - writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists) + val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, + serdeInfo, external) + CreateTableAsSelect( + UnresolvedDBObjectName(table, isNamespace = false), + partitioning, query, tableSpec, Map.empty, ifNotExists) case _ => // Note: table schema includes both the table columns list and the partition columns diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 70c6f1529062b..20d6894c9b423 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -123,29 +123,6 @@ object SerdeInfo { } } -/** - * A CREATE TABLE AS SELECT command, as parsed from SQL. - */ -case class CreateTableAsSelectStatement( - tableName: Seq[String], - asSelect: LogicalPlan, - partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: Option[String], - options: Map[String, String], - location: Option[String], - comment: Option[String], - writeOptions: Map[String, String], - serde: Option[SerdeInfo], - external: Boolean, - ifNotExists: Boolean) extends UnaryParsedStatement { - - override def child: LogicalPlan = asSelect - override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelectStatement = - copy(asSelect = newChild) -} - /** * A REPLACE TABLE command, as parsed from SQL. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d9e5dfe16b51b..7428da3b2b4c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -220,16 +220,22 @@ case class CreateTable( * Create a new table from a select query with a v2 catalog. 
*/ case class CreateTableAsSelect( - catalog: TableCatalog, - tableName: Identifier, + name: LogicalPlan, partitioning: Seq[Transform], query: LogicalPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: Map[String, String], - ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { + ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper override def tableSchema: StructType = query.schema - override def child: LogicalPlan = query + override def left: LogicalPlan = name + override def right: LogicalPlan = query + + override def tableName: Identifier = { + assert(left.resolved) + left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier + } override lazy val resolved: Boolean = childrenResolved && { // the table schema is created from the query schema, so the only resolution needed is to check @@ -242,8 +248,11 @@ case class CreateTableAsSelect( this.copy(partitioning = rewritten) } - override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelect = - copy(query = newChild) + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, + newRight: LogicalPlan + ): CreateTableAsSelect = + copy(name = newLeft, query = newRight) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 44e57f22ae0f5..4c5ccc2d6d640 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -23,7 +23,7 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -305,11 +305,6 @@ private[sql] object CatalogV2Util { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } - def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = { - convertTableProperties( - c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) - } - def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = { convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala index aee8e31f3ebf8..41b22bc019014 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -20,22 +20,22 @@ package org.apache.spark.sql.catalyst.analysis import 
java.util import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode} -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCapability, TableCatalog} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec} +import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog} import org.apache.spark.sql.connector.expressions.Expressions import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class CreateTablePartitioningValidationSuite extends AnalysisTest { - import CreateTablePartitioningValidationSuite._ test("CreateTableAsSelect: fail missing top-level column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -46,12 +46,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing top-level column nested reference") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -62,12 +63,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing nested column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -78,12 +80,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail with multiple errors") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist", "point.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -95,12 +98,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success with top-level column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "id") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -108,12 +112,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using nested column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = 
CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point.x") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -121,12 +126,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using complex column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 182f028b5c703..ba0a70ac9ed79 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -719,8 +719,8 @@ class DDLParserSuite extends AnalysisTest { parsedPlan match { case create: CreateTable if newTableToken == "CREATE" => assert(create.ignoreIfExists == expectedIfNotExists) - case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => - assert(ctas.ifNotExists == expectedIfNotExists) + case ctas: CreateTableAsSelect if newTableToken == "CREATE" => + assert(ctas.ignoreIfExists == expectedIfNotExists) case replace: ReplaceTableStatement if newTableToken == "REPLACE" => case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" => case other => @@ -2310,19 +2310,19 @@ class DDLParserSuite extends AnalysisTest { replace.location, replace.comment, replace.serde) - case ctas: CreateTableAsSelectStatement => + case ctas: CreateTableAsSelect => TableSpec( - ctas.tableName, - Some(ctas.asSelect).filter(_.resolved).map(_.schema), + ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts, + Some(ctas.query).filter(_.resolved).map(_.schema), ctas.partitioning, - ctas.bucketSpec, - ctas.properties, - ctas.provider, - ctas.options, - ctas.location, - ctas.comment, - ctas.serde, - ctas.external) + ctas.tableSpec.bucketSpec, + ctas.tableSpec.properties, + ctas.tableSpec.provider, + ctas.tableSpec.options, + ctas.tableSpec.location, + ctas.tableSpec.comment, + ctas.tableSpec.serde, + ctas.tableSpec.external) case rtas: ReplaceTableAsSelect => TableSpec( rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d5d814d177042..0b8e32fa9dd94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} +import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -323,18 +323,25 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { provider match { case supportsExtract: SupportsCatalogOptions => val ident = supportsExtract.extractIdentifier(dsOptions) - val catalog = CatalogV2Util.getTableProviderCatalog( - supportsExtract, catalogManager, dsOptions) + val catalog = if (ident.namespace().isEmpty) { + Array(dsOptions.get("catalog")) + } else { + ident.namespace() + } val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) - + val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty, + extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT), + None, false) runCommand(df.sparkSession) { CreateTableAsSelect( - catalog, - ident, + UnresolvedDBObjectName( + catalog.toSeq :+ ident.name, + isNamespace = false + ), partitioningAsV2, df.queryExecution.analyzed, - Map(TableCatalog.PROP_PROVIDER -> source) ++ location, + tableSpec, finalOptions, ignoreIfExists = createMode == SaveMode.Ignore) } @@ -607,20 +614,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. - CreateTableAsSelectStatement( - nameParts, - df.queryExecution.analyzed, + val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty, + extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT), + None, false) + + CreateTableAsSelect( + UnresolvedDBObjectName(nameParts, isNamespace = false), partitioningAsV2, - None, - Map.empty, - Some(source), + df.queryExecution.analyzed, + tableSpec, Map.empty, - extraOptions.get("path"), - extraOptions.get(TableCatalog.PROP_COMMENT), - extraOptions.toMap, - None, - ifNotExists = other == SaveMode.Ignore, - external = false) + other == SaveMode.Ignore) } runCommand(df.sparkSession) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index b99195de13d05..610c14c8ec53e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} import 
org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.IntegerType @@ -107,21 +107,16 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } override def create(): Unit = { + val tableSpec = TableSpec(None, properties.toMap, provider, Map.empty, + None, None, None, false) runCommand( - CreateTableAsSelectStatement( - tableName, - logicalPlan, + CreateTableAsSelect( + UnresolvedDBObjectName(tableName, isNamespace = false), partitioning.getOrElse(Seq.empty), - None, - properties.toMap, - provider, - Map.empty, - None, - None, + logicalPlan, + tableSpec, options.toMap, - None, - ifNotExists = false, - external = false)) + false)) } override def replace(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 6f41497ddb6f2..d0c9de7cbb05e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -163,26 +163,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) tableSpec = newTableSpec) } - case c @ CreateTableAsSelectStatement( - SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => + case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) + if isSessionCatalog(catalog) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.provider, c.options, c.location, c.serde, ctas = true) + c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde, + ctas = true) if (!isV2Provider(provider)) { - val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType, - c.partitioning, c.bucketSpec, c.properties, provider, c.location, - c.comment, storageFormat, c.external) - val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - CreateTableV1(tableDesc, mode, Some(c.asSelect)) + val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType, + c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider, + c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external) + val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTableV1(tableDesc, mode, Some(c.query)) } else { - CreateTableAsSelect( - catalog.asTableCatalog, - tbl.asIdentifier, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - c.asSelect, - convertTableProperties(c), - writeOptions = c.writeOptions, - ignoreIfExists = c.ifNotExists) + val newTableSpec = c.tableSpec.copy(bucketSpec = None) + c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform), + tableSpec = newTableSpec) } case RefreshTable(ResolvedV1TableIdentifier(ident)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 8a82f36f4aa9d..d13d7f4545e2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -169,16 +169,17 @@ class 
DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil - case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => - val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) + case CreateTableAsSelect( + ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, options, ifNotExists) => + val propsWithOwner = CatalogV2Util.withDefaultOwnership(tableSpec.properties) val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => - AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query), - propsWithOwner, writeOptions, ifNotExists) :: Nil + AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query), + tableSpec, writeOptions, ifNotExists) :: Nil case _ => - CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query), - propsWithOwner, writeOptions, ifNotExists) :: Nil + CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query, + planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil } case RefreshTable(r: ResolvedTable) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c61ef56eaf4e8..c67d97c061e07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -71,10 +71,18 @@ case class CreateTableAsSelectExec( partitioning: Seq[Transform], plan: LogicalPlan, query: SparkPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { + val properties = { + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) + CatalogV2Util.withDefaultOwnership(props) + } + override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { if (ifNotExists) { @@ -109,10 +117,18 @@ case class AtomicCreateTableAsSelectExec( partitioning: Seq[Transform], plan: LogicalPlan, query: SparkPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { + val properties = { + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) + CatalogV2Util.withDefaultOwnership(props) + } + override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { if (ifNotExists) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 8e8eb85063ac3..15a25c2680722 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -46,12 +46,13 @@ class V2CommandsCaseSensitivitySuite 
extends SharedSparkSession with AnalysisTes Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.identity(ref) :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -69,12 +70,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, ref) :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index a6b979a3fd521..5862acff70ab1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -563,20 +563,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert( + ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name" + ) assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) @@ -598,20 +590,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql, withDefault = true) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert( + ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name" + ) assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) @@ -633,18 +617,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> v2Format, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page 
view table") - parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == CatalogManager.SESSION_CATALOG_NAME) - assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == + CatalogManager.SESSION_CATALOG_NAME) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.page_view") assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) From 8428ce81c6da162111c9f5a635778bd206ebd4a7 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Wed, 1 Dec 2021 16:18:17 +0700 Subject: [PATCH 2/9] remove unused code --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index d13d7f4545e2a..98834ffa33e76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -171,7 +171,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case CreateTableAsSelect( ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, options, ifNotExists) => - val propsWithOwner = CatalogV2Util.withDefaultOwnership(tableSpec.properties) val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => From bb72f4536e68c3d325de4d28d5805d600af91f29 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Wed, 1 Dec 2021 16:21:03 +0700 Subject: [PATCH 3/9] label the arguments of TableSpec --- .../apache/spark/sql/DataFrameWriter.scala | 24 ++++++++++++++----- .../apache/spark/sql/DataFrameWriterV2.scala | 11 +++++++-- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 0b8e32fa9dd94..9fa00003a955c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -330,9 +330,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) - val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty, - extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT), - None, false) + val tableSpec = TableSpec( + bucketSpec = None, + properties = Map(TableCatalog.PROP_PROVIDER -> source) ++ location, + provider = Some(source), + options = Map.empty, + location = extraOptions.get("path"), + comment = extraOptions.get(TableCatalog.PROP_COMMENT), + serde = None, + external = false) runCommand(df.sparkSession) { CreateTableAsSelect( UnresolvedDBObjectName( @@ -614,9 +620,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. 
- val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty, - extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT), - None, false) + val tableSpec = TableSpec( + bucketSpec = None, + properties = Map.empty, + provider = Some(source), + options = Map.empty, + location = extraOptions.get("path"), + comment = extraOptions.get(TableCatalog.PROP_COMMENT), + serde = None, + external = false) CreateTableAsSelect( UnresolvedDBObjectName(nameParts, isNamespace = false), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 610c14c8ec53e..22b2eb978d917 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -107,8 +107,15 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } override def create(): Unit = { - val tableSpec = TableSpec(None, properties.toMap, provider, Map.empty, - None, None, None, false) + val tableSpec = TableSpec( + bucketSpec = None, + properties = properties.toMap, + provider = provider, + options = Map.empty, + location = None, + comment = None, + serde = None, + external = false) runCommand( CreateTableAsSelect( UnresolvedDBObjectName(tableName, isNamespace = false), From d9fa8a67ea0957fde685949d5b1bb80b2e362cf4 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Thu, 2 Dec 2021 10:02:33 +0700 Subject: [PATCH 4/9] refactor --- .../datasources/v2/WriteToDataSourceV2Exec.scala | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c67d97c061e07..65c49283dd763 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -75,13 +75,7 @@ case class CreateTableAsSelectExec( writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { - val properties = { - val props = CatalogV2Util.convertTableProperties( - tableSpec.properties, tableSpec.options, tableSpec.serde, - tableSpec.location, tableSpec.comment, tableSpec.provider, - tableSpec.external) - CatalogV2Util.withDefaultOwnership(props) - } + val properties = CatalogV2Util.convertTableProperties(tableSpec) override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { @@ -121,13 +115,7 @@ case class AtomicCreateTableAsSelectExec( writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { - val properties = { - val props = CatalogV2Util.convertTableProperties( - tableSpec.properties, tableSpec.options, tableSpec.serde, - tableSpec.location, tableSpec.comment, tableSpec.provider, - tableSpec.external) - CatalogV2Util.withDefaultOwnership(props) - } + val properties = CatalogV2Util.convertTableProperties(tableSpec) override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { From 729cc2272f77216eb825e6ec81e0254560972e01 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Thu, 2 Dec 2021 10:07:28 +0700 Subject: [PATCH 5/9] fix catalog --- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9fa00003a955c..9f8ad56445149 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -323,11 +323,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { provider match { case supportsExtract: SupportsCatalogOptions => val ident = supportsExtract.extractIdentifier(dsOptions) - val catalog = if (ident.namespace().isEmpty) { - Array(dsOptions.get("catalog")) - } else { - ident.namespace() - } + val catalog = CatalogV2Util.getTableProviderCatalog( + supportsExtract, catalogManager, dsOptions) val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) val tableSpec = TableSpec( @@ -342,7 +339,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { runCommand(df.sparkSession) { CreateTableAsSelect( UnresolvedDBObjectName( - catalog.toSeq :+ ident.name, + catalog.name :: ident.name :: Nil, isNamespace = false ), partitioningAsV2, From 616ed905e71b1800e2189d7e2f17b31a235c5221 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Thu, 2 Dec 2021 13:08:51 +0700 Subject: [PATCH 6/9] fix scala style --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 98834ffa33e76..3355403a4ef27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -169,8 +169,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil - case CreateTableAsSelect( - ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, options, ifNotExists) => + case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, + options, ifNotExists) => val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => From 9ce81a6f7f9152c084bc2050b52bdd1029fa1b84 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Fri, 3 Dec 2021 09:42:03 +0700 Subject: [PATCH 7/9] fix namespaces --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9f8ad56445149..365b56d83bd64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -339,7 +339,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { runCommand(df.sparkSession) { CreateTableAsSelect( UnresolvedDBObjectName( - catalog.name :: ident.name :: Nil, + catalog.name +: ident.namespace.toSeq :+ ident.name, isNamespace = false ), partitioningAsV2, From cba6519762740639728b1e71639bd75982bd40f8 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Fri, 3 Dec 2021 09:43:24 +0700 Subject: [PATCH 8/9] deduplicate --- 
.../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c4d82d5fc88e6..235f6a641bd10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3456,6 +3456,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } val partitioning = partitionExpressions(partTransforms, partCols, ctx) + val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, + serdeInfo, external) Option(ctx.query).map(plan) match { case Some(_) if columns.nonEmpty => @@ -3470,8 +3472,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx) case Some(query) => - val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, - serdeInfo, external) CreateTableAsSelect( UnresolvedDBObjectName(table, isNamespace = false), partitioning, query, tableSpec, Map.empty, ifNotExists) @@ -3479,8 +3479,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. - val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, - serdeInfo, external) val schema = StructType(columns ++ partCols) CreateTable( UnresolvedDBObjectName(table, isNamespace = false), From 5186f817f143ee4861ab6ef2e42dd488c6af624c Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Fri, 3 Dec 2021 14:17:38 +0700 Subject: [PATCH 9/9] discard location and provider from properties --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 365b56d83bd64..8e2f9cbccdd2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -326,10 +326,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val catalog = CatalogV2Util.getTableProviderCatalog( supportsExtract, catalogManager, dsOptions) - val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) val tableSpec = TableSpec( bucketSpec = None, - properties = Map(TableCatalog.PROP_PROVIDER -> source) ++ location, + properties = Map.empty, provider = Some(source), options = Map.empty, location = extraOptions.get("path"),
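
For reference, a minimal sketch (not part of any patch above) of the plan shape this series converges on: CTAS now carries an UnresolvedDBObjectName and a TableSpec instead of the old flat property map on CreateTableAsSelectStatement. The name parts, provider and query below are hypothetical placeholders; everything else mirrors the constructor shapes introduced in the diffs.

// Illustrative sketch only; identifiers and provider are made up for the example.
import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}

def buildCtas(query: LogicalPlan): CreateTableAsSelect = {
  // Physical table metadata travels in TableSpec rather than a Map[String, String].
  val tableSpec = TableSpec(
    bucketSpec = None,
    properties = Map.empty,
    provider = Some("parquet"),   // hypothetical provider
    options = Map.empty,
    location = None,
    comment = None,
    serde = None,
    external = false)
  CreateTableAsSelect(
    // Resolved by the analyzer into a ResolvedDBObjectName(catalog, nameParts).
    name = UnresolvedDBObjectName(Seq("myns", "my_table"), isNamespace = false),
    partitioning = Seq.empty,
    query = query,
    tableSpec = tableSpec,
    writeOptions = Map.empty,
    ignoreIfExists = false)
}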