Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid ordering peers based on peerid in blockfetch #3535

Merged
merged 2 commits into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 24 additions & 15 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs
Expand Up @@ -46,6 +46,7 @@ import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..),
import Ouroboros.Network.BlockFetch.DeltaQ
(PeerFetchInFlightLimits (..), PeerGSV (..), SizeInBytes,
calculatePeerFetchInFlightLimits, comparePeerGSV,
comparePeerGSV',
estimateExpectedResponseDuration,
estimateResponseDeadlineProbability)

Expand Down Expand Up @@ -161,7 +162,8 @@ fetchDecisions
fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
plausibleCandidateChain,
compareCandidateChains,
blockFetchSize
blockFetchSize,
peerSalt
}
fetchMode
currentChain
Expand All @@ -182,6 +184,7 @@ fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
-- Reorder chains based on consensus policy and network timing data.
. prioritisePeerChains
fetchMode
peerSalt
compareCandidateChains
blockFetchSize
. map swizzleIG
Expand All @@ -206,7 +209,7 @@ fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
where
-- Data swizzling functions to get the right info into each stage.
swizzleI (c, p@(_, inflight,_,_, _)) = (c, inflight, p)
swizzleIG (c, p@(_, inflight,gsvs,_, _)) = (c, inflight, gsvs, p)
swizzleIG (c, p@(_, inflight,gsvs,peer,_)) = (c, inflight, gsvs, peer, p)
swizzleSI (c, p@(status,inflight,_,_, _)) = (c, status, inflight, p)
swizzleSIG (c, p@(status,inflight,gsvs,peer,_)) = (c, status, inflight, gsvs, peer, p)

Expand Down Expand Up @@ -604,19 +607,21 @@ filterWithMaxSlotNo p maxSlotNo =
AF.filterWithStop p ((> maxSlotNo) . MaxSlotNo . blockSlot)

prioritisePeerChains
:: forall header peer. HasHeader header
:: forall extra header peer.
( HasHeader header
, Hashable peer
, Ord peer
)
=> FetchMode
-> Int
-> (AnchoredFragment header -> AnchoredFragment header -> Ordering)
-> (header -> SizeInBytes)
-> [(FetchDecision (CandidateFragments header), PeerFetchInFlight header,
PeerGSV,
peer)]
-> [(FetchDecision [AnchoredFragment header], peer)]
prioritisePeerChains FetchModeDeadline compareCandidateChains blockFetchSize =
--TODO: last tie-breaker is still original order (which is probably
-- peerid order). We should use a random tie breaker so that adversaries
-- cannot get any advantage.

peer,
extra )]
-> [(FetchDecision [AnchoredFragment header], extra)]
prioritisePeerChains FetchModeDeadline salt compareCandidateChains blockFetchSize =
map (\(decision, peer) ->
(fmap (\(_,_,fragment) -> fragment) decision, peer))
. concatMap ( concat
Expand Down Expand Up @@ -646,9 +651,11 @@ prioritisePeerChains FetchModeDeadline compareCandidateChains blockFetchSize =
`on`
(\(band, chain, _fragments) -> (band, chain))))))
. map annotateProbabilityBand
. sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) ->
comparePeerGSV' salt (a,ap) (b,bp))
coot marked this conversation as resolved.
Show resolved Hide resolved
where
annotateProbabilityBand (Left decline, _, _, peer) = (Left decline, peer)
annotateProbabilityBand (Right (chain,fragments), inflight, gsvs, peer) =
annotateProbabilityBand (Left decline, _, _, _, peer) = (Left decline, peer)
annotateProbabilityBand (Right (chain,fragments), inflight, gsvs, _, peer) =
(Right (band, chain, fragments), peer)
where
band = probabilityBand $
Expand All @@ -666,7 +673,7 @@ prioritisePeerChains FetchModeDeadline compareCandidateChains blockFetchSize =

chainHeadPoint (_,ChainSuffix c,_) = AF.headPoint c

prioritisePeerChains FetchModeBulkSync compareCandidateChains blockFetchSize =
prioritisePeerChains FetchModeBulkSync salt compareCandidateChains blockFetchSize =
map (\(decision, peer) ->
(fmap (\(_, _, fragment) -> fragment) decision, peer))
. sortBy (comparingFst
Expand All @@ -678,9 +685,11 @@ prioritisePeerChains FetchModeBulkSync compareCandidateChains blockFetchSize =
`on`
(\(duration, chain, _fragments) -> (chain, duration)))))
. map annotateDuration
. sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) ->
comparePeerGSV' salt (a,ap) (b,bp))
where
annotateDuration (Left decline, _, _, peer) = (Left decline, peer)
annotateDuration (Right (chain,fragments), inflight, gsvs, peer) =
annotateDuration (Left decline, _, _, _, peer) = (Left decline, peer)
annotateDuration (Right (chain,fragments), inflight, gsvs, _, peer) =
(Right (duration, chain, fragments), peer)
where
-- TODO: consider if we should put this into bands rather than just
Expand Down
15 changes: 14 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/BlockFetch/DeltaQ.hs
Expand Up @@ -15,6 +15,7 @@ module Ouroboros.Network.BlockFetch.DeltaQ
, estimateResponseDeadlineProbability
, estimateExpectedResponseDuration
, comparePeerGSV
, comparePeerGSV'
) where

