/
ConnectionHandler.hs
330 lines (297 loc) · 13.2 KB
/
ConnectionHandler.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
-- | Implementation of 'ConnectionHandler'
--
module Ouroboros.Network.ConnectionManager.ConnectionHandler
( MuxPromise (..)
, MuxConnectionHandler
, makeConnectionHandler
, MuxConnectionManager
-- * tracing
, ConnectionTrace (..)
) where
import Control.Monad (when)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer, contramap, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Data.Foldable (traverse_)
import Data.Typeable (Typeable)
import Network.Mux hiding (miniProtocolNum)
import Ouroboros.Network.Mux
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionId (ConnectionId)
import Ouroboros.Network.ConnectionManager.Types
-- | Timeout handed to the bearer that 'runMux' is started with, i.e. the one
-- used once the handshake has completed.
--
-- We place an upper limit of @30s@ on the time we wait on receiving an SDU.
-- There is no upper bound on the time we spend waiting for a new SDU.  This
-- makes it possible for mini-protocols to use timeouts that are larger than
-- @30s@, or to wait forever.  @30s@ for receiving an SDU corresponds to a
-- minimum speed limit of 17kbps:
--
-- >   ( 8       -- mux header length
-- >   + 0xffff  -- maximum SDU payload
-- >   )
-- >   * 8
-- >   = 524_344 -- maximum bits in an SDU
-- >
-- >   524_344 / 30 / 1024 = 17kbps
--
sduTimeout :: DiffTime
sduTimeout = 30
-- | For the handshake, we put a limit of @10s@ on sending or receiving a
-- single @MuxSDU@.
--
-- This value is handed to the bearer that is passed to 'runHandshakeClient'
-- and 'runHandshakeServer'.
--
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout = 10
-- | States of the connection handler thread.
--
-- The handler publishes one of these values (wrapped in 'Promised') in the
-- promise variable it was given, so that the holder of that variable can see
-- how far the connection got.
--
data MuxPromise muxMode versionNumber bytes m a b where

    -- | Successful handshake, mux created.  Carries the 'Mux' itself, the
    -- application bundle and one 'RunOrStop' variable for each of the hot,
    -- warm and established protocol groups.
    MuxRunning
      :: forall muxMode versionNumber bytes m a b.
         !(Mux muxMode m)
      -> !(MuxBundle muxMode bytes m a b)
      -> !(Bundle (StrictTVar m RunOrStop))
      -> MuxPromise muxMode versionNumber bytes m a b

    -- | The handler thread has finished.  This is what the handler's cleanup
    -- writes when it exits while the promise is still empty or 'MuxRunning'.
    --
    -- NOTE(review): the earlier description also listed termination by
    -- 'killThread' (issued by
    -- 'Ouroboros.Network.ConnectionManager.withConnectionManager') under this
    -- state; confirm, because the handler's exception path stores
    -- 'MuxPromiseError' for any exception it catches.
    MuxStopped
      :: MuxPromise muxMode versionNumber bytes m a b

    -- | The connection handler thread was running the client side of the
    -- handshake negotiation, which failed with a 'HandshakeException'.
    MuxPromiseHandshakeClientError
      :: HasInitiator muxMode ~ True
      => !(HandshakeException (HandshakeClientProtocolError versionNumber))
      -> MuxPromise muxMode versionNumber bytes m a b

    -- | The connection handler thread was running the server side of the
    -- handshake protocol, which failed with a 'HandshakeException'.
    MuxPromiseHandshakeServerError
      :: HasResponder muxMode ~ True
      => !(HandshakeException (RefuseReason versionNumber))
      -> MuxPromise muxMode versionNumber bytes m a b

    -- | An exception escaped the connection handler (e.g. the multiplexer
    -- threw a 'MuxError'); the exception is stored here before being
    -- rethrown.
    MuxPromiseError
      :: !SomeException
      -> MuxPromise muxMode versionNumber bytes m a b
