Skip to content

Commit

Permalink
Use a queue for stdin, to avoid PipedReader/Writer
Browse files Browse the repository at this point in the history
Fixes NREPL-39
  • Loading branch information
trptcolin committed Mar 22, 2013
1 parent e7cd0d4 commit 3cd3d0e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 42 deletions.
87 changes: 57 additions & 30 deletions src/main/clojure/clojure/tools/nrepl/middleware/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
[clojure.tools.nrepl.transport :as t])
(:import clojure.tools.nrepl.transport.Transport
(java.io PipedReader PipedWriter Reader Writer PrintWriter StringReader)
clojure.lang.LineNumberingPushbackReader))
clojure.lang.LineNumberingPushbackReader
java.util.concurrent.LinkedBlockingQueue))

(def ^{:private true} sessions (atom {}))

Expand All @@ -20,6 +21,7 @@
;; how best to make it configurable though...

(def ^{:dynamic true :private true} *out-limit* 1024)
(def ^{:dynamic true :private true} *skipping-eol* false)

(defn- session-out
"Returns a PrintWriter suitable for binding as *out* or *err*. All of
Expand Down Expand Up @@ -56,28 +58,52 @@
{:status :need-input} message on the provided transport so the client/user
can provide content to be read."
[session-id transport]
(let [request-input (fn [^PipedReader r]
(when-not (.ready r)
(t/send transport
(response-for *msg* :session session-id
:status :need-input))))
writer (PipedWriter.)
(let [input-queue (LinkedBlockingQueue.)
request-input (fn []
(cond (> (.size input-queue) 0)
(.take input-queue)
*skipping-eol*
nil
:else
(do
(t/send transport
(response-for *msg* :session session-id
:status :need-input))
(.take input-queue))))
do-read (fn [buf off len]
(locking input-queue
(loop [i off]
(cond
(>= i (+ off len))
(+ off len)
(.peek input-queue)
(do (aset-char buf i (char (.take input-queue)))
(recur (inc i)))
:else
i))))
reader (LineNumberingPushbackReader.
(proxy [PipedReader] [writer]
(close [])
(proxy [Reader] []
(close [] (.clear input-queue))
(read
([] (request-input this)
(let [^Reader this this] (proxy-super read)))
([x] (request-input this)
(let [^Reader this this]
(if (instance? java.nio.CharBuffer x)
(proxy-super read ^java.nio.CharBuffer x)
(proxy-super read ^chars x))))
([buf off len]
(let [^Reader this this]
(request-input this)
(proxy-super read buf off len))))))]
[reader writer]))
([]
(let [^Reader this this] (proxy-super read)))
([x]
(let [^Reader this this]
(if (instance? java.nio.CharBuffer x)
(proxy-super read ^java.nio.CharBuffer x)
(proxy-super read ^chars x))))
([^chars buf off len]
(if (zero? len)
-1
(let [first-character (request-input)]
(if (or (nil? first-character) (= first-character -1))
-1
(do
(aset-char buf off (char first-character))
(- (do-read buf (inc off) (dec len))
off)))))))))]
{:input-queue input-queue
:stdin-reader reader}))

(defn- create-session
"Returns a new atom containing a map of bindings as per
Expand All @@ -90,10 +116,10 @@
(clojure.main/with-bindings
(let [id (uuid)
out (session-out :out id transport)
[in in-writer] (session-in id transport)]
{:keys [input-queue stdin-reader]} (session-in id transport)]
(binding [*out* out
*err* (session-out :err id transport)
*in* in
*in* stdin-reader
*ns* (create-ns 'user)
*out-limit* (or (baseline-bindings #'*out-limit*) 1024)
; clojure.test captures *out* at load-time, so we need to make sure
Expand All @@ -105,8 +131,8 @@
; don't capture that *agent* binding for userland REPL sessions
(atom (merge baseline-bindings (dissoc (get-thread-bindings) #'*agent*))
:meta {:id id
:stdin-reader in
:stdin-writer in-writer}))))))
:stdin-reader stdin-reader
:input-queue input-queue}))))))

