Skip to content

Commit

Permalink
Expose channel idle-timeout [IMMUTANT-523]
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias committed Aug 5, 2015
1 parent 95ea7e6 commit 4585a38
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 7 deletions.
2 changes: 1 addition & 1 deletion project.clj
Expand Up @@ -91,7 +91,7 @@
clj-http "1.0.1"

;; org.projectodd.wunderboss "0.8.1"
org.projectodd.wunderboss "1.x.incremental.267"
org.projectodd.wunderboss "1.x.incremental.268"
;; org.projectodd.wunderboss "0.8.2-SNAPSHOT"

org.immutant :version
Expand Down
18 changes: 12 additions & 6 deletions web/src/immutant/web/async.clj
Expand Up @@ -251,7 +251,7 @@
is a Websocket upgrade request (the :websocket? key is true), a
Websocket channel will be created. Otherwise, an HTTP stream channel
is created. You interact with both channel types using the other
functions in this namespace, and through the given `callbacks`.
functions in this namespace, and through callbacks in `options`.
The callbacks common to both channel types are:
Expand All @@ -277,21 +277,27 @@
* :on-message - `(fn [ch message] ...)` - Called for each message
from the client. `message` will be a `String` or `byte[]`
You can also specify an `:idle-timeout` option, that will cause the
channel to be closed if idle more than the timeout. It defaults to 0
(no timeout), and is in milliseconds.
When the ring handler is called during a WebSocket upgrade request,
any headers returned in the response map are ignored, but any changes to
the session are applied.
Returns a ring response map, at least the :body of which *must* be
returned in the response map from the calling ring handler."
[request & callbacks]
(let [callbacks (-> callbacks
[request & options]
(let [options (-> options
u/kwargs-or-map->map
(o/validate-options as-channel))
ch (if (:websocket? request)
(initialize-websocket request callbacks)
(initialize-stream request callbacks))]
(initialize-websocket request options)
(initialize-stream request options))]
(when-let [idle-timeout (:idle-timeout options)]
(.setIdleTimeout ^Channel ch idle-timeout))
{:status 200
:body ch}))

(o/set-valid-options! as-channel
#{:on-open :on-close :on-message :on-error})
#{:on-open :on-close :on-message :on-error :idle-timeout})
79 changes: 79 additions & 0 deletions web/test-integration/immutant/web/integ_test.clj
Expand Up @@ -598,6 +598,85 @@
(is (= data @rcvd))))
(is (= :complete! (read-string (get-body (str (cdef-url) "state")))))))

(deftest ws-should-timeout-when-idle
(let [handler
'(do
(reset! client-state (promise))
(fn [request]
(async/as-channel request
:idle-timeout 100
:on-error (fn [_ e] (.printStackTrace e))
:on-open (fn [ch] (async/send! ch "open"))
:on-close (fn [_ reason]
(deliver @client-state :closed)))))
ready (promise)]
(replace-handler handler)
(with-open [socket (ws/connect (cdef-url "ws")
:on-receive #(deliver ready %))]
(is (= "open" (deref ready 1000 :failure)))
(is (= :closed (read-string (get-body (str (cdef-url) "state"))))))))

(deftest ws-timeout-should-occur-when-truly-idle
(let [handler
'(do
(reset! client-state (promise))
(fn [request]
(async/as-channel request
:idle-timeout 100
:on-error (fn [_ e] (.printStackTrace e))
:on-open (fn [ch] (future
(dotimes [n 4]
(async/send! ch (str n))
(Thread/sleep 50))
(async/send! ch "done")))
:on-close (fn [_ reason]
(deliver @client-state :closed)))))
ready (promise)
data (atom [])]
(replace-handler handler)
(with-open [socket (ws/connect (cdef-url "ws")
:on-receive (fn [m]
(if (= "done" m)
(deliver ready m)
(swap! data conj m))))]
(is (= "done" (deref ready 1000 :failure)))
(is (= ["0" "1" "2" "3"] @data))
(is (= :closed (read-string (get-body (str (cdef-url) "state"))))))))

(deftest stream-should-timeout-when-idle
(let [handler
'(do
(reset! client-state (promise))
(fn [request]
(async/as-channel request
:idle-timeout 100
:on-error (fn [_ e] (.printStackTrace e))
:on-close (fn [_ reason]
(deliver @client-state :closed)))))]
(replace-handler handler)
(is (= 200 (:status (get-response (cdef-url)))))
(is (= :closed (read-string (get-body (str (cdef-url) "state")))))))

(deftest stream-timeout-should-occur-when-truly-idle
(let [handler
'(do
(reset! client-state (promise))
(fn [request]
(async/as-channel request
:idle-timeout 100
:on-open (fn [ch] (future
(async/send! ch "[")
(dotimes [n 4]
(async/send! ch (str " " n))
(Thread/sleep 50))
(async/send! ch "]")))
:on-error (fn [_ e] (.printStackTrace e))
:on-close (fn [_ reason]
(deliver @client-state :closed)))))]
(replace-handler handler)
(is (= [0 1 2 3] (read-string (get-body (cdef-url)))))
(is (= :closed (read-string (get-body (str (cdef-url) "state")))))))

;; TODO: build a long-running random test

(when (not (in-container?))
Expand Down

0 comments on commit 4585a38

Please sign in to comment.