Skip to content

Commit

Permalink
fix issue with connection-loop that can appear under high load
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Nov 11, 2011
1 parent 058290a commit 6171911
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
8 changes: 4 additions & 4 deletions src/lamina/connections.clj
Expand Up @@ -22,11 +22,11 @@
(defn- incr-delay [delay]
(if (zero? delay)
125
(min 64000 (* 2 delay))))
(min 2000 (* 2 delay))))

(defn- wait-for-close
"Returns a result-channel representing the closing of the channel."
[ch options]
[ch]
(closed-result ch))

(defn- connect-loop
Expand Down Expand Up @@ -75,8 +75,8 @@
(when-let [new-connection-callback (:connection-callback options)]
(new-connection-callback ch))
(fn [_]
(success! @result ch)
(wait-for-close ch options))))
(future (success! @result ch))
(wait-for-close ch))))
;; wait here for connection to drop
(fn [_]
(trace [probe-prefix :connection:lost] (assoc desc :event :connection-lost, :elapsed (elapsed)))
Expand Down
6 changes: 4 additions & 2 deletions src/lamina/core/channel.clj
Expand Up @@ -138,13 +138,15 @@ ontains the specified messages."
(defn on-drained
"Registers callbacks that will be triggered by the channel being drained."
[ch & callbacks]
(-> ch queue (q/on-drained (map unwrap-fn callbacks))))
(-> ch queue (q/on-drained (map unwrap-fn callbacks)))
true)

(defn on-closed
"Registers callbacks that will be triggered by the channel closing."
[ch & callbacks]
(let [callbacks (map unwrap-fn callbacks)]
(-> ch consumer (o/subscribe (zipmap callbacks (map #(o/observer nil % nil) callbacks))))))
(-> ch consumer (o/subscribe (zipmap callbacks (map #(o/observer nil % nil) callbacks))))
true))

(defn close
"Closes the channel."
Expand Down
29 changes: 19 additions & 10 deletions test/lamina/test/connections.clj
Expand Up @@ -123,7 +123,7 @@
(Thread/sleep 100)
(start-server)
(Thread/sleep 1000)
(is (= (range 10) (map #(wait-for-result % 100) results))))
(is (= (range 10) (map #(wait-for-result % 10000) results))))
(finally
(close-connection f))))))

Expand All @@ -143,7 +143,7 @@
(is (thrown? TimeoutException @(f "echo" 10)))
(start-server)
(is (thrown? TimeoutException @(f "echo2" 10)))
(is (= "echo3" @(f "echo3" 2000)))
(is (= "echo3" @(f "echo3" 10000)))
(finally
(close-connection f))))))

Expand All @@ -155,20 +155,29 @@
(.start
(Thread.
#(loop []
(Thread/sleep 1000)
(Thread/sleep 4000)
(stop-server)
;;(println "Disconnecting")
(Thread/sleep 100)
(start-server)
;;(println "Reconnecting")
(when @continue
(recur)))))
(try
(let [s (range 1e3)]
(is (= s (map
#(let [val (wait-for-result (f %) 5000)]
(when (zero? (rem val 1000))
(println val))
val)
s))))
(let [s (range 1e4)]
(is (= s
(->> s
(map (fn [x]
(run-pipeline
(f x 1e5)
:error-handler (fn [_])
#(do
#_(when (zero? (rem % 100))
(println %))
%))))
doall
(map deref)
))))
(finally
(reset! continue false)
(close-connection f))))))
Expand Down

0 comments on commit 6171911

Please sign in to comment.