Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions storm-core/src/clj/backtype/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@

(defn wait-until-cluster-waiting
"Wait until the cluster is idle. Should be used with time simulation."
[cluster-map]
([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
([cluster-map timeout-ms]
;; wait until all workers, supervisors, and nimbus is waiting
(let [supervisors @(:supervisors cluster-map)
workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
Expand All @@ -204,12 +205,12 @@
supervisors
workers) ; because a worker may already be dead
]
(while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
(while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
(Thread/sleep 10)
;; (doseq [d daemons]
;; (if-not ((memfn waiting?) d)
;; (println d)))
)))
))))

(defn advance-cluster-time
([cluster-map secs increment-secs]
Expand Down Expand Up @@ -438,7 +439,7 @@
))

;; TODO: mock-sources needs to be able to mock out state spouts as well
(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil]
(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :timeout-ms TEST-TIMEOUT-MS]
;; TODO: the idea of mocking for transactional topologies should be done an
;; abstraction level above... should have a complete-transactional-topology for this
(let [{topology :topology capturer :capturer} (capture-topology topology)
Expand Down Expand Up @@ -471,11 +472,11 @@


(let [storm-id (common/get-storm-id state storm-name)]
(while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
(while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
(simulate-wait cluster-map))

(.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
(while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
(while-timeout timeout-ms (.assignment-info state storm-id nil)
(simulate-wait cluster-map))
(when cleanup-state
(doseq [spout (spout-objects spouts)]
Expand Down Expand Up @@ -573,8 +574,10 @@
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
([tracked-topology]
(tracked-wait tracked-topology 1))
(tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
([tracked-topology amt]
(tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
([tracked-topology amt timeout-ms]
(let [target (+ amt @(:last-spout-emit tracked-topology))
track-id (-> tracked-topology :cluster ::track-id)
waiting? (fn []
Expand Down