Skip to content

Commit

Permalink
Make the request available via the channel.
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias committed Feb 9, 2015
1 parent 97993db commit 58c8de5
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 21 deletions.
4 changes: 2 additions & 2 deletions project.clj
Expand Up @@ -87,8 +87,8 @@
h2 "1.3.176"
jersey-media-sse "2.15"

org.projectodd.wunderboss "0.4.0"
;; org.projectodd.wunderboss "1.x.incremental.190"
;; org.projectodd.wunderboss "0.4.0"
org.projectodd.wunderboss "1.x.incremental.193"
;; org.projectodd.wunderboss "0.5.0-SNAPSHOT"

org.immutant :version}}
Expand Down
4 changes: 4 additions & 0 deletions web/src/immutant/web/async.clj
Expand Up @@ -37,6 +37,8 @@
This will trigger the :on-close callback if one is registered. with
[[as-channel]].")
(originating-request [ch]
"Returns the request map for the request that initiated the channel.")
(send! [ch message] [ch message options]
"Send a message to the channel, asynchronously.
Expand Down Expand Up @@ -65,6 +67,8 @@
Channel
(open? [^org.projectodd.wunderboss.web.async.Channel ch] (.isOpen ch))
(close [^org.projectodd.wunderboss.web.async.Channel ch] (.close ch))
(originating-request [^org.projectodd.wunderboss.web.async.Channel ch]
(.originatingRequest ch))
(send!
([ch message]
(send! ch message nil))
Expand Down
13 changes: 8 additions & 5 deletions web/src/immutant/web/internal/servlet.clj
Expand Up @@ -16,7 +16,8 @@
(:require [immutant.web.internal.ring :as ring]
[immutant.web.internal.headers :as hdr]
[immutant.web.async :as async])
(:import [org.projectodd.wunderboss.web.async Channel$OnOpen Channel$OnClose Channel$OnError
(:import [org.projectodd.wunderboss.web.async Channel
Channel$OnOpen Channel$OnClose Channel$OnError
ServletHttpChannel Util]
[org.projectodd.wunderboss.web.async.websocket DelegatingJavaxEndpoint
JavaxWebsocketChannel WebSocketHelpyHelpersonFilter
Expand Down Expand Up @@ -156,9 +157,10 @@
(ServletHttpChannel.
(:servlet-request request)
(:servlet-response request)
(when on-open
(reify Channel$OnOpen
(handle [_ ch _]
(reify Channel$OnOpen
(handle [_ ch _]
(.setOriginatingRequest ^Channel ch request)
(when on-open
(on-open ch))))
(when on-error
(reify Channel$OnError
Expand All @@ -171,10 +173,11 @@
:reason reason}))))))

(defmethod async/initialize-websocket :servlet
[_ {:keys [on-open on-error on-close on-message on-error]}]
[request {:keys [on-open on-error on-close on-message on-error]}]
(JavaxWebsocketChannel.
(reify Channel$OnOpen
(handle [_ ch config]
(.setOriginatingRequest ^Channel ch request)
(when on-open
(on-open ch))))
(reify Channel$OnError
Expand Down
11 changes: 7 additions & 4 deletions web/src/immutant/web/internal/undertow.clj
Expand Up @@ -23,8 +23,9 @@
[io.undertow.util HeaderMap Headers HttpString Sessions]
[io.undertow.websockets.core CloseMessage WebSocketChannel]
[io.undertow.websockets.spi WebSocketHttpExchange]
[org.projectodd.wunderboss.web.async Channel$OnOpen Channel$OnClose
Channel$OnError UndertowHttpChannel]
[org.projectodd.wunderboss.web.async Channel
Channel$OnOpen Channel$OnClose Channel$OnError
UndertowHttpChannel]
[org.projectodd.wunderboss.web.async.websocket UndertowWebsocket
UndertowWebsocketChannel
WebsocketChannel WebsocketChannel$OnMessage WebsocketInitHandler]))
Expand Down Expand Up @@ -139,6 +140,7 @@
(when on-open
(reify Channel$OnOpen
(handle [_ ch _]
(.setOriginatingRequest ^Channel ch request)
(on-open ch))))
(when on-error
(reify Channel$OnError
Expand All @@ -150,10 +152,11 @@
(on-close ch {:code code :reason reason}))))))

(defmethod async/initialize-websocket :undertow
[_ {:keys [on-open on-error on-close on-message on-error]}]
[request {:keys [on-open on-error on-close on-message on-error]}]
(UndertowWebsocketChannel.
(reify Channel$OnOpen
(handle [_ ch handshake]
(handle [_ ch _]
(.setOriginatingRequest ^Channel ch request)
(when on-open
(on-open ch))))
(reify Channel$OnError
Expand Down
35 changes: 35 additions & 0 deletions web/test-integration/immutant/web/integ_test.clj
Expand Up @@ -178,6 +178,24 @@
(is (= (str "/" path)
(-> result (deref 5000 "nil") read-string :path-info)))))))


(deftest request-should-be-attached-to-channel-for-ws
(replace-handler
'(do
(reset! client-state (promise))
(fn [request]
(async/as-channel request
:on-message (fn [ch _] (deliver @client-state
(dissoc (async/originating-request ch)
:server-exchange
:body)))))))
(with-open [socket (ws/connect (cdef-url "ws"))]
(ws/send-msg socket "hello")
(let [request (read-string (get-body (str (cdef-url) "state")))]
(is request)
(is (= "/" (:path-info request)))
(is (= "Upgrade" (get-in request [:headers "connection"]))))))

(deftest ws-on-close-should-be-invoked-when-closing-on-server-side
(replace-handler
'(do
Expand Down Expand Up @@ -218,6 +236,23 @@
(ws/send-msg socket "hello")
(is (= "BOOM" (read-string (get-body (str (cdef-url) "state")))))))

(deftest request-should-be-attached-to-channel-for-stream
(replace-handler
'(do
(reset! client-state (promise))
(fn [request]
(async/as-channel request
:on-open (fn [ch]
(deliver @client-state
(dissoc (async/originating-request ch)
:server-exchange
:body))
(async/send! ch "done" {:close? true}))))))
(is (= "done" (get-body (cdef-url))))
(let [request (read-string (get-body (str (cdef-url) "state")))]
(is request)
(is (= "/" (:path-info request)))))

(deftest stream-on-close-should-be-invoked-when-closing-on-server-side
(replace-handler
'(do
Expand Down
20 changes: 11 additions & 9 deletions web/test/immutant/web/async_test.clj
Expand Up @@ -94,17 +94,19 @@
(stop server)))))

