[SPARK-14445][SQL] Support native execution of SHOW COLUMNS and SHOW PARTITIONS

## What changes were proposed in this pull request?
This PR adds native execution of the SHOW COLUMNS and SHOW PARTITIONS commands.

Command Syntax:
``` SQL
SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]
```
``` SQL
SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
```
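
For illustration, a usage sketch of the two commands (the table, database, and partition names here are hypothetical, and a Hive-backed `SparkSession` is assumed):

```scala
// Assumes a partitioned Hive table mydb.sales with partition columns (year, month).
spark.sql("CREATE TABLE mydb.sales (id INT) PARTITIONED BY (year STRING, month STRING)")

// Lists the table's columns, one per row.
spark.sql("SHOW COLUMNS IN sales IN mydb").show()

// Lists matching partitions in path form, e.g. year=2016/month=04.
spark.sql("SHOW PARTITIONS mydb.sales PARTITION(year='2016')").show()
```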

## How was this patch tested?

Added test cases in HiveCommandSuite to verify execution and DDLCommandSuite
to verify plans.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #12222 from dilipbiswal/dkb_show_columns.
dilipbiswal authored and liancheng committed Apr 27, 2016
1 parent bd2c9a6 commit d93976d
Showing 16 changed files with 401 additions and 31 deletions.
@@ -106,6 +106,9 @@ statement
| SHOW DATABASES (LIKE pattern=STRING)? #showDatabases
| SHOW TBLPROPERTIES table=tableIdentifier
('(' key=tablePropertyKey ')')? #showTblProperties
| SHOW COLUMNS (FROM | IN) tableIdentifier
((FROM | IN) db=identifier)? #showColumns
| SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions
| SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
@@ -128,7 +131,6 @@ hiveNativeCommands
: DELETE FROM tableIdentifier (WHERE booleanExpression)?
| TRUNCATE TABLE tableIdentifier partitionSpec?
(COLUMNS identifierList)?
| SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM|IN) identifier)?
| START TRANSACTION (transactionMode (',' transactionMode)*)?
| COMMIT WORK?
| ROLLBACK WORK?
@@ -300,8 +300,13 @@ class InMemoryCatalog extends ExternalCatalog {

override def listPartitions(
db: String,
table: String): Seq[CatalogTablePartition] = synchronized {
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized {
requireTableExists(db, table)
if (partialSpec.nonEmpty) {
throw new AnalysisException("listPartition does not support partition spec in " +
"InMemoryCatalog.")
}
catalog(db).tables(table).partitions.values.toSeq
}

@@ -471,13 +471,18 @@ class SessionCatalog(
}

/**
* List all partitions in a table, assuming it exists.
* If no database is specified, assume the table is in the current database.
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
* A partial partition spec may optionally be provided to filter the partitions returned.
* For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
* then a partial spec of (a='1') will return the first two only.
*/
def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
def listPartitions(
tableName: TableIdentifier,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
val db = tableName.database.getOrElse(currentDb)
val table = formatTableName(tableName.table)
externalCatalog.listPartitions(db, table)
externalCatalog.listPartitions(db, table, partialSpec)
}

// ----------------------------------------------------------------------------
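The partial-spec filtering contract described in the Scaladoc above can be illustrated with a self-contained sketch (plain Scala, not the actual catalog code): a partition matches when every column/value pair in the partial spec appears in the partition's own spec.

```scala
// TablePartitionSpec is a Map[String, String] of partition column -> value.
type TablePartitionSpec = Map[String, String]

def matchesPartialSpec(part: TablePartitionSpec, partial: TablePartitionSpec): Boolean =
  partial.forall { case (col, value) => part.get(col).contains(value) }

val partitions = Seq(
  Map("a" -> "1", "b" -> "2"),
  Map("a" -> "1", "b" -> "3"),
  Map("a" -> "2", "b" -> "4"))

// Keeps the first two partitions only, matching the Scaladoc example.
partitions.filter(matchesPartialSpec(_, Map("a" -> "1")))
```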
@@ -157,8 +157,20 @@ abstract class ExternalCatalog {

def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

// TODO: support listing by pattern
def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
/**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
* A partial partition spec may optionally be provided to filter the partitions returned.
* For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
* then a partial spec of (a='1') will return the first two only.
* @param db database name
* @param table table name
* @param partialSpec an optional partial partition spec used to filter the partitions returned
*/
def listPartitions(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]

// --------------------------------------------------------------------------
// Functions
@@ -152,6 +152,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(ctx.key).map(visitTablePropertyKey))
}

/**
* A command for users to list the column names for a table.
* This function creates a [[ShowColumnsCommand]] logical plan.
*
* The syntax of using this command in SQL is:
* {{{
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
override def visitShowColumns(ctx: ShowColumnsContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier)

val lookupTable = Option(ctx.db) match {
case None => table
case Some(db) if table.database.isDefined =>
throw new ParseException("Duplicates the declaration for database", ctx)
case Some(db) => TableIdentifier(table.identifier, Some(db.getText))
}
ShowColumnsCommand(lookupTable)
}

/**
* A command for users to list the partition names of a table. If a partition spec is specified,
* only the partitions matching the spec are returned; otherwise all partitions are returned.
*
* This function creates a [[ShowPartitionsCommand]] logical plan.
*
* The syntax of using this command in SQL is:
* {{{
* SHOW PARTITIONS table_identifier [partition_spec];
* }}}
*/
override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
ShowPartitionsCommand(table, partitionKeys)
}

/**
* Create a [[RefreshTable]] logical plan.
*/
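The database-resolution rule in `visitShowColumns` can be sketched independently of the parser types (a simplification; the real code works with `TableIdentifier` and throws `ParseException`): an explicit `db.table` qualifier must not be combined with a separate `FROM`/`IN database` clause.

```scala
// Returns (table, database); the database from the FROM/IN clause wins only
// when the table identifier itself is unqualified.
def resolveLookupTable(
    table: String,
    tableDb: Option[String],
    clauseDb: Option[String]): (String, Option[String]) = clauseDb match {
  case None => (table, tableDb)
  case Some(_) if tableDb.isDefined =>
    throw new IllegalArgumentException("Duplicates the declaration for database")
  case Some(db) => (table, Some(db))
}

resolveLookupTable("t1", None, Some("db1"))        // (t1, Some(db1))
resolveLookupTable("t1", Some("db1"), Some("db2")) // throws
```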
@@ -17,14 +17,19 @@

package org.apache.spark.sql.execution.command

import java.io.File

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.types._

@@ -112,3 +117,101 @@ case class ExplainCommand(
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
}

/**
* A command to list the column names for a table.
*
* The syntax of using this command in SQL is:
* {{{
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
// The result of SHOW COLUMNS has one column called 'result'
override val output: Seq[Attribute] = {
AttributeReference("result", StringType, nullable = false)() :: Nil
}

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
Row(c.name)
}
}
}

/**
* A command to list the partition names of a table. If a partition spec is specified,
* only the partitions matching the spec are returned. An [[AnalysisException]] is thrown
* under the following conditions:
*
* 1. If the command is called for a non-partitioned table.
* 2. If the partition spec refers to columns that are not defined as partitioning columns.
*
* The syntax of using this command in SQL is:
* {{{
* SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
* }}}
*/
case class ShowPartitionsCommand(
table: TableIdentifier,
spec: Option[TablePartitionSpec]) extends RunnableCommand {
// The result of SHOW PARTITIONS has one column called 'result'
override val output: Seq[Attribute] = {
AttributeReference("result", StringType, nullable = false)() :: Nil
}

private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
partColNames.map { name =>
PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
}.mkString(File.separator)
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val db = table.database.getOrElse(catalog.getCurrentDatabase)
if (catalog.isTemporaryTable(table)) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " +
s"${table.unquotedString}")
} else {
val tab = catalog.getTableMetadata(table)
/**
* Validate the table and throw an [[AnalysisException]] under the following conditions:
* 1. If the table is not partitioned.
* 2. If it is a datasource table.
* 3. If it is a view or index table.
*/
if (tab.tableType == CatalogTableType.VIRTUAL_VIEW ||
tab.tableType == CatalogTableType.INDEX_TABLE) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
s"${tab.qualifiedName}")
}
if (!DDLUtils.isTablePartitioned(tab)) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " +
s"partitioned: ${tab.qualifiedName}")
}
if (DDLUtils.isDatasourceTable(tab)) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " +
s"${tab.qualifiedName}")
}
/**
* Validate the partitioning spec by making sure all the referenced columns are
* defined as partitioning columns in the table definition. An [[AnalysisException]]
* is thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
if (badColumns.nonEmpty) {
throw new AnalysisException(
s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
s"specified for SHOW PARTITIONS")
}
}
val partNames =
catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames))
partNames.map { p => Row(p) }
}
}
}
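
The path-style rendering performed by `getPartName` above can be reproduced in isolation (escaping is elided here; the real code routes every name and value through `PartitioningUtils.escapePathName`):

```scala
import java.io.File

// Joins column=value pairs with the platform path separator, in the order of
// the table's partition columns rather than the order of the spec map.
def getPartName(spec: Map[String, String], partColNames: Seq[String]): String =
  partColNames.map(name => s"$name=${spec(name)}").mkString(File.separator)

// Prints "year=2016/month=04" on Unix-like systems.
println(getPartName(Map("month" -> "04", "year" -> "2016"), Seq("year", "month")))
```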
@@ -536,5 +536,9 @@ private[sql] object DDLUtils {
case _ =>
})
}
def isTablePartitioned(table: CatalogTable): Boolean = {
table.partitionColumns.size > 0 ||
table.properties.contains("spark.sql.sources.schema.numPartCols")
}
}
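
`isTablePartitioned` recognizes two representations of a partitioned table, which a standalone sketch makes explicit (the property key is taken from the diff above):

```scala
// Hive tables carry explicit partition columns; datasource tables instead
// record their partitioning in table properties.
def isTablePartitioned(
    partitionColumns: Seq[String],
    properties: Map[String, String]): Boolean =
  partitionColumns.nonEmpty || properties.contains("spark.sql.sources.schema.numPartCols")

isTablePartitioned(Seq("year"), Map.empty)                                   // true
isTablePartitioned(Nil, Map("spark.sql.sources.schema.numPartCols" -> "1")) // true
isTablePartitioned(Nil, Map.empty)                                           // false
```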

@@ -424,7 +424,7 @@ private[sql] object PartitioningUtils {
path.foreach { c =>
if (needsEscaping(c)) {
builder.append('%')
builder.append(f"${c.asInstanceOf[Int]}%02x")
builder.append(f"${c.asInstanceOf[Int]}%02X")
} else {
builder.append(c)
}
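The one-character change above switches the escape encoding from lowercase to uppercase hex, presumably so that escaped partition path names match the casing Hive itself produces. A minimal sketch of the behavior (the set of characters needing escaping is assumed here; the real rule lives in `needsEscaping`):

```scala
def escapePathName(path: String): String = {
  val needsEscaping = Set(' ', '=', '/', ':', '%')
  val builder = new StringBuilder
  path.foreach { c =>
    if (needsEscaping(c)) {
      builder.append('%')
      builder.append(f"${c.toInt}%02X")  // uppercase hex, e.g. ' ' -> %20, '=' -> %3D
    } else {
      builder.append(c)
    }
  }
  builder.toString
}

println(escapePathName("part col=a/b")) // part%20col%3Da%2Fb
```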
@@ -678,4 +678,44 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
}

test("show columns") {
val sql1 = "SHOW COLUMNS FROM t1"
val sql2 = "SHOW COLUMNS IN db1.t1"
val sql3 = "SHOW COLUMNS FROM t1 IN db1"
val sql4 = "SHOW COLUMNS FROM db1.t1 IN db2"

val parsed1 = parser.parsePlan(sql1)
val expected1 = ShowColumnsCommand(TableIdentifier("t1", None))
val parsed2 = parser.parsePlan(sql2)
val expected2 = ShowColumnsCommand(TableIdentifier("t1", Some("db1")))
val parsed3 = parser.parsePlan(sql3)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected2)
val message = intercept[ParseException] {
parser.parsePlan(sql4)
}.getMessage
assert(message.contains("Duplicates the declaration for database"))
}

test("show partitions") {
val sql1 = "SHOW PARTITIONS t1"
val sql2 = "SHOW PARTITIONS db1.t1"
val sql3 = "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')"

val parsed1 = parser.parsePlan(sql1)
val expected1 =
ShowPartitionsCommand(TableIdentifier("t1", None), None)
val parsed2 = parser.parsePlan(sql2)
val expected2 =
ShowPartitionsCommand(TableIdentifier("t1", Some("db1")), None)
val expected3 =
ShowPartitionsCommand(TableIdentifier("t1", None),
Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue")))
val parsed3 = parser.parsePlan(sql3)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}
}
@@ -283,10 +283,14 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog
client.getPartition(db, table, spec)
}

/**
* Returns the partitions from the Hive metastore for a given table in a database.
*/
override def listPartitions(
db: String,
table: String): Seq[CatalogTablePartition] = withClient {
client.getAllPartitions(db, table)
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
client.getPartitions(db, table, partialSpec)
}

// --------------------------------------------------------------------------
@@ -130,7 +130,7 @@ private[hive] case class MetastoreRelation(

// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable)

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
@@ -163,13 +163,24 @@ private[hive] trait HiveClient {
table: CatalogTable,
spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition]

/** Returns all partitions for the given table. */
final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
getAllPartitions(getTable(db, table))
/**
* Returns the partitions for the given table that match the supplied partition spec.
* If no partition spec is specified, all partitions are returned.
*/
final def getPartitions(
db: String,
table: String,
partialSpec: Option[ExternalCatalog.TablePartitionSpec]): Seq[CatalogTablePartition] = {
getPartitions(getTable(db, table), partialSpec)
}

/** Returns all partitions for the given table. */
def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition]
/**
* Returns the partitions for the given table that match the supplied partition spec.
* If no partition spec is specified, all partitions are returned.
*/
def getPartitions(
table: CatalogTable,
partialSpec: Option[ExternalCatalog.TablePartitionSpec] = None): Seq[CatalogTablePartition]

/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(
