Skip to content

Commit

Permalink
backport
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you committed Jan 19, 2022
1 parent fc9a609 commit 4c8f7e9
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.service

import java.net.{InetAddress, ServerSocket}
import java.util.concurrent.TimeUnit
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
Expand All @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, NamedThreadFactory}

abstract class ThriftBinaryFrontendService(name: String)
extends AbstractFrontendService(name) with TCLIService.Iface with Runnable with Logging {
Expand All @@ -51,14 +51,7 @@ abstract class ThriftBinaryFrontendService(name: String)
private var authFactory: KyuubiAuthenticationFactory = _
private var hadoopConf: Configuration = _

// When a OOM occurs, here we de-register the engine by stop its discoveryService.
// Then the current engine will not be connected by new client anymore but keep the existing ones
// alive. In this case we can reduce the engine's overhead and make it possible recover from that.
// We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
// the existing clients which are still getting statuses and reporting to the end-users.
protected def oomHook: Runnable = {
() => discoveryService.foreach(_.stop())
}
// Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383

override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
Expand All @@ -71,12 +64,13 @@ abstract class ThriftBinaryFrontendService(name: String)
val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
val executor = ExecutorPoolCaptureOom(
name + "Handler-Pool",
val executor = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveTime,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(name + "Handler-Pool", false))
authFactory = new KyuubiAuthenticationFactory(conf)
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ class NoopThriftBinaryFrontendService(override val serverable: Serverable)
override val discoveryService: Option[Service] = None

override def connectionUrl: String = serverAddr.getCanonicalHostName + ":" + portNum

override protected def oomHook: Runnable = () => serverable.stop()
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.server

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
Expand All @@ -32,7 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.server.mysql._
import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.util.NettyUtils._

/**
Expand All @@ -50,18 +50,17 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable)

@volatile protected var isStarted = false

protected def oomHook: Runnable = () => serverable.stop()

override def initialize(conf: KyuubiConf): Unit = synchronized {
val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
execPool = ExecutorPoolCaptureOom(
"mysql-exec-pool",
execPool = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveMs,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory("mysql-exec-pool", false))

serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
.map(InetAddress.getByName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,4 @@ class KyuubiThriftBinaryFrontendService(
checkInitialized()
s"${serverAddr.getCanonicalHostName}:$portNum"
}

override protected def oomHook: Runnable = {
() => serverable.stop()
}
}

0 comments on commit 4c8f7e9

Please sign in to comment.