Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ zmq.hwm: 0
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead.
storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
Expand Down
52 changes: 46 additions & 6 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
(:import [java.util ArrayList HashMap])
(:import [backtype.storm.utils TransferDrainer])
(:import [backtype.storm.messaging TransportFactory])
(:import [backtype.storm.messaging TaskMessage IContext IConnection])
(:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
(:import [backtype.storm.security.auth AuthUtils])
(:import [javax.security.auth Subject])
(:import [java.security PrivilegedExceptionAction])
Expand Down Expand Up @@ -217,6 +217,10 @@
:worker-id worker-id
:cluster-state cluster-state
:storm-cluster-state storm-cluster-state
;; when worker bootup, worker will start to setup initial connections to
;; other workers. When all connection is ready, we will enable this flag
;; and spout and bolt will be activated.
:worker-active-flag (atom false)
:storm-active-atom (atom false)
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
Expand Down Expand Up @@ -321,7 +325,7 @@
(let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
(reset!
(:storm-active-atom worker)
(= :active (-> base :status :type))
(and (= :active (-> base :status :type)) @(:worker-active-flag worker))
))
))

Expand All @@ -343,6 +347,37 @@
(.send drainer node+port->socket)))
(.clear drainer))))))

;; Check whether this messaging connection is ready to send data
(defn is-connection-ready [^IConnection connection]
(if (instance? ConnectionWithStatus connection)
(let [^ConnectionWithStatus connection connection
status (.status connection)]
(= status ConnectionWithStatus$Status/Ready))
true))

;; all connections are ready
(defn all-connections-ready [worker]
(let [connections (vals @(:cached-node+port->socket worker))]
(every? is-connection-ready connections)))

;; we will wait all connections to be ready and then activate the spout/bolt
;; when the worker bootup
(defn activate-worker-when-all-connections-ready
[worker]
(let [timer (:refresh-active-timer worker)
delay-secs 0
recur-secs 1]
(schedule timer
delay-secs
(fn this []
(if (all-connections-ready worker)
(do
(log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
" with id "(:worker-id worker))
(reset! (:worker-active-flag worker) true))
(schedule timer recur-secs this :check-active false)
)))))

(defn launch-receive-thread [worker]
(log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
(msg-loader/launch-receive-thread!
Expand Down Expand Up @@ -395,21 +430,26 @@
;; do this here so that the worker process dies if this fails
;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
_ (heartbeat-fn)

executors (atom nil)
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
_ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
_ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))

receive-thread-shutdown (launch-receive-thread worker)

refresh-connections (mk-refresh-connections worker)

_ (refresh-connections nil)

_ (activate-worker-when-all-connections-ready worker)

_ (refresh-storm-active worker nil)



_ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
receive-thread-shutdown (launch-receive-thread worker)


transfer-tuples (mk-transfer-tuples-handler worker)

transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/messaging/local.clj
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,4 @@
(defn mk-context []
(let [context (LocalContext. nil nil)]
(.prepare ^IContext context nil)
context))
context))
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package backtype.storm.messaging;

public abstract class ConnectionWithStatus implements IConnection {

public static enum Status {

/**
* we are establishing a active connection with target host. The new data
* sending request can be buffered for future sending, or dropped(cases like
* there is no enough memory). It varies with difference IConnection
* implementations.
*/
Connecting,

/**
* We have a alive connection channel, which can be used to transfer data.
*/
Ready,

/**
* The connection channel is closed or being closed. We don't accept further
* data sending or receiving. All data sending request will be dropped.
*/
Closed
};

/**
* whether this connection is available to transfer data
*/
public abstract Status status();

}
Loading