Skip to content

Commit

Permalink
WIP: added PythonTestInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014
1 parent 019ef38 commit 5c04a5f
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 10 deletions.
14 changes: 4 additions & 10 deletions examples/src/main/python/streaming/test_oprations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@
from pyspark.streaming.duration import *

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: wordcount <hostname> <port>"
exit(-1)
conf = SparkConf()
conf.setAppName("PythonStreamingNetworkWordCount")
ssc = StreamingContext(conf=conf, duration=Seconds(1))

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
words = lines.flatMap(lambda line: line.split(" "))
# ssc.checkpoint("checkpoint")
mapped_words = words.map(lambda word: (word, 1))
count = mapped_words.reduceByKey(add)
test_input = ssc._testInputStream([1,1,1,1])
mapped = test_input.map(lambda x: (x, 1))
mapped.pyprint()

count.pyprint()
ssc.start()
ssc.awaitTermination()
# ssc.awaitTermination()
# ssc.stop()
1 change: 1 addition & 0 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import sys
from signal import signal, SIGTERM, SIGINT
from tempfile import NamedTemporaryFile


from pyspark.conf import SparkConf
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def _mergeCombiners(iterator):
combiners[k] = v
else:
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()

return shuffled.mapPartitions(_mergeCombiners)

Expand Down

0 comments on commit 5c04a5f

Please sign in to comment.