Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Try and debug the tests some more.
  • Loading branch information
holdenk committed Dec 28, 2018
1 parent c3c0e3a commit 0bf027a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Expand Up @@ -257,6 +257,7 @@ class KubernetesSuite extends SparkFunSuite
isJVM,
pyFiles)

println("Running spark job.")
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
Expand All @@ -275,9 +276,11 @@ class KubernetesSuite extends SparkFunSuite
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending watch of executors")
override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
println("Event received.")
val name = resource.getMetadata.getName
action match {
case Action.ADDED | Action.MODIFIED =>
println("Add or modification event received.")
execPods(name) = resource
// If testing decomissioning delete the node 5 seconds after it starts running
if (decomissioningTest) {
Expand All @@ -296,6 +299,7 @@ class KubernetesSuite extends SparkFunSuite
println(s"Pod: $name deleted")
}
case Action.DELETED | Action.ERROR =>
println("Deleted or error event received.")
execPods.remove(name)
}
}
Expand Down
Expand Up @@ -27,12 +27,15 @@
"""
Usage: decomissioning_water
"""
print("Starting decom test")
spark = SparkSession \
.builder \
.appName("PyMemoryTest") \
.getOrCreate()
sc = spark._sc
rdd = sc.parallelize(range(10))
rdd.collect()
time.sleep(30)
print("Waiting to give nodes time to finish.")
time.sleep(50)
spark.stop()
sys.exit(0)

0 comments on commit 0bf027a

Please sign in to comment.