From 33bf91eefc42fd09fa977c655a776c05733e1e94 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 13 Sep 2018 16:22:48 -0400 Subject: [PATCH 1/3] fixing flakiness by adding watchers to executor pods --- .../k8s/integrationtest/KubernetesSuite.scala | 32 +++++++++++++------ .../KubernetesTestComponents.scala | 1 - .../integrationtest/SecretsTestsSuite.scala | 3 +- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 82e6efa2707d9..f734cce514c63 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -23,18 +23,23 @@ import java.util.regex.Pattern import com.google.common.io.PatternFilenameFilter import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.Matchers import org.scalatest.time.{Minutes, Seconds, Span} import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConfig._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.internal.Logging private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite { + with PythonTestsSuite with ClientModeTestsSuite + with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -218,17 +223,25 @@ private[spark] class KubernetesSuite extends SparkFunSuite .getItems .get(0) driverPodChecker(driverPod) - - val executorPods = kubernetesTestComponents.kubernetesClient + val execPods = scala.collection.mutable.Stack[Pod]() + val execWatcher = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) .withLabel("spark-role", "executor") - .list() - .getItems - executorPods.asScala.foreach { pod => - executorPodChecker(pod) - } - + .watch(new Watcher[Pod] { + logInfo("Beginning watch of executors") + override def onClose(cause: KubernetesClientException): Unit = + logInfo("Ending watch of executors") + override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + action match { + case Action.ADDED | Action.MODIFIED => + execPods.push(resource) + } + } + }) + Eventually.eventually(TIMEOUT, INTERVAL) { execPods.nonEmpty should be (true) } + execWatcher.close() + executorPodChecker(execPods.pop()) Eventually.eventually(TIMEOUT, INTERVAL) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient @@ -239,7 +252,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite } } } - protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === image) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index b602fdf39731f..5615d6173eebd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -62,7 +62,6 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl new SparkAppConf() .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") .set("spark.kubernetes.namespace", namespace) - .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") .set("spark.executors.instances", "1") .set("spark.app.name", "spark-test-app") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala index 9b039bb98dd9a..b18a6aebda497 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder} +import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder} import org.apache.commons.codec.binary.Base64 import org.apache.commons.io.output.ByteArrayOutputStream import org.scalatest.concurrent.Eventually @@ -53,7 +53,6 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => .delete() } - // TODO: [SPARK-25291] This test is flaky with regards to memory of executors test("Run SparkPi with env and mount secrets.", k8sTestTag) { createTestSecret() sparkAppConf From 1c66537b9c514dfd73a86c967478f4a6067ac332 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 13 Sep 2018 16:39:13 -0400 Subject: [PATCH 2/3] style check --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index f734cce514c63..cf1855999dd54 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -26,8 +26,8 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag} -import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.Matchers +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import scala.collection.JavaConverters._ From 9b00bc4b87020f36504039ba2f8c50d6035f99c1 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 17 Sep 2018 10:38:35 -0400 Subject: [PATCH 3/3] foreach over list of executors --- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index cf1855999dd54..e6840ce818c1f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -223,7 +223,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite .getItems .get(0) driverPodChecker(driverPod) - val execPods = scala.collection.mutable.Stack[Pod]() + val execPods = scala.collection.mutable.Map[String, Pod]() val execWatcher = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) @@ -233,15 +233,18 @@ private[spark] class KubernetesSuite extends SparkFunSuite override def onClose(cause: KubernetesClientException): Unit = logInfo("Ending watch of executors") override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + val name = resource.getMetadata.getName action match { case Action.ADDED | Action.MODIFIED => - execPods.push(resource) + execPods(name) = resource + case Action.DELETED | Action.ERROR => + execPods.remove(name) } } }) - Eventually.eventually(TIMEOUT, INTERVAL) { execPods.nonEmpty should be (true) } + Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) } execWatcher.close() - executorPodChecker(execPods.pop()) + execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(TIMEOUT, INTERVAL) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient