
WIP: fix partitioning and the case where None is not recognized
giwa committed Aug 14, 2014
1 parent 90a6484 commit 0704b86
Showing 4 changed files with 82 additions and 12 deletions.
20 changes: 19 additions & 1 deletion python/pyspark/streaming/context.py
@@ -154,18 +154,36 @@ def _testInputStream(self, test_inputs, numSlices=None):

# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(test_input):
c = list(test_input) # Make it a list so we can compute its length
test_input = list(test_input) # Make it a list so we can compute its length
batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
if batchSize > 1:
serializer = BatchedSerializer(self._sc._unbatched_serializer,
batchSize)
else:
serializer = self._sc._unbatched_serializer
serializer.dump_stream(test_input, tempFile)
tempFile.close()
tempFiles.append(tempFile.name)

jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
jtempFiles,
numSlices).asJavaDStream()
return DStream(jinput_stream, self, PickleSerializer())


def _testInputStream2(self, test_inputs, numSlices=None):
"""
This is inspired by the QueueStream implementation. Given a list of RDDs, generate a DStream
which contains those RDDs.
"""
test_rdds = list()
for test_input in test_inputs:
test_rdd = self._sc.parallelize(test_input, numSlices)
print test_rdd.glom().collect()
test_rdds.append(test_rdd._jrdd)

jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream()

return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
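A rough usage sketch of the new `_testInputStream2` helper, assuming a StreamingContext `ssc` has already been set up the way PySparkStreamingTestCase does in the test suite below; each inner list is turned into one RDD and therefore one batch of the DStream, mirroring QueueStream. The variable names here are illustrative, not part of the commit.

# illustrative only: drive _testInputStream2 the same way _run_stream does below
test_input = [[1, 2, 3], [4, 5, 6]]                     # one inner list per batch
stream = ssc._testInputStream2(test_input, numSlices=2)
result = list()
stream.map(lambda x: x * 2)._test_output(result)        # append each batch's output to result
ssc.start()
# under the manual clock, result should eventually hold [[2, 4, 6], [8, 10, 12]]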
16 changes: 16 additions & 0 deletions python/pyspark/streaming/dstream.py
@@ -233,6 +233,8 @@ def takeAndPrint(rdd, time):
taken = rdd.take(11)
print "-------------------------------------------"
print "Time: %s" % (str(time))
print rdd.glom().collect()
print "-------------------------------------------"
print "-------------------------------------------"
for record in taken[:10]:
print record
@@ -288,6 +290,20 @@ def get_output(rdd, time):
self.foreachRDD(get_output)


# TODO: implement groupByKey
# TODO: implement union
# TODO: implement cache
# TODO: implement persist
# TODO: implement repartition
# TODO: implement saveAsTextFile
# TODO: implement cogroup
# TODO: implement join
# TODO: implement countByValue
# TODO: implement leftOuterJoin
# TODO: implement rightOuterJoin



class PipelinedDStream(DStream):
def __init__(self, prev, func, preservesPartitioning=False):
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
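Most of the TODO operations above can probably be layered on primitives this class already has; for instance, a countByValue could plausibly be written in terms of the existing map and reduceByKey, roughly as in this sketch (not the actual implementation):

# sketch only: one possible shape for the countByValue TODO, built from the
# map/reduceByKey transformations already defined on DStream
def countByValue(self):
    return self.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)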
23 changes: 13 additions & 10 deletions python/pyspark/streaming_tests.py
@@ -71,8 +71,9 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
"""
def setUp(self):
PySparkStreamingTestCase.setUp(self)
StreamOutput.result = list()
self.timeout = 10 # seconds
self.numInputPartitions = 2
self.result = list()

def tearDown(self):
PySparkStreamingTestCase.tearDown(self)
@@ -137,6 +138,8 @@ def test_reduceByKey(self):
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
print "reduceByKey"
dstream.map(lambda x: (x, 1)).pyprint()
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
output = self._run_stream(test_input, test_func, expected_output)
@@ -168,9 +171,8 @@ def test_glom(self):
numSlices = 2

def test_func(dstream):
dstream.pyprint()
return dstream.glom()
expected_output = [[[1,2], [3,4]],[[5,6], [7,8]],[[9,10], [11,12]]]
expected_output = [[[1,2], [3,4]], [[5,6], [7,8]], [[9,10], [11,12]]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

@@ -180,20 +182,21 @@ def test_mapPartitions(self):
numSlices = 2

def test_func(dstream):
dstream.pyprint()
return dstream.mapPartitions(lambda x: reduce(operator.add, x))
expected_output = [[3, 7],[11, 15],[19, 23]]
def f(iterator): yield sum(iterator)
return dstream.mapPartitions(f)
expected_output = [[3, 7], [11, 15], [19, 23]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
"""Start stream and return the output"""
# Generate input stream with user-defined input
test_input_stream = self.ssc._testInputStream(test_input, numSlices)
numSlices = numSlices or self.numInputPartitions
test_input_stream = self.ssc._testInputStream2(test_input, numSlices)
# Apply test function to stream
test_stream = test_func(test_input_stream)
# Add job to get output from stream
test_stream._test_output(StreamOutput.result)
test_stream._test_output(self.result)
self.ssc.start()

start_time = time.time()
@@ -205,9 +208,9 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
break
self.ssc.awaitTermination(50)
# check if the output is the same length as the expected output
if len(expected_output) == len(StreamOutput.result):
if len(expected_output) == len(self.result):
break
return StreamOutput.result
return self.result

if __name__ == "__main__":
unittest.main()
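For reference, an additional case written against the same harness would follow the shape of the tests above; the test below is illustrative only and not part of the commit.

def test_map(self):
    """Illustrative only: another operation pushed through _run_stream."""
    test_input = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

    def test_func(dstream):
        return dstream.map(lambda x: x + 1)
    expected_output = [[2, 3, 4], [5, 6, 7], [8, 9, 10]]
    output = self._run_stream(test_input, test_func, expected_output)
    self.assertEqual(expected_output, output)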
@@ -165,11 +165,44 @@ class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[
tempFile.getAbsolutePath
}
}

println("PythonTestInputStreaming numPartitons" + numPartitions )
val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
Some(rdd)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}

/**
 * This is an input stream just for unit tests. It is equivalent to a checkpointable,
 * replayable, reliable message queue like Kafka. It requires a sequence as input and
 * returns the i-th element at the i-th batch under the manual clock.
 * This implementation is close to QueueStream.
*/

class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {

def start() {}

def stop() {}

def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
val emptyRDD = ssc.sparkContext.emptyRDD[Array[Byte]]
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedRDD = {
if (inputRDDs.isEmpty) {
emptyRDD
} else if (index < inputRDDs.size()) {
inputRDDs.get(index).rdd
} else {
emptyRDD
}
}

Some(selectedRDD)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}
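The batch selection inside compute() above is plain index arithmetic; a small Python rendering of the same logic, with our own function and variable names and times in milliseconds, would be:

# same selection as compute() above, written out in Python for clarity;
# returns None where the Scala code falls back to emptyRDD
def select_batch(input_rdds, valid_time, zero_time, slide_duration):
    index = (valid_time - zero_time) // slide_duration - 1
    if not input_rdds or index >= len(input_rdds):
        return None
    return input_rdds[index]

# with zero_time = 0 and slide_duration = 1000 (1-second batches):
# valid_time = 1000 -> index 0 (the first test RDD), 2000 -> index 1, and so on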
