diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 4bdc021e0e26d..095bfb0c73a9a 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -257,7 +257,7 @@ private class FlumeUtilsPythonHelper {
       enableDecompression: Boolean
     ): JavaPairDStream[Array[Byte], Array[Byte]] = {
     val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
-    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
   }
 
   def createPollingStream(
@@ -274,7 +274,7 @@ private class FlumeUtilsPythonHelper {
     }
     val dstream = FlumeUtils.createPollingStream(
       jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
-    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
   }
 
 }
@@ -297,7 +297,7 @@ private object FlumeUtilsPythonHelper {
     }
   }
 
-  private def toDStreamForPython(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+  private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
     JavaPairDStream[Array[Byte], Array[Byte]] = {
     dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
       override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index ead2c8b71d626..91d63d49dbec3 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -37,12 +37,10 @@ import org.apache.spark.streaming.flume.sink.{SparkSinkConfig, SparkSink}
 private[flume] class PollingFlumeTestUtils {
 
   private val batchCount = 5
-  private val eventsPerBatch = 100
+  val eventsPerBatch = 100
   private val totalEventsPerChannel = batchCount * eventsPerBatch
   private val channelCapacity = 5000
 
-  def getEventsPerBatch: Int = eventsPerBatch
-
   def getTotalEvents: Int = totalEventsPerChannel * channels.size
 
   private val channels = new ArrayBuffer[MemoryChannel]
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 7e9eb3b767068..7ad5af61ca0fb 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -42,15 +42,9 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
 
-  var utils = new PollingFlumeTestUtils
-
-  def beforeFunction() {
-    logInfo("Using manual clock")
-    conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-  }
-
-  before(beforeFunction())
+  val utils = new PollingFlumeTestUtils
 
   test("flume polling test") {
     testMultipleTimes(testFlumePolling)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 87c68efe9ebdd..188c8ff12067e 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -771,7 +771,7 @@ def _writeAndVerify(self, ports):
         dstream = FlumeUtils.createPollingStream(
             ssc,
             addresses,
-            maxBatchSize=self._utils.getEventsPerBatch(),
+            maxBatchSize=self._utils.eventsPerBatch(),
             parallelism=5)
 
         outputBuffer = []
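Note on the Python-side change: although `eventsPerBatch` is now a plain Scala `val` rather than a `getEventsPerBatch` method, the PySpark test still invokes it with parentheses. That works because a public Scala `val` compiles to a private field plus a generated zero-argument accessor method of the same name, and Py4J resolves the call against that JVM method. A minimal illustrative sketch (not part of the patch) of what the patched class exposes to JVM callers:

    // Illustrative only: scalac compiles the public val below into a
    // private field plus a public accessor method eventsPerBatch(),
    // which is what Py4J invokes when Python code calls
    // _utils.eventsPerBatch().
    class PollingFlumeTestUtilsSketch {
      val eventsPerBatch = 100
    }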