Skip to content

Commit

Permalink
Stack overflow error in RateLimiter on rates over 1000/s
Browse files Browse the repository at this point in the history
  • Loading branch information
David McGuire committed Apr 17, 2015
1 parent 55f553a commit d6e1079
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
val blockIntervalMs = 100
val maxRate = 100
val maxRate = 1001
val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
Expand All @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(1)
Thread.sleep(0)
}
blockGenerator.stop()

Expand All @@ -186,16 +186,17 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
assert(recordedData.toSet === generatedData.toSet, "Received data not same")

// recordedData size should be close to the expected rate
val minExpectedMessages = expectedMessages - 3
val maxExpectedMessages = expectedMessages + 1
// use an error margin proportional to the value, so that rate changes don't cause a brittle test
val minExpectedMessages = expectedMessages - 0.3 * expectedMessages
val maxExpectedMessages = expectedMessages + 0.1 * expectedMessages
val numMessages = recordedData.size
assert(
numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
)

val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.3 * expectedMessagesPerBlock
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.1 * expectedMessagesPerBlock
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
assert(
// the first and last block may be incomplete, so we slice them out
Expand Down

0 comments on commit d6e1079

Please sign in to comment.