(deftest upgrade-headers
(let [result (promise)]
(run (wrap-params (fn [req] (deliver result req) (as-channel req))))
(let [requestp (promise)]
(run (wrap-params
(wrap-websocket nil
:on-open (fn [ch] (deliver requestp (originating-request ch))))))
(with-open [client (http/create-client)
socket (http/websocket client "ws://localhost:8080/?x=y&j=k")]
(let [upgrade (deref result 1000 nil)]
(is (not (nil? upgrade)))
(is (= "Upgrade" (get-in upgrade [:headers "connection"])))
(is (= "k" (get-in upgrade [:params "j"])))
(is (= "x=y&j=k" (:query-string upgrade)))
(is (= "/" (:uri upgrade))) ;TODO: include query string?
;; (is (false? (-> upgrade (user-in-role? "admin"))))
(let [request (deref requestp 1000 nil)]
(is (not (nil? request)))
(is (= "Upgrade" (get-in request [:headers "connection"])))
(is (= "k" (get-in request [:params "j"])))
(is (= "x=y&j=k" (:query-string request)))
(is (= "/" (:uri request))) ;TODO: include query string?
;; (is (false? (-> request (user-in-role? "admin"))))
))
(stop)))

Expand Down
2 changes: 1 addition & 1 deletion web/test/immutant/web_test.clj
Expand Up @@ -29,7 +29,7 @@
(:import clojure.lang.ExceptionInfo
java.net.ConnectException))

(u/set-log-level! (or (System/getenv "LOG_LEVEL") :OFF))
(u/set-log-level! (or (System/getenv "LOG_LEVEL") :ERROR))

(use-fixtures :each u/reset-fixture)

Expand Down

0 comments on commit 58c8de5

Please sign in to comment.