From e40d838075e34e35f2f28f926050348aa99f31f8 Mon Sep 17 00:00:00 2001 From: Karl Knutsson Date: Thu, 15 Apr 2021 11:36:47 +0200 Subject: [PATCH] Peer metrics for bytes downloaded with policy --- .../src/Test/ThreadNet/Network.hs | 3 +- .../MiniProtocol/ChainSync/Client.hs | 2 +- .../Ouroboros/Consensus/Network/NodeToNode.hs | 12 ++-- .../src/Ouroboros/Consensus/Node.hs | 4 +- ouroboros-network/demo/chain-sync.hs | 3 +- ouroboros-network/ouroboros-network.cabal | 1 + .../Ouroboros/Network/BlockFetch/Client.hs | 9 ++- .../src/Ouroboros/Network/Diffusion.hs | 11 ++- .../Ouroboros/Network/Diffusion/Policies.hs | 10 ++- .../Network/PeerSelection/Governor.hs | 44 ++++++++---- .../Network/PeerSelection/Governor/Types.hs | 5 ++ .../Network/PeerSelection/PeerMetric.hs | 70 +++++++++++++++++-- .../Network/PeerSelection/PeerMetric/Type.hs | 15 ++++ .../Ouroboros/Network/BlockFetch/Examples.hs | 4 +- 14 files changed, 154 insertions(+), 39 deletions(-) create mode 100644 ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric/Type.hs diff --git a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs index e79b6c465a2..96385c0726e 100644 --- a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs @@ -67,6 +67,7 @@ import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..)) +import Ouroboros.Network.PeerSelection.PeerMetric (nullMetric) import Ouroboros.Network.Protocol.KeepAlive.Type import Ouroboros.Network.Protocol.Limits (waitForever) import Ouroboros.Network.Protocol.TxSubmission.Type @@ -1010,7 +1011,7 @@ runThreadNetwork systemTime ThreadNetworkArgs , intersectTimeout = waitForever , mustReplyTimeout = waitForever }) - (\_ _ _ -> return ()) + nullMetric (NTN.mkHandlers nodeKernelArgs nodeKernel) -- In practice, a robust wallet/user can persistently add a transaction diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 81f07bf8507..e76664166a3 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -56,7 +56,7 @@ import qualified Ouroboros.Network.AnchoredFragment as AF import qualified Ouroboros.Network.AnchoredSeq as AS import Ouroboros.Network.Block (Tip, getTipBlockNo) import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM) -import Ouroboros.Network.PeerSelection.PeerMetric (ReportHeaderMetricsSTM) +import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportHeaderMetricsSTM) import Ouroboros.Network.Protocol.ChainSync.ClientPipelined import Ouroboros.Network.Protocol.ChainSync.PipelineDecision diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index c348c72c915..46cc4d14dd9 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -55,7 +55,8 @@ import Ouroboros.Network.Driver import Ouroboros.Network.KeepAlive import Ouroboros.Network.Mux import Ouroboros.Network.NodeToNode -import Ouroboros.Network.PeerSelection.PeerMetric (ReportHeaderMetricsSTM) +import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportFetchedMetricsSTM, + ReportHeaderMetricsSTM, ReportPeerMetrics (..)) import Ouroboros.Network.Protocol.BlockFetch.Codec import Ouroboros.Network.Protocol.BlockFetch.Server (BlockFetchServer, blockFetchServerPeer) @@ -125,6 +126,7 @@ data Handlers m peer blk = Handlers { , hBlockFetchClient :: NodeToNodeVersion -> ControlMessageSTM m + -> ReportFetchedMetricsSTM m -> BlockFetchClient (Header blk) blk m () , hBlockFetchServer @@ -448,10 +450,10 @@ mkApps -> Tracers m remotePeer blk e -> Codecs blk e m bCS bCS bBF bBF bTX bTX2 bKA -> m ChainSyncTimeout - -> (remotePeer -> ReportHeaderMetricsSTM m) + -> ReportPeerMetrics m remotePeer -> Handlers m remotePeer blk -> Apps m remotePeer bCS bBF bTX bTX2 bKA () -mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout addHeaderMetrics Handlers {..} = +mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout ReportPeerMetrics {..} Handlers {..} = Apps {..} where aChainSyncClient @@ -484,7 +486,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout addHeaderMetrics Hand channel $ chainSyncClientPeerPipelined $ hChainSyncClient them version controlMessageSTM - (addHeaderMetrics them) varCandidate + (reportHeader them) varCandidate return ((), trailing) aChainSyncServer @@ -523,7 +525,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout addHeaderMetrics Hand (byteLimitsBlockFetch (const 0)) -- TODO: Real Bytelimits, see #1727 timeLimitsBlockFetch channel - $ hBlockFetchClient version controlMessageSTM clientCtx + $ hBlockFetchClient version controlMessageSTM (reportFetch them) clientCtx aBlockFetchServer :: NodeToNodeVersion diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs index d95b8ef9460..31d385ca378 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs @@ -74,7 +74,7 @@ import Ouroboros.Network.NodeToNode (DiffusionMode, RemoteAddress, combineVersions, defaultMiniProtocolParameters) import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..), - newPeerMetric, addHeaderMetric) + newPeerMetric, reportMetric) import Ouroboros.Network.Protocol.Limits (shortWait) import Ouroboros.Consensus.Block @@ -328,7 +328,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} = rnTraceNTN (NTN.defaultCodecs codecConfig version) llrnChainSyncTimeout - (addHeaderMetric peerMetrics) + (reportMetric peerMetrics) (NTN.mkHandlers nodeKernelArgs nodeKernel) mkNodeToClientApps diff --git a/ouroboros-network/demo/chain-sync.hs b/ouroboros-network/demo/chain-sync.hs index 07344bd3a7b..fb4d613deeb 100644 --- a/ouroboros-network/demo/chain-sync.hs +++ b/ouroboros-network/demo/chain-sync.hs @@ -296,7 +296,8 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do nullTracer -- (contramap (show . TraceLabelPeer connectionId) stdoutTracer) codecBlockFetch channel - (blockFetchClient NodeToNodeV_1 (continueForever (Proxy :: Proxy IO)) clientCtx) + (blockFetchClient NodeToNodeV_1 (continueForever (Proxy :: Proxy IO)) + (\_ _ _ -> return ()) clientCtx) blockFetchPolicy :: BlockFetchConsensusInterface LocalConnectionId BlockHeader Block IO diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index beb9ebc7545..2d015c0aa81 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -70,6 +70,7 @@ library Ouroboros.Network.PeerSelection.LedgerPeers Ouroboros.Network.PeerSelection.LocalRootPeers Ouroboros.Network.PeerSelection.PeerMetric + Ouroboros.Network.PeerSelection.PeerMetric.Type Ouroboros.Network.PeerSelection.PeerStateActions Ouroboros.Network.PeerSelection.RootPeersDNS Ouroboros.Network.PeerSelection.Governor diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs index e6bb4bcf65f..1c1bd0f7b48 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs @@ -56,6 +56,7 @@ import Ouroboros.Network.BlockFetch.ClientState , rejectedFetchBatch ) import Ouroboros.Network.BlockFetch.DeltaQ ( PeerGSV(..), PeerFetchInFlightLimits(..) ) +import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportFetchedMetricsSTM) data BlockFetchProtocolFailure = @@ -84,9 +85,10 @@ blockFetchClient :: forall header block m. HeaderHash header ~ HeaderHash block) => NodeToNodeVersion -> ControlMessageSTM m + -> ReportFetchedMetricsSTM m -> FetchClientContext header block m -> PeerPipelined (BlockFetch block (Point block)) AsClient BFIdle m () -blockFetchClient _version controlMessageSTM +blockFetchClient _version controlMessageSTM reportFetched FetchClientContext { fetchClientCtxTracer = tracer, fetchClientCtxPolicy = FetchClientPolicy { @@ -266,6 +268,7 @@ blockFetchClient _version controlMessageSTM (MsgBlock block, header:headers') -> ReceiverEffect $ do now <- getCurrentTime + nowMono <- getMonotonicTime --TODO: consider how to enforce expected block size limit. -- They've lied and are sending us a massive amount of data. -- Resource consumption attack. @@ -297,6 +300,10 @@ blockFetchClient _version controlMessageSTM forgeTime <- atomically $ blockForgeUTCTime $ FromConsensus block let blockDelay = diffUTCTime now forgeTime + let hf = getHeaderFields header + slotNo = headerFieldSlot hf + atomically $ reportFetched (blockFetchSize header) slotNo nowMono + -- Note that we add the block to the chain DB /before/ updating our -- current status and in-flight stats. Otherwise blocks will -- disappear from our in-flight set without yet appearing in the diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index ca0ad1f24ad..087203d9098 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -95,7 +95,8 @@ import Ouroboros.Network.PeerSelection.RootPeersDNS ( DomainAddress import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..)) import Ouroboros.Network.InboundGovernor.State (InboundGovernorCounters (..)) import qualified Ouroboros.Network.PeerSelection.Governor as Governor -import Ouroboros.Network.PeerSelection.Governor.Types ( TracePeerSelection (..) +import Ouroboros.Network.PeerSelection.Governor.Types ( ChurnMode (..) + , TracePeerSelection (..) , DebugPeerSelection (..) , PeerSelectionCounters (..) ) @@ -656,6 +657,8 @@ runDataDiffusion tracers (policyRng, churnRng) = split rng' policyRngVar <- newTVarIO policyRng + churnModeVar <- newTVarIO ChurnModeNormal + -- Request interface, supply the number of peers desired. ledgerPeersReq <- newEmptyTMVarIO :: IO (StrictTMVar IO NumberOfPeers) -- Response interface, returns a Set of peers. Nothing indicates that the @@ -866,12 +869,13 @@ runDataDiffusion tracers dtTracePeerSelectionCounters peerSelectionActions (Diffusion.Policies.simplePeerSelectionPolicy - policyRngVar daPeerMetrics)) + policyRngVar (readTVar churnModeVar) daPeerMetrics)) $ \governorThread -> Async.withAsync (Governor.peerChurnGovernor dtTracePeerSelectionTracer daPeerMetrics + churnModeVar churnRng daBlockFetchMode daPeerSelectionTargets @@ -979,7 +983,7 @@ runDataDiffusion tracers dtTracePeerSelectionCounters peerSelectionActions (Diffusion.Policies.simplePeerSelectionPolicy - policyRngVar daPeerMetrics)) + policyRngVar (readTVar churnModeVar) daPeerMetrics)) $ \governorThread -> do let mkAddr :: AddrInfo -> (Socket.Family, SockAddr) mkAddr addr = ( Socket.addrFamily addr @@ -1014,6 +1018,7 @@ runDataDiffusion tracers (Governor.peerChurnGovernor dtTracePeerSelectionTracer daPeerMetrics + churnModeVar churnRng daBlockFetchMode daPeerSelectionTargets diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs index 1b762906250..c3765cfa3a0 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs @@ -40,9 +40,10 @@ closeConnectionTimeout = 120 simplePeerSelectionPolicy :: forall m. MonadSTM m => StrictTVar m StdGen + -> STM m ChurnMode -> PeerMetrics m SockAddr -> PeerSelectionPolicy SockAddr m -simplePeerSelectionPolicy rngVar metrics = PeerSelectionPolicy { +simplePeerSelectionPolicy rngVar getChurnMode metrics = PeerSelectionPolicy { policyPickKnownPeersForGossip = simplePromotionPolicy, policyPickColdPeersToPromote = simplePromotionPolicy, policyPickWarmPeersToPromote = simplePromotionPolicy, @@ -72,7 +73,12 @@ simplePeerSelectionPolicy rngVar metrics = PeerSelectionPolicy { hotDemotionPolicy :: PickPolicy SockAddr m hotDemotionPolicy available pickNum = do - scores <- upstreamyness <$> (getHeaderMetrics metrics) + mode <- getChurnMode + scores <- case mode of + ChurnModeNormal -> + upstreamyness <$> (getHeaderMetrics metrics) + ChurnModeBulkSync -> + fetchyness <$> (getFetchedMetrics metrics) available' <- addRand available return $ Set.fromList . map fst diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs index 16357a0f954..63b7a9b068c 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs @@ -580,43 +580,52 @@ peerChurnGovernor :: forall m peeraddr. ) => Tracer m (TracePeerSelection peeraddr) -> PeerMetrics m peeraddr + -> StrictTVar m ChurnMode -> StdGen -> STM m FetchMode -> PeerSelectionTargets -> StrictTVar m PeerSelectionTargets -> m Void -peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do +peerChurnGovernor tracer metrics churnModeVar inRng getFetchMode base peerSelectionVar = do -- Wait a while so that not only the closest peers have had the time -- to become warm. startTs0 <- getMonotonicTime threadDelay 3 - atomically increaseActivePeers + mode <- atomically updateChurnMode + atomically $ increaseActivePeers mode endTs0 <- getMonotonicTime fuzzyDelay inRng (Time $ diffTime endTs0 startTs0) >>= go where - increaseActivePeers :: STM m () - increaseActivePeers = do - mode <- getFetchMode + updateChurnMode :: STM m ChurnMode + updateChurnMode = do + fm <- getFetchMode + let mode = case fm of + FetchModeDeadline -> ChurnModeNormal + FetchModeBulkSync -> ChurnModeBulkSync + writeTVar churnModeVar mode + return mode + + increaseActivePeers :: ChurnMode -> STM m () + increaseActivePeers mode = do modifyTVar peerSelectionVar (\targets -> targets { targetNumberOfActivePeers = case mode of - FetchModeDeadline -> + ChurnModeNormal -> targetNumberOfActivePeers base - FetchModeBulkSync -> + ChurnModeBulkSync -> min 2 (targetNumberOfActivePeers base) }) - decreaseActivePeers :: STM m () - decreaseActivePeers = do - mode <- getFetchMode + decreaseActivePeers :: ChurnMode -> STM m () + decreaseActivePeers mode = do modifyTVar peerSelectionVar (\targets -> targets { targetNumberOfActivePeers = case mode of - FetchModeDeadline -> + ChurnModeNormal -> decrease $ targetNumberOfActivePeers base - FetchModeBulkSync -> + ChurnModeBulkSync -> min 1 (targetNumberOfActivePeers base - 1) }) @@ -626,17 +635,22 @@ peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do startTs <- getMonotonicTime headerScores <- upstreamyness <$> (atomically $ getHeaderMetrics metrics) + fetchScores <- fetchyness <$> (atomically $ getFetchedMetrics metrics) traceWith tracer $ TraceXXX $ "Header: " ++ (show $ sortOn snd $ Map.toList headerScores) + traceWith tracer $ TraceXXX $ "Bytes: " ++ (show $ sortOn snd $ Map.toList fetchScores) + + churnMode <- atomically updateChurnMode + traceWith tracer $ TraceChurnMode churnMode -- Purge the worst active peer(s). - atomically decreaseActivePeers + atomically $ decreaseActivePeers churnMode -- Short delay, we may have no active peers right now threadDelay 1 -- Pick new active peer(s) based on the best performing established -- peers. - atomically increaseActivePeers + atomically $ increaseActivePeers churnMode -- Give the promotion process time to start threadDelay 1 @@ -692,7 +706,7 @@ peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do churnIntervalBulk :: Time - churnIntervalBulk = Time 300 + churnIntervalBulk = Time 600 -- Replace 20% or at least on peer every churnInterval. decrease :: Int -> Int diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs index 8711f7ce662..baa38f014b8 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs @@ -512,6 +512,7 @@ data TracePeerSelection peeraddr = | TraceDemoteAsynchronous (Map peeraddr PeerStatus) | TraceGovernorWakeup | TraceChurnWait DiffTime + | TraceChurnMode ChurnMode | TraceXXX String deriving Show @@ -520,3 +521,7 @@ data DebugPeerSelection peeraddr peerconn = (Maybe DiffTime) (PeerSelectionState peeraddr peerconn) deriving (Show, Functor) + +data ChurnMode = ChurnModeBulkSync + | ChurnModeNormal deriving Show + diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs index 3fcb901e9e0..8d073f4d3dc 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs @@ -15,7 +15,10 @@ import Control.Monad.Class.MonadSTM.Strict import Control.Monad.Class.MonadTime import Cardano.Slotting.Slot (SlotNo (..)) +import Ouroboros.Network.DeltaQ ( SizeInBytes ) import Ouroboros.Network.NodeToNode ( ConnectionId (..)) +import Ouroboros.Network.PeerSelection.PeerMetric.Type + -- The maximum numbers of slots we will store data for. -- On some chains sometimes this corresponds to 1h @@ -24,14 +27,29 @@ maxSlotsToTrack :: Int maxSlotsToTrack = 180 -type ReportHeaderMetricsSTM m = (SlotNo -> Time -> STM m ()) - type SlotMetric p = IntPSQ SlotNo (p, Time) data PeerMetrics m p = PeerMetrics { headerMetrics :: StrictTVar m (SlotMetric p) + , fetchedMetrics :: StrictTVar m (SlotMetric (p, SizeInBytes)) } +reportMetric + :: forall m p. + ( MonadSTM m ) + => PeerMetrics m p + -> ReportPeerMetrics m (ConnectionId p) +reportMetric peerMetrics = + ReportPeerMetrics (addHeaderMetric peerMetrics) + (addFetchedMetric peerMetrics) + +nullMetric + :: MonadSTM m + => ReportPeerMetrics m p +nullMetric = + ReportPeerMetrics (\_ _ _ -> pure ()) + (\_ _ _ _ -> pure ()) + slotMetricKey :: SlotNo -> Int slotMetricKey (SlotNo s) = fromIntegral s @@ -39,12 +57,24 @@ addHeaderMetric :: forall m p. ( MonadSTM m ) => PeerMetrics m p - -> (ConnectionId p) + -> ConnectionId p -> SlotNo -> Time -> STM m () -addHeaderMetric PeerMetrics{headerMetrics} con slotNo time = - addMetrics headerMetrics (remoteAddress con) slotNo time +addHeaderMetric PeerMetrics{headerMetrics} con = + addMetrics headerMetrics (remoteAddress con) + +addFetchedMetric + :: forall m p. + ( MonadSTM m ) + => PeerMetrics m p + -> ConnectionId p + -> SizeInBytes + -> SlotNo + -> Time + -> STM m () +addFetchedMetric PeerMetrics{fetchedMetrics} con bytes = + addMetrics fetchedMetrics (remoteAddress con, bytes) getHeaderMetrics @@ -53,6 +83,12 @@ getHeaderMetrics -> STM m (SlotMetric p) getHeaderMetrics PeerMetrics{headerMetrics} = readTVar headerMetrics +getFetchedMetrics + :: MonadSTM m + => PeerMetrics m p + -> STM m (SlotMetric (p, SizeInBytes)) +getFetchedMetrics PeerMetrics{fetchedMetrics} = readTVar fetchedMetrics + addMetrics :: forall m p. ( MonadSTM m ) => StrictTVar m (SlotMetric p) @@ -84,7 +120,8 @@ newPeerMetric => m (PeerMetrics m p) newPeerMetric = do hs <- newTVarIO Pq.empty - return $ PeerMetrics hs + bs <- newTVarIO Pq.empty + return $ PeerMetrics hs bs -- Returns a Map which counts the number of times a given peer -- was the first to present us with a block/header. @@ -107,4 +144,25 @@ upstreamyness = Pq.fold' count Map.empty fn (Just c) = Just $! c + 1 +-- Returns a Map which counts the number of bytes downloaded +-- for a given peer. +fetchyness + :: forall p. ( Ord p ) + => SlotMetric (p, SizeInBytes) + -> Map p Int +fetchyness = Pq.fold' count Map.empty + where + count :: Int + -> SlotNo + -> ((p, SizeInBytes), Time) + -> Map p Int + -> Map p Int + count _ _ ((peer, bytes),_) m = + Map.alter fn peer m + where + fn :: Maybe Int -> Maybe Int + fn Nothing = Just $ fromIntegral bytes + fn (Just oldBytes) = Just $! oldBytes + fromIntegral bytes + + diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric/Type.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric/Type.hs new file mode 100644 index 00000000000..1ae8e0f551c --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric/Type.hs @@ -0,0 +1,15 @@ +module Ouroboros.Network.PeerSelection.PeerMetric.Type where + +import Control.Monad.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime + +import Cardano.Slotting.Slot (SlotNo (..)) +import Ouroboros.Network.DeltaQ ( SizeInBytes ) + +type ReportHeaderMetricsSTM m = (SlotNo -> Time -> STM m ()) +type ReportFetchedMetricsSTM m = (SizeInBytes -> SlotNo -> Time -> STM m ()) + +data ReportPeerMetrics m p = ReportPeerMetrics { + reportHeader :: p -> ReportHeaderMetricsSTM m + , reportFetch :: p -> ReportFetchedMetricsSTM m + } diff --git a/ouroboros-network/test/Ouroboros/Network/BlockFetch/Examples.hs b/ouroboros-network/test/Ouroboros/Network/BlockFetch/Examples.hs index bb19ae248f5..43d38e5ce6d 100644 --- a/ouroboros-network/test/Ouroboros/Network/BlockFetch/Examples.hs +++ b/ouroboros-network/test/Ouroboros/Network/BlockFetch/Examples.hs @@ -88,7 +88,7 @@ blockFetchExample0 decisionTracer clientStateTracer clientMsgTracer (contramap (TraceLabelPeer peerno) serverMsgTracer) clientDelay serverDelay registry peerno - (blockFetchClient NodeToNodeV_1 controlMessageSTM) + (blockFetchClient NodeToNodeV_1 controlMessageSTM (\_ _ _ -> pure ())) (mockBlockFetchServer1 candidateChain) fetchAsync <- async $ do @@ -194,7 +194,7 @@ blockFetchExample1 decisionTracer clientStateTracer clientMsgTracer (contramap (TraceLabelPeer peerno) serverMsgTracer) clientDelay serverDelay registry peerno - (blockFetchClient NodeToNodeV_1 controlMessageSTM) + (blockFetchClient NodeToNodeV_1 controlMessageSTM (\_ _ _ -> pure ())) (mockBlockFetchServer1 candidateChain) | (peerno, candidateChain) <- zip [1..] candidateChains ]