[SPARK-10649][STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs #8781

Closed
wants to merge 10 commits
59 changes: 59 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -21,6 +21,7 @@ package org.apache.spark.util
import java.util.concurrent._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}

@@ -86,4 +87,62 @@ private[spark] object ThreadUtils {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
Executors.newSingleThreadScheduledExecutor(threadFactory)
}

/**
* Run a piece of code in a new thread, and get the result. Exception in the new thread is

Contributor: Run a piece of code in a new thread and return the result.

* thrown in the caller thread with an adjusted stack trace that removes references to this
* method for clarity. The exception stack trace will look like the following:
*
* SomeException: exception-message
* at CallerClass.body-method (sourcefile.scala)
* at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
* at CallerClass.caller-method (sourcefile.scala)
* ...
*/
def runInNewThread[T](
threadName: String,
isDaemon: Boolean = true)(body: => T): T = {
@volatile var exception: Option[Throwable] = None
@volatile var result: T = null.asInstanceOf[T]

val thread = new Thread(threadName) {
override def run(): Unit = {
try {
result = body
} catch {
case NonFatal(e) =>
exception = Some(e)
}
}
}
thread.setDaemon(isDaemon)
thread.start()
thread.join()

exception match {
case Some(realException) =>
// Remove the part of the stack that shows method calls into this helper method
// This means drop everything from the top until the stack element
// ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)

Member: nit: could you add a comment about the magic number 1, such as remove "java.lang.Thread.getStackTrace"?

// Remove the part of the new thread stack that shows method calls from this helper method
val extraStackTrace = realException.getStackTrace.takeWhile(
! _.getClassName.contains(this.getClass.getSimpleName))

// Combine the two stack traces, with a placeholder just specifying that there
// was a helper method used, without any further details of the helper
val placeHolderStackElem = new StackTraceElement(
s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
" ", "", -1)
val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace

// Update the stack trace and rethrow the exception in the caller thread
realException.setStackTrace(finalStackTrace)
throw realException
case None =>
result
}
}
}
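
For orientation, a minimal sketch of how a caller would observe the adjusted stack trace. This is illustrative only and not part of the patch; the demo object name is an assumption, and it is placed under org.apache.spark because ThreadUtils is private[spark].

    package org.apache.spark.demo // ThreadUtils is private[spark], so a demo must live under org.apache.spark

    import org.apache.spark.util.ThreadUtils

    object RunInNewThreadDemo {
      def main(args: Array[String]): Unit = {
        // The body runs on a fresh (daemon by default) thread and its result is returned to the caller.
        val name = ThreadUtils.runInNewThread("demo-thread") { Thread.currentThread().getName }
        println(name) // prints "demo-thread"

        try {
          // An exception thrown by the body is rethrown here, in the caller thread ...
          ThreadUtils.runInNewThread("demo-thread") { throw new IllegalArgumentException("boom") }
        } catch {
          case e: IllegalArgumentException =>
            // ... with the body's frames spliced onto the caller's frames, joined by the
            // "... run in separate thread using org.apache.spark.util.ThreadUtils ..." placeholder
            // instead of the helper's own frames.
            e.getStackTrace.foreach(frame => println(s"  at $frame"))
        }
      }
    }
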
22 changes: 21 additions & 1 deletion core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.util

import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import org.apache.spark.SparkFunSuite

@@ -66,4 +66,24 @@ class ThreadUtilsSuite extends SparkFunSuite {
val futureThreadName = Await.result(f, 10.seconds)
assert(futureThreadName === callerThreadName)
}

test("runInNewThread") {
import ThreadUtils._
assert(runInNewThread("thread-name") { Thread.currentThread().getName } === "thread-name")
assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } === true)
assert(
runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
)
val exception = intercept[IllegalArgumentException] {
runInNewThread("thread-name") { throw new IllegalArgumentException("test") }

Contributor: can you add some kind of more unique identifier here? I wouldn't be surprised if elsewhere we have test in the stack trace. Maybe just do "test-" + System.currentTimeMillis

Contributor Author: Good idea.

}
assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))

Member: You can update these 5 lines to:

    val exception = intercept[IllegalArgumentException] {
      runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
    }
    assert(exception.getMessage.contains("test"))

Contributor Author: That's true!

assert(exception.getStackTrace.mkString("\n").contains(
"... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
"stack trace does not contain expected place holder"
)
assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
"stack trace contains unexpected references to ThreadUtils"
)
}
}
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -588,12 +588,16 @@ class StreamingContext private[streaming] (
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
scheduler.start()
ThreadUtils.runInNewThread("streaming-start") {

Contributor: this block definitely requires a paragraph comment. Someone new to the code will have no idea why we need to do these things in a new thread.

sparkContext.setCallSite(startSite.get)

Contributor: any reason why we need to move this in here? It's an atomic reference so it doesn't really matter which thread reads it right?

Contributor Author: Because this sets a thread local variable.

Contributor: an inheritable thread local variable, so it still doesn't matter

(anyway we can just keep this change, not a big deal, mainly just wondering)
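
To make the thread-local point concrete, here is a small self-contained sketch (plain JDK classes, illustrative only and not part of the patch) of the behavior being discussed: a value stored in an InheritableThreadLocal by the caller is copied into threads the caller creates, while a plain ThreadLocal value is not.

    object ThreadLocalInheritanceDemo {
      private val plain = new ThreadLocal[String]
      private val inheritable = new InheritableThreadLocal[String]

      def main(args: Array[String]): Unit = {
        plain.set("caller-value")
        inheritable.set("caller-value")

        val child = new Thread {
          override def run(): Unit = {
            // Inheritable values are copied when the Thread object is constructed; plain ones are not.
            println(s"plain       = ${plain.get}")       // null
            println(s"inheritable = ${inheritable.get}") // caller-value
          }
        }
        child.start()
        child.join()
      }
    }

SparkContext keeps the call site, job group, and related local properties in an inheritable thread local, so the new "streaming-start" thread starts out with the caller's values; clearing and re-setting them inside that thread is what keeps them out of the jobs the scheduler launches, without touching the caller's own thread-local properties (which the new test below verifies).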

sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")

Member: You can use SparkContext.clearJobGroup here to avoid the duplicate code.

Member: Is it necessary to set SPARK_JOB_INTERRUPT_ON_CANCEL to false, considering clearJobGroup will clean it?

Contributor: agreed

Contributor Author: Basically I don't want to rely on the default value of this parameter; I'd rather explicitly specify not to interrupt.
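
A small hypothetical fragment of the trade-off being discussed (assuming sc is a SparkContext and the code sits under the org.apache.spark package tree, since the constant is private[spark]):

    // clearJobGroup() resets the interrupt-on-cancel property to null, i.e. fall back to the default:
    sc.clearJobGroup()
    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) == null)

    // Explicitly setting "false" pins the behavior instead of relying on that default,
    // which is the author's stated preference:
    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) == "false")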

scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
@@ -610,14 +614,15 @@
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
this.logInfo("StreamingContext started")

Contributor: not needed?

Contributor Author: My bad. Forgot to remove that.

case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}


/**
* Wait for the execution to stop. Any exceptions that occur during the execution
* will be thrown in this thread.
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}

test("start should set job group correctly") {
ssc = new StreamingContext(conf, batchDuration)
ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
val sc = ssc.sc

@volatile var jobGroupFound: String = ""
@volatile var jobDescFound: String = ""
@volatile var jobInterruptFound: String = ""
@volatile var allFound: Boolean = false

addInputStream(ssc).foreachRDD { rdd =>
jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
allFound = true
}
ssc.start()

eventually(timeout(5 seconds), interval(10 milliseconds)) {
assert(allFound === true)
}

Contributor: is it possible to use semaphores here or some kind of locking to make the test more robust? I'm worried that it could become another flaky test that we have to fix eventually anyway.

Contributor Author: How can volatile lead to flakiness?

Contributor: if for some reason we don't execute foreachRDD within the time frame

Contributor Author: Well, you have to set some time limit no matter what method you use to wait, isn't it?

Contributor: Not necessarily. If it hangs forever Jenkins will time it out anyway. In general I'm not a huge fan of eventually's because these are usually the ones we have to end up fixing later. If you guess a timeout wrong then you'll have to try a slightly higher one after breaking many builds.

In this case it's probably OK to keep this as is, but I'm just saying I would have written the test differently.

Contributor Author: Well, in both cases, one has to go and fix the test :) Anyway, I'm not going into the argument of eventually vs. blocking permanently. I think it's better to use eventually with conservative timeouts than to block completely. I am fairly certain that this test is fine. Other tests in this suite also use the same eventually and they have not been flaky (the few times StreamingContextSuite has been flaky, it was for one test which had a real bug, not for eventually). I can make the timeout even more conservative, no harm in that.
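
For reference, a hypothetical latch-based variant of the test above, along the lines the reviewer suggests: the @volatile allFound flag and the eventually block are replaced by a CountDownLatch. The test name and the 10-second timeout are illustrative, and the merged test keeps the eventually form.

    test("start should set job group correctly (latch-based sketch)") {
      import java.util.concurrent.{CountDownLatch, TimeUnit}

      ssc = new StreamingContext(conf, batchDuration)
      ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
      val sc = ssc.sc

      @volatile var jobGroupFound: String = ""
      @volatile var jobInterruptFound: String = ""
      val firstBatchRan = new CountDownLatch(1)

      addInputStream(ssc).foreachRDD { _ =>
        jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
        jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
        firstBatchRan.countDown() // signal that a streaming job has actually run
      }
      ssc.start()

      // Block until the first foreachRDD call has happened; the build's global timeout is the backstop.
      assert(firstBatchRan.await(10, TimeUnit.SECONDS), "foreachRDD was never invoked")
      assert(jobGroupFound === null)
      assert(jobInterruptFound === "false")
    }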


// Verify streaming jobs have expected thread-local properties
assert(jobGroupFound === null)
assert(jobDescFound === null)
assert(jobInterruptFound === "false")

// Verify current thread's thread-local properties have not changed
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
}

test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)