Skip to content

Commit

Permalink
[SPARK-6749] [SQL] Make metastore client robust to underlying socket …
Browse files Browse the repository at this point in the history
…connection loss

This works around a bug in the underlying RetryingMetaStoreClient (HIVE-10384) by refreshing the metastore client on thrift exceptions. We attempt to emulate the proper hive behavior by retrying only as configured by hiveconf.

Author: Eric Liang <ekl@databricks.com>

Closes #6912 from ericl/spark-6749 and squashes the following commits:

2d54b55 [Eric Liang] use conf from state
0e3a74e [Eric Liang] use shim properly
980b3e5 [Eric Liang] Fix conf parsing hive 0.14 conf.
92459b6 [Eric Liang] Work around RetryingMetaStoreClient bug
  • Loading branch information
ericl authored and yhuai committed Jun 24, 2015
1 parent a458efc commit 50c3a86
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
Expand Down Expand Up @@ -136,12 +137,62 @@ private[hive] class ClientWrapper(

// TODO: should be a def?
// When we create this val client, the HiveConf of it (conf) is the one associated with state.
// NOTE(review): a `val client` and a `var client` declaration both appear below; this looks
// like a diff view showing the removed line followed by its replacement - confirm which one
// is live before editing.
private val client = Hive.get(conf)
// The annotation names `this` as the lock guarding the mutable client reference.
@GuardedBy("this")
private var client = Hive.get(conf)

// Both retry settings are read from hive's own conf, for compatibility.
private val retryLimit: Int =
  conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
private val retryDelayMillis: Long =
  shim.getMetastoreClientConnectRetryDelayMillis(conf)

/**
 * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
 *
 * The whole retry loop runs while holding this object's monitor. `f` is attempted once and
 * then retried at most `retryLimit` more times, stopping early once the deadline of
 * `retryLimit * retryDelayMillis` has passed. After every failure that `causedByThrift`
 * recognises, the method logs a warning, sleeps for `retryDelayMillis` and asks Hive for a
 * refreshed client. Exceptions that are not thrift-related propagate immediately.
 *
 * @param f the computation to run; it is re-evaluated on every attempt
 * @tparam A the result type of `f`
 * @return the value produced by the first successful evaluation of `f`
 * @throws Exception the most recently caught thrift-related exception when every attempt fails
 */
private def retryLocked[A](f: => A): A = synchronized {
  // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
  val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
  var numTries = 0
  var caughtException: Exception = null
  // Holds the successful result. Using a holder instead of `return` avoids a non-local
  // return out of the by-name `synchronized` block.
  var result: Option[A] = None
  do {
    numTries += 1
    try {
      result = Some(f)
    } catch {
      case e: Exception if causedByThrift(e) =>
        caughtException = e
        // The loop permits retryLimit + 1 attempts in total, so after attempt `numTries`
        // at most `retryLimit - numTries + 1` remain (fewer if the deadline expires first).
        logWarning(
          "HiveClientWrapper got thrift exception, destroying client and retrying " +
            s"(${retryLimit - numTries + 1} tries remaining)", e)
        Thread.sleep(retryDelayMillis)
        try {
          client = Hive.get(state.getConf, true)
        } catch {
          // A failed refresh is only logged; the next attempt reuses the existing client.
          case refreshError: Exception if causedByThrift(refreshError) =>
            logWarning("Failed to refresh hive client, will retry.", refreshError)
        }
    }
  } while (result.isEmpty && numTries <= retryLimit && System.nanoTime < deadline)
  result.getOrElse {
    if (System.nanoTime > deadline) {
      logWarning("Deadline exceeded")
    }
    throw caughtException
  }
}

/**
 * Reports whether `e`, or any throwable in its cause chain, carries a message that mentions
 * a thrift `TApplicationException`, `TProtocolException` or `TTransportException`.
 *
 * Only the message text is inspected, not the exception class. Throwables with a null
 * message are skipped, and a null `e` yields false.
 */
private def causedByThrift(e: Throwable): Boolean = {
  // Lazily walk e, e.getCause, e.getCause.getCause, ... until the chain ends.
  val causeChain = Iterator.iterate(e)(_.getCause).takeWhile(_ != null)
  causeChain.exists { link =>
    val text = link.getMessage
    text != null && text.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")
  }
}

/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
private def withHiveState[A](f: => A): A = synchronized {
private def withHiveState[A](f: => A): A = retryLocked {
val original = Thread.currentThread().getContextClassLoader
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
import java.lang.reflect.{Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -64,6 +65,8 @@ private[client] sealed abstract class Shim {

def getDriverResults(driver: Driver): Seq[String]

/**
 * Returns the metastore client connect retry delay configured in `conf`, expressed in
 * milliseconds (as the method name states). Abstract here; each shim supplies its own way
 * of reading the value.
 */
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long

def loadPartition(
hive: Hive,
loadPath: Path,
Expand Down Expand Up @@ -192,6 +195,10 @@ private[client] class Shim_v0_12 extends Shim {
res.toSeq
}

/**
 * Reads METASTORE_CLIENT_CONNECT_RETRY_DELAY from `conf` as an integer and scales it by
 * 1000 to produce milliseconds (i.e. the configured value is treated as seconds).
 */
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
  // Multiply as Long: an Int * Int product could overflow before being widened to the
  // declared Long result.
  conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
}

override def loadPartition(
hive: Hive,
loadPath: Path,
Expand Down Expand Up @@ -321,6 +328,12 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
// Reflective handle for HiveConf.getTimeVar(ConfVars, TimeUnit), looked up lazily on first use.
private lazy val getTimeVarMethod = {
  val owner = classOf[HiveConf]
  val confVarType = classOf[HiveConf.ConfVars]
  val unitType = classOf[TimeUnit]
  findMethod(owner, "getTimeVar", confVarType, unitType)
}

override def loadPartition(
hive: Hive,
Expand Down Expand Up @@ -359,4 +372,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}

/**
 * Reads METASTORE_CLIENT_CONNECT_RETRY_DELAY from `conf` by reflectively invoking
 * `getTimeVarMethod` with TimeUnit.MILLISECONDS, and returns the result as a Long.
 */
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
  val delayVar = HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY
  // The reflective call hands back a boxed value; the cast below converts it to Long.
  val boxedMillis = getTimeVarMethod.invoke(conf, delayVar, TimeUnit.MILLISECONDS)
  boxedMillis.asInstanceOf[Long]
}
}

0 comments on commit 50c3a86

Please sign in to comment.