Skip to content

Commit

Permalink
Stop JVM StreamingContext when the Python process is dead
Browse files — browse the repository at this point in the history
  • Loading branch information
zsxwing committed Sep 22, 2016
1 parent 8c3ee2b commit cd326b4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
Expand Up @@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._
import scala.language.existentials

import py4j.Py4JException

import org.apache.spark.SparkException
import org.apache.spark.api.java._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Interval, Time}
import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
import org.apache.spark.streaming.api.java._
import org.apache.spark.streaming.dstream._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
/**
* Helper functions, which are called from Python via Py4J.
*/
private[python] object PythonDStream {
private[streaming] object PythonDStream {

/**
* can not access PythonTransformFunctionSerializer.register() via Py4j
Expand All @@ -184,6 +187,32 @@ private[python] object PythonDStream {
rdds.asScala.foreach(queue.add)
queue
}

/**
* Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot
* stop it in the Python side.
*/
/**
 * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot
 * stop it in the Python side.
 */
def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
  // These two special messages are emitted by py4j's CallbackClient when it can no longer
  // reach the Python callback server, which means the Python process is most likely dead:
  // scalastyle:off
  // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
  // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
  // scalastyle:on
  e match {
    case py4jError: Py4JException
        if py4jError.getMessage == "Cannot obtain a new communication channel" ||
          py4jError.getMessage == "Error while obtaining a new communication channel" =>
      // Stop the context from a dedicated daemon thread: stopping it directly on the
      // reporting thread could deadlock.
      val stopper = new Thread("Stop-StreamingContext") with Logging {
        setDaemon(true)

        override def run(): Unit = {
          logError(
            "Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
          StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
        }
      }
      stopper.start()
    case _ => // Any other failure is not a dead-Python signal; leave the context running.
  }
}
}

/**
Expand Down
Expand Up @@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}

Expand Down Expand Up @@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
Expand Down
Expand Up @@ -27,6 +27,7 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{EventLoop, ThreadUtils}

Expand Down Expand Up @@ -210,6 +211,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
/**
 * Handles an error raised by the scheduler: logs it, propagates it to the context waiter
 * (so callers blocked in `awaitTermination` observe the failure), and stops the
 * StreamingContext if the error indicates the Python callback process has died.
 *
 * @param msg description of where/why the error occurred
 * @param e the error to report
 */
private def handleError(msg: String, e: Throwable): Unit = {
  logError(msg, e)
  // Wake up any thread waiting on the context so it sees the failure.
  ssc.waiter.notifyError(e)
  // If this failure came from a dead PySpark process, stop the StreamingContext too.
  PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}

private class JobHandler(job: Job) extends Runnable with Logging {
Expand Down

0 comments on commit cd326b4

Please sign in to comment.