From 6a41b22bd7b633653633b868bcd4ca9cb8b858f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Thu, 25 May 2017 22:32:42 +0200 Subject: [PATCH] STORM-2535: Replace test-reset-timeout with a more reliable test --- .../org/apache/storm/integration_test.clj | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj index 775949e3086..a1477136ae4 100644 --- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj @@ -129,19 +129,25 @@ (advance-cluster-time cluster 12) (assert-failed tracker 2) ))) - -(defbolt extend-timeout-twice {} {:prepare true} + +(defbolt reset-timeout-bolt {} {:prepare true} [conf context collector] - (let [state (atom -1)] + (let [tuple-counter (atom 1) + first-tuple (atom nil)] (bolt (execute [tuple] - (do - (Time/sleep (* 8 1000)) - (reset-timeout! collector tuple) - (Time/sleep (* 8 1000)) - (reset-timeout! collector tuple) - (Time/sleep (* 8 1000)) - (ack! collector tuple) + (do + (condp = @tuple-counter + 1 (reset! first-tuple tuple) + 2 (reset-timeout! collector @first-tuple) + 5 (do + (ack! collector @first-tuple) + (ack! collector tuple) + ) + (do + (reset-timeout! collector @first-tuple) + (ack! collector tuple))) + (swap! tuple-counter inc) ))))) (deftest test-reset-timeout @@ -151,18 +157,33 @@ _ (.setAckFailDelegate feeder tracker) topology (thrift/mk-topology {"1" (thrift/mk-spout-spec feeder)} - {"2" (thrift/mk-bolt-spec {"1" :global} extend-timeout-twice)})] + {"2" (thrift/mk-bolt-spec {"1" :global} reset-timeout-bolt)})] (submit-local-topology (:nimbus cluster) "timeout-tester" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology) - (advance-cluster-time cluster 11) + ;The first tuple will be used to check timeout reset (.feed feeder ["a"] 1) - (advance-cluster-time cluster 21) + ;The second tuple is used to wait for the spout to rotate the spout's pending map + (.feed feeder ["b"] 2) + (advance-cluster-time cluster 9) + ;The other tuples are used to reset the first tuple's timeout, + ;and to wait for the message to get through to the spout (acks use the same path as timeout resets) + (.feed feeder ["c"] 3) + (assert-acked tracker 3) + (advance-cluster-time cluster 9) + (.feed feeder ["d"], 4) + (assert-acked tracker 4) + (advance-cluster-time cluster 2) + ;The time is now twice the message timeout, the second tuple should expire since it was not acked + ;Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work + (assert-failed tracker 2) + ;Put in a tuple to cause the first tuple to be acked + (.feed feeder ["e"], 5) + (assert-acked tracker 5) + ;The first tuple should be acked and should not have failed (is (not (.isFailed tracker 1))) - (is (not (.isAcked tracker 1))) - (advance-cluster-time cluster 5) - (assert-acked tracker 1) + (is (.isAcked tracker 1)) ))) (defn mk-validate-topology-1 []