Skip to content

Commit

Permalink
Merge pull request #371 from kachayev/fix-cleanup
Browse files Browse the repository at this point in the history
Cleaning up code
  • Loading branch information
ztellman committed Mar 16, 2018
2 parents 58b8366 + d2e1f84 commit 63ef527
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 94 deletions.
3 changes: 1 addition & 2 deletions src/aleph/http.clj
Expand Up @@ -54,8 +54,7 @@
will be errors, and a new connection must be created."
[^URI uri options middleware on-closed]
(let [scheme (.getScheme uri)
ssl? (= "https" scheme)
response-executor (:response-executor options)]
ssl? (= "https" scheme)]
(-> (client/http-connection
(InetSocketAddress.
(.getHost uri)
Expand Down
53 changes: 26 additions & 27 deletions src/aleph/http/client.clj
Expand Up @@ -6,7 +6,6 @@
[manifold.stream :as s]
[aleph.http.core :as http]
[aleph.http.multipart :as multipart]
[aleph.http.client-middleware :as middleware]
[aleph.netty :as netty])
(:import
[java.io
Expand All @@ -17,10 +16,8 @@
IDN
URL]
[io.netty.buffer
ByteBuf
Unpooled]
ByteBuf]
[io.netty.handler.codec.http
HttpMessage
HttpClientCodec
DefaultHttpHeaders
HttpHeaders
Expand All @@ -29,15 +26,9 @@
HttpContent
LastHttpContent
FullHttpResponse
DefaultLastHttpContent
DefaultHttpContent
DefaultFullHttpResponse
HttpVersion
HttpResponseStatus
HttpObjectAggregator]
[io.netty.channel
Channel ChannelFuture
ChannelFutureListener
Channel
ChannelHandler ChannelHandlerContext
ChannelPipeline
VoidChannelPromise]
Expand Down Expand Up @@ -96,7 +87,6 @@
(defn raw-client-handler
[response-stream buffer-capacity]
(let [stream (atom nil)
previous-response (atom nil)
complete (atom nil)

handle-response
Expand All @@ -118,7 +108,8 @@
([_ ctx]
(when-let [s @stream]
(s/close! s))
(s/close! response-stream))
(s/close! response-stream)
(.fireChannelInactive ctx))

:channel-read
([_ ctx msg]
Expand All @@ -139,7 +130,10 @@
(netty/put! (.channel ctx) @stream content)
(when (instance? LastHttpContent msg)
(d/success! @complete false)
(s/close! @stream))))))))
(s/close! @stream)))

:else
(.fireChannelRead ctx msg))))))

(defn client-handler
[response-stream ^long buffer-capacity]
Expand Down Expand Up @@ -168,7 +162,8 @@
(s/close! s))
(doseq [b @buffer]
(netty/release b))
(s/close! response-stream))
(s/close! response-stream)
(.fireChannelInactive ctx))

:channel-read
([_ ctx msg]
Expand Down Expand Up @@ -240,7 +235,10 @@

(s/on-closed s #(d/success! c true))

(handle-response @response c s)))))))))))))
(handle-response @response c s))))))))

:else
(.fireChannelRead ctx msg))))))

(defn non-tunnel-proxy? [{:keys [tunnel? user http-headers ssl?]
:as proxy-options}]
Expand Down Expand Up @@ -353,11 +351,9 @@

:user-event-triggered
([this ctx evt]
(if-not (instance? ProxyConnectionEvent evt)
(.fireUserEventTriggered ^ChannelHandlerContext ctx evt)
(do
(.remove (.pipeline ctx) this)
(.fireUserEventTriggered ^ChannelHandlerContext ctx evt))))))
(when (instance? ProxyConnectionEvent evt)
(.remove (.pipeline ctx) this))
(.fireUserEventTriggered ^ChannelHandlerContext ctx evt))))

(defn pipeline-builder
[response-stream
Expand Down Expand Up @@ -416,7 +412,6 @@
raw-stream?
bootstrap-transform
name-resolver
pipeline-transform
keep-alive?
insecure?
ssl-context
Expand Down Expand Up @@ -562,13 +557,15 @@
:channel-inactive
([_ ctx]
(when (realized? d)
(s/close! @d)))
(s/close! @d))
(.fireChannelInactive ctx))

