Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Revert "fix channels closing issue"
This reverts commit 2eb5ddf.
  • Loading branch information
dajac committed Jan 25, 2012
1 parent 2eb5ddf commit 0684a82
Showing 1 changed file with 11 additions and 22 deletions.
33 changes: 11 additions & 22 deletions src/aleph/netty.clj
Expand Up @@ -24,8 +24,6 @@
ChannelHandlerContext ChannelHandlerContext
MessageEvent MessageEvent
ChannelEvent ChannelEvent
ChannelState
ChannelStateEvent
ExceptionEvent ExceptionEvent
ChannelPipelineFactory ChannelPipelineFactory
Channels Channels
Expand Down Expand Up @@ -295,21 +293,10 @@
[^ChannelGroup channel-group options [^ChannelGroup channel-group options
connections-probe refuse-connections? connections-probe refuse-connections?
pipeline-fn & args] pipeline-fn & args]
(let [connection-count (atom 0) (let [connection-count (atom 0)]
close-callback (or (options :close-callback) (fn []))]
(reify ChannelPipelineFactory (reify ChannelPipelineFactory
(getPipeline [_] (getPipeline [_]
(let [pipeline ^ChannelPipeline (apply pipeline-fn args)] (let [pipeline ^ChannelPipeline (apply pipeline-fn args)]
(.addLast pipeline
"close-handler"
(upstream-stage
(fn [evt]
(when
(and (instance? ChannelStateEvent evt)
(= (.getState evt) ChannelState/OPEN)
(= (.getValue evt) false))
(close-callback))
evt)))
(.addFirst pipeline (.addFirst pipeline
"channel-listener" "channel-listener"
(if (and refuse-connections? @refuse-connections?) (if (and refuse-connections? @refuse-connections?)
Expand Down Expand Up @@ -464,14 +451,7 @@
(.setPipelineFactory client (.setPipelineFactory client
(create-pipeline-factory (create-pipeline-factory
channel-group channel-group
(merge options options
{:close-callback
(fn []
(close inner)
(close outer)
(run-pipeline (.close channel-group)
wrap-netty-channel-group-future
(fn [_] (future (.releaseExternalResources client)))))})
(canonical-probe [(:name options) :connections]) (canonical-probe [(:name options) :connections])
nil nil
pipeline-fn pipeline-fn
Expand All @@ -487,6 +467,15 @@
(let [write-queue (create-write-queue (let [write-queue (create-write-queue
netty-channel netty-channel
#(write-to-channel netty-channel nil true))] #(write-to-channel netty-channel nil true))]
(run-pipeline (.getCloseFuture netty-channel)
wrap-netty-channel-future
(fn [_]
(close inner)
(close outer)
(run-pipeline
(.close channel-group)
wrap-netty-channel-group-future
(fn [_] (future (.releaseExternalResources client))))))
(.add channel-group netty-channel) (.add channel-group netty-channel)
(run-pipeline (run-pipeline
(receive-in-order outer (receive-in-order outer
Expand Down

0 comments on commit 0684a82

Please sign in to comment.