diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 809158aedbc96..123fa67f837e3 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -187,5 +187,8 @@ def _testInputStream2(self, test_inputs, numSlices=None): jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream() dstream = DStream(jinput_stream, self, test_rdd_deserializers[0]) - dstream._test_switch_dserializer(test_rdd_deserializers) return dstream + + def _testInputStream3(self): + jinput_stream = self._jvm.PythonTestInputStream3(self._jssc).asJavaDStream() + return DStream(jinput_stream, self, UTF8Deserializer()) diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index e23b86e8f040e..19cce3f185833 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -37,13 +37,6 @@ SPARK_HOME = os.environ["SPARK_HOME"] -class StreamOutput: - """ - a class to store the output from stream - """ - result = list() - - class PySparkStreamingTestCase(unittest.TestCase): def setUp(self): class_name = self.__class__.__name__ @@ -115,7 +108,8 @@ def test_func(dstream): def test_count(self): """Basic operation test for DStream.count""" - test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)] + #test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)] + test_input = [range(1, 5), range(1,10), range(1,20)] def test_func(dstream): print "count" @@ -137,33 +131,39 @@ def test_func(dstream): def test_reduceByKey(self): """Basic operation test for DStream.reduceByKey""" - test_input = [["a", "a", "b"], ["", ""], []] + #test_input = [["a", "a", "b"], ["", ""], []] + test_input = [["a", "a", "b", "b"], ["", "", "", ""], []] def test_func(dstream): print "reduceByKey" dstream.map(lambda x: (x, 1)).pyprint() return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add) - expected_output = [[("a", 2), ("b", 1)], [("", 2)], []] + #expected_output = 
[[("a", 2), ("b", 1)], [("", 2)], []] + expected_output = [[("a", 2), ("b", 2)], [("", 4)], []] output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) def test_mapValues(self): """Basic operation test for DStream.mapValues""" - test_input = [["a", "a", "b"], ["", ""], []] + #test_input = [["a", "a", "b"], ["", ""], []] + test_input = [["a", "a", "b", "b"], ["", "", "", ""], []] def test_func(dstream): return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).mapValues(lambda x: x + 10) - expected_output = [[("a", 12), ("b", 11)], [("", 12)], []] + #expected_output = [[("a", 12), ("b", 11)], [("", 12)], []] + expected_output = [[("a", 12), ("b", 12)], [("", 14)], []] output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) def test_flatMapValues(self): """Basic operation test for DStream.flatMapValues""" - test_input = [["a", "a", "b"], ["", ""], []] + #test_input = [["a", "a", "b"], ["", ""], []] + test_input = [["a", "a", "b", "b"], ["", "", "",""], []] def test_func(dstream): return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).flatMapValues(lambda x: (x, x + 10)) - expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []] + #expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []] + expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)], []] output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index b9841744c15ee..2a2efcb57ac6c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -208,4 +208,21 
@@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[ val asJavaDStream = JavaDStream.fromDStream(this) } ->>>>>>> broke something + + +class PythonTestInputStream3(ssc_ : JavaStreamingContext) + extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) { + + def start() {} + + def stop() {} + + def compute(validTime: Time): Option[RDD[Any]] = { + val index = ((validTime - zeroTime) / slideDuration - 1).toInt + val selectedInput = ArrayBuffer(1, 2, 3).toSeq + val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2) + Some(rdd) + } + + val asJavaDStream = JavaDStream.fromDStream(this) +}