Skip to content

Commit

Permalink
connection-handler: include ConnectionId in MuxPromise
Browse files Browse the repository at this point in the history
It is useful to get the 'ConnectionId'.  This will allow to have better
trace messages in the governor integration.
  • Loading branch information
coot committed Aug 3, 2020
1 parent d04048d commit 9dc0330
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,36 @@ sduHandshakeTimeout = 10
-- 'HandshakeException'
-- * 'MuxPromiseError' - the multiplexer thrown 'MuxError'.
--
data MuxPromise muxMode verionNumber bytes m a b where
data MuxPromise (muxMode :: MuxMode) peerAddr verionNumber bytes m a b where
MuxRunning
:: forall muxMode versionNumber bytes m a b.
!(Mux muxMode m)
:: forall muxMode peerAddr versionNumber bytes m a b.
!(ConnectionId peerAddr)
-> !(Mux muxMode m)
-> !(MuxBundle muxMode bytes m a b)
-> !(Bundle (StrictTVar m ControlMessage))
-> MuxPromise muxMode versionNumber bytes m a b
-> MuxPromise muxMode peerAddr versionNumber bytes m a b

MuxStopped
:: MuxPromise muxMode versionNumber bytes m a b
:: MuxPromise muxMode peerAddr versionNumber bytes m a b

MuxPromiseHandshakeClientError
:: HasInitiator muxMode ~ True
=> !(HandshakeException (HandshakeClientProtocolError versionNumber))
-> MuxPromise muxMode versionNumber bytes m a b
-> MuxPromise muxMode peerAddr versionNumber bytes m a b

MuxPromiseHandshakeServerError
:: HasResponder muxMode ~ True
=> !(HandshakeException (RefuseReason versionNumber))
-> MuxPromise muxMode versionNumber bytes m a b
-> MuxPromise muxMode peerAddr versionNumber bytes m a b

MuxPromiseError
:: !SomeException
-> MuxPromise muxMode versionNumber bytes m a b
-> MuxPromise muxMode peerAddr versionNumber bytes m a b


-- | A predicate which returns 'True' if connection handler thread has stopped running.
--
isConnectionHandlerRunning :: MuxPromise muxMode verionNumber bytes m a b -> Bool
isConnectionHandlerRunning :: MuxPromise muxMode peerAddr verionNumber bytes m a b -> Bool
isConnectionHandlerRunning muxPromise =
case muxPromise of
MuxRunning{} -> True
Expand All @@ -127,13 +128,14 @@ type MuxConnectionHandler muxMode peerAddr versionNumber bytes m a b =
ConnectionHandler muxMode
(ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber bytes m a b)
(MuxPromise muxMode peerAddr versionNumber bytes m a b)
m

-- | Type alias for 'ConnectionManager' using 'MuxPromise'.
--
type MuxConnectionManager muxMode socket peerAddr versionNumber bytes m a b =
ConnectionManager muxMode socket peerAddr (MuxPromise muxMode versionNumber bytes m a b) m
ConnectionManager muxMode socket peerAddr
(MuxPromise muxMode peerAddr versionNumber bytes m a b) m

-- | To be used as `makeConnectionHandler` field of 'ConnectionManagerArguments'.
--
Expand Down Expand Up @@ -174,7 +176,7 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
:: HasInitiator muxMode ~ True
=> ConnectionHandlerFn (ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber ByteString m a b)
(MuxPromise muxMode peerAddr versionNumber ByteString m a b)
m
outboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
exceptionHandling muxPromiseVar tracer $ do
Expand All @@ -201,7 +203,8 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning mux
(MuxRunning connectionId
mux
muxApp
scheduleStopVarBundle))

Expand Down Expand Up @@ -229,7 +232,7 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
:: HasResponder muxMode ~ True
=> ConnectionHandlerFn (ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber ByteString m a b)
(MuxPromise muxMode peerAddr versionNumber ByteString m a b)
m
inboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
exceptionHandling muxPromiseVar tracer $ do
Expand Down Expand Up @@ -257,7 +260,8 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning mux
(MuxRunning connectionId
mux
muxApp
scheduleStopVarBundle))
runMux (WithMuxBearer connectionId `contramap` muxTracer)
Expand All @@ -269,7 +273,7 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
exceptionHandling :: forall x.
StrictTVar m
(Promise
(MuxPromise muxMode versionNumber ByteString m a b))
(MuxPromise muxMode peerAddr versionNumber ByteString m a b))
-> Tracer m (ConnectionTrace versionNumber)
-> m x -> m x
exceptionHandling muxPromiseVar tracer io =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ run ServerArguments {

monitoring :: TVar m
(StrictSeq
(STM m (MuxPromise muxMode verionNumber ByteString m a b)))
(STM m (MuxPromise muxMode peerAddr verionNumber ByteString m a b)))
-> m Void
monitoring muxVars = do
muxPromise <- atomically $ do
Expand All @@ -102,7 +102,8 @@ run ServerArguments {
writeTVar muxVars muxs'
pure muxPromise
case muxPromise of
MuxRunning mux
MuxRunning _connectionId
mux
(Bundle
(WithHot hotPtls)
(WithWarm warmPtls)
Expand Down Expand Up @@ -141,7 +142,7 @@ run ServerArguments {
acceptLoop :: TVar m
(StrictSeq
(STM m
(MuxPromise muxMode versionNumber ByteString m a b)))
(MuxPromise muxMode peerAddr versionNumber ByteString m a b)))
-> Accept m SomeException peerAddr socket
-> m Void
acceptLoop muxVars acceptOne = do
Expand Down

0 comments on commit 9dc0330

Please sign in to comment.