Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation during client connection establishment #721

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
24 changes: 13 additions & 11 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
[aleph.http.websocket.common :as ws.common]
[aleph.http.websocket.server :as ws.server]
[aleph.netty :as netty]
[aleph.util :as util]
[clojure.string :as str]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
Expand Down Expand Up @@ -112,11 +113,10 @@
(if on-closed
(assoc options :on-closed on-closed)
options))]
(doto (d/chain' conn middleware)
(d/catch' (fn [e]
(log/trace e "Terminating creation of HTTP connection")
(d/error! conn e)
(d/error-deferred e))))))
(-> (d/chain' conn middleware)
(util/propagate-error conn
(fn [e]
(log/trace e "Terminated creation of HTTP connection"))))))

(def ^:private connection-stats-callbacks (atom #{}))

Expand Down Expand Up @@ -393,7 +393,10 @@
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))

;; allow cancellation during connection establishment
(d/connect result (first conn))
(util/propagate-error result
(first conn)
(fn [e]
(log/trace e "Aborted connection acquisition")))

(if (realized? result)
;; to account for race condition between setting `dispose-conn!`
Expand Down Expand Up @@ -462,11 +465,10 @@
(middleware/handle-redirects request req))))))))))))
req))]
(d/connect response result)
(d/catch' result
(fn [e]
(log/trace e "Request failed. Disposing of connection...")
(@dispose-conn!)
(d/error-deferred e)))
(util/on-error result
(fn [e]
(log/trace e "Request failed. Disposing of connection...")
(@dispose-conn!)))
result)))

(defn cancel-request!
Expand Down
31 changes: 15 additions & 16 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[aleph.http.multipart :as multipart]
[aleph.http.websocket.client :as ws.client]
[aleph.netty :as netty]
[aleph.util :as util]
[clj-commons.byte-streams :as bs]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
Expand Down Expand Up @@ -814,16 +815,15 @@
:logger logger
:pipeline-transform pipeline-transform))

ch-d (netty/create-client-chan
{:pipeline-builder pipeline-builder
:bootstrap-transform bootstrap-transform
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:name-resolver name-resolver
:connect-timeout connect-timeout})

_ (attach-on-close-handler ch-d on-closed)
ch-d (doto (netty/create-client-chan
{:pipeline-builder pipeline-builder
:bootstrap-transform bootstrap-transform
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:name-resolver name-resolver
:connect-timeout connect-timeout})
(attach-on-close-handler on-closed))

close-ch! (atom (fn []))
result (d/deferred)
Expand Down Expand Up @@ -935,12 +935,11 @@
:response-buffer-size response-buffer-size
:t0 t0}))))))))))))]
(d/connect conn result)
(d/catch' result (fn [e]
(log/trace e "Closing HTTP connection channel")
(d/error! ch-d e)
(@close-ch!)
(d/error-deferred e)))
result))
(util/propagate-error result
ch-d
(fn [e]
(log/trace e "Closing HTTP connection channel")
(@close-ch!)))))



Expand Down
30 changes: 15 additions & 15 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns aleph.netty
(:refer-clojure :exclude [flush])
(:require
[aleph.util :as util]
[clj-commons.byte-streams :as bs]
[clj-commons.primitive-math :as p]
[clojure.string :as str]
Expand Down Expand Up @@ -1569,21 +1570,20 @@
bootstrap-transform)

fut (connect-client bootstrap remote-address local-address)]
(doto (-> (wrap-future fut)
(d/chain'
(fn [_]
(let [ch (.channel ^ChannelFuture fut)]
(maybe-ssl-handshake-future ch)))))
(d/catch' (fn [e]
(when-not (.isDone fut)
(log/trace e "Cancelling Bootstrap#connect future")
(when-not (.cancel fut true)
(when-not (.isDone fut)
(log/warn "Transport" transport "does not support cancellation of connection attempts."
"Instead, you have to wait for the connect timeout to expire for it to be terminated."
"Its current value is" connect-timeout "ms."
"It can be set via the `connect-timeout` option."))))
(d/error-deferred e))))))
(-> (wrap-future fut)
(d/chain'
(fn [_]
(let [ch (.channel ^ChannelFuture fut)]
(maybe-ssl-handshake-future ch))))
(util/on-error (fn [e]
(when-not (.isDone fut)
(log/trace e "Cancelling Bootstrap#connect future")
(when-not (.cancel fut true)
(when-not (.isDone fut)
(log/warn "Transport" transport "does not support cancellation of connection attempts."
"Instead, you have to wait for the connect timeout to expire for it to be terminated."
"Its current value is" connect-timeout "ms."
"It can be set via the `connect-timeout` option.")))))))))


