diff --git a/src/lamina/connections.clj b/src/lamina/connections.clj index 39257fb..c36070f 100644 --- a/src/lamina/connections.clj +++ b/src/lamina/connections.clj @@ -22,11 +22,11 @@ (defn- incr-delay [delay] (if (zero? delay) 125 - (min 64000 (* 2 delay)))) + (min 2000 (* 2 delay)))) (defn- wait-for-close "Returns a result-channel representing the closing of the channel." - [ch options] + [ch] (closed-result ch)) (defn- connect-loop @@ -75,8 +75,8 @@ (when-let [new-connection-callback (:connection-callback options)] (new-connection-callback ch)) (fn [_] - (success! @result ch) - (wait-for-close ch options)))) + (future (success! @result ch)) + (wait-for-close ch)))) ;; wait here for connection to drop (fn [_] (trace [probe-prefix :connection:lost] (assoc desc :event :connection-lost, :elapsed (elapsed))) diff --git a/src/lamina/core/channel.clj b/src/lamina/core/channel.clj index a97955b..3a3d8ce 100644 --- a/src/lamina/core/channel.clj +++ b/src/lamina/core/channel.clj @@ -138,13 +138,15 @@ ontains the specified messages." (defn on-drained "Registers callbacks that will be triggered by the channel being drained." [ch & callbacks] - (-> ch queue (q/on-drained (map unwrap-fn callbacks)))) + (-> ch queue (q/on-drained (map unwrap-fn callbacks))) + true) (defn on-closed "Registers callbacks that will be triggered by the channel closing." [ch & callbacks] (let [callbacks (map unwrap-fn callbacks)] - (-> ch consumer (o/subscribe (zipmap callbacks (map #(o/observer nil % nil) callbacks)))))) + (-> ch consumer (o/subscribe (zipmap callbacks (map #(o/observer nil % nil) callbacks)))) + true)) (defn close "Closes the channel." diff --git a/test/lamina/test/connections.clj b/test/lamina/test/connections.clj index 66308f2..7335267 100644 --- a/test/lamina/test/connections.clj +++ b/test/lamina/test/connections.clj @@ -123,7 +123,7 @@ (Thread/sleep 100) (start-server) (Thread/sleep 1000) - (is (= (range 10) (map #(wait-for-result % 100) results)))) + (is (= (range 10) (map #(wait-for-result % 10000) results)))) (finally (close-connection f)))))) @@ -143,7 +143,7 @@ (is (thrown? TimeoutException @(f "echo" 10))) (start-server) (is (thrown? TimeoutException @(f "echo2" 10))) - (is (= "echo3" @(f "echo3" 2000))) + (is (= "echo3" @(f "echo3" 10000))) (finally (close-connection f)))))) @@ -155,20 +155,29 @@ (.start (Thread. #(loop [] - (Thread/sleep 1000) + (Thread/sleep 4000) (stop-server) + ;;(println "Disconnecting") (Thread/sleep 100) (start-server) + ;;(println "Reconnecting") (when @continue (recur))))) (try - (let [s (range 1e3)] - (is (= s (map - #(let [val (wait-for-result (f %) 5000)] - (when (zero? (rem val 1000)) - (println val)) - val) - s)))) + (let [s (range 1e4)] + (is (= s + (->> s + (map (fn [x] + (run-pipeline + (f x 1e5) + :error-handler (fn [_]) + #(do + #_(when (zero? (rem % 100)) + (println %)) + %)))) + doall + (map deref) + )))) (finally (reset! continue false) (close-connection f))))))