catchup: fetchAndWrite/fetchRound quit early on errNoBlockForRound #5809

Merged
27 changes: 27 additions & 0 deletions catchup/service.go
@@ -243,6 +243,8 @@
return
}

const errNoBlockForRoundThreshold = 5

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
// Returns false if we should stop trying to catch up. This may occur for several reasons:
// - If the context is canceled (e.g. if the node is shutting down)
@@ -254,6 +256,11 @@
if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
return false
}

// peerErrors tracks errNoBlockForRound in order to quit earlier without making
// tons of requests for a block that most likely has not happened yet
peerErrors := map[network.Peer]int{}

i := 0
for {
i++
@@ -302,6 +309,15 @@
s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r)
return false
}
if err == errNoBlockForRound {
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Infof("fetchAndWrite(%d): remote peers do not have the block. Quitting", r)
return false
}
peerErrors[peer]++
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)

@@ -689,6 +705,8 @@
return
}

peerErrors := map[network.Peer]int{}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
@@ -710,6 +728,15 @@
return
default:
}
if err == errNoBlockForRound {
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Debugf("fetchAndWrite(%d): remote peers do not have the block. Quitting", cert.Round)
return
}
peerErrors[peer]++
}
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
continue
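Both fetchAndWrite and fetchRound apply the same rule: track errNoBlockForRound per peer and give up on the round once the same peer has reported it more than errNoBlockForRoundThreshold times. The sketch below is a minimal, self-contained illustration of that rule, not the service code: the string peer IDs, the fetch callback, and fetchWithEarlyQuit are stand-ins, while errNoBlockForRound, errNoBlockForRoundThreshold, and the 500-attempt cap mirror names and values referenced in this PR.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the real fetcher error and limits referenced in the diff.
var errNoBlockForRound = errors.New("no block available for given round")

const (
	errNoBlockForRoundThreshold = 5
	catchupRetryLimit           = 500
)

// fetchWithEarlyQuit keeps asking peers for a block, but stops early once a
// single peer has answered "no block for this round" too many times, instead
// of burning through the whole retry budget.
func fetchWithEarlyQuit(peers []string, fetch func(peer string) error) bool {
	peerErrors := map[string]int{}
	for i := 1; ; i++ {
		if i > catchupRetryLimit { // overall cap, as in the real fetch loop
			return false
		}
		peer := peers[i%len(peers)] // real code picks peers via peerSelector
		err := fetch(peer)
		if err == nil {
			return true // got the block; the caller would verify and write it
		}
		if errors.Is(err, errNoBlockForRound) {
			// the round most likely has not happened yet; stop hammering
			// the network once the same peer crosses the threshold
			if peerErrors[peer] > errNoBlockForRoundThreshold {
				return false
			}
			peerErrors[peer]++
		}
		// other errors: the real service re-ranks the peer and retries
	}
}

func main() {
	alwaysBehind := func(peer string) error { return errNoBlockForRound }
	// a single peer that never has the block: quits after 7 attempts, not 500
	fmt.Println(fetchWithEarlyQuit([]string{"peerA"}, alwaysBehind))
}

With one peer and a threshold of 5 the loop stops after seven fetch attempts; without the per-peer counter it would retry until the 500-attempt cap.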
82 changes: 78 additions & 4 deletions catchup/service_test.go
@@ -20,7 +20,9 @@ import (
"context"
"errors"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -180,6 +182,27 @@ func (cl *periodicSyncLogger) Warnf(s string, args ...interface{}) {
cl.Logger.Warnf(s, args...)
}

type periodicSyncDebugLogger struct {
periodicSyncLogger
debugMsgFilter []string
debugMsgs atomic.Uint32
}

func (cl *periodicSyncDebugLogger) Debugf(s string, args ...interface{}) {
// save debug messages for later inspection.
if len(cl.debugMsgFilter) > 0 {
for _, filter := range cl.debugMsgFilter {
if strings.Contains(s, filter) {
cl.debugMsgs.Add(1)
break
}
}
} else {
cl.debugMsgs.Add(1)
}
cl.Logger.Debugf(s, args...)
}

func TestSyncRound(t *testing.T) {
partitiontest.PartitionTest(t)

@@ -208,7 +231,7 @@ func TestSyncRound(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
localCfg := config.GetDefaultLocal()
@@ -253,7 +276,7 @@
s.UnsetDisableSyncRound()
// wait until the catchup is done
waitStart = time.Now()
for time.Now().Sub(waitStart) < 8*s.deadlineTimeout {
for time.Since(waitStart) < 8*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -298,7 +321,7 @@ func TestPeriodicSync(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil)
@@ -315,7 +338,7 @@
// wait until the catchup is done. Since we might have missed the sleep window, we need to wait
// until the synchronization is complete.
waitStart := time.Now()
for time.Now().Sub(waitStart) < 10*s.deadlineTimeout {
for time.Since(waitStart) < 10*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -472,6 +495,8 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
}
addBlocks(t, remote, blk, int(numberOfBlocks)-1)

logging.Base().SetLevel(logging.Debug)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
@@ -1184,3 +1209,52 @@ func TestServiceLedgerUnavailable(t *testing.T) {
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}

// TestServiceNoBlockForRound checks that fetchAndWrite does not repeat 500 times if a block is not available
func TestServiceNoBlockForRound(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 8
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
pl := &periodicSyncDebugLogger{periodicSyncLogger: periodicSyncLogger{Logger: logging.Base()}}
s.log = pl
s.deadlineTimeout = 1 * time.Second

s.testStart()
defer s.Stop()
s.sync()

// without the fix there are about 2k messages (4x catchupRetryLimit)
// with the fix expect less than catchupRetryLimit
require.Less(t, int(pl.debugMsgs.Load()), catchupRetryLimit)
}
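TestServiceNoBlockForRound leaves debugMsgFilter empty, so every debug line is counted. The detached sketch below, built only on the standard library, shows the filtered variant of the same counting idea behind periodicSyncDebugLogger: only Debugf calls whose format string contains one of the filter substrings (for example the "Could not fetch" message emitted by fetchAndWrite) bump the atomic counter; the countingLogger name is a stand-in, not a type from this PR.

package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// countingLogger mirrors the counting idea of periodicSyncDebugLogger: it
// counts only Debugf calls matching one of the configured substrings, using an
// atomic counter so concurrent fetcher goroutines can log safely.
type countingLogger struct {
	filters []string
	hits    atomic.Uint32
}

func (l *countingLogger) Debugf(format string, args ...interface{}) {
	if len(l.filters) == 0 {
		l.hits.Add(1) // no filter configured: count every debug message
		return
	}
	for _, f := range l.filters {
		if strings.Contains(format, f) {
			l.hits.Add(1)
			break
		}
	}
}

func main() {
	l := &countingLogger{filters: []string{"Could not fetch"}}
	l.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", 5, "timeout", 1)
	l.Debugf("unrelated debug message")
	fmt.Println(l.hits.Load()) // prints 1: only the matching call was counted
}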
18 changes: 0 additions & 18 deletions test/e2e-go/features/catchup/basicCatchup_test.go
@@ -73,24 +73,6 @@ func TestBasicCatchup(t *testing.T) {
// Now, catch up
err = fixture.LibGoalFixture.ClientWaitForRoundWithTimeout(cloneClient, waitForRound)
a.NoError(err)

cloneNC := fixture.GetNodeControllerForDataDir(cloneDataDir)
cloneRestClient := fixture.GetAlgodClientForController(cloneNC)

// an immediate call for ready will error, for sync time != 0
a.Error(cloneRestClient.ReadyCheck())

for {
status, err := cloneRestClient.Status()
a.NoError(err)

if status.LastRound < 10 {
time.Sleep(250 * time.Millisecond)
continue
}
a.NoError(cloneRestClient.ReadyCheck())
break
}
}

// TestCatchupOverGossip tests catchup across network versions