(defn ^:no-doc ^:deprecated create-client
Expand Down
11 changes: 6 additions & 5 deletions src/aleph/tcp.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns aleph.tcp
(:require
[aleph.netty :as netty]
[aleph.util :as util]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
[manifold.stream :as s]
Expand Down Expand Up @@ -217,8 +218,8 @@
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:connect-timeout connect-timeout})]
(d/catch' ch-d #(d/error! s %))
(d/catch' s (fn [e]
(d/error! ch-d e)
(d/error-deferred e)))
s))
(util/propagate-error ch-d s)
(util/propagate-error s
ch-d
(fn [e]
(log/trace e "Closed TCP client channel")))))
20 changes: 20 additions & 0 deletions src/aleph/util.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(ns aleph.util
(:require [manifold.deferred :as d]))

(defn on-error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be documented why you would use this over chain or on-realized.

[d f]
(d/on-realized d identity f))
Comment on lines +4 to +6
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK for now, but this should be in Manifold


(defn propagate-error
"Registers an error callback with source which will attempt to propagate the error to destination.

If the error was propagated (i.e. destination wasn't yet realized), on-propagate is invoked with
the error value.

Returns source."
([source destination]
(propagate-error source destination identity))
([source destination on-propagate]
(on-error source (fn [e]
(when (d/error! destination e)
(on-propagate e))))))
Comment on lines +8 to +20
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source is stream terminology, not deferred terminology. We should call it something else to avoid confusion. I made that exact mistake on the first read-through.

16 changes: 10 additions & 6 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1480,17 +1480,21 @@
(is (some-> @connect-future .isCancelled)))))))

(deftest test-in-flight-request-cancellation
(let [conn-established (promise)
conn-closed (promise)]
(let [conn-established (atom nil)
conn-closed (atom nil)]
(with-raw-handler (fn [req]
(deliver conn-established true)
(deliver @conn-established true)
(s/on-closed (:body req)
(fn []
(deliver conn-closed true))))
(deliver @conn-closed true))))
;; NOTE: The atom indirection here is needed because `with-raw-handler` will run the body
;; twice (for HTTP1 and HTTP2), so we need a new promise for each run.
(reset! conn-established (promise))
(reset! conn-closed (promise))
(let [rsp (http-get "/")]
(is (= true (deref conn-established 1000 :timeout)))
(is (= true (deref @conn-established 1000 :timeout)))
(http/cancel-request! rsp)
(is (= true (deref conn-closed 1000 :timeout)))
(is (= true (deref @conn-closed 1000 :timeout)))
Comment on lines -1463 to +1497
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(deref @ ... looks weird. I suggest switching to unwrap/unwrap'.

(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

(deftest ^:leak test-leak-in-raw-stream-handler
Expand Down
45 changes: 45 additions & 0 deletions test/aleph/util_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
(ns aleph.util-test
(:require [aleph.util :as util]
[clojure.test :refer [deftest is testing]]
[manifold.deferred :as d]))

(deftest test-propagate-error
(testing "Happy path"
(let [src (d/deferred)
dst (d/deferred)
prp (promise)]
(util/propagate-error src dst (fn [e] (deliver prp e)))
(d/error! src :boom)
(is (d/realized? dst))
Comment on lines +12 to +13
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a race condition? Might need to put them on same executor and thread.

(is (= :boom (d/error-value dst nil)))
(is (= :boom (deref prp 0 nil)))))

(testing "Without on-propagate"
(let [src (d/deferred)
dst (d/deferred)]
(util/propagate-error src dst)
(d/error! src :boom)
(is (d/realized? dst))))

(testing "Exception in on-propagate"
(let [src (d/deferred)
dst (d/deferred)]
(util/propagate-error src dst (fn [_] (throw (RuntimeException. "Oops"))))
(d/error! src :boom)
(is (d/realized? dst))
(is (= :boom (d/error-value dst nil)))))

(testing "Already realized destination"
(let [src (d/deferred)
dst (d/success-deferred :ok)]
(util/propagate-error src dst)
(d/error! src :boom)
(is (d/realized? dst))
(is (= nil (d/error-value dst nil)))))

(testing "Successfully realized source"
(let [src (d/deferred)
dst (d/deferred)]
(util/propagate-error src dst)
(d/success! src :ok)
(is (not (d/realized? dst))))))