-- | A predicate which returns 'True' only while the connection handler thread
-- is still running, i.e. when the promise is 'MuxRunning'.  Every terminal
-- state ('MuxStopped', both handshake errors and 'MuxPromiseError') gives
-- 'False'.
--
-- All constructors are listed explicitly (no wildcard) so that adding a new
-- one to 'MuxPromise' is flagged by the exhaustiveness checker.
--
isConnectionHandlerRunning :: MuxPromise muxMode versionNumber bytes m a b -> Bool
isConnectionHandlerRunning = \case
    MuxRunning{}                     -> True
    MuxPromiseHandshakeClientError{} -> False
    MuxPromiseHandshakeServerError{} -> False
    MuxPromiseError{}                -> False
    MuxStopped                       -> False
-- | Type of 'ConnectionHandler' implemented in this module: a
-- 'ConnectionHandler' which traces 'ConnectionTrace' events and publishes its
-- state as a 'MuxPromise'.
--
type MuxConnectionHandler muxMode peerAddr versionNumber bytes m a b =
    ConnectionHandler muxMode
                      (ConnectionTrace versionNumber)
                      peerAddr
                      (MuxPromise muxMode versionNumber bytes m a b)
                      m
-- | Type alias for a 'ConnectionManager' whose connections are described by
-- 'MuxPromise'.
--
type MuxConnectionManager muxMode socket peerAddr versionNumber bytes m a b =
    ConnectionManager muxMode socket peerAddr (MuxPromise muxMode versionNumber bytes m a b) m
-- | To be used as the `makeConnectionHandler` field of
-- 'ConnectionManagerArguments'.
--
-- Depending on 'SingInitiatorResponderMode' the resulting handler contains an
-- outbound handler, an inbound handler, or both.  Each of them:
--
-- 1. traces 'ConnectionStart' and runs the handshake (client side for
--    outbound, server side for inbound) over a bearer created with
--    'sduHandshakeTimeout';
-- 2. on handshake failure, publishes the error in the promise variable and
--    traces it;
-- 3. on success, creates the 'RunOrStop' control variables and the mux,
--    publishes 'MuxRunning' and runs the mux over a bearer created with
--    'sduTimeout'.
--
-- Note: We need to pass `MiniProtocolBundle`, which forces us to have two
-- different `ConnectionManager`s: one for `node-to-client` and another for
-- `node-to-node` connections.  But this is ok, as these resources are
-- independent.
--
makeConnectionHandler
    :: forall peerAddr muxMode versionNumber extra m a b.
       ( MonadAsync m
       , MonadCatch m
       , MonadFork m
       , MonadThrow (STM m)
       , MonadTime m
       , MonadTimer m
       , MonadMask m
       , Ord versionNumber
       , Typeable versionNumber
       )
    => Tracer m (WithMuxBearer (ConnectionId peerAddr) MuxTrace)
    -- ^ mux tracer; every event is tagged with the connection id before it
    -- is passed on.
    -> SingInitiatorResponderMode muxMode
    -- ^ describes whether this is an outbound or inbound connection, and
    -- brings evidence that we can use mux with it.
    -> MiniProtocolBundle muxMode
    -- ^ passed to 'newMux' for every connection.
    -> HandshakeArguments (ConnectionId peerAddr) versionNumber extra m
                          (OuroborosBundle muxMode peerAddr ByteString m a b)
    -- ^ passed to 'runHandshakeClient' / 'runHandshakeServer'.
    -> MuxConnectionHandler muxMode peerAddr versionNumber ByteString m a b
makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArguments =
    ConnectionHandler $
      -- matching on the singleton selects which handlers are provided
      case singMuxMode of
        SInitiatorMode -> WithInitiatorMode outboundConnectionHandler
        SResponderMode -> WithResponderMode inboundConnectionHandler
        SInitiatorResponderMode -> WithInitiatorResponderMode outboundConnectionHandler
                                                              inboundConnectionHandler
  where
    -- Handler for connections on which we run the client side of the
    -- handshake.
    outboundConnectionHandler
      :: HasInitiator muxMode ~ True
      => ConnectionHandlerFn (ConnectionTrace versionNumber)
                             peerAddr
                             (MuxPromise muxMode versionNumber ByteString m a b)
                             m
    outboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
      exceptionHandling muxPromiseVar tracer $ do
        traceWith tracer ConnectionStart
        hsResult <- runHandshakeClient (muxBearer sduHandshakeTimeout)
                                       connectionId
                                       handshakeArguments
        case hsResult of
          -- handshake failed: publish the error, trace it and return
          Left !err -> do
            atomically $ writeTVar muxPromiseVar (Promised (MuxPromiseHandshakeClientError err))
            traceWith tracer (ConnectionTraceHandshakeClientError err)

          Right app -> do
            traceWith tracer ConnectionTraceHandshakeSuccess
            -- one control variable per protocol group, all starting as 'Run'
            !scheduleStopVarBundle
              <- (\a b c -> Bundle (WithHot a) (WithWarm b) (WithEstablished c))
                   <$> newTVarM Run
                   <*> newTVarM Run
                   <*> newTVarM Run
            let muxApp
                  = mkMuxApplicationBundle
                      connectionId
                      (readTVar <$> scheduleStopVarBundle)
                      app
            !mux <- newMux miniProtocolBundle
            -- publish the running state before any mini-protocol is started
            atomically $ writeTVar muxPromiseVar
                          (Promised
                            (MuxRunning mux
                                        muxApp
                                        scheduleStopVarBundle))

            -- For outbound connections we need to start receivers on demand.
            -- This is, in a sense, a no man's land: the server will not act,
            -- as it's only reacting to inbound connections, and it also does
            -- not belong to the initiator (peer-2-peer governor).
            case (singMuxMode, muxApp) of
              (SInitiatorResponderMode,
               Bundle (WithHot hotPtcls)
                      (WithWarm warmPtcls)
                      (WithEstablished establishedPtcls)) -> do
                -- TODO: #2221 restart responders
                traverse_ (runResponder mux) hotPtcls
                traverse_ (runResponder mux) warmPtcls
                traverse_ (runResponder mux) establishedPtcls

              -- other modes: no responders are started here
              _ -> pure ()

            runMux (WithMuxBearer connectionId `contramap` muxTracer)
                   mux (muxBearer sduTimeout)

    -- Handler for connections on which we run the server side of the
    -- handshake.
    inboundConnectionHandler
      :: HasResponder muxMode ~ True
      => ConnectionHandlerFn (ConnectionTrace versionNumber)
                             peerAddr
                             (MuxPromise muxMode versionNumber ByteString m a b)
                             m
    inboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
      exceptionHandling muxPromiseVar tracer $ do
        traceWith tracer ConnectionStart
        hsResult <- runHandshakeServer (muxBearer sduHandshakeTimeout)
                                       connectionId
                                       (\_ _ _ -> Accept) -- we accept all connections
                                       handshakeArguments
        case hsResult of
          -- handshake failed: publish the error, trace it and return
          Left !err -> do
            atomically $ writeTVar muxPromiseVar (Promised (MuxPromiseHandshakeServerError err))
            traceWith tracer (ConnectionTraceHandshakeServerError err)

          Right app -> do
            traceWith tracer ConnectionTraceHandshakeSuccess
            -- one control variable per protocol group, all starting as 'Run'
            !scheduleStopVarBundle
              <- (\a b c -> Bundle (WithHot a) (WithWarm b) (WithEstablished c))
                   <$> newTVarM Run
                   <*> newTVarM Run
                   <*> newTVarM Run
            let muxApp
                  = mkMuxApplicationBundle
                      connectionId
                      (readTVar <$> scheduleStopVarBundle)
                      app
            !mux <- newMux miniProtocolBundle
            -- publish the running state before the mux is run
            atomically $ writeTVar muxPromiseVar
                          (Promised
                            (MuxRunning mux
                                        muxApp
                                        scheduleStopVarBundle))
            runMux (WithMuxBearer connectionId `contramap` muxTracer)
                   mux (muxBearer sduTimeout)

    -- Minimal error handling, just to make adequate changes to
    -- `muxPromiseVar`.  Classification of errors is done by
    -- 'withConnectionManager' when the connection handler thread is started.
    exceptionHandling :: forall x.
                         StrictTVar m
                           (Promise
                             (MuxPromise muxMode versionNumber ByteString m a b))
                      -> Tracer m (ConnectionTrace versionNumber)
                      -> m x -> m x
    exceptionHandling muxPromiseVar tracer io =
      io
      -- The default for normal exit and unhandled error is to write
      -- `MuxStopped`, but we don't want to override handshake errors: the
      -- promise is only overwritten while it is still empty or 'MuxRunning'.
      `finally` do
        atomically $ do
          st <- readTVar muxPromiseVar
          when (case st of
                  Promised muxPromise -> isConnectionHandlerRunning muxPromise
                  Empty -> True)
            $ writeTVar muxPromiseVar (Promised MuxStopped)
        traceWith tracer ConnectionStopped

      -- If an exception (e.g. a 'MuxError') was thrown by the connection
      -- handler, let the other side know by storing it in the promise, then
      -- rethrow it.  Note that this matches every 'SomeException'.
      --
      -- NOTE(review): assuming `finally` and `catch` have equal,
      -- left-associative fixity, the cleanup above runs first, so on failure
      -- the promise may briefly hold 'MuxStopped' before it is replaced by
      -- 'MuxPromiseError' - confirm this is intended.
      `catch` \(e :: SomeException) -> do
        atomically (writeTVar muxPromiseVar (Promised (MuxPromiseError e)))
        throwM e

    -- Register the responder side of a mini-protocol with the mux using the
    -- 'StartOnDemand' strategy; the value returned by 'runMiniProtocol' is
    -- discarded.
    runResponder :: Mux InitiatorResponderMode m
                 -> MiniProtocol InitiatorResponderMode ByteString m a b -> m ()
    runResponder mux MiniProtocol {
                        miniProtocolNum,
                        miniProtocolRun
                      } =
        -- presumably the only constructor possible at
        -- 'InitiatorResponderMode'; verify against 'RunMiniProtocol'
        case miniProtocolRun of
          InitiatorAndResponderProtocol _ responder ->
            void $
              runMiniProtocol
                mux miniProtocolNum
                ResponderDirection
                StartOnDemand
                (runMuxPeer responder . fromChannel)
--
-- Tracing
--
-- | Events traced by the connection handler thread.
--
-- 'ConnectionTrace' is embedded into 'ConnectionManagerTrace' with the
-- 'Ouroboros.Network.ConnectionManager.Types.ConnectionTrace' constructor.
--
-- TODO: when 'Handshake' gets its own tracer, independent of 'Mux', it
-- should be embedded into 'ConnectionTrace'.
--
data ConnectionTrace versionNumber =
      -- | The handler has started; traced before the handshake is run.
      ConnectionStart
      -- | The handshake returned successfully.
    | ConnectionTraceHandshakeSuccess
      -- | The client side of the handshake (outbound handler) failed.
    | ConnectionTraceHandshakeClientError
        !(HandshakeException (HandshakeClientProtocolError versionNumber))
      -- | The server side of the handshake (inbound handler) failed.
    | ConnectionTraceHandshakeServerError
        !(HandshakeException (RefuseReason versionNumber))
      -- | The handler is exiting; traced from its cleanup action.
    | ConnectionStopped
  deriving Show