Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

no zmq local mode finished

  • Loading branch information...
commit 969df632859e9e6f8e4405e2cd48d959cadd672e 1 parent 25e8d8a
@nathanmarz nathanmarz authored
View
11 src/clj/backtype/storm/daemon/supervisor.clj
@@ -160,7 +160,7 @@
;; in local state, supervisor stores who its current assignments are
;; another thread launches events to restart any dead processes if necessary
-(defserverfn mk-supervisor [conf]
+(defserverfn mk-supervisor [conf shared-context]
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [active (atom true)
uptime (uptime-computer)
@@ -227,6 +227,7 @@
id
)
(launch-worker conf
+ shared-context
(:storm-id assignment)
supervisor-id
port
@@ -359,7 +360,7 @@
(defmethod launch-worker
- :distributed [conf storm-id supervisor-id port worker-id worker-thread-pids-atom]
+ :distributed [conf shared-context storm-id supervisor-id port worker-id worker-thread-pids-atom]
(let [stormroot (supervisor-stormdist-root conf storm-id)
stormjar (supervisor-stormjar-path stormroot)
classpath (add-to-classpath (current-classpath) [stormjar])
@@ -386,9 +387,9 @@
))))
(defmethod launch-worker
- :local [conf storm-id supervisor-id port worker-id worker-thread-pids-atom]
+ :local [conf shared-context storm-id supervisor-id port worker-id worker-thread-pids-atom]
(let [pid (uuid)
- worker (worker/mk-worker conf storm-id supervisor-id port worker-id)]
+ worker (worker/mk-worker conf shared-context storm-id supervisor-id port worker-id)]
(psim/register-process pid worker)
(swap! worker-thread-pids-atom assoc worker-id pid)
))
@@ -396,4 +397,4 @@
(defn -main []
(let [conf (read-storm-config)]
(validate-distributed-mode! conf)
- (mk-supervisor conf)))
+ (mk-supervisor conf nil)))
View
32 src/clj/backtype/storm/daemon/worker.clj
@@ -7,8 +7,9 @@
(bootstrap)
-
-(defmulti mk-context cluster-mode)
+(defn local-mode-zmq? [conf]
+ (or (= (conf STORM-CLUSTER-MODE) "distributed")
+ (conf STORM-LOCAL-MODE-ZMQ)))
(defn read-worker-task-ids [storm-cluster-state storm-id supervisor-id port]
@@ -74,7 +75,7 @@
;; what about if there's inconsistency in assignments? -> but nimbus
;; should guarantee this consistency
;; TODO: consider doing worker heartbeating rather than task heartbeating to reduce the load on zookeeper
-(defserverfn mk-worker [conf storm-id supervisor-id port worker-id]
+(defserverfn mk-worker [conf mq-context storm-id supervisor-id port worker-id]
(log-message "Launching worker for " storm-id " on " supervisor-id ":" port " with id " worker-id)
(let [active (atom true)
storm-active-atom (atom false)
@@ -101,7 +102,11 @@
(worker-pids-root conf worker-id)
%)
- mq-context (mk-context conf storm-conf)
+ mq-context (if mq-context
+ mq-context
+ (msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
+ (storm-conf ZMQ-LINGER-MILLIS)
+ (= (conf STORM-CLUSTER-MODE) "local")))
outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
endpoint-socket-lock (mk-rw-lock)
node+port->socket (atom {})
@@ -187,15 +192,15 @@
0 )
:args-fn (fn [] [(ArrayList.) (TupleSerializer. storm-conf)]))
heartbeat-thread]
- _ (log-message "Launching virtual port for " supervisor-id ":" port)
- virtual-port-shutdown (if (conf STORM-LOCAL-MODE-ZMQ)
+ virtual-port-shutdown (when (local-mode-zmq? conf)
+ (log-message "Launching virtual port for " supervisor-id ":" port)
(msg-loader/launch-virtual-port!
+ (= (conf STORM-CLUSTER-MODE) "local")
mq-context
- (str "tcp://*:" port)
- :kill-fn (fn []
+ port
+ :kill-fn (fn [t]
(halt-process! 11))
:valid-ports task-ids))
- _ (log-message "Launched virtual port for " supervisor-id ":" port)
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)
(reset! active false)
@@ -231,16 +236,9 @@
(log-message "Worker " worker-id " for storm " storm-id " on " supervisor-id ":" port " has finished loading")
ret
))
-
-(defmethod mk-context :local [conf storm-conf]
- (msg-loader/mk-local-context))
-
-(defmethod mk-context :distributed [conf storm-conf]
- (msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
- (storm-conf ZMQ-LINGER-MILLIS)))
(defn -main [storm-id supervisor-id port-str worker-id]
(let [conf (read-storm-config)]
(validate-distributed-mode! conf)
- (mk-worker conf storm-id supervisor-id (Integer/parseInt port-str) worker-id)))
+ (mk-worker conf nil storm-id supervisor-id (Integer/parseInt port-str) worker-id)))
View
11 src/clj/backtype/storm/messaging/loader.clj
@@ -4,18 +4,21 @@
(defn mk-local-context []
(local/mk-local-context))
-(defn mk-zmq-context [num-threads linger]
+(defn mk-zmq-context [& args]
(require '[backtype.storm.messaging.zmq :as zmq])
(let [afn (-> 'backtype.storm.messaging.zmq/mk-zmq-context
find-var
var-get)]
- (afn num-threads linger)))
+ (apply afn args)))
-(defn launch-virtual-port! [context & args]
+(defn launch-virtual-port! [local? context port & args]
(require '[zilch.virtual-port :as mqvp])
(require '[backtype.storm.messaging.zmq :as zmq])
(let [afn (-> 'zilch.virtual-port/launch-virtual-port!
find-var
var-get)
+ url (if local?
+ (str "ipc://" port ".ipc")
+ (str "tcp://*:" port))
]
- (apply afn (cons (.zmq-context context) args))))
+ (apply afn (concat [(.zmq-context context) url] args))))
View
18 src/clj/backtype/storm/messaging/local.clj
@@ -4,30 +4,32 @@
(:import [java.util.concurrent LinkedBlockingQueue])
)
-(deftype LocalConnection [queues-map queue]
+(defn add-queue! [queues-map lock port]
+ (locking lock
+ (when-not (contains? @queues-map port)
+ (swap! queues-map assoc port (LinkedBlockingQueue.))))
+ (@queues-map port))
+
+(deftype LocalConnection [queues-map lock queue]
Connection
(recv [this]
(when-not queue
(throw (IllegalArgumentException. "Cannot receive on this socket")))
(.take queue))
(send [this task message]
- (let [send-queue (@queues-map task)]
+ (let [send-queue (add-queue! queues-map lock task)]
(.put send-queue message)
))
(close [this]
))
-(defn add-queue! [queues-map lock port]
- (locking lock
- (if-not (contains? @queues-map port)
- (swap! queues-map assoc port (LinkedBlockingQueue.)))))
(deftype LocalContext [queues-map lock]
Context
(bind [this virtual-port]
- (LocalConnection. queues-map (add-queue! queues-map lock virtual-port)))
+ (LocalConnection. queues-map lock (add-queue! queues-map lock virtual-port)))
(connect [this host port]
- (LocalConnection. queues-map nil)
+ (LocalConnection. queues-map lock nil)
)
(send-local-task-empty [this virtual-port]
(let [queue (add-queue! queues-map lock virtual-port)]
View
14 src/clj/backtype/storm/messaging/zmq.clj
@@ -17,19 +17,23 @@
(.close socket)
))
-(deftype ZMQContext [context linger-ms]
+(deftype ZMQContext [context linger-ms ipc?]
Context
(bind [this virtual-port]
(-> context
(mq/socket mq/pull)
(mqvp/virtual-bind virtual-port)
+ (ZMQConnection.)
))
(connect [this host port]
- (let [url (str "tcp://" host ":" port)]
+ (let [url (if ipc?
+ (str "ipc://" port ".ipc")
+ (str "tcp://" host ":" port))]
(-> context
(mq/socket mq/push)
(mq/set-linger linger-ms)
- (mq/connect url))))
+ (mq/connect url)
+ (ZMQConnection.))))
(send-local-task-empty [this virtual-port]
(let [pusher (-> context (mq/socket mq/push) (mqvp/virtual-connect virtual-port))]
(mq/send pusher (mq/barr))
@@ -41,6 +45,6 @@
context))
-(defn mk-zmq-context [num-threads linger]
- (ZMQContext. (mq/context num-threads) linger))
+(defn mk-zmq-context [num-threads linger local?]
+ (ZMQContext. (mq/context num-threads) linger local?))
View
15 src/clj/backtype/storm/testing.clj
@@ -13,6 +13,7 @@
(:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple TupleCaptureBolt
SpoutTracker BoltTracker TrackerAggregator])
(:require [backtype.storm [zookeeper :as zk]])
+ (:require [backtype.storm.messaging.loader :as msg-loader])
(:use [clojure.contrib.def :only [defnk]])
(:use [clojure.contrib.seq :only [find-first]])
(:use [backtype.storm cluster util thrift config log]))
@@ -70,12 +71,18 @@
SUPERVISOR-SLOTS-PORTS port-ids
})
id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
- daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf))]
+ daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map)))]
(swap! (:supervisors cluster-map) conj daemon)
(swap! (:tmp-dirs cluster-map) conj tmp-dir)
daemon
))
+(defn mk-shared-context [conf]
+ (if (and (= (conf STORM-CLUSTER-MODE) "local")
+ (not (conf STORM-LOCAL-MODE-ZMQ)))
+ (msg-loader/mk-local-context)
+ ))
+
;; returns map containing cluster info
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
@@ -95,6 +102,7 @@
port-counter (mk-counter)
nimbus (nimbus/service-handler
(assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp))
+ context (mk-shared-context daemon-conf)
cluster-map {:nimbus nimbus
:port-counter port-counter
:daemon-conf daemon-conf
@@ -102,7 +110,8 @@
:state (mk-distributed-cluster-state daemon-conf)
:storm-cluster-state (mk-storm-cluster-state daemon-conf)
:tmp-dirs (atom [nimbus-tmp zk-tmp])
- :zookeeper zk-handle}
+ :zookeeper zk-handle
+ :shared-context context}
supervisor-confs (if (sequential? supervisors)
supervisors
(repeat supervisors {}))]
@@ -206,7 +215,7 @@
))
(defn mk-capture-launch-fn [capture-atom]
- (fn [conf storm-id supervisor-id port worker-id _]
+ (fn [conf shared-context storm-id supervisor-id port worker-id _]
(let [existing (get @capture-atom [supervisor-id port] [])]
(swap! capture-atom assoc [supervisor-id port] (conj existing storm-id))
)))
View
2  src/clj/zilch/virtual_port.clj
@@ -48,7 +48,7 @@
(defnk launch-virtual-port!
[context url :daemon true
- :kill-fn (fn [] (System/exit 1))
+ :kill-fn (fn [t] (System/exit 1))
:priority Thread/NORM_PRIORITY
:valid-ports nil]
(let [valid-ports (set (map short valid-ports))
Please sign in to comment.
Something went wrong with that request. Please try again.