Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Sep 21, 2015
1 parent d8600cf commit 7550490
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private[spark] object ThreadUtils {
}

/**
* Run a piece of code in a new thread, and the get result. Exception in the new thread is
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
* method for clarity. The exception stack traces will be like the following
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random

import org.apache.spark.SparkFunSuite

Expand Down Expand Up @@ -74,10 +75,11 @@ class ThreadUtilsSuite extends SparkFunSuite {
assert(
runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
)
val uniqueExceptionMessage = "test" + Random.nextInt()
val exception = intercept[IllegalArgumentException] {
runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
}
assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
assert(exception.getStackTrace.mkString("\n").contains(
"... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
"stack trace does not contain expected place holder"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}

test("start should set job group correctly") {
test("start should set job group and description of streaming jobs correctly") {
ssc = new StreamingContext(conf, batchDuration)
ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
val sc = ssc.sc
Expand All @@ -198,7 +198,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
}
ssc.start()

eventually(timeout(5 seconds), interval(10 milliseconds)) {
eventually(timeout(10 seconds), interval(10 milliseconds)) {
assert(allFound === true)
}

Expand Down

0 comments on commit 7550490

Please sign in to comment.