Skip to content

Commit

Permalink
Don't use a static key for ws endpoints [IMMUTANT-554]
Browse files Browse the repository at this point in the history
The userProperties for the ServerEndpointConfig are global for the
endpoint, not local for each connection, so we store the endpoint under
the thread id instead of a static key. wunderboss reads it in onOpen,
which will always be called on the same thread as modifyHandshake.
  • Loading branch information
tobias committed Apr 10, 2015
1 parent 11218ed commit 02fb891
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
2 changes: 1 addition & 1 deletion project.clj
Expand Up @@ -91,7 +91,7 @@
clj-http "1.0.1"

;; org.projectodd.wunderboss "0.4.1"
org.projectodd.wunderboss "1.x.incremental.227"
org.projectodd.wunderboss "1.x.incremental.229"
;; org.projectodd.wunderboss "0.5.1-SNAPSHOT"

org.immutant :version
Expand Down
17 changes: 8 additions & 9 deletions web/src/immutant/web/internal/servlet.clj
Expand Up @@ -103,29 +103,28 @@

(defn add-endpoint
"Adds an endpoint to a container obtained from the servlet-context"
[^Endpoint endpoint ^ServletContext servlet-context {:keys [path handshake] :or {path "/"}}]
[^ServletContext servlet-context {:keys [path handshake] :or {path "/"}}]
(.addEndpoint (server-container servlet-context)
(.. ServerEndpointConfig$Builder
(create (class endpoint) path)
(create DelegatingJavaxEndpoint path)
(configurator (proxy [ServerEndpointConfig$Configurator] []
(getEndpointInstance [c] endpoint)
(modifyHandshake [^ServerEndpointConfig config _ response]
(getEndpointInstance [c] (DelegatingJavaxEndpoint.))
(modifyHandshake [config _ _]
(when handshake
(handshake config
(.get (WebSocketHelpyHelpersonFilter/requestTL))
response)))))
(.get WebSocketHelpyHelpersonFilter/requestTL))))))
build)))

(defn handshake-ring-invoker [handler]
(fn [^ServerEndpointConfig config request response]
(fn [^ServerEndpointConfig config request]
(let [body (:body (handler (ring/ring-request-map request
[:handler-type :servlet]
[:servlet-request request]
[:websocket? true])))]
(when (instance? WebsocketChannel body)
(-> config
.getUserProperties
(.put DelegatingJavaxEndpoint/ENDPOINT_KEY
(.put (DelegatingJavaxEndpoint/endpointKey)
(.endpoint ^WebsocketChannel body)))))))

(defn ^Servlet create-servlet
Expand All @@ -150,7 +149,7 @@
mapping (-> context (.getServletRegistration (.getServletName this)) .getMappings first)
path (apply str (take (- (count mapping) 2) mapping))
path (if (empty? path) "/" path)]
(add-endpoint (DelegatingJavaxEndpoint.) context
(add-endpoint context
{:path path :handshake (handshake-ring-invoker handler)}))))))

(defmethod async/initialize-stream :servlet
Expand Down
24 changes: 24 additions & 0 deletions web/test-integration/immutant/web/integ_test.clj
Expand Up @@ -179,6 +179,30 @@
(is (= (str "/" path)
(-> result (deref 5000 "nil") read-string :path-info)))))))

(deftest concurrent-ws-requests-should-not-cross-streams
(replace-handler
'(fn [request]
(async/as-channel request
:on-open (fn [ch]
(let [query-string (:query-string (async/originating-request ch))]
(async/send! ch (last (re-find #"x=(.*)$" query-string))))))))
(let [results (atom [])
clients (atom [])
done? (promise)
client-count 40]
(dotimes [n client-count]
(future
(let [client (ws/connect (str (cdef-url "ws") "?x=" n)
:on-receive (fn [m]
(swap! results conj m)
(when (= client-count (count @results))
(deliver done? true))))]
(swap! clients conj client))))
(is (deref done? 5000 nil))
(is (= (->> client-count (range 0) (map str) set)
(set @results)))
(doseq [client @clients]
(.close client))))

(deftest request-should-be-attached-to-channel-for-ws
(replace-handler
Expand Down

0 comments on commit 02fb891

Please sign in to comment.