Skip to content

Commit

Permalink
More ws work, plus chunked streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias committed Jan 8, 2015
1 parent d1621d9 commit f7c27b3
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 192 deletions.
15 changes: 8 additions & 7 deletions web/dev-resources/testing/app.clj
Expand Up @@ -15,6 +15,7 @@
(ns testing.app
(:require [immutant.web :as web]
[immutant.web.websocket :as ws]
[immutant.web.async :as async]
[immutant.web.middleware :refer (wrap-session)]
[immutant.codecs :refer (encode)]
[compojure.core :refer (GET defroutes)]
Expand All @@ -23,16 +24,16 @@
(def handshakes (atom {}))

(defn on-open-set-handshake [channel handshake]
(let [data {:headers (ws/headers handshake)
:parameters (ws/parameters handshake)
:uri (ws/uri handshake)
:query (ws/query-string handshake)
:session (ws/session handshake)
:user-principal (ws/user-principal handshake)}]
(let [data {:headers (async/headers handshake)
:parameters (async/parameters handshake)
:uri (async/uri handshake)
:query (async/query-string handshake)
:session (async/session handshake)
:user-principal (async/user-principal handshake)}]
(swap! handshakes assoc channel data)))

(defn on-message-send-handshake [channel message]
(ws/send! channel (encode (get @handshakes channel))))
(async/send! channel (encode (get @handshakes channel))))

(defn counter [{session :session}]
(let [count (:count session 0)
Expand Down
187 changes: 187 additions & 0 deletions web/src/immutant/web/async.clj
@@ -0,0 +1,187 @@
;; Copyright 2014 Red Hat, Inc, and individual contributors.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns immutant.web.async
(:require [immutant.web.internal.headers :as hdr])
(:import [java.io OutputStream IOException]
java.net.URI
java.util.concurrent.atomic.AtomicBoolean
[org.projectodd.wunderboss.websocket UndertowWebsocket Endpoint WebsocketInitHandler]))

(defprotocol Channel
"Streaming channel interface"
(open? [ch] "Is the channel open?")
(close [ch]
"Gracefully close the channel.
This will trigger the on-close callback for the channel if one is
registered.")
(send! [ch message] [ch message close?]
"Send a message to the channel.
If close? is truthy, close the channel after writing. close?
defaults to false for WebSockets, true otherwise.
The data is sent asynchronously for WebSockets, but blocks for
HTTP channels.
Returns nil if the channel is closed, true otherwise."))

(defrecord StreamChannel [^OutputStream os on-close ^AtomicBoolean open-state encoding]
Channel
(close [this]
(when (open? this)
(.close os)
(.set open-state false)
(when on-close
(on-close this nil))))
(send! [this message]
(send! this message true))
(send! [this message close?]
(when (open? this)
(let [bytes (if (instance? String message)
(.getBytes message encoding)
message)]
;; TODO: throw if message isn't String or bytes[]? support codecs?
(try
(.write os bytes)
(if close?
(close this)
(.flush os))
true
;; TODO: should we only deal with "Broken pipe" IOE's here?
;; rethrow others?
(catch IOException e
(try
(close this)
;; undertow throws when you close with unwritten data,
;; but the data can never be written - see UNDERTOW-368
(catch IOException ignored))
nil)))))
(open? [_]
(.get open-state)))

(defrecord StreamMarker [os callbacks])

(defprotocol Handshake
"Reflects the state of the initial websocket upgrade request"
(headers [hs] "Return request headers")
(parameters [hs] "Return map of params from request")
(uri [hs] "Return full request URI")
(query-string [hs] "Return query portion of URI")
(session [hs] "Return the user's session data, if any")
(user-principal [hs] "Return authorized `java.security.Principal`")
(user-in-role? [hs role] "Is user in role identified by String?"))

(defrecord WebsocketMarker
[channel-promise endpoint])

(defn create-websocket-init-handler [handler-fn downstream-handler request-map-fn]
(UndertowWebsocket/createConditionalUpgradeHandler
(reify WebsocketInitHandler
(shouldConnect [_ exchange endpoint-wrapper]
(boolean
(let [body (:body (handler-fn (request-map-fn exchange
[:websocket? true])))]
(when (instance? WebsocketMarker body)
(.setEndpoint endpoint-wrapper (:endpoint body))
true)))))
downstream-handler))

(defn create-wboss-endpoint [chan-promise {:keys [on-message on-open on-close on-error]}]
(reify Endpoint
(onMessage [_ channel message]
(when on-message
(on-message channel message)))
(onOpen [_ channel exchange]
(when chan-promise
(deliver chan-promise channel))
(when on-open
(on-open channel exchange)))
(onClose [_ channel cm]
(when on-close
(on-close channel {:code (.getCode cm) :reason (.getReason cm)})))
(onError [_ channel error]
(when on-error
(on-error channel error)))))

(defn initialize-websocket
[request callbacks]
(when-not (:websocket? request)
(throw (IllegalStateException. "The request isn't a websocket upgrade.")))
(let [chan-promise (promise)]
(->WebsocketMarker chan-promise (create-wboss-endpoint chan-promise callbacks))))

(defn streaming-body? [body]
(instance? StreamMarker body))

(defn add-streaming-headers [headers]
headers
;; (assoc headers "Transfer-Encoding" "chunked" "baz" "ffs")
)

(defn open-stream [{:keys [os callbacks]} headers]
(when-let [on-open (:on-open callbacks)]
(on-open (->StreamChannel os
(:on-close callbacks)
(AtomicBoolean. true)
(or (hdr/get-character-encoding headers)
hdr/default-encoding)))))

;; on-open on-close
(defn initialize-stream [request callbacks]
(->StreamMarker
(-> request
:server-exchange
(.setPersistent true)
.dispatch
.getOutputStream)
callbacks))

(defn as-channel [request callbacks]
(let [ch (if (:websocket? request)
(initialize-websocket request callbacks)
(initialize-stream request callbacks))]
{:status 200
:body ch}))

(comment
(require '[immutant.web.async :as s]
'[immutant.web :as web])

(def a-stream (atom nil))

(defn handler [req]
(println "called")
(let [data "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec a diam lectus. Sed sit amet ipsum mauris. Maecenas congue ligula ac quam viverra nec consectetur ante hendrerit. Donec et mollis dolor. Praesent et diam eget libero egestas mattis sit amet vitae augue. Nam tincidunt congue enim, ut porta lorem lacinia consectetur. Donec ut libero sed arcu vehicula ultricies a non tortor. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean ut gravida lorem. Ut turpis felis, pulvinar a semper sed, adipiscing id dolor. Pellentesque auctor nisi id magna consequat sagittis. Curabitur dapibus enim sit amet elit pharetra tincidunt feugiat nisl imperdiet. Ut convallis libero in urna ultrices accumsan. Donec sed odio eros. Donec viverra mi quis quam pulvinar at malesuada arcu rhoncus. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In rutrum accumsan ultricies. Mauris vitae nisi at sem facilisis semper ac in est."]
(assoc
(s/as-channel req
{:on-open
(fn [stream]
(println "OPEN" stream)
(reset! a-stream stream)
(.start (Thread.
(fn []
(dotimes [n 10]
(Thread/sleep 1000)
(when (s/open? stream)
(s/send! stream (format "%s\n%s\n" n data))))
(s/close stream)))))
:on-close
(fn [stream]
(println "CLOSED" stream))})
:headers {"foo" "bar"})))

(web/run handler)
)
53 changes: 53 additions & 0 deletions web/src/immutant/web/internal/headers.clj
@@ -0,0 +1,53 @@
;; Copyright 2014 Red Hat, Inc, and individual contributors.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns ^{:no-doc true}
immutant.web.internal.headers
(:require [clojure.string :as str]
ring.util.request))

(def charset-pattern (deref #'ring.util.request/charset-pattern))

(def default-encoding "ISO-8859-1")

(defprotocol Headers
(get-names [x])
(get-values [x key])
(get-value [x key])
(set-header [x key value])
(add-header [x key value]))

(defn ^String get-character-encoding [headers]
(when-let [type (get-value headers "content-type")]
(second (re-find charset-pattern type))))

(defn headers->map [headers]
(persistent!
(reduce
(fn [accum ^String name]
(assoc! accum
(-> name .toLowerCase)
(->> name
(get-values headers)
(str/join ","))))
(transient {})
(get-names headers))))

(defn write-headers
[output, headers]
(doseq [[^String k, v] headers]
(if (coll? v)
(doseq [value v]
(add-header output k (str value)))
(set-header output k (str v)))))
57 changes: 15 additions & 42 deletions web/src/immutant/web/internal/ring.clj
Expand Up @@ -13,15 +13,13 @@
;; limitations under the License.

(ns ^{:no-doc true} immutant.web.internal.ring
(:require [potemkin :refer [def-map-type]]
[clojure.string :as str]
[clojure.java.io :as io]
ring.util.request)
(:require [potemkin :refer [def-map-type]]
[clojure.java.io :as io]
[immutant.web.async :as async]
[immutant.web.internal.headers :as hdr])
(:import [java.io File InputStream OutputStream]
[clojure.lang ISeq PersistentHashMap]))

(def charset-pattern (deref #'ring.util.request/charset-pattern))

(defprotocol Session
(attribute [session key])
(set-attribute! [session key value])
Expand Down Expand Up @@ -105,37 +103,6 @@
(.put m k v))
m))))

(defprotocol Headers
(get-names [x])
(get-values [x key])
(get-value [x key])
(set-header [x key value])
(add-header [x key value]))

(defn headers->map [headers]
(persistent!
(reduce
(fn [accum ^String name]
(assoc! accum
(-> name .toLowerCase)
(->> name
(get-values headers)
(str/join ","))))
(transient {})
(get-names headers))))

(defn write-headers
[output, headers]
(doseq [[^String k, v] headers]
(if (coll? v)
(doseq [value v]
(add-header output k (str value)))
(set-header output k (str v)))))

(defn ^String get-character-encoding [headers]
(when-let [type (get-value headers "content-type")]
(second (re-find charset-pattern type))))

(defprotocol BodyWriter
"Writing different body types to output streams"
(write-body [body stream headers]))
Expand All @@ -150,7 +117,9 @@

String
(write-body [body ^OutputStream os headers]
(.write os (.getBytes body (or (get-character-encoding headers) "ISO-8859-1"))))
(.write os (.getBytes body
(or (hdr/get-character-encoding headers)
hdr/default-encoding))))

ISeq
(write-body [body ^OutputStream os headers]
Expand All @@ -176,7 +145,11 @@
[response {:keys [status headers body]}]
(when status
(set-status response status))
(let [hmap (header-map response)]
(write-headers hmap headers)
(write-body body (output-stream response) hmap)))

(let [hmap (header-map response)
streaming? (async/streaming-body? body)]
(hdr/write-headers hmap (if streaming?
(async/add-streaming-headers headers)
headers))
(if streaming?
(async/open-stream body hmap)
(write-body body (output-stream response) hmap))))

0 comments on commit f7c27b3

Please sign in to comment.