Skip to content

Commit

Permalink
[KYUUBI-198] Failover mode ha service disconnects from zk after lost …
Browse files Browse the repository at this point in the history
…leader ship (#199)

* fix #198 Failover mode HA service disconnects from ZK after losing leadership

* reset leader latch

* add ut

* add some ut back
  • Loading branch information
yaooqinn committed Jun 20, 2019
1 parent 26663c5 commit b6591d3
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,31 @@ import yaooqinn.kyuubi.server.KyuubiServer
private[kyuubi] class FailoverService(name: String, server: KyuubiServer)
extends HighAvailableService(name, server) with LeaderLatchListener with Logging {

private var leaderLatch: LeaderLatch = _
@volatile private var leaderLatch: LeaderLatch = _
@volatile private var zkServiceStarted = false

/**
 * Closes the current leader latch, if one has been created.
 *
 * Any [[Exception]] raised while closing is logged and not rethrown.
 */
private def closeLeaderLatch(): Unit = {
  // Option(...) turns a not-yet-initialised (null) latch into a no-op.
  for (activeLatch <- Option(leaderLatch)) {
    try {
      activeLatch.close()
      info("Close Zookeeper leader latch")
    } catch {
      case e: Exception => error("Error close leader latch", e)
    }
  }
}

/**
 * Creates a new leader latch under `serviceRootNamespace + "-latch"`, registers this
 * service as its listener and starts it.
 */
private def startLeaderLatch(): Unit = {
  info("Start Zookeeper leader latch")
  val latchPath = s"${serviceRootNamespace}-latch"
  val freshLatch = new LeaderLatch(zkClient, latchPath)
  // Publish the latch to the field before starting it, as the listener is this service.
  leaderLatch = freshLatch
  freshLatch.addListener(this)
  freshLatch.start()
}

/** Creates the service named after the simple class name of [[FailoverService]]. */
def this(server: KyuubiServer) = this(classOf[FailoverService].getSimpleName, server)

/**
 * Starts leader election and then the rest of the service through `super.start()`.
 */
override def start(): Unit = {
  // Create the latch exactly once, through the shared helper. Building one inline and
  // then calling the helper would leave the first latch started but unreferenced, so it
  // could never be closed and would keep contending for leadership.
  startLeaderLatch()
  super.start()
}

Expand Down Expand Up @@ -84,7 +90,9 @@ private[kyuubi] class FailoverService(name: String, server: KyuubiServer)
}
}

