Skip to content

Commit

Permalink
Allow using WebSockets from user-provided servlets [IMMUTANT-576]
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias committed Sep 4, 2015
1 parent 73a329f commit ea8d068
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 36 deletions.
4 changes: 2 additions & 2 deletions project.clj
Expand Up @@ -91,8 +91,8 @@
clj-http "1.0.1"
environ "1.0.0"

org.projectodd.wunderboss "0.9.0"
;; org.projectodd.wunderboss "1.x.incremental.287"
;; org.projectodd.wunderboss "0.9.0"
org.projectodd.wunderboss "1.x.incremental.289"
;; org.projectodd.wunderboss "0.9.1-SNAPSHOT"

org.immutant :version
Expand Down
51 changes: 50 additions & 1 deletion web/dev-resources/testing/app.clj
Expand Up @@ -17,12 +17,15 @@
[immutant.web :as web]
[immutant.web.async :as async]
[immutant.web.sse :as sse]
[immutant.web.internal.servlet :as servlet]
[immutant.web.internal.ring :as ring]
[immutant.internal.util :refer [maybe-deref]]
[immutant.web.middleware :refer (wrap-session wrap-websocket)]
[immutant.codecs :refer (encode)]
[compojure.core :refer (GET defroutes)]
[ring.util.response :refer (charset redirect response)]
[ring.middleware.params :refer [wrap-params]]))
[ring.middleware.params :refer [wrap-params]])
(:import [javax.servlet.http HttpServlet]))

(defn counter [{:keys [session websocket?] :as request}]
(if websocket?
Expand Down Expand Up @@ -101,6 +104,50 @@
(when (nil? state) (println "CLIENT-STATE IS NIL!"))
(-> state pr-str response)))

(def user-defined-servlet
(let [events (atom nil)
results (atom (promise))]
(proxy [HttpServlet] []
(service [servlet-request servlet-response]
(let [ring-request (ring/ring-request-map servlet-request
[:servlet-request servlet-request]
[:servlet-response servlet-response])
ring-response (if (= "get-result" (:query-string ring-request))
(-> (maybe-deref @results 30000 :failure!) pr-str response)
(async/as-channel ring-request
:on-open (fn [stream]
(dotimes [n 10]
(async/send! stream (str n) {:close? (= n 9)})))))]
(ring/write-response servlet-response ring-response)))
(init [config]
(proxy-super init config)
(servlet/add-endpoint this config
{:on-open (fn [_]
(reset! events [:open]))
:on-close (fn [_ {c :code}]
(deliver @results (swap! events conj c)))
:on-message (fn [_ m]
(swap! events conj m))})))))

(def wrapped-handler-servlet
(let [events (atom nil)
results (atom (promise))]
(-> (fn [{:keys [websocket? query-string] :as req}]
(if (= "get-result" query-string)
(-> (maybe-deref @results 30000 :failure!) pr-str response)
(async/as-channel req
:on-open (fn [ch]
(if websocket?
(reset! events [:open])
(dotimes [n 10]
(async/send! ch (str n) {:close? (= n 9)}))))
:on-close (fn [_ {c :code}]
(when websocket?
(deliver @results (swap! events conj c))))
:on-message (fn [_ m]
(swap! events conj m)))))
servlet/create-servlet)))

(defroutes routes
(GET "/" [] counter)
(GET "/session" {s :session} (encode s))
Expand All @@ -125,4 +172,6 @@
(web/run (-> #'cdef-handler wrap-params) :path "/cdef")
(web/run (-> ws-as-channel wrap-session) :path "/ws")
(web/run (-> dump wrap-session wrap-params) :path "/dump")
(web/run user-defined-servlet :path "/user-defined-servlet")
(web/run wrapped-handler-servlet :path "/wrapped-handler-servlet")
(web/run nested-ws-routes :path "/nested-ws"))
5 changes: 3 additions & 2 deletions web/src/immutant/web/async.clj
Expand Up @@ -33,9 +33,10 @@
(.attach :set-headers-fn set-headers-fn)
(.notifyOpen nil)))

(defmulti ^:internal ^:no-doc initialize-stream :handler-type)
(let [dispatch-fn (fn [request _] (:handler-type request :servlet))]
(defmulti ^:internal ^:no-doc initialize-stream dispatch-fn)

(defmulti ^:internal ^:no-doc initialize-websocket :handler-type)
(defmulti ^:internal ^:no-doc initialize-websocket dispatch-fn))

(defprotocol ^:private MessageDispatch
(dispatch-message [from ch options-map]))
Expand Down
64 changes: 35 additions & 29 deletions web/src/immutant/web/internal/servlet.clj
Expand Up @@ -105,29 +105,40 @@
(defn ^ServerContainer server-container [^ServletContext context]
(.getAttribute context "javax.websocket.server.ServerContainer"))

(defn add-endpoint-with-handler
[^HttpServlet servlet ^ServletConfig config handler]
(let [servlet-context (.getServletContext config)
mapping (-> servlet-context
(.getServletRegistration (.getServletName servlet))
.getMappings
first)
path (apply str (take (- (count mapping) 2) mapping))
path (if (empty? path) "/" path)]
(.addEndpoint (server-container servlet-context)
(.. ServerEndpointConfig$Builder
(create DelegatingJavaxEndpoint path)
(configurator (proxy [ServerEndpointConfig$Configurator] []
(getEndpointInstance [_] (DelegatingJavaxEndpoint.))
(modifyHandshake [_ _ _]
(let [request (.get WebSocketHelpyHelpertonFilter/requestTL)
body (:body (handler (ring/ring-request-map request
[:handler-type :servlet]
[:servlet-request request]
[:websocket? true])))]
(when (instance? WebsocketChannel body)
(DelegatingJavaxEndpoint/setCurrentDelegate
(.endpoint ^WebsocketChannel body)))))))
build))))

