diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj index 77f3b3f8816..4ddf75468b1 100644 --- a/storm-core/src/clj/backtype/storm/LocalCluster.clj +++ b/storm-core/src/clj/backtype/storm/LocalCluster.clj @@ -19,7 +19,7 @@ (:gen-class :init init :implements [backtype.storm.ILocalCluster] - :constructors {[] [] [java.util.Map] []} + :constructors {[] [] [java.util.Map] [] [String Long] []} :state state )) (defn -init @@ -27,6 +27,11 @@ (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})] [[] ret] )) + ([^String zk-host ^Long zk-port] + (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true + STORM-ZOOKEEPER-SERVERS (list zk-host) + STORM-ZOOKEEPER-PORT zk-port})] + [[] zk-host zk-port])) ([^Map stateMap] [[] stateMap])) @@ -75,4 +80,3 @@ (defn -getState [this] (.state this)) - diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 7bbe23844eb..c704725d6d9 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -116,17 +116,18 @@ ;; if need to customize amt of ports more, can use add-supervisor calls afterwards (defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024] (let [zk-tmp (local-temp-path) - [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp) + [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) + (zk/mk-inprocess-zookeeper zk-tmp)) daemon-conf (merge (read-storm-config) {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true ZMQ-LINGER-MILLIS 0 TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 - } - daemon-conf - {STORM-CLUSTER-MODE "local" - STORM-ZOOKEEPER-PORT zk-port - STORM-ZOOKEEPER-SERVERS ["localhost"]}) + STORM-CLUSTER-MODE "local"} + (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) + {STORM-ZOOKEEPER-PORT zk-port + STORM-ZOOKEEPER-SERVERS ["localhost"]}) + daemon-conf) nimbus-tmp (local-temp-path) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler @@ -140,7 +141,7 @@ :state (mk-distributed-cluster-state daemon-conf) :storm-cluster-state (mk-storm-cluster-state daemon-conf) :tmp-dirs (atom [nimbus-tmp zk-tmp]) - :zookeeper zk-handle + :zookeeper (if (not-nil? zk-handle) zk-handle) :shared-context context} supervisor-confs (if (sequential? supervisors) supervisors @@ -174,9 +175,11 @@ ;; race condition here? will it launch the workers again? (supervisor/kill-supervisor s)) (psim/kill-all-processes) - (log-message "Shutting down in process zookeeper") - (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map)) - (log-message "Done shutting down in process zookeeper") + (if (not-nil? (:zookeeper cluster-map)) + (do + (log-message "Shutting down in process zookeeper") + (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map)) + (log-message "Done shutting down in process zookeeper"))) (doseq [t @(:tmp-dirs cluster-map)] (log-message "Deleting temporary path " t) (try @@ -303,7 +306,7 @@ (let [conf (:conf supervisor) supervisor-id (:supervisor-id supervisor) port (find-worker-port conf worker-id) - existing (get @capture-atom [supervisor-id port] 0)] + existing (get @capture-atom [supervisor-id port] 0)] (swap! capture-atom assoc [supervisor-id port] (inc existing)) (existing-fn supervisor worker-id) )))) @@ -328,7 +331,7 @@ (let [state (:storm-cluster-state cluster-map) nimbus (:nimbus cluster-map) storm-id (common/get-storm-id state storm-name) - + component->tasks (reverse-map (common/storm-task-info (.getUserTopology nimbus storm-id) @@ -337,7 +340,7 @@ (select-keys component->tasks component-ids) component->tasks) task-ids (apply concat (vals component->tasks)) - assignment (.assignment-info state storm-id nil) + assignment (.assignment-info state storm-id nil) taskbeats (.taskbeats state storm-id (:task->node+port assignment)) heartbeats (dofor [id task-ids] (get taskbeats id)) stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))] @@ -424,7 +427,7 @@ (.set_bolts topology (assoc (clojurify-structure bolts) (uuid) - (Bolt. + (Bolt. (serialize-component-object capturer) (mk-plain-component-common (into {} (for [[id direct?] all-streams] [id (if direct? @@ -452,7 +455,7 @@ (FixedTuple. (:stream tup) (:values tup)) tup)))) mock-sources) - + ] (doseq [[id spout] replacements] @@ -466,10 +469,10 @@ (doseq [spout (spout-objects spouts)] (startup spout)) - + (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology) - - + + (let [storm-id (common/get-storm-id state storm-name)] (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts))) (simulate-wait cluster-map)) @@ -499,7 +502,7 @@ (read-tuples results component-id Utils/DEFAULT_STREAM_ID) )) -(defn ms= [& args] +(defn ms= [& args] (apply = (map multi-set args))) (def TRACKER-BOLT-ID "+++tracker-bolt") @@ -579,7 +582,7 @@ track-id (-> tracked-topology :cluster ::track-id) waiting? (fn [] (or (not= target (global-amt track-id "spout-emitted")) - (not= (global-amt track-id "transferred") + (not= (global-amt track-id "transferred") (global-amt track-id "processed")) ))] (while-timeout TEST-TIMEOUT-MS (waiting?)