diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala index b4102638453..49ffeef3bcc 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.service import java.net.{InetAddress, ServerSocket} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit} import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle} import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory import org.apache.kyuubi.session.SessionHandle -import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory} +import org.apache.kyuubi.util.{KyuubiHadoopUtils, NamedThreadFactory} abstract class ThriftBinaryFrontendService(name: String) extends AbstractFrontendService(name) with TCLIService.Iface with Runnable with Logging { @@ -51,14 +51,7 @@ abstract class ThriftBinaryFrontendService(name: String) private var authFactory: KyuubiAuthenticationFactory = _ private var hadoopConf: Configuration = _ - // When a OOM occurs, here we de-register the engine by stop its discoveryService. - // Then the current engine will not be connected by new client anymore but keep the existing ones - // alive. In this case we can reduce the engine's overhead and make it possible recover from that. - // We shall not tear down the whole engine by serverable.stop to make the engine unreachable for - // the existing clients which are still getting statuses and reporting to the end-users. - protected def oomHook: Runnable = { - () => discoveryService.foreach(_.stop()) - } + // Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383 override def initialize(conf: KyuubiConf): Unit = { this.conf = conf @@ -71,12 +64,13 @@ abstract class ThriftBinaryFrontendService(name: String) val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS) val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS) val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME) - val executor = ExecutorPoolCaptureOom( - name + "Handler-Pool", + val executor = new ThreadPoolExecutor( minThreads, maxThreads, keepAliveTime, - oomHook) + TimeUnit.MILLISECONDS, + new SynchronousQueue[Runnable](), + new NamedThreadFactory(name + "Handler-Pool", false)) authFactory = new KyuubiAuthenticationFactory(conf) val transFactory = authFactory.getTTransportFactory val tProcFactory = authFactory.getTProcessorFactory(this) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ExecutorPoolCaptureOom.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ExecutorPoolCaptureOom.scala deleted file mode 100644 index e05a89c8c8c..00000000000 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ExecutorPoolCaptureOom.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.util - -import java.util.concurrent.{Future, SynchronousQueue, ThreadPoolExecutor, TimeUnit} - -case class ExecutorPoolCaptureOom( - poolName: String, - corePoolSize: Int, - maximumPoolSize: Int, - keepAliveSeconds: Long, - hook: Runnable) - extends ThreadPoolExecutor( - corePoolSize, - maximumPoolSize, - keepAliveSeconds, - TimeUnit.MILLISECONDS, - new SynchronousQueue[Runnable](), - new NamedThreadFactory(poolName, false)) { - - override def afterExecute(r: Runnable, t: Throwable): Unit = { - super.afterExecute(r, t) - t match { - case _: OutOfMemoryError => hook.run() - case null => r match { - case f: Future[_] => - try { - if (f.isDone) f.get() - } catch { - case _: InterruptedException => Thread.currentThread().interrupt() - case _: OutOfMemoryError => hook.run() - } - case _ => - } - case _ => - } - } -} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala index 8c0b1db8ed9..85f81a07e6a 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala @@ -23,6 +23,4 @@ class NoopThriftBinaryFrontendService(override val serverable: Serverable) override val discoveryService: Option[Service] = None override def connectionUrl: String = serverAddr.getCanonicalHostName + ":" + portNum - - override protected def oomHook: Runnable = () => serverable.stop() } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ExecutorPoolCaptureOomSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ExecutorPoolCaptureOomSuite.scala deleted file mode 100644 index a1185889df0..00000000000 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ExecutorPoolCaptureOomSuite.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.util - -import java.util.concurrent.{RunnableFuture, TimeUnit} - -import org.apache.kyuubi.KyuubiFunSuite - -class ExecutorPoolCaptureOomSuite extends KyuubiFunSuite { - - @volatile var flag = false - private val oomHook = new Runnable { - override def run(): Unit = { - flag = true - } - } - - override def beforeEach(): Unit = { - flag = false - } - - private val pool = ExecutorPoolCaptureOom(getClass.getName, 10, 10, 10, oomHook) - - test("t is RuntimeException, r is not future") { - val exception = new RuntimeException() - pool.execute(() => { - throw exception - }) - checkFalse() - - pool.afterExecute(null, exception) - checkFalse() - } - - test("t is OutOfMemoryError, r is not future") { - val error = new OutOfMemoryError() - pool.execute(() => { - throw error - }) - - checkTrue() - flag = false - pool.afterExecute(null, error) - checkTrue() - } - - test("t is null, r is not future") { - pool.execute(() => ()) - checkFalse() - - pool.afterExecute(null, null) - checkFalse() - } - - test("t is null, r is future with no exception") { - val future = new TestRunnableFuture(1) - pool.execute(future) - wait(future) - checkFalse() - } - - test("t is null, r is future throw InterruptedException") { - val future = new TestRunnableFuture(throw new InterruptedException()) - pool.execute(future) - checkFalse() - } - - test("t is null, r is future throw OutOfMemoryError") { - val future = new TestRunnableFuture(throw new OutOfMemoryError()) - pool.execute(future) - wait(future) - checkTrue() - } - - test("t is null, r is future throw RuntimeException") { - val future = new TestRunnableFuture(throw new RuntimeException) - pool.execute(future) - wait(future) - checkFalse() - } - - def wait(future: RunnableFuture[_]): Unit = { - while (!future.isDone) { - Thread.sleep(10) - } - } - - def checkFalse(): Unit = { - Thread.sleep(50) - assert(!flag) - } - - def checkTrue(): Unit = { - Thread.sleep(50) - assert(flag) - } -} - -class TestRunnableFuture[T](f: => T, isDone: Boolean = true) extends RunnableFuture[T] { - override def run(): Unit = {} - override def cancel(mayInterruptIfRunning: Boolean): Boolean = !isDone - override def isCancelled: Boolean = isDone - override def isDone: Boolean = isDone - override def get(): T = f - override def get(timeout: Long, unit: TimeUnit): T = f -} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala index 9fa8bb09a5f..c779e352990 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.server import java.net.{InetAddress, InetSocketAddress} -import java.util.concurrent.{ThreadPoolExecutor, TimeUnit} +import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit} import io.netty.bootstrap.ServerBootstrap import io.netty.buffer.PooledByteBufAllocator @@ -32,7 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.server.mysql._ import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service} -import org.apache.kyuubi.util.ExecutorPoolCaptureOom +import org.apache.kyuubi.util.NamedThreadFactory import org.apache.kyuubi.util.NettyUtils._ /** @@ -50,18 +50,17 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable) @volatile protected var isStarted = false - protected def oomHook: Runnable = () => serverable.stop() - override def initialize(conf: KyuubiConf): Unit = synchronized { val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS) val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS) val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME) - execPool = ExecutorPoolCaptureOom( - "mysql-exec-pool", + execPool = new ThreadPoolExecutor( minThreads, maxThreads, keepAliveMs, - oomHook) + TimeUnit.MILLISECONDS, + new SynchronousQueue[Runnable](), + new NamedThreadFactory("mysql-exec-pool", false)) serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST) .map(InetAddress.getByName) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala index 9a2530b85da..7cefe079881 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala @@ -73,8 +73,4 @@ class KyuubiThriftBinaryFrontendService( checkInitialized() s"${serverAddr.getCanonicalHostName}:$portNum" } - - override protected def oomHook: Runnable = { - () => serverable.stop() - } }