(defn- register-session
"Registers a new session containing the baseline bindings contained in the
Expand Down Expand Up @@ -194,13 +220,14 @@
(fn [{:keys [op stdin session transport] :as msg}]
(cond
(= op "eval")
(let [s (-> session meta ^LineNumberingPushbackReader (:stdin-reader))]
(when (.ready s)
(clojure.main/skip-if-eol s))
(let [in (-> (meta session) ^LineNumberingPushbackReader (:stdin-reader))]
(binding [*skipping-eol* true]
(clojure.main/skip-if-eol in))
(h msg))
(= op "stdin")
(do
(-> session meta ^Writer (:stdin-writer) (.write ^String stdin))
(let [q (-> (meta session) ^Writer (:input-queue))]
(locking q
(doseq [c stdin] (.put q c)))
(t/send transport (response-for msg :status :done)))
:else
(h msg))))
Expand Down
33 changes: 21 additions & 12 deletions src/test/clojure/clojure/tools/nrepl_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
(is (= history (repl-values sc-session "[*3 *2 *1]")))
(is (= history (repl-values sc-session "*1"))))))


(testing "without a session id, REPL-bound vars like *1 have default values"
(is (= [nil] (repl-values client "*1")))))

Expand Down Expand Up @@ -301,7 +301,7 @@
(let [server (server/start-server)
transport (connect :port (:port server))]
(transport/send transport {"op" "eval" "code" "(+ 1 1)"})

(let [reader (future (while true (transport/recv transport)))]
(Thread/sleep 1000)
(.close server)
Expand All @@ -312,7 +312,7 @@
(is false "A reader started prior to the server closing should throw an error...")
(catch Throwable e
(is (disconnection-exception? e)))))

(is (thrown? SocketException (transport/recv transport)))
;; TODO no idea yet why two sends are *sometimes* required to get a failure
(try
Expand All @@ -334,7 +334,7 @@
(is false "reads after the server is closed should fail")
(catch Throwable t
(is (disconnection-exception? t)))))

;; TODO as noted in transports-fail-on-disconnects, *sometimes* two sends are needed
;; to trigger an exception on send to an unavailable server
(try (repl-eval session "(+ 1 1)") (catch Throwable t))
Expand All @@ -353,20 +353,29 @@

(def-repl-test request-multiple-read-newline-*in*
(is (= '(:ohai) (response-values (for [resp (repl-eval session "(read)")]
(do
(when (-> resp :status set (contains? "need-input"))
(session {:op :stdin :stdin ":ohai\n"}))
resp)))))
(do
(when (-> resp :status set (contains? "need-input"))
(session {:op :stdin :stdin ":ohai\n"}))
resp)))))

(session {:op :stdin :stdin "a\n"})
(is (= ["a"] (repl-values session "(read-line)"))))

(def-repl-test request-multiple-read-with-buffered-newline-*in*
(is (= '(:ohai) (response-values (for [resp (repl-eval session "(read)")]
(do
(when (-> resp :status set (contains? "need-input"))
(session {:op :stdin :stdin ":ohai\na\n"}))
resp)))))

(is (= ["a"] (repl-values session "(read-line)"))))

(def-repl-test request-multiple-read-objects-*in*
(is (= '(:ohai) (response-values (for [resp (repl-eval session "(read)")]
(do
(when (-> resp :status set (contains? "need-input"))
(session {:op :stdin :stdin ":ohai :kthxbai\n"}))
resp)))))
(do
(when (-> resp :status set (contains? "need-input"))
(session {:op :stdin :stdin ":ohai :kthxbai\n"}))
resp)))))

(is (= [" :kthxbai"] (repl-values session "(read-line)"))))

Expand Down

0 comments on commit 3cd3d0e

Please sign in to comment.