From b54a039da08aec93a6db9d1470d0b2eaaec08814 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 29 Aug 2018 20:19:40 -0400 Subject: [PATCH 1/7] initial WIP push for SPARK-25021 --- .../org/apache/spark/deploy/k8s/Config.scala | 7 +++++ .../k8s/features/BasicDriverFeatureStep.scala | 1 + .../features/BasicExecutorFeatureStep.scala | 13 +++++++-- .../bindings/JavaDriverFeatureStep.scala | 4 ++- .../bindings/PythonDriverFeatureStep.scala | 4 ++- .../bindings/RDriverFeatureStep.scala | 4 ++- .../BasicDriverFeatureStepSuite.scala | 1 - .../BasicExecutorFeatureStepSuite.scala | 24 +++++++++++++++ .../bindings/JavaDriverFeatureStepSuite.scala | 1 - .../dockerfiles/spark/bindings/R/Dockerfile | 2 +- .../spark/bindings/python/Dockerfile | 2 +- .../k8s/integrationtest/KubernetesSuite.scala | 29 +++++++++++++++++++ .../integrationtest/PythonTestsSuite.scala | 20 +++++++++++++ 13 files changed, 103 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1b582fe53624a..c7338a721595f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -225,6 +225,13 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val APP_RESOURCE_TYPE = + ConfigBuilder("spark.kubernetes.resource.type") + .doc("This sets the resource type internally") + .internal() + .stringConf + .createOptional + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = "spark.kubernetes.authenticate.submission" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 575bc54ffe2bb..21c0a9f92072c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -51,6 +51,7 @@ private[spark] class BasicDriverFeatureStep( .get(DRIVER_MEMORY_OVERHEAD) .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) + // TODO: Have memory limit checks on driverMemory private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index c37f713c56de1..1a08fe4c76a4e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkException import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} +import 
org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, PYSPARK_EXECUTOR_MEMORY} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -58,6 +58,15 @@ private[spark] class BasicExecutorFeatureStep( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + // TODO: Have memory limit checks on executorMemory + private val executorMemoryTotal = kubernetesConf.sparkConf + .getOption(APP_RESOURCE_TYPE.key).map{ res => + val additionalPySparkMemory = if (res == "python") { + kubernetesConf.sparkConf + .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else 0 + executorMemoryWithOverhead + additionalPySparkMemory + }.getOrElse(executorMemoryWithOverhead) private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) private val executorCoresRequest = @@ -76,7 +85,7 @@ private[spark] class BasicExecutorFeatureStep( // executorId val hostname = name.substring(Math.max(0, name.length - 63)) val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryWithOverhead}Mi") + .withAmount(s"${executorMemoryTotal}Mi") .build() val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCoresRequest) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala index f52ec9fdc677e..6f063b253cd73 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features.bindings import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep import org.apache.spark.launcher.SparkLauncher @@ -38,7 +39,8 @@ private[spark] class JavaDriverFeatureStep( .build() SparkPod(pod.pod, withDriverArgs) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + override def getAdditionalPodSystemProperties(): Map[String, String] = + Map(APP_RESOURCE_TYPE.key -> "java") override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala index c20bcac1f8987..cb051d1db49ed 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import 
io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep @@ -67,7 +68,8 @@ private[spark] class PythonDriverFeatureStep( SparkPod(pod.pod, withPythonPrimaryContainer) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + override def getAdditionalPodSystemProperties(): Map[String, String] = + Map(APP_RESOURCE_TYPE.key -> "python") override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala index b33b86e02ea6f..de752e300db20 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep @@ -53,7 +54,8 @@ private[spark] class RDriverFeatureStep( SparkPod(pod.pod, withRPrimaryContainer) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + override def getAdditionalPodSystemProperties(): Map[String, String] = + Map(APP_RESOURCE_TYPE.key -> "r") override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index d98e113554648..0968cce971c31 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -57,7 +57,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { MAIN_CLASS, APP_ARGS) - test("Check the pod respects all configurations from the user.") { val sparkConf = new SparkConf() .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 95d373f791649..63b237b9dfe46 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -75,6 +75,7 @@ class BasicExecutorFeatureStepSuite .set("spark.driver.host", 
DRIVER_HOSTNAME) .set("spark.driver.port", DRIVER_PORT.toString) .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) + .set("spark.kubernetes.resource.type", "java") } test("basic executor pod has reasonable defaults") { @@ -161,6 +162,29 @@ class BasicExecutorFeatureStepSuite checkOwnerReferences(executor.pod, DRIVER_POD_UID) } + test("test executor pyspark memory") { + val conf = baseConf.clone() + conf.set("spark.kubernetes.resource.type", "python") + conf.set(org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY, 42L) + + val step = new BasicExecutorFeatureStep( + KubernetesConf( + conf, + KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), + RESOURCE_NAME_PREFIX, + APP_ID, + LABELS, + ANNOTATIONS, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String])) + val executor = step.configurePod(SparkPod.initialPod()) + // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 + assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi") + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala index 18874afe6e53a..bf552aeb8b901 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala @@ -56,6 +56,5 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite { "--properties-file", SPARK_CONF_PATH, "--class", "test-class", "spark-internal", "5 7")) - } } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile index e627883ba782e..9f67422efeb3c 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile @@ -19,10 +19,10 @@ ARG base_img FROM $base_img WORKDIR / RUN mkdir ${SPARK_HOME}/R -COPY R ${SPARK_HOME}/R RUN apk add --no-cache R R-dev +COPY R ${SPARK_HOME}/R ENV R_HOME /usr/lib/R WORKDIR /opt/spark/work-dir diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile index 72bb9620b45de..69b6efa6149a0 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile @@ -19,7 +19,6 @@ ARG base_img FROM $base_img WORKDIR / RUN mkdir ${SPARK_HOME}/python -COPY python/lib ${SPARK_HOME}/python/lib # TODO: Investigate running both pip and pip3 via virtualenvs RUN apk add --no-cache python && \ apk add --no-cache python3 && \ @@ -33,6 +32,7 @@ RUN apk add --no-cache python && \ # Removed the .cache to save space rm -r /root/.cache +COPY python/lib ${SPARK_HOME}/python/lib ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip 
WORKDIR /opt/spark/work-dir diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 896a83a5badbb..ff44518900974 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -50,6 +50,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite protected var containerLocalSparkDistroExamplesJar: String = _ protected var appLocator: String = _ + // Default memory limit is 1024M + 384M (minimum overhead constant) + private val baseMemory = s"${1024 + 384}Mi" + protected val memOverheadConstant = 0.8 + private val standardNonJVMMemory = s"${(1024 + 0.4*1024).toInt}Mi" + protected val additionalMemory = 200 + private val extraTotalMemory = s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi" + override def beforeAll(): Unit = { // The scalatest-maven-plugin gives system properties that are referenced but not set null // values. We need to remove the null-value properties before initializing the test backend. @@ -233,6 +240,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === image) assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") + assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === baseMemory) } @@ -240,28 +249,48 @@ private[spark] class KubernetesSuite extends SparkFunSuite assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage) assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") + assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === standardNonJVMMemory) } protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === rImage) assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") + assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === standardNonJVMMemory) } protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === image) assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === baseMemory) } protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage) assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === standardNonJVMMemory) } protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === rImage) assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + 
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === standardNonJVMMemory) + } + + protected def doDriverMemoryCheck(driverPod: Pod): Unit = { + assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === extraTotalMemory) + } + + protected def doExecutorMemoryCheck(executorPod: Pod): Unit = { + assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount + === extraTotalMemory) } protected def checkCustomSettings(pod: Pod): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 1ebb30094dcde..ec8ee3275f674 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -72,6 +72,26 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => isJVM = false, pyFiles = Some(PYSPARK_CONTAINER_TESTS)) } + + test("Run PySpark with memory customization", k8sTestTag) { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.pyspark.pythonVersion", "3") + .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") + .set("spark.executor.pyspark.memory", s"${additionalMemory}Mi") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_FILES, + mainClass = "", + expectedLogOnCompletion = Seq( + "Python runtime version check is: True", + "Python environment version check is: True"), + appArgs = Array("python3"), + driverPodChecker = doDriverMemoryCheck, + executorPodChecker = doExecutorMemoryCheck, + appLocator = appLocator, + isJVM = false, + pyFiles = Some(PYSPARK_CONTAINER_TESTS)) + } } private[spark] object PythonTestsSuite { From 75742a37687a7eb3ebaa34069ac7a62521a4e2f8 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 30 Aug 2018 01:26:27 -0400 Subject: [PATCH 2/7] add python.worker.reuse --- .../spark/deploy/k8s/integrationtest/PythonTestsSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index ec8ee3275f674..fc30066421ff6 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -79,6 +79,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") .set("spark.executor.pyspark.memory", s"${additionalMemory}Mi") + .set("spark.python.worker.reuse", "false") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_FILES, mainClass = "", From 46c30cc27cd3a7279a116ec6a70a937b8502cd73 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 31 Aug 2018 00:32:22 -0400 Subject: [PATCH 3/7] final 
checks with e2e tests --- docs/configuration.md | 2 +- .../src/main/python/worker_memory_check.py | 47 +++++++++++++++++++ .../k8s/integrationtest/KubernetesSuite.scala | 10 ++-- .../integrationtest/PythonTestsSuite.scala | 10 ++-- .../integrationtest/SecretsTestsSuite.scala | 1 + 5 files changed, 61 insertions(+), 9 deletions(-) create mode 100644 examples/src/main/python/worker_memory_check.py diff --git a/docs/configuration.md b/docs/configuration.md index 9714b48d5e69b..8ba3950ac0621 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -187,7 +187,7 @@ of the most common options to set are: unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space - shared with other non-JVM processes. When PySpark is run in YARN, this memory + shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. diff --git a/examples/src/main/python/worker_memory_check.py b/examples/src/main/python/worker_memory_check.py new file mode 100644 index 0000000000000..f21e211361cc2 --- /dev/null +++ b/examples/src/main/python/worker_memory_check.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import print_function + +import resource +import sys + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": + """ + Usage: worker_memory_check [Memory_in_Mi] + """ + spark = SparkSession \ + .builder \ + .appName("PyFilesTest") \ + .getOrCreate() + sc = spark.sparkContext + if len(sys.argv) < 2: + print("Usage: worker_memory_check [Memory_in_Mi]", file=sys.stderr) + sys.exit(-1) + + def f(x): + rLimit = resource.getrlimit(resource.RLIMIT_AS) + print("RLimit is " + str(rLimit)) + return rLimit + resourceValue = sc.parallelize([1]).map(f).collect()[0][0] + print("Resource Value is " + str(resourceValue)) + truthCheck = (resourceValue == int(sys.argv[1])) + print("PySpark Worker Memory Check is: " + str(truthCheck)) + spark.stop() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index ff44518900974..82e6efa2707d9 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -55,7 +55,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite protected val memOverheadConstant = 0.8 private val standardNonJVMMemory = s"${(1024 + 0.4*1024).toInt}Mi" protected val additionalMemory = 200 - private val extraTotalMemory = s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi" + // 209715200 is 200Mi + protected val additionalMemoryInBytes = 209715200 + private val extraDriverTotalMemory = s"${(1024 + memOverheadConstant*1024).toInt}Mi" + private val extraExecTotalMemory = + s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi" override def beforeAll(): Unit = { // The scalatest-maven-plugin gives system properties that are referenced but not set null @@ -285,12 +289,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite protected def doDriverMemoryCheck(driverPod: Pod): Unit = { assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount - === extraTotalMemory) + === extraDriverTotalMemory) } protected def doExecutorMemoryCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount - === extraTotalMemory) + === extraExecTotalMemory) } protected def checkCustomSettings(pod: Pod): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index fc30066421ff6..1ab23559d9b9d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -78,15 +78,14 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") - 
.set("spark.executor.pyspark.memory", s"${additionalMemory}Mi") + .set("spark.executor.pyspark.memory", s"${additionalMemory}m") .set("spark.python.worker.reuse", "false") runSparkApplicationAndVerifyCompletion( - appResource = PYSPARK_FILES, + appResource = PYSPARK_MEMORY_CHECK, mainClass = "", expectedLogOnCompletion = Seq( - "Python runtime version check is: True", - "Python environment version check is: True"), - appArgs = Array("python3"), + "PySpark Worker Memory Check is: True"), + appArgs = Array(s"$additionalMemoryInBytes"), driverPodChecker = doDriverMemoryCheck, executorPodChecker = doExecutorMemoryCheck, appLocator = appLocator, @@ -100,5 +99,6 @@ private[spark] object PythonTestsSuite { val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py" val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py" + val PYSPARK_MEMORY_CHECK: String = CONTAINER_LOCAL_PYSPARK + "worker_memory_check.py" } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala index 7b05c1355ca24..9b039bb98dd9a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala @@ -53,6 +53,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => .delete() } + // TODO: [SPARK-25291] This test is flaky with regards to memory of executors test("Run SparkPi with env and mount secrets.", k8sTestTag) { createTestSecret() sparkAppConf From 467703bfb8bde570b5b0cad11904cb92641dcd84 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 31 Aug 2018 17:30:41 -0400 Subject: [PATCH 4/7] resolve comments --- .../deploy/k8s/features/BasicExecutorFeatureStep.scala | 10 ++++++---- .../deploy/k8s/integrationtest/PythonTestsSuite.scala | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 1a08fe4c76a4e..5b2d3a13feac3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -61,10 +61,12 @@ private[spark] class BasicExecutorFeatureStep( // TODO: Have memory limit checks on executorMemory private val executorMemoryTotal = kubernetesConf.sparkConf .getOption(APP_RESOURCE_TYPE.key).map{ res => - val additionalPySparkMemory = if (res == "python") { - kubernetesConf.sparkConf - .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) - } else 0 + val additionalPySparkMemory = res match { + case "python" => + kubernetesConf.sparkConf + .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + case _ => 0 + } executorMemoryWithOverhead + additionalPySparkMemory }.getOrElse(executorMemoryWithOverhead) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 1ab23559d9b9d..9dad8171ad742 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -23,9 +23,11 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => import PythonTestsSuite._ import KubernetesSuite.k8sTestTag + private val pySparkDockerImage = + s"${getTestImageRepo}/spark-py:${getTestImageTag}" test("Run PySpark on simple pi.py example", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.container.image", pySparkDockerImage) runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_PI, mainClass = "", @@ -39,7 +41,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.container.image", pySparkDockerImage) .set("spark.kubernetes.pyspark.pythonVersion", "2") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_FILES, @@ -57,7 +59,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.container.image", pySparkDockerImage) .set("spark.kubernetes.pyspark.pythonVersion", "3") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_FILES, @@ -75,7 +77,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => test("Run PySpark with memory customization", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.container.image", pySparkDockerImage) .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") .set("spark.executor.pyspark.memory", s"${additionalMemory}m") From 3ad13242f0f5c5d34cda8ca5d0e0ad0b1bdbcead Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 31 Aug 2018 19:36:58 -0400 Subject: [PATCH 5/7] remove TODOs --- .../spark/deploy/k8s/features/BasicDriverFeatureStep.scala | 1 - .../spark/deploy/k8s/features/BasicExecutorFeatureStep.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 21c0a9f92072c..575bc54ffe2bb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -51,7 +51,6 @@ private[spark] class BasicDriverFeatureStep( .get(DRIVER_MEMORY_OVERHEAD) .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) - // TODO: Have memory limit checks on driverMemory private val driverMemoryWithOverheadMiB = 
driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 5b2d3a13feac3..d89995ba5e4f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -58,7 +58,6 @@ private[spark] class BasicExecutorFeatureStep( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB - // TODO: Have memory limit checks on executorMemory private val executorMemoryTotal = kubernetesConf.sparkConf .getOption(APP_RESOURCE_TYPE.key).map{ res => val additionalPySparkMemory = res match { From 7dc26ce7a06715a90950e57ee5519979fef4fc27 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Tue, 4 Sep 2018 18:42:30 -0400 Subject: [PATCH 6/7] reconfigure location of pyspark examples --- dev/make-distribution.sh | 1 + .../docker/src/main/dockerfiles/spark/Dockerfile | 1 + .../deploy/k8s/integrationtest/PythonTestsSuite.scala | 8 ++++---- .../integration-tests/tests}/py_container_checks.py | 0 .../kubernetes/integration-tests/tests}/pyfiles.py | 0 .../integration-tests/tests}/worker_memory_check.py | 2 +- 6 files changed, 7 insertions(+), 5 deletions(-) rename {examples/src/main/python => resource-managers/kubernetes/integration-tests/tests}/py_container_checks.py (100%) rename {examples/src/main/python => resource-managers/kubernetes/integration-tests/tests}/pyfiles.py (100%) rename {examples/src/main/python => resource-managers/kubernetes/integration-tests/tests}/worker_memory_check.py (97%) diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index ad99ce55806af..778d376c12b56 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -192,6 +192,7 @@ fi if [ -d "$SPARK_HOME"/resource-managers/kubernetes/core/target/ ]; then mkdir -p "$DISTDIR/kubernetes/" cp -a "$SPARK_HOME"/resource-managers/kubernetes/docker/src/main/dockerfiles "$DISTDIR/kubernetes/" + cp -a "$SPARK_HOME"/resource-managers/kubernetes/integration-tests/tests "$DISTDIR/kubernetes/" fi # Copy examples and dependencies diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 42a670174eae1..a1fc06b3ed3b2 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -42,6 +42,7 @@ COPY bin /opt/spark/bin COPY sbin /opt/spark/sbin COPY ${img_path}/spark/entrypoint.sh /opt/ COPY examples /opt/spark/examples +COPY kubernetes/tests /opt/spark/tests COPY data /opt/spark/data ENV SPARK_HOME /opt/spark diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 9dad8171ad742..1f4238f8284dd 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -99,8 +99,8 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => private[spark] object PythonTestsSuite { val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/" val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" - val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py" - val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py" - val PYSPARK_MEMORY_CHECK: String = CONTAINER_LOCAL_PYSPARK + "worker_memory_check.py" + val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/" + val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py" + val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py" + val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py" } - diff --git a/examples/src/main/python/py_container_checks.py b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py similarity index 100% rename from examples/src/main/python/py_container_checks.py rename to resource-managers/kubernetes/integration-tests/tests/py_container_checks.py diff --git a/examples/src/main/python/pyfiles.py b/resource-managers/kubernetes/integration-tests/tests/pyfiles.py similarity index 100% rename from examples/src/main/python/pyfiles.py rename to resource-managers/kubernetes/integration-tests/tests/pyfiles.py diff --git a/examples/src/main/python/worker_memory_check.py b/resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py similarity index 97% rename from examples/src/main/python/worker_memory_check.py rename to resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py index f21e211361cc2..d312a29f388e4 100644 --- a/examples/src/main/python/worker_memory_check.py +++ b/resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py @@ -29,7 +29,7 @@ """ spark = SparkSession \ .builder \ - .appName("PyFilesTest") \ + .appName("PyMemoryTest") \ .getOrCreate() sc = spark.sparkContext if len(sys.argv) < 2: From ea25b8b14013d17fbafe5c338cca0eb28a490482 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 7 Sep 2018 20:18:13 -0400 Subject: [PATCH 7/7] remove worker reuse --- .../spark/deploy/k8s/integrationtest/PythonTestsSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 1f4238f8284dd..06b73107ec236 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -81,7 +81,6 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") .set("spark.executor.pyspark.memory", s"${additionalMemory}m") - .set("spark.python.worker.reuse", "false") runSparkApplicationAndVerifyCompletion( appResource = 
PYSPARK_MEMORY_CHECK, mainClass = "",
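---

Note (commentary appended after the series, not part of any patch): taken together,
the patches make the executor pod's memory request
heap + max(overheadFactor * heap, 384Mi), plus spark.executor.pyspark.memory when
spark.kubernetes.resource.type is "python". The standalone Scala sketch below
reproduces that arithmetic and the constants the tests assert against; the object,
method, and parameter names are illustrative and do not appear in the patches.

object ExecutorMemorySketch {
  // Mirrors MEMORY_OVERHEAD_MIN_MIB in the patched code.
  val MemoryOverheadMinMib = 384

  def podMemoryRequestMib(
      executorMemoryMib: Int,
      overheadFactor: Double,
      pysparkMemoryMib: Option[Int],
      resourceType: Option[String]): Int = {
    val overhead =
      math.max((overheadFactor * executorMemoryMib).toInt, MemoryOverheadMinMib)
    val withOverhead = executorMemoryMib + overhead
    // Only Python apps fold spark.executor.pyspark.memory into the request.
    val additionalPySpark = resourceType match {
      case Some("python") => pysparkMemoryMib.getOrElse(0)
      case _              => 0
    }
    withOverhead + additionalPySpark
  }

  def main(args: Array[String]): Unit = {
    // Unit-test case: 1024Mi heap, default 0.10 overhead factor, 42Mi pyspark memory.
    println(podMemoryRequestMib(1024, 0.10, Some(42), Some("python"))) // 1450
    // Integration-test case: factor 0.8, 200Mi pyspark memory (extraExecTotalMemory).
    println(podMemoryRequestMib(1024, 0.8, Some(200), Some("python"))) // 2043
    // Java apps ignore spark.executor.pyspark.memory entirely.
    println(podMemoryRequestMib(1024, 0.10, Some(42), Some("java")))   // 1408
  }
}

The driver request never includes the pyspark component, which is why PATCH 3/7
splits extraDriverTotalMemory (1843Mi) from extraExecTotalMemory (2043Mi) in
KubernetesSuite.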
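For reference, the submission-side configuration that exercises the new code path
looks roughly like the sketch below. This mirrors what PythonTestsSuite sets rather
than prescribing values; the container image name is a placeholder (the tests build
it from getTestImageRepo and getTestImageTag), and the snippet assumes spark-core
on the classpath.

import org.apache.spark.SparkConf

object SubmitConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Placeholder image coordinates, not values from the patches.
      .set("spark.kubernetes.container.image", "example-repo/spark-py:latest")
      .set("spark.kubernetes.pyspark.pythonVersion", "3")
      .set("spark.kubernetes.memoryOverheadFactor", "0.8")
      .set("spark.executor.pyspark.memory", "200m")
    // Print the effective settings for inspection.
    conf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
  }
}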