Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #1800][1.4] Remove oom hook #1810

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.service

import java.net.{InetAddress, ServerSocket}
import java.util.concurrent.TimeUnit
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
Expand All @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, NamedThreadFactory}

abstract class ThriftBinaryFrontendService(name: String)
extends AbstractFrontendService(name) with TCLIService.Iface with Runnable with Logging {
Expand All @@ -51,14 +51,7 @@ abstract class ThriftBinaryFrontendService(name: String)
private var authFactory: KyuubiAuthenticationFactory = _
private var hadoopConf: Configuration = _

// When a OOM occurs, here we de-register the engine by stop its discoveryService.
// Then the current engine will not be connected by new client anymore but keep the existing ones
// alive. In this case we can reduce the engine's overhead and make it possible recover from that.
// We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
// the existing clients which are still getting statuses and reporting to the end-users.
protected def oomHook: Runnable = {
() => discoveryService.foreach(_.stop())
}
// Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383

override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
Expand All @@ -71,12 +64,13 @@ abstract class ThriftBinaryFrontendService(name: String)
val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
val executor = ExecutorPoolCaptureOom(
name + "Handler-Pool",
val executor = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveTime,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(name + "Handler-Pool", false))
authFactory = new KyuubiAuthenticationFactory(conf)
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ class NoopThriftBinaryFrontendService(override val serverable: Serverable)
override val discoveryService: Option[Service] = None

override def connectionUrl: String = serverAddr.getCanonicalHostName + ":" + portNum

override protected def oomHook: Runnable = () => serverable.stop()
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.server

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
Expand All @@ -32,7 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.server.mysql._
import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.util.NettyUtils._

/**
Expand All @@ -50,18 +50,17 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable)

@volatile protected var isStarted = false

protected def oomHook: Runnable = () => serverable.stop()

override def initialize(conf: KyuubiConf): Unit = synchronized {
val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
execPool = ExecutorPoolCaptureOom(
"mysql-exec-pool",
execPool = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveMs,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory("mysql-exec-pool", false))

serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
.map(InetAddress.getByName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,4 @@ class KyuubiThriftBinaryFrontendService(
checkInitialized()
s"${serverAddr.getCanonicalHostName}:$portNum"
}

override protected def oomHook: Runnable = {
() => serverable.stop()
}
}