Skip to content

Commit

Permalink
[KYUUBI #1800] Remove oom hook
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
closes #1800

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1803 from ulysses-you/KYUUBI#1800.

Closes #1800

416bf9f [ulysses-you] comment
9843769 [ulysses-you] style git diff
ad9d21d [ulysses-you] remove oom hook

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
ulysses-you authored and yaooqinn committed Jan 19, 2022
1 parent dd4c2fa commit bb1cfde
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 200 deletions.
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.service

import java.util.concurrent.TimeUnit
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
Expand All @@ -26,7 +26,7 @@ import org.apache.thrift.transport.TServerSocket

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory

/**
* Apache Thrift based hive service rpc
Expand All @@ -46,27 +46,21 @@ abstract class TBinaryFrontendService(name: String)

private var server: Option[TServer] = None

// When a OOM occurs, here we de-register the engine by stop its discoveryService.
// Then the current engine will not be connected by new client anymore but keep the existing ones
// alive. In this case we can reduce the engine's overhead and make it possible recover from that.
// We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
// the existing clients which are still getting statuses and reporting to the end-users.
protected def oomHook: Runnable = {
() => discoveryService.foreach(_.stop())
}
// Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383

override def initialize(conf: KyuubiConf): Unit = synchronized {
this.conf = conf
try {
val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
val executor = ExecutorPoolCaptureOom(
name + "Handler-Pool",
val executor = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveTime,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(name + "Handler-Pool", false))
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
val tServerSocket = new TServerSocket(serverSocket)
Expand Down

This file was deleted.

Expand Up @@ -21,6 +21,4 @@ class NoopTBinaryFrontendService(override val serverable: Serverable)
extends TBinaryFrontendService("NoopThriftBinaryFrontend") {

override val discoveryService: Option[Service] = None

override protected def oomHook: Runnable = () => serverable.stop()
}

This file was deleted.

Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.server

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
Expand All @@ -32,7 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.server.mysql._
import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.util.NettyUtils._

/**
Expand All @@ -50,18 +50,17 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable)

@volatile protected var isStarted = false

protected def oomHook: Runnable = () => serverable.stop()

override def initialize(conf: KyuubiConf): Unit = synchronized {
val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
execPool = ExecutorPoolCaptureOom(
"mysql-exec-pool",
execPool = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveMs,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory("mysql-exec-pool", false))

serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
.map(InetAddress.getByName)
Expand Down
Expand Up @@ -69,9 +69,5 @@ final class KyuubiTBinaryFrontendService(
resp
}

override protected def oomHook: Runnable = {
() => serverable.stop()
}

override protected def isServer(): Boolean = true
}

0 comments on commit bb1cfde

Please sign in to comment.