[SPARK-15334][SQL] HiveClient facade not compatible with Hive 0.12
## What changes were proposed in this pull request?

HiveClient facade is not compatible with Hive 0.12.

This PR fixes the following compatibility issues (a minimal sketch of the version-shim dispatch is shown after the list):
1. `org.apache.spark.sql.hive.client.HiveClientImpl` uses `AddPartitionDesc(db, table, ignoreIfExists)` to create partitions; however, Hive 0.12 doesn't have this constructor for `AddPartitionDesc`.
2. `HiveClientImpl` uses `PartitionDropOptions` when dropping a partition; however, the class `PartitionDropOptions` doesn't exist in Hive 0.12.
3. Hive 0.12 doesn't support adding permanent functions, so it is not valid to call `org.apache.hadoop.hive.ql.metadata.Hive.createFunction`, `org.apache.hadoop.hive.ql.metadata.Hive.dropFunction`, or `org.apache.hadoop.hive.ql.metadata.Hive.alterFunction` against it.
4. `org.apache.spark.sql.hive.client.VersionsSuite` doesn't have enough test coverage for the different Hive versions 0.12, 0.13, 0.14, 1.0.0, 1.1.0, and 1.2.0.
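
For context, here is a minimal, self-contained sketch of the version-shim dispatch this change relies on. The names `MiniShim`, `MiniShim_v0_12`, `MiniShim_v0_13`, and `MiniClient` are hypothetical stand-ins, not the real `HiveClientImpl`/`Shim` classes: the client picks a shim once from the configured Hive version, and the 0.12 shim fails fast on unsupported operations while later shims perform the real metastore call.

```scala
// Hedged sketch only: hypothetical stand-ins for HiveClientImpl and its Shim hierarchy.
object ShimSketch {
  sealed abstract class MiniShim {
    def createFunction(db: String, name: String): Unit
  }

  // Hive 0.12 has no permanent-function support, so its shim fails fast.
  class MiniShim_v0_12 extends MiniShim {
    override def createFunction(db: String, name: String): Unit =
      throw new UnsupportedOperationException(
        "Hive 0.12 doesn't support creating permanent functions")
  }

  // Hive 0.13 and later can perform the operation (the metastore call is stubbed with a Set here).
  class MiniShim_v0_13 extends MiniShim_v0_12 {
    private val functions = scala.collection.mutable.Set.empty[(String, String)]
    override def createFunction(db: String, name: String): Unit =
      functions += (db -> name) // stands in for the real hive.createFunction(...) call
  }

  // The client chooses a shim once, based on the Hive version, and every
  // version-sensitive operation goes through it.
  class MiniClient(version: String) {
    private val shim: MiniShim = version match {
      case "0.12" => new MiniShim_v0_12
      case _      => new MiniShim_v0_13
    }
    def createFunction(db: String, name: String): Unit = shim.createFunction(db, name)
  }

  def main(args: Array[String]): Unit = {
    new MiniClient("0.13").createFunction("default", "my_udf")    // succeeds
    // new MiniClient("0.12").createFunction("default", "my_udf") // would throw
  }
}
```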

## How was this patch tested?

Unit test.
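
As a sketch of the cross-version coverage added to `VersionsSuite` (the third changed file, whose diff is not shown below), assuming a ScalaTest `AnyFunSuite` and a hypothetical `buildClient` helper, the same checks are registered once per supported Hive version:

```scala
// Hedged sketch only: buildClient is a hypothetical stand-in for the machinery
// VersionsSuite uses to instantiate an isolated Hive client for each version.
import org.scalatest.funsuite.AnyFunSuite

class CrossVersionSketchSuite extends AnyFunSuite {
  private val versions = Seq("0.12", "0.13", "0.14", "1.0.0", "1.1.0", "1.2.0")

  // Stand-in for building an isolated Hive client for the given version.
  private def buildClient(version: String): Map[String, String] =
    Map("hive.version" -> version)

  // Register the same check once per version, so a regression in any single
  // shim shows up as a version-tagged test failure.
  versions.foreach { version =>
    test(s"$version: create client") {
      val client = buildClient(version)
      assert(client("hive.version") == version)
    }
  }
}
```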

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13127 from clockfly/versionSuite.
clockfly authored and liancheng committed May 18, 2016
1 parent 411c04a commit 6e02aec
Showing 3 changed files with 545 additions and 123 deletions.
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,28 +26,24 @@ import scala.language.reflectiveCalls
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType}
import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalType}
import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CausedBy, CircularBuffer, Utils}
import org.apache.spark.util.{CircularBuffer, Utils}

/**
* A class that wraps the HiveClient and converts its responses to externally visible classes.
@@ -400,11 +396,7 @@ private[hive] class HiveClientImpl(
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withHiveState {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
parts.foreach { s =>
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
}
client.createPartitions(addPartitionDesc)
shim.createPartitions(client, db, table, parts, ignoreIfExists)
}

override def dropPartitions(
@@ -430,10 +422,9 @@
}.distinct
var droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
val dropOptions = new PartitionDropOptions
dropOptions.ifExists = ignoreIfNotExists
try {
client.dropPartition(db, table, partition, dropOptions)
val deleteData = true
client.dropPartition(db, table, partition, deleteData)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
@@ -629,37 +620,28 @@
}

override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
client.createFunction(toHiveFunction(func, db))
shim.createFunction(client, db, func)
}

override def dropFunction(db: String, name: String): Unit = withHiveState {
client.dropFunction(db, name)
shim.dropFunction(client, db, name)
}

override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
val catalogFunc = getFunction(db, oldName)
.copy(identifier = FunctionIdentifier(newName, Some(db)))
val hiveFunc = toHiveFunction(catalogFunc, db)
client.alterFunction(db, oldName, hiveFunc)
shim.renameFunction(client, db, oldName, newName)
}

override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
client.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
shim.alterFunction(client, db, func)
}

override def getFunctionOption(
db: String,
name: String): Option[CatalogFunction] = withHiveState {
try {
Option(client.getFunction(db, name)).map(fromHiveFunction)
} catch {
case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) =>
None
}
db: String, name: String): Option[CatalogFunction] = withHiveState {
shim.getFunctionOption(client, db, name)
}

override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
client.getFunctions(db, pattern).asScala
shim.listFunctions(client, db, pattern)
}

def addJar(path: String): Unit = {
@@ -708,36 +690,6 @@ private[hive] class HiveClientImpl(
Utils.classForName(name)
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]

private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
val resourceUris = f.resources.map { resource =>
new ResourceUri(
ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
}
new HiveFunction(
f.identifier.funcName,
db,
f.className,
null,
PrincipalType.USER,
(System.currentTimeMillis / 1000).toInt,
FunctionType.JAVA,
resourceUris.asJava)
}

private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
val resources = hf.getResourceUris.asScala.map { uri =>
val resourceType = uri.getResourceType() match {
case ResourceType.ARCHIVE => "archive"
case ResourceType.FILE => "file"
case ResourceType.JAR => "jar"
case r => throw new AnalysisException(s"Unknown resource type: $r")
}
FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
}
new CatalogFunction(name, hf.getClassName, resources)
}

private def toHiveColumn(c: CatalogColumn): FieldSchema = {
new FieldSchema(c.name, c.dataType, c.comment.orNull)
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -27,15 +27,23 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegralType, StringType}
import org.apache.spark.util.CausedBy


/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -73,6 +81,13 @@ private[client] sealed abstract class Shim {

def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long

def createPartitions(
hive: Hive,
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit

def loadPartition(
hive: Hive,
loadPath: Path,
@@ -100,6 +115,18 @@
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit

def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit

def dropFunction(hive: Hive, db: String, name: String): Unit

def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit

def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit

def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction]

def listFunctions(hive: Hive, db: String, pattern: String): Seq[String]

def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit

protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
@@ -112,7 +139,6 @@
protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
klass.getMethod(name, args: _*)
}

}

private[client] class Shim_v0_12 extends Shim with Logging {
@@ -144,6 +170,22 @@ private[client] class Shim_v0_12 extends Shim with Logging {
classOf[Driver],
"getResults",
classOf[JArrayList[String]])
private lazy val createPartitionMethod =
findMethod(
classOf[Hive],
"createPartition",
classOf[Table],
classOf[JMap[String, String]],
classOf[Path],
classOf[JMap[String, String]],
classOf[String],
classOf[String],
JInteger.TYPE,
classOf[JList[Object]],
classOf[String],
classOf[JMap[String, String]],
classOf[JList[Object]],
classOf[JList[Object]])
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
@@ -199,6 +241,42 @@
override def setDataLocation(table: Table, loc: String): Unit =
setDataLocationMethod.invoke(table, new URI(loc))

// Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
override def createPartitions(
hive: Hive,
database: String,
tableName: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
// Ignore this partition since it already exists and ignoreIfExists == true
} else {
if (location == null && table.isView()) {
throw new HiveException("LOCATION clause illegal for view partition");
}

createPartitionMethod.invoke(
hive,
table,
spec,
location,
null, // partParams
null, // inputFormat
null, // outputFormat
-1: JInteger, // numBuckets
null, // cols
null, // serializationLib
null, // serdeParams
null, // bucketCols
null) // sortCols
}
}
}

override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq

@@ -265,6 +343,30 @@ private[client] class Shim_v0_12 extends Shim with Logging {
dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
}

override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
"Please use Hive 0.13 or higher.")
}

def dropFunction(hive: Hive, db: String, name: String): Unit = {
throw new NoSuchPermanentFunctionException(db, name)
}

def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
throw new NoSuchPermanentFunctionException(db, oldName)
}

def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
throw new NoSuchPermanentFunctionException(db, func.identifier.funcName)
}

def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
None
}

def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
Seq.empty[String]
}
}

private[client] class Shim_v0_13 extends Shim_v0_12 {
@@ -308,9 +410,85 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
override def setDataLocation(table: Table, loc: String): Unit =
setDataLocationMethod.invoke(table, new Path(loc))

override def createPartitions(
hive: Hive,
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
parts.foreach { s =>
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
}
hive.createPartitions(addPartitionDesc)
}

override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq

private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
val resourceUris = f.resources.map { resource =>
new ResourceUri(
ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
}
new HiveFunction(
f.identifier.funcName,
db,
f.className,
null,
PrincipalType.USER,
(System.currentTimeMillis / 1000).toInt,
FunctionType.JAVA,
resourceUris.asJava)
}

override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
hive.createFunction(toHiveFunction(func, db))
}

override def dropFunction(hive: Hive, db: String, name: String): Unit = {
hive.dropFunction(db, name)
}

override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
val catalogFunc = getFunctionOption(hive, db, oldName)
.getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
.copy(identifier = FunctionIdentifier(newName, Some(db)))
val hiveFunc = toHiveFunction(catalogFunc, db)
hive.alterFunction(db, oldName, hiveFunc)
}

override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
}

private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
val resources = hf.getResourceUris.asScala.map { uri =>
val resourceType = uri.getResourceType() match {
case ResourceType.ARCHIVE => "archive"
case ResourceType.FILE => "file"
case ResourceType.JAR => "jar"
case r => throw new AnalysisException(s"Unknown resource type: $r")
}
FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
}
new CatalogFunction(name, hf.getClassName, resources)
}

override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
try {
Option(hive.getFunction(db, name)).map(fromHiveFunction)
} catch {
case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) =>
None
}
}

override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
hive.getFunctions(db, pattern).asScala
}

/**
* Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
* a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
