From 6e02aec44b9e5bc2ada55cb612f26e6ba000c23e Mon Sep 17 00:00:00 2001
From: Sean Zhong
Date: Wed, 18 May 2016 16:00:02 +0800
Subject: [PATCH] [SPARK-15334][SQL] HiveClient facade not compatible with Hive 0.12

## What changes were proposed in this pull request?

The HiveClient facade is not compatible with Hive 0.12. This PR fixes the following compatibility issues:

1. `org.apache.spark.sql.hive.client.HiveClientImpl` uses `AddPartitionDesc(db, table, ignoreIfExists)` to create partitions; however, Hive 0.12 doesn't have this constructor for `AddPartitionDesc`.
2. `HiveClientImpl` uses `PartitionDropOptions` when dropping partitions; however, class `PartitionDropOptions` doesn't exist in Hive 0.12.
3. Hive 0.12 doesn't support adding permanent functions. It is not valid to call `org.apache.hadoop.hive.ql.metadata.Hive.createFunction`, `org.apache.hadoop.hive.ql.metadata.Hive.alterFunction`, or `org.apache.hadoop.hive.ql.metadata.Hive.dropFunction` against a Hive 0.12 metastore.
4. `org.apache.spark.sql.hive.client.VersionsSuite` doesn't have enough test coverage for the different Hive versions 0.12, 0.13, 0.14, 1.0.0, 1.1.0, and 1.2.0.

## How was this patch tested?

Unit test.

Author: Sean Zhong

Closes #13127 from clockfly/versionSuite.

---
 .../sql/hive/client/HiveClientImpl.scala      |  74 +---
 .../spark/sql/hive/client/HiveShim.scala      | 182 +++++++-
 .../spark/sql/hive/client/VersionsSuite.scala | 412 +++++++++++++++---
 3 files changed, 545 insertions(+), 123 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a4e9f03b43342..af2850d4f568c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,14 +26,10 @@ import scala.language.reflectiveCalls
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType}
-import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalType}
-import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation
@@ -41,13 +37,13 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
-import 
org.apache.spark.util.{CausedBy, CircularBuffer, Utils} +import org.apache.spark.util.{CircularBuffer, Utils} /** * A class that wraps the HiveClient and converts its responses to externally visible classes. @@ -400,11 +396,7 @@ private[hive] class HiveClientImpl( table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = withHiveState { - val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) - parts.foreach { s => - addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull) - } - client.createPartitions(addPartitionDesc) + shim.createPartitions(client, db, table, parts, ignoreIfExists) } override def dropPartitions( @@ -430,10 +422,9 @@ private[hive] class HiveClientImpl( }.distinct var droppedParts = ArrayBuffer.empty[java.util.List[String]] matchingParts.foreach { partition => - val dropOptions = new PartitionDropOptions - dropOptions.ifExists = ignoreIfNotExists try { - client.dropPartition(db, table, partition, dropOptions) + val deleteData = true + client.dropPartition(db, table, partition, deleteData) } catch { case e: Exception => val remainingParts = matchingParts.toBuffer -- droppedParts @@ -629,37 +620,28 @@ private[hive] class HiveClientImpl( } override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState { - client.createFunction(toHiveFunction(func, db)) + shim.createFunction(client, db, func) } override def dropFunction(db: String, name: String): Unit = withHiveState { - client.dropFunction(db, name) + shim.dropFunction(client, db, name) } override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState { - val catalogFunc = getFunction(db, oldName) - .copy(identifier = FunctionIdentifier(newName, Some(db))) - val hiveFunc = toHiveFunction(catalogFunc, db) - client.alterFunction(db, oldName, hiveFunc) + shim.renameFunction(client, db, oldName, newName) } override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState { - client.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db)) + shim.alterFunction(client, db, func) } override def getFunctionOption( - db: String, - name: String): Option[CatalogFunction] = withHiveState { - try { - Option(client.getFunction(db, name)).map(fromHiveFunction) - } catch { - case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) => - None - } + db: String, name: String): Option[CatalogFunction] = withHiveState { + shim.getFunctionOption(client, db, name) } override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState { - client.getFunctions(db, pattern).asScala + shim.listFunctions(client, db, pattern) } def addJar(path: String): Unit = { @@ -708,36 +690,6 @@ private[hive] class HiveClientImpl( Utils.classForName(name) .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = { - val resourceUris = f.resources.map { resource => - new ResourceUri( - ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri) - } - new HiveFunction( - f.identifier.funcName, - db, - f.className, - null, - PrincipalType.USER, - (System.currentTimeMillis / 1000).toInt, - FunctionType.JAVA, - resourceUris.asJava) - } - - private def fromHiveFunction(hf: HiveFunction): CatalogFunction = { - val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName)) - val resources = hf.getResourceUris.asScala.map { uri => - val resourceType = uri.getResourceType() 
match { - case ResourceType.ARCHIVE => "archive" - case ResourceType.FILE => "file" - case ResourceType.JAR => "jar" - case r => throw new AnalysisException(s"Unknown resource type: $r") - } - FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri()) - } - new CatalogFunction(name, hf.getClassName, resources) - } - private def toHiveColumn(c: CatalogColumn): FieldSchema = { new FieldSchema(c.name, c.dataType, c.comment.orNull) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 4ecf866f96395..78713c3f0bace 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -27,15 +27,23 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri} import org.apache.hadoop.hive.ql.Driver -import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} +import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table} +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException +import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{IntegralType, StringType} +import org.apache.spark.util.CausedBy + /** * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used @@ -73,6 +81,13 @@ private[client] sealed abstract class Shim { def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long + def createPartitions( + hive: Hive, + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit + def loadPartition( hive: Hive, loadPath: Path, @@ -100,6 +115,18 @@ private[client] sealed abstract class Shim { holdDDLTime: Boolean, listBucketingEnabled: Boolean): Unit + def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit + + def dropFunction(hive: Hive, db: String, name: String): Unit + + def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit + + def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit + + def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] + + def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] + def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { @@ -112,7 +139,6 @@ private[client] sealed abstract class Shim { protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { klass.getMethod(name, args: _*) } - } private[client] class Shim_v0_12 extends Shim with Logging { @@ -144,6 +170,22 @@ private[client] class Shim_v0_12 
extends Shim with Logging { classOf[Driver], "getResults", classOf[JArrayList[String]]) + private lazy val createPartitionMethod = + findMethod( + classOf[Hive], + "createPartition", + classOf[Table], + classOf[JMap[String, String]], + classOf[Path], + classOf[JMap[String, String]], + classOf[String], + classOf[String], + JInteger.TYPE, + classOf[JList[Object]], + classOf[String], + classOf[JMap[String, String]], + classOf[JList[Object]], + classOf[JList[Object]]) private lazy val loadPartitionMethod = findMethod( classOf[Hive], @@ -199,6 +241,42 @@ private[client] class Shim_v0_12 extends Shim with Logging { override def setDataLocation(table: Table, loc: String): Unit = setDataLocationMethod.invoke(table, new URI(loc)) + // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12 + override def createPartitions( + hive: Hive, + database: String, + tableName: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = { + val table = hive.getTable(database, tableName) + parts.foreach { s => + val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull + val spec = s.spec.asJava + if (hive.getPartition(table, spec, false) != null && ignoreIfExists) { + // Ignore this partition since it already exists and ignoreIfExists == true + } else { + if (location == null && table.isView()) { + throw new HiveException("LOCATION clause illegal for view partition"); + } + + createPartitionMethod.invoke( + hive, + table, + spec, + location, + null, // partParams + null, // inputFormat + null, // outputFormat + -1: JInteger, // numBuckets + null, // cols + null, // serializationLib + null, // serdeParams + null, // bucketCols + null) // sortCols + } + } + } + override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq @@ -265,6 +343,30 @@ private[client] class Shim_v0_12 extends Shim with Logging { dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean) } + override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = { + throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. 
" + + "Please use Hive 0.13 or higher.") + } + + def dropFunction(hive: Hive, db: String, name: String): Unit = { + throw new NoSuchPermanentFunctionException(db, name) + } + + def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = { + throw new NoSuchPermanentFunctionException(db, oldName) + } + + def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = { + throw new NoSuchPermanentFunctionException(db, func.identifier.funcName) + } + + def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = { + None + } + + def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = { + Seq.empty[String] + } } private[client] class Shim_v0_13 extends Shim_v0_12 { @@ -308,9 +410,85 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { override def setDataLocation(table: Table, loc: String): Unit = setDataLocationMethod.invoke(table, new Path(loc)) + override def createPartitions( + hive: Hive, + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = { + val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) + parts.foreach { s => + addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull) + } + hive.createPartitions(addPartitionDesc) + } + override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq + private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = { + val resourceUris = f.resources.map { resource => + new ResourceUri( + ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri) + } + new HiveFunction( + f.identifier.funcName, + db, + f.className, + null, + PrincipalType.USER, + (System.currentTimeMillis / 1000).toInt, + FunctionType.JAVA, + resourceUris.asJava) + } + + override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = { + hive.createFunction(toHiveFunction(func, db)) + } + + override def dropFunction(hive: Hive, db: String, name: String): Unit = { + hive.dropFunction(db, name) + } + + override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = { + val catalogFunc = getFunctionOption(hive, db, oldName) + .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName)) + .copy(identifier = FunctionIdentifier(newName, Some(db))) + val hiveFunc = toHiveFunction(catalogFunc, db) + hive.alterFunction(db, oldName, hiveFunc) + } + + override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = { + hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db)) + } + + private def fromHiveFunction(hf: HiveFunction): CatalogFunction = { + val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName)) + val resources = hf.getResourceUris.asScala.map { uri => + val resourceType = uri.getResourceType() match { + case ResourceType.ARCHIVE => "archive" + case ResourceType.FILE => "file" + case ResourceType.JAR => "jar" + case r => throw new AnalysisException(s"Unknown resource type: $r") + } + FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri()) + } + new CatalogFunction(name, hf.getClassName, resources) + } + + override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = { + try { + Option(hive.getFunction(db, name)).map(fromHiveFunction) + } catch { + case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) => + 
None + } + } + + override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = { + hive.getFunctions(db, pattern).asScala + } + /** * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e. * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...". diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index a6a5ab3988fc9..57e8db7e88fb5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -17,21 +17,27 @@ package org.apache.spark.sql.hive.client -import java.io.File +import java.io.{ByteArrayOutputStream, File, PrintStream} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.util.VersionInfo import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.types.IntegerType import org.apache.spark.tags.ExtendedHiveTest -import org.apache.spark.util.Utils +import org.apache.spark.util.{MutableURLClassLoader, Utils} /** * A simple set of tests that call the methods of a [[HiveClient]], loading different version @@ -97,12 +103,6 @@ class VersionsSuite extends SparkFunSuite with Logging { private val emptyDir = Utils.createTempDir().getCanonicalPath - private def partSpec = { - val hashMap = new java.util.LinkedHashMap[String, String] - hashMap.put("key", "1") - hashMap - } - // Its actually pretty easy to mess things up and have all of your tests "pass" by accidentally // connecting to an auto-populated, in-process metastore. Let's make sure we are getting the // versions right by forcing a known compatibility failure. @@ -122,7 +122,7 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'") } - private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0") + private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2") private var client: HiveClient = null @@ -130,110 +130,402 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: create client") { client = null System.gc() // Hack to avoid SEGV on some JVM versions. 
+ val hadoopConf = new Configuration(); + hadoopConf.set("test", "success") client = IsolatedClientLoader.forVersion( hiveMetastoreVersion = version, hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, - hadoopConf = new Configuration(), + hadoopConf, config = buildConf(), ivyPath = ivyPath).createClient() } + def table(database: String, tableName: String): CatalogTable = { + CatalogTable( + identifier = TableIdentifier(tableName, Some(database)), + tableType = CatalogTableType.MANAGED, + schema = Seq(CatalogColumn("key", "int")), + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(classOf[TextInputFormat].getName), + outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName), + serde = Some(classOf[LazySimpleSerDe].getName()), + compressed = false, + serdeProperties = Map.empty + )) + } + + /////////////////////////////////////////////////////////////////////////// + // Database related API + /////////////////////////////////////////////////////////////////////////// + + val tempDatabasePath = Utils.createTempDir().getCanonicalPath + test(s"$version: createDatabase") { - val db = CatalogDatabase("default", "desc", "loc", Map()) - client.createDatabase(db, ignoreIfExists = true) + val defaultDB = CatalogDatabase("default", "desc", "loc", Map()) + client.createDatabase(defaultDB, ignoreIfExists = true) + val tempDB = CatalogDatabase( + "temporary", description = "test create", tempDatabasePath, Map()) + client.createDatabase(tempDB, ignoreIfExists = true) + } + + test(s"$version: setCurrentDatabase") { + client.setCurrentDatabase("default") + } + + test(s"$version: getDatabase") { + // No exception should be thrown + client.getDatabase("default") + } + + test(s"$version: getDatabaseOption") { + assert(client.getDatabaseOption("default").isDefined) + assert(client.getDatabaseOption("nonexist") == None) } + test(s"$version: listDatabases") { + assert(client.listDatabases("defau.*") == Seq("default")) + } + + test(s"$version: alterDatabase") { + val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true")) + client.alterDatabase(database) + assert(client.getDatabase("temporary").properties.contains("flag")) + } + + test(s"$version: dropDatabase") { + assert(client.getDatabaseOption("temporary").isDefined) + client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true) + assert(client.getDatabaseOption("temporary").isEmpty) + } + + /////////////////////////////////////////////////////////////////////////// + // Table related API + /////////////////////////////////////////////////////////////////////////// + test(s"$version: createTable") { - val table = - CatalogTable( - identifier = TableIdentifier("src", Some("default")), - tableType = CatalogTableType.MANAGED, - schema = Seq(CatalogColumn("key", "int")), - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName), - outputFormat = Some( - classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName), - serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()), - compressed = false, - serdeProperties = Map.empty - )) - - client.createTable(table, ignoreIfExists = false) + client.createTable(table("default", tableName = "src"), ignoreIfExists = false) + client.createTable(table("default", "temporary"), ignoreIfExists = false) + } + + test(s"$version: loadTable") { + client.loadTable( + emptyDir, + tableName = "src", + replace = 
false, + holdDDLTime = false) } test(s"$version: getTable") { + // No exception should be thrown client.getTable("default", "src") } - test(s"$version: listTables") { - assert(client.listTables("default") === Seq("src")) + test(s"$version: getTableOption") { + assert(client.getTableOption("default", "src").isDefined) } - test(s"$version: getDatabase") { - client.getDatabase("default") + test(s"$version: alterTable(table: CatalogTable)") { + val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> "")) + client.alterTable(newTable) + assert(client.getTable("default", "src").properties.contains("changed")) } - test(s"$version: alterTable") { - client.alterTable(client.getTable("default", "src")) + test(s"$version: alterTable(tableName: String, table: CatalogTable)") { + val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> "")) + client.alterTable("src", newTable) + assert(client.getTable("default", "src").properties.contains("changedAgain")) } - test(s"$version: set command") { - client.runSqlHive("SET spark.sql.test.key=1") + test(s"$version: listTables(database)") { + assert(client.listTables("default") === Seq("src", "temporary")) + } + + test(s"$version: listTables(database, pattern)") { + assert(client.listTables("default", pattern = "src") === Seq("src")) + assert(client.listTables("default", pattern = "nonexist").isEmpty) + } + + test(s"$version: dropTable") { + client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false) + assert(client.listTables("default") === Seq("src")) } - test(s"$version: create partitioned table DDL") { - client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key INT)") - client.runSqlHive("ALTER TABLE src_part ADD PARTITION (key = '1')") + /////////////////////////////////////////////////////////////////////////// + // Partition related API + /////////////////////////////////////////////////////////////////////////// + + val storageFormat = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + compressed = false, + serdeProperties = Map.empty) + + test(s"$version: sql create partitioned table") { + client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)") } - test(s"$version: getPartitions") { - client.getPartitions(client.getTable("default", "src_part")) + test(s"$version: createPartitions") { + val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat) + val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat) + client.createPartitions( + "default", "src_part", Seq(partition1, partition2), ignoreIfExists = true) + } + + test(s"$version: getPartitions(catalogTable)") { + assert(2 == client.getPartitions(client.getTable("default", "src_part")).size) } test(s"$version: getPartitionsByFilter") { - client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo( - AttributeReference("key", IntegerType, false)(NamedExpression.newExprId), - Literal(1)))) + // Only one partition [1, 1] for key2 == 1 + val result = client.getPartitionsByFilter(client.getTable("default", "src_part"), + Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1)))) + + // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition. 
+ if (version != "0.12") { + assert(result.size == 1) + } + } + + test(s"$version: getPartition") { + // No exception should be thrown + client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2")) + } + + test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") { + val partition = client.getPartitionOption( + "default", "src_part", Map("key1" -> "1", "key2" -> "2")) + assert(partition.isDefined) + } + + test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") { + val partition = client.getPartitionOption( + client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2")) + assert(partition.isDefined) + } + + test(s"$version: getPartitions(db: String, table: String)") { + assert(2 == client.getPartitions("default", "src_part", None).size) } test(s"$version: loadPartition") { + val partSpec = new java.util.LinkedHashMap[String, String] + partSpec.put("key1", "1") + partSpec.put("key2", "2") + client.loadPartition( emptyDir, "default.src_part", partSpec, - false, - false, - false, - false) - } - - test(s"$version: loadTable") { - client.loadTable( - emptyDir, - "src", - false, - false) + replace = false, + holdDDLTime = false, + inheritTableSpecs = false, + isSkewedStoreAsSubdir = false) } test(s"$version: loadDynamicPartitions") { + val partSpec = new java.util.LinkedHashMap[String, String] + partSpec.put("key1", "1") + partSpec.put("key2", "") // Dynamic partition + client.loadDynamicPartitions( emptyDir, "default.src_part", partSpec, - false, - 1, + replace = false, + numDP = 1, false, false) } - test(s"$version: create index and reset") { + test(s"$version: renamePartitions") { + val oldSpec = Map("key1" -> "1", "key2" -> "1") + val newSpec = Map("key1" -> "1", "key2" -> "3") + client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec)) + + // Checks the existence of the new partition (key1 = 1, key2 = 3) + assert(client.getPartitionOption("default", "src_part", newSpec).isDefined) + } + + test(s"$version: alterPartitions") { + val spec = Map("key1" -> "1", "key2" -> "2") + val newLocation = Utils.createTempDir().getPath() + val storage = storageFormat.copy(locationUri = Some(newLocation)) + val partition = CatalogTablePartition(spec, storage) + client.alterPartitions("default", "src_part", Seq(partition)) + assert(client.getPartition("default", "src_part", spec) + .storage.locationUri == Some(newLocation)) + } + + test(s"$version: dropPartitions") { + val spec = Map("key1" -> "1", "key2" -> "3") + client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true) + assert(client.getPartitionOption("default", "src_part", spec).isEmpty) + } + + /////////////////////////////////////////////////////////////////////////// + // Function related API + /////////////////////////////////////////////////////////////////////////// + + def function(name: String, className: String): CatalogFunction = { + CatalogFunction( + FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource]) + } + + test(s"$version: createFunction") { + val functionClass = "org.apache.spark.MyFunc1" + if (version == "0.12") { + // Hive 0.12 doesn't support creating permanent functions + intercept[AnalysisException] { + client.createFunction("default", function("func1", functionClass)) + } + } else { + client.createFunction("default", function("func1", functionClass)) + } + } + + test(s"$version: functionExists") { + if (version == "0.12") { + // Hive 0.12 doesn't allow customized 
permanent functions + assert(client.functionExists("default", "func1") == false) + } else { + assert(client.functionExists("default", "func1") == true) + } + } + + test(s"$version: renameFunction") { + if (version == "0.12") { + // Hive 0.12 doesn't allow customized permanent functions + intercept[NoSuchPermanentFunctionException] { + client.renameFunction("default", "func1", "func2") + } + } else { + client.renameFunction("default", "func1", "func2") + assert(client.functionExists("default", "func2") == true) + } + } + + test(s"$version: alterFunction") { + val functionClass = "org.apache.spark.MyFunc2" + if (version == "0.12") { + // Hive 0.12 doesn't allow customized permanent functions + intercept[NoSuchPermanentFunctionException] { + client.alterFunction("default", function("func2", functionClass)) + } + } else { + client.alterFunction("default", function("func2", functionClass)) + } + } + + test(s"$version: getFunction") { + if (version == "0.12") { + // Hive 0.12 doesn't allow customized permanent functions + intercept[NoSuchPermanentFunctionException] { + client.getFunction("default", "func2") + } + } else { + // No exception should be thrown + val func = client.getFunction("default", "func2") + assert(func.className == "org.apache.spark.MyFunc2") + } + } + + test(s"$version: getFunctionOption") { + if (version == "0.12") { + // Hive 0.12 doesn't allow customized permanent functions + assert(client.getFunctionOption("default", "func2").isEmpty) + } else { + assert(client.getFunctionOption("default", "func2").isDefined) + } + } + + test(s"$version: listFunctions") { + if (version == "0.12") { + // Hive 0.12 doesn't allow customized permanent functions + assert(client.listFunctions("default", "fun.*").isEmpty) + } else { + assert(client.listFunctions("default", "fun.*").size == 1) + } + } + + test(s"$version: dropFunction") { + if (version == "0.12") { + // Hive 0.12 doesn't support creating permanent functions + intercept[NoSuchPermanentFunctionException] { + client.dropFunction("default", "func2") + } + } else { + // No exception should be thrown + client.dropFunction("default", "func2") + assert(client.listFunctions("default", "fun.*").size == 0) + } + } + + /////////////////////////////////////////////////////////////////////////// + // SQL related API + /////////////////////////////////////////////////////////////////////////// + + test(s"$version: sql set command") { + client.runSqlHive("SET spark.sql.test.key=1") + } + + test(s"$version: sql create index and reset") { client.runSqlHive("CREATE TABLE indexed_table (key INT)") client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + "as 'COMPACT' WITH DEFERRED REBUILD") + } + + /////////////////////////////////////////////////////////////////////////// + // Miscellaneous API + /////////////////////////////////////////////////////////////////////////// + + test(s"$version: version") { + assert(client.version.fullVersion.startsWith(version)) + } + + test(s"$version: getConf") { + assert("success" === client.getConf("test", null)) + } + + test(s"$version: setOut") { + client.setOut(new PrintStream(new ByteArrayOutputStream())) + } + + test(s"$version: setInfo") { + client.setInfo(new PrintStream(new ByteArrayOutputStream())) + } + + test(s"$version: setError") { + client.setError(new PrintStream(new ByteArrayOutputStream())) + } + + test(s"$version: newSession") { + val newClient = client.newSession() + assert(newClient != null) + } + + test(s"$version: withHiveState and addJar") { + val newClassPath = "." 
+ client.addJar(newClassPath) + client.withHiveState { + // No exception should be thrown. + // withHiveState changes the classloader to MutableURLClassLoader + val classLoader = Thread.currentThread().getContextClassLoader + .asInstanceOf[MutableURLClassLoader] + + val urls = classLoader.getURLs() + urls.contains(new File(newClassPath).toURI.toURL) + } + } + + test(s"$version: reset") { + // Clears all database, tables, functions... client.reset() + assert(client.listTables("default").isEmpty) } } }
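
For readers of this patch, the following is a minimal sketch (not part of the diff) of how the Hive 0.12 limitation described in the PR description surfaces through the new shim-backed client API. It only uses constructs that appear in the diff above; `sparkConf`, `buildConf()`, and `ivyPath` are assumed to be the existing VersionsSuite helpers, and a reachable metastore for the requested version is assumed.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}

// Build a HiveClient backed by the Hive 0.12 shim, mirroring the suite's setup.
val client = IsolatedClientLoader.forVersion(
  hiveMetastoreVersion = "0.12",
  hadoopVersion = VersionInfo.getVersion,
  sparkConf = sparkConf,              // assumed: suite-level SparkConf
  hadoopConf = new Configuration(),
  config = buildConf(),               // assumed: suite helper for metastore config
  ivyPath = ivyPath).createClient()   // assumed: suite-level ivy cache path

val func = CatalogFunction(
  FunctionIdentifier("func1", Some("default")),
  "org.apache.spark.MyFunc1",
  Seq.empty[FunctionResource])

// Hive 0.12 has no permanent-function DDL, so Shim_v0_12.createFunction rejects
// the call; on Hive 0.13 and later the same call succeeds.
try {
  client.createFunction("default", func)
} catch {
  case e: AnalysisException => // expected on 0.12 only
}
```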