diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 8f6251677ad05..b511cb6e2a115 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -257,6 +257,7 @@ class KubernetesSuite extends SparkFunSuite isJVM, pyFiles) + println("Running spark job.") val driverPod = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) @@ -275,9 +276,11 @@ class KubernetesSuite extends SparkFunSuite override def onClose(cause: KubernetesClientException): Unit = logInfo("Ending watch of executors") override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + println("Event received.") val name = resource.getMetadata.getName action match { case Action.ADDED | Action.MODIFIED => + println("Add or modification event received.") execPods(name) = resource // If testing decomissioning delete the node 5 seconds after it starts running if (decomissioningTest) { @@ -296,6 +299,7 @@ class KubernetesSuite extends SparkFunSuite println(s"Pod: $name deleted") } case Action.DELETED | Action.ERROR => + println("Deleted or error event received.") execPods.remove(name) } } diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index d025ff6e4455d..a99e76d2ec2b4 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -27,6 +27,7 @@ """ Usage: decomissioning_water """ + print("Starting decom test") spark = SparkSession \ .builder \ .appName("PyMemoryTest") \ @@ -34,5 +35,7 @@ sc = spark._sc rdd = sc.parallelize(range(10)) rdd.collect() - time.sleep(30) + print("Waiting to give nodes time to finish.") + time.sleep(50) + spark.stop() sys.exit(0)