Skip to content

Commit

Permalink
add timeouts to HTTP calls, and final work on hybi websocket protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Sep 28, 2011
1 parent 143f43b commit 859ea31
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/aleph/http/server/requests.clj
Expand Up @@ -73,7 +73,8 @@
#(decode-aleph-message % options)))

(defn request-handler [handler options]
(let [f (executor
(let [timeout (:timeout options)
f (executor
(:thread-pool options)
(fn [req]
(let [ch (wrap-response-channel (constant-channel))]
Expand All @@ -90,7 +91,7 @@
options)]
(fn [netty-channel req]
(let [req (transform-netty-request req netty-channel options)]
(f [req])))))
(f [req] (when timeout {:timeout (timeout req)}))))))

(defn consume-request-stream [netty-channel in handler options]
(let [[a b] (channel-pair)
Expand All @@ -99,6 +100,7 @@
(receive c #(enqueue ch (assoc % :keep-alive? (:keep-alive? req))))
(run-pipeline (dissoc req :keep-alive?)
:error-handler (fn [_])
:timeout (when-let [timeout (:timeout options)] (timeout req))
#(pre-process-request % options)
#(handler c %))))]
(run-pipeline in
Expand Down
17 changes: 13 additions & 4 deletions src/aleph/http/websocket/protocol.clj
Expand Up @@ -53,7 +53,7 @@
(compile-frame
(header header-codec

(fn [{:keys [fin opcode mask length] :as m}]
(fn [{:keys [fin opcode mask length]}]
(case length
126 (body-codec mask :short)
127 (body-codec mask :long)
Expand All @@ -64,14 +64,14 @@
:mask :int32
:data (finite-block length))))))

(fn [{:keys [data type final?]}]
(fn [{:keys [data mask type final?]}]
(let [byte-length (byte-count data)]
{:fin (or final? true)
:rsv1 false
:rsv2 false
:rsv3 false
:opcode (msg-type->opcode (or type :binary))
:mask false
:mask (boolean mask)
:length (if (< byte-length 126)
byte-length
127)})))))
Expand Down Expand Up @@ -115,15 +115,24 @@

(defn encode-frame [data]
(encode websocket-frame
(if (map? data)
(cond

(map? data)
data

(string? data)
{:type :text
:data (bytes->byte-buffers data)}

:else
{:type :binary
:data (bytes->byte-buffers data)})))

(defn wrap-websocket-channel [src]
(let [dst (channel)
frames (decode-channel src websocket-frame)]
(run-pipeline nil
:error-handler (fn [_] (close dst))
(read-merge #(read-channel frames)
(fn [old-frame new-frame]
(let [new-frame (pre-process-frame new-frame)]
Expand Down

0 comments on commit 859ea31

Please sign in to comment.