Skip to content

Commit

Permalink
Redo 'with-async-connection-pool'
Browse files Browse the repository at this point in the history
This commit drastically changes the way `with-async-connection-pool` works.
Previously we relied on creating a fake client and and starting to start the
io-reactor, and used a custom `:pooling-info` or `*pooling-info*` to do
poor man's reference counting for shutting down the pool.

No longer, this is now simplified and relies on the user managing scope
themselves.

Resolves #443
  • Loading branch information
dakrone committed Mar 28, 2018
1 parent 36bdc14 commit 1001815
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 171 deletions.
26 changes: 16 additions & 10 deletions README.org
Original file line number Diff line number Diff line change
Expand Up @@ -1071,16 +1071,22 @@ reuse the pool context, just use =reuse-pool=.
exce2))
#+END_SRC

To implement the persistent connections of async requests, we add a middleware
named =wrap-async-pooling= to the default middleware list. This middleware's
behaviour depends on =*pooling-info*= binding or =:polling-info= in options map.
The =pooling-info= contains =:conn-mgr=, =:allocate= and =:release=.
=:conn-mgr= is the connection manager used in the pooling context, =:allocate=
is a function that will be invoked when the request starts and =:release= will be invoked
when the request is finished.

=with-async-connection-pool= uses the =wrap-async-pooling= to manage the
connection manager; you can also implement your own manage strategy.
There are many advanced options available when creating asynchronous connection pools that can be
configured by passing an =:io-config= map in the connection manager parameters. It supports:

- =:connect-timeout=
- =:interest-op-queued=
- =:io-thread-count=
- =:rcv-buf-size=
- =:select-interval=
- =:shutdown-grace-period=
- =:snd-buf-size=
- =:so-keep-alive=
- =:so-linger=
- =:so-timeout=
- =:tcp-no-delay=

See the docstring on =with-async-connection-pool= for more information about these options.

If you would prefer to handle managing the connection manager yourself, you can
create a connection manager and specify it for each request:
Expand Down
1 change: 1 addition & 0 deletions changelog.org
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ List of user-visible changes that have gone into each release
- Add support for reusable http clients, returning the client in =:http-client= and allowing one to
be specified (with the same setting) - https://github.com/dakrone/clj-http/issues/441
- Cancelling the =Future= returned from an async http request now also aborts the HttpRequest object
- Async connection managers no longer put the connection manager in an illegal ACTIVE state [[https://github.com/dakrone/clj-http/issues/443][#443]]

** 3.8.0
- Reintroduce the =:save-request= and =:debug-body= options
Expand Down
109 changes: 39 additions & 70 deletions src/clj_http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -976,50 +976,6 @@
#(respond (request-timing-response % start))
raise)))))

(defn- async-pooling-request
[req release]
(let [handler (:oncancel req)]
(assoc req :oncancel
(fn []
(release)
(when handler
(handler))))))

(def ^:dynamic *pooling-info*
"The pooling-info used in pooling function"
nil)