override def reset(): Unit = {
/**
 * Resets leader election: closes the current leader latch and starts a new one.
 */
override private[ha] def reset(): Unit = {
info("Reset Zookeeper leader latch")
// Order matters: the old latch is closed before its replacement is created and started.
closeLeaderLatch()
startLeaderLatch()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ private[kyuubi] abstract class HighAvailableService(name: String, server: Kyuubi

import HighAvailableService._

protected var zkClient: CuratorFramework = _
protected var serviceRootNamespace: String = _
protected final var zkClient: CuratorFramework = _
protected final var serviceRootNamespace: String = _

private var serviceNode: PersistentEphemeralNode = _
private var servicePath: String = _

/**
* reset current service
* Visible for testing
*/
protected def reset(): Unit
private[ha] def reset(): Unit

/**
* Expose Kyuubi service instance uri in [[HA_ZOOKEEPER_NAMESPACE]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private[kyuubi] class LoadBalanceService private(name: String, server: KyuubiSer
super.stop()
}

override def reset(): Unit = {
override private[ha] def reset(): Unit = {
server.deregisterWithZK()
// If there are no more active client sessions, stop the server
if (server.beService.getSessionManager.getOpenSessionCount == 0) {
warn("This Kyuubi instance has been removed from the list of " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StructType

import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.auth.KyuubiAuthFactory
import yaooqinn.kyuubi.author.AuthzHelper
import yaooqinn.kyuubi.cli.{FetchOrientation, FetchType, GetInfoType, GetInfoValue}
Expand All @@ -34,18 +34,18 @@ import yaooqinn.kyuubi.session.{SessionHandle, SessionManager}
* [[BackendService]] holds an instance of [[SessionManager]] which manages
* `KyuubiSession` for execution
*/
private[server] class BackendService private(name: String)
private[server] class BackendService private(name: String, server: KyuubiServer)
extends CompositeService(name) with Logging {

private[this] var sessionManager: SessionManager = _
def getSessionManager: SessionManager = sessionManager

def this() = this(classOf[BackendService].getSimpleName)
def this(server: KyuubiServer) = this(classOf[BackendService].getSimpleName, server)

/**
 * Initialises the backend: stores the configuration, initialises authorization support,
 * then creates the [[SessionManager]] and registers it as a child service.
 *
 * @param conf the configuration to initialise this service and its children with
 */
override def init(conf: SparkConf): Unit = synchronized {
  this.conf = conf
  AuthzHelper.init(conf)
  // Build a single session manager, bound to the owning server. A second, server-less
  // instance would be discarded immediately and never registered.
  sessionManager = new SessionManager(server)
  addService(sessionManager)
  super.init(conf)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class FrontendService private(name: String, beService: BackendService, OOMHook:

@throws[KyuubiSQLException]
private def getProxyUser(sessionConf: Map[String, String],
ipAddress: String, realUser: String): String = {
ipAddress: String, realUser: String): String = {
Option(sessionConf).flatMap(_.get(KyuubiAuthFactory.HS2_PROXY_USER)) match {
case None => realUser
case Some(_) if !conf.get(FRONTEND_ALLOW_USER_SUBSTITUTION).toBoolean =>
Expand All @@ -221,19 +221,7 @@ class FrontendService private(name: String, beService: BackendService, OOMHook:
}

/**
 * Returns the protocol version with the smallest numeric value among `versions`.
 *
 * The last constant returned by `TProtocolVersion.values` is always a candidate, so the
 * result never exceeds it, and an empty argument list yields that constant rather than
 * throwing.
 *
 * @param versions the protocol versions to compare
 * @return the candidate with the lowest `getValue`
 */
private def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = {
  // Seeding with the last enum constant reproduces the old loop's starting value and
  // keeps `minBy` safe on empty input.
  val upperBound = TProtocolVersion.values.last
  (upperBound +: versions).minBy(_.getValue)
}

@throws[KyuubiSQLException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ private[kyuubi] class KyuubiServer private(name: String)

private val started = new AtomicBoolean(false)

// Records that this instance has been marked as deregistered from ZooKeeper.
// NOTE(review): the flag is set by the HA layer and read by the session manager,
// presumably on different threads, so it is @volatile to make the write visible. This
// matches the @volatile state kept in FailoverService. Confirm the threading model.
@volatile private var _isDeregisterWithZK = false

/** Marks this server as deregistered from ZooKeeper. */
def deregisterWithZK(): Unit = _isDeregisterWithZK = true

/** Whether [[deregisterWithZK]] has been called on this server. */
def isDeregisterWithZk: Boolean = _isDeregisterWithZK

def this() = this(classOf[KyuubiServer].getSimpleName)

override def init(conf: SparkConf): Unit = synchronized {
Expand All @@ -52,7 +56,7 @@ private[kyuubi] class KyuubiServer private(name: String)
override def run(): Unit = KyuubiServer.this.stop()
}

_beService = new BackendService()
_beService = new BackendService(this)
_feService = new FrontendService(_beService, OOMHook)

addService(_beService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory
* A SessionManager for managing [[KyuubiSession]]s
*/
private[kyuubi] class SessionManager private(
name: String) extends CompositeService(name) with Logging {
name: String, server: KyuubiServer) extends CompositeService(name) with Logging {
private val operationManager = new OperationManager()
private val cacheManager = new SparkSessionCacheManager()
private val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession]
Expand All @@ -54,7 +54,7 @@ private[kyuubi] class SessionManager private(
private var checkOperation: Boolean = false
private var shutdown: Boolean = false

def this() = this(classOf[SessionManager].getSimpleName)
def this(server: KyuubiServer) = this(classOf[SessionManager].getSimpleName, server)

private def createExecPool(): Unit = {
val poolSize = conf.get(ASYNC_EXEC_THREADS).toInt
Expand Down Expand Up @@ -280,6 +280,7 @@ private[kyuubi] class SessionManager private(
sessionHandle.getSessionId.toString,
kyuubiSession.getUserName)
}
info(s"Session [$sessionHandle] opened, current opening sessions: $getOpenSessionCount")

sessionHandle
}
Expand All @@ -304,7 +305,23 @@ private[kyuubi] class SessionManager private(
_.onSessionClosed(sessionHandle.getSessionId.toString)
}
cacheManager.decrease(sessionUser)
session.close()
info(s"Session [$sessionHandle] closed, current opening sessions: $getOpenSessionCount")
try {
session.close()
} finally {
if (server.isDeregisterWithZk) {
info("This instance of KyuubiServer is offline from HA service discovery layer previously" +
", the last client is disconnected, shut down now")
if (getOpenSessionCount == 0) {
new Thread("StopServerAfterSessionCleared") {
override def run(): Unit = {
info("All Sessions closed, will invoke Kyuubi server stop")
server.stop()
}
}.start()
}
}
}
}

def getOperationMgr: OperationManager = operationManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package yaooqinn.kyuubi.ha

import java.io.IOException

import scala.collection.JavaConverters._

import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkFunSuite}
import org.scalatest.{BeforeAndAfterEach, Matchers}
Expand Down Expand Up @@ -101,4 +103,24 @@ class FailoverServiceSuite extends SparkFunSuite
haService.start()
haService.stop()
}

test("gain leader ship") {
  // Lists the children of a znode through the suite's own ZooKeeper client.
  def childrenOf(path: String): List[String] =
    zooKeeperClient.getChildren.forPath(path).asScala.toList

  server.init(conf)
  haService.init(conf)
  haService.start()
  // Wait before inspecting the znodes created by the service.
  Thread.sleep(10000)

  val rootNodes = childrenOf("/")
  assert(rootNodes.size === 3)
  assert(rootNodes.contains("kyuubiserver"))
  assert(rootNodes.contains("kyuubiserver-latch"))

  val serviceNodes = childrenOf("/kyuubiserver")
  assert(serviceNodes.size === 1)

  val latchNodesBeforeReset = childrenOf("/kyuubiserver-latch")
  assert(latchNodesBeforeReset.size === 1)

  haService.reset()
  Thread.sleep(5000)

  // After a reset there must still be exactly one latch participant, and it must be a
  // different node from the one seen before the reset.
  val latchNodesAfterReset = childrenOf("/kyuubiserver-latch")
  assert(latchNodesAfterReset.size === 1)
  assert(latchNodesBeforeReset.head !== latchNodesAfterReset.head)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class HighAvailableServiceSuite extends SparkFunSuite
// Builds a fresh server and a minimal HighAvailableService for every test.
override def beforeEach(): Unit = {
  server = new KyuubiServer()
  haService = new HighAvailableService("test", server) {
    // Define the no-op override once. A public override is at least as visible as the
    // parent declaration, whichever access modifier that declaration uses.
    override def reset(): Unit = {}
  }
  super.beforeEach()
}
Expand Down Expand Up @@ -138,7 +138,7 @@ class HighAvailableServiceSuite extends SparkFunSuite
test("deregister watcher") {

val ha = new HighAvailableService("ha", server) { self =>
override protected def reset(): Unit = {}
override def reset(): Unit = {}
}

import Watcher.Event.EventType._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package yaooqinn.kyuubi.ha

import com.google.common.io.Files
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
Expand All @@ -30,18 +32,25 @@ trait ZookeeperFunSuite extends SparkFunSuite{
KyuubiSparkUtil.setupCommonConfig(conf)
conf.set(KyuubiConf.FRONTEND_BIND_PORT.key, "0")

var zooKeeperClient: CuratorFramework = _

// Starts an embedded ZooKeeper test server, points the HA configuration at it and opens
// a Curator client that tests can use to inspect znodes.
override def beforeAll(): Unit = {
  zkServer = new TestingServer(2181, Files.createTempDir(), true)
  connectString = zkServer.getConnectString

  // HA settings applied in the same order as before.
  Seq(
    HA_ZOOKEEPER_QUORUM.key -> connectString,
    HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key -> "100ms",
    HA_ZOOKEEPER_SESSION_TIMEOUT.key -> "15s",
    HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key -> "0"
  ).foreach { case (key, value) => conf.set(key, value) }

  val retryPolicy = new ExponentialBackoffRetry(1000, 3)
  zooKeeperClient = CuratorFrameworkFactory.builder()
    .connectString(connectString)
    .retryPolicy(retryPolicy)
    .build()
  zooKeeperClient.start()
  super.beforeAll()
}

// Tears down the shared ZooKeeper fixtures and clears the HA system properties.
override def afterAll(): Unit = {
  // Both fixtures are null-guarded, so a failure part-way through beforeAll() does not
  // turn into a NullPointerException here. The client is closed before the server it
  // talks to, and the server is stopped only once.
  Option(zooKeeperClient).foreach(_.close())
  Option(zkServer).foreach(_.stop())
  System.clearProperty(HA_ZOOKEEPER_QUORUM.key)
  System.clearProperty(HA_ENABLED.key)
  super.afterAll()
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli.thrift.{TFetchOrientation, TProtocolVersion}
import org.apache.spark._
import org.apache.spark.KyuubiConf.LOGGING_OPERATION_LOG_DIR
import org.apache.spark.KyuubiConf.{FRONTEND_BIND_PORT, LOGGING_OPERATION_LOG_DIR}
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.CreateFunctionCommand
Expand All @@ -34,37 +34,41 @@ import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.cli.FetchOrientation.{FETCH_FIRST, FETCH_NEXT}
import yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.session.{KyuubiSession, SessionHandle, SessionManager}
import yaooqinn.kyuubi.utils.ReflectUtils

class ExecuteStatementInClientModeSuite extends SparkFunSuite with MockitoSugar {

private var server: KyuubiServer = _
private val conf = new SparkConf()
KyuubiSparkUtil.setupCommonConfig(conf)
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
conf.setMaster("local")
conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0")

private val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8
private val user = UserGroupInformation.getCurrentUser
private val userName = user.getShortUserName
private val passwd = ""

protected val statement = "show tables"
protected val sessionMgr = new SessionManager()
protected var sessionMgr: SessionManager = _
protected var session: KyuubiSession = _
protected var sessHandle: SessionHandle = _

// Boots a full KyuubiServer and opens one session on its session manager for the tests.
override protected def beforeAll(): Unit = {
  server = new KyuubiServer()
  server.init(conf)
  server.start()
  // Use the session manager owned by the server. It must not be initialised or started
  // separately before this assignment.
  sessionMgr = server.beService.getSessionManager
  sessHandle = sessionMgr.openSession(proto, userName, passwd, "", Map.empty, false)
  session = sessionMgr.getSession(sessHandle)
  super.beforeAll()
}

// Closes the test session and shuts the server down.
override protected def afterAll(): Unit = {
  sessionMgr.closeSession(sessHandle)
  // The session manager belongs to the server, so it is not stopped on its own here.
  // NOTE(review): presumably server.stop() stops its child services; confirm against
  // CompositeService.
  Option(server).foreach(_.stop())
  super.afterAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException

import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf.FRONTEND_BIND_PORT

import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.GetInfoType
Expand All @@ -29,26 +30,28 @@ import yaooqinn.kyuubi.session.SessionHandle

class BackendServiceSuite extends SparkFunSuite {

private var server: KyuubiServer = _
private var backendService: BackendService = _
private val user = KyuubiSparkUtil.getCurrentUserName
private val conf = new SparkConf(loadDefaults = true).setAppName("be test")
KyuubiSparkUtil.setupCommonConfig(conf)
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
conf.setMaster("local")
conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0")
private var sessionHandle: SessionHandle = _
private val showTables = "show tables"
private val ip = "localhost"
private val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8

// Boots a full KyuubiServer and opens one session on its backend service.
override protected def beforeAll(): Unit = {
  server = new KyuubiServer()
  server.init(conf)
  server.start()
  // Use the backend owned by the server. A separately constructed and started
  // BackendService would be replaced by this assignment and never stopped.
  backendService = server.beService
  sessionHandle = backendService.openSession(proto, user, "", ip, Map.empty)
}

// Shuts the server down. The null guard keeps teardown safe when beforeAll() failed
// before the server was created. The backend service is not stopped separately because
// it belongs to the server.
protected override def afterAll(): Unit = {
  Option(server).foreach(_.stop())
}

test("open session") {
Expand Down

0 comments on commit b6591d3

Please sign in to comment.