Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve mux tracing #2794

Merged
merged 3 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ runMux tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer = do

miniProtocolJob
:: forall mode m.
MonadSTM m
( MonadSTM m
, MonadThrow (STM m)
)
=> Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
Expand Down Expand Up @@ -216,7 +218,8 @@ miniProtocolJob tracer egressQueue
mpsJobExit w
atomically $ do
writeTVar miniProtocolStatusVar StatusIdle
putTMVar completionVar $ Right result
putTMVar completionVar (Right result)
`orElse` (throwSTM (MuxError (MuxBlockedOnCompletionVar miniProtocolNum) ""))
case remainder of
Just trailing ->
modifyTVar miniProtocolIngressQueue (BL.append trailing)
Expand All @@ -225,6 +228,7 @@ miniProtocolJob tracer egressQueue

return (MiniProtocolShutdown miniProtocolNum miniProtocolDirEnum)

miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = protocolDirEnum miniProtocolDir

-- The Wanton w is the SDUs that are queued but not yet sent for this job.
Expand All @@ -233,7 +237,7 @@ miniProtocolJob tracer egressQueue
-- jobs will be cancelled directly.
mpsJobExit :: IngressQueue m -> m ()
mpsJobExit w = do
traceWith tracer (MuxTraceState Dying)
traceWith tracer (MuxTraceTerminating miniProtocolNum miniProtocolDirEnum)
atomically $ do
buf <- readTVar w
check (BL.null buf)
Expand All @@ -259,7 +263,7 @@ data MiniProtocolAction m where
-- 2. it starts responder protocol threads on demand when the first
-- incoming message arrives.
--
monitor :: forall mode m. (MonadSTM m, MonadAsync m, MonadMask m)
monitor :: forall mode m. (MonadSTM m, MonadAsync m, MonadMask m, MonadThrow (STM m))
=> Tracer m MuxTrace
-> JobPool.JobPool m MuxJobResult
-> EgressQueue m
Expand Down
23 changes: 12 additions & 11 deletions network-mux/src/Network/Mux/Trace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ data MuxErrorType = MuxUnknownMiniProtocol
-- ^ Result of runMiniProtocol's completionAction in case of an error.
| MuxCleanShutdown
-- ^ Mux stopped by 'stopMux'
| MuxBlockedOnCompletionVar !MiniProtocolNum
-- ^ Mux blocked on @completionVar@.
deriving (Show, Eq)

