From 7f6572ea530065b2777a96442c57f3029b66dfa1 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Mon, 11 Apr 2016 12:24:38 -0700 Subject: [PATCH] [SPARK-14531][STREAMING] Flume streaming should respect maxRate At the moment Flume streaming sets the default polling max batch size as DEFAULT_POLLING_BATCH_SIZE (1000) even if an option "spark.streaming.receiver.maxRate" is passed. This patch tries to address this issue by respecting the option "spark.streaming.receiver.maxRate" in FlumeUtils.scala. A test case is added to cover this change. --- .../spark/streaming/flume/FlumeUtils.scala | 6 ++++-- .../streaming/flume/FlumeStreamSuite.scala | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 3e3ed712f0dbf..300dd789ddd93 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -147,7 +147,8 @@ object FlumeUtils { storageLevel: StorageLevel ): ReceiverInputDStream[SparkFlumeEvent] = { createPollingStream(ssc, addresses, storageLevel, - DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) + ssc.conf.getInt("spark.streaming.receiver.maxRate", DEFAULT_POLLING_BATCH_SIZE), + DEFAULT_POLLING_PARALLELISM) } /** @@ -217,7 +218,8 @@ object FlumeUtils { storageLevel: StorageLevel ): JavaReceiverInputDStream[SparkFlumeEvent] = { createPollingStream(jssc, addresses, storageLevel, - DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) + jssc.ssc.conf.getInt("spark.streaming.receiver.maxRate", DEFAULT_POLLING_BATCH_SIZE), + DEFAULT_POLLING_PARALLELISM) } /** diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 
7bac1cc4b0ae7..d7977ea3d1c63 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -48,6 +48,23 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w testFlumeStream(testCompression = true) } + test("flume input with maxBatchSize - SPARK-14531") { + val utils = new FlumeTestUtils + try { + conf.set("spark.streaming.receiver.maxRate", "500") + ssc = new StreamingContext(conf, Milliseconds(200)) + val flumePollingStream1 = FlumeUtils.createPollingStream( + ssc, "localhost", utils.getTestPort()) + assert(flumePollingStream1.getReceiver().asInstanceOf[FlumePollingReceiver] + .getMaxBatchSize == 500) + } finally { + if (ssc != null) { + ssc.stop() + } + utils.close() + } + } + /** Run test on flume stream */ private def testFlumeStream(testCompression: Boolean): Unit = { val input = (1 to 100).map { _.toString }