[SPARK-30603][SQL] Move RESERVED_PROPERTIES from SupportsNamespaces and TableCatalog to CatalogV2Util #27318

Closed
wants to merge 9 commits
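The gist of the change, sketched below for orientation (a hypothetical snippet, not part of the diff): the reserved-property lists move off the public SupportsNamespaces and TableCatalog interfaces and become Scala Seqs on the internal CatalogV2Util, so Spark-internal call sites can drop the JavaConverters round-trip. CatalogV2Util is private[sql], so the sketch assumes code living under org.apache.spark.sql.

import org.apache.spark.sql.connector.catalog.CatalogV2Util

object ReservedPropsSketch {
  // Before this PR: props -- TableCatalog.RESERVED_PROPERTIES.asScala
  //   (a java.util.List, so call sites also imported scala.collection.JavaConverters._)
  // After this PR: the list is a plain Scala Seq on the internal utility object.
  def stripTableReserved(props: Map[String, String]): Map[String, String] =
    props -- CatalogV2Util.TABLE_RESERVED_PROPERTIES
}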
@@ -21,8 +21,6 @@
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
@@ -42,33 +40,22 @@
public interface SupportsNamespaces extends CatalogPlugin {

/**
* A property to specify the location of the namespace. If the namespace
* A reserved property to specify the location of the namespace. If the namespace
* needs to store files, it should be under this location.
*/
String PROP_LOCATION = "location";
Contributor
These property names should remain in the public class, as we need the implementations to read and know them.

The only thing internal is which properties are reserved. We should move the RESERVED_PROPERTIES to CatalogV2Util.
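To illustrate the point (a hypothetical third-party catalog helper, not code from this PR): implementations keep reading the public PROP_* constants, while only Spark internals need the reserved list once it lives in CatalogV2Util.

import org.apache.spark.sql.connector.catalog.TableCatalog

object ExternalCatalogHelper {
  // PROP_LOCATION stays on the public TableCatalog interface, so an external
  // implementation like this keeps compiling after RESERVED_PROPERTIES moves out.
  def locationOf(properties: java.util.Map[String, String]): Option[String] =
    Option(properties.get(TableCatalog.PROP_LOCATION))
}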

Member Author

ah I see


/**
* A property to specify the description of the namespace. The description
* A reserved property to specify the description of the namespace. The description
* will be returned in the result of "DESCRIBE NAMESPACE" command.
*/
String PROP_COMMENT = "comment";

/**
* A property to specify the owner of the namespace.
* A reserved property to specify the owner of the namespace.
*/
String PROP_OWNER = "owner";

/**
* The list of reserved namespace properties, which can not be removed or changed directly by
* the syntax:
* {{
* ALTER NAMESPACE ... SET PROPERTIES ...
* }}
*
* They need specific syntax to modify
*/
List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_OWNER);

/**
* Return a default namespace for the catalog.
* <p>
@@ -24,8 +24,6 @@
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
@@ -41,32 +39,26 @@
public interface TableCatalog extends CatalogPlugin {

/**
* A property to specify the location of the table. The files of the table
* A reserved property to specify the location of the table. The files of the table
* should be under this location.
*/
String PROP_LOCATION = "location";

/**
* A property to specify the description of the table.
* A reserved property to specify the description of the table.
*/
String PROP_COMMENT = "comment";

/**
* A property to specify the provider of the table.
* A reserved property to specify the provider of the table.
*/
String PROP_PROVIDER = "provider";

/**
* A property to specify the owner of the table.
* A reserved property to specify the owner of the table.
*/
String PROP_OWNER = "owner";

/**
* The list of reserved table properties.
*/
List<String> RESERVED_PROPERTIES =
Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER);

/**
* List the tables in a namespace from the catalog.
* <p>
@@ -33,6 +33,35 @@ import org.apache.spark.util.Utils
private[sql] object CatalogV2Util {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

/**
* The list of reserved table properties, which can not be removed or changed directly by
* the syntax:
* {{
* ALTER TABLE ... SET TBLPROPERTIES ...
* }}
*
* They need specific syntax to modify
*/
val TABLE_RESERVED_PROPERTIES =
Seq(TableCatalog.PROP_COMMENT,
TableCatalog.PROP_LOCATION,
TableCatalog.PROP_PROVIDER,
TableCatalog.PROP_OWNER)

/**
* The list of reserved namespace properties, which can not be removed or changed directly by
* the syntax:
* {{
* ALTER NAMESPACE ... SET PROPERTIES ...
* }}
*
* They need specific syntax to modify
*/
val NAMESPACE_RESERVED_PROPERTIES =
Seq(SupportsNamespaces.PROP_COMMENT,
SupportsNamespaces.PROP_LOCATION,
SupportsNamespaces.PROP_OWNER)

/**
* Apply properties changes to a map and return the result.
*/
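For reference, a minimal sketch (the object name and props value are made up) of how the internal call sites below consume the relocated namespace list; because CatalogV2Util is private[sql], this only type-checks inside Spark's org.apache.spark.sql packages.

import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces}

object NamespacePropsSketch {
  // Split the reserved comment out of user-supplied properties, mirroring what
  // ResolveSessionCatalog and DescribeDatabaseCommand do in the hunks below.
  def split(props: Map[String, String]): (Option[String], Map[String, String]) = {
    val comment = props.get(SupportsNamespaces.PROP_COMMENT)
    val rest = props -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
    (comment, rest)
  }
}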
@@ -17,14 +17,12 @@

package org.apache.spark.sql.catalyst.analysis

import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@@ -326,7 +324,7 @@ class ResolveSessionCatalog(

val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
val newProperties = c.properties -- SupportsNamespaces.RESERVED_PROPERTIES.asScala
val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties)

case d @ DropNamespace(SessionCatalogAndNamespace(_, ns), _, _) =>
@@ -21,7 +21,6 @@ import java.util.Locale
import java.util.concurrent.TimeUnit._

import scala.collection.{GenMap, GenSeq}
import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.util.control.NonFatal
@@ -38,8 +37,8 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
@@ -183,7 +182,7 @@ case class DescribeDatabaseCommand(
Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil

if (extended) {
val properties = allDbProperties -- RESERVED_PROPERTIES.asScala
val properties = allDbProperties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
val propertiesStr =
if (properties.isEmpty) {
""
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces}
import org.apache.spark.sql.types.StructType

/**
@@ -35,7 +35,6 @@ case class DescribeNamespaceExec(
namespace: Seq[String],
isExtended: Boolean) extends V2CommandExec {
private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()
import SupportsNamespaces._

override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()
@@ -44,12 +43,12 @@

rows += toCatalystRow("Namespace Name", ns.last)

SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p =>
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p =>
rows ++= Option(metadata.get(p)).map(toCatalystRow(p.capitalize, _))
}

if (isExtended) {
val properties = metadata.asScala -- RESERVED_PROPERTIES.asScala
val properties = metadata.asScala -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
if (properties.nonEmpty) {
rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")"))
}
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.{Table, TableCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
import org.apache.spark.sql.types.StructType

case class DescribeTableExec(
@@ -49,14 +49,14 @@ case class DescribeTableExec(
rows += toCatalystRow("# Detailed Table Information", "", "")
rows += toCatalystRow("Name", table.name(), "")

TableCatalog.RESERVED_PROPERTIES.asScala.toList.foreach(propKey => {
CatalogV2Util.TABLE_RESERVED_PROPERTIES.foreach(propKey => {
if (table.properties.containsKey(propKey)) {
rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "")
}
})
val properties =
table.properties.asScala.toList
.filter(kv => !TableCatalog.RESERVED_PROPERTIES.contains(kv._1))
.filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
.sortBy(_._1).map {
case (key, value) => key + "=" + value
}.mkString("[", ",", "]")
@@ -232,7 +232,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
// validate that this catalog's reserved properties are not removed
changes.foreach {
case remove: RemoveProperty
if SupportsNamespaces.RESERVED_PROPERTIES.contains(remove.property) =>
if CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
throw new UnsupportedOperationException(
s"Cannot remove reserved property: ${remove.property}")
case _ =>
@@ -901,15 +901,15 @@ class DataSourceV2SQLSuite
test("CreateNameSpace: reserved properties") {
import SupportsNamespaces._
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key =>
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
val exception = intercept[ParseException] {
sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='dummyVal')")
}
assert(exception.getMessage.contains(s"$key is a reserved namespace property"))
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key =>
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
withNamespace("testcat.reservedTest") {
sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='foo')")
assert(sql("DESC NAMESPACE EXTENDED testcat.reservedTest")
@@ -928,7 +928,7 @@
test("create/replace/alter table - reserved properties") {
import TableCatalog._
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key =>
CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
Seq("CREATE", "REPLACE").foreach { action =>
val e = intercept[ParseException] {
@@ -950,7 +950,7 @@
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key =>
CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
withTable("testcat.reservedTest") {
Seq("CREATE", "REPLACE").foreach { action =>
@@ -1108,7 +1108,7 @@
test("AlterNamespaceSetProperties: reserved properties") {
import SupportsNamespaces._
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key =>
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
withNamespace("testcat.reservedTest") {
sql("CREATE NAMESPACE testcat.reservedTest")
val exception = intercept[ParseException] {
@@ -1119,7 +1119,7 @@
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
RESERVED_PROPERTIES.asScala.filterNot(_ == PROP_COMMENT).foreach { key =>
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
withNamespace("testcat.reservedTest") {
sql(s"CREATE NAMESPACE testcat.reservedTest")
sql(s"ALTER NAMESPACE testcat.reservedTest SET PROPERTIES ('$key'='foo')")
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableChange}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -742,7 +742,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
actual: scala.collection.Map[String, String]): Unit = {
// remove location and comment that are automatically added by HMS unless they are expected
val toRemove =
SupportsNamespaces.RESERVED_PROPERTIES.asScala.filter(expected.contains)
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains)
assert(expected -- toRemove === actual)
}

@@ -1000,7 +1000,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

catalog.createNamespace(testNs, emptyProps)

SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p =>
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p =>
val exc = intercept[UnsupportedOperationException] {
catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
}