Commit f1bf3c0

Address TD's comments
zsxwing committed Jun 30, 2015
1 parent 0449723 commit f1bf3c0
Showing 4 changed files with 7 additions and 15 deletions.

@@ -257,7 +257,7 @@ private class FlumeUtilsPythonHelper {
       enableDecompression: Boolean
     ): JavaPairDStream[Array[Byte], Array[Byte]] = {
     val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
-    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
   }
 
   def createPollingStream(
@@ -274,7 +274,7 @@ private class FlumeUtilsPythonHelper {
     }
     val dstream = FlumeUtils.createPollingStream(
       jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
-    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
   }
 
 }
@@ -297,7 +297,7 @@ private object FlumeUtilsPythonHelper {
     }
   }
 
-  private def toDStreamForPython(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+  private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
     JavaPairDStream[Array[Byte], Array[Byte]] = {
     dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
       override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
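
Note on the rename above: toByteArrayPairDStream names what the helper actually returns, a pair DStream whose keys and values are raw byte arrays that the Python side can decode. The sketch below shows the general shape of such a conversion using a hypothetical SimpleEvent type and ByteArrayPairSketch object; it illustrates the mapToPair pattern and is not the method body that the hunk above truncates.

    // Minimal sketch of the byte-array-pair conversion (SimpleEvent and
    // ByteArrayPairSketch are hypothetical names, not part of this commit).
    import org.apache.spark.api.java.function.PairFunction
    import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream}

    case class SimpleEvent(headers: Array[Byte], body: Array[Byte])

    object ByteArrayPairSketch {
      // Map each event to a (header bytes, body bytes) pair so that a non-JVM
      // caller only ever sees Array[Byte] values.
      def toByteArrayPairs(events: JavaDStream[SimpleEvent])
        : JavaPairDStream[Array[Byte], Array[Byte]] = {
        events.mapToPair(new PairFunction[SimpleEvent, Array[Byte], Array[Byte]] {
          override def call(e: SimpleEvent): (Array[Byte], Array[Byte]) = (e.headers, e.body)
        })
      }
    }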

@@ -37,12 +37,10 @@ import org.apache.spark.streaming.flume.sink.{SparkSinkConfig, SparkSink}
 private[flume] class PollingFlumeTestUtils {
 
   private val batchCount = 5
-  private val eventsPerBatch = 100
+  val eventsPerBatch = 100
   private val totalEventsPerChannel = batchCount * eventsPerBatch
   private val channelCapacity = 5000
 
-  def getEventsPerBatch: Int = eventsPerBatch
-
   def getTotalEvents: Int = totalEventsPerChannel * channels.size
 
   private val channels = new ArrayBuffer[MemoryChannel]
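
The explicit getEventsPerBatch getter can go because a public Scala val already compiles to a zero-argument JVM accessor method of the same name, which is exactly what the PySpark test invokes through Py4J as utils.eventsPerBatch(). A minimal sketch of that pattern, using hypothetical class names:

    // Sketch only; BatchSettingsSketch is a hypothetical stand-in, not the real test utility.
    class BatchSettingsSketch {
      private val batchCount = 5
      val eventsPerBatch = 100   // the JVM exposes this as a method: eventsPerBatch(): Int
      private val totalEventsPerChannel = batchCount * eventsPerBatch

      def totalEventsFor(channelCount: Int): Int = totalEventsPerChannel * channelCount
    }

    object BatchSettingsSketchDemo {
      def main(args: Array[String]): Unit = {
        val utils = new BatchSettingsSketch
        println(utils.eventsPerBatch)    // 100; a Py4J client would call utils.eventsPerBatch()
        println(utils.totalEventsFor(2)) // 1000
      }
    }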

@@ -42,15 +42,9 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
 
-  var utils = new PollingFlumeTestUtils
-
-  def beforeFunction() {
-    logInfo("Using manual clock")
-    conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-  }
-
-  before(beforeFunction())
+  val utils = new PollingFlumeTestUtils
 
   test("flume polling test") {
     testMultipleTimes(testFlumePolling)
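
With the manual clock folded into the SparkConf builder chain, the suite no longer needs a before() hook that mutates the conf, and utils can become an immutable val. A minimal sketch of the configuration pattern (assumes a spark-core dependency on the classpath; this is not the suite itself):

    import org.apache.spark.SparkConf

    // Sketch: build the conf once, with the manual clock set up front
    // instead of in a before() hook.
    object ManualClockConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("ManualClockConfSketch")
          .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
        println(conf.get("spark.streaming.clock"))
      }
    }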

python/pyspark/streaming/tests.py (2 changes: 1 addition & 1 deletion)
@@ -771,7 +771,7 @@ def _writeAndVerify(self, ports):
         dstream = FlumeUtils.createPollingStream(
             ssc,
             addresses,
-            maxBatchSize=self._utils.getEventsPerBatch(),
+            maxBatchSize=self._utils.eventsPerBatch(),
             parallelism=5)
         outputBuffer = []
 
