
Extract constants and config into separate file. Launch => Submit. (apache#65)

* Extract constants and config into separate file. Launch => Submit.

* Address comments

* A small shorthand

* Refactor more ThreadUtils

* Fix scalastyle, use cached thread pool

* Tiny Scala style change
mccheah authored and foxish committed Jul 24, 2017
1 parent 4ff44d3 commit c57ccdc
Showing 9 changed files with 470 additions and 250 deletions.
16 changes: 12 additions & 4 deletions docs/running-on-kubernetes.md
@@ -140,12 +140,12 @@ Spark supports using SSL to encrypt the traffic in this bootstrapping process. I
whenever possible.

See the [security page](security.html) and [configuration](configuration.html) sections for more information on
- configuring SSL; use the prefix `spark.ssl.kubernetes.driverlaunch` in configuring the SSL-related fields in the context
+ configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context
of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
- pod in starting the application, set `spark.ssl.kubernetes.driverlaunch.trustStore`.
+ pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`.

One note about the keyStore is that it can be specified as either a file on the client machine or a file in the
- container image's disk. Thus `spark.ssl.kubernetes.driverlaunch.keyStore` can be a URI with a scheme of either `file:`
+ container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:`
or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
`container:`, the file is assumed to already be on the container's disk at the appropriate path.
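
For example, a submission could point the keyStore at a file baked into the driver image and the trustStore at a file on the client machine. A sketch, with both paths illustrative:

```scala
import org.apache.spark.SparkConf

// Illustrative paths only. The `container:` scheme means the keyStore is
// expected to already exist on the driver image's disk; the trustStore,
// given as a plain path, is read from the submitting machine.
val conf = new SparkConf()
  .set("spark.ssl.kubernetes.submit.keyStore", "container:/opt/spark/ssl/keystore.jks")
  .set("spark.ssl.kubernetes.submit.trustStore", "/etc/spark/ssl/truststore.jks")
```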
@@ -235,7 +235,15 @@ from the other deployment modes. See the [configuration page](configuration.html
  <td>(none)</td>
  <td>
    Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
-    where each label is in the format <code>key=value</code>.
+    where each label is in the format <code>key=value</code>. Note that Spark also adds its own labels to the driver pod
+    for bookkeeping purposes.
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.driverSubmitTimeout</code></td>
  <td>60s</td>
  <td>
    Time to wait for the driver pod to start running before aborting its execution.
  </td>
</tr>
</table>
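
As an illustration of the two options above, the equivalent programmatic settings would look like this (the label values and timeout are hypothetical; the same keys can be passed with `--conf` on spark-submit):

```scala
import org.apache.spark.SparkConf

// Hypothetical values for the two options documented in the table above.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.labels", "team=data-eng,env=staging")
  .set("spark.kubernetes.driverSubmitTimeout", "120s")
```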

Large diffs are not rendered by default.

@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.internal.config.ConfigBuilder

package object config {

  private[spark] val KUBERNETES_NAMESPACE =
    ConfigBuilder("spark.kubernetes.namespace")
      .doc("""
        | The namespace that will be used for running the driver and
        | executor pods. When using spark-submit in cluster mode,
        | this can also be passed to spark-submit via the
        | --kubernetes-namespace command line argument.
      """.stripMargin)
      .stringConf
      .createWithDefault("default")

  private[spark] val DRIVER_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.driver.docker.image")
      .doc("""
        | Docker image to use for the driver. Specify this using the
        | standard Docker tag format.
      """.stripMargin)
      .stringConf
      .createWithDefault(s"spark-driver:$sparkVersion")

  private[spark] val EXECUTOR_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.executor.docker.image")
      .doc("""
        | Docker image to use for the executors. Specify this using
        | the standard Docker tag format.
      """.stripMargin)
      .stringConf
      .createWithDefault(s"spark-executor:$sparkVersion")

  private[spark] val KUBERNETES_CA_CERT_FILE =
    ConfigBuilder("spark.kubernetes.submit.caCertFile")
      .doc("""
        | CA cert file for connecting to Kubernetes over SSL. This
        | file should be located on the submitting machine's disk.
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_CLIENT_KEY_FILE =
    ConfigBuilder("spark.kubernetes.submit.clientKeyFile")
      .doc("""
        | Client key file for authenticating against the Kubernetes
        | API server. This file should be located on the submitting
        | machine's disk.
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_CLIENT_CERT_FILE =
    ConfigBuilder("spark.kubernetes.submit.clientCertFile")
      .doc("""
        | Client cert file for authenticating against the
        | Kubernetes API server. This file should be located on
        | the submitting machine's disk.
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
    ConfigBuilder("spark.kubernetes.submit.serviceAccountName")
      .doc("""
        | Service account that is used when running the driver pod.
        | The driver pod uses this service account when requesting
        | executor pods from the API server.
      """.stripMargin)
      .stringConf
      .createWithDefault("default")

  private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
    ConfigBuilder("spark.kubernetes.driver.uploads.jars")
      .doc("""
        | Comma-separated list of jars to be sent to the driver and
        | all executors when submitting the application in cluster
        | mode.
      """.stripMargin)
      .stringConf
      .createOptional

  // Note that while we set a default for this when we start up the
  // scheduler, the specific default value is dynamically determined
  // based on the executor memory.
  private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
    ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
      .doc("""
        | The amount of off-heap memory (in megabytes) to be
        | allocated per executor. This is memory that accounts for
        | things like VM overheads, interned strings, other native
        | overheads, etc. This tends to grow with the executor size
        | (typically 6-10%).
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_LABELS =
    ConfigBuilder("spark.kubernetes.driver.labels")
      .doc("""
        | Custom labels that will be added to the driver pod.
        | This should be a comma-separated list of label key-value
        | pairs, where each label is in the format key=value. Note
        | that Spark also adds its own labels to the driver pod
        | for bookkeeping purposes.
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
    ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
      .doc("""
        | Time to wait for the driver process to start running
        | before aborting its execution.
      """.stripMargin)
      .timeConf(TimeUnit.SECONDS)
      .createWithDefault(60L)

  private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE =
    ConfigBuilder("spark.ssl.kubernetes.submit.keyStore")
      .doc("""
        | KeyStore file for the driver submission server listening
        | on SSL. Can be pre-mounted on the driver container
        | or uploaded from the submitting client.
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE =
    ConfigBuilder("spark.ssl.kubernetes.submit.trustStore")
      .doc("""
        | TrustStore containing certificates for communicating
        | to the driver submission server over SSL.
      """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
    ConfigBuilder("spark.kubernetes.driver.service.name")
      .doc("""
        | Kubernetes service that exposes the driver pod
        | for external access.
      """.stripMargin)
      .internal()
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_POD_NAME =
    ConfigBuilder("spark.kubernetes.driver.pod.name")
      .doc("""
        | Name of the driver pod.
      """.stripMargin)
      .internal()
      .stringConf
      .createOptional
}
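
A benefit of extracting these into one `config` package object is that call sites read typed values through `ConfigEntry` instead of raw string lookups. A minimal sketch of the consumption side (the object and helper below are hypothetical; `SparkConf.get(ConfigEntry)` is `private[spark]`, so code like this only compiles inside Spark's own packages):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

// Hypothetical helper showing the typed reads these entries enable.
object ConfigUsageSketch {
  def resolveSubmitSettings(sparkConf: SparkConf): Unit = {
    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)               // "default" unless overridden
    val driverImage = sparkConf.get(DRIVER_DOCKER_IMAGE)              // "spark-driver:<version>" by default
    val timeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) // Long seconds, parsed from values like "60s"
    val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)        // Option[String] of key=value pairs
  }
}
```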
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

package object constants {
  // Labels
  private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
  private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
  private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
  private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"

  // Secrets
  private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission"
  private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret"
  private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret"
  private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume"
  private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME =
    "spark-submission-server-key-password"
  private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME =
    "spark-submission-server-keystore-password"
  private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
  private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
  private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"

  // Default and fixed ports
  private[spark] val SUBMISSION_SERVER_PORT = 7077
  private[spark] val DEFAULT_DRIVER_PORT = 7078
  private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
  private[spark] val DEFAULT_UI_PORT = 4040
  private[spark] val UI_PORT_NAME = "spark-ui-port"
  private[spark] val SUBMISSION_SERVER_PORT_NAME = "submit-server"
  private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
  private[spark] val DRIVER_PORT_NAME = "driver"
  private[spark] val EXECUTOR_PORT_NAME = "executor"

  // Environment Variables
  private[spark] val ENV_SUBMISSION_SECRET_LOCATION = "SPARK_SUBMISSION_SECRET_LOCATION"
  private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
  private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
  private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
    "SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
  private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
    "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
  private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
  private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL"
  private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
  private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
  private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
  private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
  private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
  private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"

  // Miscellaneous
  private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
}
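
Centralizing these keys keeps magic strings out of the client and scheduler code. A hedged sketch of one way the label constants might be combined at a pod-construction site (the object, function, and its inputs are hypothetical):

```scala
import org.apache.spark.deploy.kubernetes.constants._

object LabelSketch {
  // Hypothetical: assembles the bookkeeping labels Spark attaches to the
  // driver pod, merged over any user-provided custom labels.
  def driverPodLabels(
      appId: String,
      appName: String,
      customLabels: Map[String, String]): Map[String, String] = {
    customLabels ++ Map(
      SPARK_DRIVER_LABEL -> "true",
      SPARK_APP_ID_LABEL -> appId,
      SPARK_APP_NAME_LABEL -> appName)
  }
}
```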
@@ -20,23 +20,22 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}

import org.apache.spark.SPARK_VERSION

// TODO: jars should probably be compressed. Shipping tarballs would be optimal.
case class KubernetesCreateSubmissionRequest(
-    val appResource: AppResource,
-    val mainClass: String,
-    val appArgs: Array[String],
-    val sparkProperties: Map[String, String],
-    val secret: String,
-    val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
+    appResource: AppResource,
+    mainClass: String,
+    appArgs: Array[String],
+    sparkProperties: Map[String, String],
+    secret: String,
+    uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
  message = "create"
  clientSparkVersion = SPARK_VERSION
}

case class TarGzippedData(
-    val dataBase64: String,
-    val blockSize: Int = 10240,
-    val recordSize: Int = 512,
-    val encoding: String
+    dataBase64: String,
+    blockSize: Int = 10240,
+    recordSize: Int = 512,
+    encoding: String
)
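
Dropping `val` from the parameter lists is purely stylistic: case-class constructor parameters are already public vals. For orientation, a hedged sketch of constructing a request (`ContainerAppResource` is assumed to be one of the `AppResource` subtypes declared in this file; all values are illustrative):

```scala
// Illustrative only: the jar path, class, and secret are made up, and no
// client-side jars are uploaded, so the optional tarball field is None.
object SubmissionRequestSketch {
  val request = KubernetesCreateSubmissionRequest(
    appResource = ContainerAppResource("local:///opt/spark/examples/spark-examples.jar"),
    mainClass = "org.apache.spark.examples.SparkPi",
    appArgs = Array("1000"),
    sparkProperties = Map("spark.app.name" -> "spark-pi"),
    secret = "submission-secret-value",
    uploadedJarsBase64Contents = None)
}
```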

@JsonTypeInfo(
@@ -28,12 +28,11 @@ trait KubernetesSparkRestApi {
  @Consumes(Array(MediaType.APPLICATION_JSON))
  @Produces(Array(MediaType.APPLICATION_JSON))
  @Path("/create")
-  def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
+  def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse

  @GET
  @Consumes(Array(MediaType.APPLICATION_JSON))
  @Produces(Array(MediaType.APPLICATION_JSON))
  @Path("/ping")
  def ping(): PingResponse

}
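
Note that only the Scala method name changes here; the route stays `@Path("/create")`, so the wire protocol is unaffected. A hypothetical server-side stub, assumed to sit alongside the protocol classes so the types are in scope:

```scala
// Hypothetical stub: the real server would validate the submission secret
// and create the driver's Kubernetes resources before responding. Field
// names follow Spark's SubmitRestProtocolResponse; PingResponse is assumed
// to have a no-argument constructor.
class KubernetesSparkRestServerStub extends KubernetesSparkRestApi {
  override def submitApplication(
      request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse = {
    val response = new CreateSubmissionResponse
    response.success = true
    response.serverSparkVersion = org.apache.spark.SPARK_VERSION
    response
  }

  override def ping(): PingResponse = new PingResponse
}
```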
