Permalink
Browse files

make sure channel closes propagate to connection, fixes ztellman#50

  • Loading branch information...
ztellman committed Jan 25, 2012
1 parent e389f44 commit 5d6b399df6fefb100d826771fa5415840ed717b0
Showing with 40 additions and 35 deletions.
  1. +10 −5 src/aleph/http/client.clj
  2. +30 −30 src/aleph/netty.clj
View
@@ -95,9 +95,11 @@
(run-pipeline client
:error-handler (fn [_])
(fn [ch]
(splice
(wrap-response-stream options ch)
(siphon->> (wrap-request-stream options) ch))))))
(let [requests (siphon->> (wrap-request-stream options) ch)]
(on-closed requests #(close ch))
(splice
(wrap-response-stream options ch)
requests))))))
(defn- http-client- [client-fn options]
(let [options (process-options options)
@@ -173,16 +175,19 @@
(error! response
(TimeoutException. (str "HTTP request timed out after " (elapsed) " milliseconds.")))))))
;; request
(let [connection (http-connection
(update-in request [:probes :errors]
#(or % nil-channel)))
close-connection (pipeline :error-handler (fn [_]) close)]
(run-pipeline response
:error-handler (fn [ex]
(close-connection connection)))
(run-pipeline connection
:error-handler (fn [ex]
(close-connection connection)
(when-not (instance? TimeoutException)
(error! response ex)))
(error! response ex))
(fn [ch]
(enqueue ch request)
(read-channel ch timeout))
View
@@ -309,31 +309,31 @@
(.addLast pipeline
"close-listener"
(create-simple-channel-upstream-handler close-callback)))
(.addFirst pipeline
"channel-listener"
(if (and refuse-connections? @refuse-connections?)
(refuse-connection-stage)
(upstream-stage
(fn [evt]
(when-let [ch ^Channel (channel-event evt)]
(if (.isOpen ch)
(when (.add channel-group ch)
(let [origin (channel-origin ch)
connections (swap! connection-count inc)]
(trace connections-probe
{:event :opened
:connections connections
:address origin})
(run-pipeline (.getCloseFuture ch)
wrap-netty-channel-future
(fn [_]
(let [connections (swap! connection-count dec)]
(trace connections-probe
{:event :closed
:connections connections
:address origin}))))))))
nil))))
pipeline)))))
(.addFirst pipeline
"channel-listener"
(if (and refuse-connections? @refuse-connections?)
(refuse-connection-stage)
(upstream-stage
(fn [evt]
(when-let [ch ^Channel (channel-event evt)]
(if (.isOpen ch)
(when (.add channel-group ch)
(let [origin (channel-origin ch)
connections (swap! connection-count inc)]
(trace connections-probe
{:event :opened
:connections connections
:address origin})
(run-pipeline (.getCloseFuture ch)
wrap-netty-channel-future
(fn [_]
(let [connections (swap! connection-count dec)]
(trace connections-probe
{:event :closed
:connections connections
:address origin}))))))))
nil))))
pipeline)))))
(defn graceful-shutdown [server timeout]
(let [connections (server-probe server :connections)
@@ -484,16 +484,16 @@
;; set pipeline factory
(.setPipelineFactory client
(create-pipeline-factory
(create-pipeline-factory
channel-group
options
(canonical-probe [(:name options) :connections])
nil
(fn [_]
(close inner)
(close outer)
(run-pipeline (.close channel-group)
wrap-netty-channel-group-future))
(close inner)
(close outer)
(run-pipeline (.close channel-group)
wrap-netty-channel-group-future))
pipeline-fn
outer))

0 comments on commit 5d6b399

Please sign in to comment.