From 99e4bb39bf6edc1d2a1d363c298bba85d87c02a8 Mon Sep 17 00:00:00 2001 From: giwa Date: Thu, 14 Aug 2014 23:42:34 -0700 Subject: [PATCH] basic function test cases pass --- python/pyspark/streaming_tests.py | 209 +++++++++++++----- python/pyspark/worker.py | 11 - .../streaming/api/python/PythonDStream.scala | 62 +----- 3 files changed, 160 insertions(+), 122 deletions(-) diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index 19cce3f185833..6d85a7faae859 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -24,7 +24,6 @@ """ from itertools import chain -import os import time import unittest import operator @@ -34,9 +33,6 @@ from pyspark.streaming.duration import * -SPARK_HOME = os.environ["SPARK_HOME"] - - class PySparkStreamingTestCase(unittest.TestCase): def setUp(self): class_name = self.__class__.__name__ @@ -49,7 +45,7 @@ def tearDown(self): self.ssc._sc.stop() # Why does it long time to terminaete StremaingContext and SparkContext? # Should we change the sleep time if this depends on machine spec? - time.sleep(5) + time.sleep(8) @classmethod def tearDownClass(cls): @@ -59,8 +55,17 @@ def tearDownClass(cls): class TestBasicOperationsSuite(PySparkStreamingTestCase): """ - Input and output of this TestBasicOperationsSuite is the equivalent to - Scala TestBasicOperationsSuite. + There are 2 tests for each function: one for the batch deserializer and one for the unbatch + deserializer, because we cannot change the deserializer after the streaming process starts. + Default numInputPartitions is 2. + If the number of input elements is over 3, the DStream uses the batch deserializer. + If not, it uses the unbatch deserializer. + + Most of the operations use the UTF8 deserializer to get values from Scala. + I am wondering whether these tests are enough. + Every test input should be a list of lists, which represents the stream. + At each batch interval, the i-th element of the list is used to make the i-th batch of the DStream. + Please see BasicOperationsSuite in Scala, or QueueStream, which is close to this implementation. 
""" def setUp(self): PySparkStreamingTestCase.setUp(self) @@ -75,8 +80,8 @@ def tearDown(self): def tearDownClass(cls): PySparkStreamingTestCase.tearDownClass() - def test_map(self): - """Basic operation test for DStream.map""" + def test_map_batch(self): + """Basic operation test for DStream.map with batch deserializer""" test_input = [range(1, 5), range(5, 9), range(9, 13)] def test_func(dstream): @@ -85,8 +90,18 @@ def test_func(dstream): output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_flatMap(self): - """Basic operation test for DStream.faltMap""" + def test_map_unbatach(self): + """Basic operation test for DStream.map with unbatch deserializer""" + test_input = [range(1, 4), range(4, 7), range(7, 10)] + + def test_func(dstream): + return dstream.map(lambda x: str(x)) + expected_output = map(lambda x: map(lambda y: str(y), x), test_input) + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_flatMap_batch(self): + """Basic operation test for DStream.faltMap with batch deserializer""" test_input = [range(1, 5), range(5, 9), range(9, 13)] def test_func(dstream): @@ -96,8 +111,19 @@ def test_func(dstream): output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_filter(self): - """Basic operation test for DStream.filter""" + def test_flatMap_unbatch(self): + """Basic operation test for DStream.faltMap with unbatch deserializer""" + test_input = [range(1, 4), range(4, 7), range(7, 10)] + + def test_func(dstream): + return dstream.flatMap(lambda x: (x, x * 2)) + expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))), + test_input) + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_filter_batch(self): + """Basic operation test for DStream.filter with batch deserializer""" test_input = [range(1, 5), range(5, 9), range(9, 13)] def test_func(dstream): @@ -106,21 +132,38 @@ def test_func(dstream): output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_count(self): - """Basic operation test for DStream.count""" - #test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)] - test_input = [range(1, 5), range(1,10), range(1,20)] + def test_filter_unbatch(self): + """Basic operation test for DStream.filter with unbatch deserializer""" + test_input = [range(1, 4), range(4, 7), range(7, 10)] + + def test_func(dstream): + return dstream.filter(lambda x: x % 2 == 0) + expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input) + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_count_batch(self): + """Basic operation test for DStream.count with batch deserializer""" + test_input = [range(1, 5), range(1, 10), range(1, 20)] def test_func(dstream): - print "count" - dstream.count().pyprint() return dstream.count() expected_output = map(lambda x: [len(x)], test_input) output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - - def test_reduce(self): - """Basic operation test for DStream.reduce""" + + def test_count_unbatch(self): + """Basic operation test for DStream.count with unbatch deserializer""" + test_input = [[], [1], range(1, 3), range(1, 4)] + + def test_func(dstream): + return 
dstream.count() + expected_output = map(lambda x: [len(x)], test_input) + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_reduce_batch(self): + """Basic operation test for DStream.reduce with batch deserializer""" test_input = [range(1, 5), range(5, 9), range(9, 13)] def test_func(dstream): @@ -129,67 +172,132 @@ def test_func(dstream): output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_reduceByKey(self): - """Basic operation test for DStream.reduceByKey""" - #test_input = [["a", "a", "b"], ["", ""], []] - test_input = [["a", "a", "b", "b"], ["", "", "", ""], []] + def test_reduce_unbatch(self): + """Basic operation test for DStream.reduce with unbatch deserializer""" + test_input = [[1], range(1, 3), range(1, 4)] + + def test_func(dstream): + return dstream.reduce(operator.add) + expected_output = map(lambda x: [reduce(operator.add, x)], test_input) + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_reduceByKey_batch(self): + """Basic operation test for DStream.reduceByKey with batch deserializer""" + test_input = [["a", "a", "b", "b"], ["", "", "", ""]] + + def test_func(dstream): + return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add) + expected_output = [[("a", 2), ("b", 2)], [("", 4)]] + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_reduceByKey_unbatch(self): + """Basic operation test for DStream.reduceByKey with unbatch deserializer""" + test_input = [["a", "a", "b"], ["", ""], []] def test_func(dstream): - print "reduceByKey" - dstream.map(lambda x: (x, 1)).pyprint() return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add) - #expected_output = [[("a", 2), ("b", 1)], [("", 2)], []] - expected_output = [[("a", 2), ("b", 2)], [("", 4)], []] + expected_output = [[("a", 2), ("b", 1)], [("", 2)], []] output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_mapValues(self): - """Basic operation test for DStream.mapValues""" - #test_input = [["a", "a", "b"], ["", ""], []] - test_input = [["a", "a", "b", "b"], ["", "", "", ""], []] + def test_mapValues_batch(self): + """Basic operation test for DStream.mapValues with batch deserializer""" + test_input = [["a", "a", "b", "b"], ["", "", "", ""]] def test_func(dstream): - return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).mapValues(lambda x: x + 10) - #expected_output = [[("a", 12), ("b", 11)], [("", 12)], []] - expected_output = [[("a", 12), ("b", 12)], [("", 14)], []] + return dstream.map(lambda x: (x, 1))\ + .reduceByKey(operator.add)\ + .mapValues(lambda x: x + 10) + expected_output = [[("a", 12), ("b", 12)], [("", 14)]] output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_flatMapValues(self): - """Basic operation test for DStream.flatMapValues""" - #test_input = [["a", "a", "b"], ["", ""], []] - test_input = [["a", "a", "b", "b"], ["", "", "",""], []] + def test_mapValues_unbatch(self): + """Basic operation test for DStream.mapValues with unbatch deserializer""" + test_input = [["a", "a", "b"], ["", ""], []] def test_func(dstream): - return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).flatMapValues(lambda x: (x, x + 10)) - #expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], 
[("", 2), ("", 12)], []] - expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)], []] + return dstream.map(lambda x: (x, 1))\ + .reduceByKey(operator.add)\ + .mapValues(lambda x: x + 10) + expected_output = [[("a", 12), ("b", 11)], [("", 12)], []] output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def test_glom(self): - """Basic operation test for DStream.glom""" + def test_flatMapValues_batch(self): + """Basic operation test for DStream.flatMapValues with batch deserializer""" + test_input = [["a", "a", "b", "b"], ["", "", "", ""]] + + def test_func(dstream): + return dstream.map(lambda x: (x, 1))\ + .reduceByKey(operator.add)\ + .flatMapValues(lambda x: (x, x + 10)) + expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)]] + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_flatMapValues_unbatch(self): + """Basic operation test for DStream.flatMapValues with unbatch deserializer""" + test_input = [["a", "a", "b"], ["", ""], []] + + def test_func(dstream): + return dstream.map(lambda x: (x, 1))\ + .reduceByKey(operator.add)\ + .flatMapValues(lambda x: (x, x + 10)) + expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []] + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_glom_batch(self): + """Basic operation test for DStream.glom with batch deserializer""" test_input = [range(1, 5), range(5, 9), range(9, 13)] numSlices = 2 def test_func(dstream): return dstream.glom() - expected_output = [[[1,2], [3,4]], [[5,6], [7,8]], [[9,10], [11,12]]] + expected_output = [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10], [11, 12]]] + output = self._run_stream(test_input, test_func, expected_output, numSlices) + self.assertEqual(expected_output, output) + + def test_glom_unbatach(self): + """Basic operation test for DStream.glom with unbatch deserialiser""" + test_input = [range(1, 4), range(4, 7), range(7, 10)] + numSlices = 2 + + def test_func(dstream): + return dstream.glom() + expected_output = [[[1], [2, 3]], [[4], [5, 6]], [[7], [8, 9]]] output = self._run_stream(test_input, test_func, expected_output, numSlices) self.assertEqual(expected_output, output) - def test_mapPartitions(self): - """Basic operation test for DStream.mapPartitions""" + def test_mapPartitions_batch(self): + """Basic operation test for DStream.mapPartitions with batch deserializer""" test_input = [range(1, 5), range(5, 9), range(9, 13)] numSlices = 2 def test_func(dstream): - def f(iterator): yield sum(iterator) + def f(iterator): + yield sum(iterator) return dstream.mapPartitions(f) expected_output = [[3, 7], [11, 15], [19, 23]] output = self._run_stream(test_input, test_func, expected_output, numSlices) self.assertEqual(expected_output, output) + def test_mapPartitions_unbatch(self): + """Basic operation test for DStream.mapPartitions with unbatch deserializer""" + test_input = [range(1, 4), range(4, 7), range(7, 10)] + numSlices = 2 + + def test_func(dstream): + def f(iterator): + yield sum(iterator) + return dstream.mapPartitions(f) + expected_output = [[1, 5], [4, 11], [7, 17]] + output = self._run_stream(test_input, test_func, expected_output, numSlices) + self.assertEqual(expected_output, output) + def _run_stream(self, test_input, test_func, expected_output, numSlices=None): """Start stream and return the output""" # Generate input 
stream with user-defined input @@ -212,6 +320,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): # check if the output is the same length of expexted output if len(expected_output) == len(self.result): break + return self.result if __name__ == "__main__": diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index e212fe6598e09..ceb50b4f99acd 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,7 +23,6 @@ import time import socket import traceback -import itertools # CloudPickler needs to be imported so that depicklers are registered using the # copy_reg module. from pyspark.accumulators import _accumulatorRegistry @@ -77,16 +76,6 @@ def main(infile, outfile): (func, deserializer, serializer) = command init_time = time.time() iterator = deserializer.load_stream(infile) - print "deserializer in worker: %s" % str(deserializer) - iterator, walk = itertools.tee(iterator) - if isinstance(walk, int): - print "this is int" - print walk - else: - try: - print list(walk) - except: - print list(walk) serializer.dump_stream(func(split_index, iterator), outfile) except Exception: try: diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 2a2efcb57ac6c..ede1070472a43 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -134,48 +134,6 @@ class PythonTransformedDStream( } */ -<<<<<<< HEAD -======= -/** - * This is a input stream just for the unitest. This is equivalent to a checkpointable, - * replayable, reliable message queue like Kafka. It requires a sequence as input, and - * returns the i_th element at the i_th batch under manual clock. - */ -class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int) - extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){ - - def start() {} - - def stop() {} - - def compute(validTime: Time): Option[RDD[Array[Byte]]] = { - logInfo("Computing RDD for time " + validTime) - inputFiles.foreach(logInfo(_)) - // make a temporary file - // make empty RDD - val prefix = "spark" - val suffix = ".tmp" - val tempFile = File.createTempFile(prefix, suffix) - val index = ((validTime - zeroTime) / slideDuration - 1).toInt - logInfo("Index: " + index) - - val selectedInputFile: String = { - if (inputFiles.isEmpty){ - tempFile.getAbsolutePath - }else if (index < inputFiles.size()) { - inputFiles.get(index) - } else { - tempFile.getAbsolutePath - } - } - val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd - logInfo("Created RDD " + rdd.id + " with " + selectedInputFile) - Some(rdd) - } - - val asJavaDStream = JavaDStream.fromDStream(this) -} - /** * This is a input stream just for the unitest. This is equivalent to a checkpointable, * replayable, reliable message queue like Kafka. 
It requires a sequence as input, and @@ -183,7 +141,7 @@ class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[ * This implementation is close to QueStream */ -class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]]) +class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]]) extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) { def start() {} @@ -208,21 +166,3 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[ val asJavaDStream = JavaDStream.fromDStream(this) } - - -class PythonTestInputStream3(ssc_ : JavaStreamingContext) - extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) { - - def start() {} - - def stop() {} - - def compute(validTime: Time): Option[RDD[Any]] = { - val index = ((validTime - zeroTime) / slideDuration - 1).toInt - val selectedInput = ArrayBuffer(1, 2, 3).toSeq - val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2) - Some(rdd) - } - - val asJavaDStream = JavaDStream.fromDStream(this) -}>>>>>>> broke something
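Reviewer note (not part of the patch): the new TestBasicOperationsSuite docstring says each operation gets a *_batch and a *_unbatch test because the deserializer cannot change once the streaming job has started, and that inputs with more than 3 elements per batch go through the batch deserializer. A minimal sketch of that sizing convention follows; the helper name `uses_batch_deserializer` is hypothetical and does not exist in PySpark.

```python
# Illustrative sketch only -- mirrors the input-sizing convention that the
# *_batch / *_unbatch test pairs in this patch rely on; not code from the patch.

def uses_batch_deserializer(test_input):
    """Return True if every batch in test_input has more than 3 elements,
    i.e. the test is expected to exercise the batch deserializer."""
    return all(len(batch) > 3 for batch in test_input)

# Inputs taken from the tests above:
print(uses_batch_deserializer([range(1, 5), range(5, 9), range(9, 13)]))  # True  -> *_batch tests
print(uses_batch_deserializer([range(1, 4), range(4, 7), range(7, 10)]))  # False -> *_unbatch tests
```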