# [SPARK-13080][SQL] Implement new Catalog API using Hive
## What changes were proposed in this pull request?

This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.

*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.

*Why is this patch so big?* I had to refactor `HiveClient` to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (and likewise for the other entities). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to `HiveTable`, which is messy.
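To make the point concrete, here is a minimal sketch of the direct-conversion shape, using simplified stand-ins for the real classes (the actual `CatalogTable` and Hive table types carry schema, storage, and partition information):

```scala
// Illustrative stand-ins only; not the actual Spark or Hive definitions.
case class CatalogTable(name: String, properties: Map[String, String])
case class HiveTable(name: String, properties: Map[String, String])

object HiveConversions {
  // CatalogTable -> HiveTable in one step, with no intermediate representation.
  def toHiveTable(t: CatalogTable): HiveTable = HiveTable(t.name, t.properties)

  // HiveTable -> CatalogTable, the inverse direction.
  def fromHiveTable(t: HiveTable): CatalogTable = CatalogTable(t.name, t.properties)
}
```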

The new class hierarchy is as follows:
```
org.apache.spark.sql.catalyst.catalog.Catalog
  - org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
  - org.apache.spark.sql.hive.HiveCatalog
```

Note that, as of this patch, none of these classes is used anywhere yet; they will be wired in before the Spark 2.0 release.
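For a sense of the API surface, here is a hedged sketch of exercising the in-memory implementation. The method calls match the diff below, but the `CatalogDatabase` constructor arguments are an assumption for illustration:

```scala
import org.apache.spark.sql.catalyst.catalog.{Catalog, CatalogDatabase, InMemoryCatalog}

object CatalogApiSketch extends App {
  val catalog: Catalog = new InMemoryCatalog

  // NOTE: these CatalogDatabase fields are assumed for illustration;
  // consult the case class introduced in #10982 for the real signature.
  val db = CatalogDatabase(
    name = "db1",
    description = "example database",
    locationUri = "/tmp/db1",
    properties = Map.empty)

  catalog.createDatabase(db, ignoreIfExists = false) // signature shown in the diff below
  assert(catalog.databaseExists("db1"))              // databaseExists is new in this patch
  assert(catalog.listDatabases().contains("db1"))
}
```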

## How was this patch tested?
All existing unit tests, plus a new `HiveCatalogSuite` that extends `CatalogTestCases`.
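The shared-suite pattern presumably looks like the following sketch; the class scaffolding and the abstract factory method name here are assumptions, not necessarily what `CatalogTestCases` actually uses:

```scala
import org.apache.spark.sql.catalyst.catalog.Catalog
import org.scalatest.FunSuite

// Hypothetical shape only: behavioral tests shared by all Catalog
// implementations live in the abstract suite, and each backend binds a factory.
abstract class CatalogTestCasesSketch extends FunSuite {
  protected def newEmptyCatalog(): Catalog // assumed abstract member

  test("create and list databases") {
    val catalog = newEmptyCatalog()
    // ... assertions shared by InMemoryCatalog and HiveCatalog ...
  }
}

class HiveCatalogSuiteSketch extends CatalogTestCasesSketch {
  // The real suite would construct a HiveCatalog backed by a HiveClient.
  override protected def newEmptyCatalog(): Catalog = ???
}
```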

Author: Andrew Or <andrew@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #11293 from rxin/hive-catalog.
Andrew Or authored and rxin committed Feb 21, 2016
1 parent 7eb83fe commit 6c3832b
Showing 21 changed files with 1,483 additions and 700 deletions.
```diff
@@ -19,6 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.annotation.DeveloperApi
 
+
+// TODO: don't swallow original stack trace if it exists
+
 /**
  * :: DeveloperApi ::
  * Thrown when a query fails to analyze, usually because the query itself is invalid.
```
```diff
@@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
-/**
- * Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
- * as an AnalysisException with the correct position information.
- */
-class NoSuchTableException extends Exception
-
-class NoSuchDatabaseException extends Exception
 
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
```
```diff
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+
+
+/**
+ * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+abstract class NoSuchItemException extends Exception {
+  override def getMessage: String
+}
+
+class NoSuchDatabaseException(db: String) extends NoSuchItemException {
+  override def getMessage: String = s"Database $db not found"
+}
+
+class NoSuchTableException(db: String, table: String) extends NoSuchItemException {
+  override def getMessage: String = s"Table $table not found in database $db"
+}
+
+class NoSuchPartitionException(
+    db: String,
+    table: String,
+    spec: TablePartitionSpec)
+  extends NoSuchItemException {
+
+  override def getMessage: String = {
+    s"Partition not found in table $table database $db:\n" + spec.mkString("\n")
+  }
+}
+
+class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException {
+  override def getMessage: String = s"Function $func not found in database $db"
+}
```
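As a quick illustration (not part of the patch), the new exception types produce messages like the following; this assumes `TablePartitionSpec` is a `Map[String, String]` alias, as the `InMemoryCatalog` diff below suggests:

```scala
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchTableException}

object NoSuchItemExceptionExample extends App {
  val e = new NoSuchTableException(db = "db1", table = "tbl1")
  assert(e.getMessage == "Table tbl1 not found in database db1")

  // The partition spec prints entry by entry, one per line.
  val p = new NoSuchPartitionException("db1", "tbl1", Map("ds" -> "2016-02-21"))
  println(p.getMessage) // Partition not found in table tbl1 database db1:
                        // ds -> 2016-02-21
}
```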
```diff
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException
 class InMemoryCatalog extends Catalog {
   import Catalog._
 
-  private class TableDesc(var table: Table) {
-    val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
+  private class TableDesc(var table: CatalogTable) {
+    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
   }
 
-  private class DatabaseDesc(var db: Database) {
+  private class DatabaseDesc(var db: CatalogDatabase) {
     val tables = new mutable.HashMap[String, TableDesc]
-    val functions = new mutable.HashMap[String, Function]
+    val functions = new mutable.HashMap[String, CatalogFunction]
   }
 
+  // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
 
   private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def existsFunction(db: String, funcName: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
   private def existsTable(db: String, table: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.contains(table)
   }
 
-  private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
-    assertTableExists(db, table)
+  private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def assertDbExists(db: String): Unit = {
-    if (!catalog.contains(db)) {
-      throw new AnalysisException(s"Database $db does not exist")
-    }
-  }
-
-  private def assertFunctionExists(db: String, funcName: String): Unit = {
+  private def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!existsFunction(db, funcName)) {
       throw new AnalysisException(s"Function $funcName does not exist in $db database")
     }
   }
 
-  private def assertTableExists(db: String, table: String): Unit = {
+  private def requireTableExists(db: String, table: String): Unit = {
     if (!existsTable(db, table)) {
       throw new AnalysisException(s"Table $table does not exist in $db database")
    }
   }
 
-  private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+  private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
     if (!existsPartition(db, table, spec)) {
       throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
     }
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog {
   // --------------------------------------------------------------------------
 
   override def createDatabase(
-      dbDefinition: Database,
+      dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
       if (!ignoreIfExists) {
@@ -124,17 +119,20 @@
     }
   }
 
-  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
-    assertDbExists(db)
-    assert(db == dbDefinition.name)
-    catalog(db).db = dbDefinition
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+    requireDbExists(dbDefinition.name)
+    catalog(dbDefinition.name).db = dbDefinition
   }
 
-  override def getDatabase(db: String): Database = synchronized {
-    assertDbExists(db)
+  override def getDatabase(db: String): CatalogDatabase = synchronized {
+    requireDbExists(db)
     catalog(db).db
   }
 
+  override def databaseExists(db: String): Boolean = synchronized {
+    catalog.contains(db)
+  }
+
   override def listDatabases(): Seq[String] = synchronized {
     catalog.keySet.toSeq
   }
@@ -143,15 +141,17 @@
     filterPattern(listDatabases(), pattern)
   }
 
+  override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
+
   // --------------------------------------------------------------------------
   // Tables
   // --------------------------------------------------------------------------
 
   override def createTable(
       db: String,
-      tableDefinition: Table,
+      tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, tableDefinition.name)) {
       if (!ignoreIfExists) {
         throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
@@ -165,7 +165,7 @@
       db: String,
       table: String,
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, table)) {
       catalog(db).tables.remove(table)
     } else {
@@ -176,31 +176,30 @@
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
-    assertTableExists(db, oldName)
+    requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
     oldDesc.table = oldDesc.table.copy(name = newName)
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
 
-  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
-    assertTableExists(db, table)
-    assert(table == tableDefinition.name)
-    catalog(db).tables(table).table = tableDefinition
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
+    requireTableExists(db, tableDefinition.name)
+    catalog(db).tables(tableDefinition.name).table = tableDefinition
   }
 
-  override def getTable(db: String, table: String): Table = synchronized {
-    assertTableExists(db, table)
+  override def getTable(db: String, table: String): CatalogTable = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).table
   }
 
   override def listTables(db: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.keySet.toSeq
   }
 
   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(listTables(db), pattern)
   }
 
@@ -211,9 +210,9 @@
   override def createPartitions(
       db: String,
       table: String,
-      parts: Seq[TablePartition],
+      parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfExists) {
       val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
@@ -229,9 +228,9 @@
   override def dropPartitions(
       db: String,
       table: String,
-      partSpecs: Seq[PartitionSpec],
+      partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
       val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
@@ -244,75 +243,82 @@
     partSpecs.foreach(existingParts.remove)
   }
 
-  override def alterPartition(
+  override def renamePartitions(
       db: String,
       table: String,
-      spec: Map[String, String],
-      newPart: TablePartition): Unit = synchronized {
-    assertPartitionExists(db, table, spec)
-    val existingParts = catalog(db).tables(table).partitions
-    if (spec != newPart.spec) {
-      // Also a change in specs; remove the old one and add the new one back
-      existingParts.remove(spec)
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
+    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
+      val existingParts = catalog(db).tables(table).partitions
+      existingParts.remove(oldSpec)
+      existingParts.put(newSpec, newPart)
     }
-    existingParts.put(newPart.spec, newPart)
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Unit = synchronized {
+    parts.foreach { p =>
+      requirePartitionExists(db, table, p.spec)
+      catalog(db).tables(table).partitions.put(p.spec, p)
+    }
   }
 
   override def getPartition(
       db: String,
       table: String,
-      spec: Map[String, String]): TablePartition = synchronized {
-    assertPartitionExists(db, table, spec)
+      spec: TablePartitionSpec): CatalogTablePartition = synchronized {
+    requirePartitionExists(db, table, spec)
     catalog(db).tables(table).partitions(spec)
   }
 
-  override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
-    assertTableExists(db, table)
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.values.toSeq
   }
 
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
 
-  override def createFunction(
-      db: String,
-      func: Function,
-      ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+  override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+    requireDbExists(db)
     if (existsFunction(db, func.name)) {
-      if (!ignoreIfExists) {
-        throw new AnalysisException(s"Function $func already exists in $db database")
-      }
+      throw new AnalysisException(s"Function $func already exists in $db database")
     } else {
       catalog(db).functions.put(func.name, func)
     }
   }
 
   override def dropFunction(db: String, funcName: String): Unit = synchronized {
-    assertFunctionExists(db, funcName)
+    requireFunctionExists(db, funcName)
     catalog(db).functions.remove(funcName)
   }
 
-  override def alterFunction(
-      db: String,
-      funcName: String,
-      funcDefinition: Function): Unit = synchronized {
-    assertFunctionExists(db, funcName)
-    if (funcName != funcDefinition.name) {
-      // Also a rename; remove the old one and add the new one back
-      catalog(db).functions.remove(funcName)
-    }
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+    requireFunctionExists(db, oldName)
+    val newFunc = getFunction(db, oldName).copy(name = newName)
+    catalog(db).functions.remove(oldName)
+    catalog(db).functions.put(newName, newFunc)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
+    requireFunctionExists(db, funcDefinition.name)
     catalog(db).functions.put(funcDefinition.name, funcDefinition)
   }
 
-  override def getFunction(db: String, funcName: String): Function = synchronized {
-    assertFunctionExists(db, funcName)
+  override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
+    requireFunctionExists(db, funcName)
     catalog(db).functions(funcName)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 
```
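As a self-contained illustration of the re-keying pattern `renamePartitions` uses above (simplified types; a plain string stands in for `CatalogTablePartition`):

```scala
import scala.collection.mutable

object RenamePartitionsSketch extends App {
  type Spec = Map[String, String] // stand-in for Catalog.TablePartitionSpec

  // One existing partition, keyed by its spec.
  val partitions = mutable.HashMap[Spec, String](Map("ds" -> "2016-02-20") -> "part-0")

  val specs = Seq(Map("ds" -> "2016-02-20"))
  val newSpecs = Seq(Map("ds" -> "2016-02-21"))
  require(specs.size == newSpecs.size, "number of old and new partition specs differ")

  // Pair old and new specs positionally, then move each entry to its new key.
  specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
    val part = partitions(oldSpec) // the real code also does .copy(spec = newSpec)
    partitions.remove(oldSpec)
    partitions.put(newSpec, part)
  }

  assert(partitions.keySet == Set(Map("ds" -> "2016-02-21")))
}
```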
_The remaining changed files are not shown here._