Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ statement
| SHOW CREATE TABLE multipartIdentifier #showCreateTable
| SHOW CURRENT NAMESPACE #showCurrentNamespace
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
| (DESC | DESCRIBE) (database | NAMESPACE) EXTENDED?
multipartIdentifier #describeNamespace
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
multipartIdentifier partitionSpec? describeColName? #describeTable
| (DESC | DESCRIBE) QUERY? query #describeQuery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) =>
DropNamespace(catalog, nameParts, ifExists, cascade)

case DescribeNamespaceStatement(NonSessionCatalog(catalog, nameParts), extended) =>
DescribeNamespace(catalog, nameParts, extended)

case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2541,6 +2541,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
Option(ctx.pattern).map(string))
}

/**
 * Build a [[DescribeNamespaceStatement]] from a parsed DESCRIBE namespace command.
 *
 * Handles SQL of the form:
 * {{{
 *   DESCRIBE (DATABASE|SCHEMA|NAMESPACE) [EXTENDED] namespace;
 * }}}
 */
override def visitDescribeNamespace(ctx: DescribeNamespaceContext): LogicalPlan =
  withOrigin(ctx) {
    // The namespace may be multi-part (e.g. catalog.db); EXTENDED is optional.
    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier())
    val isExtended = ctx.EXTENDED != null
    DescribeNamespaceStatement(nameParts, isExtended)
  }

/**
* Create a table, returning a [[CreateTableStatement]] logical plan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ case class DescribeTableStatement(
partitionSpec: TablePartitionSpec,
isExtended: Boolean) extends ParsedStatement

/**
 * A DESCRIBE NAMESPACE statement, as parsed from SQL.
 *
 * Resolved by the catalog-resolution rules into either a v2 [[DescribeNamespace]]
 * plan or a session-catalog DescribeDatabaseCommand.
 *
 * @param namespace multi-part namespace identifier (e.g. Seq("catalog", "db"))
 * @param extended whether the EXTENDED keyword was present
 */
case class DescribeNamespaceStatement(
    namespace: Seq[String],
    extended: Boolean) extends ParsedStatement

/**
* A DESCRIBE TABLE tbl_name col_name statement, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}

/**
* Base trait for DataSourceV2 write commands
Expand Down Expand Up @@ -255,6 +255,21 @@ case class DropNamespace(
ifExists: Boolean,
cascade: Boolean) extends Command

/**
 * The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
 *
 * @param catalog the catalog plugin that owns the namespace
 * @param namespace multi-part namespace name to describe
 * @param extended whether EXTENDED was specified (adds a Properties row at execution)
 */
case class DescribeNamespace(
    catalog: CatalogPlugin,
    namespace: Seq[String],
    extended: Boolean) extends Command {

  // Fixed two-column (name, value) string result schema. "value" is nullable
  // because the underlying metadata lookups may produce no value.
  override def output: Seq[Attribute] = Seq(
    AttributeReference("name", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "name of the column").build())(),
    AttributeReference("value", StringType, nullable = true,
      new MetadataBuilder().putString("comment", "value of the column").build())())
}

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,13 @@ class DDLParserSuite extends AnalysisTest {
"DESC TABLE COLUMN for a specific partition is not supported"))
}

// Parser round-trip: DESCRIBE DATABASE (with and without EXTENDED) must map
// onto DescribeNamespaceStatement with the multi-part name preserved.
test("describe database") {
  comparePlans(
    parsePlan("DESCRIBE DATABASE EXTENDED a.b"),
    DescribeNamespaceStatement(Seq("a", "b"), extended = true))
  comparePlans(
    parsePlan("DESCRIBE DATABASE a.b"),
    DescribeNamespaceStatement(Seq("a", "b"), extended = false))
}

