Skip to content

Commit

Permalink
[KYUUBI-189]Add an OOM Hook to shutdown server when OutOfMemoryError …
Browse files Browse the repository at this point in the history
…occurs (#190)

fix #189 

* [KYUUBI-189]Add an OOM Hook to shutdown server when OutOfMemoryError occurs

* add ut

* fit ut

* fit ut

* typo
  • Loading branch information
yaooqinn committed May 25, 2019
1 parent 5ad2235 commit 71eea2b
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ import yaooqinn.kyuubi.operation.OperationHandle
import yaooqinn.kyuubi.schema.SchemaMapper
import yaooqinn.kyuubi.service.{AbstractService, ServiceException, ServiceUtils}
import yaooqinn.kyuubi.session.SessionHandle
import yaooqinn.kyuubi.utils.NamedThreadFactory
import yaooqinn.kyuubi.utils.{NamedThreadFactory, ThreadPoolWithOOMHook}

/**
* [[FrontendService]] stays compatible with all kinds of Hive JDBC/Thrift client connections.
*
* It uses Hive configurations to configure itself.
*/
private[kyuubi] class FrontendService private(name: String, beService: BackendService)
private[kyuubi]
class FrontendService private(name: String, beService: BackendService, OOMHook: Runnable)
extends AbstractService(name) with TCLIService.Iface with Runnable with Logging {

private var hadoopConf: Configuration = _
Expand All @@ -66,13 +67,18 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe

private var isStarted = false


def this(beService: BackendService) = {
this(classOf[FrontendService].getSimpleName, beService)
def this(beService: BackendService, OOMHook: Runnable) = {
this(classOf[FrontendService].getSimpleName, beService, OOMHook)
currentServerContext = new ThreadLocal[ServerContext]()
serverEventHandler = new FeTServerEventHandler
}

def this(beService: BackendService) = {
this(beService, new Runnable {
override def run(): Unit = {}
})
}

class FeServiceServerContext extends ServerContext {
private var sessionHandle: SessionHandle = _

Expand Down Expand Up @@ -574,13 +580,14 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
// Server thread pool
val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS).toInt
val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS).toInt
val executorService = new ThreadPoolExecutor(
val executorService = new ThreadPoolWithOOMHook(
minThreads,
maxThreads,
conf.getTimeAsSeconds(FRONTEND_WORKER_KEEPALIVE_TIME),
TimeUnit.SECONDS,
new SynchronousQueue[Runnable],
new NamedThreadFactory(threadPoolName))
new NamedThreadFactory(threadPoolName),
OOMHook)

// Thrift configs
authFactory = new KyuubiAuthFactory(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ private[kyuubi] class KyuubiServer private(name: String)

override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf

val OOMHook = new Runnable {
override def run(): Unit = KyuubiServer.this.stop()
}

_beService = new BackendService()
_feService = new FrontendService(_beService)
_feService = new FrontendService(_beService, OOMHook)

addService(_beService)
addService(_feService)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.utils

import java.util.concurrent.{BlockingQueue, Future, ThreadFactory, ThreadPoolExecutor}

import scala.concurrent.duration.TimeUnit

/**
 * A [[ThreadPoolExecutor]] that runs `oomHook` after a task finishes with an
 * [[OutOfMemoryError]].
 *
 * The error is detected in two ways:
 *  - it was thrown straight out of the task's `run()` and handed to `afterExecute`, or
 *  - the task is a completed [[Future]] whose `get()` reports it, either directly or as
 *    the cause of an `ExecutionException` (which is how `FutureTask` reports failures).
 *
 * @param corePoolSize    forwarded to [[ThreadPoolExecutor]]
 * @param maximumPoolSize forwarded to [[ThreadPoolExecutor]]
 * @param keepAliveTime   forwarded to [[ThreadPoolExecutor]]
 * @param unit            time unit of `keepAliveTime`
 * @param workQueue       forwarded to [[ThreadPoolExecutor]]
 * @param threadFactory   forwarded to [[ThreadPoolExecutor]]
 * @param oomHook         invoked on the worker thread when an [[OutOfMemoryError]] is detected
 */
class ThreadPoolWithOOMHook(
    corePoolSize: Int,
    maximumPoolSize: Int,
    keepAliveTime: Long,
    unit: TimeUnit,
    workQueue: BlockingQueue[Runnable],
    threadFactory: ThreadFactory,
    oomHook: Runnable)
  extends ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory) {

  /**
   * Runs the OOM hook if the finished task failed with an [[OutOfMemoryError]].
   *
   * @param r the task that has just completed
   * @param t the throwable that terminated `run()`, or null if it returned normally
   */
  override def afterExecute(r: Runnable, t: Throwable): Unit = {
    super.afterExecute(r, t)
    // A throwable passed in by the executor takes precedence; the future is only
    // inspected when run() itself returned normally.
    Option(t).orElse(failureOf(r)) match {
      case Some(_: OutOfMemoryError) => oomHook.run()
      case _ =>
    }
  }

  /**
   * Extracts the failure recorded by a completed [[Future]], if any.
   *
   * @return the failure, or None when `r` is not a future, is not done yet, completed
   *         successfully, or the lookup was interrupted
   */
  private def failureOf(r: Runnable): Option[Throwable] = r match {
    case future: Future[_] if future.isDone =>
      try {
        future.get()
        None
      } catch {
        case _: InterruptedException =>
          // Keep the interrupt visible to the worker thread instead of swallowing it.
          Thread.currentThread().interrupt()
          None
        // Futures wrap the task's real failure; without unwrapping, an OutOfMemoryError
        // raised inside a submitted task would never be recognised.
        case e: java.util.concurrent.ExecutionException if e.getCause != null =>
          Some(e.getCause)
        // Catching Throwable is deliberate: get() may rethrow the Error itself.
        case e: Throwable => Some(e)
      }
    case _ => None
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import yaooqinn.kyuubi.SecuredFunSuite

class ZooKeeperACLProviderSuite extends SparkFunSuite with Matchers with SecuredFunSuite {

test("") {
test("zk acl provider") {
val provider = new ZooKeeperACLProvider()
provider.getDefaultAcl should be(ZooDefs.Ids.OPEN_ACL_UNSAFE)
provider.getAclForPath("") should be(ZooDefs.Ids.OPEN_ACL_UNSAFE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package yaooqinn.kyuubi.server

import java.util.concurrent.{Executors, RejectedExecutionException}
import java.util.concurrent.RejectedExecutionException

import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}

import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.GetInfoType
import yaooqinn.kyuubi.operation.{CANCELED, CLOSED, FINISHED, OperationHandle}
import yaooqinn.kyuubi.operation.{CANCELED, CLOSED, FINISHED}
import yaooqinn.kyuubi.session.SessionHandle

class BackendServiceSuite extends SparkFunSuite {
Expand Down Expand Up @@ -154,7 +154,7 @@ class BackendServiceSuite extends SparkFunSuite {
backendService.cancelOperation(operationHandle)
val operation = operationMgr.getOperation(operationHandle)
val opState = operation.getStatus.getState
assert(opState === CLOSED || opState === CANCELED || opState === FINISHED)
assert(opState === CANCELED || opState === FINISHED)
}

test("close operation") {
Expand All @@ -163,7 +163,7 @@ class BackendServiceSuite extends SparkFunSuite {
val operation = operationMgr.getOperation(operationHandle)
backendService.closeOperation(operationHandle)
val opState = operation.getStatus.getState
assert(opState === CLOSED || opState === CANCELED || opState === FINISHED)
assert(opState === CLOSED || opState === FINISHED)
}

test("reject execution exception") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
val req3 = new TGetOperationStatusReq(resp2.getOperationHandle)
val resp3 = feService.GetOperationStatus(req3)
resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
Thread.sleep(5000)
while(feService.GetOperationStatus(req3)
.getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) {
Thread.sleep(10)
}
Thread.sleep(2000)
val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val resp4 = feService.FetchResults(req4)
resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
Expand All @@ -225,6 +229,17 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
val req9 = new TCancelOperationReq(resp2.getOperationHandle)
val resp9 = feService.CancelOperation(req9)
resp9.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)

val reqInfo1 = new TGetInfoReq(handle, TGetInfoType.CLI_DBMS_NAME)
val respInfo1 = feService.GetInfo(reqInfo1)
respInfo1.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)

val reqInfo2 = new TGetInfoReq(handle, TGetInfoType.CLI_ACCESSIBLE_PROCEDURES)
val respInfo2 = feService.GetInfo(reqInfo2)
respInfo2.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
respInfo2.getStatus.getErrorMessage should
include(TGetInfoType.CLI_ACCESSIBLE_PROCEDURES.toString)

val req8 = new TCloseSessionReq(handle)
val resp8 = feService.CloseSession(req8)
resp8.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.utils

import java.util.concurrent.{RunnableFuture, SynchronousQueue, TimeUnit}

import org.apache.spark.SparkFunSuite
import org.scalatest.BeforeAndAfterEach

/**
 * Tests for [[ThreadPoolWithOOMHook]]: the hook must run when a task fails with an
 * [[OutOfMemoryError]] and must stay silent otherwise.
 *
 * `afterExecute` runs on the worker thread after the task's `run()` has returned, so
 * every test waits for `afterExecute` itself to finish before looking at `flag`.
 */
class ThreadPoolWithOOMHookSuite extends SparkFunSuite with BeforeAndAfterEach {

  /** Set by the hook; volatile because workers write it and the test thread reads it. */
  @volatile var flag = false
  private val oomHook = new Runnable {
    override def run(): Unit = {
      flag = true
    }
  }

  /** Number of `afterExecute` invocations that have fully completed. */
  private val afterExecuteCalls = new java.util.concurrent.atomic.AtomicInteger(0)

  /** Upper bound for any single wait, so a broken pool fails the test instead of hanging. */
  private val awaitTimeoutNanos = TimeUnit.SECONDS.toNanos(10)

  // Anonymous subclass only adds bookkeeping around the behaviour under test.
  private val poolWithOOMHook = new ThreadPoolWithOOMHook(10, 10, 10, TimeUnit.SECONDS,
    new SynchronousQueue[Runnable], new NamedThreadFactory("oom"), oomHook) {
    override def afterExecute(r: Runnable, t: Throwable): Unit = {
      try super.afterExecute(r, t) finally afterExecuteCalls.incrementAndGet()
    }
  }

  override def beforeEach(): Unit = {
    flag = false
    super.beforeEach()
  }

  test("out of memory") {
    val oom = new Runnable {
      override def run(): Unit = throw new OutOfMemoryError()
    }
    executeAndAwait(oom)
    assert(flag, "out of memory occurred")
  }

  test("interrupted exception") {
    val future = new TestRunnableFuture[Int](throw new InterruptedException())
    executeAndAwait(future)
    executeAndAwait(future)
    assert(!flag, "no oom occurred")
  }

  test("out of memory in after execute") {
    val future = new TestRunnableFuture[Int](throw new OutOfMemoryError())
    executeAndAwait(future)
    assert(flag, "oom occurred in after execute")
  }

  test("non fatal error in after execute either") {
    val future = new TestRunnableFuture[Int](throw new Exception)
    executeAndAwait(future)
    assert(!flag, "no oom occurred in after execute")
  }

  test("no exception") {
    val future = new TestRunnableFuture[Int](1)
    executeAndAwait(future)
    assert(!flag)

    val r = new Runnable {
      override def run(): Unit = {}
    }
    executeAndAwait(r)
    assert(!flag)
  }

  test("non fatal exception") {
    val r = new Runnable {
      override def run(): Unit = throw new Exception()
    }
    executeAndAwait(r)
    assert(!flag)
  }

  test("non fatal exception 2") {
    val future = new TestRunnableFuture2[Int](1)
    executeAndAwait(future)
    assert(!flag)
  }

  /**
   * Submits `task` and blocks until the pool has finished `afterExecute` for it,
   * failing the test if that does not happen within the timeout.
   */
  private def executeAndAwait(task: Runnable): Unit = {
    val target = afterExecuteCalls.get() + 1
    poolWithOOMHook.execute(task)
    val deadline = System.nanoTime() + awaitTimeoutNanos
    while (afterExecuteCalls.get() < target && System.nanoTime() - deadline < 0) {
      Thread.sleep(10)
    }
    assert(afterExecuteCalls.get() >= target, "task did not finish in time")
  }

  /**
   * Polls until `future` reports done, failing the test on timeout.
   * Retained as a public helper; the tests above wait on `afterExecute` instead.
   */
  def wait(future: RunnableFuture[Int]): Unit = {
    val deadline = System.nanoTime() + awaitTimeoutNanos
    while (!future.isDone && System.nanoTime() - deadline < 0) {
      Thread.sleep(10)
    }
    assert(future.isDone, "future did not complete in time")
  }
}

/**
 * Minimal [[RunnableFuture]] test double: `run()` only marks the future as done and
 * `get()` evaluates the by-name `f` on every call, so `f` may throw to simulate a
 * failed task.
 *
 * @tparam T result type (previously named `Int`, which shadowed `scala.Int`)
 * @param f  by-name result; re-evaluated on each `get`
 */
class TestRunnableFuture[T](f: => T) extends RunnableFuture[T] {
  // Written by a pool worker in run() and polled from the test thread.
  @volatile private var done = false
  override def run(): Unit = done = true
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled: Boolean = false
  override def isDone: Boolean = done
  override def get(): T = f
  override def get(timeout: Long, unit: TimeUnit): T = f
}

/**
 * [[RunnableFuture]] test double that never reports completion: `isDone` is always
 * false, even after `run()`. `get()` evaluates the by-name `f` on every call.
 *
 * @tparam T result type (previously named `Int`, which shadowed `scala.Int`)
 * @param f  by-name result; re-evaluated on each `get`
 */
class TestRunnableFuture2[T](f: => T) extends RunnableFuture[T] {
  // Nothing to record: completion is never reported by this double.
  override def run(): Unit = {}
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled: Boolean = false
  override def isDone: Boolean = false
  override def get(): T = f
  override def get(timeout: Long, unit: TimeUnit): T = f
}

0 comments on commit 71eea2b

Please sign in to comment.