Skip to content

Commit

Permalink
[SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite…
Browse files Browse the repository at this point in the history
….maintains rate controller

Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
```

In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated.

Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.

So we should make sure the receiver online before running `advanceTimeWithRealDelay(ssc, 2)`.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#8877 from zsxwing/SPARK-10769.

(cherry picked from commit 50e4634)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
  • Loading branch information
zsxwing authored and tdas committed Sep 23, 2015
1 parent 6a616d0 commit 4174b94
Showing 1 changed file with 5 additions and 1 deletion.
Expand Up @@ -408,10 +408,14 @@ class CheckpointSuite extends TestSuiteBase {

ssc = new StreamingContext(checkpointDir)
ssc.start()
val outputNew = advanceTimeWithRealDelay(ssc, 2)

eventually(timeout(10.seconds)) {
assert(RateTestReceiver.getActive().nonEmpty)
}

advanceTimeWithRealDelay(ssc, 2)

eventually(timeout(10.seconds)) {
assert(RateTestReceiver.getActive().get.getDefaultBlockGeneratorRateLimit() === 200)
}
ssc.stop()
Expand Down

0 comments on commit 4174b94

Please sign in to comment.