Skip to content

Commit

Permalink
[KYUUBI #1800][1.4] Remove oom hook
Browse files Browse the repository at this point in the history
backport #1803 into branch-1.4

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
closes #1800

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1810 from ulysses-you/backport.

Closes #1800

4c8f7e9 [ulysses-you] backport

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
ulysses-you committed Jan 19, 2022
1 parent fc9a609 commit 952efb5
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.service

import java.net.{InetAddress, ServerSocket}
import java.util.concurrent.TimeUnit
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
Expand All @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, NamedThreadFactory}

abstract class ThriftBinaryFrontendService(name: String)
extends AbstractFrontendService(name) with TCLIService.Iface with Runnable with Logging {
Expand All @@ -51,14 +51,7 @@ abstract class ThriftBinaryFrontendService(name: String)
private var authFactory: KyuubiAuthenticationFactory = _
private var hadoopConf: Configuration = _

// When a OOM occurs, here we de-register the engine by stop its discoveryService.
// Then the current engine will not be connected by new client anymore but keep the existing ones
// alive. In this case we can reduce the engine's overhead and make it possible recover from that.
// We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
// the existing clients which are still getting statuses and reporting to the end-users.
protected def oomHook: Runnable = {
() => discoveryService.foreach(_.stop())
}
// Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383

override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
Expand All @@ -71,12 +64,13 @@ abstract class ThriftBinaryFrontendService(name: String)
val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
val executor = ExecutorPoolCaptureOom(
name + "Handler-Pool",
val executor = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveTime,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(name + "Handler-Pool", false))
authFactory = new KyuubiAuthenticationFactory(conf)
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ class NoopThriftBinaryFrontendService(override val serverable: Serverable)
override val discoveryService: Option[Service] = None

override def connectionUrl: String = serverAddr.getCanonicalHostName + ":" + portNum

override protected def oomHook: Runnable = () => serverable.stop()
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.server

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
Expand All @@ -32,7 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.server.mysql._
import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.util.NettyUtils._

/**
Expand All @@ -50,18 +50,17 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable)

@volatile protected var isStarted = false

protected def oomHook: Runnable = () => serverable.stop()

override def initialize(conf: KyuubiConf): Unit = synchronized {
val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
execPool = ExecutorPoolCaptureOom(
"mysql-exec-pool",
execPool = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveMs,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory("mysql-exec-pool", false))

serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
.map(InetAddress.getByName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,4 @@ class KyuubiThriftBinaryFrontendService(
checkInitialized()
s"${serverAddr.getCanonicalHostName}:$portNum"
}

override protected def oomHook: Runnable = {
() => serverable.stop()
}
}

0 comments on commit 952efb5

Please sign in to comment.