diff --git a/examples/src/main/python/streaming/nerwork_wordcount.py b/examples/src/main/python/streaming/nerwork_wordcount.py new file mode 100644 index 0000000000000..2e5048ccad213 --- /dev/null +++ b/examples/src/main/python/streaming/nerwork_wordcount.py @@ -0,0 +1,22 @@ +import sys +from operator import add + +from pyspark.streaming.context import StreamingContext +from pyspark.streaming.duration import * + +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, "Usage: wordcount " + exit(-1) + ssc = StreamingContext(appName="PythonStreamingNetworkWordCount", duration=Seconds(1)) + + lines = ssc.socketTextStream(sys.argv[1], sys.argv[2]) + fm_lines = lines.flatMap(lambda x: x.split(" ")) + filtered_lines = fm_lines.filter(lambda line: "Spark" in line) + mapped_lines = fm_lines.map(lambda x: (x, 1)) + + fm_lines.pyprint() + filtered_lines.pyprint() + mapped_lines.pyprint() + ssc.start() + ssc.awaitTermination()