From c86dbf956aabcd2623bdbc937dd51acfafe87d4a Mon Sep 17 00:00:00 2001 From: Aaron Levin Date: Wed, 21 May 2014 15:43:40 -0400 Subject: [PATCH] Allow users to pass TEST-TIMEOUT-MS as param It would be nice if `complete-topology` allowed a usere to pass in the default timeout as a parameter. This PR adds functionality without breaking any existing code. Tests pass. --- storm-core/src/clj/backtype/storm/testing.clj | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 7bbe23844eb..fd972554589 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -195,7 +195,8 @@ (defn wait-until-cluster-waiting "Wait until the cluster is idle. Should be used with time simulation." - [cluster-map] + ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS)) + ([cluster-map timeout-ms] ;; wait until all workers, supervisors, and nimbus is waiting (let [supervisors @(:supervisors cluster-map) workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes)) @@ -204,12 +205,12 @@ supervisors workers) ; because a worker may already be dead ] - (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons)) + (while-timeout timeout-ms (not (every? (memfn waiting?) daemons)) (Thread/sleep 10) ;; (doseq [d daemons] ;; (if-not ((memfn waiting?) d) ;; (println d))) - ))) + )))) (defn advance-cluster-time ([cluster-map secs increment-secs] @@ -438,7 +439,7 @@ )) ;; TODO: mock-sources needs to be able to mock out state spouts as well -(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil] +(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :timeout-ms TEST-TIMEOUT-MS] ;; TODO: the idea of mocking for transactional topologies should be done an ;; abstraction level above... should have a complete-transactional-topology for this (let [{topology :topology capturer :capturer} (capture-topology topology) @@ -471,11 +472,11 @@ (let [storm-id (common/get-storm-id state storm-name)] - (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts))) + (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts))) (simulate-wait cluster-map)) (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0))) - (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil) + (while-timeout timeout-ms (.assignment-info state storm-id nil) (simulate-wait cluster-map)) (when cleanup-state (doseq [spout (spout-objects spouts)] @@ -573,8 +574,10 @@ (defn tracked-wait "Waits until topology is idle and 'amt' more tuples have been emitted by spouts." ([tracked-topology] - (tracked-wait tracked-topology 1)) + (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS)) ([tracked-topology amt] + (tracked-wait tracked-topology amt TEST-TIMEOUT-MS)) + ([tracked-topology amt timeout-ms] (let [target (+ amt @(:last-spout-emit tracked-topology)) track-id (-> tracked-topology :cluster ::track-id) waiting? (fn []