diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 49fba6b7f35df..2339a075124b1 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -199,7 +199,8 @@ statement | SHOW CREATE TABLE multipartIdentifier #showCreateTable | SHOW CURRENT NAMESPACE #showCurrentNamespace | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction - | (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase + | (DESC | DESCRIBE) (database | NAMESPACE) EXTENDED? + multipartIdentifier #describeNamespace | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)? multipartIdentifier partitionSpec? describeColName? #describeTable | (DESC | DESCRIBE) QUERY? query #describeQuery diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index f1a8e5bfda4a9..22caf33b50ce6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -172,6 +172,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) => DropNamespace(catalog, nameParts, ifExists, cascade) + case DescribeNamespaceStatement(NonSessionCatalog(catalog, nameParts), extended) => + DescribeNamespace(catalog, nameParts, extended) + case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) => ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c623b5c4d36a5..ae037681fb641 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2541,6 +2541,25 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       Option(ctx.pattern).map(string))
   }
 
+  /**
+   * Build the parsed statement for a DESCRIBE NAMESPACE command.
+   *
+   * For example:
+   * {{{
+   *   DESCRIBE (DATABASE|SCHEMA|NAMESPACE) [EXTENDED] namespace;
+   * }}}
+   *
+   * @param ctx parse-tree context of the matched DESCRIBE NAMESPACE alternative
+   * @return a [[DescribeNamespaceStatement]] holding the name parts and the EXTENDED flag
+   */
+  override def visitDescribeNamespace(ctx: DescribeNamespaceContext): LogicalPlan =
+    withOrigin(ctx) {
+      val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier())
+      // The EXTENDED accessor is null when the keyword was not written.
+      val isExtended = Option(ctx.EXTENDED).isDefined
+      DescribeNamespaceStatement(nameParts, isExtended)
+    }
+
   /**
    * Create a table, returning a [[CreateTableStatement]] logical plan.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index ec373d95fad88..a46d64dadd827 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -269,6 +269,16 @@ case class DescribeTableStatement(
     partitionSpec: TablePartitionSpec,
     isExtended: Boolean) extends ParsedStatement
 
+/**
+ * A DESCRIBE NAMESPACE statement, as parsed from SQL.
+ *
+ * @param namespace name parts of the namespace identifier
+ * @param extended whether the EXTENDED keyword was present
+ */
+case class DescribeNamespaceStatement(
+    namespace: Seq[String],
+    extended: Boolean) extends ParsedStatement
+
 /**
  * A DESCRIBE TABLE tbl_name col_name statement, as parsed from SQL.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 7d8e9a0c18f65..3179bab8c2f7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.types.{DataType, StringType, StructType}
+import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}
 
 /**
  * Base trait for DataSourceV2 write commands
@@ -255,6 +255,31 @@ case class DropNamespace(
     ifExists: Boolean,
     cascade: Boolean) extends Command
 
+/**
+ * The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
+ *
+ * @param catalog catalog plugin the namespace belongs to
+ * @param namespace name parts of the namespace to describe
+ * @param extended whether the EXTENDED keyword was given
+ */
+case class DescribeNamespace(
+    catalog: CatalogPlugin,
+    namespace: Seq[String],
+    extended: Boolean) extends Command {
+
+  /** Two string columns, a non-nullable `name` and a nullable `value`, each with a comment. */
+  override def output: Seq[Attribute] = {
+    // Builds the column metadata carrying the given text under the "comment" key.
+    def commentOf(text: String) = new MetadataBuilder().putString("comment", text).build()
+
+    val nameColumn = AttributeReference(
+      "name", StringType, nullable = false, commentOf("name of the column"))()
+    val valueColumn = AttributeReference(
+      "value", StringType, nullable = true, commentOf("value of the column"))()
+    Seq(nameColumn, valueColumn)
+  }
+}
+
 /**
  * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 94171feba2ac7..ddc77ac62ffed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -653,6 +653,23 @@ class DDLParserSuite extends AnalysisTest {
       "DESC TABLE COLUMN for a specific partition is not supported"))
   }
 
+  test("describe database") {
+    // The grammar accepts DESC or DESCRIBE followed by DATABASE or NAMESPACE; every spelling
+    // must produce the same DescribeNamespaceStatement.
+    val prefixes = for {
+      verb <- Seq("DESCRIBE", "DESC")
+      keyword <- Seq("DATABASE", "NAMESPACE")
+    } yield s"$verb $keyword"
+    prefixes.foreach { prefix =>
+      comparePlans(
+        parsePlan(s"$prefix EXTENDED a.b"),
+        DescribeNamespaceStatement(Seq("a", "b"), extended = true))
+      comparePlans(
+        parsePlan(s"$prefix a.b"),
+        DescribeNamespaceStatement(Seq("a", "b"), extended = false))
+    }
+  }
+
   test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
     comparePlans(parsePlan("describe t"),
       DescribeTableStatement(Seq("t"), Map.empty, isExtended = false))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 340e09ae66adb..0be894ea959b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -158,6 +158,14 @@ class ResolveSessionCatalog(
     case AlterViewUnsetPropertiesStatement(SessionCatalog(catalog, tableName), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(tableName.asTableIdentifier, keys, ifExists, isView = true)
 
+    // DESCRIBE NAMESPACE on the session catalog: only a single-part database name is accepted.
+    case DescribeNamespaceStatement(SessionCatalog(_, nameParts), extended) =>
+      if (nameParts.length != 1) {
+        throw new AnalysisException(
+          s"The database name is not valid: ${nameParts.quoted}")
+      }
+      DescribeDatabaseCommand(nameParts.head, extended)
+
     case DescribeTableStatement(
          nameParts @
SessionCatalog(catalog, tableName), partitionSpec, isExtended) => loadTable(catalog, tableName.asIdentifier).collect { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index b1271ad870565..d5368e3bd72eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -275,18 +275,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { ) } - /** - * Create a [[DescribeDatabaseCommand]] command. - * - * For example: - * {{{ - * DESCRIBE DATABASE [EXTENDED] database; - * }}} - */ - override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) { - DescribeDatabaseCommand(ctx.db.getText, ctx.EXTENDED != null) - } - /** * Create a plan for a DESCRIBE FUNCTION command. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 01ff4a9303e98..a04bceb18b9b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, 
SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
 import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -192,6 +192,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
       Nil
     }
 
+    case desc @ DescribeNamespace(catalog, namespace, extended) =>
+      DescribeNamespaceExec(desc.output, catalog, namespace, extended) :: Nil
+
     case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) =>
       DescribeTableExec(desc.output, table, isExtended) :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
new file mode 100644
index 0000000000000..5c20e5ae08383
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical plan node for describing a namespace.
+ *
+ * Emits one (name, value) row each for the namespace name, description and location; when
+ * `isExtended` is set and non-reserved properties exist, a "Properties" row is appended.
+ *
+ * @param output attributes giving the schema of the produced rows
+ * @param catalog catalog plugin holding the namespace
+ * @param namespace name parts of the namespace to describe
+ * @param isExtended whether the "Properties" row may be appended
+ */
+case class DescribeNamespaceExec(
+    output: Seq[Attribute],
+    catalog: CatalogPlugin,
+    namespace: Seq[String],
+    isExtended: Boolean) extends V2CommandExec {
+
+  // Converts external rows of the output schema into internal rows.
+  private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()
+
+  /** Loads the namespace metadata from the catalog and renders it as (name, value) rows. */
+  override protected def run(): Seq[InternalRow] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    val rows = new ArrayBuffer[InternalRow]()
+    val nsCatalog = catalog.asNamespaceCatalog
+    val ns = namespace.toArray
+    val metadata = nsCatalog.loadNamespaceMetadata(ns)
+
+    // `namespace` may hold no name parts (presumably when only a catalog name was given --
+    // confirm); `last` would then throw NoSuchElementException, so report an empty name.
+    rows += toCatalystRow("Namespace Name", ns.lastOption.getOrElse(""))
+    rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))
+    rows += toCatalystRow("Location", metadata.get(LOCATION_TABLE_PROP))
+    if (isExtended) {
+      // Keys listed in RESERVED_PROPERTIES are left out of the "Properties" row.
+      val properties = metadata.asScala.toSeq.filter(p => !RESERVED_PROPERTIES.contains(p._1))
+      if (properties.nonEmpty) {
+        rows += toCatalystRow("Properties", properties.mkString("(", ",", ")"))
+      }
+    }
+    rows
+  }
+
+  /** Encodes the given strings as one row of the output schema and returns a copy of it. */
+  private def toCatalystRow(strs: String*): InternalRow = {
+    encoder.toRow(new GenericRowWithSchema(strs.toArray, schema)).copy()
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 17f6e51f8454c..235933839505b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -790,7 +790,6 @@ class DataSourceV2SQLSuite
         assert(catalogPath.equals(catalogPath))
       }
     }
