From 9a35912bbbe04e43721855a57a5159f96fc646f9 Mon Sep 17 00:00:00 2001 From: Conor McDermottroe Date: Sun, 31 Jan 2016 16:13:41 +0000 Subject: [PATCH] Enforce read timeouts better. Ensure that we still time out even if the other end is still working fine, just not producing any output. --- src/clj_libssh2/channel.clj | 45 ++++++++++++++++++++++++--------- src/clj_libssh2/session.clj | 3 ++- src/clj_libssh2/socket.clj | 10 +++++++- test/clj_libssh2/test_ssh.clj | 34 +++++++++++++++++++++++-- test/clj_libssh2/test_utils.clj | 11 +++++++- 5 files changed, 86 insertions(+), 17 deletions(-) diff --git a/src/clj_libssh2/channel.clj b/src/clj_libssh2/channel.clj index 0db5a6c..25688e2 100644 --- a/src/clj_libssh2/channel.clj +++ b/src/clj_libssh2/channel.clj @@ -398,20 +398,40 @@ [session channel stream] (if (not= :eof (:status stream)) (let [pump-fn (if (= :output (:direction stream)) pull push) - last-read-time (:last-read-time stream) - new-status (pump-fn session channel (:id stream) (:stream stream)) - now (System/currentTimeMillis)] - (when (and (= pump-fn pull) - (= :eagain new-status) - (< (-> session :options :read-timeout) (- now last-read-time))) - (error/raise "Read timeout on a channel." - {:direction (-> stream :direction name) - :id (-> stream :id) - :timeout (-> session :options :read-timeout) - :session session})) - (assoc stream :status new-status :last-read-time now)) + new-status (pump-fn session channel (:id stream) (:stream stream))] + (assoc stream :status new-status + :last-read-time (if (= :ready new-status) + (System/currentTimeMillis) + (:last-read-time stream)))) stream)) +(defn- enforce-read-timeout + "Enforce the read timeout on the output streams in a set of streams. + + Arguments: + + session The clj-libssh2.session.Session object for the current session. + channel The SSH channel that we're enforcing timeouts on. + streams The collection of streams that are in use in pump + + Return: + + nil, or throw an exception if the timeout is exceeded on any of the streams + given." + [session channel streams] + (let [read-timeout (-> session :options :read-timeout) + last-read-time (->> streams + (remove #(= :input (:direction %))) + (map :last-read-time) + (#(when-not (empty? %) + (apply max %))))] + (when (and (some? last-read-time) + (< read-timeout (- (System/currentTimeMillis) last-read-time))) + (error/raise "Read timeout on a channel." + {:timeout read-timeout + :session session + :channel channel})))) + (defn pump "Process a collection of input and output streams all at once. This will run until all streams have reported EOF. @@ -452,6 +472,7 @@ (do (when (contains? status-set :eagain) (wait session)) + (enforce-read-timeout session channel s) (recur (map (partial pump-stream session channel) streams))) (->> s (filter #(= :output (:direction %))) diff --git a/src/clj_libssh2/session.clj b/src/clj_libssh2/session.clj index 63b8268..f8dddb9 100644 --- a/src/clj_libssh2/session.clj +++ b/src/clj_libssh2/session.clj @@ -18,7 +18,8 @@ (def default-opts "The default options for a session. These are not only the defaults, but an exhaustive list of the legal options." - {:character-set "UTF-8" + {:blocking-timeout 60000 + :character-set "UTF-8" :fail-if-not-in-known-hosts false :fail-unless-known-hosts-matches true :known-hosts-file nil diff --git a/src/clj_libssh2/socket.clj b/src/clj_libssh2/socket.clj index 959dd58..931e054 100644 --- a/src/clj_libssh2/socket.clj +++ b/src/clj_libssh2/socket.clj @@ -95,6 +95,12 @@ (when (>= 0 select-result) (handle-errors session libssh2/ERROR_TIMEOUT)))))) +(defn enforce-blocking-timeout + [session start-time] + (when (< (-> session :options :blocking-timeout) + (- (System/currentTimeMillis) start-time)) + (handle-errors session libssh2/ERROR_TIMEOUT))) + (defmacro block "Turn a non-blocking call that returns EAGAIN into a blocking one." [session & body] @@ -102,7 +108,8 @@ start-time# (System/currentTimeMillis)] (while (= libssh2/ERROR_EAGAIN (do ~@body)) (handle-errors session# - (wait session# start-time#))))) + (wait session# start-time#)) + (enforce-blocking-timeout session# start-time#)))) (defmacro block-return "Similar to block, but for functions that return a pointer" @@ -115,5 +122,6 @@ (handle-errors session# errno#) (when (= libssh2/ERROR_EAGAIN errno#) (wait session# start-time#)) + (enforce-blocking-timeout session# start-time#) (recur (do ~@body))) result#)))) diff --git a/test/clj_libssh2/test_ssh.clj b/test/clj_libssh2/test_ssh.clj index 3090645..1cb2553 100644 --- a/test/clj_libssh2/test_ssh.clj +++ b/test/clj_libssh2/test_ssh.clj @@ -3,7 +3,8 @@ [clojure.string :as str] [clojure.test :refer :all] [clj-libssh2.ssh :as ssh] - [clj-libssh2.test-utils :as test])) + [clj-libssh2.test-utils :as test]) + (:import [java.io OutputStream])) (test/fixtures) @@ -86,7 +87,36 @@ (deftest exec-times-out-when-commands-take-too-long (testing "Commands that take too long result in a timeout" (is (thrown? Exception (ssh/exec {:port 2222 :read-timeout 500} - "echo foo; sleep 1; echo bar"))))) + "echo foo; sleep 1; echo bar")))) + (testing "Commands that are blocking on input time out correctly" + (test/with-temp-file tempfile + (let [output (atom []) + streaming-reader (proxy [OutputStream] [] + (write [b off len] + (swap! output conj (String. b off len)))) + run-exec (future + (try + (ssh/exec {:port 2222 + :read-timeout 5000} + (str "tail -F " tempfile) + :out streaming-reader) + (catch Throwable t t)))] + ; Output starts off empty + (is (empty? @output)) + + ; We put some content into the file we're tailing. + (spit tempfile "Here is some output!\n" :append true) + + ; We wait for it to turn up on the far side. + (let [start-time (System/currentTimeMillis)] + (while (and (empty? @output) + (> 5000 (- (System/currentTimeMillis) start-time))) + (Thread/sleep 10))) + + ; Now there should be output and (once the exec finishes) an exception. + (is (= ["Here is some output!\n"] @output)) + (is (instance? Throwable @run-exec)) + (is (= "Timed out." (:error (ex-data @run-exec)))))))) (deftest scp-from-can-copy-files (testing "scp-from can copy files from the remote host" diff --git a/test/clj_libssh2/test_utils.clj b/test/clj_libssh2/test_utils.clj index 921b5a8..b2775e7 100644 --- a/test/clj_libssh2/test_utils.clj +++ b/test/clj_libssh2/test_utils.clj @@ -3,7 +3,8 @@ [clojure.string :as str] [clojure.test :as test] [net.n01se.clojure-jna :as jna] - [clj-libssh2.logging :as logging])) + [clj-libssh2.logging :as logging]) + (:import [java.io File])) (def ssh-host "127.0.0.1") (def ssh-port 2222) @@ -61,3 +62,11 @@ (test/use-fixtures :once (test/join-fixtures [with-sandbox-sshd with-really-verbose-logging]))) + +(defmacro with-temp-file + [file & body] + `(let [file# (File/createTempFile "clj-libssh2" nil) + ~file (.getPath file#)] + (try + (do ~@body) + (finally (.delete file#)))))