Refactor CatalogTable to use TableIdentifier
This is a standalone commit so that, if preferable, we can split
it out into a separate patch in the future.
Andrew Or committed Mar 16, 2016
1 parent 39a153c commit dd1fbae
Showing 13 changed files with 54 additions and 62 deletions.
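
The core of the change: CatalogTable previously carried specifiedDatabase: Option[String] and name: String as two separate fields; it now carries a single name: TableIdentifier. A minimal sketch of the new shape, using trimmed-down stand-ins for the real catalyst classes (which carry many more fields):

// Trimmed-down stand-in for org.apache.spark.sql.catalyst.TableIdentifier.
case class TableIdentifier(table: String, database: Option[String] = None) {
  def unquotedString: String = (database.toSeq :+ table).mkString(".")
}

// Trimmed-down CatalogTable showing only the refactored field.
case class CatalogTable(name: TableIdentifier) {
  // Mirrors the diff below: fail if the identifier carries no database.
  def database: String = name.database.getOrElse(
    sys.error(s"table $name did not specify database"))
  def qualifiedName: String = name.unquotedString
}

object ShapeSketch extends App {
  val t = CatalogTable(TableIdentifier("tbl1", Some("db2")))
  println(t.database)      // db2
  println(t.qualifiedName) // db2.tbl1
}
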
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import scala.collection.mutable

 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier


 /**
@@ -156,12 +157,13 @@ class InMemoryCatalog extends ExternalCatalog {
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
     requireDbExists(db)
-    if (existsTable(db, tableDefinition.name)) {
+    val table = tableDefinition.name.table
+    if (existsTable(db, table)) {
       if (!ignoreIfExists) {
-        throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+        throw new AnalysisException(s"Table $table already exists in $db database")
       }
     } else {
-      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+      catalog(db).tables.put(table, new TableDesc(tableDefinition))
     }
   }

@@ -182,14 +184,14 @@ class InMemoryCatalog extends ExternalCatalog {
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
     requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
-    oldDesc.table = oldDesc.table.copy(name = newName)
+    oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }

   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
-    requireTableExists(db, tableDefinition.name)
-    catalog(db).tables(tableDefinition.name).table = tableDefinition
+    requireTableExists(db, tableDefinition.name.table)
+    catalog(db).tables(tableDefinition.name.table).table = tableDefinition
   }

   override def getTable(db: String, table: String): CatalogTable = synchronized {
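
Note how renameTable now has to rewrite the identifier stored inside the table metadata, not just the map key. A standalone sketch of that pattern, using a hypothetical mini-catalog rather than the real InMemoryCatalog:

import scala.collection.mutable

object RenameSketch extends App {
  case class TableIdentifier(table: String, database: Option[String] = None)
  case class CatalogTable(name: TableIdentifier)

  val tables = mutable.HashMap("old" -> CatalogTable(TableIdentifier("old", Some("db1"))))

  def renameTable(db: String, oldName: String, newName: String): Unit = {
    val oldDesc = tables(oldName)
    // The key point of the diff: the identifier inside the value must be
    // updated along with the map key, or the two silently drift apart.
    tables.put(newName, oldDesc.copy(name = TableIdentifier(newName, Some(db))))
    tables.remove(oldName)
  }

  renameTable("db1", "old", "new")
  println(tables("new").name) // TableIdentifier(new,Some(db1))
}
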
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import javax.annotation.Nullable

 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}


 /**
@@ -212,8 +213,7 @@ case class CatalogTablePartition(
  * future once we have a better understanding of how we want to handle skewed columns.
  */
 case class CatalogTable(
-    specifiedDatabase: Option[String],
-    name: String,
+    name: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
@@ -227,12 +227,12 @@ case class CatalogTable(
     viewText: Option[String] = None) {

   /** Return the database this table was specified to belong to, assuming it exists. */
-  def database: String = specifiedDatabase.getOrElse {
+  def database: String = name.database.getOrElse {
     throw new AnalysisException(s"table $name did not specify database")
   }

   /** Return the fully qualified name of this table, assuming the database was specified. */
-  def qualifiedName: String = s"$database.$name"
+  def qualifiedName: String = name.unquotedString

   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala

@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterEach

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier


 /**
@@ -89,8 +90,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {

   private def newTable(name: String, db: String): CatalogTable = {
     CatalogTable(
-      specifiedDatabase = Some(db),
-      name = name,
+      name = TableIdentifier(name, Some(db)),
       tableType = CatalogTableType.EXTERNAL_TABLE,
       storage = storageFormat,
       schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
@@ -277,7 +277,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   }

   test("get table") {
-    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+    assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
   }

   test("get table when database/table does not exist") {
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala

@@ -27,6 +27,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.catalyst.TableIdentifier


 /**
@@ -73,10 +74,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }

   private def requireDbMatches(db: String, table: CatalogTable): Unit = {
-    if (table.specifiedDatabase != Some(db)) {
+    if (table.name.database != Some(db)) {
       throw new AnalysisException(
         s"Provided database $db does not much the one specified in the " +
-          s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+          s"table definition (${table.name.database.getOrElse("n/a")})")
     }
   }

@@ -160,7 +161,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }

   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val newTable = client.getTable(db, oldName).copy(name = newName)
+    val newTable = client.getTable(db, oldName).copy(name = TableIdentifier(newName, Some(db)))
     client.alterTable(oldName, newTable)
   }

@@ -173,7 +174,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
    */
   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
     requireDbMatches(db, tableDefinition)
-    requireTableExists(db, tableDefinition.name)
+    requireTableExists(db, tableDefinition.name.table)
     client.alterTable(tableDefinition)
   }
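
With the database folded into the identifier, the requireDbMatches check above compares the Option[String] inside the identifier against Some(db). A standalone sketch of the same check, with a plain exception standing in for AnalysisException:

object DbMatchSketch extends App {
  case class TableIdentifier(table: String, database: Option[String] = None)

  def requireDbMatches(db: String, name: TableIdentifier): Unit =
    if (name.database != Some(db)) {
      throw new IllegalArgumentException(
        s"Provided database $db does not match the one specified in the " +
          s"table definition (${name.database.getOrElse("n/a")})")
    }

  requireDbMatches("db1", TableIdentifier("t", Some("db1"))) // passes
  // requireDbMatches("db1", TableIdentifier("t", None))     // would throw
}
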
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

@@ -118,8 +118,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte

   private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
-      t.name.toLowerCase)
+      t.name.database.getOrElse(client.currentDatabase).toLowerCase,
+      t.name.table.toLowerCase)
   }

   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -293,8 +293,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte

     def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
       CatalogTable(
-        specifiedDatabase = Option(dbName),
-        name = tblName,
+        name = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         schema = Nil,
         storage = CatalogStorageFormat(
@@ -314,8 +313,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       assert(relation.partitionSchema.isEmpty)

       CatalogTable(
-        specifiedDatabase = Option(dbName),
-        name = tblName,
+        name = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         storage = CatalogStorageFormat(
           locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
@@ -432,7 +430,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       alias match {
         // because hive use things like `_c0` to build the expanded text
         // currently we cannot support view from "create view v1(c1) as ..."
-        case None => SubqueryAlias(table.name, hive.parseSql(viewText))
+        case None => SubqueryAlias(table.name.table, hive.parseSql(viewText))
         case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
       }
     } else {
@@ -618,9 +616,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

       execution.CreateViewAsSelect(
-        table.copy(
-          specifiedDatabase = Some(dbName),
-          name = tblName),
+        table.copy(name = TableIdentifier(tblName, Some(dbName))),
         child,
         allowExisting,
         replace)
@@ -642,15 +638,15 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       if (hive.convertCTAS && table.storage.serde.isEmpty) {
         // Do the conversion when spark.sql.hive.convertCTAS is true and the query
         // does not specify any storage format (file format and storage handler).
-        if (table.specifiedDatabase.isDefined) {
+        if (table.name.database.isDefined) {
           throw new AnalysisException(
             "Cannot specify database name in a CTAS statement " +
               "when spark.sql.hive.convertCTAS is set to true.")
         }

         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTableUsingAsSelect(
-          TableIdentifier(desc.name),
+          TableIdentifier(desc.name.table),
           conf.defaultDataSourceName,
           temporary = false,
           Array.empty[String],
@@ -671,9 +667,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

         execution.CreateTableAsSelect(
-          desc.copy(
-            specifiedDatabase = Some(dbName),
-            name = tblName),
+          desc.copy(name = TableIdentifier(tblName, Some(dbName))),
           child,
           allowExisting)
       }
@@ -824,7 +818,7 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.name)
+    tTable.setTableName(table.name.table)
     tTable.setDbName(table.database)

     val tableParameters = new java.util.HashMap[String, String]()
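
Several call sites in this file follow the same pattern: resolve a possibly-unqualified identifier against the session's current database, then store the fully qualified identifier back via copy. A sketch under that assumption, where currentDatabase stands in for client.currentDatabase:

object QualifySketch extends App {
  case class TableIdentifier(table: String, database: Option[String] = None)
  case class CatalogTable(name: TableIdentifier)

  val currentDatabase = "default" // stand-in for client.currentDatabase

  def qualify(t: CatalogTable): CatalogTable = {
    val db = t.name.database.getOrElse(currentDatabase).toLowerCase
    val tbl = t.name.table.toLowerCase
    t.copy(name = TableIdentifier(tbl, Some(db)))
  }

  // An unqualified CREATE TABLE resolves against the current database.
  println(qualify(CatalogTable(TableIdentifier("Foo"))).name)
  // TableIdentifier(foo,Some(default))
}
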
14 changes: 5 additions & 9 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -60,7 +60,7 @@ private[hive] case class CreateTableAsSelect(

   override def output: Seq[Attribute] = Seq.empty[Attribute]
   override lazy val resolved: Boolean =
-    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.name.database.isDefined &&
       tableDesc.schema.nonEmpty &&
       tableDesc.storage.serde.isDefined &&
       tableDesc.storage.inputFormat.isDefined &&
@@ -185,13 +185,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
-    val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
-
+    val tableIdentifier = extractTableIdent(viewNameParts)
     val originalText = query.source
-
     val tableDesc = CatalogTable(
-      specifiedDatabase = dbName,
-      name = viewName,
+      name = tableIdentifier,
       tableType = CatalogTableType.VIRTUAL_VIEW,
       schema = schema,
       storage = CatalogStorageFormat(
@@ -356,12 +353,11 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
             "TOK_TABLELOCATION",
             "TOK_TABLEPROPERTIES"),
           children)
-    val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
+    val tableIdentifier = extractTableIdent(tableNameParts)

     // TODO add bucket support
     var tableDesc: CatalogTable = CatalogTable(
-      specifiedDatabase = dbName,
-      name = tblName,
+      name = tableIdentifier,
       tableType =
         if (externalTable.isDefined) {
           CatalogTableType.EXTERNAL_TABLE
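
The HiveQl changes replace destructuring the identifier into (viewName, dbName) with threading it through whole, so the database and table parts can no longer be recombined inconsistently. A small illustration, where ident stands in for the result of extractTableIdent:

object ThreadThroughSketch extends App {
  case class TableIdentifier(table: String, database: Option[String] = None)
  case class CatalogTable(name: TableIdentifier)

  val ident = TableIdentifier("page_view", Some("mydb")) // stand-in for extractTableIdent(...)

  // Before: the identifier was pulled apart and reassembled field by field.
  val TableIdentifier(tblName, dbName) = ident
  println((dbName, tblName)) // (Some(mydb),page_view)

  // After: the identifier is threaded through unchanged.
  val desc = CatalogTable(name = ident)
  println(desc.name) // TableIdentifier(page_view,Some(mydb))
}
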
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala

@@ -91,7 +91,7 @@ private[hive] trait HiveClient {
   def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit

   /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
-  final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table)
+  final def alterTable(table: CatalogTable): Unit = alterTable(table.name.table, table)

   /** Updates the given table with new metadata, optionally renaming the table. */
   def alterTable(tableName: String, table: CatalogTable): Unit
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -298,8 +299,7 @@ private[hive] class HiveClientImpl(
     logDebug(s"Looking up $dbName.$tableName")
     Option(client.getTable(dbName, tableName, false)).map { h =>
       CatalogTable(
-        specifiedDatabase = Option(h.getDbName),
-        name = h.getTableName,
+        name = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
           case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
           case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
@@ -639,7 +639,7 @@ private[hive] class HiveClientImpl(
   }

   private def toHiveTable(table: CatalogTable): HiveTable = {
-    val hiveTable = new HiveTable(table.database, table.name)
+    val hiveTable = new HiveTable(table.database, table.name.table)
     hiveTable.setTableType(table.tableType match {
       case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
       case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE
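
One detail worth noting in the getTable conversion above: Hive's API can return null for the database name, so the identifier is built with Option(...) rather than Some(...). A sketch of that null-safety point, where ApiTable is a hypothetical stand-in for Hive's metastore table class:

object NullSafetySketch extends App {
  case class TableIdentifier(table: String, database: Option[String] = None)

  // Hypothetical stand-in for org.apache.hadoop.hive.metastore.api.Table.
  final class ApiTable(val getTableName: String, val getDbName: String)

  def toIdentifier(h: ApiTable): TableIdentifier =
    TableIdentifier(h.getTableName, Option(h.getDbName)) // Option(null) == None

  println(toIdentifier(new ApiTable("t", null))) // TableIdentifier(t,None)
  println(toIdentifier(new ApiTable("t", "db"))) // TableIdentifier(t,Some(db))
}
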
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala

@@ -38,7 +38,7 @@ case class CreateTableAsSelect(
     allowExisting: Boolean)
   extends RunnableCommand {

-  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+  private val tableIdentifier = tableDesc.name

   override def children: Seq[LogicalPlan] = Seq(query)

@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
   }

   override def argString: String = {
-    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
+    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name.table}, InsertIntoHiveTable]"
   }
 }
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala

@@ -44,7 +44,7 @@ private[hive] case class CreateViewAsSelect(
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)

-  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+  private val tableIdentifier = tableDesc.name

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
@@ -116,7 +116,7 @@ private[hive] case class CreateViewAsSelect(
     }

     val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.name)
+    val viewName = quote(tableDesc.name.table)
     s"SELECT $viewOutput FROM ($viewText) $viewName"
   }
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala

@@ -54,8 +54,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {

     val (desc, exists) = extractTableDesc(s1)
     assert(exists)
-    assert(desc.specifiedDatabase == Some("mydb"))
-    assert(desc.name == "page_view")
+    assert(desc.name.database == Some("mydb"))
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
@@ -100,8 +100,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {

     val (desc, exists) = extractTableDesc(s2)
     assert(exists)
-    assert(desc.specifiedDatabase == Some("mydb"))
-    assert(desc.name == "page_view")
+    assert(desc.name.database == Some("mydb"))
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
@@ -127,8 +127,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
     val (desc, exists) = extractTableDesc(s3)
     assert(exists == false)
-    assert(desc.specifiedDatabase == None)
-    assert(desc.name == "page_view")
+    assert(desc.name.database == None)
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])
@@ -162,8 +162,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
       | ORDER BY key, value""".stripMargin
     val (desc, exists) = extractTableDesc(s5)
     assert(exists == false)
-    assert(desc.specifiedDatabase == None)
-    assert(desc.name == "ctas2")
+    assert(desc.name.database == None)
+    assert(desc.name.table == "ctas2")
     assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

@@ -719,8 +719,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
      val schema = StructType(StructField("int", IntegerType, true) :: Nil)
      val hiveTable = CatalogTable(
-        specifiedDatabase = Some("default"),
-        name = tableName,
+        name = TableIdentifier(tableName, Some("default")),
        tableType = CatalogTableType.MANAGED_TABLE,
        schema = Seq.empty,
        storage = CatalogStorageFormat(