Permalink
Browse files

Ensure that socket transports and clients die ASAP when connection to…

… server is lost, fixes NREPL-30
  • Loading branch information...
1 parent f546a2a commit 8a5dad2045434fcc06f2878de55f7dcdefa01a1b @cemerick cemerick committed Oct 3, 2012
@@ -24,6 +24,7 @@
(deftype FnTransport [recv-fn send-fn close]
Transport
+ ;; TODO this keywordization/stringification has no business being in FnTransport
(send [this msg] (-> msg clojure.walk/stringify-keys send-fn) this)
(recv [this] (.recv this Long/MAX_VALUE))
(recv [this timeout] (clojure.walk/keywordize-keys (recv-fn timeout)))
@@ -36,10 +37,19 @@
([read write] (fn-transport read write nil))
([read write close]
(let [read-queue (SynchronousQueue.)]
- (future (while true
- (.put read-queue (read))))
+ (future (try
+ (while true
+ (.put read-queue (read)))
+ (catch Throwable t
+ (.put read-queue t))))
(FnTransport.
- #(.poll read-queue % TimeUnit/MILLISECONDS)
+ (let [failure (atom nil)]
+ #(if @failure
+ (throw @failure)
+ (let [msg (.poll read-queue % TimeUnit/MILLISECONDS)]
+ (if (instance? Throwable msg)
+ (do (reset! failure msg) (throw msg))
+ msg))))
write
close))))
@@ -8,7 +8,7 @@
(ns #^{:author "Chas Emerick"}
clojure.tools.nrepl.cmdline-test
- (:use [clojure.tools.nrepl-test :only (def-repl-test repl-server-fixture *server-port*)]
+ (:use [clojure.tools.nrepl-test :only (def-repl-test repl-server-fixture *server*)]
clojure.test)
(:require
[clojure.tools.nrepl :as repl]))
@@ -20,7 +20,7 @@
(repl/reset-ack-port!)
(let [server-process (.exec (Runtime/getRuntime)
(into-array ["java" "-Dnreplacktest=y" "-cp" (System/getProperty "java.class.path")
- "clojure.tools.nrepl.main" "--ack" (str *server-port*)]))
+ "clojure.tools.nrepl.main" "--ack" (str (:port *server*))]))
acked-port (repl/wait-for-ack 20000)]
(try
(is acked-port "Timed out waiting for ack")
@@ -38,7 +38,7 @@
(.getLocalPort ss))
server-process (.exec (Runtime/getRuntime)
(into-array ["java" "-Dnreplacktest=y" "-cp" (System/getProperty "java.class.path")
- "clojure.tools.nrepl.main" "--port" (str free-port) "--ack" (str *server-port*)]))
+ "clojure.tools.nrepl.main" "--port" (str free-port) "--ack" (str (:port *server*))]))
acked-port (repl/wait-for-ack 20000)]
(try
(is acked-port "Timed out waiting for ack")
@@ -1,24 +1,25 @@
(ns clojure.tools.nrepl-test
+ (:import java.io.IOException)
(:use clojure.test
clojure.tools.nrepl)
(:require (clojure.tools.nrepl [transport :as transport]
[server :as server]
[ack :as ack])))
-(def ^{:dynamic true} *server-port* nil)
+(def ^{:dynamic true} *server* nil)
(defn repl-server-fixture
[f]
(with-open [server (server/start-server)]
- (binding [*server-port* (:port server)]
+ (binding [*server* server]
(f))))
-(use-fixtures :once repl-server-fixture)
+(use-fixtures :each repl-server-fixture)
(defmacro def-repl-test
[name & body]
`(deftest ~(with-meta name {:private true})
- (with-open [transport# (connect :port *server-port*)]
+ (with-open [transport# (connect :port (:port *server*))]
(let [~'transport transport#
~'client (client transport# Long/MAX_VALUE)
~'session (client-session ~'client)
@@ -220,6 +221,68 @@
(.close server)
(is (thrown? java.net.ConnectException (connect :port (:port server))))))
+; wasn't added until Clojure 1.3.0
+(defn- root-cause
+ "Returns the initial cause of an exception or error by peeling off all of
+ its wrappers"
+ [^Throwable t]
+ (loop [cause t]
+ (if-let [cause (.getCause cause)]
+ (recur cause)
+ cause)))
+
+(defn- disconnection-exception?
+ [e]
+ ; thrown? should check for the root cause!
+ (and (instance? IOException (root-cause e))
+ (re-matches #".*Unexpected end of input.*" (.getMessage (root-cause e)))))
+
+(deftest transports-fail-on-disconnects
+ (testing "Ensure that transports fail ASAP when the server they're connected to goes down."
+ (let [server (server/start-server)
+ transport (connect :port (:port server))]
+ (transport/send transport {"op" "eval" "code" "(+ 1 1)"})
+
+ (let [reader (future (while true (transport/recv transport)))]
+ (Thread/sleep 1000)
+ (.close server)
+ (Thread/sleep 1000)
+ ; no deref with timeout in Clojure 1.2.0 :-(
+ (try
+ (.get reader 10000 java.util.concurrent.TimeUnit/MILLISECONDS)
+ (is false "A reader started prior to the server closing should throw an error...")
+ (catch Throwable e
+ (is (disconnection-exception? e)))))
+
+ (is (thrown? IOException
+ (transport/recv transport)))
+ ;; TODO no idea yet why two sends are *sometimes* required to get a failure
+ (try
+ (transport/send transport {"op" "eval" "code" "(+ 5 1)"})
+ (catch Throwable t))
+ (is (thrown? IOException
+ (transport/send transport {"op" "eval" "code" "(+ 5 1)"}))))))
+
+(def-repl-test clients-fail-on-disconnects
+ (testing "Ensure that clients fail ASAP when the server they're connected to goes down."
+ (let [resp (repl-eval client "1 2 3 4 5 6 7 8 9 10")]
+ (is (= "1" (-> resp first :value)))
+ (Thread/sleep 1000)
+ (.close *server*)
+ (Thread/sleep 1000)
+ (try
+ ; these responses were on the wire before the remote transport was closed
+ (is (> 20 (count resp)))
+ (transport/recv transport)
+ (is false "reads after the server is closed should fail")
+ (catch Throwable t
+ (is (disconnection-exception? t)))))
+
+ ;; TODO as noted in transports-fail-on-disconnects, *sometimes* two sends are needed
+ ;; to trigger an exception on send to an unavailable server
+ (try (repl-eval session "(+ 1 1)") (catch Throwable t))
+ (is (thrown? IOException (repl-eval session "(+ 1 1)")))))
+
(def-repl-test request-*in*
(is (= '((1 2 3)) (response-values (for [resp (repl-eval session "(read)")]
(do
@@ -251,7 +314,7 @@
(is (= [" :kthxbai"] (repl-values session "(read-line)"))))
(def-repl-test test-url-connect
- (with-open [conn (url-connect (str "nrepl://localhost:" *server-port*))]
+ (with-open [conn (url-connect (str "nrepl://localhost:" (:port *server*)))]
(transport/send conn {:op :eval :code "(+ 1 1)"})
(is (= [2] (response-values (response-seq conn 100))))))

0 comments on commit 8a5dad2

Please sign in to comment.