-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
common.clj
195 lines (165 loc) · 7.02 KB
/
common.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
(ns backtype.storm.daemon.common
(:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
(:import [backtype.storm.generated StormTopology
InvalidTopologyException GlobalStreamId])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm Constants])
(:require [backtype.storm.daemon.acker :as acker])
(:require [backtype.storm.thrift :as thrift])
)
(defn system-id?
  "Delegates to backtype.storm.utils.Utils/isSystemId. validate-ids! uses
  this predicate to reject user-supplied component and stream ids."
  [id]
  (. Utils isSystemId id))
;; Aliases of the acker namespace's component and stream ids. acker-inputs and
;; add-acker! in this namespace use them to wire the acker bolt into a topology.
(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
;; Stream id that add-system-streams! declares (with fields ["event"]) on
;; every component of the system topology.
(def SYSTEM-STREAM-ID "__system")
;; The task id is the virtual port.
;; node->host is kept in the assignment so that tasks know whom to talk to
;; from the assignment alone. This avoids the situation where a node goes down
;; and a task has no information about what to do.
;; storm-task-ids reads :task->node+port from the value returned by
;; .assignment-info (presumably an Assignment — confirm).
(defrecord Assignment [master-code-dir node->host task->node+port task->start-time-secs])
;; get-storm-id reads :storm-name from the value returned by .storm-base
;; (presumably a StormBase — confirm against the cluster-state writer).
(defrecord StormBase [storm-name launch-time-secs status])
;; NOTE(review): presumably the info a supervisor publishes about itself;
;; field meanings are inferred from names only — confirm.
(defrecord SupervisorInfo [time-secs hostname worker-ports uptime-secs])
;; storm-task-info reads :component-id from the value returned by .task-info
;; (presumably a TaskInfo — confirm).
(defrecord TaskInfo [component-id])
;; Protocol with a single predicate over daemon objects.
;; NOTE(review): waiting? presumably reports whether the daemon is currently
;; idle/waiting — confirm against the implementing daemons.
(defprotocol DaemonCommon
(waiting? [this]))
;; NOTE(review): key for worker heartbeats. The LS- prefix suggests it is also a
;; LocalState key even though it sits above the LocalState section — confirm.
(def LS-WORKER-HEARTBEAT "worker-heartbeat")
;; LocalState constants
(def LS-ID "supervisor-id")
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")
;; NOTE(review): heartbeat record for a worker (timestamp, topology id, task ids,
;; port); field meanings are inferred from names only — confirm.
(defrecord WorkerHeartbeat [time-secs storm-id task-ids port])
;; Per-task counters. new-task-stats builds one with every counter at 0.
;; Field order is the positional constructor order.
(defrecord TaskStats [^long processed
^long acked
^long emitted
^long transferred
^long failed])
;; NOTE(review): stats presumably holds a TaskStats — confirm against the writer.
(defrecord TaskHeartbeat [time-secs uptime-secs stats])
(defn new-task-stats
  "Returns a fresh TaskStats whose processed, acked, emitted, transferred and
  failed counters are all zero."
  []
  (new TaskStats 0 0 0 0 0))
(defn storm-task-ids
  "Returns the task ids of `storm-id`'s current assignment, i.e. the keys of
  its :task->node+port map. Strictly speaking these are only the active
  (assigned) task ids. Yields nil when the assignment or that map is nil."
  [storm-cluster-state storm-id]
  (-> storm-cluster-state
      (.assignment-info storm-id nil)
      :task->node+port
      keys))
(defn storm-task-info
  "Returns a map from each task id listed by (.task-ids storm-cluster-state
  storm-id) to the :component-id of that task's task info."
  [storm-cluster-state storm-id]
  (reduce (fn [task->component task-id]
            (assoc task->component
                   task-id
                   (:component-id (.task-info storm-cluster-state storm-id task-id))))
          {}
          (.task-ids storm-cluster-state storm-id)))