test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
comparePlans(parsePlan("describe t"),
DescribeTableStatement(Seq("t"), Map.empty, isExtended = false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ class ResolveSessionCatalog(
case AlterViewUnsetPropertiesStatement(SessionCatalog(catalog, tableName), keys, ifExists) =>
AlterTableUnsetPropertiesCommand(tableName.asTableIdentifier, keys, ifExists, isView = true)

case d @ DescribeNamespaceStatement(SessionCatalog(_, nameParts), _) =>
if (nameParts.length != 1) {
throw new AnalysisException(
s"The database name is not valid: ${nameParts.quoted}")
}
DescribeDatabaseCommand(nameParts.head, d.extended)

case DescribeTableStatement(
nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
loadTable(catalog, tableName.asIdentifier).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
)
}

/**
* Create a [[DescribeDatabaseCommand]] command.
*
* For example:
* {{{
* DESCRIBE DATABASE [EXTENDED] database;
* }}}
*/
override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
DescribeDatabaseCommand(ctx.db.getText, ctx.EXTENDED != null)
}

/**
* Create a plan for a DESCRIBE FUNCTION command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
Expand Down Expand Up @@ -192,6 +192,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
Nil
}

case desc @ DescribeNamespace(catalog, namespace, extended) =>
DescribeNamespaceExec(desc.output, catalog, namespace, extended) :: Nil

case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) =>
DescribeTableExec(desc.output, table, isExtended) :: Nil

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
import org.apache.spark.sql.types.StructType

/**
 * Physical plan node for describing a namespace (DESCRIBE NAMESPACE).
 *
 * Produces (name, value) string rows: the namespace name, its description and
 * location, plus — when `isExtended` — the non-reserved namespace properties.
 */
case class DescribeNamespaceExec(
    output: Seq[Attribute],
    catalog: CatalogPlugin,
    namespace: Seq[String],
    isExtended: Boolean) extends V2CommandExec {

  // Converts external (String) rows into Catalyst's internal row format.
  private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()

  override protected def run(): Seq[InternalRow] = {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

    val rows = new ArrayBuffer[InternalRow]()
    val nsCatalog = catalog.asNamespaceCatalog
    val ns = namespace.toArray
    // NOTE(review): java.util.Map.get returns null for an absent property; the
    // nullable "value" column surfaces that as NULL — confirm this is intended.
    val metadata = nsCatalog.loadNamespaceMetadata(ns)

    rows += toCatalystRow("Namespace Name", ns.last)
    rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))
    rows += toCatalystRow("Location", metadata.get(LOCATION_TABLE_PROP))
    if (isExtended) {
      // Sort by key for deterministic output (the returned java.util.Map has no
      // guaranteed iteration order) and render entries as key=value instead of
      // leaking Tuple2.toString ("(key,value)") into user-facing output.
      val properties = metadata.asScala.toSeq
        .filterNot { case (key, _) => RESERVED_PROPERTIES.contains(key) }
        .sortBy(_._1)
      if (properties.nonEmpty) {
        val propsRepr = properties.map { case (k, v) => s"$k=$v" }.mkString("(", ",", ")")
        rows += toCatalystRow("Properties", propsRepr)
      }
    }
    rows
  }

  // Wraps plain strings into an InternalRow matching `output`'s schema.
  private def toCatalystRow(strs: String*): InternalRow = {
    encoder.toRow(new GenericRowWithSchema(strs.toArray, schema)).copy()
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,6 @@ class DataSourceV2SQLSuite
assert(catalogPath.equals(catalogPath))
}
}
// TODO: Add tests for validating namespace metadata when DESCRIBE NAMESPACE is available.
}

test("CreateNameSpace: test handling of 'IF NOT EXIST'") {
Expand Down Expand Up @@ -871,6 +870,25 @@ class DataSourceV2SQLSuite
assert(exception.getMessage.contains("Namespace 'ns1' not found"))
}

// End-to-end check: DESCRIBE NAMESPACE against a v2 catalog produces the fixed
// (name, value) string schema and the namespace's name, comment and location.
test("DescribeNamespace using v2 catalog") {
  withNamespace("testcat.ns1.ns2") {
    sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
      "'test namespace' LOCATION '/tmp/ns_test'")
    val descriptionDf = sql("DESCRIBE NAMESPACE testcat.ns1.ns2")
    // Schema comes from DescribeNamespace.output: two StringType columns.
    assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
      Seq(
        ("name", StringType),
        ("value", StringType)
      ))
    val description = descriptionDf.collect()
    // Without EXTENDED, only the three fixed rows are emitted (no Properties).
    assert(description === Seq(
      Row("Namespace Name", "ns2"),
      Row("Description", "test namespace"),
      Row("Location", "/tmp/ns_test")
    ))
  }
}

test("ShowNamespaces: show root namespaces with default v2 catalog") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,25 +108,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
comparePlans(parsed1, expected1)
}

test("describe database") {
// DESCRIBE DATABASE [EXTENDED] db_name;
val sql1 = "DESCRIBE DATABASE EXTENDED db_name"
val sql2 = "DESCRIBE DATABASE db_name"

val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)

val expected1 = DescribeDatabaseCommand(
"db_name",
extended = true)
val expected2 = DescribeDatabaseCommand(
"db_name",
extended = false)

comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}

test("create function") {
val sql1 =
"""
Expand Down