Skip to content

Commit

Permalink
[KYUUBI #1610] Rest frontend service jetty server has started at null
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

fix #1610
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

> 2022-01-21 15:10:53.999 INFO server.KyuubiRestFrontendService: KyuubiRestFronten
dService has started at 10.242.189.214:10099

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1817 from yaooqinn/1610.

Closes #1610

78ec26d [Kent Yao] [KYUUBI #1610] Rest frontend service jetty server has started at null
c2b42e9 [Kent Yao] [KYUUBI #1610] Rest frontend service jetty server has started at null

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
yaooqinn committed Jan 25, 2022
1 parent 5bc7ea1 commit c2a7e28
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 75 deletions.
10 changes: 10 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Expand Up @@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths}
import java.util.{Properties, TimeZone, UUID}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.commons.lang3.SystemUtils
import org.apache.commons.lang3.time.DateFormatUtils
Expand Down Expand Up @@ -257,4 +258,13 @@ object Utils extends Logging {
wrt.close()
stm.toString
}

def tryLogNonFatalError(block: => Unit): Unit = {
try {
block
} catch {
case NonFatal(t) =>
error(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.server

import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}

import org.apache.kyuubi.Utils._

class JettyServer private (
server: Server,
connector: ServerConnector,
rootHandler: ContextHandlerCollection) {

def start(): Unit = {
server.start()
connector.start()
server.addConnector(connector)
}

def stop(): Unit = {
tryLogNonFatalError(connector.stop())
tryLogNonFatalError(server.stop())
server.getThreadPool match {
case lifeCycle: LifeCycle => tryLogNonFatalError(lifeCycle.stop())
case _ =>
}
}

def getServerUri: String = connector.getHost + ":" + connector.getLocalPort

def addHandler(handler: ServletContextHandler): Unit = {
rootHandler.addHandler(handler)
if (!handler.isStarted) handler.start()
}
}

object JettyServer {

def apply(name: String, host: String, port: Int): JettyServer = {
// TODO: Configurable pool size
val pool = new QueuedThreadPool()
pool.setName(name)
pool.setDaemon(true)
val server = new Server(pool)

val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
errorHandler.setServer(server)
server.addBean(errorHandler)

val collection = new ContextHandlerCollection
server.setHandler(collection)

val serverExecutor = new ScheduledExecutorScheduler(s"$name-JettyScheduler", true)
val httpConf = new HttpConfiguration()
val connector = new ServerConnector(
server,
null,
serverExecutor,
null,
-1,
-1,
new HttpConnectionFactory(httpConf))
connector.setHost(host)
connector.setPort(port)
connector.setReuseAddress(!isWindows)
connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))

new JettyServer(server, connector, collection)
}
}
Expand Up @@ -17,11 +17,7 @@

package org.apache.kyuubi.server

import java.net.InetAddress

import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
import org.eclipse.jetty.server.handler.ErrorHandler
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
Expand All @@ -36,96 +32,46 @@ import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
class KyuubiRestFrontendService(override val serverable: Serverable)
extends AbstractFrontendService("KyuubiRestFrontendService") with Logging {

private var serverAddr: InetAddress = _
private var portNum: Int = _
private var jettyServer: Server = _
private var connector: ServerConnector = _
private var server: JettyServer = _

@volatile protected var isStarted = false
private val isStarted = new AtomicBoolean(false)

override def initialize(conf: KyuubiConf): Unit = synchronized {
val serverHost = conf.get(FRONTEND_REST_BIND_HOST)
serverAddr = serverHost.map(InetAddress.getByName).getOrElse(Utils.findLocalInetAddress)
portNum = conf.get(FRONTEND_REST_BIND_PORT)

jettyServer = new Server()

// set error handler
val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
errorHandler.setServer(jettyServer)
jettyServer.addBean(errorHandler)

jettyServer.setHandler(ApiRootResource.getServletHandler(this))

connector = new ServerConnector(
jettyServer,
null,
new ScheduledExecutorScheduler(s"$getName-JettyScheduler", true),
null,
-1,
-1,
Array(new HttpConnectionFactory(new HttpConfiguration())): _*)
connector.setPort(portNum)
connector.setHost(serverAddr.getCanonicalHostName)
connector.setReuseAddress(!Utils.isWindows)
connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))

val host = conf.get(FRONTEND_REST_BIND_HOST)
.getOrElse(Utils.findLocalInetAddress.getHostName)
server = JettyServer(getName, host, conf.get(FRONTEND_REST_BIND_PORT))
super.initialize(conf)
}

override def connectionUrl: String = {
checkInitialized()
s"${serverAddr.getCanonicalHostName}:${connector.getLocalPort}"
server.getServerUri
}

private def startInternal(): Unit = {
server.addHandler(ApiRootResource.getServletHandler(this))
}

override def start(): Unit = {
if (!isStarted) {
override def start(): Unit = synchronized {
if (!isStarted.get) {
try {
connector.start()
jettyServer.start()
isStarted = true
info(s"Rest frontend service jetty server has started at ${jettyServer.getURI}.")
server.start()
isStarted.set(true)
info(s"$getName has started at ${server.getServerUri}")
startInternal()
} catch {
case rethrow: Exception =>
stopHttpServer()
throw new KyuubiException("Cannot start rest frontend service jetty server", rethrow)
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
}
}

super.start()
}

override def stop(): Unit = {
if (isStarted) {
stopHttpServer()
isStarted = false
override def stop(): Unit = synchronized {
if (isStarted.getAndSet(false)) {
server.stop()
}
super.stop()
}

private def stopHttpServer(): Unit = {
if (jettyServer != null) {
try {
connector.stop()
info("Rest frontend service server connector has stopped.")
} catch {
case err: Exception =>
error("Cannot safely stop rest frontend service server connector", err)
} finally {
connector = null
}

try {
jettyServer.stop()
info("Rest frontend service jetty server has stopped.")
} catch {
case err: Exception => error("Cannot safely stop rest frontend service jetty server", err)
} finally {
jettyServer = null
}
}
}

override val discoveryService: Option[Service] = None
}

0 comments on commit c2a7e28

Please sign in to comment.