Skip to content

Commit

Permalink
[KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Appli…
Browse files Browse the repository at this point in the history
…cation Operation

### _Why are the changes needed?_

Add KyuubiApplicationManager in SessionManager for application management; it currently supports killing an application and getting application information.

The underlying cluster manager operations added in this PR are
- local JPS lookup with SIGTERM-based kill
- YARN-client

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2447 from yaooqinn/2445.

Closes #2445

7781039 [Kent Yao] address comment
aed3f25 [Kent Yao] address comment
0fad941 [Kent Yao] address comment
67ec250 [Kent Yao] address comment
800bedb [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation
1a4084a [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation
be58583 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation
e75e20e [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation
baac7f0 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation
e3f5c29 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation
c81e563 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application Operation

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
yaooqinn committed Apr 25, 2022
1 parent 5ae7c9c commit 5e6d645
Show file tree
Hide file tree
Showing 27 changed files with 704 additions and 440 deletions.
Expand Up @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.kyuubi.config.KyuubiConf

Expand All @@ -49,6 +50,10 @@ object KyuubiHadoopUtils {
hadoopConf
}

/**
 * Creates a [[YarnConfiguration]] layered on top of the Hadoop
 * configuration derived from the given Kyuubi configuration via
 * [[newHadoopConf]].
 */
def newYarnConfiguration(conf: KyuubiConf): YarnConfiguration = {
  val hadoopConf = newHadoopConf(conf)
  new YarnConfiguration(hadoopConf)
}

/**
 * Resolves the server-side Kerberos principal by delegating to Hadoop's
 * `SecurityUtil.getServerPrincipal` with "0.0.0.0" as the host argument.
 */
def getServerPrincipal(principal: String): String =
  SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
Expand Down
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.kyuubi.engine.YarnApplicationOperation
org.apache.kyuubi.engine.JpsApplicationOperation
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine

import org.apache.kyuubi.config.KyuubiConf

/**
 * An abstraction of cluster-manager-specific operations on engine
 * applications: lifecycle hooks, a capability check, kill, and status
 * lookup. Implementations (e.g. YARN and JPS-local) are listed in the
 * service-provider file shipped with this change.
 */
trait ApplicationOperation {

  /**
   * Step for initializing the instance.
   */
  def initialize(conf: KyuubiConf): Unit

  /**
   * Step to clean up the instance
   */
  def stop(): Unit

  /**
   * Called before other method to do a quick skip
   *
   * @param clusterManager the underlying cluster manager or just local instance
   */
  def isSupported(clusterManager: Option[String]): Boolean

  /**
   * Kill the app/engine by the unique application tag
   *
   * @param tag the unique application tag for engine instance.
   *            For example,
   *            if the Hadoop Yarn is used, for spark applications,
   *            the tag will be preset via spark.yarn.tags
   * @return a [[KillResponse]]: whether the kill succeeded, and a message
   *         describing the outcome of the kill attempt.
   *
   * @note For implementations, please suppress exceptions and always return KillResponse
   */
  def killApplicationByTag(tag: String): KillResponse

  /**
   * Get the engine/application status by the unique application tag
   *
   * @param tag the unique application tag for engine instance.
   * @return a map contains the application status; implementations should key
   *         it with the constants in [[ApplicationOperation]]'s companion.
   */
  def getApplicationInfoByTag(tag: String): Map[String, String]
}

object ApplicationOperation {

  /**
   * identifier determined by cluster manager for the engine
   */
  val APP_ID_KEY = "id"
  // Application/engine name as reported by the cluster manager
  val APP_NAME_KEY = "name"
  // Application state, e.g. RUNNING / FINISHED
  val APP_STATE_KEY = "state"
  // Tracking URL of the application, when the cluster manager provides one
  val APP_URL_KEY = "url"
  // Error diagnostics associated with the application, if any
  val APP_ERROR_KEY = "error"

  // Sentinel returned when no application matches the requested tag
  val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
Expand Up @@ -17,7 +17,6 @@

package org.apache.kyuubi.engine

import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.util.Random
Expand All @@ -36,10 +35,8 @@ import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.hive.HiveProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
import org.apache.kyuubi.ha.client.DiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ENGINE_REF_ID, HA_ZK_NAMESPACE}
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -54,7 +51,8 @@ import org.apache.kyuubi.operation.log.OperationLog
private[kyuubi] class EngineRef(
conf: KyuubiConf,
user: String,
engineRefId: String = UUID.randomUUID().toString)
engineRefId: String,
engineManager: KyuubiApplicationManager)
extends Logging {
// The corresponding ServerSpace where the engine belongs to
private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
Expand Down Expand Up @@ -167,24 +165,21 @@ private[kyuubi] class EngineRef(
val builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
// tag is a seq type with comma-separated
conf.set(
SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
"KYUUBI," + engineRefId)
new SparkProcessBuilder(appUser, conf, extraEngineLog)
case FLINK_SQL =>
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
// tag is a seq type with comma-separated
conf.set(
FlinkProcessBuilder.TAG_KEY,
conf.getOption(FlinkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
new FlinkProcessBuilder(appUser, conf, extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, conf, extraEngineLog)
case HIVE_SQL =>
new HiveProcessBuilder(appUser, conf, extraEngineLog)
}
// TODO: Better to do this inside ProcBuilder
KyuubiApplicationManager.tagApplication(
engineRefId,
builder.shortName,
builder.clusterManager(),
builder.conf)

MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
Expand All @@ -204,10 +199,7 @@ private[kyuubi] class EngineRef(
}
}
if (started + timeout <= System.currentTimeMillis()) {
val killMessage = engineType match {
case SPARK_SQL => builder.killApplication(Left(engineRefId))
case _ => builder.killApplication()
}
val killMessage = engineManager.killApplication(builder.clusterManager(), engineRefId)
process.destroyForcibly()
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
Expand Down
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine

import java.nio.file.Paths

import scala.sys.process._
import scala.util.control.NonFatal

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationOperation.NOT_FOUND

/**
 * An [[ApplicationOperation]] for engines running as local JVM processes.
 * Engines are located by grepping `jps -ml` output for the unique application
 * tag and terminated with SIGTERM (`kill -15`).
 */
class JpsApplicationOperation extends ApplicationOperation {

  // Resolved path of a usable `jps` executable, or null when jps is
  // unavailable on this host (probed once in initialize).
  private var runner: String = _

  override def initialize(conf: KyuubiConf): Unit = {
    // Prefer $JAVA_HOME/bin/jps (or the running JVM's java.home), falling
    // back to whatever `jps` resolves to on the PATH.
    val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
      .map(Paths.get(_, "bin", "jps").toString)
      .getOrElse("jps")
    runner =
      try {
        // Probe that jps actually runs, then remember the command itself
        // (not its output) so later lookups reuse the same resolved binary;
        // previously getEngine hard-coded a bare "jps" and could miss
        // processes when jps was only reachable via JAVA_HOME.
        jps.!!
        jps
      } catch {
        case NonFatal(_) => null
      }
  }

  override def isSupported(clusterManager: Option[String]): Boolean = {
    // Only handles engines without a cluster manager, or explicitly "local".
    runner != null && (clusterManager.isEmpty || clusterManager.get == "local")
  }

  /**
   * Returns the `jps -ml` line ("<pid> <main class and arguments>") of the
   * process whose command line contains `tag`, if any.
   */
  private def getEngine(tag: String): Option[String] = {
    if (runner == null) {
      None
    } else {
      // NOTE(review): `tag` is interpolated into a shell pipeline; callers
      // currently pass engine ref UUIDs — confirm before accepting
      // user-controlled tags.
      val pb = s"$runner -ml" #| s"grep $tag"
      try {
        pb.lineStream_!.headOption
      } catch {
        case NonFatal(_) => None
      }
    }
  }

  override def killApplicationByTag(tag: String): KillResponse = {
    getEngine(tag) match {
      case Some(idAndCmd) =>
        val (id, _) = idAndCmd.splitAt(idAndCmd.indexOf(" "))
        try {
          // Check the exit code to detect failures; the previous lazy
          // `lineStream` was never forced, so a failed kill was silently
          // reported as success.
          val exitCode = s"kill -15 $id".!
          if (exitCode == 0) {
            (true, s"Succeeded to terminate: $idAndCmd")
          } else {
            (false, s"Failed to terminate: $idAndCmd, due to exit code $exitCode")
          }
        } catch {
          case NonFatal(e) =>
            (false, s"Failed to terminate: $idAndCmd, due to ${e.getMessage}")
        }
      case None => (false, NOT_FOUND)
    }
  }

  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
    getEngine(tag) match {
      case Some(idAndCmd) =>
        val (id, cmd) = idAndCmd.splitAt(idAndCmd.indexOf(" "))
        // Use the shared key constants so every ApplicationOperation
        // implementation reports a consistent schema.
        Map(
          ApplicationOperation.APP_ID_KEY -> id,
          ApplicationOperation.APP_NAME_KEY -> cmd,
          ApplicationOperation.APP_STATE_KEY -> "RUNNING")
      case None =>
        // No live process carries this tag: report it as finished.
        Map(ApplicationOperation.APP_STATE_KEY -> "FINISHED")
    }
  }

  override def stop(): Unit = {}
}

0 comments on commit 5e6d645

Please sign in to comment.