Skip to content

Commit

Permalink
prefer receive-all to receive-in order, and make sure we wait all wri…
Browse files Browse the repository at this point in the history
…tes on a streaming response to complete
  • Loading branch information
ztellman committed Jan 25, 2012
1 parent ec66b05 commit 64dfb16
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 26 deletions.
5 changes: 3 additions & 2 deletions src/aleph/http/server/responses.clj
Expand Up @@ -131,8 +131,9 @@
(enqueue returned-result (write-to-channel netty-channel msg false)) (enqueue returned-result (write-to-channel netty-channel msg false))
nil))) nil)))
(fn [_] (fn [_]
(enqueue-and-close returned-result (let [final-write (write-to-channel netty-channel HttpChunk/LAST_CHUNK false)]
(write-to-channel netty-channel HttpChunk/LAST_CHUNK false)))))) (enqueue-and-close returned-result final-write)
final-write)))))


;;; ;;;


Expand Down
20 changes: 11 additions & 9 deletions src/aleph/netty.clj
Expand Up @@ -489,6 +489,7 @@
(let [write-queue (create-write-queue (let [write-queue (create-write-queue
netty-channel netty-channel
#(write-to-channel netty-channel nil true))] #(write-to-channel netty-channel nil true))]

(run-pipeline (.getCloseFuture netty-channel) (run-pipeline (.getCloseFuture netty-channel)
wrap-netty-channel-future wrap-netty-channel-future
(fn [_] (fn [_]
Expand All @@ -498,15 +499,16 @@
(.close channel-group) (.close channel-group)
wrap-netty-channel-group-future))) wrap-netty-channel-group-future)))
(.add channel-group netty-channel) (.add channel-group netty-channel)
(run-pipeline
(receive-in-order outer (receive-all outer
(fn [[returned-result msg]] (fn [[returned-result msg]]
(enqueue write-queue (when returned-result
(let [result (write-to-channel netty-channel (send-encoder msg) false)] (enqueue write-queue
(siphon-result result returned-result) (let [result (write-to-channel netty-channel (send-encoder msg) false)]
result)))) (siphon-result result returned-result)
(fn [_] result)))))
(close write-queue))) (on-drained outer #(close write-queue))

inner))))) inner)))))


;;; ;;;
Expand Down
24 changes: 9 additions & 15 deletions src/aleph/tcp.clj
Expand Up @@ -65,21 +65,15 @@
netty-channel netty-channel
#(write-to-channel netty-channel nil true))] #(write-to-channel netty-channel nil true))]
(handler inner {:remote-addr (.getRemoteAddress netty-channel)}) (handler inner {:remote-addr (.getRemoteAddress netty-channel)})
(run-pipeline nil (receive-all outer
:error-handler (fn [ex] (fn [[returned-result msg]]
(when-not (trace [(:name options) :errors] (when returned-result
{:exception ex, :channel inner}) (enqueue write-queue
(log/error ex "Unhandled exception in TCP connection handler."))) (let [result (write-to-channel netty-channel (send-encoder msg) false)]
(fn [_] (siphon-result result returned-result)
(receive-in-order outer result)))
(fn [[returned-result msg]] nil))
(enqueue write-queue (on-drained outer #(close write-queue))))))
(let [result (write-to-channel netty-channel (send-encoder msg) false)]
(siphon-result result returned-result)
result))
nil)))
(fn [_]
(close write-queue)))))))
:channel-close (upstream-stage :channel-close (upstream-stage
(channel-close-stage (channel-close-stage
(fn [_] (fn [_]
Expand Down

0 comments on commit 64dfb16

Please sign in to comment.