/
word_count.clj
96 lines (86 loc) · 3.12 KB
/
word_count.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.starter.clj.word-count
(:import [org.apache.storm StormSubmitter LocalCluster]
[org.apache.storm.utils Utils])
(:use [org.apache.storm clojure config])
(:gen-class))
(defspout sentence-spout ["sentence"]
[conf context collector]
(let [sentences ["a little brown dog"
"the man petted the dog"
"four score and seven years ago"
"an apple a day keeps the doctor away"]]
(spout
(nextTuple []
(Thread/sleep 100)
(emit-spout! collector [(rand-nth sentences)])
)
(ack [id]
;; You only need to define this method for reliable spouts
;; (such as one that reads off of a queue like Kestrel)
;; This is an unreliable spout, so it does nothing here
))))
(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
[collector]
(Thread/sleep 500)
(emit-spout! collector [(rand-nth sentences)]))
(defbolt split-sentence ["word"] [tuple collector]
(let [words (.split (.getString tuple 0) " ")]
(doseq [w words]
(emit-bolt! collector [w] :anchor tuple))
(ack! collector tuple)
))
(defbolt word-count ["word" "count"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [word (.getString tuple 0)]
(swap! counts (partial merge-with +) {word 1})
(emit-bolt! collector [word (@counts word)] :anchor tuple)
(ack! collector tuple)
)))))
(defn mk-topology []
(topology
{"1" (spout-spec sentence-spout)
"2" (spout-spec (sentence-spout-parameterized
["the cat jumped over the door"
"greetings from a faraway land"])
:p 2)}
{"3" (bolt-spec {"1" :shuffle "2" :shuffle}
split-sentence
:p 5)
"4" (bolt-spec {"3" ["word"]}
word-count
:p 6)}))
(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)
))
(defn submit-topology! [name]
(StormSubmitter/submitTopology
name
{TOPOLOGY-DEBUG true
TOPOLOGY-WORKERS 3}
(mk-topology)))
(defn -main
([]
(run-local!))
([name]
(submit-topology! name)))