[SPARK-14127][SQL] Makes 'DESC [EXTENDED|FORMATTED] <table>' support data source tables

## What changes were proposed in this pull request?

This is a follow-up to PR #12844. It makes the newly updated `DescribeTableCommand` support data source tables.

## How was this patch tested?

A test case is added to check `DESC [EXTENDED | FORMATTED] <table>` output.

Author: Cheng Lian <lian@databricks.com>

Closes #12934 from liancheng/spark-14127-desc-table-follow-up.
liancheng authored and yhuai committed May 9, 2016
1 parent b1e01fd commit 671b382
Showing 2 changed files with 47 additions and 30 deletions.
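To make the change concrete, here is a minimal, hypothetical spark-shell sketch of the behavior this patch enables (not part of the patch itself; the table name `users` is made up, and the exact rows returned depend on the table's metadata):

```scala
// Illustrative sketch only, assuming a Spark 2.0-era shell with `sqlContext` in scope.
// Create a JSON-backed data source table, then describe it.
sqlContext.range(1).write.format("json").saveAsTable("users")

// Plain DESC lists only the data columns of the data source table.
sqlContext.sql("DESC users").show()

// DESC EXTENDED appends a "# Detailed Table Information" section;
// DESC FORMATTED also includes a "# Storage Information" section.
sqlContext.sql("DESC EXTENDED users").show(false)
sqlContext.sql("DESC FORMATTED users").show(false)
```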
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
@@ -288,45 +288,45 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val result = new ArrayBuffer[Row]
-    sparkSession.sessionState.catalog.lookupRelation(table) match {
-      case catalogRelation: CatalogRelation =>
-        if (isExtended) {
-          describeExtended(catalogRelation, result)
-        } else if (isFormatted) {
-          describeFormatted(catalogRelation, result)
-        } else {
-          describe(catalogRelation, result)
-        }
+    val catalog = sparkSession.sessionState.catalog
+
+    if (catalog.isTemporaryTable(table)) {
+      describeSchema(catalog.lookupRelation(table).schema, result)
+    } else {
+      val metadata = catalog.getTableMetadata(table)
 
-      case relation =>
-        describeSchema(relation.schema, result)
+      if (isExtended) {
+        describeExtended(metadata, result)
+      } else if (isFormatted) {
+        describeFormatted(metadata, result)
+      } else {
+        describe(metadata, result)
+      }
     }
 
     result
   }
 
   // Shows data columns and partitioned columns (if any)
-  private def describe(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
-    describeSchema(relation.catalogTable.schema, buffer)
+  private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
+    describeSchema(table.schema, buffer)
 
-    if (relation.catalogTable.partitionColumns.nonEmpty) {
+    if (table.partitionColumns.nonEmpty) {
       append(buffer, "# Partition Information", "", "")
       append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
-      describeSchema(relation.catalogTable.partitionColumns, buffer)
+      describeSchema(table.partitionColumns, buffer)
     }
   }
 
-  private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
-    describe(relation, buffer)
+  private def describeExtended(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
+    describe(table, buffer)
 
     append(buffer, "", "", "")
-    append(buffer, "# Detailed Table Information", relation.catalogTable.toString, "")
+    append(buffer, "# Detailed Table Information", table.toString, "")
   }
 
-  private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
-    describe(relation, buffer)
-
-    val table = relation.catalogTable
+  private def describeFormatted(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
+    describe(table, buffer)
 
     append(buffer, "", "", "")
     append(buffer, "# Detailed Table Information", "", "")
@@ -358,17 +358,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     }
   }
 
-  private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
+  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
-      val comment =
-        if (column.metadata.contains("comment")) column.metadata.getString("comment") else ""
-      append(buffer, column.name, column.dataType.simpleString, comment)
+      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
     }
   }
 
-  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
+  private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
-      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
+      val comment =
+        if (column.metadata.contains("comment")) column.metadata.getString("comment") else ""
+      append(buffer, column.name, column.dataType.simpleString, comment)
     }
   }

@@ -365,7 +365,7 @@ class HiveDDLSuite
     }
   }
 
-  test("desc table") {
+  test("desc table for Hive table") {
     withTable("tab1") {
       val tabName = "tab1"
       sql(s"CREATE TABLE $tabName(c1 int)")
@@ -503,4 +503,21 @@ class HiveDDLSuite
     }.getMessage
     assert(message.contains("Can not drop default database"))
   }
+
+  test("desc table for data source table") {
+    withTable("tab1") {
+      val tabName = "tab1"
+      sqlContext.range(1).write.format("json").saveAsTable(tabName)
+
+      assert(sql(s"DESC $tabName").collect().length == 1)
+
+      assert(
+        sql(s"DESC FORMATTED $tabName").collect()
+          .exists(_.getString(0) == "# Storage Information"))
+
+      assert(
+        sql(s"DESC EXTENDED $tabName").collect()
+          .exists(_.getString(0) == "# Detailed Table Information"))
+    }
+  }
 }