(defn wrap-async-pooling
"Middleware that handle pooling async request. It use the value of key
:pooling-info, the value is a map contains three key-value
:conn-mgr - connection manager used by the HttpAsyncClient
:allocate - a function with no args, will be invoked when connection allocate
:release - a function with no args, will be invoked when connection finished
You must shutdown the conn-mgr when you don't need it."
[client]
(fn
([req]
(client req))
([req respond raise]
(if-let [pooling-info (or *pooling-info* (:pooling-info req))]
(let [{:keys [allocate release conn-mgr]} pooling-info]
(binding [conn/*async-connection-manager* conn-mgr]
(allocate)
(try
(client (async-pooling-request req release)
(fn [resp]
(respond (assoc resp :pooling-info pooling-info))
(release))
(fn [ex]
(release)
(raise ex)))
(catch Throwable ex
(release)
(throw ex)))))
(client req respond raise)))))

(defn check-conflicting-socket-capture-opts
"Checks whether the request has multually exclusive options for socket capturing."
[req]
Expand Down Expand Up @@ -1062,7 +1018,6 @@
(def default-middleware
"The default list of middleware clj-http uses for wrapping requests."
[wrap-request-timing
wrap-async-pooling
wrap-header-map
wrap-query-params
wrap-basic-auth
Expand Down Expand Up @@ -1276,32 +1231,46 @@
options))

(defmacro with-async-connection-pool
"Macro to execute the body using a connection manager. Creates a
PoolingNHttpClientConnectionManager to use for all requests within the body of
the expression. An option map is allowed to set options for the connection
manager.
Handles the same options as `with-connection-pool` plus:
:io-config which should be a map containing some of the following keys:
:connect-timeout - int the default connect timeout value for connection
requests (default 0, meaning no timeout)
:interest-op-queued - boolean, whether or not I/O interest operations are to
be queued and executed asynchronously or to be applied to the underlying
SelectionKey immediately (default false)
:io-thread-count - int, the number of I/O dispatch threads to be used
(default is the number of available processors)
:rcv-buf-size - int the default value of the SO_RCVBUF parameter for
newly created sockets (default is 0, meaning the system default)
:select-interval - long, time interval in milliseconds at which to check for
timed out sessions and session requests (default 1000)
:shutdown-grace-period - long, grace period in milliseconds to wait for
individual worker threads to terminate cleanly (default 500)
:snd-buf-size - int, the default value of the SO_SNDBUF parameter for
newly created sockets (default is 0, meaning the system default)
:so-keep-alive - boolean, the default value of the SO_KEEPALIVE parameter for
newly created sockets (default false)
:so-linger - int, the default value of the SO_LINGER parameter for
newly created sockets (default -1)
:so-timeout - int, the default socket timeout value for I/O operations
(default 0, meaning no timeout)
:tcp-no-delay - boolean, the default value of the TCP_NODELAY parameter for
newly created sockets (default true)
If the value 'nil' is specified or the value is not set, the default value
will be used."
[opts & body]
`(let [cm# (conn/make-reuseable-async-conn-manager ~opts)
count# (atom 0)
all-requested# (atom false)
p-info# {:conn-mgr cm#
:allocate (fn [] (swap! count# inc))
:release (fn [] (swap! count# dec))}
;; A http client hold the conn-mgr and start the io-reactor.
;; In this context any other client's ConnectionManagerShared will be
;; set to true.
holder-client# (-> (HttpAsyncClients/custom)
(.setConnectionManager cm#)
(.build)
(.start))]
(add-watch count# :close-conn-mgr
(fn [key# identity# old# new#]
(if (and (not= old# new#) (<= new# 0) @all-requested#)
(.shutdown
^PoolingNHttpClientConnectionManager
cm#))))
(binding [*pooling-info* p-info#]
`(let [cm# (conn/make-reuseable-async-conn-manager ~opts)]
(binding [conn/*async-connection-manager* cm#]
(try
~@body
(finally
(swap! all-requested# not)
(if (= 0 @count#)
(.shutdown
^PoolingNHttpClientConnectionManager
cm#)))))))
(.shutdown
^PoolingNHttpClientConnectionManager
cm#))))))
147 changes: 56 additions & 91 deletions test/clj_http/test/client_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@
(reduce (fn [m [ks v]]
(assoc-in m ks v)) {})))

(defn count-release
[count]
(let [release* (:release client/*pooling-info*)]
(fn [] (swap! count dec) (release*))))

(deftest ^:integration roundtrip
(run-server)
;; roundtrip with scheme as a keyword
Expand Down Expand Up @@ -1010,126 +1005,96 @@
(run-server)
(client/with-async-connection-pool {}
(let [resp1 (promise) resp2 (promise)
exce1 (promise) exce2 (promise)
count (atom 2)]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(request {:async? true :uri "/get" :method :get} resp1 exce1)
(request {:async? true :uri "/get" :method :get} resp2 exce2)
(is (= 200 (:status @resp1) (:status @resp2)))
(is (:pooling-info @resp1))
(is (:pooling-info @resp2))
(is (not (realized? exce2)))
(is (not (realized? exce1)))
(is (= 0 @count))))))
exce1 (promise) exce2 (promise)]
(request {:async? true :uri "/get" :method :get} resp1 exce1)
(request {:async? true :uri "/get" :method :get} resp2 exce2)
(is (= 200 (:status @resp1) (:status @resp2)))
(is (not (realized? exce2)))
(is (not (realized? exce1))))))

(deftest ^:integration t-with-async-pool-sleep
(run-server)
(client/with-async-connection-pool {}
(let [resp1 (promise) resp2 (promise)
exce1 (promise) exce2 (promise)
count (atom 2)]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(request {:async? true :uri "/get" :method :get} resp1 exce1)
(Thread/sleep 500)
(request {:async? true :uri "/get" :method :get} resp2 exce2)
(is (= 200 (:status @resp1) (:status @resp2)))
(is (:pooling-info @resp1))
(is (:pooling-info @resp2))
(is (not (realized? exce2)))
(is (not (realized? exce1)))
(is (= 0 @count))))))
exce1 (promise) exce2 (promise)]
(request {:async? true :uri "/get" :method :get} resp1 exce1)
(Thread/sleep 500)
(request {:async? true :uri "/get" :method :get} resp2 exce2)
(is (= 200 (:status @resp1) (:status @resp2)))
(is (not (realized? exce2)))
(is (not (realized? exce1))))))

(deftest ^:integration t-async-pool-wrap-exception
(run-server)
(client/with-async-connection-pool {}
(let [resp1 (promise) resp2 (promise)
exce1 (promise) exce2 (promise) count (atom 2)]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(request {:async? true :uri "/error" :method :get} resp1 exce1)
(Thread/sleep 500)
(request {:async? true :uri "/get" :method :get} resp2 exce2)
(is (realized? exce1))
(is (not (realized? exce2)))
(is (= 200 (:status @resp2)))
(is (= 0 @count))))))
(request {:async? true :uri "/error" :method :get} resp1 exce1)
(Thread/sleep 500)
(request {:async? true :uri "/get" :method :get} resp2 exce2)
(is (realized? exce1))
(is (not (realized? exce2)))
(is (= 200 (:status @resp2))))))

(deftest ^:integration t-async-pool-exception-when-start
(run-server)
(client/with-async-connection-pool {}
(let [resp1 (promise) resp2 (promise)
exce1 (promise) exce2 (promise)
count (atom 2)
middleware (fn [client]
(fn [req resp raise] (throw (Exception.))))]
(client/with-additional-middleware
[middleware]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(try (request {:async? true :uri "/error" :method :get} resp1 exce1)
(catch Throwable ex))
(Thread/sleep 500)
(try (request {:async? true :uri "/get" :method :get} resp2 exce2)
(catch Throwable ex))
(is (not (realized? exce1)))
(is (not (realized? exce2)))
(is (not (realized? resp1)))
(is (not (realized? resp2)))
(is (= 0 @count)))))))
(try (request {:async? true :uri "/error" :method :get} resp1 exce1)
(catch Throwable ex))
(Thread/sleep 500)
(try (request {:async? true :uri "/get" :method :get} resp2 exce2)
(catch Throwable ex))
(is (not (realized? exce1)))
(is (not (realized? exce2)))
(is (not (realized? resp1)))
(is (not (realized? resp2)))))))

(deftest ^:integration t-reuse-async-pool
(run-server)
(client/with-async-connection-pool {}
(let [resp1 (promise) resp2 (promise)
exce1 (promise) exce2 (promise)
count (atom 2)]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(request {:async? true :uri "/get" :method :get}
(fn [resp]
(resp1 resp)
(request (client/reuse-pool
{:async? true
:uri "/get"
:method :get}
resp)
resp2
exce2))
exce1)
(is (= 200 (:status @resp1) (:status @resp2)))
(is (:pooling-info @resp1))
(is (:pooling-info @resp2))
(is (not (realized? exce2)))
(is (not (realized? exce1)))
(is (= 0 @count))))))
exce1 (promise) exce2 (promise)]
(request {:async? true :uri "/get" :method :get}
(fn [resp]
(resp1 resp)
(request (client/reuse-pool
{:async? true
:uri "/get"
:method :get}
resp)
resp2
exce2))
exce1)
(is (= 200 (:status @resp1) (:status @resp2)))
(is (not (realized? exce2)))
(is (not (realized? exce1))))))

(deftest ^:integration t-async-pool-redirect-to-get
(run-server)
(client/with-async-connection-pool {}
(let [resp (promise) exce (promise) count (atom 2)]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(request {:async? true :uri "/redirect-to-get"
:method :get :redirect-strategy :default} resp exce)
(is (= 200 (:status @resp)))
(is (:pooling-info @resp))
(is (not (realized? exce)))
(is (= 1 @count))))))
(let [resp (promise)
exce (promise)]
(request {:async? true :uri "/redirect-to-get"
:method :get :redirect-strategy :default} resp exce)
(is (= 200 (:status @resp)))
(is (not (realized? exce))))))

(deftest ^:integration t-async-pool-max-redirect
(run-server)
(client/with-async-connection-pool {}
(let [resp (promise) exce (promise) count (atom 21)]
(binding [client/*pooling-info*
(assoc client/*pooling-info* :release (count-release count))]
(request {:async? true :uri "/redirect" :method :get
:redirect-strategy :default
:throw-exceptions true} resp exce)
(is @exce)
(is (not (realized? resp)))
(is (= 20 @count))))))
(let [resp (promise)
exce (promise)]
(request {:async? true :uri "/redirect" :method :get
:redirect-strategy :default
:throw-exceptions true} resp exce)
(is @exce)
(is (not (realized? resp))))))

(deftest test-url-encode-path
(is (= (client/url-encode-illegal-characters "?foo bar+baz[]75")
Expand Down

0 comments on commit 1001815

Please sign in to comment.