From fe648e377ae81988e7b2e868f55a53a37fde27f8 Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 4 Aug 2014 09:47:48 -0700
Subject: [PATCH] WIP

---
Reviewer notes (free text after the three-dash line; not part of the
commit message):

* This patch was recovered from a copy in which the whole file had been
  collapsed onto one line. Line breaks were rebuilt from the hunk line
  counts (new file: +1,24; dstream.py: -120,7 +120,6).
* Leading whitespace was lost in that copy. Indentation in the new file
  is the only nesting Python syntax allows for these statements. The
  absolute indentation of the dstream.py context lines is assumed --
  TODO confirm against blob e96fac007fa50 before applying.
* The spacing inside the commented-out awaitTermination line could not
  be recovered either; a single space is used -- TODO confirm.
* Per the hunk counts, the only deletion in dstream.py is one empty
  line; "return combiners.iteritems()" is unchanged context.
* test_oprations.py: the usage message names no arguments, although the
  script requires two (sys.argv[1] is passed as the host and
  int(sys.argv[2]) as the port to socketTextStream).
* test_oprations.py: ssc.stop() runs directly after ssc.start() because
  awaitTermination() is commented out; presumably intentional for this
  WIP -- confirm.
* "test_oprations" looks like a misspelling of "test_operations"; the
  path is left exactly as committed.

 .../main/python/streaming/test_oprations.py   | 24 +++++++++++++++++++
 python/pyspark/streaming/dstream.py           |  1 -
 2 files changed, 24 insertions(+), 1 deletion(-)
 create mode 100644 examples/src/main/python/streaming/test_oprations.py

diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py
new file mode 100644
index 0000000000000..cb338ced5f228
--- /dev/null
+++ b/examples/src/main/python/streaming/test_oprations.py
@@ -0,0 +1,24 @@
+import sys
+from operator import add
+
+from pyspark.conf import SparkConf
+from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.duration import *
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: wordcount "
+        exit(-1)
+    conf = SparkConf()
+    conf.setAppName("PythonStreamingNetworkWordCount")
+    ssc = StreamingContext(conf=conf, duration=Seconds(1))
+
+    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
+    words = lines.flatMap(lambda line: line.split(" "))
+    mapped_words = words.map(lambda word: (word, 1))
+    count = mapped_words.reduceByKey(add)
+
+    count.pyprint()
+    ssc.start()
+# ssc.awaitTermination()
+    ssc.stop()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index e96fac007fa50..6332c4527d334 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -120,7 +120,6 @@ def _mergeCombiners(iterator):
                     combiners[k] = v
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
-
             return combiners.iteritems()
         return shuffled._mapPartitions(_mergeCombiners)
 