Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-213. Decouple In-Process ZooKeeper from LocalCluster. #137

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions storm-core/src/clj/backtype/storm/LocalCluster.clj
Expand Up @@ -19,14 +19,19 @@
(:gen-class
:init init
:implements [backtype.storm.ILocalCluster]
:constructors {[] [] [java.util.Map] []}
:constructors {[] [] [java.util.Map] [] [String Long] []}
:state state ))

(defn -init
([]
(let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
[[] ret]
))
([^String zk-host ^Long zk-port]
(let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
STORM-ZOOKEEPER-SERVERS (list zk-host)
STORM-ZOOKEEPER-PORT zk-port})]
[[] zk-host zk-port]))
([^Map stateMap]
[[] stateMap]))

Expand Down Expand Up @@ -75,4 +80,3 @@

(defn -getState [this]
(.state this))

43 changes: 23 additions & 20 deletions storm-core/src/clj/backtype/storm/testing.clj
Expand Up @@ -116,17 +116,18 @@
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
[zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
(zk/mk-inprocess-zookeeper zk-tmp))
daemon-conf (merge (read-storm-config)
{TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
ZMQ-LINGER-MILLIS 0
TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
}
daemon-conf
{STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]})
STORM-CLUSTER-MODE "local"}
(if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
{STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]})
daemon-conf)
nimbus-tmp (local-temp-path)
port-counter (mk-counter supervisor-slot-port-min)
nimbus (nimbus/service-handler
Expand All @@ -140,7 +141,7 @@
:state (mk-distributed-cluster-state daemon-conf)
:storm-cluster-state (mk-storm-cluster-state daemon-conf)
:tmp-dirs (atom [nimbus-tmp zk-tmp])
:zookeeper zk-handle
:zookeeper (if (not-nil? zk-handle) zk-handle)
:shared-context context}
supervisor-confs (if (sequential? supervisors)
supervisors
Expand Down Expand Up @@ -174,9 +175,11 @@
;; race condition here? will it launch the workers again?
(supervisor/kill-supervisor s))
(psim/kill-all-processes)
(log-message "Shutting down in process zookeeper")
(zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
(log-message "Done shutting down in process zookeeper")
(if (not-nil? (:zookeeper cluster-map))
(do
(log-message "Shutting down in process zookeeper")
(zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
(log-message "Done shutting down in process zookeeper")))
(doseq [t @(:tmp-dirs cluster-map)]
(log-message "Deleting temporary path " t)
(try
Expand Down Expand Up @@ -303,7 +306,7 @@
(let [conf (:conf supervisor)
supervisor-id (:supervisor-id supervisor)
port (find-worker-port conf worker-id)
existing (get @capture-atom [supervisor-id port] 0)]
existing (get @capture-atom [supervisor-id port] 0)]
(swap! capture-atom assoc [supervisor-id port] (inc existing))
(existing-fn supervisor worker-id)
))))
Expand All @@ -328,7 +331,7 @@
(let [state (:storm-cluster-state cluster-map)
nimbus (:nimbus cluster-map)
storm-id (common/get-storm-id state storm-name)

component->tasks (reverse-map
(common/storm-task-info
(.getUserTopology nimbus storm-id)
Expand All @@ -337,7 +340,7 @@
(select-keys component->tasks component-ids)
component->tasks)
task-ids (apply concat (vals component->tasks))
assignment (.assignment-info state storm-id nil)
assignment (.assignment-info state storm-id nil)
taskbeats (.taskbeats state storm-id (:task->node+port assignment))
heartbeats (dofor [id task-ids] (get taskbeats id))
stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
Expand Down Expand Up @@ -424,7 +427,7 @@
(.set_bolts topology
(assoc (clojurify-structure bolts)
(uuid)
(Bolt.
(Bolt.
(serialize-component-object capturer)
(mk-plain-component-common (into {} (for [[id direct?] all-streams]
[id (if direct?
Expand Down Expand Up @@ -452,7 +455,7 @@
(FixedTuple. (:stream tup) (:values tup))
tup))))
mock-sources)


]
(doseq [[id spout] replacements]
Expand All @@ -466,10 +469,10 @@

(doseq [spout (spout-objects spouts)]
(startup spout))

(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)


(let [storm-id (common/get-storm-id state storm-name)]
(while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
(simulate-wait cluster-map))
Expand Down Expand Up @@ -499,7 +502,7 @@
(read-tuples results component-id Utils/DEFAULT_STREAM_ID)
))

(defn ms= [& args]
(defn ms= [& args]
(apply = (map multi-set args)))

(def TRACKER-BOLT-ID "+++tracker-bolt")
Expand Down Expand Up @@ -579,7 +582,7 @@
track-id (-> tracked-topology :cluster ::track-id)
waiting? (fn []
(or (not= target (global-amt track-id "spout-emitted"))
(not= (global-amt track-id "transferred")
(not= (global-amt track-id "transferred")
(global-amt track-id "processed"))
))]
(while-timeout TEST-TIMEOUT-MS (waiting?)
Expand Down