diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 06b70587..26f95acc 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -11,6 +11,7 @@ [aleph.http.websocket.common :as ws.common] [aleph.http.websocket.server :as ws.server] [aleph.netty :as netty] + [aleph.util :as util] [clojure.string :as str] [clojure.tools.logging :as log] [manifold.deferred :as d] @@ -100,20 +101,22 @@ will be errors, and a new connection must be created." [^URI uri options middleware on-closed] (let [scheme (.getScheme uri) - ssl? (= "https" scheme)] - (-> (client/http-connection - (InetSocketAddress/createUnresolved - (.getHost uri) - (int - (or - (when (pos? (.getPort uri)) (.getPort uri)) - (if ssl? 443 80)))) - ssl? - (if on-closed - (assoc options :on-closed on-closed) - options)) - - (d/chain' middleware)))) + ssl? (= "https" scheme) + conn (client/http-connection + (InetSocketAddress/createUnresolved + (.getHost uri) + (int + (or + (when (pos? (.getPort uri)) (.getPort uri)) + (if ssl? 443 80)))) + ssl? + (if on-closed + (assoc options :on-closed on-closed) + options))] + (-> (d/chain' conn middleware) + (util/propagate-error conn + (fn [e] + (log/trace e "Terminated creation of HTTP connection")))))) (def ^:private connection-stats-callbacks (atom #{})) @@ -389,6 +392,12 @@ ;; function. (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) + ;; allow cancellation during connection establishment + (util/propagate-error result + (first conn) + (fn [e] + (log/trace e "Aborted connection acquisition"))) + (if (realized? result) ;; to account for race condition between setting `dispose-conn!` ;; and putting `result` into error state for cancellation @@ -456,11 +465,10 @@ (middleware/handle-redirects request req)))))))))))) req))] (d/connect response result) - (d/catch' result - (fn [e] - (log/trace e "Request failed. Disposing of connection...") - (@dispose-conn!) - (d/error-deferred e))) + (util/on-error result + (fn [e] + (log/trace e "Request failed. 
Disposing of connection...") + (@dispose-conn!))) result))) (defn cancel-request! diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index 9b4e252a..468f2fea 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -6,6 +6,7 @@ [aleph.http.multipart :as multipart] [aleph.http.websocket.client :as ws.client] [aleph.netty :as netty] + [aleph.util :as util] [clj-commons.byte-streams :as bs] [clojure.tools.logging :as log] [manifold.deferred :as d] @@ -814,119 +815,131 @@ :logger logger :pipeline-transform pipeline-transform)) - ch-d (netty/create-client-chan - {:pipeline-builder pipeline-builder - :bootstrap-transform bootstrap-transform - :remote-address remote-address - :local-address local-address - :transport (netty/determine-transport transport epoll?) - :name-resolver name-resolver - :connect-timeout connect-timeout})] - - (attach-on-close-handler ch-d on-closed) - - (d/chain' ch-d - (fn setup-client - [^Channel ch] - (log/debug "Channel:" ch) - - ;; We know the SSL handshake must be complete because create-client wraps the - ;; future with maybe-ssl-handshake-future, so we can get the negotiated - ;; protocol, falling back to HTTP/1.1 by default. - (let [pipeline (.pipeline ch) - protocol (cond - ssl? - (or (-> pipeline - ^SslHandler (.get ^Class SslHandler) - (.applicationProtocol)) - ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed - - force-h2c? - (do - (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") - ApplicationProtocolNames/HTTP_2) - - :else - ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested - setup-opts (assoc opts - :authority authority - :ch ch - :server? false - :keep-alive? keep-alive? - :keep-alive?' keep-alive?' - :logger logger - :non-tun-proxy? non-tun-proxy? - :pipeline pipeline - :pipeline-transform pipeline-transform - :raw-stream? raw-stream? 
- :remote-address remote-address - :response-buffer-size response-buffer-size - :ssl-context ssl-context - :ssl? ssl?)] - - (log/debug (str "Using HTTP protocol: " protocol) - {:authority authority - :ssl? ssl? - :force-h2c? force-h2c?}) - - ;; can't use ApnHandler, because we need to coordinate with Manifold code - (let [http-req-handler - (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) - (setup-http1-client setup-opts) - - (.equals ApplicationProtocolNames/HTTP_2 protocol) - (do - (http2/setup-conn-pipeline setup-opts) - (http2-req-handler setup-opts)) - - :else - (do - (let [msg (str "Unknown protocol: " protocol) - e (IllegalStateException. msg)] - (log/error e msg) - (netty/close ch) - (throw e))))] - - ;; Both Netty and Aleph are set up, unpause the pipeline - (when (.get pipeline "pause-handler") - (log/debug "Unpausing pipeline") - (.remove pipeline "pause-handler")) - - (fn http-req-fn - [req] - (log/trace "http-req-fn fired") - (log/debug "client request:" (pr-str req)) - - ;; If :aleph/close is set in the req, closes the channel and - ;; returns a deferred containing the result. - (if (or (contains? req :aleph/close) - (contains? req ::close)) - (-> ch (netty/close) (netty/wrap-future)) - - (let [t0 (System/nanoTime) - ;; I suspect the below is an error for http1 - ;; since the shared handler might not match. - ;; Should work for HTTP2, though - raw-stream? (get req :raw-stream? raw-stream?)] - - (if (or (not (.isActive ch)) - (not (.isOpen ch))) - - (d/error-deferred - (ex-info "Channel is inactive/closed." - {:req req - :ch ch - :open? (.isOpen ch) - :active? (.isActive ch)})) - - (-> (http-req-handler req) - (d/chain' (rsp-handler - {:ch ch - :keep-alive? keep-alive? ; why not keep-alive?' - :raw-stream? raw-stream? 
- :req req - :response-buffer-size response-buffer-size - :t0 t0}))))))))))))) + ch-d (doto (netty/create-client-chan + {:pipeline-builder pipeline-builder + :bootstrap-transform bootstrap-transform + :remote-address remote-address + :local-address local-address + :transport (netty/determine-transport transport epoll?) + :name-resolver name-resolver + :connect-timeout connect-timeout}) + (attach-on-close-handler on-closed)) + + close-ch! (atom (fn [])) + result (d/deferred) + + conn (d/chain' ch-d + (fn setup-client + [^Channel ch] + (log/debug "Channel:" ch) + (reset! close-ch! (fn [] @(-> (netty/close ch) (netty/wrap-future)))) + (if (realized? result) + ;; Account for race condition between setting `close-ch!` and putting + ;; `result` into error state for cancellation + (@close-ch!) + ;; We know the SSL handshake must be complete because create-client wraps the + ;; future with maybe-ssl-handshake-future, so we can get the negotiated + ;; protocol, falling back to HTTP/1.1 by default. + (let [pipeline (.pipeline ch) + protocol (cond + ssl? + (or (-> pipeline + ^SslHandler (.get ^Class SslHandler) + (.applicationProtocol)) + ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed + + force-h2c? + (do + (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") + ApplicationProtocolNames/HTTP_2) + + :else + ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested + setup-opts (assoc opts + :authority authority + :ch ch + :server? false + :keep-alive? keep-alive? + :keep-alive?' keep-alive?' + :logger logger + :non-tun-proxy? non-tun-proxy? + :pipeline pipeline + :pipeline-transform pipeline-transform + :raw-stream? raw-stream? + :remote-address remote-address + :response-buffer-size response-buffer-size + :ssl-context ssl-context + :ssl? ssl?)] + + (log/debug (str "Using HTTP protocol: " protocol) + {:authority authority + :ssl? ssl? + :force-h2c? 
force-h2c?}) + + ;; can't use ApnHandler, because we need to coordinate with Manifold code + (let [http-req-handler + (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) + (setup-http1-client setup-opts) + + (.equals ApplicationProtocolNames/HTTP_2 protocol) + (do + (http2/setup-conn-pipeline setup-opts) + (http2-req-handler setup-opts)) + + :else + (do + (let [msg (str "Unknown protocol: " protocol) + e (IllegalStateException. msg)] + (log/error e msg) + (netty/close ch) + (throw e))))] + + ;; Both Netty and Aleph are set up, unpause the pipeline + (when (.get pipeline "pause-handler") + (log/debug "Unpausing pipeline") + (.remove pipeline "pause-handler")) + + (fn http-req-fn + [req] + (log/trace "http-req-fn fired") + (log/debug "client request:" (pr-str req)) + + ;; If :aleph/close is set in the req, closes the channel and + ;; returns a deferred containing the result. + (if (or (contains? req :aleph/close) + (contains? req ::close)) + (-> ch (netty/close) (netty/wrap-future)) + + (let [t0 (System/nanoTime) + ;; I suspect the below is an error for http1 + ;; since the shared handler might not match. + ;; Should work for HTTP2, though + raw-stream? (get req :raw-stream? raw-stream?)] + + (if (or (not (.isActive ch)) + (not (.isOpen ch))) + + (d/error-deferred + (ex-info "Channel is inactive/closed." + {:req req + :ch ch + :open? (.isOpen ch) + :active? (.isActive ch)})) + + (-> (http-req-handler req) + (d/chain' (rsp-handler + {:ch ch + :keep-alive? keep-alive? ; why not keep-alive?' + :raw-stream? raw-stream? 
+ :req req + :response-buffer-size response-buffer-size + :t0 t0}))))))))))))] + (d/connect conn result) + (util/propagate-error result + ch-d + (fn [e] + (log/trace e "Closing HTTP connection channel") + (@close-ch!))))) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 73fc1b6e..be372792 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -1,6 +1,7 @@ (ns aleph.netty (:refer-clojure :exclude [flush]) (:require + [aleph.util :as util] [clj-commons.byte-streams :as bs] [clj-commons.primitive-math :as p] [clojure.string :as str] @@ -1521,6 +1522,14 @@ (ssl-handler ch ssl-ctx)))) (pipeline-builder p)))) +(defn- connect-client + ^ChannelFuture [^Bootstrap bootstrap + ^SocketAddress remote-address + ^SocketAddress local-address] + (if local-address + (.connect bootstrap remote-address local-address) + (.connect bootstrap remote-address))) + (defn ^:no-doc create-client-chan "Returns a deferred containing a new Channel. @@ -1529,8 +1538,8 @@ complete." [{:keys [pipeline-builder bootstrap-transform - ^SocketAddress remote-address - ^SocketAddress local-address + remote-address + local-address transport name-resolver connect-timeout] @@ -1543,32 +1552,38 @@ (throw (IllegalArgumentException. "Can't use :ssl-context anymore."))) (let [^Class chan-class (transport-channel-class transport) - initializer (pipeline-initializer pipeline-builder)] - (try - (let [client-event-loop-group @(transport-client-group transport) - resolver' (when (some? name-resolver) - (cond - (= :default name-resolver) nil - (= :noop name-resolver) NoopAddressResolverGroup/INSTANCE - (instance? AddressResolverGroup name-resolver) name-resolver)) - bootstrap (doto (Bootstrap.) 
- (.option ChannelOption/SO_REUSEADDR true) - (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) - #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 - (.group client-event-loop-group) - (.channel chan-class) - (.handler initializer) - (.resolver resolver') - bootstrap-transform) - - fut (if local-address - (.connect bootstrap remote-address local-address) - (.connect bootstrap remote-address))] - - (d/chain' (wrap-future fut) - (fn [_] - (let [ch (.channel ^ChannelFuture fut)] - (maybe-ssl-handshake-future ch)))))))) + initializer (pipeline-initializer pipeline-builder) + client-event-loop-group @(transport-client-group transport) + resolver' (when (some? name-resolver) + (cond + (= :default name-resolver) nil + (= :noop name-resolver) NoopAddressResolverGroup/INSTANCE + (instance? AddressResolverGroup name-resolver) name-resolver)) + bootstrap (doto (Bootstrap.) + (.option ChannelOption/SO_REUSEADDR true) + (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) + #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 + (.group client-event-loop-group) + (.channel chan-class) + (.handler initializer) + (.resolver resolver') + bootstrap-transform) + + fut (connect-client bootstrap remote-address local-address)] + (-> (wrap-future fut) + (d/chain' + (fn [_] + (let [ch (.channel ^ChannelFuture fut)] + (maybe-ssl-handshake-future ch)))) + (util/on-error (fn [e] + (when-not (.isDone fut) + (log/trace e "Cancelling Bootstrap#connect future") + (when-not (.cancel fut true) + (when-not (.isDone fut) + (log/warn "Transport" transport "does not support cancellation of connection attempts." + "Instead, you have to wait for the connect timeout to expire for it to be terminated." + "Its current value is" connect-timeout "ms." 
+ "It can be set via the `connect-timeout` option."))))))))) (defn ^:no-doc ^:deprecated create-client @@ -1732,7 +1747,7 @@ (fn [shutdown-output] (when (= shutdown-output ::timeout) (log/error - (format "Timeout while waiting for requests to close (exceeded: %ss)" + (format "Timeout while waiting for connections to close (exceeded: %ss)" shutdown-timeout))))) (d/finally' ;; 3. At this stage, stop the EventLoopGroup, this will cancel any diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index 1d66cb4a..d1778cee 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -1,6 +1,7 @@ (ns aleph.tcp (:require [aleph.netty :as netty] + [aleph.util :as util] [clojure.tools.logging :as log] [manifold.deferred :as d] [manifold.stream :as s] @@ -155,6 +156,11 @@ "Given a host and port, returns a deferred which yields a duplex stream that can be used to communicate with the server. + Closing the stream will also close the underlying connection. + + Putting the returned deferred into an error state before it yielded the stream will cancel an + in-flight connection attempt. + Param key | Description | --- | --- | `host` | the hostname of the server. @@ -204,13 +210,16 @@ (netty/ssl-handler (.channel pipeline) ssl-context remote-address ssl-endpoint-id-alg))) (.addLast pipeline "handler" handler) (when pipeline-transform - (pipeline-transform pipeline)))] - (-> (netty/create-client-chan - {:pipeline-builder pipeline-builder - :bootstrap-transform bootstrap-transform - :remote-address remote-address - :local-address local-address - :transport (netty/determine-transport transport epoll?) - :connect-timeout connect-timeout}) - (d/catch' #(d/error! s %))) - s)) + (pipeline-transform pipeline))) + ch-d (netty/create-client-chan + {:pipeline-builder pipeline-builder + :bootstrap-transform bootstrap-transform + :remote-address remote-address + :local-address local-address + :transport (netty/determine-transport transport epoll?) 
+ :connect-timeout connect-timeout})] + (util/propagate-error ch-d s) + (util/propagate-error s + ch-d + (fn [e] + (log/trace e "Closed TCP client channel"))))) diff --git a/src/aleph/util.clj b/src/aleph/util.clj new file mode 100644 index 00000000..e8b64d27 --- /dev/null +++ b/src/aleph/util.clj @@ -0,0 +1,20 @@ +(ns aleph.util + (:require [manifold.deferred :as d])) + +(defn on-error + [d f] + (d/on-realized d identity f)) + +(defn propagate-error + "Registers an error callback with source which will attempt to propagate the error to destination. + + If the error was propagated (i.e. destination wasn't yet realized), on-propagate is invoked with + the error value. + + Returns source." + ([source destination] + (propagate-error source destination identity)) + ([source destination on-propagate] + (on-error source (fn [e] + (when (d/error! destination e) + (on-propagate e)))))) diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index b6ed9f7e..221dd9da 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -7,7 +7,7 @@ [aleph.resource-leak-detector] [aleph.ssl :as test-ssl] [aleph.tcp :as tcp] - [aleph.testutils :refer [str=]] + [aleph.testutils :refer [passive-tcp-server str=]] [clj-commons.byte-streams :as bs] [clojure.java.io :as io] [clojure.string :as str] @@ -1451,18 +1451,50 @@ (is (instance? IllegalArgumentException result)) (is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result)))))) +(deftest test-request-cancellation-during-connection-acquisition + (let [starved-pool (http/connection-pool + {:total-connections 0})] + (try + (let [rsp (http-get "/" {:pool starved-pool + :pool-timeout 500})] + (http/cancel-request! rsp) + (is (thrown? 
RequestCancellationException (deref rsp 0 :timeout)))) + (finally + (.shutdown ^Pool starved-pool))))) + +(deftest test-request-cancellation-during-connection-establishment + (let [connect-client @#'aleph.netty/connect-client + connect-future (promise)] + (with-redefs [aleph.netty/connect-client (fn [& args] + (let [fut (apply connect-client args)] + (deliver connect-future fut) + fut))] + (with-server (passive-tcp-server port) + (let [rsp (http-get "/")] + (is (some? (deref connect-future 1000 nil))) + (http/cancel-request! rsp) + (is (thrown? RequestCancellationException (deref rsp 1000 :timeout))) + (some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS)) + (is (some-> @connect-future .isSuccess false?)) + (is (some-> @connect-future .isDone)) + (is (some-> @connect-future .isCancelled))))))) + (deftest test-in-flight-request-cancellation - (let [conn-established (promise) - conn-closed (promise)] + (let [conn-established (atom nil) + conn-closed (atom nil)] (with-raw-handler (fn [req] - (deliver conn-established true) + (deliver @conn-established true) (s/on-closed (:body req) (fn [] - (deliver conn-closed true)))) + (deliver @conn-closed true)))) + ;; NOTE: The atom indirection here is needed because `with-raw-handler` will run the body + ;; twice (for HTTP1 and HTTP2), so we need a new promise for each run. + (reset! conn-established (promise)) + (reset! conn-closed (promise)) (let [rsp (http-get "/")] - (is (= true (deref conn-established 1000 :timeout))) + (is (= true (deref @conn-established 1000 :timeout))) (http/cancel-request! rsp) - (is (= true (deref conn-closed 1000 :timeout))) + (is (= true (deref @conn-closed 1000 :timeout))) (is (thrown? 
RequestCancellationException (deref rsp 1000 :timeout))))))) (deftest ^:leak test-leak-in-raw-stream-handler diff --git a/test/aleph/tcp_test.clj b/test/aleph/tcp_test.clj index 11e26881..875c3470 100644 --- a/test/aleph/tcp_test.clj +++ b/test/aleph/tcp_test.clj @@ -3,9 +3,13 @@ [aleph.netty :as netty] [aleph.resource-leak-detector] [aleph.tcp :as tcp] + [aleph.testutils :refer [passive-tcp-server]] [clj-commons.byte-streams :as bs] - [clojure.test :refer [deftest testing is]] - [manifold.stream :as s])) + [clojure.test :refer [deftest is testing]] + [manifold.deferred :as d] + [manifold.stream :as s]) + (:import + (java.util.concurrent TimeUnit))) (defn echo-handler [s _] (s/connect s s)) @@ -55,4 +59,22 @@ (catch Exception _ (is (not (netty/io-uring-available?))))))) +(deftest test-cancellation-during-connection-establishment + (let [connect-client @#'aleph.netty/connect-client + connect-future (promise) + server (passive-tcp-server 0)] + (with-redefs [aleph.netty/connect-client (fn [& args] + (let [fut (apply connect-client args)] + (deliver connect-future fut) + fut))] + (with-server server + (let [c (tcp/client {:host "localhost" + :port (netty/port server)})] + (is (some? (deref connect-future 1000 nil))) + (d/timeout! c 10) + (some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS)) + (is (some-> @connect-future .isSuccess false?)) + (is (some-> @connect-future .isDone)) + (is (some-> @connect-future .isCancelled))))))) + (aleph.resource-leak-detector/instrument-tests!) 
diff --git a/test/aleph/testutils.clj b/test/aleph/testutils.clj index f225c2c0..c0059c8c 100644 --- a/test/aleph/testutils.clj +++ b/test/aleph/testutils.clj @@ -1,8 +1,36 @@ (ns aleph.testutils - (:import (io.netty.util AsciiString))) + (:require + [aleph.netty :as netty]) + (:import + (io.netty.util AsciiString) + (java.io Closeable) + (java.net ServerSocket Socket))) (defn str= "AsciiString-aware equals" [^CharSequence x ^CharSequence y] (AsciiString/contentEquals x y)) +(defn passive-tcp-server + "Starts a TCP server which never accepts a connection." + [port] + (let [;; A backlog of 0 would be ideal for this purpose but: "The value provided should be greater + ;; than 0. If it is less than or equal to 0, then an implementation specific default will be + ;; used." Source: + ;; https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/net/ServerSocket.html#%3Cinit%3E(int,int) + backlog 1 + server (ServerSocket. port backlog) + port (.getLocalPort server) + ;; Fill up the backlog with pending connection attempts. For some reason, the backlog length + ;; is off by one, thus the `inc`. + pending-connects (doall (repeatedly (inc backlog) #(Socket. "localhost" (int port))))] + (reify + netty/AlephServer + (port [_] + port) + (wait-for-close [_] + true) + Closeable + (close [_] + (run! #(.close %) pending-connects) + (.close server))))) diff --git a/test/aleph/util_test.clj b/test/aleph/util_test.clj new file mode 100644 index 00000000..17f2634e --- /dev/null +++ b/test/aleph/util_test.clj @@ -0,0 +1,45 @@ +(ns aleph.util-test + (:require [aleph.util :as util] + [clojure.test :refer [deftest is testing]] + [manifold.deferred :as d])) + +(deftest test-propagate-error + (testing "Happy path" + (let [src (d/deferred) + dst (d/deferred) + prp (promise)] + (util/propagate-error src dst (fn [e] (deliver prp e))) + (d/error! src :boom) + (is (d/realized?
dst)) + (is (= :boom (d/error-value dst nil))) + (is (= :boom (deref prp 0 nil))))) + + (testing "Without on-propagate" + (let [src (d/deferred) + dst (d/deferred)] + (util/propagate-error src dst) + (d/error! src :boom) + (is (d/realized? dst)))) + + (testing "Exception in on-propagate" + (let [src (d/deferred) + dst (d/deferred)] + (util/propagate-error src dst (fn [_] (throw (RuntimeException. "Oops")))) + (d/error! src :boom) + (is (d/realized? dst)) + (is (= :boom (d/error-value dst nil))))) + + (testing "Already realized destination" + (let [src (d/deferred) + dst (d/success-deferred :ok)] + (util/propagate-error src dst) + (d/error! src :boom) + (is (d/realized? dst)) + (is (= nil (d/error-value dst nil))))) + + (testing "Successfully realized source" + (let [src (d/deferred) + dst (d/deferred)] + (util/propagate-error src dst) + (d/success! src :ok) + (is (not (d/realized? dst))))))