Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,39 @@
# cause session leakage, so we need to check session leakage.
# How long to check livy session leakage
# livy.server.yarn.app-leakage.check-timeout = 600s
# how often to check livy session leakage
# How often to check livy session leakage
# livy.server.yarn.app-leakage.check-interval = 60s

# How often Livy polls YARN to refresh YARN app state.
# livy.server.yarn.poll-interval = 5s
#

# If Livy can't find the Kubernetes app within this time, consider it lost
# livy.server.kubernetes.app-lookup-timeout = 600s
# When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it
# would cause session leakage, so we need to check session leakage
# How long to check livy session leakage
# livy.server.kubernetes.app-leakage.check-timeout = 600s
# How often to check livy session leakage
# livy.server.kubernetes.app-leakage.check-interval = 60s

# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description
# details, routes, etc...)
# livy.server.kubernetes.poll-interval = 15s

# Comma-separated list of the Kubernetes namespaces to allow for applications creation.
# All namespaces are allowed if empty
# livy.server.kubernetes.allowedNamespaces =

# Days to keep Livy server request logs.
# livy.server.request-log-retain.days = 5

# If the Livy Web UI should be included in the Livy Server. Enabled by default.
# livy.ui.enabled = true
# Wether to display on Livy UI links to Spark UI when running on Kubernetes

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "Whether'

# livy.ui.kubernetes.sparkui.enabled = false
# String format to use to build links to Spark UI to be displayed on Livy UI when running on
# Kubernetes. Use '%s' placeholder for the link url part to be replaced by Spark application ID
# livy.ui.kubernetes.sparkui.link-format = http://localhost/%s

# Whether to enable Livy server access control, if it is true then all the income requests will
# be checked if the requested user has permission.
Expand Down Expand Up @@ -165,3 +187,18 @@
# livy.server.auth.<custom>.class = <class of custom auth filter>
# livy.server.auth.<custom>.param.<foo1> = <bar1>
# livy.server.auth.<custom>.param.<foo2> = <bar2>

# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount
# if deployed to Kubernetes cluster as a Pod)
# Kubernetes oauth token file path
# livy.server.kubernetes.oauthTokenFile =
# Kubernetes oauth token string value
# livy.server.kubernetes.oauthTokenValue =
# Kubernetes CA cert file path
# livy.server.kubernetes.caCertFile =
# Kubernetes client key file path
# livy.server.kubernetes.clientKeyFile =
# Kubernetes client cert file path
# livy.server.kubernetes.clientCertFile =
# Kubernetes client default namespace
# livy.server.kubernetes.defaultNamespace =
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
<hadoop.scope>compile</hadoop.scope>
<spark.scala-2.11.version>2.2.3</spark.scala-2.11.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>4.1.1</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.3</httpclient.version>
Expand Down Expand Up @@ -298,6 +299,18 @@
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import javax.security.sasl.Sasl;

Expand Down Expand Up @@ -151,6 +152,12 @@ public String findLocalAddress() throws IOException {
return address.getCanonicalHostName();
}

public boolean isRunningOnKubernetes() {
return Optional.ofNullable(get("livy.spark.master"))
.filter(s -> s.startsWith("k8s"))
.isPresent();
}

private static final Map<String, DeprecatedConf> configsWithAlternatives
= Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS);
Expand Down
7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ private void initializeServer() throws Exception {
// on the cluster, it would be tricky to solve that problem in a generic way.
livyConf.set(RPC_SERVER_ADDRESS, null);

// If we are running on Kubernetes, set RPC_SERVER_ADDRESS from "spark.driver.host" option,
// which is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
// line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
if (livyConf.isRunningOnKubernetes()) {
livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
}

if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
// Test flag is turned on so we will just infinite loop here. It should cause
// timeout and we should still see yarn application being cleaned up.
Expand Down
5 changes: 5 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>metrics-healthchecks</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>

<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
44 changes: 44 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ object LivyConf {
val SERVER_BASE_PATH = Entry("livy.ui.basePath", "")

val UI_ENABLED = Entry("livy.ui.enabled", true)
// Wether to display on Livy UI links to Spark UI when running on Kubernetes

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "Whether'

val UI_KUBERNETES_SPARK_UI_ENABLED = Entry("livy.ui.kubernetes.sparkui.enabled", false)
// String format to use to build links to Spark UI to be displayed on Livy UI when running on
// Kubernetes. Use '%s' placeholder for the link url part to be replaced by Spark application ID
val UI_KUBERNETES_SPARK_UI_LINK_FORMAT =
Entry("livy.ui.kubernetes.sparkui.link-format", "http://localhost/%s")

val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072)
val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072)
Expand Down Expand Up @@ -220,6 +226,35 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")

