From 2707dee967fd6c4cebbe96cc7ae40feb5bfced24 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Sun, 17 Jun 2018 22:48:24 -0400 Subject: [PATCH 1/3] Fixed Remote File tests and added PySpark tests --- .../k8s/integrationtest/KubernetesSuite.scala | 171 +++++++++++++++--- .../KubernetesTestComponents.scala | 31 ++-- 2 files changed, 167 insertions(+), 35 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 65c513cf241a4..a62f57088ecfb 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -24,7 +24,7 @@ import java.util.regex.Pattern import scala.collection.JavaConverters._ import com.google.common.io.PatternFilenameFilter -import io.fabric8.kubernetes.api.model.{Container, Pod} +import io.fabric8.kubernetes.api.model.Pod import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} @@ -43,6 +43,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite private var kubernetesTestComponents: KubernetesTestComponents = _ private var sparkAppConf: SparkAppConf = _ private var image: String = _ + private var pyImage: String = _ private var containerLocalSparkDistroExamplesJar: String = _ private var appLocator: String = _ private var driverPodName: String = _ @@ -65,6 +66,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite val imageTag = getTestImageTag val imageRepo = getTestImageRepo image = s"$imageRepo/spark:$imageTag" + pyImage = s"$imageRepo/spark-py:$imageTag" val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars")) .toFile @@ -150,22 +152,77 @@ private[spark] class KubernetesSuite extends SparkFunSuite }) } - // TODO(ssuchter): Enable the below after debugging - // test("Run PageRank using remote data file") { - // sparkAppConf - // .set("spark.kubernetes.mountDependencies.filesDownloadDir", - // CONTAINER_LOCAL_FILE_DOWNLOAD_PATH) - // .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) - // runSparkPageRankAndVerifyCompletion( - // appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE)) - // } + test("Run extraJVMOptions check on driver") { + sparkAppConf + .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") + runSparkJVMCheckAndVerifyCompletion( + expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)")) + } + + test("Run SparkRemoteFileTest using a remote data file") { + sparkAppConf + .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) + runSparkRemoteCheckAndVerifyCompletion( + appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME)) + } + + test("Run PySpark on simple pi.py example") { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_PI, + mainClass = "", + expectedLogOnCompletion = Seq("Pi is roughly 3"), + appArgs = Array("5"), + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false) + } + + test("Run PySpark with Python2 to test a pyfiles 
example") { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.pyspark.pythonversion", "2") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_FILES, + mainClass = "", + expectedLogOnCompletion = Seq( + "Python runtime version check is: True", + "Python environment version check is: True"), + appArgs = Array("python"), + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + pyFiles = Some(PYSPARK_CONTAINER_TESTS)) + } + + test("Run PySpark with Python3 to test a pyfiles example") { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.pyspark.pythonversion", "3") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_FILES, + mainClass = "", + expectedLogOnCompletion = Seq( + "Python runtime version check is: True", + "Python environment version check is: True"), + appArgs = Array("python3"), + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + pyFiles = Some(PYSPARK_CONTAINER_TESTS)) + } private def runSparkPiAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, appArgs: Array[String] = Array.empty[String], - appLocator: String = appLocator): Unit = { + appLocator: String = appLocator, + isJVM: Boolean = true ): Unit = { runSparkApplicationAndVerifyCompletion( appResource, SPARK_PI_MAIN_CLASS, @@ -173,10 +230,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite appArgs, driverPodChecker, executorPodChecker, - appLocator) + appLocator, + isJVM) } - private def runSparkPageRankAndVerifyCompletion( + private def runSparkRemoteCheckAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, @@ -184,12 +242,50 @@ private[spark] class KubernetesSuite extends SparkFunSuite appLocator: String = appLocator): Unit = { runSparkApplicationAndVerifyCompletion( appResource, - SPARK_PAGE_RANK_MAIN_CLASS, - Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"), + SPARK_REMOTE_MAIN_CLASS, + Seq(s"Mounting of ${appArgs.head} was true"), appArgs, driverPodChecker, executorPodChecker, - appLocator) + appLocator, + true) + } + + private def runSparkJVMCheckAndVerifyCompletion( + appResource: String = containerLocalSparkDistroExamplesJar, + mainClass: String = SPARK_DRIVER_MAIN_CLASS, + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + appArgs: Array[String] = Array("5"), + expectedJVMValue: Seq[String]): Unit = { + val appArguments = SparkAppArguments( + mainAppResource = appResource, + mainClass = mainClass, + appArgs = appArgs) + SparkAppLauncher.launch( + appArguments, + sparkAppConf, + TIMEOUT.value.toSeconds.toInt, + sparkHomeDir, + true) + + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", appLocator) + .withLabel("spark-role", "driver") + .list() + .getItems + .get(0) + doBasicDriverPodCheck(driverPod) + + Eventually.eventually(TIMEOUT, INTERVAL) { + expectedJVMValue.foreach { e => + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPod.getMetadata.getName) + .getLog + 
.contains(e), "The application did not complete.") + } + } } private def runSparkApplicationAndVerifyCompletion( @@ -199,12 +295,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite appArgs: Array[String], driverPodChecker: Pod => Unit, executorPodChecker: Pod => Unit, - appLocator: String): Unit = { + appLocator: String, + isJVM: Boolean, + pyFiles: Option[String] = None): Unit = { val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + SparkAppLauncher.launch( + appArguments, + sparkAppConf, + TIMEOUT.value.toSeconds.toInt, + sparkHomeDir, + isJVM, + pyFiles) val driverPod = kubernetesTestComponents.kubernetesClient .pods() @@ -242,11 +346,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") } + private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { + assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage) + assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") + } + private def doBasicExecutorPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === image) assert(executorPod.getSpec.getContainers.get(0).getName === "executor") } + private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { + assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage) + assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + } + private def checkCustomSettings(pod: Pod): Unit = { assert(pod.getMetadata.getLabels.get("label1") === "label1-value") assert(pod.getMetadata.getLabels.get("label2") === "label2-value") @@ -281,14 +396,22 @@ private[spark] object KubernetesSuite { val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" + val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" + val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank" + val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/" + val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" + val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py" + val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py" - // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files" + val TEST_SECRET_NAME_PREFIX = "test-secret-" + val TEST_SECRET_KEY = "test-key" + val TEST_SECRET_VALUE = "test-data" + val TEST_SECRET_MOUNT_PATH = "/etc/secrets" - // val REMOTE_PAGE_RANK_DATA_FILE = - // "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt" - // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE = - // s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt" + val REMOTE_PAGE_RANK_DATA_FILE = + "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt" + val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt" - // case object ShuffleNotReadyException extends Exception + case object ShuffleNotReadyException extends Exception } diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 48727142dd052..a9b49a8e5a610 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -97,24 +97,33 @@ private[spark] case class SparkAppArguments( appArgs: Array[String]) private[spark] object SparkAppLauncher extends Logging { - def launch( appArguments: SparkAppArguments, appConf: SparkAppConf, timeoutSecs: Int, - sparkHomeDir: Path): Unit = { + sparkHomeDir: Path, + isJVM: Boolean, + pyFiles: Option[String] = None): Unit = { val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit")) logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") - val appArgsArray = - if (appArguments.appArgs.length > 0) Array(appArguments.appArgs.mkString(" ")) - else Array[String]() - val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath, + val preCommandLine = if (isJVM) { + mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath, "--deploy-mode", "cluster", "--class", appArguments.mainClass, - "--master", appConf.get("spark.master") - ) ++ appConf.toStringArray :+ - appArguments.mainAppResource) ++ - appArgsArray - ProcessUtils.executeProcess(commandLine, timeoutSecs) + "--master", appConf.get("spark.master")) + } else { + mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath, + "--deploy-mode", "cluster", + "--master", appConf.get("spark.master")) + } + val commandLine = + pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++ + appConf.toStringArray :+ appArguments.mainAppResource + + if (appArguments.appArgs.nonEmpty) { + commandLine += appArguments.appArgs.mkString(" ") + } + logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}") + ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs) } } From dfd78d675046e4c5563a1ddf8e76542a57b71109 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 29 Jun 2018 12:29:53 +0300 Subject: [PATCH 2/3] modification to build args --- bin/docker-image-tool.sh | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index a3f1bcffaea57..4a9cc7d0dd49c 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -63,10 +63,6 @@ function build { if [ ! -d "$IMG_PATH" ]; then error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark." fi - local BINDING_BUILD_ARGS=( - --build-arg - base_img=$(image_ref spark) - ) local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"} local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"} @@ -74,7 +70,7 @@ function build { -t $(image_ref spark) \ -f "$BASEDOCKERFILE" . - docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ + docker build $NOCACHEARG "${BUILD_ARGS[@]}" \ -t $(image_ref spark-py) \ -f "$PYDOCKERFILE" . 
}

From c7c35716015cb5b346b716bd4df8e7bf89de1628 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Fri, 29 Jun 2018 16:21:42 +0300
Subject: [PATCH 3/3] bring back BINDING_BUILD_ARGS into docker-image-tool.sh

---
 bin/docker-image-tool.sh | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 4a9cc7d0dd49c..a3f1bcffaea57 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -63,6 +63,10 @@ function build {
   if [ ! -d "$IMG_PATH" ]; then
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi
+  local BINDING_BUILD_ARGS=(
+    --build-arg
+    base_img=$(image_ref spark)
+  )
   local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
   local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
 
@@ -70,7 +74,7 @@
   docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
     -f "$BASEDOCKERFILE" .
 
-  docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
+  docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
    -t $(image_ref spark-py) \
    -f "$PYDOCKERFILE" .
 }
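
Verification sketch: the commands below show roughly how these patches are exercised
end to end. The image repo docker.io/myrepo, tag dev, and the API server address are
placeholders for illustration only; none of them come from the patches themselves.

    # Build the JVM and PySpark images. With PATCH 3/3 applied, docker-image-tool.sh
    # again passes base_img to the Python bindings Dockerfile via BINDING_BUILD_ARGS.
    ./bin/docker-image-tool.sh -r docker.io/myrepo -t dev build

    # Approximately the command line SparkAppLauncher.launch assembles when
    # isJVM = false and pyFiles is set (the Python path omits --class), mirroring
    # the "Run PySpark with Python2 to test a pyfiles example" test above.
    ./bin/spark-submit \
      --deploy-mode cluster \
      --master k8s://https://<api-server-host>:<port> \
      --py-files local:///opt/spark/examples/src/main/python/py_container_checks.py \
      --conf spark.kubernetes.container.image=docker.io/myrepo/spark-py:dev \
      --conf spark.kubernetes.pyspark.pythonversion=2 \
      local:///opt/spark/examples/src/main/python/pyfiles.py python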