:channel-active
([_ ctx]
(let [ch (.channel ctx)]
(reset! in (netty/buffered-source ch (constantly 1) 16))
(.handshake handshaker ch)))
(.handshake handshaker ch))
(.fireChannelActive ctx))

:channel-read
([_ ctx msg]
Expand Down Expand Up @@ -628,7 +625,10 @@
(swap! desc assoc
:websocket-close-code (.statusCode frame)
:websocket-close-msg (.reasonText frame)))
(netty/close ctx))))
(netty/close ctx))

:else
(.fireChannelRead ctx msg)))
(finally
(netty/release msg)))))]))

Expand All @@ -654,8 +654,7 @@
extensions? false
max-frame-payload 65536
max-frame-size 1048576
compression? false}
:as options}]
compression? false}}]
(let [uri (URI. uri)
scheme (.getScheme uri)
_ (assert (#{"ws" "wss"} scheme) "scheme must be one of 'ws' or 'wss'")
Expand Down
5 changes: 2 additions & 3 deletions src/aleph/http/client_middleware.clj
Expand Up @@ -11,8 +11,7 @@
[manifold.stream :as s]
[manifold.executor :as ex]
[byte-streams :as bs]
[clojure.edn :as edn]
[aleph.http.core :as http])
[clojure.edn :as edn])
(:import
[io.netty.buffer ByteBuf Unpooled]
[io.netty.handler.codec.base64 Base64]
Expand All @@ -25,7 +24,7 @@
DefaultCookie]
[java.io InputStream ByteArrayOutputStream ByteArrayInputStream]
[java.nio.charset StandardCharsets]
[java.net IDN URL URLEncoder URLDecoder UnknownHostException]))
[java.net IDN URL URLEncoder URLDecoder]))

;; Cheshire is an optional dependency, so we check for it at compile time.
(def json-enabled?
Expand Down
30 changes: 12 additions & 18 deletions src/aleph/http/core.clj
Expand Up @@ -13,16 +13,15 @@
Channel
DefaultFileRegion
ChannelFuture
ChannelFutureListener
ChannelHandlerContext]
ChannelFutureListener]
[io.netty.buffer
ByteBuf Unpooled]
ByteBuf]
[java.nio
ByteBuffer]
[io.netty.handler.codec.http
DefaultHttpRequest DefaultLastHttpContent
DefaultHttpResponse DefaultFullHttpRequest
HttpHeaders HttpUtil DefaultHttpHeaders HttpContent
HttpHeaders HttpUtil HttpContent
HttpMethod HttpRequest HttpMessage
HttpResponse HttpResponseStatus
DefaultHttpContent
Expand All @@ -34,10 +33,6 @@
File
RandomAccessFile
Closeable]
[java.net
InetSocketAddress]
[java.util
Map$Entry]
[java.util.concurrent
ConcurrentHashMap]
[java.util.concurrent.atomic
Expand Down Expand Up @@ -199,9 +194,9 @@
:body body
:scheme (if ssl? :https :http)
:aleph/keep-alive? (HttpHeaders/isKeepAlive req)
:server-name (some-> ch ^InetSocketAddress (.localAddress) .getHostName)
:server-port (some-> ch ^InetSocketAddress (.localAddress) .getPort)
:remote-addr (some-> ch ^InetSocketAddress (.remoteAddress) .getAddress .getHostAddress))
:server-name (netty/channel-server-name ch)
:server-port (netty/channel-server-port ch)
:remote-addr (netty/channel-remote-address ch))