-    // TODO: Add tests for validating namespace metadata when DESCRIBE NAMESPACE is available.
}
 
   test("CreateNameSpace: test handling of 'IF NOT EXIST'") {
@@ -871,6 +870,27 @@ class DataSourceV2SQLSuite
     assert(exception.getMessage.contains("Namespace 'ns1' not found"))
   }
 
+  test("DescribeNamespace using v2 catalog") {
+    withNamespace("testcat.ns1.ns2") {
+      // Create the namespace with a comment and a location so both rows have known values.
+      sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
+        "'test namespace' LOCATION '/tmp/ns_test'")
+      val described = sql("DESCRIBE NAMESPACE testcat.ns1.ns2")
+
+      // The result must expose exactly two string columns: name and value.
+      val expectedColumns = Seq(("name", StringType), ("value", StringType))
+      val actualColumns = described.schema.map(f => (f.name, f.dataType))
+      assert(actualColumns === expectedColumns)
+
+      // Without EXTENDED only the name, description and location rows are expected.
+      val expectedRows = Seq(
+        Row("Namespace Name", "ns2"),
+        Row("Description", "test namespace"),
+        Row("Location", "/tmp/ns_test"))
+      assert(described.collect() === expectedRows)
+    }
+  }
+
   test("ShowNamespaces: show root namespaces with default v2 catalog") {
     spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index d98f2ca62972c..b2185f8559f36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -108,25 +108,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     comparePlans(parsed1, expected1)
   }
 
-  test("describe database") {
-    // DESCRIBE DATABASE [EXTENDED] db_name;
-    val sql1 = "DESCRIBE DATABASE EXTENDED db_name"
-    val sql2 = "DESCRIBE DATABASE db_name"
-
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-
-    val expected1 = DescribeDatabaseCommand(
-      "db_name",
-      extended = true)
-    val expected2 = DescribeDatabaseCommand(
-      "db_name",
-      extended = false)
-
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-  }
-
   test("create function") {
     val sql1 =
     """