#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Counts words in text encoded with UTF8 received from the network every second.
Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
<output-file> file to which the word counts will be appended
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
localhost 9999 ~/checkpoint/ ~/out`
If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
the checkpoint data.
"""

import datetime
import os
import sys
from typing import List, Tuple

from pyspark import SparkContext
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.rdd import RDD
from pyspark.streaming import StreamingContext
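

# Broadcast variables and accumulators cannot be recovered from checkpoint data in
# Spark Streaming, so the two helpers below lazily create singleton instances (cached
# in globals()) that can be re-registered after the driver restarts from a checkpoint.
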
# Get or register a Broadcast variable
def getWordExcludeList(sparkContext: SparkContext) -> Broadcast[List[str]]:
    if 'wordExcludeList' not in globals():
        globals()['wordExcludeList'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordExcludeList']


# Get or register an Accumulator
def getDroppedWordsCounter(sparkContext: SparkContext) -> Accumulator[int]:
    if 'droppedWordsCounter' not in globals():
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']
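

# Invoked only when no checkpoint data is found in <checkpoint-directory>; on recovery the
# DStream graph built below is restored from the checkpoint instead of being rebuilt here.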
def createContext(host: str, port: int, outputPath: str) -> StreamingContext:
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the existing checkpoint
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 1 second batch interval

    # Create a socket stream on target ip:port and count the
    # words in the input stream of \n delimited text (e.g. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time: datetime.datetime, rdd: RDD[Tuple[str, int]]) -> None:
        # Get or register the excludeList Broadcast
        excludeList = getWordExcludeList(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use excludeList to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount: Tuple[str, int]) -> bool:
            if wordCount[0] in excludeList.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) in total" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")
    wordCounts.foreachRDD(echo)
    return ssc
if __name__ == "__main__":
if len(sys.argv) != 5:
print("Usage: recoverable_network_wordcount.py <hostname> <port> "
"<checkpoint-directory> <output-file>", file=sys.stderr)
sys.exit(-1)
host, port, checkpoint, output = sys.argv[1:]
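    # getOrCreate() either recovers the StreamingContext from checkpoint data found in
    # <checkpoint-directory>, or, if none exists yet, calls createContext() and enables
    # checkpointing of the new context to that directory.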
    ssc = StreamingContext.getOrCreate(checkpoint,
                                       lambda: createContext(host, int(port), output))
    ssc.start()
    ssc.awaitTermination()
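
# Hypothetical run (input and output are illustrative only): with `nc -lk 9999` running,
# typing "a b hello hello" into the netcat terminal produces a batch line such as
# "Counts at time <t> [('hello', 2)]"; 'a' and 'b' are dropped via the excludeList and
# droppedWordsCounter is incremented by 2.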