(defn get-storm-id
  "Returns the first active storm id whose storm base has :storm-name equal to
  `storm-name`, or nil when no active storm matches."
  [storm-cluster-state storm-name]
  (let [active-ids (.active-storms storm-cluster-state)
        name-of (fn [id] (:storm-name (.storm-base storm-cluster-state id nil)))]
    (find-first (fn [id] (= storm-name (name-of id)))
                active-ids)))
(defn topology-bases
  "Returns a map from every active storm id to the storm base read for it."
  [storm-cluster-state]
  (reduce (fn [id->base storm-id]
            (assoc id->base storm-id (.storm-base storm-cluster-state storm-id nil)))
          {}
          (.active-storms storm-cluster-state)))
(defn validate-distributed-mode!
  "Throws IllegalArgumentException when (local-mode? conf) is truthy;
  otherwise returns nil."
  [conf]
  (when (local-mode? conf)
    (throw (IllegalArgumentException. "Cannot start server in local mode!"))))
;; Defines a variadic function `name` whose implementation is (fn ~@body), so
;; `body` takes fn form: an argument vector followed by forms, or several
;; arities. The inner fn is created once, when the expansion is evaluated.
;; Each call applies it to the given args inside try-cause:
;;  - an InterruptedException is re-thrown unchanged;
;;  - any other Throwable is logged with the server's name, and then
;;    halt-process! is called with 13 and "Error on initialization".
;; NOTE(review): how exceptions are matched is decided by try-cause
;; (backtype.storm.util), which presumably also inspects exception causes —
;; confirm before relying on that.
(defmacro defserverfn [name & body]
`(let [exec-fn# (fn ~@body)]
(defn ~name [& args#]
(try-cause
(apply exec-fn# args#)
;; Keep this clause before the Throwable clause so interrupts propagate.
(catch InterruptedException e#
(throw e#))
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
(halt-process! 13 "Error on initialization")
)))))
(defn- validate-ids!
  "Throws InvalidTopologyException when any-intersection reports ids shared
  between the topology's component maps (one per thrift/STORM-TOPOLOGY-FIELDS
  field), or when any component id or declared stream id is a system id
  according to system-id?. Returns nil otherwise."
  [^StormTopology topology]
  (let [field-maps (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
        duplicates (apply any-intersection field-maps)]
    (when (seq duplicates)
      (throw (InvalidTopologyException.
              (str "Duplicate component ids: " duplicates))))
    (doseq [field thrift/STORM-TOPOLOGY-FIELDS
            :let [components (.getFieldValue topology field)]]
      ;; Component ids must not collide with reserved system ids.
      (doseq [component-id (keys components)
              :when (system-id? component-id)]
        (throw (InvalidTopologyException.
                (str component-id " is not a valid component id"))))
      ;; Nor may any stream a component declares.
      (doseq [component (vals components)
              stream-id (-> component .get_common .get_streams keys)
              :when (system-id? stream-id)]
        (throw (InvalidTopologyException.
                (str stream-id " is not a valid stream id")))))))
(defn validate-basic!
  "Runs validate-ids! on `topology`, then throws InvalidTopologyException if
  any component stored under a thrift/SPOUT-FIELDS field declares inputs."
  [^StormTopology topology]
  (validate-ids! topology)
  (doseq [field thrift/SPOUT-FIELDS
          spout (vals (.getFieldValue topology field))
          :when (seq (-> spout .get_common .get_inputs))]
    (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
(defn validate-structure!
  "Placeholder for structural validation of `topology`. It currently performs
  no checks and returns nil."
  [^StormTopology topology]
  ;; TODO: validate that all subscriptions are to valid component/streams
  nil)
(defn all-components
  "Returns a single persistent map combining the component maps stored under
  every thrift/STORM-TOPOLOGY-FIELDS field of `topology`. Later fields win on
  key collisions."
  [^StormTopology topology]
  (reduce conj
          {}
          (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)))
(defn acker-inputs
  "Returns the inputs for the acker bolt as a map from
  [component-id stream-id] to [\"id\"], passed as the inputs argument of
  thrift/mk-bolt-spec* (presumably a fields grouping on \"id\" — confirm there).

  Every spout contributes its ACKER-INIT-STREAM-ID stream. Every bolt
  contributes its ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID streams.
  Always returns a map (empty for a topology with no spouts or bolts), never
  nil."
  [^StormTopology topology]
  (let [spout-ids (.. topology get_spouts keySet)
        bolt-ids (.. topology get_bolts keySet)]
    ;; Build all [key value] entries in one pass. The previous
    ;; (apply merge (for ...)) returned nil rather than a map when both
    ;; collections were empty.
    (into {}
          (concat
           (for [id spout-ids]
             [[id ACKER-INIT-STREAM-ID] ["id"]])
           (for [id bolt-ids
                 stream-id [ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID]]
             [[id stream-id] ["id"]])))))
(defn add-acker!
  "Mutates `ret` to add the acker bolt and wire every existing component to it.

  - Every existing bolt declares ACKER-ACK-STREAM-ID with fields
    [\"id\" \"ack-val\"] and ACKER-FAIL-STREAM-ID with fields [\"id\"].
  - Every existing spout declares ACKER-INIT-STREAM-ID with fields
    [\"id\" \"init-val\" \"spout-task\"]. It also subscribes, with a direct
    grouping, to the ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID streams of
    ACKER-COMPONENT-ID.
  - A bolt built from backtype.storm.daemon.acker, with :p `num-tasks`, inputs
    (acker-inputs ret) and direct output streams ACKER-ACK-STREAM-ID /
    ACKER-FAIL-STREAM-ID (fields [\"id\"]), is registered under
    ACKER-COMPONENT-ID.

  Returns nil."
  [num-tasks ^StormTopology ret]
  ;; The acker spec (and its inputs) is computed before the acker is added, so
  ;; acker-inputs sees only the user's spouts and bolts.
  (let [acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
                                         (new backtype.storm.daemon.acker)
                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])}
                                         :p num-tasks)]
    ;; Pure side effects: doseq rather than building and discarding a seq.
    (doseq [[_ bolt] (.get_bolts ret)
            :let [common (.get_common bolt)]]
      (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
      (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])))
    (doseq [[_ spout] (.get_spouts ret)
            :let [common (.get_common spout)]]
      (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
      (.put_to_inputs common
                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
                      (thrift/mk-direct-grouping))
      (.put_to_inputs common
                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
                      (thrift/mk-direct-grouping)))
    ;; Registered only after the bolt loop above, so the acker's own direct
    ;; output streams are not overwritten with the per-bolt declarations.
    ;; Use the same id the spouts subscribe to, rather than a literal copy of it.
    (.put_to_bolts ret ACKER-COMPONENT-ID acker-bolt)))
(defn add-system-streams!
  "Mutates `topology` so that every component returned by all-components
  declares a SYSTEM-STREAM-ID output stream with fields [\"event\"].
  Returns nil."
  [^StormTopology topology]
  (doseq [component (vals (all-components topology))]
    (-> component
        .get_common
        (.put_to_streams SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
  ;; TODO: consider adding a stats stream for stats aggregation
  nil)
(defn system-topology!
  "Runs validate-basic! on `topology`, then returns a deep copy of it with the
  acker bolt added (parallelism taken from TOPOLOGY-ACKERS in `storm-conf`) and
  the system stream declared on every component. The copy is then passed to
  validate-structure!. All mutations are applied to the copy."
  [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [copy (.deepCopy topology)]
    (add-acker! (storm-conf TOPOLOGY-ACKERS) copy)
    (doto copy
      add-system-streams!
      validate-structure!)))