From e193bc34766a55e2c17605b0bec371d53aec8353 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 22 Jan 2020 17:36:04 +0800 Subject: [PATCH 1/7] [SPARK-30603][SQL] Keep the reserved properties of namespaces and tables private --- .../connector/catalog/SupportsNamespaces.java | 28 ---------- .../sql/connector/catalog/TableCatalog.java | 31 +---------- .../sql/catalyst/parser/AstBuilder.scala | 8 +-- .../catalyst/plans/logical/v2Commands.scala | 2 +- .../sql/connector/catalog/CatalogV2Util.scala | 50 ++++++++++++++++-- .../catalog/ExternalCatalogSuite.scala | 2 +- .../catalog/SessionCatalogSuite.scala | 2 +- .../apache/spark/sql/DataFrameWriter.scala | 21 ++++---- .../apache/spark/sql/DataFrameWriterV2.scala | 9 ++-- .../analysis/ResolveSessionCatalog.scala | 8 +-- .../spark/sql/execution/command/ddl.scala | 12 ++--- .../datasources/v2/CreateNamespaceExec.scala | 5 +- .../datasources/v2/DataSourceV2Strategy.scala | 6 +-- .../v2/DescribeNamespaceExec.scala | 6 +-- .../datasources/v2/DescribeTableExec.scala | 6 +-- .../datasources/v2/V2SessionCatalog.scala | 22 ++++---- .../spark/sql/DataFrameWriterV2Suite.scala | 4 +- .../sql/connector/DataSourceV2SQLSuite.scala | 52 +++++++++---------- .../sql/execution/command/DDLSuite.scala | 2 +- .../v2/V2SessionCatalogSuite.scala | 8 +-- .../sql/hive/client/HiveClientImpl.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 2 +- 22 files changed, 136 insertions(+), 152 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 757d303e82255..ca9451e4b1594 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -41,34 +41,6 @@ @Experimental public interface SupportsNamespaces extends CatalogPlugin { - /** - * A property to specify the location of the namespace. If the namespace - * needs to store files, it should be under this location. - */ - String PROP_LOCATION = "location"; - - /** - * A property to specify the description of the namespace. The description - * will be returned in the result of "DESCRIBE NAMESPACE" command. - */ - String PROP_COMMENT = "comment"; - - /** - * A property to specify the owner of the namespace. - */ - String PROP_OWNER = "owner"; - - /** - * The list of reserved namespace properties, which can not be removed or changed directly by - * the syntax: - * {{ - * ALTER NAMESPACE ... SET PROPERTIES ... - * }} - * - * They need specific syntax to modify - */ - List RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_OWNER); - /** * Return a default namespace for the catalog. *

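What "reserved" means in practice here: with reservation enforced (the default), the parser rejects these keys inside DBPROPERTIES, and each one must be set through its dedicated clause instead. An illustrative ScalaTest-style sketch (not from the diff), assuming a `spark` session with a `testcat` catalog registered as in the DataSourceV2SQLSuite changes later in this patch:

    import org.apache.spark.sql.catalyst.parser.ParseException
    import org.apache.spark.sql.internal.SQLConf
    import org.scalatest.Assertions.intercept

    // Reserved keys are rejected at parse time when reservation is
    // enforced (the default); each has a dedicated clause instead.
    spark.conf.set(SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")
    intercept[ParseException] {
      spark.sql("CREATE NAMESPACE testcat.ns WITH DBPROPERTIES ('location'='/tmp/ns')")
    }
    // The dedicated clause expressing the same intent succeeds:
    spark.sql("CREATE NAMESPACE testcat.ns LOCATION '/tmp/ns'")
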
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 591e1c631be13..97a5ae3d80b4c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -18,14 +18,12 @@ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Experimental; -import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; -import java.util.Arrays; -import java.util.List; import java.util.Map; /** @@ -40,33 +38,6 @@ @Experimental public interface TableCatalog extends CatalogPlugin { - /** - * A property to specify the location of the table. The files of the table - * should be under this location. - */ - String PROP_LOCATION = "location"; - - /** - * A property to specify the description of the table. - */ - String PROP_COMMENT = "comment"; - - /** - * A property to specify the provider of the table. - */ - String PROP_PROVIDER = "provider"; - - /** - * A property to specify the owner of the table. - */ - String PROP_OWNER = "owner"; - - /** - * The list of reserved table properties. - */ - List RESERVED_PROPERTIES = - Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER); - /** * List the tables in a namespace from the catalog. *

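The table-side constants move into CatalogV2Util, which is declared `private[sql]`, so the practical effect of this patch is that only Spark-internal call sites can keep naming the reserved keys symbolically. A sketch of the post-patch internal usage (illustrative; it compiles only inside Spark's own sql packages):

    import org.apache.spark.sql.connector.catalog.CatalogV2Util

    // Internal call sites now read the constants from CatalogV2Util
    // instead of the public TableCatalog interface.
    val tableProps = Map(
      CatalogV2Util.PROP_PROVIDER -> "parquet",
      CatalogV2Util.PROP_LOCATION -> "/tmp/t")
    // Strip the reserved entries before exposing user-facing properties,
    // as ResolveSessionCatalog does below.
    val userFacing = tableProps -- CatalogV2Util.TABLE_RESERVED_PROPERTIES
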
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a8a96f0f6803a..4ff8cd22d803c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit -import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.internal.SQLConf @@ -2526,7 +2526,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging private def cleanNamespaceProperties( properties: Map[String, String], ctx: ParserRuleContext): Map[String, String] = withOrigin(ctx) { - import SupportsNamespaces._ + import CatalogV2Util._ val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED) properties.filter { case (PROP_LOCATION, _) if !legacyOn => @@ -2556,7 +2556,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) { - import SupportsNamespaces._ + import CatalogV2Util._ checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx) @@ -2666,7 +2666,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging def cleanTableProperties( ctx: ParserRuleContext, properties: Map[String, String]): Map[String, String] = { - import TableCatalog._ + import CatalogV2Util._ val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED) properties.filter { case (PROP_PROVIDER, _) if !legacyOn => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 3e3c81c22b61d..c8bd016b4c3d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -499,7 +499,7 @@ case class AlterTableSetLocation( partitionSpec: Option[TablePartitionSpec], location: String) extends AlterTable { override lazy val changes: Seq[TableChange] = { - Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location)) + Seq(TableChange.setProperty(CatalogV2Util.PROP_LOCATION, location)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index a4c7b4c3a2894..e8a1e4353ce4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -32,6 +32,48 @@ import org.apache.spark.util.Utils private[sql] object CatalogV2Util { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + /** + * A property to specify the location of the table or namespace + */ + val PROP_LOCATION = "location" + + /** + * A property to specify the description of the table. + */ + val PROP_COMMENT = "comment" + + /** + * A property to specify the provider of the table. + */ + val PROP_PROVIDER = "provider" + + /** + * A property to specify the owner of the table. + */ + val PROP_OWNER = "owner" + + /** + * The list of reserved table properties, which can not be removed or changed directly by + * the syntax: + * {{ + * ALTER TABLE ... SET TBLPROPERTIES ... + * }} + * + * They need specific syntax to modify + */ + val TABLE_RESERVED_PROPERTIES = Seq(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER) + + /** + * The list of reserved namespace properties, which can not be removed or changed directly by + * the syntax: + * {{ + * ALTER NAMESPACE ... SET PROPERTIES ... + * }} + * + * They need specific syntax to modify + */ + val NAMESPACE_RESERVED_PROPERTIES = Seq(PROP_COMMENT, PROP_LOCATION, PROP_OWNER) + /** * Apply properties changes to a map and return the result. */ @@ -271,13 +313,13 @@ private[sql] object CatalogV2Util { provider: String): Map[String, String] = { properties ++ options ++ - Map(TableCatalog.PROP_PROVIDER -> provider) ++ - comment.map(TableCatalog.PROP_COMMENT -> _) ++ - location.map(TableCatalog.PROP_LOCATION -> _) + Map(PROP_PROVIDER -> provider) ++ + comment.map(PROP_COMMENT -> _) ++ + location.map(PROP_LOCATION -> _) } def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = { - properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) + properties ++ Map(PROP_OWNER -> Utils.getCurrentUserName()) } def getTableProviderCatalog( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 55712d0da518d..9989c5b3487e7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, N import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER +import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER import org.apache.spark.sql.types._ import org.apache.spark.util.Utils diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 0d9e2f61e812a..397a5aaf95cf6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} 
import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER +import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 998ec9ebdff85..4473669513f72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,18 +26,18 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.TableCapability._ -import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform} +import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -303,7 +303,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val catalog = CatalogV2Util.getTableProviderCatalog( supportsExtract, sessionState.catalogManager, dsOptions) - val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) + val location = Option(dsOptions.get("path")).map(CatalogV2Util.PROP_LOCATION -> _) runCommand(df.sparkSession, "save") { CreateTableAsSelect( @@ -311,7 +311,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ident, partitioningAsV2, df.queryExecution.analyzed, - Map(TableCatalog.PROP_PROVIDER -> source) ++ location, + Map(CatalogV2Util.PROP_PROVIDER -> source) ++ location, extraOptions.toMap, ignoreIfExists = createMode == SaveMode.Ignore) } @@ -380,8 +380,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def insertInto(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, NonSessionCatalogAndIdentifier, SessionCatalogAndIdentifier} + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - import org.apache.spark.sql.connector.catalog.CatalogV2Util._ assertNotBucketed("insertInto") @@ -520,6 +520,7 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) { */ def saveAsTable(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, NonSessionCatalogAndIdentifier, SessionCatalogAndIdentifier} + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val session = df.sparkSession @@ -550,7 +551,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def getLocationIfExists: Option[(String, String)] = { val opts = CaseInsensitiveMap(extraOptions.toMap) - opts.get("path").map(TableCatalog.PROP_LOCATION -> _) + opts.get("path").map(CatalogV2Util.PROP_LOCATION -> _) } val command = (mode, tableOpt) match { @@ -567,7 +568,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ident, partitioningAsV2, df.queryExecution.analyzed, - Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists, + Map(CatalogV2Util.PROP_PROVIDER -> source) ++ getLocationIfExists, extraOptions.toMap, orCreate = true) // Create the table if it doesn't exist @@ -580,7 +581,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ident, partitioningAsV2, df.queryExecution.analyzed, - Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists, + Map(CatalogV2Util.PROP_PROVIDER -> source) ++ getLocationIfExists, extraOptions.toMap, ignoreIfExists = other == SaveMode.Ignore) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index f0758809bd749..5ac77310095cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -23,8 +23,8 @@ import scala.collection.mutable import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -39,9 +39,10 @@ import org.apache.spark.sql.types.IntegerType final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) extends CreateTableWriter[T] { + import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Util._ - import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier private val df: DataFrame = ds.toDF() @@ -129,7 +130,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) identifier, partitioning.getOrElse(Seq.empty), logicalPlan, - properties = provider.map(p => properties + (TableCatalog.PROP_PROVIDER -> p)) + properties = provider.map(p => properties + (CatalogV2Util.PROP_PROVIDER -> p)) .getOrElse(properties).toMap, writeOptions = options.toMap, ignoreIfExists = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 0aaf9d7e2e1ac..b7a2e8bef2661 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} @@ -246,9 +246,9 @@ class ResolveSessionCatalog( s"The database name is not valid: ${ns.quoted}") } - val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT) - val location = c.properties.get(SupportsNamespaces.PROP_LOCATION) - val newProperties = c.properties -- SupportsNamespaces.RESERVED_PROPERTIES.asScala + val comment = c.properties.get(CatalogV2Util.PROP_COMMENT) + val location = c.properties.get(CatalogV2Util.PROP_LOCATION) + val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties) case d @ DropNamespace(SessionCatalogAndNamespace(_, ns), _, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 9682778d40383..32bd56cb45a39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -21,7 +21,6 @@ import java.util.Locale import java.util.concurrent.TimeUnit._ import scala.collection.{GenMap, GenSeq} -import scala.collection.JavaConverters._ import scala.collection.parallel.ForkJoinTaskSupport import scala.collection.parallel.immutable.ParVector import scala.util.control.NonFatal @@ -38,8 +37,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ -import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.connector.catalog.CatalogV2Util._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter @@ -183,7 +181,7 @@ case class DescribeDatabaseCommand( Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil if (extended) { - val properties = allDbProperties -- RESERVED_PROPERTIES.asScala + val properties = allDbProperties -- NAMESPACE_RESERVED_PROPERTIES val propertiesStr = if (properties.isEmpty) { "" @@ -277,7 +275,7 @@ case class 
AlterTableSetPropertiesCommand( // direct property. val newTable = table.copy( properties = table.properties ++ properties, - comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment)) + comment = properties.get(PROP_COMMENT).orElse(table.comment)) catalog.alterTable(newTable) Seq.empty[Row] } @@ -306,14 +304,14 @@ case class AlterTableUnsetPropertiesCommand( DDLUtils.verifyAlterTableType(catalog, table, isView) if (!ifExists) { propKeys.foreach { k => - if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) { + if (!table.properties.contains(k) && k != PROP_COMMENT) { throw new AnalysisException( s"Attempted to unset non-existent property '$k' in table '${table.identifier}'") } } } // If comment is in the table property, we reset it to None - val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment + val tableComment = if (propKeys.contains(PROP_COMMENT)) None else table.comment val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) } val newTable = table.copy(properties = newProperties, comment = tableComment) catalog.alterTable(newTable) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index d5b81d13a7cc4..1498ee42c0a0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.SupportsNamespaces +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces} import org.apache.spark.util.Utils /** @@ -36,13 +36,12 @@ case class CreateNamespaceExec( extends V2CommandExec { override protected def run(): Seq[InternalRow] = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ val ns = namespace.toArray if (!catalog.namespaceExists(ns)) { try { val ownership = - Map(PROP_OWNER -> Utils.getCurrentUserName()) + Map(CatalogV2Util.PROP_OWNER -> Utils.getCurrentUserName()) catalog.createNamespace(ns, (properties ++ ownership).asJava) } catch { case _: NamespaceAlreadyExistsException if ifNotExists => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 7c8fd4e105ca7..942579bba66d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -264,16 +264,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat AlterNamespaceSetPropertiesExec( catalog, ns, - Map(SupportsNamespaces.PROP_LOCATION -> location)) :: Nil + Map(CatalogV2Util.PROP_LOCATION -> location)) :: Nil case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) => AlterNamespaceSetPropertiesExec( catalog, ns, - Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil + 
Map(CatalogV2Util.PROP_COMMENT -> comment)) :: Nil case CommentOnTable(ResolvedTable(catalog, identifier, _), comment) => - val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment) + val changes = TableChange.setProperty(CatalogV2Util.PROP_COMMENT, comment) AlterTableExec(catalog, identifier, Seq(changes)) :: Nil case CreateNamespace(catalog, namespace, ifNotExists, properties) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala index 9a0481ef01b63..b2198bd671d7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala @@ -35,7 +35,7 @@ case class DescribeNamespaceExec( namespace: Seq[String], isExtended: Boolean) extends V2CommandExec { private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind() - import SupportsNamespaces._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ override protected def run(): Seq[InternalRow] = { val rows = new ArrayBuffer[InternalRow]() @@ -44,12 +44,12 @@ case class DescribeNamespaceExec( rows += toCatalystRow("Namespace Name", ns.last) - SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p => + NAMESPACE_RESERVED_PROPERTIES.foreach { p => rows ++= Option(metadata.get(p)).map(toCatalystRow(p.capitalize, _)) } if (isExtended) { - val properties = metadata.asScala -- RESERVED_PROPERTIES.asScala + val properties = metadata.asScala -- NAMESPACE_RESERVED_PROPERTIES if (properties.nonEmpty) { rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala index 2815b0ac131f0..175975cf6ab87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} -import org.apache.spark.sql.connector.catalog.{Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table} import org.apache.spark.sql.types.StructType case class DescribeTableExec( @@ -49,14 +49,14 @@ case class DescribeTableExec( rows += toCatalystRow("# Detailed Table Information", "", "") rows += toCatalystRow("Name", table.name(), "") - TableCatalog.RESERVED_PROPERTIES.asScala.toList.foreach(propKey => { + CatalogV2Util.TABLE_RESERVED_PROPERTIES.toList.foreach(propKey => { if (table.properties.containsKey(propKey)) { rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "") } }) val properties = table.properties.asScala.toList - .filter(kv => !TableCatalog.RESERVED_PROPERTIES.contains(kv._1)) + .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1)) .sortBy(_._1).map { case (key, value) => key + "=" + value }.mkString("[", ",", "]") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 8eea1cf9c06e4..1a308436e760c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -83,9 +83,9 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) properties: util.Map[String, String]): Table = { val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) - val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) + val provider = properties.getOrDefault(CatalogV2Util.PROP_PROVIDER, conf.defaultDataSourceName) val tableProperties = properties.asScala - val location = Option(properties.get(TableCatalog.PROP_LOCATION)) + val location = Option(properties.get(CatalogV2Util.PROP_LOCATION)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) .copy(locationUri = location.map(CatalogUtils.stringToURI)) val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED @@ -100,7 +100,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, tracksPartitionsInCatalog = conf.manageFilesourcePartitions, - comment = Option(properties.get(TableCatalog.PROP_COMMENT))) + comment = Option(properties.get(CatalogV2Util.PROP_COMMENT))) try { catalog.createTable(tableDesc, ignoreIfExists = false) @@ -124,8 +124,8 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes) val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes) - val comment = properties.get(TableCatalog.PROP_COMMENT) - val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner) + val comment = properties.get(CatalogV2Util.PROP_COMMENT) + val owner = properties.getOrElse(CatalogV2Util.PROP_OWNER, catalogTable.owner) try { catalog.alterTable( @@ -232,7 +232,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) // validate that this catalog's reserved properties are not removed changes.foreach { case remove: RemoveProperty - if SupportsNamespaces.RESERVED_PROPERTIES.contains(remove.property) => + if CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) => throw new UnsupportedOperationException( s"Cannot remove reserved property: ${remove.property}") case _ => @@ -296,13 +296,13 @@ private[sql] object V2SessionCatalog { defaultLocation: Option[URI] = None): CatalogDatabase = { CatalogDatabase( name = db, - description = metadata.getOrDefault(SupportsNamespaces.PROP_COMMENT, ""), - locationUri = Option(metadata.get(SupportsNamespaces.PROP_LOCATION)) + description = metadata.getOrDefault(CatalogV2Util.PROP_COMMENT, ""), + locationUri = Option(metadata.get(CatalogV2Util.PROP_LOCATION)) .map(CatalogUtils.stringToURI) .orElse(defaultLocation) .getOrElse(throw new IllegalArgumentException("Missing database location")), properties = metadata.asScala.toMap -- - Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION)) + Seq(CatalogV2Util.PROP_COMMENT, CatalogV2Util.PROP_LOCATION)) } private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) { @@ -312,8 +312,8 @@ private[sql] object V2SessionCatalog { catalogDatabase.properties.foreach { case (key, value) => metadata.put(key, value) } - 
metadata.put(SupportsNamespaces.PROP_LOCATION, catalogDatabase.locationUri.toString) - metadata.put(SupportsNamespaces.PROP_COMMENT, catalogDatabase.description) + metadata.put(CatalogV2Util.PROP_LOCATION, catalogDatabase.locationUri.toString) + metadata.put(CatalogV2Util.PROP_COMMENT, catalogDatabase.description) metadata.asJava } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index 4e6381aea3c31..2b4e4e3d33dca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.InMemoryTableCatalog -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} @@ -38,7 +38,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.sessionState.catalogManager.catalog(name).asTableCatalog } - private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) + private val defaultOwnership = Map(CatalogV2Util.PROP_OWNER -> Utils.getCurrentUserName()) before { spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 75c9bb7be05f4..620e9a5150ef0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource -import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -162,7 +162,7 @@ class DataSourceV2SQLSuite Array("Comment", "this is a test table", ""), Array("Location", "/tmp/testcat/table_name", ""), Array("Provider", "foo", ""), - Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""), + Array(CatalogV2Util.PROP_OWNER.capitalize, defaultUser, ""), Array("Table Properties", "[bar=baz]", ""))) } @@ -438,7 +438,7 @@ class DataSourceV2SQLSuite } test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") { - import InMemoryTableCatalog._ + import org.apache.spark.sql.connector.InMemoryTableCatalog._ spark.sql(s"CREATE TABLE testcat_atomic.created USING $v2Source AS SELECT id, data FROM source") intercept[CannotReplaceMissingTableException] { spark.sql(s"REPLACE TABLE 
testcat_atomic.replaced" + @@ -877,7 +877,7 @@ class DataSourceV2SQLSuite sql(s"CREATE NAMESPACE testcat.test LOCATION '$path'") val metadata = catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("test")).asScala - val catalogPath = metadata(SupportsNamespaces.PROP_LOCATION) + val catalogPath = metadata(CatalogV2Util.PROP_LOCATION) assert(catalogPath.equals(catalogPath)) } } @@ -899,9 +899,9 @@ class DataSourceV2SQLSuite } test("CreateNameSpace: reserved properties") { - import SupportsNamespaces._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => val exception = intercept[ParseException] { sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='dummyVal')") } @@ -909,7 +909,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='foo')") assert(sql("DESC NAMESPACE EXTENDED testcat.reservedTest") @@ -926,9 +926,9 @@ class DataSourceV2SQLSuite } test("create/replace/alter table - reserved properties") { - import TableCatalog._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => Seq("CREATE", "REPLACE").foreach { action => val e = intercept[ParseException] { @@ -950,7 +950,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => withTable("testcat.reservedTest") { Seq("CREATE", "REPLACE").foreach { action => @@ -994,7 +994,7 @@ class DataSourceV2SQLSuite val tableCatalog = catalog("testcat").asTableCatalog val identifier = Identifier.of(Array(), "reservedTest") assert(tableCatalog.loadTable(identifier).properties() - .get(TableCatalog.PROP_LOCATION) == "foo", + .get(CatalogV2Util.PROP_LOCATION) == "foo", "path as a table property should not have side effects") assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar", "path as a table property should not have side effects") @@ -1082,9 +1082,9 @@ class DataSourceV2SQLSuite val description = descriptionDf.collect() assert(description === Seq( Row("Namespace Name", "ns2"), - Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), - Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test"), - Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser)) + Row(CatalogV2Util.PROP_COMMENT.capitalize, "test namespace"), + Row(CatalogV2Util.PROP_LOCATION.capitalize, "/tmp/ns_test"), + Row(CatalogV2Util.PROP_OWNER.capitalize, defaultUser)) ) } } @@ -1097,18 +1097,18 @@ class DataSourceV2SQLSuite val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2") assert(descriptionDf.collect() === Seq( Row("Namespace Name", "ns2"), - 
Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), - Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test"), - Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser), + Row(CatalogV2Util.PROP_COMMENT.capitalize, "test namespace"), + Row(CatalogV2Util.PROP_LOCATION.capitalize, "/tmp/ns_test"), + Row(CatalogV2Util.PROP_OWNER.capitalize, defaultUser), Row("Properties", "((a,b),(b,a),(c,c))")) ) } } test("AlterNamespaceSetProperties: reserved properties") { - import SupportsNamespaces._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql("CREATE NAMESPACE testcat.reservedTest") val exception = intercept[ParseException] { @@ -1119,7 +1119,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql(s"CREATE NAMESPACE testcat.reservedTest") sql(s"ALTER NAMESPACE testcat.reservedTest SET PROPERTIES ('$key'='foo')") @@ -1144,9 +1144,9 @@ class DataSourceV2SQLSuite val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2") assert(descriptionDf.collect() === Seq( Row("Namespace Name", "ns2"), - Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), - Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test_2"), - Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser)) + Row(CatalogV2Util.PROP_COMMENT.capitalize, "test namespace"), + Row(CatalogV2Util.PROP_LOCATION.capitalize, "/tmp/ns_test_2"), + Row(CatalogV2Util.PROP_OWNER.capitalize, defaultUser)) ) } } @@ -1942,7 +1942,7 @@ class DataSourceV2SQLSuite .add("value", StringType, nullable = false) val expected = Seq( - Row(TableCatalog.PROP_OWNER, defaultUser), + Row(CatalogV2Util.PROP_OWNER, defaultUser), Row("provider", provider), Row("status", status), Row("user", user)) @@ -2171,7 +2171,7 @@ class DataSourceV2SQLSuite Option(comment).map("'" + _ + "'").getOrElse("NULL")) val expectedComment = Option(comment).getOrElse("") assert(sql(s"DESC NAMESPACE extended $namespace").toDF("k", "v") - .where(s"k='${SupportsNamespaces.PROP_COMMENT.capitalize}'") + .where(s"k='${CatalogV2Util.PROP_COMMENT.capitalize}'") .head().getString(1) === expectedComment) } @@ -2209,7 +2209,7 @@ class DataSourceV2SQLSuite sql(s"COMMENT ON TABLE $tableName IS " + Option(comment).map("'" + _ + "'").getOrElse("NULL")) val expectedComment = Option(comment).getOrElse("") assert(sql(s"DESC extended $tableName").toDF("k", "v", "c") - .where(s"k='${TableCatalog.PROP_COMMENT.capitalize}'") + .where(s"k='${CatalogV2Util.PROP_COMMENT.capitalize}'") .head().getString(1) === expectedComment) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e3fb535ab4cdd..3db9466587d71 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import 
org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER +import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 9466ed92819c9..7ccaad218a0a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -27,9 +27,9 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, TableChange} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter { @@ -742,7 +742,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { actual: scala.collection.Map[String, String]): Unit = { // remove location and comment that are automatically added by HMS unless they are expected val toRemove = - SupportsNamespaces.RESERVED_PROPERTIES.asScala.filter(expected.contains) + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains) assert(expected -- toRemove === actual) } @@ -1000,7 +1000,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createNamespace(testNs, emptyProps) - SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p => val exc = intercept[UnsupportedOperationException] { catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b5c5f0e9381bc..ac1924dcb381b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ +import 
org.apache.spark.sql.connector.catalog.CatalogV2Util._ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index ba48cfd4142f6..0a4926fb719d9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAl import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER +import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog From 938aeeaf6ed8e3d8c7111fc5b054a4f886669578 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 22 Jan 2020 19:12:16 +0800 Subject: [PATCH 2/7] style --- .../apache/spark/sql/connector/catalog/SupportsNamespaces.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index ca9451e4b1594..40d9c2b122817 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -21,8 +21,6 @@ import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import java.util.Arrays; -import java.util.List; import java.util.Map; /** From 276786c1662496d59c853b018f5308cfc4960d4e Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 23 Jan 2020 13:56:07 +0800 Subject: [PATCH 3/7] Revert "style" This reverts commit 938aeeaf6ed8e3d8c7111fc5b054a4f886669578. --- .../apache/spark/sql/connector/catalog/SupportsNamespaces.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 40d9c2b122817..ca9451e4b1594 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -21,6 +21,8 @@ import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import java.util.Arrays; +import java.util.List; import java.util.Map; /** From 9f83ca7244180b2dccdb56d97f5acd222488945a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 23 Jan 2020 13:56:19 +0800 Subject: [PATCH 4/7] Revert "[SPARK-30603][SQL] Keep the reserved properties of namespaces and tables private" This reverts commit e193bc34766a55e2c17605b0bec371d53aec8353. 
--- .../connector/catalog/SupportsNamespaces.java | 28 ++++++++++ .../sql/connector/catalog/TableCatalog.java | 31 ++++++++++- .../sql/catalyst/parser/AstBuilder.scala | 8 +-- .../catalyst/plans/logical/v2Commands.scala | 2 +- .../sql/connector/catalog/CatalogV2Util.scala | 50 ++---------------- .../catalog/ExternalCatalogSuite.scala | 2 +- .../catalog/SessionCatalogSuite.scala | 2 +- .../apache/spark/sql/DataFrameWriter.scala | 21 ++++---- .../apache/spark/sql/DataFrameWriterV2.scala | 9 ++-- .../analysis/ResolveSessionCatalog.scala | 8 +-- .../spark/sql/execution/command/ddl.scala | 12 +++-- .../datasources/v2/CreateNamespaceExec.scala | 5 +- .../datasources/v2/DataSourceV2Strategy.scala | 6 +-- .../v2/DescribeNamespaceExec.scala | 6 +-- .../datasources/v2/DescribeTableExec.scala | 6 +-- .../datasources/v2/V2SessionCatalog.scala | 22 ++++---- .../spark/sql/DataFrameWriterV2Suite.scala | 4 +- .../sql/connector/DataSourceV2SQLSuite.scala | 52 +++++++++---------- .../sql/execution/command/DDLSuite.scala | 2 +- .../v2/V2SessionCatalogSuite.scala | 8 +-- .../sql/hive/client/HiveClientImpl.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 2 +- 22 files changed, 152 insertions(+), 136 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index ca9451e4b1594..757d303e82255 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -41,6 +41,34 @@ @Experimental public interface SupportsNamespaces extends CatalogPlugin { + /** + * A property to specify the location of the namespace. If the namespace + * needs to store files, it should be under this location. + */ + String PROP_LOCATION = "location"; + + /** + * A property to specify the description of the namespace. The description + * will be returned in the result of "DESCRIBE NAMESPACE" command. + */ + String PROP_COMMENT = "comment"; + + /** + * A property to specify the owner of the namespace. + */ + String PROP_OWNER = "owner"; + + /** + * The list of reserved namespace properties, which can not be removed or changed directly by + * the syntax: + * {{ + * ALTER NAMESPACE ... SET PROPERTIES ... + * }} + * + * They need specific syntax to modify + */ + List RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_OWNER); + /** * Return a default namespace for the catalog. *

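The revert restores these constants to the Experimental public connector API, so code outside Spark can reference them again. A hypothetical external helper built on that surface (`namespaceComment` is illustrative, not part of the patch):

    import org.apache.spark.sql.connector.catalog.SupportsNamespaces

    // After the revert, external plugins can again rely on the public
    // constants, e.g. to surface a namespace description.
    def namespaceComment(catalog: SupportsNamespaces, ns: Array[String]): Option[String] =
      Option(catalog.loadNamespaceMetadata(ns).get(SupportsNamespaces.PROP_COMMENT))
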
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 97a5ae3d80b4c..591e1c631be13 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -18,12 +18,14 @@ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; +import java.util.Arrays; +import java.util.List; import java.util.Map; /** @@ -38,6 +40,33 @@ @Experimental public interface TableCatalog extends CatalogPlugin { + /** + * A property to specify the location of the table. The files of the table + * should be under this location. + */ + String PROP_LOCATION = "location"; + + /** + * A property to specify the description of the table. + */ + String PROP_COMMENT = "comment"; + + /** + * A property to specify the provider of the table. + */ + String PROP_PROVIDER = "provider"; + + /** + * A property to specify the owner of the table. + */ + String PROP_OWNER = "owner"; + + /** + * The list of reserved table properties. + */ + List RESERVED_PROPERTIES = + Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER); + /** * List the tables in a namespace from the catalog. *

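Likewise for the table-side constants: a sketch of how a custom TableCatalog implementation can read the reserved entries that Spark places in the `properties` argument of createTable, mirroring what V2SessionCatalog does earlier in the series (`extractReserved` is a hypothetical helper):

    import java.util.{Map => JMap}
    import org.apache.spark.sql.connector.catalog.TableCatalog

    // Pull the reserved entries out of the properties map that Spark
    // passes to TableCatalog.createTable.
    def extractReserved(properties: JMap[String, String])
        : (Option[String], Option[String], Option[String]) = {
      val provider = Option(properties.get(TableCatalog.PROP_PROVIDER))
      val location = Option(properties.get(TableCatalog.PROP_LOCATION))
      val comment = Option(properties.get(TableCatalog.PROP_COMMENT))
      (provider, location, comment)
    }
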
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 4ff8cd22d803c..a8a96f0f6803a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog} +import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.internal.SQLConf @@ -2526,7 +2526,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging private def cleanNamespaceProperties( properties: Map[String, String], ctx: ParserRuleContext): Map[String, String] = withOrigin(ctx) { - import CatalogV2Util._ + import SupportsNamespaces._ val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED) properties.filter { case (PROP_LOCATION, _) if !legacyOn => @@ -2556,7 +2556,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) { - import CatalogV2Util._ + import SupportsNamespaces._ checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx) @@ -2666,7 +2666,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging def cleanTableProperties( ctx: ParserRuleContext, properties: Map[String, String]): Map[String, String] = { - import CatalogV2Util._ + import TableCatalog._ val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED) properties.filter { case (PROP_PROVIDER, _) if !legacyOn => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index c8bd016b4c3d2..3e3c81c22b61d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -499,7 +499,7 @@ case class AlterTableSetLocation( partitionSpec: Option[TablePartitionSpec], location: String) extends AlterTable { override lazy val changes: Seq[TableChange] = { - Seq(TableChange.setProperty(CatalogV2Util.PROP_LOCATION, location)) + Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e8a1e4353ce4b..a4c7b4c3a2894 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -32,48 +32,6 @@ import org.apache.spark.util.Utils private[sql] object CatalogV2Util { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - /** - * A property to specify the location of the table or namespace - */ - val PROP_LOCATION = "location" - - /** - * A property to specify the description of the table. - */ - val PROP_COMMENT = "comment" - - /** - * A property to specify the provider of the table. - */ - val PROP_PROVIDER = "provider" - - /** - * A property to specify the owner of the table. - */ - val PROP_OWNER = "owner" - - /** - * The list of reserved table properties, which can not be removed or changed directly by - * the syntax: - * {{ - * ALTER TABLE ... SET TBLPROPERTIES ... - * }} - * - * They need specific syntax to modify - */ - val TABLE_RESERVED_PROPERTIES = Seq(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER) - - /** - * The list of reserved namespace properties, which can not be removed or changed directly by - * the syntax: - * {{ - * ALTER NAMESPACE ... SET PROPERTIES ... - * }} - * - * They need specific syntax to modify - */ - val NAMESPACE_RESERVED_PROPERTIES = Seq(PROP_COMMENT, PROP_LOCATION, PROP_OWNER) - /** * Apply properties changes to a map and return the result. */ @@ -313,13 +271,13 @@ private[sql] object CatalogV2Util { provider: String): Map[String, String] = { properties ++ options ++ - Map(PROP_PROVIDER -> provider) ++ - comment.map(PROP_COMMENT -> _) ++ - location.map(PROP_LOCATION -> _) + Map(TableCatalog.PROP_PROVIDER -> provider) ++ + comment.map(TableCatalog.PROP_COMMENT -> _) ++ + location.map(TableCatalog.PROP_LOCATION -> _) } def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = { - properties ++ Map(PROP_OWNER -> Utils.getCurrentUserName()) + properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) } def getTableProviderCatalog( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 9989c5b3487e7..55712d0da518d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, N import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.types._ import org.apache.spark.util.Utils diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 397a5aaf95cf6..0d9e2f61e812a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} 
import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 4473669513f72..998ec9ebdff85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,18 +26,18 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ -import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} +import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -303,7 +303,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val catalog = CatalogV2Util.getTableProviderCatalog( supportsExtract, sessionState.catalogManager, dsOptions) - val location = Option(dsOptions.get("path")).map(CatalogV2Util.PROP_LOCATION -> _) + val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) runCommand(df.sparkSession, "save") { CreateTableAsSelect( @@ -311,7 +311,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ident, partitioningAsV2, df.queryExecution.analyzed, - Map(CatalogV2Util.PROP_PROVIDER -> source) ++ location, + Map(TableCatalog.PROP_PROVIDER -> source) ++ location, extraOptions.toMap, ignoreIfExists = createMode == SaveMode.Ignore) } @@ -380,8 +380,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def insertInto(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, NonSessionCatalogAndIdentifier, SessionCatalogAndIdentifier} - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ assertNotBucketed("insertInto") @@ -520,7 +520,6 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) { */ def saveAsTable(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, NonSessionCatalogAndIdentifier, SessionCatalogAndIdentifier} - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val session = df.sparkSession @@ -551,7 +550,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def getLocationIfExists: Option[(String, String)] = { val opts = CaseInsensitiveMap(extraOptions.toMap) - opts.get("path").map(CatalogV2Util.PROP_LOCATION -> _) + opts.get("path").map(TableCatalog.PROP_LOCATION -> _) } val command = (mode, tableOpt) match { @@ -568,7 +567,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ident, partitioningAsV2, df.queryExecution.analyzed, - Map(CatalogV2Util.PROP_PROVIDER -> source) ++ getLocationIfExists, + Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists, extraOptions.toMap, orCreate = true) // Create the table if it doesn't exist @@ -581,7 +580,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ident, partitioningAsV2, df.queryExecution.analyzed, - Map(CatalogV2Util.PROP_PROVIDER -> source) ++ getLocationIfExists, + Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists, extraOptions.toMap, ignoreIfExists = other == SaveMode.Ignore) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 5ac77310095cf..f0758809bd749 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -23,8 +23,8 @@ import scala.collection.mutable import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.connector.catalog.CatalogV2Util +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -39,10 +39,9 @@ import org.apache.spark.sql.types.IntegerType final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) extends CreateTableWriter[T] { - import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier private val df: DataFrame = ds.toDF() @@ -130,7 +129,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) identifier, partitioning.getOrElse(Seq.empty), logicalPlan, - properties = provider.map(p => properties + (CatalogV2Util.PROP_PROVIDER -> p)) + properties = provider.map(p => properties + (TableCatalog.PROP_PROVIDER -> p)) .getOrElse(properties).toMap, writeOptions = options.toMap, ignoreIfExists = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index b7a2e8bef2661..0aaf9d7e2e1ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} @@ -246,9 +246,9 @@ class ResolveSessionCatalog( s"The database name is not valid: ${ns.quoted}") } - val comment = c.properties.get(CatalogV2Util.PROP_COMMENT) - val location = c.properties.get(CatalogV2Util.PROP_LOCATION) - val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES + val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT) + val location = c.properties.get(SupportsNamespaces.PROP_LOCATION) + val newProperties = c.properties -- SupportsNamespaces.RESERVED_PROPERTIES.asScala CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties) case d @ DropNamespace(SessionCatalogAndNamespace(_, ns), _, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 32bd56cb45a39..9682778d40383 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -21,6 +21,7 @@ import java.util.Locale import java.util.concurrent.TimeUnit._ import scala.collection.{GenMap, GenSeq} +import scala.collection.JavaConverters._ import scala.collection.parallel.ForkJoinTaskSupport import scala.collection.parallel.immutable.ParVector import scala.util.control.NonFatal @@ -37,7 +38,8 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.catalog.CatalogV2Util._ +import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter @@ -181,7 +183,7 @@ case class DescribeDatabaseCommand( Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil if (extended) { - val properties = allDbProperties -- NAMESPACE_RESERVED_PROPERTIES + val properties = allDbProperties -- RESERVED_PROPERTIES.asScala val propertiesStr = if (properties.isEmpty) { "" @@ -275,7 +277,7 @@ case class 
AlterTableSetPropertiesCommand( // direct property. val newTable = table.copy( properties = table.properties ++ properties, - comment = properties.get(PROP_COMMENT).orElse(table.comment)) + comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment)) catalog.alterTable(newTable) Seq.empty[Row] } @@ -304,14 +306,14 @@ case class AlterTableUnsetPropertiesCommand( DDLUtils.verifyAlterTableType(catalog, table, isView) if (!ifExists) { propKeys.foreach { k => - if (!table.properties.contains(k) && k != PROP_COMMENT) { + if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) { throw new AnalysisException( s"Attempted to unset non-existent property '$k' in table '${table.identifier}'") } } } // If comment is in the table property, we reset it to None - val tableComment = if (propKeys.contains(PROP_COMMENT)) None else table.comment + val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) } val newTable = table.copy(properties = newProperties, comment = tableComment) catalog.alterTable(newTable) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index 1498ee42c0a0c..d5b81d13a7cc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces} +import org.apache.spark.sql.connector.catalog.SupportsNamespaces import org.apache.spark.util.Utils /** @@ -36,12 +36,13 @@ case class CreateNamespaceExec( extends V2CommandExec { override protected def run(): Seq[InternalRow] = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ val ns = namespace.toArray if (!catalog.namespaceExists(ns)) { try { val ownership = - Map(CatalogV2Util.PROP_OWNER -> Utils.getCurrentUserName()) + Map(PROP_OWNER -> Utils.getCurrentUserName()) catalog.createNamespace(ns, (properties ++ ownership).asJava) } catch { case _: NamespaceAlreadyExistsException if ifNotExists => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 942579bba66d8..7c8fd4e105ca7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -264,16 +264,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat AlterNamespaceSetPropertiesExec( catalog, ns, - Map(CatalogV2Util.PROP_LOCATION -> location)) :: Nil + Map(SupportsNamespaces.PROP_LOCATION -> location)) :: Nil case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) => AlterNamespaceSetPropertiesExec( catalog, ns, - Map(CatalogV2Util.PROP_COMMENT -> comment)) :: Nil + 
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil case CommentOnTable(ResolvedTable(catalog, identifier, _), comment) => - val changes = TableChange.setProperty(CatalogV2Util.PROP_COMMENT, comment) + val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment) AlterTableExec(catalog, identifier, Seq(changes)) :: Nil case CreateNamespace(catalog, namespace, ifNotExists, properties) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala index b2198bd671d7e..9a0481ef01b63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala @@ -35,7 +35,7 @@ case class DescribeNamespaceExec( namespace: Seq[String], isExtended: Boolean) extends V2CommandExec { private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind() - import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + import SupportsNamespaces._ override protected def run(): Seq[InternalRow] = { val rows = new ArrayBuffer[InternalRow]() @@ -44,12 +44,12 @@ case class DescribeNamespaceExec( rows += toCatalystRow("Namespace Name", ns.last) - NAMESPACE_RESERVED_PROPERTIES.foreach { p => + SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p => rows ++= Option(metadata.get(p)).map(toCatalystRow(p.capitalize, _)) } if (isExtended) { - val properties = metadata.asScala -- NAMESPACE_RESERVED_PROPERTIES + val properties = metadata.asScala -- RESERVED_PROPERTIES.asScala if (properties.nonEmpty) { rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala index 175975cf6ab87..2815b0ac131f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table} +import org.apache.spark.sql.connector.catalog.{Table, TableCatalog} import org.apache.spark.sql.types.StructType case class DescribeTableExec( @@ -49,14 +49,14 @@ case class DescribeTableExec( rows += toCatalystRow("# Detailed Table Information", "", "") rows += toCatalystRow("Name", table.name(), "") - CatalogV2Util.TABLE_RESERVED_PROPERTIES.toList.foreach(propKey => { + TableCatalog.RESERVED_PROPERTIES.asScala.toList.foreach(propKey => { if (table.properties.containsKey(propKey)) { rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "") } }) val properties = table.properties.asScala.toList - .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1)) + .filter(kv => !TableCatalog.RESERVED_PROPERTIES.contains(kv._1)) .sortBy(_._1).map { case (key, value) => key + "=" + value }.mkString("[", ",", "]") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 1a308436e760c..8eea1cf9c06e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -83,9 +83,9 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) properties: util.Map[String, String]): Table = { val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) - val provider = properties.getOrDefault(CatalogV2Util.PROP_PROVIDER, conf.defaultDataSourceName) + val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) val tableProperties = properties.asScala - val location = Option(properties.get(CatalogV2Util.PROP_LOCATION)) + val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) .copy(locationUri = location.map(CatalogUtils.stringToURI)) val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED @@ -100,7 +100,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, tracksPartitionsInCatalog = conf.manageFilesourcePartitions, - comment = Option(properties.get(CatalogV2Util.PROP_COMMENT))) + comment = Option(properties.get(TableCatalog.PROP_COMMENT))) try { catalog.createTable(tableDesc, ignoreIfExists = false) @@ -124,8 +124,8 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes) val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes) - val comment = properties.get(CatalogV2Util.PROP_COMMENT) - val owner = properties.getOrElse(CatalogV2Util.PROP_OWNER, catalogTable.owner) + val comment = properties.get(TableCatalog.PROP_COMMENT) + val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner) try { catalog.alterTable( @@ -232,7 +232,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) // validate that this catalog's reserved properties are not removed changes.foreach { case remove: RemoveProperty - if CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) => + if SupportsNamespaces.RESERVED_PROPERTIES.contains(remove.property) => throw new UnsupportedOperationException( s"Cannot remove reserved property: ${remove.property}") case _ => @@ -296,13 +296,13 @@ private[sql] object V2SessionCatalog { defaultLocation: Option[URI] = None): CatalogDatabase = { CatalogDatabase( name = db, - description = metadata.getOrDefault(CatalogV2Util.PROP_COMMENT, ""), - locationUri = Option(metadata.get(CatalogV2Util.PROP_LOCATION)) + description = metadata.getOrDefault(SupportsNamespaces.PROP_COMMENT, ""), + locationUri = Option(metadata.get(SupportsNamespaces.PROP_LOCATION)) .map(CatalogUtils.stringToURI) .orElse(defaultLocation) .getOrElse(throw new IllegalArgumentException("Missing database location")), properties = metadata.asScala.toMap -- - Seq(CatalogV2Util.PROP_COMMENT, CatalogV2Util.PROP_LOCATION)) + Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION)) } private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) { @@ -312,8 +312,8 @@ private[sql] object V2SessionCatalog { catalogDatabase.properties.foreach { case (key, value) => metadata.put(key, value) } - 
metadata.put(CatalogV2Util.PROP_LOCATION, catalogDatabase.locationUri.toString) - metadata.put(CatalogV2Util.PROP_COMMENT, catalogDatabase.description) + metadata.put(SupportsNamespaces.PROP_LOCATION, catalogDatabase.locationUri.toString) + metadata.put(SupportsNamespaces.PROP_COMMENT, catalogDatabase.description) metadata.asJava } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index 2b4e4e3d33dca..4e6381aea3c31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.InMemoryTableCatalog -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} @@ -38,7 +38,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.sessionState.catalogManager.catalog(name).asTableCatalog } - private val defaultOwnership = Map(CatalogV2Util.PROP_OWNER -> Utils.getCurrentUserName()) + private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) before { spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 620e9a5150ef0..75c9bb7be05f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -162,7 +162,7 @@ class DataSourceV2SQLSuite Array("Comment", "this is a test table", ""), Array("Location", "/tmp/testcat/table_name", ""), Array("Provider", "foo", ""), - Array(CatalogV2Util.PROP_OWNER.capitalize, defaultUser, ""), + Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""), Array("Table Properties", "[bar=baz]", ""))) } @@ -438,7 +438,7 @@ class DataSourceV2SQLSuite } test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") { - import org.apache.spark.sql.connector.InMemoryTableCatalog._ + import InMemoryTableCatalog._ spark.sql(s"CREATE TABLE testcat_atomic.created USING $v2Source AS SELECT id, data FROM source") intercept[CannotReplaceMissingTableException] { spark.sql(s"REPLACE TABLE 
testcat_atomic.replaced" + @@ -877,7 +877,7 @@ class DataSourceV2SQLSuite sql(s"CREATE NAMESPACE testcat.test LOCATION '$path'") val metadata = catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("test")).asScala - val catalogPath = metadata(CatalogV2Util.PROP_LOCATION) + val catalogPath = metadata(SupportsNamespaces.PROP_LOCATION) assert(catalogPath.equals(catalogPath)) } } @@ -899,9 +899,9 @@ class DataSourceV2SQLSuite } test("CreateNameSpace: reserved properties") { - import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + import SupportsNamespaces._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => val exception = intercept[ParseException] { sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='dummyVal')") } @@ -909,7 +909,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='foo')") assert(sql("DESC NAMESPACE EXTENDED testcat.reservedTest") @@ -926,9 +926,9 @@ class DataSourceV2SQLSuite } test("create/replace/alter table - reserved properties") { - import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + import TableCatalog._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => Seq("CREATE", "REPLACE").foreach { action => val e = intercept[ParseException] { @@ -950,7 +950,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => withTable("testcat.reservedTest") { Seq("CREATE", "REPLACE").foreach { action => @@ -994,7 +994,7 @@ class DataSourceV2SQLSuite val tableCatalog = catalog("testcat").asTableCatalog val identifier = Identifier.of(Array(), "reservedTest") assert(tableCatalog.loadTable(identifier).properties() - .get(CatalogV2Util.PROP_LOCATION) == "foo", + .get(TableCatalog.PROP_LOCATION) == "foo", "path as a table property should not have side effects") assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar", "path as a table property should not have side effects") @@ -1082,9 +1082,9 @@ class DataSourceV2SQLSuite val description = descriptionDf.collect() assert(description === Seq( Row("Namespace Name", "ns2"), - Row(CatalogV2Util.PROP_COMMENT.capitalize, "test namespace"), - Row(CatalogV2Util.PROP_LOCATION.capitalize, "/tmp/ns_test"), - Row(CatalogV2Util.PROP_OWNER.capitalize, defaultUser)) + Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), + Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test"), + Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser)) ) } } @@ -1097,18 +1097,18 @@ class DataSourceV2SQLSuite val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2") assert(descriptionDf.collect() === Seq( Row("Namespace Name", "ns2"), - 
Row(CatalogV2Util.PROP_COMMENT.capitalize, "test namespace"), - Row(CatalogV2Util.PROP_LOCATION.capitalize, "/tmp/ns_test"), - Row(CatalogV2Util.PROP_OWNER.capitalize, defaultUser), + Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), + Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test"), + Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser), Row("Properties", "((a,b),(b,a),(c,c))")) ) } } test("AlterNamespaceSetProperties: reserved properties") { - import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + import SupportsNamespaces._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql("CREATE NAMESPACE testcat.reservedTest") val exception = intercept[ParseException] { @@ -1119,7 +1119,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql(s"CREATE NAMESPACE testcat.reservedTest") sql(s"ALTER NAMESPACE testcat.reservedTest SET PROPERTIES ('$key'='foo')") @@ -1144,9 +1144,9 @@ class DataSourceV2SQLSuite val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2") assert(descriptionDf.collect() === Seq( Row("Namespace Name", "ns2"), - Row(CatalogV2Util.PROP_COMMENT.capitalize, "test namespace"), - Row(CatalogV2Util.PROP_LOCATION.capitalize, "/tmp/ns_test_2"), - Row(CatalogV2Util.PROP_OWNER.capitalize, defaultUser)) + Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), + Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test_2"), + Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser)) ) } } @@ -1942,7 +1942,7 @@ class DataSourceV2SQLSuite .add("value", StringType, nullable = false) val expected = Seq( - Row(CatalogV2Util.PROP_OWNER, defaultUser), + Row(TableCatalog.PROP_OWNER, defaultUser), Row("provider", provider), Row("status", status), Row("user", user)) @@ -2171,7 +2171,7 @@ class DataSourceV2SQLSuite Option(comment).map("'" + _ + "'").getOrElse("NULL")) val expectedComment = Option(comment).getOrElse("") assert(sql(s"DESC NAMESPACE extended $namespace").toDF("k", "v") - .where(s"k='${CatalogV2Util.PROP_COMMENT.capitalize}'") + .where(s"k='${SupportsNamespaces.PROP_COMMENT.capitalize}'") .head().getString(1) === expectedComment) } @@ -2209,7 +2209,7 @@ class DataSourceV2SQLSuite sql(s"COMMENT ON TABLE $tableName IS " + Option(comment).map("'" + _ + "'").getOrElse("NULL")) val expectedComment = Option(comment).getOrElse("") assert(sql(s"DESC extended $tableName").toDF("k", "v", "c") - .where(s"k='${CatalogV2Util.PROP_COMMENT.capitalize}'") + .where(s"k='${TableCatalog.PROP_COMMENT.capitalize}'") .head().getString(1) === expectedComment) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 3db9466587d71..e3fb535ab4cdd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import 
org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 7ccaad218a0a3..9466ed92819c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -27,9 +27,9 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, TableChange} +import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, TableChange} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter { @@ -742,7 +742,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { actual: scala.collection.Map[String, String]): Unit = { // remove location and comment that are automatically added by HMS unless they are expected val toRemove = - CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains) + SupportsNamespaces.RESERVED_PROPERTIES.asScala.filter(expected.contains) assert(expected -- toRemove === actual) } @@ -1000,7 +1000,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createNamespace(testNs, emptyProps) - CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p => + SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p => val exc = intercept[UnsupportedOperationException] { catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index ac1924dcb381b..b5c5f0e9381bc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.connector.catalog.CatalogV2Util._ +import 
org.apache.spark.sql.connector.catalog.SupportsNamespaces._ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 0a4926fb719d9..ba48cfd4142f6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAl import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.connector.catalog.CatalogV2Util.PROP_OWNER +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog From 93d316abc452fe97101290f37acf45328d2b0fad Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 23 Jan 2020 14:24:50 +0800 Subject: [PATCH 5/7] only move lists --- .../connector/catalog/SupportsNamespaces.java | 19 ++---------- .../sql/connector/catalog/TableCatalog.java | 16 +++------- .../sql/connector/catalog/CatalogV2Util.scala | 29 +++++++++++++++++++ .../analysis/ResolveSessionCatalog.scala | 4 +-- .../spark/sql/execution/command/ddl.scala | 5 ++-- .../v2/DescribeNamespaceExec.scala | 7 ++--- .../datasources/v2/DescribeTableExec.scala | 6 ++-- .../datasources/v2/V2SessionCatalog.scala | 2 +- .../sql/connector/DataSourceV2SQLSuite.scala | 12 ++++---- .../v2/V2SessionCatalogSuite.scala | 6 ++-- 10 files changed, 56 insertions(+), 50 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 757d303e82255..57f65616f7efd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -21,8 +21,6 @@ import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import java.util.Arrays; -import java.util.List; import java.util.Map; /** @@ -42,33 +40,22 @@ public interface SupportsNamespaces extends CatalogPlugin { /** - * A property to specify the location of the namespace. If the namespace + * A reserved property to specify the location of the namespace. If the namespace * needs to store files, it should be under this location. */ String PROP_LOCATION = "location"; /** - * A property to specify the description of the namespace. The description + * A reserved property to specify the description of the namespace. The description * will be returned in the result of "DESCRIBE NAMESPACE" command. */ String PROP_COMMENT = "comment"; /** - * A property to specify the owner of the namespace. + * A reserved property to specify the owner of the namespace. 
*/ String PROP_OWNER = "owner"; - /** - * The list of reserved namespace properties, which can not be removed or changed directly by - * the syntax: - * {{ - * ALTER NAMESPACE ... SET PROPERTIES ... - * }} - * - * They need specific syntax to modify - */ - List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_OWNER); - /** * Return a default namespace for the catalog. *
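With the rewording above, the keys are documented as reserved: users reach them through dedicated DDL rather than SET PROPERTIES / DBPROPERTIES. A small sketch, assuming an already-resolved SupportsNamespaces catalog, of how COMMENT ON NAMESPACE routes through the reserved key; the helper itself is hypothetical (the planner does this via AlterNamespaceSetPropertiesExec, as in the DataSourceV2Strategy hunks earlier):

import org.apache.spark.sql.connector.catalog.{NamespaceChange, SupportsNamespaces}

// COMMENT ON NAMESPACE ns IS 'text' ends up as a property change on the
// reserved "comment" key rather than a user-level SET PROPERTIES call.
def commentOnNamespace(
    catalog: SupportsNamespaces,
    ns: Array[String],
    comment: String): Unit = {
  catalog.alterNamespace(
    ns, NamespaceChange.setProperty(SupportsNamespaces.PROP_COMMENT, comment))
}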

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 591e1c631be13..a69b23bf84d0c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -24,8 +24,6 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.types.StructType; -import java.util.Arrays; -import java.util.List; import java.util.Map; /** @@ -41,32 +39,26 @@ public interface TableCatalog extends CatalogPlugin { /** - * A property to specify the location of the table. The files of the table + * A reserved property to specify the location of the table. The files of the table * should be under this location. */ String PROP_LOCATION = "location"; /** - * A property to specify the description of the table. + * A reserved property to specify the description of the table. */ String PROP_COMMENT = "comment"; /** - * A property to specify the provider of the table. + * A reserved property to specify the provider of the table. */ String PROP_PROVIDER = "provider"; /** - * A property to specify the owner of the table. + * A reserved property to specify the owner of the table. */ String PROP_OWNER = "owner"; - /** - * The list of reserved table properties. - */ - List<String> RESERVED_PROPERTIES = - Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER); - /** * List the tables in a namespace from the catalog. *
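With RESERVED_PROPERTIES gone from both public interfaces, only the PROP_* keys remain API; the lists reappear as private[sql] members of CatalogV2Util in the next hunk. A sketch of the internal guard pattern that consumes them; this compiles only inside Spark's own sql packages, since CatalogV2Util is private[sql], and the function name is ours:

import org.apache.spark.sql.connector.catalog.{CatalogV2Util, NamespaceChange}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty

// Mirrors the V2SessionCatalog.alterNamespace check updated below: removing
// a reserved namespace property is refused outright.
def checkNamespaceChanges(changes: Seq[NamespaceChange]): Unit = changes.foreach {
  case remove: RemoveProperty
      if CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
    throw new UnsupportedOperationException(
      s"Cannot remove reserved property: ${remove.property}")
  case _ => // non-reserved changes fall through to the catalog's own handling
}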

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index a4c7b4c3a2894..7ea1dff54978e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -32,6 +32,35 @@ import org.apache.spark.util.Utils private[sql] object CatalogV2Util { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + /** + * The list of reserved table properties, which cannot be removed or changed directly by + * the syntax: + * {{ + * ALTER TABLE ... SET TBLPROPERTIES ... + * }} + * + * They need specific syntax to be modified. + */ + val TABLE_RESERVED_PROPERTIES = + Seq(TableCatalog.PROP_COMMENT, + TableCatalog.PROP_LOCATION, + TableCatalog.PROP_PROVIDER, + TableCatalog.PROP_OWNER) + + /** + * The list of reserved namespace properties, which cannot be removed or changed directly by + * the syntax: + * {{ + * ALTER NAMESPACE ... SET PROPERTIES ... + * }} + * + * They need specific syntax to be modified. + */ + val NAMESPACE_RESERVED_PROPERTIES = + Seq(SupportsNamespaces.PROP_COMMENT, + SupportsNamespaces.PROP_LOCATION, + SupportsNamespaces.PROP_OWNER) + /** * Apply properties changes to a map and return the result. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 0aaf9d7e2e1ac..91f99a0c0c2fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} @@ -248,7 +248,7 @@ class ResolveSessionCatalog( val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT) val location = c.properties.get(SupportsNamespaces.PROP_LOCATION) - val newProperties = c.properties -- SupportsNamespaces.RESERVED_PROPERTIES.asScala + val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties) case d @ DropNamespace(SessionCatalogAndNamespace(_, ns), _, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 9682778d40383..9c0053c6ae0ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -21,7 +21,6 @@ import java.util.Locale import
java.util.concurrent.TimeUnit._ import scala.collection.{GenMap, GenSeq} -import scala.collection.JavaConverters._ import scala.collection.parallel.ForkJoinTaskSupport import scala.collection.parallel.immutable.ParVector import scala.util.control.NonFatal @@ -38,8 +37,8 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter @@ -183,7 +182,7 @@ case class DescribeDatabaseCommand( Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil if (extended) { - val properties = allDbProperties -- RESERVED_PROPERTIES.asScala + val properties = allDbProperties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES val propertiesStr = if (properties.isEmpty) { "" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala index 9a0481ef01b63..64b98fb83b8fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} -import org.apache.spark.sql.connector.catalog.SupportsNamespaces +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces} import org.apache.spark.sql.types.StructType /** @@ -35,7 +35,6 @@ case class DescribeNamespaceExec( namespace: Seq[String], isExtended: Boolean) extends V2CommandExec { private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind() - import SupportsNamespaces._ override protected def run(): Seq[InternalRow] = { val rows = new ArrayBuffer[InternalRow]() @@ -44,12 +43,12 @@ case class DescribeNamespaceExec( rows += toCatalystRow("Namespace Name", ns.last) - SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p => rows ++= Option(metadata.get(p)).map(toCatalystRow(p.capitalize, _)) } if (isExtended) { - val properties = metadata.asScala -- RESERVED_PROPERTIES.asScala + val properties = metadata.asScala -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES if (properties.nonEmpty) { rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala index 2815b0ac131f0..9c280206c548e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} -import org.apache.spark.sql.connector.catalog.{Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog} import org.apache.spark.sql.types.StructType case class DescribeTableExec( @@ -49,14 +49,14 @@ case class DescribeTableExec( rows += toCatalystRow("# Detailed Table Information", "", "") rows += toCatalystRow("Name", table.name(), "") - TableCatalog.RESERVED_PROPERTIES.asScala.toList.foreach(propKey => { + CatalogV2Util.TABLE_RESERVED_PROPERTIES.foreach(propKey => { if (table.properties.containsKey(propKey)) { rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "") } }) val properties = table.properties.asScala.toList - .filter(kv => !TableCatalog.RESERVED_PROPERTIES.contains(kv._1)) + .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1)) .sortBy(_._1).map { case (key, value) => key + "=" + value }.mkString("[", ",", "]") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 8eea1cf9c06e4..cef9b5f675889 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -232,7 +232,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) // validate that this catalog's reserved properties are not removed changes.foreach { case remove: RemoveProperty - if SupportsNamespaces.RESERVED_PROPERTIES.contains(remove.property) => + if CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) => throw new UnsupportedOperationException( s"Cannot remove reserved property: ${remove.property}") case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 75c9bb7be05f4..3e496e177b121 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -901,7 +901,7 @@ class DataSourceV2SQLSuite test("CreateNameSpace: reserved properties") { import SupportsNamespaces._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => val exception = intercept[ParseException] { sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='dummyVal')") } @@ -909,7 +909,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='foo')") assert(sql("DESC NAMESPACE EXTENDED testcat.reservedTest") @@ -928,7 +928,7 @@ class 
DataSourceV2SQLSuite test("create/replace/alter table - reserved properties") { import TableCatalog._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => Seq("CREATE", "REPLACE").foreach { action => val e = intercept[ParseException] { @@ -950,7 +950,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => withTable("testcat.reservedTest") { Seq("CREATE", "REPLACE").foreach { action => @@ -1108,7 +1108,7 @@ class DataSourceV2SQLSuite test("AlterNamespaceSetProperties: reserved properties") { import SupportsNamespaces._ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql("CREATE NAMESPACE testcat.reservedTest") val exception = intercept[ParseException] { @@ -1119,7 +1119,7 @@ class DataSourceV2SQLSuite } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => withNamespace("testcat.reservedTest") { sql(s"CREATE NAMESPACE testcat.reservedTest") sql(s"ALTER NAMESPACE testcat.reservedTest SET PROPERTIES ('$key'='foo')") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 9466ed92819c9..c399a011f9073 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableChange} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -742,7 +742,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { actual: scala.collection.Map[String, String]): Unit = { // remove location and comment that are automatically added by HMS unless they are expected val toRemove = - SupportsNamespaces.RESERVED_PROPERTIES.asScala.filter(expected.contains) + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains) assert(expected -- toRemove === actual) } @@ -1000,7 +1000,7 @@ class 
V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createNamespace(testNs, emptyProps) - SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p => val exc = intercept[UnsupportedOperationException] { catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p)) } From c9943ca93192e65de35153d330c72c42a5dc6f49 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 23 Jan 2020 22:51:11 +0800 Subject: [PATCH 6/7] nit --- .../apache/spark/sql/connector/catalog/SupportsNamespaces.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 6e1b1f88f2a8f..7c374ffe3577d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -21,8 +21,6 @@ import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import java.util.Arrays; -import java.util.List; import java.util.Map; /** From ac1ef4b570688537a2c6d0f70e824029844da8fb Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 23 Jan 2020 22:54:47 +0800 Subject: [PATCH 7/7] nit --- .../spark/sql/connector/catalog/SupportsNamespaces.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 7c374ffe3577d..190f1a14d7129 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -40,19 +40,19 @@ public interface SupportsNamespaces extends CatalogPlugin { /** - * A property to specify the location of the namespace. If the namespace + * A reserved property to specify the location of the namespace. If the namespace * needs to store files, it should be under this location. */ String PROP_LOCATION = "location"; /** - * A property to specify the description of the namespace. The description + * A reserved property to specify the description of the namespace. The description * will be returned in the result of "DESCRIBE NAMESPACE" command. */ String PROP_COMMENT = "comment"; /** - * A property to specify the owner of the namespace. + * A reserved property to specify the owner of the namespace. */ String PROP_OWNER = "owner";
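End state of the series: the PROP_* keys stay public and are documented as reserved, while the reserved-property lists live in the internal CatalogV2Util. A closing sketch of what remains possible against the public API alone; the function is hypothetical:

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.SupportsNamespaces

// Reserved keys are still readable: DESCRIBE NAMESPACE surfaces them, and a
// connector can look them up in the namespace metadata map directly.
def namespaceOwner(catalog: SupportsNamespaces, ns: Array[String]): Option[String] =
  catalog.loadNamespaceMetadata(ns).asScala.get(SupportsNamespaces.PROP_OWNER)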