[SPARK-35321][SQL] Don't register Hive permanent functions when creating Hive client

### What changes were proposed in this pull request?

Instantiate a new Hive client through `Hive.getWithFastCheck(conf, false)` instead of `Hive.get(conf)`.

### Why are the changes needed?

[HIVE-10319](https://issues.apache.org/jira/browse/HIVE-10319) introduced a new API, `get_all_functions`, which is only supported in Hive 1.3.0/2.0.0 and up. As a result, when Spark 3.x talks to an HMS service of version 1.2 or lower, the following error occurs:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3897)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
        ... 96 more
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_all_functions(ThriftHiveMetastore.java:3845)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_all_functions(ThriftHiveMetastore.java:3833)
```

`get_all_functions` is called only when `doRegisterAllFns` is set to true:
```java
  private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
    conf = c;
    if (doRegisterAllFns) {
      registerAllFunctionsOnce();
    }
  }
```

When the flag is set, the constructor registers all Hive permanent functions defined in HMS with Hive's `FunctionRegistry` class by iterating through the results of `get_all_functions`. For Spark this seems unnecessary, since it loads Hive permanent (not built-in) UDFs by calling the HMS API `get_function` directly. Spark uses `FunctionRegistry` only to load Hive built-in functions that Spark does not support itself; at this time, that applies only to `histogram_numeric`.
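The effect of the flag can be illustrated with a small self-contained sketch. Everything in it is a hypothetical stand-in rather than real Hive code: `Metastore`, `OldMetastore`, `Client`, and `canConnect` are invented names that only model the behavior described above, assuming an old server rejects the bulk call but still answers single-function lookups.

```java
import java.util.List;

class FastCheckSketch {

    /** Hypothetical stand-in for the metastore RPC surface; not the real Thrift client. */
    interface Metastore {
        List<String> getAllFunctions();   // models `get_all_functions`
        String getFunction(String name);  // models `get_function`
    }

    /** Models an HMS 1.2 server, which does not implement `get_all_functions`. */
    static final class OldMetastore implements Metastore {
        @Override
        public List<String> getAllFunctions() {
            throw new UnsupportedOperationException("Invalid method name: 'get_all_functions'");
        }

        @Override
        public String getFunction(String name) {
            return "udf:" + name;
        }
    }

    /** Models the constructor quoted above: the bulk call happens only when the flag is set. */
    static final class Client {
        private final Metastore metastore;

        Client(Metastore metastore, boolean doRegisterAllFns) {
            this.metastore = metastore;
            if (doRegisterAllFns) {
                metastore.getAllFunctions(); // eager registration; fails on an old server
            }
        }

        String lookup(String name) {
            return metastore.getFunction(name); // on-demand lookup of a single function
        }
    }

    /** Returns true if building a client with the given flag succeeds against an old server. */
    static boolean canConnect(boolean doRegisterAllFns) {
        try {
            new Client(new OldMetastore(), doRegisterAllFns);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("eager registration connects: " + canConnect(true));
        System.out.println("fast check connects: " + canConnect(false));
        System.out.println(new Client(new OldMetastore(), false).lookup("my_udf"));
    }
}
```

With the flag set to false the client never issues the bulk call, so construction succeeds and individual functions can still be fetched on demand.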

### Does this PR introduce _any_ user-facing change?

Yes. With this fix, Spark should be able to talk to an HMS server running Hive 1.2.x or lower (HIVE-24608 is also required).

### How was this patch tested?

Manually started an HMS server of Hive version 1.2.2, and used Hive 2.3.8 patched with HIVE-24608. Without this PR, it failed with the exception above. With this PR, the error disappeared and common operations such as create table, create database, and list tables succeeded.

Closes #32446 from sunchao/SPARK-35321.

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
sunchao authored and dongjoon-hyun committed May 7, 2021
1 parent 33fbf56 commit b4ec9e2
Showing 2 changed files with 13 additions and 2 deletions.
@@ -273,7 +273,7 @@ private[hive] class HiveClientImpl(
if (clientLoader.cachedHive != null) {
clientLoader.cachedHive.asInstanceOf[Hive]
} else {
-val c = Hive.get(conf)
+val c = shim.getHive(conf)
clientLoader.cachedHive = c
c
}
@@ -303,7 +303,7 @@ private[hive] class HiveClientImpl(
// with the side-effect of Hive.get(conf) to avoid using out-of-date HiveConf.
// See discussion in https://github.com/apache/spark/pull/16826/files#r104606859
// for more details.
-Hive.get(conf)
+shim.getHive(conf)
// setCurrentSessionState will use the classLoader associated
// with the HiveConf in `state` to override the context class loader of the current
// thread.
@@ -177,6 +177,8 @@ private[client] sealed abstract class Shim {

def getMSC(hive: Hive): IMetaStoreClient

+def getHive(hiveConf: HiveConf): Hive
+
protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
klass.getMethod(name, args: _*)
}
@@ -199,6 +201,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
getMSCMethod.invoke(hive).asInstanceOf[IMetaStoreClient]
}

+override def getHive(hiveConf: HiveConf): Hive = Hive.get(hiveConf)
+
private lazy val startMethod =
findStaticMethod(
classOf[SessionState],
@@ -1316,6 +1320,13 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
}
+
+// HIVE-10319 introduced a new HMS thrift API `get_all_functions` which is used by
+// `Hive.get` since version 2.1.0, when it loads all Hive permanent functions during
+// initialization. This breaks compatibility with HMS server of lower versions.
+// To mitigate here we use `Hive.getWithFastCheck` instead which skips loading the permanent
+// functions and therefore avoids calling `get_all_functions`.
+override def getHive(hiveConf: HiveConf): Hive = Hive.getWithFastCheck(hiveConf, false)
}

private[client] class Shim_v2_2 extends Shim_v2_1
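
The diff relies on shim inheritance: the base shim keeps `Hive.get`, `Shim_v2_1` overrides `getHive`, and later shims such as `Shim_v2_2` inherit the override. A minimal Java sketch of that dispatch follows. It is an illustration only: `FakeHive` is a hypothetical stand-in for Hive's `Hive` class (its `get()` models the eager behavior the diff comment attributes to `Hive.get` since 2.1.0), the `HiveConf` argument is omitted, and the intermediate shim classes between 0.12 and 2.0 are collapsed.

```java
class ShimSketch {

    /** Hypothetical stand-in for Hive's client class; it only records which path built it. */
    static final class FakeHive {
        final boolean registeredAllFns;

        private FakeHive(boolean registeredAllFns) {
            this.registeredAllFns = registeredAllFns;
        }

        // Models `Hive.get(conf)` as described in the diff comment: eager registration.
        static FakeHive get() {
            return new FakeHive(true);
        }

        // Models `Hive.getWithFastCheck(conf, doRegisterAllFns)`.
        static FakeHive getWithFastCheck(boolean doRegisterAllFns) {
            return new FakeHive(doRegisterAllFns);
        }
    }

    /** Base shim: declares the factory method every version must provide. */
    abstract static class Shim {
        abstract FakeHive getHive();
    }

    /** Oldest shim: keeps the original behavior. */
    static class ShimV0_12 extends Shim {
        @Override
        FakeHive getHive() {
            return FakeHive.get();
        }
    }

    /** Simplification: the real hierarchy has more versions between 0.12 and 2.0. */
    static class ShimV2_0 extends ShimV0_12 { }

    /** From 2.1 on, skip registering permanent functions. */
    static class ShimV2_1 extends ShimV2_0 {
        @Override
        FakeHive getHive() {
            return FakeHive.getWithFastCheck(false);
        }
    }

    /** Inherits the 2.1 override without redefining it. */
    static class ShimV2_2 extends ShimV2_1 { }

    public static void main(String[] args) {
        Shim[] shims = { new ShimV0_12(), new ShimV2_0(), new ShimV2_1(), new ShimV2_2() };
        for (Shim shim : shims) {
            System.out.println(shim.getClass().getSimpleName()
                + " registers all functions: " + shim.getHive().registeredAllFns);
        }
    }
}
```

Placing the override in the 2.1 shim means the call sites in `HiveClientImpl` stay version-agnostic: they call `shim.getHive(conf)` and the shim selected for the configured Hive version picks the entry point.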