(defn add-endpoint
"Adds an endpoint to a container obtained from the servlet-context"
[^ServletContext servlet-context {:keys [path handshake] :or {path "/"}}]
(.addEndpoint (server-container servlet-context)
(.. ServerEndpointConfig$Builder
(create DelegatingJavaxEndpoint path)
(configurator (proxy [ServerEndpointConfig$Configurator] []
(getEndpointInstance [_] (DelegatingJavaxEndpoint.))
(modifyHandshake [_ _ _]
(when handshake
(handshake
(.get WebSocketHelpyHelpertonFilter/requestTL))))))
build)))

(defn handshake-ring-invoker [handler]
(fn [request]
(let [body (:body (handler (ring/ring-request-map request
[:handler-type :servlet]
[:servlet-request request]
[:websocket? true])))]
(when (instance? WebsocketChannel body)
(DelegatingJavaxEndpoint/setCurrentDelegate
(.endpoint ^WebsocketChannel body))))))
"Adds a javax.websocket.Endpoint for the given `servlet` and `servlet-config`.
`ws-callbacks` is a map of callbacks for handling the WebSocket, and are
the same as the callbacks provided to [[immutant.web.async/as-channel]]."
[^HttpServlet servlet ^ServletConfig servlet-config ws-callbacks]
(add-endpoint-with-handler servlet servlet-config
(fn [req]
(async/as-channel req ws-callbacks))))

(defn ^Servlet create-servlet
"Encapsulate a ring handler within a servlet"
Expand All @@ -147,12 +158,7 @@
(init [^ServletConfig config]
(let [^HttpServlet this this]
(proxy-super init config)
(let [context (.getServletContext config)
mapping (-> context (.getServletRegistration (.getServletName this)) .getMappings first)
path (apply str (take (- (count mapping) 2) mapping))
path (if (empty? path) "/" path)]
(add-endpoint context
{:path path :handshake (handshake-ring-invoker handler)}))))))
(add-endpoint-with-handler this config handler)))))

(defn async-streaming-supported? []
(when-let [f (try-resolve 'immutant.wildfly/async-streaming-supported?)]
Expand All @@ -177,7 +183,7 @@
(handle [_ ch code reason]
(on-close ch {:code code
:reason reason}))))
(async-streaming-supported?)))
(boolean (async-streaming-supported?))))

(defmethod async/initialize-websocket :servlet
[request {:keys [on-open on-error on-close on-message on-error]}]
Expand Down
5 changes: 3 additions & 2 deletions web/src/immutant/web/internal/wunderboss.clj
Expand Up @@ -56,12 +56,13 @@
((u/try-resolve 'immutant.web.internal.undertow/create-websocket-init-handler)
handler ring-request-map))
handler)
servlet? (instance? Servlet hdlr)
opts (extract-options
(if (in-container?)
(if servlet?
(assoc opts :filter-map (websocket-servlet-filter-map))
opts)
Web$RegisterOption)]
(if (instance? Servlet hdlr)
(if servlet?
(try
(.registerServlet server hdlr opts)
(catch IllegalStateException e
Expand Down
44 changes: 44 additions & 0 deletions web/test-integration/immutant/web/integ_test.clj
Expand Up @@ -212,6 +212,50 @@
(is (= (str "/" path)
(-> result (deref 5000 "nil") read-string :path-info)))))))

(marktest websocket-from-user-servlet
(with-open [socket (ws/connect (str (url "ws") "user-defined-servlet"))]
(ws/send-msg socket "hello"))
(is (= [:open "hello" 1001] (read-string (get-body (str (url) "user-defined-servlet?get-result"))))))

(marktest stream-from-user-servlet
(with-open [client (http/create-client)]
(let [response (http/stream-seq client :get (str (url) "user-defined-servlet"))
stream @(:body response)
headers @(:headers response)
body (atom [])]
(loop []
(let [v (.poll stream)]
(when (not= v :http.async.client/done)
(when v
(swap! body conj (read-string (.toString v "UTF-8"))))
(recur))))
(is (= 200 (-> response :status deref :code)))
(is (= "chunked" (:transfer-encoding headers)))
(is (= (range 10) @body))
(is (= 10 (count @body))))))

(marktest websocket-from-wrapped-handler-servlet
(with-open [socket (ws/connect (str (url "ws") "wrapped-handler-servlet"))]
(ws/send-msg socket "hello"))
(is (= [:open "hello" 1001] (read-string (get-body (str (url) "wrapped-handler-servlet?get-result"))))))

(marktest stream-from-wrapped-handler-servlet
(with-open [client (http/create-client)]
(let [response (http/stream-seq client :get (str (url) "wrapped-handler-servlet"))
stream @(:body response)
headers @(:headers response)
body (atom [])]
(loop []
(let [v (.poll stream)]
(when (not= v :http.async.client/done)
(when v
(swap! body conj (read-string (.toString v "UTF-8"))))
(recur))))
(is (= 200 (-> response :status deref :code)))
(is (= "chunked" (:transfer-encoding headers)))
(is (= (range 10) @body))
(is (= 10 (count @body))))))

(marktest concurrent-ws-requests-should-not-cross-streams
(replace-handler
'(fn [request]
Expand Down

0 comments on commit ea8d068

Please sign in to comment.