From 859ea319a4daf7dc83c724d448f038724cf978f7 Mon Sep 17 00:00:00 2001 From: ztellman Date: Wed, 28 Sep 2011 13:45:12 -0400 Subject: [PATCH] add timeouts to HTTP calls, and final work on hybi websocket protocol --- src/aleph/http/server/requests.clj | 6 ++++-- src/aleph/http/websocket/protocol.clj | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/aleph/http/server/requests.clj b/src/aleph/http/server/requests.clj index 2e76e227..1e0b9f94 100644 --- a/src/aleph/http/server/requests.clj +++ b/src/aleph/http/server/requests.clj @@ -73,7 +73,8 @@ #(decode-aleph-message % options))) (defn request-handler [handler options] - (let [f (executor + (let [timeout (:timeout options) + f (executor (:thread-pool options) (fn [req] (let [ch (wrap-response-channel (constant-channel))] @@ -90,7 +91,7 @@ options)] (fn [netty-channel req] (let [req (transform-netty-request req netty-channel options)] - (f [req]))))) + (f [req] (when timeout {:timeout (timeout req)})))))) (defn consume-request-stream [netty-channel in handler options] (let [[a b] (channel-pair) @@ -99,6 +100,7 @@ (receive c #(enqueue ch (assoc % :keep-alive? (:keep-alive? req)))) (run-pipeline (dissoc req :keep-alive?) :error-handler (fn [_]) + :timeout (when-let [timeout (:timeout options)] (timeout req)) #(pre-process-request % options) #(handler c %))))] (run-pipeline in diff --git a/src/aleph/http/websocket/protocol.clj b/src/aleph/http/websocket/protocol.clj index fd2e7ad3..c963a38d 100644 --- a/src/aleph/http/websocket/protocol.clj +++ b/src/aleph/http/websocket/protocol.clj @@ -53,7 +53,7 @@ (compile-frame (header header-codec - (fn [{:keys [fin opcode mask length] :as m}] + (fn [{:keys [fin opcode mask length]}] (case length 126 (body-codec mask :short) 127 (body-codec mask :long) @@ -64,14 +64,14 @@ :mask :int32 :data (finite-block length)))))) - (fn [{:keys [data type final?]}] + (fn [{:keys [data mask type final?]}] (let [byte-length (byte-count data)] {:fin (or final? true) :rsv1 false :rsv2 false :rsv3 false :opcode (msg-type->opcode (or type :binary)) - :mask false + :mask (boolean mask) :length (if (< byte-length 126) byte-length 127)}))))) @@ -115,15 +115,24 @@ (defn encode-frame [data] (encode websocket-frame - (if (map? data) + (cond + + (map? data) data + + (string? data) {:type :text + :data (bytes->byte-buffers data)} + + :else + {:type :binary :data (bytes->byte-buffers data)}))) (defn wrap-websocket-channel [src] (let [dst (channel) frames (decode-channel src websocket-frame)] (run-pipeline nil + :error-handler (fn [_] (close dst)) (read-merge #(read-channel frames) (fn [old-frame new-frame] (let [new-frame (pre-process-frame new-frame)]