Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10769][Streaming][Tests]Fix o.a.s.streaming.CheckpointSuite.maintains rate controller #8877

Closed
wants to merge 1 commit into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Sep 23, 2015

Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/

sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
    at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
    at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
    at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
    at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)

In this test, it calls advanceTimeWithRealDelay(ssc, 2) to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then UpdateRateLimit won't be sent to the receiver and getDefaultBlockGeneratorRateLimit cannot be updated.

Here are the logs related to this issue:

15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500

advanceTimeWithRealDelay(ssc, 2) triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.

So we should make sure the receiver online before running advanceTimeWithRealDelay(ssc, 2).

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #42890 has finished for PR 8877 at commit c4ee299.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Sep 23, 2015

test this again

@tdas
Copy link
Contributor

tdas commented Sep 23, 2015

test this please

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #1794 has finished for PR 8877 at commit c4ee299.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #42894 has finished for PR 8877 at commit c4ee299.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Sep 23, 2015

Merging this master and 1.5 branch.

asfgit pushed a commit that referenced this pull request Sep 23, 2015
….maintains rate controller

Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
```

In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated.

Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.

So we should make sure the receiver online before running `advanceTimeWithRealDelay(ssc, 2)`.

Author: zsxwing <zsxwing@gmail.com>

Closes #8877 from zsxwing/SPARK-10769.

(cherry picked from commit 50e4634)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 50e4634 Sep 23, 2015
@zsxwing zsxwing deleted the SPARK-10769 branch September 23, 2015 12:43
ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016
….maintains rate controller

Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
```

In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated.

Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.

So we should make sure the receiver online before running `advanceTimeWithRealDelay(ssc, 2)`.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#8877 from zsxwing/SPARK-10769.

(cherry picked from commit 50e4634)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
(cherry picked from commit 4174b94)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants