
[SPARK-27970][SQL] Support Hive 3.0 metastore
## What changes were proposed in this pull request?

Some users are running Hive 3.0.0, so this PR adds support for the Hive 3.0 metastore.
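As context (not part of the diff): once this change lands, pointing Spark at a Hive 3.0 metastore is a matter of the configuration keys the PR documents. A minimal `spark-defaults.conf` sketch, assuming the metastore client jars are resolved from Maven:

```
spark.sql.hive.metastore.version   3.0.0
spark.sql.hive.metastore.jars      maven
```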

## How was this patch tested?

Unit tests.

Closes #24688 from wangyum/SPARK-26145.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
wangyum authored and gatorsmile committed Jun 7, 2019
1 parent 9c4eb99 commit 2926890
Showing 11 changed files with 33 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docs/sql-data-sources-hive-tables.md
@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
   <td><code>1.2.1</code></td>
   <td>
     Version of the Hive metastore. Available
-    options are <code>0.12.0</code> through <code>2.3.5</code> and <code>3.1.0</code> through <code>3.1.1</code>.
+    options are <code>0.12.0</code> through <code>2.3.5</code> and <code>3.0.0</code> through <code>3.1.1</code>.
   </td>
 </tr>
 <tr>
2 changes: 1 addition & 1 deletion docs/sql-migration-guide-hive-compatibility.md
@@ -25,7 +25,7 @@ license: |
 Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs.
 Currently, Hive SerDes and UDFs are based on Hive 1.2.1,
 and Spark SQL can be connected to different versions of Hive Metastore
-(from 0.12.0 to 2.3.5 and 3.1.0 to 3.1.1. Also see [Interacting with Different Versions of Hive Metastore](sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore)).
+(from 0.12.0 to 2.3.5 and 3.0.0 to 3.1.1. Also see [Interacting with Different Versions of Hive Metastore](sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore)).

 #### Deploying in Existing Hive Warehouses

@@ -64,7 +64,7 @@ private[spark] object HiveUtils extends Logging {
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       "<code>0.12.0</code> through <code>2.3.5</code> and " +
-      "<code>3.1.0</code> through <code>3.1.1</code>.")
+      "<code>3.0.0</code> through <code>3.1.1</code>.")
     .stringConf
     .createWithDefault(builtinHiveVersion)
@@ -107,6 +107,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_1 => new Shim_v2_1()
     case hive.v2_2 => new Shim_v2_2()
     case hive.v2_3 => new Shim_v2_3()
+    case hive.v3_0 => new Shim_v3_0()
     case hive.v3_1 => new Shim_v3_1()
   }

@@ -744,7 +745,7 @@ private[hive] class HiveClientImpl(
     // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
     // and the CommandProcessorFactory.clean function removed.
     driver.getClass.getMethod("close").invoke(driver)
-    if (version != hive.v3_1) {
+    if (version != hive.v3_0 && version != hive.v3_1) {
       CommandProcessorFactory.clean(conf)
     }
   }
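The reflective `close` call above exists because `Driver.close` changed its return type in Hive 3.0 (HIVE-18238), so a direct call compiled against one Hive version would not link against another. A minimal standalone sketch of that pattern — `FakeDriver` and `closeDriver` are illustrative stand-ins, not Spark's actual classes:

```scala
// Illustrative stand-in for Hive's Driver; imagine close() returning Int
// in one Hive version and a different type in another.
class FakeDriver {
  var closed = false
  def close(): Int = { closed = true; 0 }
}

// Reflection keeps the call site binary-compatible with either signature:
// getMethod("close") resolves whatever return type the loaded jar declares,
// and the boxed return value is simply discarded.
def closeDriver(driver: AnyRef): Unit = {
  driver.getClass.getMethod("close").invoke(driver)
}
```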
@@ -1181,7 +1181,7 @@ private[client] class Shim_v2_2 extends Shim_v2_1

 private[client] class Shim_v2_3 extends Shim_v2_1

-private[client] class Shim_v3_1 extends Shim_v2_3 {
+private[client] class Shim_v3_0 extends Shim_v2_3 {
   // Spark supports only non-ACID operations
   protected lazy val isAcidIUDoperation = JBoolean.FALSE
@@ -1305,3 +1305,5 @@ private[client] class Shim_v3_1 extends Shim_v2_3 {
       replace: JBoolean)
   }
 }
+
+private[client] class Shim_v3_1 extends Shim_v3_0
@@ -102,6 +102,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
       case "2.2" | "2.2.0" => hive.v2_2
       case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" | "2.3.5" => hive.v2_3
+      case "3.0" | "3.0.0" => hive.v3_0
       case "3.1" | "3.1.0" | "3.1.1" => hive.v3_1
       case version =>
         throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " +
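The short-version aliases added above ("3.0" resolving like "3.0.0") can be exercised in isolation. A hedged sketch of the same match shape, with plain strings standing in for Spark's `hive.v3_0`-style case objects (`resolveHiveVersion` is a hypothetical name):

```scala
// Maps a user-supplied spark.sql.hive.metastore.version string to a shim id,
// mirroring the match in the hunk above.
def resolveHiveVersion(version: String): String = version match {
  case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" | "2.3.5" => "v2_3"
  case "3.0" | "3.0.0" => "v3_0"
  case "3.1" | "3.1.0" | "3.1.1" => "v3_1"
  case other =>
    throw new UnsupportedOperationException(
      s"Unsupported Hive Metastore version ($other).")
}
```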
@@ -95,6 +95,16 @@ package object client {
       "org.apache.curator:*",
       "org.pentaho:pentaho-aggdesigner-algorithm"))

+  // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
+  // Since HIVE-14496, Hive.java uses calcite-core
+  case object v3_0 extends HiveVersion("3.0.0",
+    extraDeps = Seq("org.apache.logging.log4j:log4j-api:2.10.0",
+      "org.apache.derby:derby:10.14.1.0"),
+    exclusions = Seq("org.apache.calcite:calcite-druid",
+      "org.apache.calcite.avatica:avatica",
+      "org.apache.curator:*",
+      "org.pentaho:pentaho-aggdesigner-algorithm"))
+
   // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
   // Since HIVE-14496, Hive.java uses calcite-core
   case object v3_1 extends HiveVersion("3.1.1",
@@ -105,7 +115,8 @@ package object client {
       "org.apache.curator:*",
       "org.pentaho:pentaho-aggdesigner-algorithm"))

-  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
+  val allSupportedHiveVersions =
+    Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
 }
 // scalastyle:on
@@ -114,7 +114,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
   // be removed by Hive when Hive is trying to empty the table directory.
   val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
   val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-    Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
+    Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)

   // Ensure all the supported versions are considered here.
   assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
@@ -23,5 +23,6 @@ import org.apache.spark.SparkFunSuite

 private[client] trait HiveClientVersions {
   protected val versions =
-    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.0",
+      "3.1")
 }
@@ -35,12 +35,12 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
     if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
-        version == "3.1") {
+        version == "3.0" || version == "3.1") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
     // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
-    if (version == "3.1") {
+    if (version == "3.0" || version == "3.1") {
       hadoopConf.set("hive.in.test", "true")
     }
     HiveClientBuilder.buildClient(
@@ -103,7 +103,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }

   private val versions =
-    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
+    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.0", "3.1")

   private var client: HiveClient = null
@@ -119,11 +119,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
     if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
-        version == "3.1") {
+        version == "3.0" || version == "3.1") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
-    if (version == "3.1") {
+    if (version == "3.0" || version == "3.1") {
       // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
       hadoopConf.set("hive.in.test", "true")
       // Since HIVE-17626(Hive 3.0.0), need to set hive.query.reexecution.enabled=false.
@@ -577,7 +577,7 @@ class VersionsSuite extends SparkFunSuite with Logging {

     test(s"$version: sql create index and reset") {
       // HIVE-18448 Since Hive 3.0, INDEX is not supported.
-      if (version != "3.1") {
+      if (version != "3.0" && version != "3.1") {
         client.runSqlHive("CREATE TABLE indexed_table (key INT)")
         client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
           "as 'COMPACT' WITH DEFERRED REBUILD")
@@ -586,10 +586,9 @@ class VersionsSuite extends SparkFunSuite with Logging {

     test(s"$version: sql read hive materialized view") {
       // HIVE-14249 Since Hive 2.3.0, materialized view is supported.
-      if (version == "2.3" || version == "3.1") {
-        // Since HIVE-14498(Hive 3.0), Automatic rewriting for materialized view cannot be enabled
-        // if the materialized view uses non-transactional tables.
-        val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE"
+      if (version == "2.3" || version == "3.0" || version == "3.1") {
+        // Since HIVE-18394(Hive 3.1), "Create Materialized View" should default to rewritable ones
+        val disableRewrite = if (version == "2.3" || version == "3.0") "" else "DISABLE REWRITE"
         client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
         client.runSqlHive(
           s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl")