// Kubernetes oauth token file path.
val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
// Kubernetes oauth token string value.
val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
// Kubernetes CA cert file path.
val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
// Kubernetes client key file path.
val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
// Kubernetes client cert file path.
val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")
// Kubernetes client default namespace.
val KUBERNETES_DEFAULT_NAMESPACE = Entry("livy.server.kubernetes.defaultNamespace", "")

// Comma-separated list of the Kubernetes namespaces to allow for applications creation.
// All namespaces are allowed if empty.
val KUBERNETES_ALLOWED_NAMESPACES = Entry("livy.server.kubernetes.allowedNamespaces", null)

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
// How often to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down Expand Up @@ -331,6 +366,15 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")

/** Return true if spark master starts with k8s. */
def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")

/** Return Kubernetes namespace or all if not set. */
def getKubernetesNamespaces(): Set[String] =
Option(get(KUBERNETES_ALLOWED_NAMESPACES)).filterNot(_.isEmpty)
.map(_.split(",").toSet)
.getOrElse(Set.empty)

/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

Expand Down
13 changes: 8 additions & 5 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore}
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
import org.apache.livy.utils.SparkYarnApp

class LivyServer extends Logging {

Expand Down Expand Up @@ -140,10 +140,13 @@ class LivyServer extends Logging {

testRecovery(livyConf)

// Initialize YarnClient ASAP to save time.
// Initialize YarnClient or KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
} else if (livyConf.isRunningOnKubernetes()) {
SparkKubernetesApp.init(livyConf)
Future { SparkKubernetesApp.kubernetesClient }
}

StateStore.init(livyConf)
Expand Down Expand Up @@ -407,10 +410,10 @@ class LivyServer extends Logging {
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
// If recovery is turned on but we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
"Session recovery requires YARN.")
"Session recovery requires YARN or Kubernetes.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,13 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) }
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
.orElse {
if (livyConf.isRunningOnKubernetes()) {
// Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else None
}
}

if (client.isEmpty) {
Expand Down Expand Up @@ -474,6 +480,8 @@ class InteractiveSession(
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
// We need to call #kill here explicitly to delete Interactive pods from the cluster
if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {
Expand Down
30 changes: 28 additions & 2 deletions server/src/main/scala/org/apache/livy/utils/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,36 @@ object SparkApp {
sparkConf ++ Map(
SPARK_YARN_TAG_KEY -> mergedYarnTags,
"spark.yarn.submit.waitAppCompletion" -> "false")
} else if (livyConf.isRunningOnKubernetes()) {

// We don't allow to submit applications to the namespaces different from the configured
val kubernetesNamespaces = livyConf.getKubernetesNamespaces()
val targetNamespace = sparkConf.getOrElse("spark.kubernetes.namespace",
SparkKubernetesApp.kubernetesClient.getDefaultNamespace)
if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) {
throw new IllegalArgumentException(
s"Requested namespace $targetNamespace doesn't match the configured: " +
s"${kubernetesNamespaces.mkString(", ")}")
}

import KubernetesConstants._
sparkConf ++ Map(
"spark.kubernetes.namespace" -> targetNamespace,
// Mark Spark pods with the unique appTag label to be used for their discovery
s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
// Mark Spark pods as created by Livy for the additional tracing
s"spark.kubernetes.driver.label.$CREATED_BY_ANNOTATION" -> "livy",
s"spark.kubernetes.executor.label.$CREATED_BY_ANNOTATION" -> "livy",
"spark.kubernetes.submission.waitAppCompletion" -> "false")
} else {
sparkConf
}
}

/**
* Return a SparkApp object to control the underlying Spark application via YARN or spark-submit.
* Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes
* or spark-submit.
*
* @param uniqueAppTag A tag that can uniquely identify the application.
*/
Expand All @@ -89,8 +112,11 @@ object SparkApp {
listener: Option[SparkAppListener]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else if (livyConf.isRunningOnKubernetes()) {
new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
require(process.isDefined, "process must not be None when Livy master is not YARN.")
require(process.isDefined, "process must not be None when Livy master is not YARN or " +
"Kubernetes.")
new SparkProcApp(process.get, listener)
}
}
Expand Down
Loading