track errNoBlockForRound error
algorandskiy committed Oct 28, 2023
1 parent e065c7f commit 827e74a
Showing 2 changed files with 85 additions and 88 deletions.
86 changes: 27 additions & 59 deletions catchup/service.go
@@ -243,57 +243,7 @@ func (s *Service) innerFetch(ctx context.Context, r basics.Round, peer network.P
return
}

const defaultPeerWaitTime time.Duration = 100 * time.Millisecond

// peerSelectionTracker helps to detect and handle a situation when catchup is trying to download a block that has not happened yet.
// In this case fetchAndWrite/fetchRound could keep trying to download the same block up to catchupRetryLimit times,
// punishing peers. For example, if only two peers are connected and one is stalled for some reason, the node might keep fetching a block
// from it forever.
// To resolve this, peerSelectionTracker tracks peers, increasing the retry delay for a peer whose previous invocation erred.
// peerSelectionTracker is supposed to be instantiated in fetchAndWrite/fetchRound so it covers repeated downloads of a single round.
type peerSelectionTracker map[network.Peer]struct {
count int
delay time.Duration
errs int
}

func (pt peerSelectionTracker) maybeWait(peer network.Peer, maxDelay time.Duration, waiter func(d time.Duration)) {
if e, ok := pt[peer]; !ok {
e.count = 1
e.delay = defaultPeerWaitTime
e.errs = 0
pt[peer] = e
} else {
if e.errs > 0 {
waiter(e.delay)
e.delay *= 2
if e.delay > maxDelay {
e.delay = maxDelay
}
}
e.count++
pt[peer] = e
}
}

func (pt peerSelectionTracker) failedPrio(peer network.Peer, defaultPunishment int) int {
if e, ok := pt[peer]; ok {
e.errs++
pt[peer] = e
step := defaultPunishment / 1000
return int(basics.SubSaturate(uint32(defaultPunishment), uint32(step*e.errs)))
} else {
return defaultPunishment
}
}

func (pt peerSelectionTracker) invalidPrio(peer network.Peer, defaultPunishment int) int {
if e, ok := pt[peer]; ok {
e.errs++
pt[peer] = e
}
return defaultPunishment
}
const errNoBlockForRoundThreshold = 5

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
// Returns false if we should stop trying to catch up. This may occur for several reasons:
@@ -307,7 +257,9 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
return false
}

pst := peerSelectionTracker{}
// peerErrors tracks errNoBlockForRound in order to quit earlier without making
// tons of requests for a block that most likely has not happened yet
peerErrors := map[network.Peer]int{}

i := 0
for {
@@ -346,7 +298,6 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
return false
}
peer := psp.Peer
pst.maybeWait(peer, s.deadlineTimeout, time.Sleep)

// Try to fetch, timing out after retryInterval
block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer)
@@ -358,8 +309,17 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r)
return false
}
if err == errNoBlockForRound {
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Infof("fetchAndWrite(%d): remote peers do not have the block. Quitting", r)
return false
}
peerErrors[peer]++
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, pst.failedPrio(peer, peerRankDownloadFailed))
peerSelector.rankPeer(psp, peerRankDownloadFailed)

// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
// to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain
@@ -404,7 +364,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
err = s.auth.Authenticate(block, cert)
if err != nil {
s.log.Warnf("fetchAndWrite(%v): cert did not authenticate block (attempt %d): %v", r, i, err)
peerSelector.rankPeer(psp, pst.invalidPrio(peer, peerRankInvalidDownload))
peerSelector.rankPeer(psp, peerRankInvalidDownload)
continue // retry the fetch
}
}
@@ -745,7 +705,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
return
}

pst := peerSelectionTracker{}
peerErrors := map[network.Peer]int{}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := createPeerSelector(s.net, s.cfg, false)
@@ -757,7 +717,6 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
continue
}
peer := psp.Peer
pst.maybeWait(peer, s.deadlineTimeout, time.Sleep)

// Ask the fetcher to get the block somehow
block, fetchedCert, _, err := s.innerFetch(s.ctx, cert.Round, peer)
@@ -769,8 +728,17 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
return
default:
}
if err == errNoBlockForRound {
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Debugf("fetchAndWrite(%d): remote peers do not have the block. Quitting", cert.Round)
return
}
peerErrors[peer]++
}
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
peerSelector.rankPeer(psp, pst.failedPrio(peer, peerRankDownloadFailed))
peerSelector.rankPeer(psp, peerRankDownloadFailed)
continue
}

@@ -780,7 +748,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
}
// Otherwise, fetcher gave us the wrong block
logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash)
peerSelector.rankPeer(psp, pst.invalidPrio(peer, peerRankInvalidDownload))
peerSelector.rankPeer(psp, peerRankInvalidDownload)

// As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible
if cert.Round == fetchedCert.Round &&
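For illustration, here is a small, self-contained Go sketch (not part of this commit) of the per-peer error counting that replaces the old peerSelectionTracker. It mirrors the errNoBlockForRoundThreshold check from the diff above against a single stalled peer; the peer type, fetchBlock function, and main loop are simplified stand-ins for the real service code.

package main

import (
	"errors"
	"fmt"
)

const errNoBlockForRoundThreshold = 5 // same value as in the diff

var errNoBlockForRound = errors.New("no block available for given round")

type peer string

// fetchBlock stands in for Service.innerFetch; this stalled peer never has the block.
func fetchBlock(p peer, round uint64) error {
	return errNoBlockForRound
}

func main() {
	const round = 1
	stalled := peer("stalled-peer")

	// peerErrors tracks errNoBlockForRound per peer, as in fetchAndWrite/fetchRound.
	peerErrors := map[peer]int{}

	attempts := 0
	for {
		attempts++
		err := fetchBlock(stalled, round)
		if errors.Is(err, errNoBlockForRound) {
			// Quit once the same peer has erred more than the threshold allows.
			if count := peerErrors[stalled]; count > errNoBlockForRoundThreshold {
				fmt.Printf("giving up after %d attempts\n", attempts)
				return
			}
			peerErrors[stalled]++
		}
	}
}

With the threshold set to 5, the loop above gives up after 7 attempts against a single unresponsive peer, rather than retrying up to catchupRetryLimit times as before.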
87 changes: 58 additions & 29 deletions catchup/service_test.go
@@ -167,6 +167,7 @@ func TestServiceFetchBlocksSameRange(t *testing.T) {
type periodicSyncLogger struct {
logging.Logger
WarnfCallback func(string, ...interface{})
debugMsgs []string
}

func (cl *periodicSyncLogger) Warnf(s string, args ...interface{}) {
@@ -180,6 +181,12 @@ func (cl *periodicSyncLogger) Warnf(s string, args ...interface{}) {
cl.Logger.Warnf(s, args...)
}

func (cl *periodicSyncLogger) Debugf(s string, args ...interface{}) {
// save debug messages for later inspection.
cl.debugMsgs = append(cl.debugMsgs, s)
cl.Logger.Debugf(s, args...)
}

func TestSyncRound(t *testing.T) {
partitiontest.PartitionTest(t)

@@ -472,6 +479,8 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
}
addBlocks(t, remote, blk, int(numberOfBlocks)-1)

logging.Base().SetLevel(logging.Debug)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
@@ -1100,35 +1109,6 @@ func TestSynchronizingTime(t *testing.T) {
require.NotEqual(t, time.Duration(0), s.SynchronizingTime())
}

func TestPeerSelectionTracker(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

ps := peerSelectionTracker{}

var mockDur time.Duration
mockWaiter := func(d time.Duration) {
mockDur = d
}

ps.maybeWait(1, time.Second, mockWaiter)
require.Equal(t, time.Duration(0), mockDur)
// no errors => no delay
ps.maybeWait(1, time.Second, mockWaiter)
require.Equal(t, time.Duration(0), mockDur)

ps.failedPrio(1, peerRankDownloadFailed)
expected := defaultPeerWaitTime
for i := 0; i < 4; i++ {
ps.maybeWait(1, time.Second, mockWaiter)
require.Equal(t, expected, mockDur)
expected *= 2
}
// one more time to exceed the max
ps.maybeWait(1, time.Second, mockWaiter)
require.Equal(t, time.Second, mockDur)
}

func TestDownloadBlocksToSupportStateProofs(t *testing.T) {
partitiontest.PartitionTest(t)

@@ -1213,3 +1193,52 @@ func TestServiceLedgerUnavailable(t *testing.T) {
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}

// TestServiceNoBlockForRound checks that fetchAndWrite does not repeat 500 times if a block is not available
func TestServiceNoBlockForRound(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 8
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
pl := &periodicSyncLogger{Logger: logging.Base()}
s.log = pl
s.deadlineTimeout = 1 * time.Second

s.testStart()
defer s.Stop()
s.sync()

// without the fix there are about 2k messages (4x catchupRetryLimit)
// with the fix expect fewer than catchupRetryLimit
require.Less(t, len(pl.debugMsgs), catchupRetryLimit)
}
