Permalink
Browse files

out with the old websocket implementation, and in with the new

  • Loading branch information...
ztellman committed Jan 25, 2012
1 parent 43e6f8d commit ec66b05cca06bdd3503ad038c244b6a2f6d7876c
Showing with 85 additions and 118 deletions.
  1. +12 −2 project.clj
  2. +1 −5 src/aleph/http.clj
  3. +1 −60 src/aleph/http/client.clj
  4. +69 −42 src/aleph/http/websocket.clj
  5. +1 −1 src/aleph/netty.clj
  6. +1 −1 src/aleph/tcp.clj
  7. +0 −7 test/aleph/test/http.clj
View
@@ -8,10 +8,20 @@
[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.1"]
[org.clojure.contrib/prxml "1.3.0-alpha4"]
[org.jboss.netty/netty "3.2.5.Final"]
[io.netty/netty "3.3.0.Final"]
[clj-http "0.1.3"]
[lamina "0.4.1-SNAPSHOT"]
[gloss "0.2.1-SNAPSHOT"]
[user-agent-utils "1.2.3"]
[potemkin "0.1.1-SNAPSHOT"]]
[potemkin "0.1.1"]]
:multi-deps {:all [[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.1"]
[org.clojure.contrib/prxml "1.3.0-alpha4"]
[io.netty/netty "3.3.0.Final"]
[clj-http "0.1.3"]
[lamina "0.4.1-SNAPSHOT"]
[gloss "0.2.1-SNAPSHOT"]
[user-agent-utils "1.2.3"]
[potemkin "0.1.1"]]
"1.2" [[org.clojure/clojure "1.2.1"]]}
:jvm-opts ["-server" "-XX:+UseConcMarkSweepGC"])
View
@@ -15,16 +15,14 @@
(:require
[aleph.http.server :as server]
[aleph.http.client :as client]
[aleph.http.utils :as utils]
[aleph.http.policy-file :as policy])
[aleph.http.utils :as utils])
(:import
[java.io InputStream]))
(import-fn server/start-http-server)
(import-fn client/http-client)
(import-fn client/pipelined-http-client)
(import-fn client/http-request)
(import-fn client/websocket-client)
(defn sync-http-request
"A synchronous version of http-request. Halts the thread until the response has returned,
@@ -34,8 +32,6 @@
([request timeout]
@(http-request request timeout)))
(import-fn policy/start-policy-file-server)
(defn wrap-aleph-handler
"Allows for an asynchronous handler to be used within a largely synchronous application.
Assuming the top-level handler has been wrapped in wrap-ring-handler, this function can be
View
@@ -14,8 +14,7 @@
[aleph.http.client requests responses]
[lamina core connections api])
(:require
[clj-http.client :as client]
[aleph.http.websocket.hixie :as hixie])
[clj-http.client :as client])
(:import
[java.util.concurrent
TimeoutException]
@@ -199,61 +198,3 @@
response)))
;;;
(defn websocket-handshake [protocol]
(wrap-client-request
{:method :get
:headers {"Sec-WebSocket-Key1" "18x 6]8vM;54 *(5: { U1]8 z [ 8" ;;TODO: actually randomly generate these
"Sec-WebSocket-Key2" "1_ tx7X d < nw 334J702) 7]o}` 0"
"Sec-WebSocket-Protocol" protocol
"Upgrade" "WebSocket"
"Connection" "Upgrade"}
:body (ByteBuffer/wrap (.getBytes "Tm[K T2u" "utf-8"))}))
(def expected-response (ByteBuffer/wrap (.getBytes "fQJ,fN/4F4!~K~MH" "utf-8")))
(defn websocket-pipeline [ch result options]
(create-netty-pipeline (:name options)
:decoder (HttpResponseDecoder.)
:encoder (HttpRequestEncoder.)
:handshake (message-stage
(fn [^Channel netty-channel ^HttpResponse rsp]
(if (not= 101 (-> rsp .getStatus .getCode))
(error! result (Exception. "Proper handshake not received."))
(let [pipeline (.getPipeline netty-channel)]
(.replace pipeline "decoder" "websocket-decoder" (WebSocketFrameDecoder.))
(.replace pipeline "encoder" "websocket-encoder" (WebSocketFrameEncoder.))
(.replace pipeline "handshake" "response"
(message-stage
(fn [netty-channel rsp]
(enqueue ch (hixie/from-websocket-frame rsp))
nil)))
(success! result ch)))
nil))))
(defn websocket-client [options]
(let [options (split-url options)
options (merge
{:name (str "websocket-client:" (:host options) ":" (gensym ""))}
options)
result (result-channel)
client (create-client
#(websocket-pipeline % result options)
identity
options)]
(run-pipeline client
(fn [ch]
(enqueue ch
(transform-aleph-request
(websocket-handshake (or (:protocol options) "aleph"))
(:scheme options)
(:server-name options)
(:server-port options)
options))
(run-pipeline result
(fn [_]
(let [in (channel)]
(siphon (map* hixie/to-websocket-frame in) ch)
(on-closed in #(close ch))
(splice ch in))))))))
@@ -12,70 +12,97 @@
[lamina.core]
[aleph.http core]
[aleph.http.server requests responses]
[aleph formats netty]
[gloss core io])
(:require
[aleph.http.websocket.hybi :as hybi]
[aleph.http.websocket.hixie :as hixie])
[aleph formats netty])
(:import
[org.jboss.netty.handler.codec.http
HttpRequest
HttpResponse
DefaultHttpResponse
DefaultHttpRequest
HttpVersion
HttpResponseStatus]
HttpResponseStatus
HttpChunkAggregator]
[org.jboss.netty.handler.codec.http.websocketx
WebSocketFrame
CloseWebSocketFrame
PingWebSocketFrame
PongWebSocketFrame
TextWebSocketFrame
BinaryWebSocketFrame
WebSocketServerHandshaker
WebSocketServerHandshakerFactory]
[org.jboss.netty.buffer
ChannelBuffers]
[org.jboss.netty.channel
Channel
ChannelHandlerContext
ChannelUpstreamHandler]))
(set! *warn-on-reflection* true)
(defn websocket-handshake? [^HttpRequest request]
(let [connection-header (.getHeader request "connection") ]
(and connection-header
(= "upgrade" (.toLowerCase connection-header))
(= "websocket" (.toLowerCase (.getHeader request "upgrade"))))))
(defn create-handshaker [^HttpRequest req]
(->
(WebSocketServerHandshakerFactory.
(str "ws://" (.getHeaders req "Host") (.getUri req))
nil
false)
(.newHandshaker req)))
(defn hybi? [^HttpRequest request]
(.containsHeader request "Sec-WebSocket-Version"))
(defn automatic-reply [^Channel ch ^WebSocketServerHandshaker handshaker ^WebSocketFrame frame]
(cond
(instance? CloseWebSocketFrame frame)
(do
(.close handshaker ch frame)
true)
(defn transform-handshake [^HttpRequest request netty-channel options]
(.setHeader request "content-type" "application/octet-stream")
(assoc (transform-netty-request request netty-channel options)
:websocket true))
(instance? PingWebSocketFrame frame)
(do
(.write ch (PongWebSocketFrame. (.getBinaryData frame)))
true)
(defn websocket-response [^HttpRequest netty-request netty-channel options]
(let [request (transform-handshake netty-request netty-channel options)
response (if (hybi? netty-request)
(hybi/websocket-response request)
(hixie/websocket-response request))]
(transform-aleph-response
(update-in response [:headers]
#(assoc %
"Upgrade" "WebSocket"
"Connection" "Upgrade"))
options)))
:else
false))
(defn unwrap-frame [^WebSocketFrame frame]
(if (instance? TextWebSocketFrame frame)
(.getText ^TextWebSocketFrame frame)
(.getBinaryData frame)))
(defn wrap-frame [x]
(if (string? x)
(TextWebSocketFrame. ^String x)
(BinaryWebSocketFrame. (bytes->channel-buffer x))))
(defn websocket-handshake-handler [handler options]
(let [[inner outer] (channel-pair)
inner (wrap-write-channel inner)]
inner (wrap-write-channel inner)
latch (atom false)
handshaker (atom nil)]
(reify ChannelUpstreamHandler
(handleUpstream [_ ctx evt]
(if-let [msg (message-event evt)]
(let [ch ^Channel (.getChannel ctx)]
(if (websocket-handshake? msg)
(let [handshake (transform-handshake msg ch options)
response (websocket-response msg ch options)]
(if (hybi? msg)
(hybi/update-pipeline handler ctx ch handshake response options)
(hixie/update-pipeline handler ctx ch handshake response options)))
(.sendUpstream ctx evt)))
(let [netty-channel (.getChannel ctx)]
(if-let [msg (message-event evt)]
(if (compare-and-set! latch false true)
(if-let [h (create-handshaker msg)]
(do
(reset! handshaker h)
(-> ctx .getPipeline (.addFirst "workaround" (HttpChunkAggregator. 1)))
(.handshake ^WebSocketServerHandshaker @handshaker netty-channel msg)
(receive-all outer
(fn [[returned-result msg]]
(when msg
(siphon-result
(write-to-channel netty-channel (wrap-frame msg) false)
returned-result))))
(handler
inner
(assoc (transform-netty-request msg netty-channel options)
:websocket true)))
(.sendUpstream ctx evt))
(if @handshaker
(when-not (automatic-reply netty-channel @handshaker msg)
(enqueue outer (unwrap-frame msg)))
(.sendUpstream ctx evt)))
(.sendUpstream ctx evt))))))
(.sendUpstream ctx evt)))))))
View
@@ -171,7 +171,7 @@
(when-not (trace error-probe
{:exception ex
:address (-> evt .getChannel channel-origin)})
(log/error ex)))
(log/error ex "Unhandled error in Netty pipeline.")))
nil))
traffic-handler (fn [probe-suffix]
(let [traffic-probe (canonical-probe [pipeline-name :traffic probe-suffix])]
View
@@ -69,7 +69,7 @@
:error-handler (fn [ex]
(when-not (trace [(:name options) :errors]
{:exception ex, :channel inner})
(log/error ex)))
(log/error ex "Unhandled exception in TCP connection handler.")))
(fn [_]
(receive-in-order outer
(fn [[returned-result msg]]
View
@@ -226,13 +226,6 @@
(let [result (sync-http-request {:url "http://localhost:8080", :method :get, :auto-transform true} 1000)]
(is (= {:foo 1, :bar 2} (:body result))))))
(deftest test-websockets
(with-server (start-http-server (fn [ch _] (siphon ch ch)) {:port 8080, :websocket true})
(let [ch @(websocket-client {:url "http://localhost:8080"})]
(enqueue ch "a" "b" "c")
(is (= ["a" "b" "c"] (channel-seq ch 1000)))
(close ch))))
(deftest test-single-response-close
(is-closed? basic-handler
{:method :get, :url "http://localhost:8080/string", :keep-alive? false}))

0 comments on commit ec66b05

Please sign in to comment.