diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index de8d9d35..d584cba2 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -96,7 +96,7 @@ complete body)))] - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] @@ -148,7 +148,7 @@ complete body)))] - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] @@ -288,7 +288,7 @@ "Consider setting 'tunnel?' to 'true' or omit it at all")))) (if (non-tunnel-proxy? options') - (netty/channel-handler + (netty/channel-outbound-handler :connect ([_ ctx remote-address local-address promise] (.connect ^ChannelHandlerContext ctx address local-address promise))) @@ -536,7 +536,7 @@ [d - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] diff --git a/src/aleph/http/server.clj b/src/aleph/http/server.clj index 03f770e1..6eae6e17 100644 --- a/src/aleph/http/server.clj +++ b/src/aleph/http/server.clj @@ -305,7 +305,7 @@ (handle-request ctx @request s))))))))] - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] @@ -355,7 +355,7 @@ @previous-response body (HttpHeaders/isKeepAlive req))))] - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] @@ -511,7 +511,7 @@ (s/splice out in) (reset-meta! {:aleph/channel ch})) - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 14dd13cd..4d3d32bf 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -482,6 +482,96 @@ ~@(or (:write handlers) `([_# ctx# msg# promise#] (.write ctx# msg# promise#)))) + (flush + ~@(or (:flush handlers) + `([_# ctx#] + (.flush ctx#)))))) + +(defmacro channel-inbound-handler + [& {:as handlers}] + `(reify + ChannelHandler + ChannelInboundHandler + + (handlerAdded + ~@(or (:handler-added handlers) `([_# _#]))) + (handlerRemoved + ~@(or (:handler-removed handlers) `([_# _#]))) + (exceptionCaught + ~@(or (:exception-caught handlers) + `([_# ctx# cause#] + (.fireExceptionCaught ctx# cause#)))) + (channelRegistered + ~@(or (:channel-registered handlers) + `([_# ctx#] + (.fireChannelRegistered ctx#)))) + (channelUnregistered + ~@(or (:channel-unregistered handlers) + `([_# ctx#] + (.fireChannelUnregistered ctx#)))) + (channelActive + ~@(or (:channel-active handlers) + `([_# ctx#] + (.fireChannelActive ctx#)))) + (channelInactive + ~@(or (:channel-inactive handlers) + `([_# ctx#] + (.fireChannelInactive ctx#)))) + (channelRead + ~@(or (:channel-read handlers) + `([_# ctx# msg#] + (.fireChannelRead ctx# msg#)))) + (channelReadComplete + ~@(or (:channel-read-complete handlers) + `([_# ctx#] + (.fireChannelReadComplete ctx#)))) + (userEventTriggered + ~@(or (:user-event-triggered handlers) + `([_# ctx# evt#] + (.fireUserEventTriggered ctx# evt#)))) + (channelWritabilityChanged + ~@(or (:channel-writability-changed handlers) + `([_# ctx#] + (.fireChannelWritabilityChanged ctx#)))))) + +(defmacro channel-outbound-handler + [& {:as handlers}] + `(reify + ChannelHandler + ChannelOutboundHandler + + (handlerAdded + ~@(or (:handler-added handlers) `([_# _#]))) + (handlerRemoved + ~@(or (:handler-removed handlers) `([_# _#]))) + (exceptionCaught + ~@(or (:exception-caught handlers) + `([_# ctx# cause#] + (.fireExceptionCaught ctx# cause#)))) + (bind + ~@(or (:bind handlers) + `([_# ctx# local-address# promise#] + (.bind ctx# local-address# promise#)))) + (connect + ~@(or (:connect handlers) + `([_# ctx# remote-address# local-address# promise#] + (.connect ctx# remote-address# local-address# promise#)))) + (disconnect + ~@(or (:disconnect handlers) + `([_# ctx# promise#] + (.disconnect ctx# promise#)))) + (close + ~@(or (:close handlers) + `([_# ctx# promise#] + (.close ctx# promise#)))) + (read + ~@(or (:read handlers) + `([_# ctx#] + (.read ctx#)))) + (write + ~@(or (:write handlers) + `([_# ctx# msg# promise#] + (.write ctx# msg# promise#)))) (flush ~@(or (:flush handlers) `([_# ctx#] diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index 70253760..28745ffa 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -28,7 +28,7 @@ (defn- ^ChannelHandler server-channel-handler [handler {:keys [raw-stream?] :as options}] (let [in (atom nil)] - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] @@ -95,7 +95,7 @@ in (atom nil)] [d - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] diff --git a/src/aleph/udp.clj b/src/aleph/udp.clj index 9301b14a..738376ef 100644 --- a/src/aleph/udp.clj +++ b/src/aleph/udp.clj @@ -51,7 +51,7 @@ (.channel (if epoll? EpollDatagramChannel NioDatagramChannel)) (.option ChannelOption/SO_BROADCAST (boolean broadcast?)) (.handler - (netty/channel-handler + (netty/channel-inbound-handler :exception-caught ([_ ctx ex] (when-not (d/error! d ex)