From d6e107977e791ca70d7b118739e7160f4dec1867 Mon Sep 17 00:00:00 2001 From: David McGuire Date: Fri, 17 Apr 2015 09:43:57 -0700 Subject: [PATCH] Stack overflow error in RateLimiter on rates over 1000/s --- .../org/apache/spark/streaming/ReceiverSuite.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 91261a9db7360..239f83d1c8efc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { test("block generator throttling") { val blockGeneratorListener = new FakeBlockGeneratorListener val blockIntervalMs = 100 - val maxRate = 100 + val maxRate = 1001 val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). set("spark.streaming.receiver.maxRate", maxRate.toString) val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { blockGenerator.addData(count) generatedData += count count += 1 - Thread.sleep(1) + Thread.sleep(0) } blockGenerator.stop() @@ -186,16 +186,17 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { assert(recordedData.toSet === generatedData.toSet, "Received data not same") // recordedData size should be close to the expected rate - val minExpectedMessages = expectedMessages - 3 - val maxExpectedMessages = expectedMessages + 1 + // use an error margin proportional to the value, so that rate changes don't cause a brittle test + val minExpectedMessages = expectedMessages - 0.3 * expectedMessages + val maxExpectedMessages = expectedMessages + 0.1 * expectedMessages val numMessages = recordedData.size assert( numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" ) - val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3 - val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1 + val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.3 * expectedMessagesPerBlock + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.1 * expectedMessagesPerBlock val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") assert( // the first and last block may be incomplete, so we slice them out