Skip to content

Commit

Permalink
Initial async channel support [IMMUTANT-521]
Browse files Browse the repository at this point in the history
This adds a new namespace (immutant.web.async), that provides a common
channel abstraction for http streaming and websockets.

WIP
  • Loading branch information
tobias committed Jan 14, 2015
1 parent 1fdf2d9 commit e828d84
Show file tree
Hide file tree
Showing 11 changed files with 528 additions and 172 deletions.
2 changes: 1 addition & 1 deletion project.clj
Expand Up @@ -87,7 +87,7 @@
h2 "1.3.176"

;; org.projectodd.wunderboss "0.3.0"
org.projectodd.wunderboss "1.x.incremental.174"
org.projectodd.wunderboss "1.x.incremental.176"
;; org.projectodd.wunderboss "0.4.0-SNAPSHOT"

org.immutant :version}}
Expand Down
58 changes: 49 additions & 9 deletions web/dev-resources/testing/app.clj
Expand Up @@ -15,6 +15,7 @@
(ns testing.app
(:require [immutant.web :as web]
[immutant.web.websocket :as ws]
[immutant.web.async :as async]
[immutant.web.middleware :refer (wrap-session)]
[immutant.codecs :refer (encode)]
[compojure.core :refer (GET defroutes)]
Expand All @@ -23,16 +24,16 @@
(def handshakes (atom {}))

(defn on-open-set-handshake [channel handshake]
(let [data {:headers (ws/headers handshake)
:parameters (ws/parameters handshake)
:uri (ws/uri handshake)
:query (ws/query-string handshake)
:session (ws/session handshake)
:user-principal (ws/user-principal handshake)}]
(let [data {:headers (async/headers handshake)
:parameters (async/parameters handshake)
:uri (async/uri handshake)
:query (async/query-string handshake)
:session (async/session handshake)
:user-principal (async/user-principal handshake)}]
(swap! handshakes assoc channel data)))

(defn on-message-send-handshake [channel message]
(ws/send! channel (encode (get @handshakes channel))))
(async/send! channel (encode (get @handshakes channel))))

(defn counter [{session :session}]
(let [count (:count session 0)
Expand All @@ -51,16 +52,55 @@
:body body}
cs)))

(defn chunked-stream [request]
(async/as-channel request
{:on-open
(fn [stream]
(.start
(Thread.
(fn []
(async/send! stream "[" false)
(dotimes [n 10]
;; we have to send a few bytes with each
;; response - there is a min-bytes threshold to
;; trigger data to the client
(async/send! stream (format "%s ;; %s\n" n (repeat 128 "1")) false))
;; 2-arity send! closes the stream
(async/send! stream "]")))))}))

(defn non-chunked-stream [request]
(async/as-channel request
{:on-open
(fn [stream]
(async/send! stream (str (repeat 128 "1"))))}))

(defn ws-as-channel
[request]
(async/as-channel request
{:on-open (fn [ch hs]
#_(println "TC: open" ch hs))
:on-message (fn [ch message]
#_(println "TC: message" message)
(async/send! ch (.toUpperCase message)))
:on-error (fn [ch err]
(println "Error on websocket")
(.printStackTrace err))
:on-close (fn [ch reason]
#_(println "TC: closed" reason))}))

(defroutes routes
(GET "/" [] counter)
(GET "/session" {s :session} (encode s))
(GET "/unsession" [] {:session nil})
(GET "/request" [] dump)
(GET "/charset" [] with-charset))
(GET "/charset" [] with-charset)
(GET "/chunked-stream" [] chunked-stream)
(GET "/non-chunked-stream" [] non-chunked-stream))

(defn run []
(web/run (-> #'routes
wrap-session
(ws/wrap-websocket
:on-open #'on-open-set-handshake
:on-message #'on-message-send-handshake))))
:on-message #'on-message-send-handshake)))
(web/run ws-as-channel :path "/ws"))
100 changes: 100 additions & 0 deletions web/src/immutant/web/async.clj
@@ -0,0 +1,100 @@
;; Copyright 2014 Red Hat, Inc, and individual contributors.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns immutant.web.async
(:import [org.projectodd.wunderboss.web.async HttpChannel]))

(defn ^:internal streaming-body? [body]
(instance? HttpChannel body))

(defn ^:internal open-stream [^HttpChannel channel headers]
(.notifyOpen channel nil))

(defmulti ^:internal initialize-stream :handler-type)

(defmulti ^:internal initialize-websocket :handler-type)

(defprotocol WebsocketHandshake
"Reflects the state of the initial websocket upgrade request"
(headers [hs] "Return request headers")
(parameters [hs] "Return map of params from request")
(uri [hs] "Return full request URI")
(query-string [hs] "Return query portion of URI")
(session [hs] "Return the user's session data, if any")
(user-principal [hs] "Return authorized `java.security.Principal`")
(user-in-role? [hs role] "Is user in role identified by String?"))

(defprotocol Channel
"Streaming channel interface"
(open? [ch] "Is the channel open?")
(close [ch]
"Gracefully close the channel.
This will trigger the on-close callback for the channel if one is
registered.")
(send! [ch message] [ch message close?]
"Send a message to the channel.
If close? is truthy, close the channel after writing. close?
defaults to false for WebSockets, true otherwise.
Sending is asynchronous for WebSockets, but blocking for
HTTP channels.
Returns nil if the channel is closed, true otherwise."))

(extend-type org.projectodd.wunderboss.web.async.Channel
Channel
(open? [ch] (.isOpen ch))
(close [ch] (.close ch))
(send!
([ch message]
;; TODO: support codecs? support the same functionality as ring bodies?
(.send ch message))
([ch message close?]
(.send ch message close?))))

(defn as-channel
"Converts the current ring `request` in to an asynchronous channel.
The type of channel created depends on the request - if the request
is a Websocket upgrade request, a Websocket channel will be created.
Otherwise, an HTTP channel is created. You interact with both
channel types through the [[Channel]] protocol, and through the
given `callbacks`.
The callbacks common to both channel types are:
* `:on-open` - `(fn [ch ctx] ...)`
* `:on-close` - `(fn [ch reason] ...)` - invoked after close. TODO: make reason consistent
If the channel is a Websocket, the following callbacks are also used:
* `:on-message` - `(fn [ch message] ...)` - String or byte[]
* `:on-error` - `(fn [ch throwable] ...)`
The channel won't be available for writing until the `:on-open`
callback is invoked.
discuss: sessions, headers
provide usage example
Returns a ring response map, at least the :body of which *must* be
returned in the response map from the calling ring handler."
[request {:keys [on-open on-close on-message on-error] :as callbacks}]
(let [ch (if (:websocket? request)
(initialize-websocket request callbacks)
(initialize-stream request callbacks))]
{:status 200
:body ch}))
55 changes: 55 additions & 0 deletions web/src/immutant/web/internal/headers.clj
@@ -0,0 +1,55 @@
;; Copyright 2014 Red Hat, Inc, and individual contributors.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns ^{:no-doc true}
immutant.web.internal.headers
(:require [clojure.string :as str]
ring.util.request))

(def charset-pattern (deref #'ring.util.request/charset-pattern))

(def default-encoding "ISO-8859-1")

(defprotocol Headers
(get-names [x])
(get-values [x key])
(get-value [x key])
(set-header [x key value])
(add-header [x key value]))

(defn ^String get-character-encoding [headers]
(or
(when-let [type (get-value headers "content-type")]
(second (re-find charset-pattern type)))
default-encoding))

(defn headers->map [headers]
(persistent!
(reduce
(fn [accum ^String name]
(assoc! accum
(-> name .toLowerCase)
(->> name
(get-values headers)
(str/join ","))))
(transient {})
(get-names headers))))

(defn write-headers
[output, headers]
(doseq [[^String k, v] headers]
(if (coll? v)
(doseq [value v]
(add-header output k (str value)))
(set-header output k (str v)))))
50 changes: 9 additions & 41 deletions web/src/immutant/web/internal/ring.clj
Expand Up @@ -13,15 +13,13 @@
;; limitations under the License.

(ns ^{:no-doc true} immutant.web.internal.ring
(:require [potemkin :refer [def-map-type]]
[clojure.string :as str]
[clojure.java.io :as io]
ring.util.request)
(:require [potemkin :refer [def-map-type]]
[clojure.java.io :as io]
[immutant.web.async :as async]
[immutant.web.internal.headers :as hdr])
(:import [java.io File InputStream OutputStream]
[clojure.lang ISeq PersistentHashMap]))

(def charset-pattern (deref #'ring.util.request/charset-pattern))

(defprotocol Session
(attribute [session key])
(set-attribute! [session key value])
Expand Down Expand Up @@ -105,37 +103,6 @@
(.put m k v))
m))))

(defprotocol Headers
(get-names [x])
(get-values [x key])
(get-value [x key])
(set-header [x key value])
(add-header [x key value]))

(defn headers->map [headers]
(persistent!
(reduce
(fn [accum ^String name]
(assoc! accum
(-> name .toLowerCase)
(->> name
(get-values headers)
(str/join ","))))
(transient {})
(get-names headers))))

(defn write-headers
[output, headers]
(doseq [[^String k, v] headers]
(if (coll? v)
(doseq [value v]
(add-header output k (str value)))
(set-header output k (str v)))))

(defn ^String get-character-encoding [headers]
(when-let [type (get-value headers "content-type")]
(second (re-find charset-pattern type))))

(defprotocol BodyWriter
"Writing different body types to output streams"
(write-body [body stream headers]))
Expand All @@ -150,7 +117,7 @@

String
(write-body [body ^OutputStream os headers]
(.write os (.getBytes body (or (get-character-encoding headers) "ISO-8859-1"))))
(.write os (.getBytes body (hdr/get-character-encoding headers))))

ISeq
(write-body [body ^OutputStream os headers]
Expand All @@ -177,6 +144,7 @@
(when status
(set-status response status))
(let [hmap (header-map response)]
(write-headers hmap headers)
(write-body body (output-stream response) hmap)))

(hdr/write-headers hmap headers)
(if (async/streaming-body? body)
(async/open-stream body hmap)
(write-body body (output-stream response) hmap))))

0 comments on commit e828d84

Please sign in to comment.