Skip to content

Commit

Permalink
Merge branch 'read-timeouts'
Browse files Browse the repository at this point in the history
  • Loading branch information
conormcd committed Feb 13, 2016
2 parents b9d2a03 + 9a35912 commit 3ae9101
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 17 deletions.
45 changes: 33 additions & 12 deletions src/clj_libssh2/channel.clj
Expand Up @@ -398,20 +398,40 @@
[session channel stream]
(if (not= :eof (:status stream))
(let [pump-fn (if (= :output (:direction stream)) pull push)
last-read-time (:last-read-time stream)
new-status (pump-fn session channel (:id stream) (:stream stream))
now (System/currentTimeMillis)]
(when (and (= pump-fn pull)
(= :eagain new-status)
(< (-> session :options :read-timeout) (- now last-read-time)))
(error/raise "Read timeout on a channel."
{:direction (-> stream :direction name)
:id (-> stream :id)
:timeout (-> session :options :read-timeout)
:session session}))
(assoc stream :status new-status :last-read-time now))
new-status (pump-fn session channel (:id stream) (:stream stream))]
(assoc stream :status new-status
:last-read-time (if (= :ready new-status)
(System/currentTimeMillis)
(:last-read-time stream))))
stream))

(defn- enforce-read-timeout
"Enforce the read timeout on the output streams in a set of streams.
Arguments:
session The clj-libssh2.session.Session object for the current session.
channel The SSH channel that we're enforcing timeouts on.
streams The collection of streams that are in use in pump
Return:
nil, or throw an exception if the timeout is exceeded on any of the streams
given."
[session channel streams]
(let [read-timeout (-> session :options :read-timeout)
last-read-time (->> streams
(remove #(= :input (:direction %)))
(map :last-read-time)
(#(when-not (empty? %)
(apply max %))))]
(when (and (some? last-read-time)
(< read-timeout (- (System/currentTimeMillis) last-read-time)))
(error/raise "Read timeout on a channel."
{:timeout read-timeout
:session session
:channel channel}))))

(defn pump
"Process a collection of input and output streams all at once. This will run
until all streams have reported EOF.
Expand Down Expand Up @@ -452,6 +472,7 @@
(do
(when (contains? status-set :eagain)
(wait session))
(enforce-read-timeout session channel s)
(recur (map (partial pump-stream session channel) streams)))
(->> s
(filter #(= :output (:direction %)))
Expand Down
3 changes: 2 additions & 1 deletion src/clj_libssh2/session.clj
Expand Up @@ -18,7 +18,8 @@
(def default-opts
"The default options for a session. These are not only the defaults, but an
exhaustive list of the legal options."
{:character-set "UTF-8"
{:blocking-timeout 60000
:character-set "UTF-8"
:fail-if-not-in-known-hosts false
:fail-unless-known-hosts-matches true
:known-hosts-file nil
Expand Down
10 changes: 9 additions & 1 deletion src/clj_libssh2/socket.clj
Expand Up @@ -95,14 +95,21 @@
(when (>= 0 select-result)
(handle-errors session libssh2/ERROR_TIMEOUT))))))

(defn enforce-blocking-timeout
[session start-time]
(when (< (-> session :options :blocking-timeout)
(- (System/currentTimeMillis) start-time))
(handle-errors session libssh2/ERROR_TIMEOUT)))

(defmacro block
"Turn a non-blocking call that returns EAGAIN into a blocking one."
[session & body]
`(let [session# ~session
start-time# (System/currentTimeMillis)]
(while (= libssh2/ERROR_EAGAIN (do ~@body))
(handle-errors session#
(wait session# start-time#)))))
(wait session# start-time#))
(enforce-blocking-timeout session# start-time#))))

(defmacro block-return
"Similar to block, but for functions that return a pointer"
Expand All @@ -115,5 +122,6 @@
(handle-errors session# errno#)
(when (= libssh2/ERROR_EAGAIN errno#)
(wait session# start-time#))
(enforce-blocking-timeout session# start-time#)
(recur (do ~@body)))
result#))))
34 changes: 32 additions & 2 deletions test/clj_libssh2/test_ssh.clj
Expand Up @@ -3,7 +3,8 @@
[clojure.string :as str]
[clojure.test :refer :all]
[clj-libssh2.ssh :as ssh]
[clj-libssh2.test-utils :as test]))
[clj-libssh2.test-utils :as test])
(:import [java.io OutputStream]))

(test/fixtures)

Expand Down Expand Up @@ -86,7 +87,36 @@
(deftest exec-times-out-when-commands-take-too-long
(testing "Commands that take too long result in a timeout"
(is (thrown? Exception (ssh/exec {:port 2222 :read-timeout 500}
"echo foo; sleep 1; echo bar")))))
"echo foo; sleep 1; echo bar"))))
(testing "Commands that are blocking on input time out correctly"
(test/with-temp-file tempfile
(let [output (atom [])
streaming-reader (proxy [OutputStream] []
(write [b off len]
(swap! output conj (String. b off len))))
run-exec (future
(try
(ssh/exec {:port 2222
:read-timeout 5000}
(str "tail -F " tempfile)
:out streaming-reader)
(catch Throwable t t)))]
; Output starts off empty
(is (empty? @output))

; We put some content into the file we're tailing.
(spit tempfile "Here is some output!\n" :append true)

; We wait for it to turn up on the far side.
(let [start-time (System/currentTimeMillis)]
(while (and (empty? @output)
(> 5000 (- (System/currentTimeMillis) start-time)))
(Thread/sleep 10)))

; Now there should be output and (once the exec finishes) an exception.
(is (= ["Here is some output!\n"] @output))
(is (instance? Throwable @run-exec))
(is (= "Timed out." (:error (ex-data @run-exec))))))))

(deftest scp-from-can-copy-files
(testing "scp-from can copy files from the remote host"
Expand Down
11 changes: 10 additions & 1 deletion test/clj_libssh2/test_utils.clj
Expand Up @@ -3,7 +3,8 @@
[clojure.string :as str]
[clojure.test :as test]
[net.n01se.clojure-jna :as jna]
[clj-libssh2.logging :as logging]))
[clj-libssh2.logging :as logging])
(:import [java.io File]))

(def ssh-host "127.0.0.1")
(def ssh-port 2222)
Expand Down Expand Up @@ -61,3 +62,11 @@
(test/use-fixtures :once (test/join-fixtures
[with-sandbox-sshd
with-really-verbose-logging])))

(defmacro with-temp-file
[file & body]
`(let [file# (File/createTempFile "clj-libssh2" nil)
~file (.getPath file#)]
(try
(do ~@body)
(finally (.delete file#)))))

0 comments on commit 3ae9101

Please sign in to comment.