;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.daemon.worker
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm config log util timer local-state])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [org.apache.storm.daemon [executor :as executor]])
(:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
(:require [clojure.set :as set])
(:require [org.apache.storm.messaging.loader :as msg-loader])
(:import [java.util.concurrent Executors]
[org.apache.storm.hooks IWorkerHook BaseWorkerHook])
(:import [java.util ArrayList HashMap])
(:import [org.apache.storm.utils Utils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue ConfigUtils])
(:import [org.apache.storm.grouping LoadMapping])
(:import [org.apache.storm.messaging TransportFactory])
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
(:import [org.apache.storm.daemon Shutdownable])
(:import [org.apache.storm.serialization KryoTupleSerializer])
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.tuple AddressedTuple Fields])
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
(:import [org.apache.storm.security.auth AuthUtils])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
(:import [javax.security.auth Subject])
(:import [java.security PrivilegedExceptionAction])
(:import [org.apache.logging.log4j LogManager])
(:import [org.apache.logging.log4j Level])
(:import [org.apache.logging.log4j.core.config LoggerConfig])
(:import [org.apache.storm.generated LogConfig LogLevelAction])
(:import [org.apache.storm.metrics2 StormMetricRegistry])
(:gen-class))
(defmulti mk-suicide-fn cluster-mode)
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
(log-message "Reading Assignments.")
(let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
(doall
(concat
[Constants/SYSTEM_EXECUTOR_ID]
(mapcat (fn [[executor loc]]
(if (= loc [assignment-id port])
[executor]
))
assignment)))))
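;; A minimal sketch of the shapes involved (values are illustrative, not from
;; a real assignment): executor ids are [first-task last-task] pairs, and the
;; system executor id (typically [-1 -1]) is always prepended.
(comment
  ;; :executor->node+port in the assignment might look like
  ;;   {[1 2] ["node-a" 6700], [3 4] ["node-b" 6701]}
  ;; for a worker on "node-a":6700 this returns something like
  ;;   (Constants/SYSTEM_EXECUTOR_ID [1 2])
  )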
(defnk do-executor-heartbeats [worker :executors nil]
;; stats is how we know what executors are assigned to this worker
(let [stats (if-not executors
(into {} (map (fn [e] {e nil}) (:executors worker)))
(->> executors
(map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
(apply merge)))
zk-hb {:storm-id (:storm-id worker)
:executor-stats stats
:uptime ((:uptime worker))
:time-secs (current-time-secs)
}]
;; do the zookeeper heartbeat
(try
(.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)
(catch Exception exc
(log-error exc "Worker failed to write heartbeats to ZK or Pacemaker...will retry")))))
(defn do-heartbeat [worker]
(let [conf (:conf worker)
state (worker-state conf (:worker-id worker))]
;; do the local-file-system heartbeat.
(ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker))
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
; it shouldn't take supervisor 120 seconds between listing dir and reading it
))
(defn worker-outbound-tasks
"Returns seq of task-ids that receive messages from this worker"
[worker]
(let [context (worker-context worker)
components (mapcat
(fn [task-id]
(->> (.getComponentId context (int task-id))
(.getTargets context)
vals
(map keys)
(apply concat)))
(:task-ids worker))]
(-> worker
:task->component
reverse-map
(select-keys components)
vals
flatten
set )))
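;; Sketch of the traversal above with made-up ids: getTargets yields
;; {stream {component grouping}} per local component, so collecting the
;; target component ids and mapping them back through the reversed
;; task->component map gives every task this worker can send to.
(comment
  ;; task->component: {1 "spout", 2 "split", 3 "count"}
  ;; if task 1 ("spout") targets "split", the result includes task 2: #{2}
  )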
(defn get-dest
  "Get the destination task id for an AddressedTuple."
  [^AddressedTuple addressed-tuple]
  (.getDest addressed-tuple))
(defn mk-transfer-local-fn [worker]
(let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
task->short-executor (:task->short-executor worker)
task-getter (comp #(get task->short-executor %) get-dest)]
(fn [tuple-batch]
(let [grouped (fast-group-by task-getter tuple-batch)]
(fast-map-iter [[short-executor pairs] grouped]
(let [q (short-executor-receive-queue-map short-executor)]
(if q
(disruptor/publish q pairs)
(log-warn "Received invalid messages for unknown tasks. Dropping... ")
)))))))
(defn- assert-can-serialize
  "Check that all of the tuples can be serialized by serializing them."
  [^KryoTupleSerializer serializer tuple-batch]
(fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
(let [tuple (.getTuple addressed-tuple)]
(.serialize serializer tuple))))
(defn should-trigger-backpressure [executors worker]
(or (.getThrottleOn (:transfer-queue worker))
(reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))))
(defn- mk-backpressure-handler
  "Make a handler that checks and updates the worker's backpressure timestamp."
  [executors topo-conf]
(let [update-freq-ms (* (topo-conf BACKPRESSURE-ZNODE-UPDATE-FREQ-SECS) 1000)]
(disruptor/worker-backpressure-handler
(if executors
(fn [worker]
(let [storm-id (:storm-id worker)
assignment-id (:assignment-id worker)
port (:port worker)
storm-cluster-state (:storm-cluster-state worker)
prev-backpressure-timestamp @(:backpressure worker)
curr-timestamp (System/currentTimeMillis)
;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
curr-backpressure-timestamp (if (should-trigger-backpressure executors worker)
;; update the backpressure timestamp at most once every update-freq-ms milliseconds
(if (> (- curr-timestamp (or prev-backpressure-timestamp 0)) update-freq-ms)
curr-timestamp
prev-backpressure-timestamp)
0)]
;; update the worker's backpressure timestamp to zookeeper only when it has changed
(when (not= prev-backpressure-timestamp curr-backpressure-timestamp)
(try
(log-debug "worker backpressure timestamp changing from " prev-backpressure-timestamp " to " curr-backpressure-timestamp)
(.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-timestamp)
;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
(reset! (:backpressure worker) curr-backpressure-timestamp)
(catch Exception exc
(log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))))
(fn [workers])))))
(defn- mk-disruptor-backpressure-handler
  "Make a handler for the worker's send disruptor queue that checks the
  high and low water marks for backpressure."
  [worker]
(disruptor/disruptor-backpressure-handler
(fn []
(log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
(fn []
(log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
(defn mk-transfer-fn [worker]
(let [local-tasks (-> worker :task-ids set)
local-transfer (:transfer-local-fn worker)
^DisruptorQueue transfer-queue (:transfer-queue worker)
task->node+port (:cached-task->node+port worker)
try-serialize-local ((:storm-conf worker) TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE)
transfer-fn
(fn [^KryoTupleSerializer serializer tuple-batch]
(let [^ArrayList local (ArrayList.)
^HashMap remoteMap (HashMap.)]
(fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
(let [task (.getDest addressed-tuple)
tuple (.getTuple addressed-tuple)]
(if (local-tasks task)
(.add local addressed-tuple)
;;Using java objects directly to avoid performance issues in java code
(do
(when (not (.get remoteMap task))
(.put remoteMap task (ArrayList.)))
(let [^ArrayList remote (.get remoteMap task)]
(if (not-nil? task)
(.add remote (TaskMessage. task ^bytes (.serialize serializer tuple)))
(log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
)))))
(when (not (.isEmpty local)) (local-transfer local))
(when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
(if try-serialize-local
(do
(log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
(fn [^KryoTupleSerializer serializer tuple-batch]
(assert-can-serialize serializer tuple-batch)
(transfer-fn serializer tuple-batch)))
transfer-fn)))
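;; Sketch of the routing decision in transfer-fn (illustrative only): tuples
;; addressed to a task on this worker skip serialization and go straight to
;; the local receive queues; everything else is serialized once and grouped
;; per destination task before being published to the transfer queue.
(comment
  ;; local-tasks = #{1 2}
  ;; tuple addressed to task 2 -> local-transfer, no serialization
  ;; tuple addressed to task 7 -> remoteMap {7 [(TaskMessage. 7 kryo-bytes)]}
  )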
(defn- mk-receive-queue-map [storm-conf executors storm-id worker-id port]
(->> executors
;; TODO: this depends on the type of executor
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
storm-id (int -1) "__system" port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
(into {})
))
(defn- stream->fields [^StormTopology topology component]
(->> (ThriftTopologyUtils/getComponentCommon topology component)
.get_streams
(map (fn [[s info]] [s (Fields. (.get_output_fields info))]))
(into {})
(HashMap.)))
(defn component->stream->fields [^StormTopology topology]
(->> (ThriftTopologyUtils/getComponentIds topology)
(map (fn [c] [c (stream->fields topology c)]))
(into {})
(HashMap.)))
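;; The resulting nested map is keyed by component id, then stream id
;; (component, stream, and field names below are made up):
(comment
  (component->stream->fields topology)
  ;;=> {"spout" {"default" (Fields. ["word"])}
  ;;    "count" {"default" (Fields. ["word" "count"])}}
  )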
(defn- mk-default-resources [worker]
(let [conf (:conf worker)
thread-pool-size (int (conf TOPOLOGY-WORKER-SHARED-THREAD-POOL-SIZE))]
{WorkerTopologyContext/SHARED_EXECUTOR (Executors/newFixedThreadPool thread-pool-size)}
))
(defn- mk-user-resources [worker]
;;TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources.
;; this would be part of the initialization hook
;; need to separate workertopologycontext into WorkerContext and WorkerUserContext.
;; actually just do it via interfaces. just need to make sure to hide setResource from tasks
{})
(defn mk-halting-timer [timer-name]
(mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")
)
:timer-name timer-name))
(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
(let [assignment-versions (atom {})
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
storm-id (int -1) "__system" port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id worker-id port)
receive-queue-map (->> executor-receive-queue-map
(mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
(into {}))
topology (read-supervisor-topology conf storm-id)
mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf))]
(recursive-map
:conf conf
:mq-context mq-context
:receiver (.bind ^IContext mq-context storm-id port)
:storm-id storm-id
:assignment-id assignment-id
:port port
:worker-id worker-id
:cluster-state cluster-state
:storm-cluster-state storm-cluster-state
;; when the worker boots up, it starts to set up its initial connections
;; to other workers. Once all connections are ready, this flag is enabled
;; and the spouts and bolts are activated.
:worker-active-flag (atom false)
:storm-active-atom (atom false)
:storm-component->debug-atom (atom {})
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
:deserialized-worker-hooks (java.util.ArrayList.)
:topology topology
:system-topology (system-topology! storm-conf topology)
:heartbeat-timer (mk-halting-timer "heartbeat-timer")
:refresh-load-timer (mk-halting-timer "refresh-load-timer")
:refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
:refresh-credentials-timer (mk-halting-timer "refresh-credentials-timer")
:reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
:refresh-active-timer (mk-halting-timer "refresh-active-timer")
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
:refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
:user-timer (mk-halting-timer "user-timer")
:task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
:component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
:endpoint-socket-lock (mk-rw-lock)
:cached-node+port->socket (atom {})
:cached-task->node+port (atom {})
:transfer-queue transfer-queue
:executor-receive-queue-map executor-receive-queue-map
:short-executor-receive-queue-map (map-key first executor-receive-queue-map)
:task->short-executor (->> executors
(mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
(into {})
(HashMap.))
:suicide-fn (mk-suicide-fn conf)
:uptime (uptime-computer)
:default-shared-resources (mk-default-resources <>)
:user-shared-resources (mk-user-resources <>)
:transfer-local-fn (mk-transfer-local-fn <>)
:transfer-fn (mk-transfer-fn <>)
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
:backpressure (atom 0) ;; timestamp of the last backpressure trigger; non-positive means backpressure is off
:backpressure-trigger (Object.) ;; a trigger for synchronization with executors
:throttle-on (atom false) ;; whether throttle is activated for spouts
)))
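;; Note on the `<>` placeholder above: `recursive-map` lets later entries read
;; fields bound earlier in the same map, e.g. :component->stream->fields is
;; derived from the :system-topology entry. A contrived sketch of the idea:
(comment
  (recursive-map
    :a 1
    :b (inc (:a <>)))
  ;;=> {:a 1, :b 2}
  )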
(defn- endpoint->string [[node port]]
(str port "/" node))
(defn string->endpoint [^String s]
(let [[port-str node] (.split s "/" 2)]
[node (Integer/valueOf port-str)]
))
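;; Round trip of the endpoint encoding (note the port comes first in the
;; string form):
(comment
  (endpoint->string ["node-a" 6700]) ;;=> "6700/node-a"
  (string->endpoint "6700/node-a")   ;;=> ["node-a" 6700]
  )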
(def LOAD-REFRESH-INTERVAL-MS 5000)
(defn mk-refresh-load [worker]
(let [local-tasks (set (:task-ids worker))
remote-tasks (set/difference (worker-outbound-tasks worker) local-tasks)
short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
next-update (atom 0)]
(fn this
([]
(let [^LoadMapping load-mapping (:load-mapping worker)
local-pop (map-val (fn [queue]
(let [q-metrics (.getMetrics queue)]
(/ (double (.population q-metrics)) (.capacity q-metrics))))
short-executor-receive-queue-map)
local-pop (map-key int local-pop)
remote-load (reduce merge (for [[np conn] @(:cached-node+port->socket worker)] (into {} (.getLoad conn remote-tasks))))
now (System/currentTimeMillis)]
(.setLocal load-mapping local-pop)
(.setRemote load-mapping remote-load)
(when (> now @next-update)
(.sendLoadMetrics (:receiver worker) local-pop)
(reset! next-update (+ LOAD-REFRESH-INTERVAL-MS now))))))))
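;; The local load reported for an executor is its receive-queue fill ratio,
;; population / capacity. Illustrative numbers:
(comment
  ;; a queue with capacity 1024 holding 512 pending tuples reports
  (/ (double 512) 1024) ;;=> 0.5
  )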
(defn mk-refresh-connections [worker]
(let [outbound-tasks (worker-outbound-tasks worker)
conf (:conf worker)
storm-cluster-state (:storm-cluster-state worker)
storm-id (:storm-id worker)]
(fn this
([]
(this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
([callback]
(let [version (.assignment-version storm-cluster-state storm-id callback)
assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
(:data (get @(:assignment-versions worker) storm-id))
(let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
(swap! (:assignment-versions worker) assoc storm-id new-assignment)
(:data new-assignment)))
my-assignment (-> assignment
:executor->node+port
to-task->node+port
(select-keys outbound-tasks)
(#(map-val endpoint->string %)))
;; we don't need a connection for the local tasks anymore
needed-assignment (->> my-assignment
(filter-key (complement (-> worker :task-ids set))))
needed-connections (-> needed-assignment vals set)
needed-tasks (-> needed-assignment keys)
current-connections (set (keys @(:cached-node+port->socket worker)))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
(swap! (:cached-node+port->socket worker)
#(HashMap. (merge (into {} %1) %2))
(into {}
(dofor [endpoint-str new-connections
:let [[node port] (string->endpoint endpoint-str)]]
[endpoint-str
(.connect
^IContext (:mq-context worker)
storm-id
((:node->host assignment) node)
port)
]
)))
(write-locked (:endpoint-socket-lock worker)
(reset! (:cached-task->node+port worker)
(HashMap. my-assignment)))
(doseq [endpoint remove-connections]
(.close (get @(:cached-node+port->socket worker) endpoint)))
(apply swap!
(:cached-node+port->socket worker)
#(HashMap. (apply dissoc (into {} %1) %&))
remove-connections)
)))))
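;; Sketch of the connection diff above with made-up endpoints: sockets are
;; opened for newly needed endpoints and closed for endpoints that dropped
;; out of the assignment.
(comment
  (let [needed  #{"6700/node-a" "6701/node-b"}
        current #{"6700/node-a" "6702/node-c"}]
    [(set/difference needed current)    ;; to open:  #{"6701/node-b"}
     (set/difference current needed)])  ;; to close: #{"6702/node-c"}
  )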
(defn refresh-storm-active
([worker]
(refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
([worker callback]
(let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
(reset!
(:storm-active-atom worker)
(and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
(reset! (:storm-component->debug-atom worker) (-> base :component->debug))
(log-debug "Event debug options " @(:storm-component->debug-atom worker)))))
;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
(defn mk-transfer-tuples-handler [worker]
(let [^DisruptorQueue transfer-queue (:transfer-queue worker)
drainer (TransferDrainer.)
node+port->socket (:cached-node+port->socket worker)
task->node+port (:cached-task->node+port worker)
endpoint-socket-lock (:endpoint-socket-lock worker)
]
(disruptor/clojure-handler
(fn [packets _ batch-end?]
(.add drainer packets)
(when batch-end?
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
(.send drainer task->node+port node+port->socket)))
(.clear drainer))))))
;; Check whether this messaging connection is ready to send data
(defn is-connection-ready [^IConnection connection]
(if (instance? ConnectionWithStatus connection)
(let [^ConnectionWithStatus connection connection
status (.status connection)]
(= status ConnectionWithStatus$Status/Ready))
true))
;; all connections are ready
(defn all-connections-ready [worker]
(let [connections (vals @(:cached-node+port->socket worker))]
(every? is-connection-ready connections)))
;; wait for all connections to be ready, then activate the spouts/bolts
;; when the worker boots up
(defn activate-worker-when-all-connections-ready
[worker]
(let [timer (:refresh-active-timer worker)
delay-secs 0
recur-secs 1]
(schedule timer
delay-secs
(fn this []
(if (all-connections-ready worker)
(do
(log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
" with id "(:worker-id worker))
(reset! (:worker-active-flag worker) true))
(schedule timer recur-secs this :check-active false)
)))))
(defn register-callbacks [worker]
(log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
(msg-loader/register-callback (:transfer-local-fn worker)
(:receiver worker)
(:storm-conf worker)
(worker-context worker)))
(defn- close-resources [worker]
(let [dr (:default-shared-resources worker)]
(log-message "Shutting down default resources")
(.shutdownNow (get dr WorkerTopologyContext/SHARED_EXECUTOR))
(log-message "Shut down default resources")))
(defn- get-logger-levels []
(into {}
(let [logger-config (.getConfiguration (LogManager/getContext false))]
(for [[logger-name logger] (.getLoggers logger-config)]
{logger-name (.getLevel logger)}))))
(defn set-logger-level [logger-context logger-name new-level]
(let [config (.getConfiguration logger-context)
logger-config (.getLoggerConfig config logger-name)]
(if (not (= (.getName logger-config) logger-name))
;; create a new config; make it additive (true) so that it
;; inherits its parent's appenders
(let [new-logger-config (LoggerConfig. logger-name new-level true)]
(log-message "Adding config for: " new-logger-config " with level: " new-level)
(.addLogger config logger-name new-logger-config))
(do
(log-message "Setting " logger-config " log level to: " new-level)
(.setLevel logger-config new-level)))))
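;; Hypothetical REPL usage (logger name made up): raise one logger to DEBUG
;; through the current log4j2 context, then push the change to live loggers.
(comment
  (let [ctx (LogManager/getContext false)]
    (set-logger-level ctx "org.apache.storm.daemon.worker" Level/DEBUG)
    (.updateLoggers ctx))
  )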
;; function called on timer to reset log levels last set to DEBUG
;; also called from process-log-config-change
(defn reset-log-levels [latest-log-config-atom]
(let [latest-log-config @latest-log-config-atom
logger-context (LogManager/getContext false)]
(doseq [[logger-name logger-setting] (sort latest-log-config)]
(let [timeout (:timeout logger-setting)
target-log-level (:target-log-level logger-setting)
reset-log-level (:reset-log-level logger-setting)]
(when (> (coerce/to-long (time/now)) timeout)
(log-message logger-name ": Resetting level to " reset-log-level)
(set-logger-level logger-context logger-name reset-log-level)
(swap! latest-log-config-atom
(fn [prev]
(dissoc prev logger-name))))))
(.updateLoggers logger-context)))
;; when a new log level is received from zookeeper, this function is called
(defn process-log-config-change [latest-log-config original-log-levels log-config]
(when log-config
(log-debug "Processing received log config: " log-config)
;; merge log configs together
(let [loggers (.get_named_logger_level log-config)
      logger-context (LogManager/getContext false)
      ;; merge named log levels; bound locally so it stays scoped to this call
      new-log-configs
      (into {}
            (for [[msg-logger-name logger-level] loggers]
              (let [logger-name (if (= msg-logger-name "ROOT")
                                  LogManager/ROOT_LOGGER_NAME
                                  msg-logger-name)]
                ;; new-log-configs maps logger => timeout settings
                (when (.is_set_reset_log_level_timeout_epoch logger-level)
                  {logger-name {:action (.get_action logger-level)
                                :target-log-level (Level/toLevel (.get_target_log_level logger-level))
                                :reset-log-level (or (.get @original-log-levels logger-name) (Level/INFO))
                                :timeout (.get_reset_log_level_timeout_epoch logger-level)}}))))]
;; look for deleted log timeouts
(doseq [[logger-name logger-val] (sort @latest-log-config)]
(when (not (contains? new-log-configs logger-name))
;; if we had a timeout, but the timeout is no longer active
(set-logger-level
logger-context logger-name (:reset-log-level logger-val))))
;; apply new log settings we just received
;; the merged configs are only for the reset logic
(doseq [[msg-logger-name logger-level] (sort (into {} (.get_named_logger_level log-config)))]
(let [logger-name (if (= msg-logger-name "ROOT")
LogManager/ROOT_LOGGER_NAME
msg-logger-name)
level (Level/toLevel (.get_target_log_level logger-level))
action (.get_action logger-level)]
(if (= action LogLevelAction/UPDATE)
(set-logger-level logger-context logger-name level))))
(.updateLoggers logger-context)
(reset! latest-log-config new-log-configs)
(log-debug "New merged log config is " @latest-log-config))))
(defn deserialize-worker-hooks [worker]
(let [topology (:topology worker)
topo-conf (:storm-conf worker)
worker-topology-context (worker-context worker)
hooks (.get_worker_hooks topology)
deserialized-worker-hooks (:deserialized-worker-hooks worker)]
(dofor [hook hooks]
(let [hook-bytes (Utils/toByteArray hook)
deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
(.add deserialized-worker-hooks deser-hook)))))
(defn run-worker-start-hooks [worker]
(let [topology (:topology worker)
topo-conf (:storm-conf worker)
worker-topology-context (worker-context worker)
deserialized-worker-hooks (:deserialized-worker-hooks worker)]
(dofor [hook deserialized-worker-hooks]
(.start hook topo-conf worker-topology-context))))
(defn run-worker-shutdown-hooks [worker]
(let [deserialized-worker-hooks (:deserialized-worker-hooks worker)]
(dofor [hook deserialized-worker-hooks]
(.shutdown hook))))
;; TODO: should worker even take the storm-id as input? this should be
;; deducible from cluster state (by searching through assignments)
;; what if there's an inconsistency in assignments? -> but nimbus
;; should guarantee this consistency
(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
(log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
" and conf " (ConfigUtils/maskPasswords conf))
(if-not (local-mode? conf)
(redirect-stdio-to-slf4j!))
;; in local mode the worker is not a separate process, so the
;; supervisor registers it in that case
(when (= :distributed (cluster-mode conf))
(let [pid (process-pid)]
(touch (worker-pid-path conf worker-id pid))
(spit (worker-artifacts-pid-path conf storm-id port) pid)))
(declare establish-log-setting-callback)
(StormMetricRegistry/start conf DaemonType/WORKER)
;; start out with an empty map of log level timeouts
(let [latest-log-config (atom {})
      original-log-levels (atom {})
      storm-conf (read-supervisor-storm-conf conf storm-id)
storm-conf (override-login-config-with-system-property storm-conf)
acls (Utils/getWorkerACL storm-conf)
cluster-state (cluster/mk-distributed-cluster-state conf :auth-conf storm-conf :acls acls :context (ClusterStateContext. DaemonType/WORKER))
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state :acls acls)
initial-credentials (.credentials storm-cluster-state storm-id nil)
auto-creds (AuthUtils/GetAutoCredentials storm-conf)
subject (AuthUtils/populateSubject nil auto-creds initial-credentials)]
(Subject/doAs subject (reify PrivilegedExceptionAction
(run [this]
(let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state)
heartbeat-fn #(do-heartbeat worker)
;; do this here so that the worker process dies if this fails
;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
_ (heartbeat-fn)
executors (atom nil)
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
_ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
_ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
_ (register-callbacks worker)
refresh-connections (mk-refresh-connections worker)
refresh-load (mk-refresh-load worker)
_ (refresh-connections nil)
_ (activate-worker-when-all-connections-ready worker)
_ (refresh-storm-active worker nil)
_ (deserialize-worker-hooks worker)
_ (run-worker-start-hooks worker)
_ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
transfer-tuples (mk-transfer-tuples-handler worker)
transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
disruptor-handler (mk-disruptor-backpressure-handler worker)
_ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)
_ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
(.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
(.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)))
backpressure-handler (mk-backpressure-handler @executors storm-conf)
backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.start backpressure-thread))
;; this callback is registered as a zk watch on the topology's backpressure
;; directory; it keeps the worker's throttle-on flag in sync with the
;; topology's backpressure status
backpressure-znode-timeout-ms (* (storm-conf BACKPRESSURE-ZNODE-TIMEOUT-SECS) 1000)
topology-backpressure-callback (fn cb [& ignored]
(let [throttle-on (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms cb)]
(reset! (:throttle-on worker) throttle-on)))
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms topology-backpressure-callback))
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " assignment-id " " port)
(doseq [[_ socket] @(:cached-node+port->socket worker)]
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
(log-message "Terminating messaging context")
(log-message "Shutting down executors")
(doseq [executor @executors] (.shutdown executor))
(log-message "Shut down executors")
;;this is fine because the only time this is shared is when it's a local context,
;;in which case it's a noop
(.term ^IContext (:mq-context worker))
(log-message "Shutting down transfer thread")
(disruptor/halt-with-interrupt! (:transfer-queue worker))
(.interrupt transfer-thread)
(.join transfer-thread)
(log-message "Shut down transfer thread")
(.terminate backpressure-thread)
(log-message "Shut down backpressure thread")
(cancel-timer (:heartbeat-timer worker))
(cancel-timer (:refresh-connections-timer worker))
(cancel-timer (:refresh-credentials-timer worker))
(cancel-timer (:refresh-active-timer worker))
(cancel-timer (:executor-heartbeat-timer worker))
(cancel-timer (:user-timer worker))
(cancel-timer (:refresh-load-timer worker))
(close-resources worker)
(StormMetricRegistry/stop)
(log-message "Trigger any worker shutdown hooks")
(run-worker-shutdown-hooks worker)
(.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port)
(.remove-worker-backpressure! (:storm-cluster-state worker) storm-id assignment-id port)
(log-message "Disconnecting from storm cluster state context")
(.disconnect (:storm-cluster-state worker))
(.close (:cluster-state worker))
(log-message "Shut down worker " storm-id " " assignment-id " " port))
ret (reify
Shutdownable
(shutdown
[this]
(shutdown*))
DaemonCommon
(waiting? [this]
(and
(timer-waiting? (:heartbeat-timer worker))
(timer-waiting? (:refresh-connections-timer worker))
(timer-waiting? (:refresh-load-timer worker))
(timer-waiting? (:refresh-credentials-timer worker))
(timer-waiting? (:refresh-active-timer worker))
(timer-waiting? (:executor-heartbeat-timer worker))
(timer-waiting? (:user-timer worker))
))
)
credentials (atom initial-credentials)
check-credentials-changed (fn []
(let [new-creds (.credentials (:storm-cluster-state worker) storm-id nil)]
(when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
(AuthUtils/updateSubject subject auto-creds new-creds)
(dofor [e @executors] (.credentials-changed e new-creds))
(reset! credentials new-creds))))
check-log-config-changed (fn []
(let [log-config (.topology-log-config (:storm-cluster-state worker) storm-id nil)]
(process-log-config-change latest-log-config original-log-levels log-config)
(establish-log-setting-callback)))]
(reset! original-log-levels (get-logger-levels))
(log-message "Started with log levels: " @original-log-levels)
(defn establish-log-setting-callback []
(.topology-log-config (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed))))
(establish-log-setting-callback)
(.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
(schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
(fn [& args]
(check-credentials-changed)))
(if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
(schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
(schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
(schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
(log-message "Worker has topology config " (ConfigUtils/maskPasswords (:storm-conf worker)))
(log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
ret
))))))
(defmethod mk-suicide-fn
:local [conf]
(fn [] (exit-process! 1 "Worker died")))
(defmethod mk-suicide-fn
:distributed [conf]
(fn [] (exit-process! 1 "Worker died")))
(defn -main [storm-id assignment-id port-str worker-id]
(let [conf (read-storm-config)]
(setup-default-uncaught-exception-handler)
(validate-distributed-mode! conf)
(let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)]
(add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker)))))
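;; The worker is normally launched by the supervisor rather than by hand; a
;; hypothetical invocation (all argument values made up) would look like:
(comment
  (-main "mytopo-1-1462923156" "supervisor-node-a" "6700" "worker-uuid-1234")
  )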