instance Exception MuxError where
Expand Down Expand Up @@ -110,9 +112,6 @@ data WithMuxBearer peerid a = WithMuxBearer {

data MuxBearerState = Mature
-- ^ MuxBearer has successufully completed the handshake.
| Dying
-- ^ MuxBearer is in the process of beeing torn down,
-- requests may fail.
| Dead
-- ^ MuxBearer is dead and the underlying bearer has been
-- closed.
Expand Down Expand Up @@ -146,28 +145,29 @@ data MuxTrace =
| MuxTraceStartEagerly !MiniProtocolNum !MiniProtocolDir
| MuxTraceStartOnDemand !MiniProtocolNum !MiniProtocolDir
| MuxTraceStartedOnDemand !MiniProtocolNum !MiniProtocolDir
| MuxTraceTerminating !MiniProtocolNum !MiniProtocolDir
| MuxTraceShutdown

instance Show MuxTrace where
show MuxTraceRecvHeaderStart = printf "Bearer Receive Header Start"
show (MuxTraceRecvHeaderEnd MuxSDUHeader { mhTimestamp, mhNum, mhDir, mhLength }) = printf "Bearer Receive Header End: ts: 0x%08x %s %s len %d"
show (MuxTraceRecvHeaderEnd MuxSDUHeader { mhTimestamp, mhNum, mhDir, mhLength }) = printf "Bearer Receive Header End: ts: 0x%08x (%s) %s len %d"
(unRemoteClockModel mhTimestamp) (show mhNum) (show mhDir) mhLength
show (MuxTraceRecvDeltaQObservation MuxSDUHeader { mhTimestamp, mhLength } ts) = printf "Bearer DeltaQ observation: remote ts %d local ts %s length %d"
(unRemoteClockModel mhTimestamp) (show ts) mhLength
show (MuxTraceRecvDeltaQSample d sp so dqs dqvm dqvs estR sdud) = printf "Bearer DeltaQ Sample: duration %.3e packets %d sumBytes %d DeltaQ_S %.3e DeltaQ_VMean %.3e DeltaQ_VVar %.3e DeltaQ_estR %.3e sizeDist %s"
d sp so dqs dqvm dqvs estR sdud
show (MuxTraceRecvStart len) = printf "Bearer Receive Start: length %d" len
show (MuxTraceRecvEnd len) = printf "Bearer Receive End: length %d" len
show (MuxTraceSendStart MuxSDUHeader { mhTimestamp, mhNum, mhDir, mhLength }) = printf "Bearer Send Start: ts: 0x%08x %s %s length %d"
show (MuxTraceSendStart MuxSDUHeader { mhTimestamp, mhNum, mhDir, mhLength }) = printf "Bearer Send Start: ts: 0x%08x (%s) %s length %d"
(unRemoteClockModel mhTimestamp) (show mhNum) (show mhDir) mhLength
show MuxTraceSendEnd = printf "Bearer Send End"
show (MuxTraceState new) = printf "State: %s" (show new)
show (MuxTraceCleanExit mid dir) = printf "Miniprotocol %s %s terminated cleanly" (show mid) (show dir)
show (MuxTraceCleanExit mid dir) = printf "Miniprotocol (%s) %s terminated cleanly" (show mid) (show dir)
show (MuxTraceExceptionExit mid dir e) = printf "Miniprotocol %s %s terminated with exception %s" (show mid) (show dir) (show e)
show (MuxTraceChannelRecvStart mid) = printf "Channel Receive Start on %s" (show mid)
show (MuxTraceChannelRecvEnd mid len) = printf "Channel Receive End on %s %d" (show mid)
show (MuxTraceChannelRecvEnd mid len) = printf "Channel Receive End on (%s) %d" (show mid)
len
show (MuxTraceChannelSendStart mid len) = printf "Channel Send Start on %s %d" (show mid)
show (MuxTraceChannelSendStart mid len) = printf "Channel Send Start on (%s) %d" (show mid)
len
show (MuxTraceChannelSendEnd mid) = printf "Channel Send End on %s" (show mid)
show MuxTraceHandshakeStart = "Handshake start"
Expand All @@ -179,8 +179,9 @@ instance Show MuxTrace where
show (MuxTraceHandshakeServerError e) = printf "Handshake Server Error %s" (show e)
show MuxTraceSDUReadTimeoutException = "Timed out reading SDU"
show MuxTraceSDUWriteTimeoutException = "Timed out writing SDU"
show (MuxTraceStartEagerly mid dir) = printf "Eagerly started %s in %s" (show mid) (show dir)
show (MuxTraceStartOnDemand mid dir) = printf "Preparing to start %s in %s" (show mid) (show dir)
show (MuxTraceStartedOnDemand mid dir) = printf "Started %s in %s" (show mid) (show dir)
show (MuxTraceStartEagerly mid dir) = printf "Eagerly started (%s) in %s" (show mid) (show dir)
show (MuxTraceStartOnDemand mid dir) = printf "Preparing to start (%s) in %s" (show mid) (show dir)
show (MuxTraceStartedOnDemand mid dir) = printf "Started on demand (%s) in %s" (show mid) (show dir)
show (MuxTraceTerminating mid dir) = printf "Terminating (%s) in %s" (show mid) (show dir)
show MuxTraceShutdown = "Mux shutdown"

1 change: 0 additions & 1 deletion network-mux/test/Test/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ instance Arbitrary ArbitrarySDU where

instance Arbitrary Compat.MuxBearerState where
arbitrary = elements [ Compat.Mature
, Compat.Dying
, Compat.Dead
]

Expand Down
13 changes: 7 additions & 6 deletions ouroboros-network/src/Ouroboros/Network/NodeToClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,13 @@ networkErrorPolicies = ErrorPolicies
, ErrorPolicy
$ \(e :: MuxError)
-> case errorType e of
MuxUnknownMiniProtocol -> Just ourBug
MuxDecodeError -> Just ourBug
MuxIngressQueueOverRun -> Just ourBug
MuxInitiatorOnly -> Just ourBug
MuxShutdown {} -> Just ourBug
MuxCleanShutdown -> Just ourBug
MuxUnknownMiniProtocol -> Just ourBug
MuxDecodeError -> Just ourBug
MuxIngressQueueOverRun -> Just ourBug
MuxInitiatorOnly -> Just ourBug
MuxShutdown {} -> Just ourBug
MuxCleanShutdown -> Just ourBug
MuxBlockedOnCompletionVar {} -> Just ourBug

-- in case of bearer closed / or IOException we suspend
-- the peer for a short time
Expand Down
13 changes: 7 additions & 6 deletions ouroboros-network/src/Ouroboros/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,13 @@ remoteNetworkErrorPolicy = ErrorPolicies {
-- 'responder'. If a 'responder' throws 'MuxError' we
-- might not want to shutdown the consumer (which is
-- using different connection), as we do below:
MuxBearerClosed -> Just (SuspendPeer shortDelay shortDelay)
MuxIOException{} -> Just (SuspendPeer shortDelay shortDelay)
MuxSDUReadTimeout -> Just (SuspendPeer shortDelay shortDelay)
MuxSDUWriteTimeout -> Just (SuspendPeer shortDelay shortDelay)
MuxShutdown {} -> Just (SuspendPeer shortDelay shortDelay)
MuxCleanShutdown -> Just (SuspendPeer shortDelay shortDelay)
MuxBearerClosed -> Just (SuspendPeer shortDelay shortDelay)
MuxIOException{} -> Just (SuspendPeer shortDelay shortDelay)
MuxSDUReadTimeout -> Just (SuspendPeer shortDelay shortDelay)
MuxSDUWriteTimeout -> Just (SuspendPeer shortDelay shortDelay)
MuxShutdown {} -> Just (SuspendPeer shortDelay shortDelay)
MuxCleanShutdown -> Just (SuspendPeer shortDelay shortDelay)
MuxBlockedOnCompletionVar {} -> Just (SuspendPeer shortDelay shortDelay)

-- Error policy for TxSubmission protocol: outbound side (client role)
, ErrorPolicy
Expand Down