/
tcp.clj
170 lines (152 loc) · 6.41 KB
/
tcp.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
(ns aleph.tcp
(:require
[potemkin :as p]
[manifold.stream :as s]
[manifold.deferred :as d]
[aleph.netty :as netty]
[clojure.tools.logging :as log])
(:import
[java.io
IOException]
[java.net
InetSocketAddress]
[io.netty.channel
Channel
ChannelHandler
ChannelPipeline]
[io.netty.handler.ssl
SslHandler]))
;; Map-like view over a Netty channel, defined through potemkin's
;; `def-derived-map`. Every value below is computed from the wrapped channel `ch`.
(p/def-derived-map TcpConnection [^Channel ch]
  :server-name (netty/channel-server-name ch)
  :server-port (netty/channel-server-port ch)
  :remote-addr (netty/channel-remote-address ch)
  ;; nil when there is no channel, when the pipeline has no handler registered
  ;; under "ssl-handler", or when that handler exposes no engine.
  :ssl-session (when-not (nil? ch)
                 (when-let [^SslHandler ssl (.get ^ChannelPipeline (.pipeline ch) "ssl-handler")]
                   (some-> ssl .engine .getSession))))

;; Mark the generated positional constructor as private to this namespace.
(alter-meta! #'->TcpConnection assoc :private true)
(defn- ^ChannelHandler server-channel-handler
  "Builds the inbound Netty handler for a single server-side connection.

   When the channel becomes active, `handler` is called with two arguments: a
   duplex stream (a sink and a source for the channel spliced together, with the
   channel stored under the `:aleph/channel` metadata key) and a `TcpConnection`
   wrapping the channel.

   Inbound messages are put onto the source. If `raw-stream?` is truthy they are
   forwarded untouched, otherwise each message first goes through
   `netty/release-buf->array`."
  [handler {:keys [raw-stream?] :as options}]
  ;; `incoming` holds the source stream created in :channel-active so the other
  ;; callbacks of this handler instance can reach it.
  (let [incoming (atom nil)]
    (netty/channel-inbound-handler

      :exception-caught
      ([_ ctx ex]
        ;; IOExceptions are intentionally not logged; anything else is.
        (when (not (instance? IOException ex))
          (log/warn ex "error in TCP server")))

      :channel-inactive
      ([_ ctx]
        ;; close the inbound side, then let the event continue down the pipeline
        (s/close! @incoming)
        (.fireChannelInactive ctx))

      :channel-active
      ([_ ctx]
        (let [ch     (.channel ctx)
              sink   (netty/sink ch true netty/to-byte-buf)
              source (reset! incoming (netty/source ch))
              duplex (s/splice sink source)]
          (reset-meta! duplex {:aleph/channel ch})
          (handler duplex (->TcpConnection ch)))
        (.fireChannelActive ctx))

      :channel-read
      ([_ ctx msg]
        (netty/put! (.channel ctx) @incoming
          (if-not raw-stream?
            (netty/release-buf->array msg)
            msg))))))
(defn start-server
  "Starts a TCP server. `handler` is a two-argument function that is invoked once per
   connection with a duplex stream and a map describing the client. The returned server
   object can be shut down via `java.io.Closeable.close()`, and its port can be looked up
   via `aleph.netty.port`.

   |:---|:-----
   | `port` | the port to bind to. If `0`, the server will bind to a random port. Only consulted when `socket-address` is not given.
   | `socket-address` | a `java.net.SocketAddress` specifying both the port and interface to bind to. Takes precedence over `port`.
   | `ssl-context` | an `io.netty.handler.ssl.SslContext` object. If a self-signed certificate is all that's required, `(aleph.netty/self-signed-ssl-context)` will suffice.
   | `bootstrap-transform` | a function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it. Defaults to `identity`.
   | `pipeline-transform` | a function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it. It is called after the TCP handler has been appended to the pipeline under the name `handler`. Defaults to `identity`.
   | `raw-stream?` | if true, messages from the stream will be `io.netty.buffer.ByteBuf` objects rather than byte-arrays. This will minimize copying, but means that care must be taken with Netty's buffer reference counting. Only recommended for advanced users.
   | `epoll?` | forwarded unchanged to `aleph.netty/start-server`. Defaults to `false`."
  [handler
   {:keys [port socket-address ssl-context bootstrap-transform pipeline-transform epoll?]
    :or {bootstrap-transform identity
         pipeline-transform identity
         epoll? false}
    :as options}]
  (let [;; installs a fresh TCP handler on each pipeline, then applies the user's transform
        init-pipeline (fn [^ChannelPipeline pipeline]
                        (.addLast pipeline "handler" (server-channel-handler handler options))
                        (pipeline-transform pipeline))
        ;; an explicit socket address wins over a bare port
        bind-address  (or socket-address (InetSocketAddress. port))]
    (netty/start-server
      init-pipeline
      ssl-context
      bootstrap-transform
      nil
      bind-address
      epoll?)))
(defn- client-channel-handler
  "Creates the per-connection state for an outgoing TCP connection.

   Returns a two-element vector `[stream-d handler]`:

   * `stream-d` is a deferred that is realized with the duplex stream once the
     channel becomes active, or put into an error state if an exception is
     caught first.
   * `handler` is the inbound Netty handler to install on the pipeline.

   Because the return value is a vector, this function deliberately carries no
   `^ChannelHandler` return tag; callers should hint the second element themselves.

   If `raw-stream?` is truthy, inbound messages are forwarded untouched,
   otherwise each message first goes through `netty/release-buf->array`."
  [{:keys [raw-stream?]}]
  (let [stream-d (d/deferred)
        ;; holds the source stream created in :channel-active
        incoming (atom nil)]
    [stream-d

     (netty/channel-inbound-handler

       :exception-caught
       ([_ ctx ex]
         ;; Try to fail the connection deferred with the exception; only log
         ;; when `d/error!` returns a falsey value.
         (when-not (d/error! stream-d ex)
           (log/warn ex "error in TCP client")))

       :channel-inactive
       ([_ ctx]
         (s/close! @incoming)
         (.fireChannelInactive ctx))

       :channel-active
       ([_ ctx]
         (let [ch     (.channel ctx)
               sink   (netty/sink ch true netty/to-byte-buf)
               source (reset! incoming (netty/source ch))
               duplex (s/splice sink source)]
           (reset-meta! duplex {:aleph/channel ch})
           (d/success! stream-d duplex))
         (.fireChannelActive ctx))

       :channel-read
       ([_ ctx msg]
         (netty/put! (.channel ctx) @incoming
           (if raw-stream?
             msg
             (netty/release-buf->array msg))))

       ;; NOTE(review): `close` is an outbound operation in Netty; confirm that
       ;; `netty/channel-inbound-handler` actually wires up a `:close` clause.
       :close
       ([_ ctx promise]
         (.close ctx promise)
         (d/error! stream-d (IllegalStateException. "unable to connect"))))]))
(defn client
  "Connects to a TCP server and returns a deferred which yields a duplex stream for
   communicating with it. If creating the client fails, the returned deferred is put
   into an error state with that failure.

   |:---|:----
   | `host` | the hostname of the server. Only consulted when `remote-address` is not given.
   | `port` | the port of the server. Only consulted when `remote-address` is not given.
   | `remote-address` | a `java.net.SocketAddress` specifying the server's address. Takes precedence over `host` and `port`.
   | `local-address` | a `java.net.SocketAddress` specifying the local network interface to use.
   | `ssl-context` | an explicit SSL context (e.g. an `io.netty.handler.ssl.SslContext`) to use. Defers to `ssl?` and `insecure?` configuration if omitted.
   | `ssl?` | if true, the client attempts to establish a secure connection with the server.
   | `insecure?` | if true, the client will ignore the server's certificate.
   | `bootstrap-transform` | a function that takes an `io.netty.bootstrap.Bootstrap` object, which represents the client, and modifies it. Defaults to `identity`.
   | `pipeline-transform` | a function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it. Called, when given, after the TCP handler has been appended to the pipeline under the name `handler`.
   | `raw-stream?` | if true, messages from the stream will be `io.netty.buffer.ByteBuf` objects rather than byte-arrays. This will minimize copying, but means that care must be taken with Netty's buffer reference counting. Only recommended for advanced users.
   | `epoll?` | forwarded unchanged to `aleph.netty/create-client`. Defaults to `false`."
  [{:keys [host port remote-address local-address ssl-context ssl? insecure? pipeline-transform bootstrap-transform epoll?]
    :or {bootstrap-transform identity
         epoll? false}
    :as options}]
  (let [[stream-d handler] (client-channel-handler options)
        init-pipeline      (fn [^ChannelPipeline pipeline]
                             (.addLast pipeline "handler" ^ChannelHandler handler)
                             (when pipeline-transform
                               (pipeline-transform pipeline)))
        ;; an explicit context wins; otherwise build one only when `ssl?` is set
        effective-ssl      (or ssl-context
                               (when ssl?
                                 (if insecure?
                                   (netty/insecure-ssl-client-context)
                                   (netty/ssl-client-context))))
        target             (or remote-address
                               (InetSocketAddress. ^String host (int port)))]
    ;; forward a failed client creation to the stream deferred handed to the caller
    (d/catch'
      (netty/create-client
        init-pipeline
        effective-ssl
        bootstrap-transform
        target
        local-address
        epoll?)
      (fn [err] (d/error! stream-d err)))
    stream-d))