import Control.Monad.Class.MonadTime
Expand Down Expand Up @@ -50,7 +51,7 @@ comparePeerGSV activePeers salt (a, a_p) (b, b_p) =
else gs a
gs_b = if isActive b_p then activeAdvantage * gs b
else gs b in
if abs (gs_a - gs_b) < 0.05*gs_a
if abs (gs_a - gs_b) < 0.05 * max gs_a gs_b
then compare (hashWithSalt salt a_p) (hashWithSalt salt b_p)
else compare gs_a gs_b
where
Expand All @@ -67,6 +68,18 @@ comparePeerGSV activePeers salt (a, a_p) (b, b_p) =
inboundGSV = GSV g_in _s_in _v_in
} = g_out + g_in

-- | Order two PeerGSVs based on `g`.
-- Like comparePeerGSV but doesn't take active status into account
comparePeerGSV' :: forall peer.
( Hashable peer
, Ord peer
)
=> Int
-> (PeerGSV, peer)
-> (PeerGSV, peer)
-> Ordering
comparePeerGSV' = comparePeerGSV Set.empty


calculatePeerFetchInFlightLimits :: PeerGSV -> PeerFetchInFlightLimits
calculatePeerFetchInFlightLimits PeerGSV {
Expand Down
41 changes: 41 additions & 0 deletions ouroboros-network/test/Test/Ouroboros/Network/BlockFetch.hs
Expand Up @@ -28,16 +28,19 @@ import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer
import Control.Monad.Class.MonadTime (Time (..))
import Control.Monad.IOSim
import Control.Tracer (Tracer (Tracer), contramap, nullTracer)

import Ouroboros.Network.DeltaQ
--TODO: could re-export some of the trace types from more convenient places:
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AnchoredFragment
import Ouroboros.Network.Block
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.ClientRegistry
import Ouroboros.Network.BlockFetch.ClientState
import Ouroboros.Network.BlockFetch.DeltaQ
import Ouroboros.Network.BlockFetch.Examples
import Ouroboros.Network.Driver (TraceSendRecv)
import qualified Ouroboros.Network.MockChain.Chain as Chain
Expand Down Expand Up @@ -68,6 +71,8 @@ tests = testGroup "BlockFetch"
-- requests (testing the high/low watermark mechanism).
, testProperty "termination"
prop_terminate
, testProperty "compare comparePeerGSV" prop_comparePeerGSV
, testProperty "eq comparePeerGSV" prop_comparePeerGSVEq
]


Expand Down Expand Up @@ -663,6 +668,42 @@ prop_terminate (TestChainFork _commonChain forkChain _forkChain) (Positive (Smal

fork' = chainToAnchoredFragment forkChain

newtype PeerGSVT = PeerGSVT {
unPeerGSVT :: PeerGSV
} deriving Show

instance Arbitrary PeerGSVT where
arbitrary = do
Delay gIn <- resize 1000 arbitrary
Delay gOut <- resize 1000 arbitrary
let gsvIn = ballisticGSV gIn 2e-6 (degenerateDistribution 0)
gsvOut = ballisticGSV gOut 2e-6 (degenerateDistribution 0)
return $ PeerGSVT $ PeerGSV (Time 0) gsvOut gsvIn
karknu marked this conversation as resolved.
Show resolved Hide resolved

shrink (PeerGSVT (PeerGSV ts (GSV gOut sOut vOut) (GSV gIn sIn vIn))) =
[PeerGSVT (PeerGSV ts (GSV gOut' sOut vOut) (GSV gIn' sIn vIn))
| (Delay gIn', Delay gOut') <- shrink (Delay gIn, Delay gOut)]


-- | Check that comparePeerGSV satisfies Ord axioms
prop_comparePeerGSV :: Int -> Int -> Int -> PeerGSVT -> PeerGSVT -> Bool -> Bool -> Property
prop_comparePeerGSV salt pa pb (PeerGSVT a) (PeerGSVT b) aActive bActive =
let peerSet = case (aActive, bActive) of
(False, False) -> Set.empty
(True, False) -> Set.singleton pa
(False, True) -> Set.singleton pb
(True, True) -> Set.fromList [pa, pb] in
case comparePeerGSV peerSet salt (a, pa) (b, pb) of
LT -> comparePeerGSV peerSet salt (b, pb) (a, pa) === GT
GT -> comparePeerGSV peerSet salt (b, pb) (a, pa) === LT
EQ -> comparePeerGSV peerSet salt (b, pb) (a, pa) === EQ

-- | Check that identical peers are equal
prop_comparePeerGSVEq :: Int -> Int -> PeerGSVT -> Bool -> Property
prop_comparePeerGSVEq salt p (PeerGSVT a) aActive =
let peerSet = if aActive then Set.singleton p
else Set.empty in
comparePeerGSV peerSet salt (a, p) (a, p) === EQ
karknu marked this conversation as resolved.
Show resolved Hide resolved


--
Expand Down