(p/def-derived-map NettyResponse [^HttpResponse rsp complete body]
:status (-> rsp .getStatus .code)
Expand All @@ -225,13 +220,13 @@

;;;

(defn try-set-content-length! [^HttpMessage msg ^long length]
(when-not (-> msg .headers (.contains "Content-Length"))
(HttpHeaders/setContentLength msg length)))

(defn has-content-length? [^HttpMessage msg]
(-> msg .headers (.contains "Content-Length")))

(defn try-set-content-length! [^HttpMessage msg ^long length]
(when-not (has-content-length? msg)
(HttpHeaders/setContentLength msg length)))

(def empty-last-content LastHttpContent/EMPTY_LAST_CONTENT)

(let [ary-class (class (byte-array 0))]
Expand Down Expand Up @@ -299,7 +294,7 @@
netty/channel
.closeFuture
netty/wrap-future
(d/chain (fn [_] (s/close! src))))
(d/chain' (fn [_] (s/close! src))))

(let [d (d/deferred)]
(s/on-closed sink
Expand Down Expand Up @@ -380,8 +375,7 @@
(defn send-message
[ch keep-alive? ssl? ^HttpMessage msg body]

(let [^HttpHeaders headers (.headers msg)
f (cond
(let [f (cond

(or
(nil? body)
Expand Down
2 changes: 0 additions & 2 deletions src/aleph/http/multipart.clj
Expand Up @@ -10,8 +10,6 @@
File]
[java.nio
ByteBuffer]
[java.nio.charset
Charset]
[java.net
URLConnection]
[io.netty.util.internal
Expand Down
45 changes: 26 additions & 19 deletions src/aleph/http/server.clj
Expand Up @@ -21,16 +21,15 @@
[aleph.http.core
NettyRequest]
[io.netty.buffer
ByteBuf Unpooled]
ByteBuf]
[io.netty.channel
Channel ChannelFuture ChannelHandlerContext
ChannelFutureListener ChannelHandler
Channel
ChannelHandlerContext
ChannelHandler
ChannelPipeline]
[io.netty.handler.stream ChunkedWriteHandler]
[io.netty.handler.codec.http
DefaultFullHttpResponse
DefaultHttpContent
DefaultLastHttpContent
HttpContent HttpHeaders
HttpContentCompressor
HttpRequest HttpResponse
Expand All @@ -50,9 +49,7 @@
[io.netty.handler.codec.http.websocketx.extensions.compression
WebSocketServerCompressionHandler]
[java.io
Closeable File InputStream RandomAccessFile IOException]
[java.nio
ByteBuffer]
IOException]
[java.net
InetSocketAddress]
[io.netty.util.concurrent
Expand Down Expand Up @@ -318,7 +315,8 @@
(when-let [s @stream]
(s/close! s))
(doseq [b @buffer]
(netty/release b)))
(netty/release b))
(.fireChannelInactive ctx))

:channel-read
([_ ctx msg]
Expand All @@ -332,7 +330,10 @@
(instance? HttpContent msg)
(if (instance? LastHttpContent msg)
(process-last-content ctx msg)
(process-content ctx msg)))))))
(process-content ctx msg))

:else
(.fireChannelRead ctx msg))))))

(defn raw-ring-handler
[ssl? handler rejected-handler executor buffer-capacity]
Expand Down Expand Up @@ -362,7 +363,8 @@
:channel-inactive
([_ ctx]
(when-let [s @stream]
(s/close! s)))
(s/close! s))
(.fireChannelInactive ctx))

:channel-read
([_ ctx msg]
Expand All @@ -380,7 +382,10 @@
(let [content (.content ^HttpContent msg)]
(netty/put! (.channel ctx) @stream content)
(when (instance? LastHttpContent msg)
(s/close! @stream))))))))
(s/close! @stream)))

:else
(.fireChannelRead ctx msg))))))

(defn pipeline-builder
[handler
Expand Down Expand Up @@ -429,12 +434,10 @@
{:keys [port
socket-address
executor
raw-stream?
bootstrap-transform
pipeline-transform
ssl-context
shutdown-executor?
rejected-handler
epoll?
compression?]
:or {bootstrap-transform identity
Expand Down Expand Up @@ -507,13 +510,15 @@
:channel-inactive
([_ ctx]
(s/close! out)
(s/close! in))
(s/close! in)
(.fireChannelInactive ctx))

:channel-read
([_ ctx msg]
(try
(let [ch (.channel ctx)]
(when (instance? WebSocketFrame msg)
(if-not (instance? WebSocketFrame msg)
(.fireChannelRead ctx msg)
(let [^WebSocketFrame msg msg]
(cond

Expand All @@ -531,7 +536,10 @@
(.writeAndFlush ch (PongWebSocketFrame. (netty/acquire (.content msg))))

(instance? CloseWebSocketFrame msg)
(.close handshaker ch (netty/acquire msg))))))
(.close handshaker ch (netty/acquire msg))

:else
(.fireChannelRead ctx msg)))))
(finally
(netty/release msg)))))]))

Expand All @@ -549,8 +557,7 @@
max-frame-size 1048576
allow-extensions? false
compression? false
pipeline-transform identity}
:as options}]
pipeline-transform identity}}]

(-> req ^AtomicBoolean (.websocket?) (.set true))

Expand Down

0 comments on commit 63ef527

Please sign in to comment.