catchup: fetchAndWrite/fetchRound quit early on errNoBlockForRound #5809

Merged
9 changes: 6 additions & 3 deletions catchup/peerSelector.go
@@ -141,7 +141,7 @@ func makeHistoricStatus(windowSize int, class peerClass) *historicStats {
// that will determine the rank of the peer.
hs := historicStats{
windowSize: windowSize,
rankSamples: make([]int, windowSize, windowSize),
rankSamples: make([]int, windowSize),
requestGaps: make([]uint64, 0, windowSize),
rankSum: uint64(class.initialRank) * uint64(windowSize),
gapSum: 0.0}
@@ -237,7 +237,10 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)))
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)*0.45))
if value > peerRankDownloadFailed {
value = peerRankDownloadFailed
}
} else {
if hs.downloadFailures > 0 {
hs.downloadFailures--
@@ -468,7 +471,7 @@ func (ps *peerSelector) refreshAvailablePeers() {
for peerIdx := len(pool.peers) - 1; peerIdx >= 0; peerIdx-- {
peer := pool.peers[peerIdx].peer
if peerAddress := peerAddress(peer); peerAddress != "" {
if toRemove, _ := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
if toRemove := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
// need to be removed.
pool.peers = append(pool.peers[:peerIdx], pool.peers[peerIdx+1:]...)
}
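The peerSelector change above softens how quickly repeated download failures push a peer out of its class: the penalty now grows by a factor of 2^0.45 (about 1.37) per failure instead of doubling, and is clamped at peerRankDownloadFailed. Here is a standalone sketch of the two formulas; the cap and upperBound values below are illustrative placeholders, not the constants defined in peerSelector.go.

```go
package main

import (
	"fmt"
	"math"
)

// Illustrative placeholders, not the constants defined in catchup/peerSelector.go.
const peerRankDownloadFailed = 900
const classUpperBound = 199

func main() {
	for failures := 0; failures <= 12; failures++ {
		// old formula: upperBound(class) * 2^failures, with no clamp
		oldPenalty := classUpperBound * int(math.Exp2(float64(failures)))
		// new formula: upperBound(class) * 2^(0.45*failures), clamped to the failure rank
		newPenalty := classUpperBound * int(math.Exp2(float64(failures)*0.45))
		if newPenalty > peerRankDownloadFailed {
			newPenalty = peerRankDownloadFailed
		}
		fmt.Printf("failures=%2d  old=%7d  new=%4d\n", failures, oldPenalty, newPenalty)
	}
}
```

With the real constants the cap is reached only after more consecutive failures, which is why TestEvictionAndUpgrade below now loops up to 12 times and expects eviction at i == 11 rather than 6.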
4 changes: 2 additions & 2 deletions catchup/peerSelector_test.go
@@ -646,9 +646,9 @@ func TestEvictionAndUpgrade(t *testing.T) {

_, err := peerSelector.getNextPeer()
require.NoError(t, err)
for i := 0; i < 10; i++ {
for i := 0; i < 12; i++ {
if peerSelector.pools[len(peerSelector.pools)-1].rank == peerRankDownloadFailed {
require.Equal(t, 6, i)
require.Equal(t, 11, i)
break
}
psp, err := peerSelector.getNextPeer()
43 changes: 43 additions & 0 deletions catchup/service.go
@@ -243,6 +243,8 @@
return
}

const errNoBlockForRoundThreshold = 5

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
// Returns false if we should stop trying to catch up. This may occur for several reasons:
// - If the context is canceled (e.g. if the node is shutting down)
@@ -254,6 +256,11 @@
if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
return false
}

// peerErrors tracks occurrences of errNoBlockForRound in order to quit earlier without making
// repeated requests for a block that most likely does not exist yet
peerErrors := map[network.Peer]int{}

i := 0
for {
i++
@@ -302,6 +309,15 @@
s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r)
return false
}
if err == errNoBlockForRound {
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Infof("fetchAndWrite(%d): remote peers do not have the block. Quitting", r)
return false
}
peerErrors[peer]++
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)

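The early exit added to fetchAndWrite above boils down to a per-peer counter with a give-up threshold. A minimal, self-contained sketch of that pattern follows; the string peer key and the giveUp helper are illustrative, not the service's API.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the catchup package's sentinel error.
var errNoBlockForRound = errors.New("no block available for given round")

const errNoBlockForRoundThreshold = 5

// giveUp mirrors the check in fetchAndWrite: count errNoBlockForRound per peer
// and stop retrying once one peer has reported it more than the threshold.
func giveUp(peerErrors map[string]int, peer string, err error) bool {
	if !errors.Is(err, errNoBlockForRound) {
		return false
	}
	if peerErrors[peer] > errNoBlockForRoundThreshold {
		return true
	}
	peerErrors[peer]++
	return false
}

func main() {
	peerErrors := map[string]int{}
	for attempt := 1; ; attempt++ {
		if giveUp(peerErrors, "peer-A", errNoBlockForRound) {
			// the counter exceeds the threshold on the 7th consecutive failure
			fmt.Println("giving up after attempt", attempt)
			return
		}
	}
}
```

In the actual code the counter is keyed by network.Peer, and the failing peer is still ranked peerRankDownloadFailed before the next attempt.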
@@ -689,6 +705,11 @@
return
}

peerErrors := map[network.Peer]struct {
count int
totalWaitDuration time.Duration
}{}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
@@ -710,6 +731,28 @@
return
default:
}
if err == errNoBlockForRound {

// If a peer does not have the block after a few attempts it probably has not persisted the block yet.
// Give it some time to persist the block and try again.
// Quit if the same peer still errs after sleeping for 2*config.Protocol.SmallLambda in total.
if info, ok := peerErrors[peer]; ok {
if info.count > errNoBlockForRoundThreshold && info.totalWaitDuration >= 2*config.Protocol.SmallLambda {
s.log.Debugf("fetchAndWrite(%d): remote peers do not have the block. Quitting", cert.Round)
return

}
if info.count > errNoBlockForRoundThreshold {
const waitDuration = 200 * time.Millisecond
info.totalWaitDuration += waitDuration
time.Sleep(waitDuration)

}
info.count++
peerErrors[peer] = info
} else {
info.count++
peerErrors[peer] = info

}
}
// remote peer doesn't have the block, try another peer
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
continue
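fetchRound (above) goes one step further: once a peer has reported errNoBlockForRound more than the threshold, the block is assumed to simply not be persisted yet, so the loop sleeps 200ms per retry and only gives up after the cumulative wait reaches 2*SmallLambda. A standalone sketch of that decision logic, assuming a SmallLambda of 2 seconds rather than reading it from config.Protocol:

```go
package main

import (
	"fmt"
	"time"
)

const errNoBlockForRoundThreshold = 5

// Assumed stand-in for config.Protocol.SmallLambda, not read from config here.
const smallLambda = 2 * time.Second

type peerErrInfo struct {
	count             int
	totalWaitDuration time.Duration
}

// shouldQuit mirrors the per-peer decision in fetchRound: once a peer has
// reported errNoBlockForRound more than the threshold, wait 200ms per retry,
// and give up only after the cumulative wait reaches 2*smallLambda.
func shouldQuit(info *peerErrInfo) bool {
	if info.count > errNoBlockForRoundThreshold && info.totalWaitDuration >= 2*smallLambda {
		return true
	}
	if info.count > errNoBlockForRoundThreshold {
		const waitDuration = 200 * time.Millisecond
		info.totalWaitDuration += waitDuration
		time.Sleep(waitDuration)
	}
	info.count++
	return false
}

func main() {
	// Simulate one peer failing on every attempt; the demo sleeps about 4s in total.
	info := &peerErrInfo{}
	attempts := 0
	for !shouldQuit(info) {
		attempts++
	}
	fmt.Printf("gave up after %d attempts, waited %v\n", attempts, info.totalWaitDuration)
}
```

With a threshold of 5 and 200ms steps, this bounds the extra waiting on a persistently lagging peer to about 2*SmallLambda before the round is abandoned.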
82 changes: 78 additions & 4 deletions catchup/service_test.go
@@ -20,7 +20,9 @@ import (
"context"
"errors"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -180,6 +182,27 @@ func (cl *periodicSyncLogger) Warnf(s string, args ...interface{}) {
cl.Logger.Warnf(s, args...)
}

type periodicSyncDebugLogger struct {
periodicSyncLogger
debugMsgFilter []string
debugMsgs atomic.Uint32
}

func (cl *periodicSyncDebugLogger) Debugf(s string, args ...interface{}) {
// save debug messages for later inspection.
if len(cl.debugMsgFilter) > 0 {
for _, filter := range cl.debugMsgFilter {
if strings.Contains(s, filter) {
cl.debugMsgs.Add(1)
break
}
}
} else {
cl.debugMsgs.Add(1)
}
cl.Logger.Debugf(s, args...)
}

func TestSyncRound(t *testing.T) {
partitiontest.PartitionTest(t)

@@ -208,7 +231,7 @@ func TestSyncRound(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
localCfg := config.GetDefaultLocal()
@@ -253,7 +276,7 @@ func TestSyncRound(t *testing.T) {
s.UnsetDisableSyncRound()
// wait until the catchup is done
waitStart = time.Now()
for time.Now().Sub(waitStart) < 8*s.deadlineTimeout {
for time.Since(waitStart) < 8*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -298,7 +321,7 @@ func TestPeriodicSync(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil)
@@ -315,7 +338,7 @@
// wait until the catchup is done. Since we might have missed the sleep window, we need to wait
// until the synchronization is complete.
waitStart := time.Now()
for time.Now().Sub(waitStart) < 10*s.deadlineTimeout {
for time.Since(waitStart) < 10*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -472,6 +495,8 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
}
addBlocks(t, remote, blk, int(numberOfBlocks)-1)

logging.Base().SetLevel(logging.Debug)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
@@ -1184,3 +1209,52 @@ func TestServiceLedgerUnavailable(t *testing.T) {
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}

// TestServiceNoBlockForRound checks that fetchAndWrite does not repeat 500 times if a block is not available
func TestServiceNoBlockForRound(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 8
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
pl := &periodicSyncDebugLogger{periodicSyncLogger: periodicSyncLogger{Logger: logging.Base()}}
s.log = pl
s.deadlineTimeout = 1 * time.Second

s.testStart()
defer s.Stop()
s.sync()

// without the fix there are about 2k messages (4x catchupRetryLimit)
// with the fix expect less than catchupRetryLimit
require.Less(t, int(pl.debugMsgs.Load()), catchupRetryLimit)
}
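periodicSyncDebugLogger counts Debugf calls, optionally only those whose format string contains one of the filter substrings; TestServiceNoBlockForRound constructs it without a filter, so every debug message is counted and compared against catchupRetryLimit. A standalone sketch of the same counting idea, where the countingLogger type and the messages are illustrative rather than the test's actual helpers:

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// countingLogger mirrors the idea behind periodicSyncDebugLogger: count debug
// messages, optionally only those whose format string matches one of the filters.
type countingLogger struct {
	filter []string
	count  atomic.Uint32
}

func (l *countingLogger) Debugf(format string, args ...interface{}) {
	if len(l.filter) == 0 {
		l.count.Add(1)
	} else {
		for _, f := range l.filter {
			// the match is against the format string, not the rendered message
			if strings.Contains(format, f) {
				l.count.Add(1)
				break
			}
		}
	}
	fmt.Printf(format+"\n", args...)
}

func main() {
	l := &countingLogger{filter: []string{"Could not fetch"}}
	l.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", 7, "no block", 1)
	l.Debugf("some unrelated debug line")
	fmt.Println("matched:", l.count.Load()) // prints "matched: 1"
}
```

Note that the filter is applied to the format string before formatting, so filters should target literal fragments of the log format rather than interpolated values.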
18 changes: 0 additions & 18 deletions test/e2e-go/features/catchup/basicCatchup_test.go
@@ -73,24 +73,6 @@ func TestBasicCatchup(t *testing.T) {
// Now, catch up
err = fixture.LibGoalFixture.ClientWaitForRoundWithTimeout(cloneClient, waitForRound)
a.NoError(err)

cloneNC := fixture.GetNodeControllerForDataDir(cloneDataDir)
cloneRestClient := fixture.GetAlgodClientForController(cloneNC)

// an immediate call for ready will error, for sync time != 0
a.Error(cloneRestClient.ReadyCheck())

for {
status, err := cloneRestClient.Status()
a.NoError(err)

if status.LastRound < 10 {
time.Sleep(250 * time.Millisecond)
continue
}
a.NoError(cloneRestClient.ReadyCheck())
break
}
}

// TestCatchupOverGossip tests catchup across network versions