diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 60bcf86783e95..691f9b06ad4e9 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -114,7 +114,7 @@ def textFileStream(self, directory):
         Create an input stream that monitors a Hadoop-compatible file system
         for new files and reads them as text files. Files must be wrriten to the
         monitored directory by "moving" them from another location within the same
-        file system. FIle names starting with . are ignored.
+        file system. File names starting with . are ignored.
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
 
@@ -132,8 +132,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This is inpired by QueStream implementation. Give list of RDD and generate DStream
-        which contain the RDD.
+        This function is only for unit tests.
+        This implementation is inspired by the QueStream implementation.
+        Give a list of RDDs to generate a DStream which contains those RDDs.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -142,12 +143,10 @@ def _testInputStream(self, test_inputs, numSlices=None):
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
 
+#        if len(set(test_rdd_deserializers)) > 1:
+#            raise IOError("Deserializer should be one type to run test case. "
+#                          "See the SparkContext.parallelize to understand how to decide deserializer")
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
-        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
-        return dstream
-
-    def _testInputStream3(self):
-        jinput_stream = self._jvm.PythonTestInputStream3(self._jssc).asJavaDStream()
-        return DStream(jinput_stream, self, UTF8Deserializer())
+        return DStream(jinput_stream, self, test_rdd_deserializers[0])
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index ea418822759c4..679360dbca08d 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -24,6 +24,8 @@ from pyspark.rdd import _JavaStackTrace
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
+from pyspark.streaming.utils import rddToFileName
+
 from py4j.java_collections import ListConverter, MapConverter
 
@@ -343,21 +345,46 @@ def mergeCombiners(a, b):
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                  numPartitions).mapValues(lambda x: ResultIterable(x))
 
+    def countByValue(self):
+        def countPartition(iterator):
+            counts = defaultdict(int)
+            for obj in iterator:
+                counts[obj] += 1
+            yield counts
+
+        def mergeMaps(m1, m2):
+            for (k, v) in m2.iteritems():
+                m1[k] += v
+            return m1
+
+        return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())
+
+    def saveAsTextFiles(self, prefix, suffix=None):
+
+        def saveAsTextFile(rdd, time):
+            path = rddToFileName(prefix, suffix, time)
+            rdd.saveAsTextFile(path)
+
+        return self.foreachRDD(saveAsTextFile)
+
+    def saveAsPickledFiles(self, prefix, suffix=None):
+
+        def saveAsTextFile(rdd, time):
+            path = rddToFileName(prefix, suffix, time)
+            rdd.saveAsPickleFile(path)
+
+        return self.foreachRDD(saveAsTextFile)
 
-# TODO: implement groupByKey
-# TODO: implement saveAsTextFile
 
 # Following operation has dependency to transform
 # TODO: impelment union
 # TODO: implement repertitions
 # TODO: implement cogroup
 # TODO: implement join
-# TODO: implement countByValue
 # TODO: implement leftOuterJoin
 # TODO: implemtnt rightOuterJoin
 
-
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
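
The DStream.countByValue added above builds a per-partition frequency map with countPartition, folds those maps together with mergeMaps via reduce, and then flattens the merged map into (value, count) pairs. A minimal standalone sketch of that counting-and-merging logic, run on made-up in-memory partitions rather than a real DStream, looks like this:

    from collections import defaultdict

    def count_partition(iterator):
        # Count how many times each value appears in one partition (mirrors countPartition above).
        counts = defaultdict(int)
        for obj in iterator:
            counts[obj] += 1
        yield counts

    def merge_maps(m1, m2):
        # Fold one partition's counts into another (mirrors mergeMaps above).
        for k, v in m2.items():
            m1[k] += v
        return m1

    # Made-up partitions standing in for one batch's RDD; not real Spark partitions.
    partitions = [[1, 1, 2], [2, 3, 3]]
    per_partition = [next(count_partition(iter(p))) for p in partitions]
    merged = per_partition[0]
    for counts in per_partition[1:]:
        merged = merge_maps(merged, counts)
    print(sorted(merged.items()))  # [(1, 2), (2, 2), (3, 2)]
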
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
index aa5e19adbd927..9178577743e0b 100644
--- a/python/pyspark/streaming/utils.py
+++ b/python/pyspark/streaming/utils.py
@@ -53,3 +53,9 @@ def msDurationToString(ms):
         return "%.1f m" % (float(ms) / minute)
     else:
         return "%.2f h" % (float(ms) / hour)
+
+def rddToFileName(prefix, suffix, time):
+    if suffix is not None:
+        return prefix + "-" + str(time) + "." + suffix
+    else:
+        return prefix + "-" + str(time)
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index 02996ccce9a3e..2bb01ed3a0642 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -301,6 +301,38 @@ def f(iterator):
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)
 
+    def test_countByValue_batch(self):
+        """Basic operation test for DStream.countByValue with batch deserializer"""
+        test_input = [range(1, 5) + range(1, 5), range(5, 7) + range(5, 9), ["a"] * 2 + ["b"] + [""]]
+
+        def test_func(dstream):
+            return dstream.countByValue()
+        expected_output = [[(1, 2), (2, 2), (3, 2), (4, 2)],
+                           [(5, 2), (6, 2), (7, 1), (8, 1)],
+                           [("a", 2), ("b", 1), ("", 1)]]
+        output = self._run_stream(test_input, test_func, expected_output)
+        for result in (output, expected_output):
+            self._sort_result_based_on_key(result)
+        self.assertEqual(expected_output, output)
+
+    def test_countByValue_unbatch(self):
+        """Basic operation test for DStream.countByValue with unbatch deserializer"""
+        test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
+
+        def test_func(dstream):
+            return dstream.countByValue()
+        expected_output = [[(1, 1), (2, 1), (3, 1)],
+                           [(1, 2), ("", 1)],
+                           [("a", 2), ("b", 1)]]
+        output = self._run_stream(test_input, test_func, expected_output)
+        for result in (output, expected_output):
+            self._sort_result_based_on_key(result)
+        self.assertEqual(expected_output, output)
+
+    def _sort_result_based_on_key(self, outputs):
+        for output in outputs:
+            output.sort(key=lambda x: x[0])
+
     def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 9f1e1f4d3cca7..38b9004ab7439 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -138,7 +138,7 @@ class PythonTransformedDStream(
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
  * returns the i_th element at the i_th batch under manual clock.
- * This implementation is close to QueStream
+ * This implementation is inspired by QueStream
  */
 class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
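
For reference, the rddToFileName helper added to utils.py is what saveAsTextFiles and saveAsPickledFiles use to derive one output path per batch: "<prefix>-<batch time>" plus an optional ".<suffix>". A quick standalone check of the same function, using an arbitrary millisecond timestamp in place of a real batch time, behaves as follows:

    def rddToFileName(prefix, suffix, time):
        # Copied from the utils.py change above: "<prefix>-<time>" plus an optional ".<suffix>".
        if suffix is not None:
            return prefix + "-" + str(time) + "." + suffix
        else:
            return prefix + "-" + str(time)

    # The timestamp below is an arbitrary example value, not a real batch time.
    print(rddToFileName("counts", "txt", 1400000000000))  # counts-1400000000000.txt
    print(rddToFileName("counts", None, 1400000000000))   # counts-1400000000000
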