Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network: Class-based Peer Selector #5937

Merged
merged 15 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 36 additions & 44 deletions catchup/catchpointService.go
Expand Up @@ -18,6 +18,7 @@

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -69,7 +70,7 @@
type CatchpointCatchupService struct {
// stats is the statistics object, updated async while downloading the ledger
stats CatchpointCatchupStats
// statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state
// statsMu synchronizes access to stats, as we could attempt to update it while querying for its current state
statsMu deadlock.Mutex
node CatchpointCatchupNodeServices
// ctx is the node cancellation context, used when the node is being stopped.
Expand Down Expand Up @@ -98,7 +99,7 @@
abortCtx context.Context
abortCtxFunc context.CancelFunc
// blocksDownloadPeerSelector is the peer selector used for downloading blocks.
blocksDownloadPeerSelector *peerSelector
blocksDownloadPeerSelector peerSelector
}

// MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode
Expand Down Expand Up @@ -280,51 +281,50 @@
}

// processStageLedgerDownload is the second catchpoint catchup stage. It downloads the ledger.
func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
func (cs *CatchpointCatchupService) processStageLedgerDownload() error {

Check warning on line 284 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L284

Added line #L284 was not covered by tests
cs.statsMu.Lock()
label := cs.stats.CatchpointLabel
cs.statsMu.Unlock()
round, _, err0 := ledgercore.ParseCatchpointLabel(label)
round, _, err := ledgercore.ParseCatchpointLabel(label)

Check warning on line 288 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L288

Added line #L288 was not covered by tests

if err0 != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0))
if err != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err))

Check warning on line 291 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L290-L291

Added lines #L290 - L291 were not covered by tests
}

// download balances file.
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
lf := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)

Check warning on line 295 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L295

Added line #L295 was not covered by tests
attemptsCount := 0

for {
attemptsCount++

err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err != nil {
err0 := cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err0 != nil {

Check warning on line 302 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L301-L302

Added lines #L301 - L302 were not covered by tests
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err0))

Check warning on line 306 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L306

Added line #L306 was not covered by tests
}
psp, err := peerSelector.getNextPeer()
if err != nil {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err)
psp, err0 := cs.blocksDownloadPeerSelector.getNextPeer()
if err0 != nil {
err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err0)

Check warning on line 311 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L308-L311

Added lines #L308 - L311 were not covered by tests
}
peer := psp.Peer
start := time.Now()
err = ledgerFetcher.downloadLedger(cs.ctx, peer, round)
if err == nil {
err0 = lf.downloadLedger(cs.ctx, peer, round)
if err0 == nil {

Check warning on line 316 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L315-L316

Added lines #L315 - L316 were not covered by tests
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err == nil {
err0 = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err0 == nil {

Check warning on line 320 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L319-L320

Added lines #L319 - L320 were not covered by tests
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
}
// failed to build the merkle trie for the above catchpoint file.
peerSelector.rankPeer(psp, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)

Check warning on line 325 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L325

Added line #L325 was not covered by tests
} else {
peerSelector.rankPeer(psp, peerRankDownloadFailed)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)

Check warning on line 327 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L327

Added line #L327 was not covered by tests
}

// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
Expand All @@ -335,10 +335,10 @@
}

if attemptsCount >= cs.config.CatchupLedgerDownloadRetryAttempts {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err)
err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err0)

Check warning on line 339 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L338-L339

Added lines #L338 - L339 were not covered by tests
}
cs.log.Warnf("unable to download ledger : %v", err)
cs.log.Warnf("unable to download ledger : %v", err0)

Check warning on line 341 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L341

Added line #L341 was not covered by tests
}

err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload)
Expand Down Expand Up @@ -506,14 +506,14 @@
return uint64(topBlock.Round().SubSaturate(lowestStateProofRound))
}

// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against it's predecessor.
// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against its predecessor.
func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
topBlock, err := cs.ledgerAccessor.EnsureFirstBlock(cs.ctx)
if err != nil {
return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err))
}

// pick the lookback with the greater of
// pick the lookback with the greatest of
// either (MaxTxnLife+DeeperBlockHeaderHistory+CatchpointLookback) or MaxBalLookback
// Explanation:
// 1. catchpoint snapshots accounts at round X-CatchpointLookback
Expand All @@ -531,13 +531,13 @@
}

// in case the effective lookback is going before our rounds count, trim it there.
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife)
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback...MaxTxnLife)
if lookback >= uint64(topBlock.Round()) {
lookback = uint64(topBlock.Round() - 1)
}

cs.statsMu.Lock()
cs.stats.TotalBlocks = uint64(lookback)
cs.stats.TotalBlocks = lookback
cs.stats.AcquiredBlocks = 0
cs.stats.VerifiedBlocks = 0
cs.statsMu.Unlock()
Expand All @@ -558,8 +558,9 @@
blk = &ledgerBlock
cert = &ledgerCert
} else {
switch err0.(type) {
case ledgercore.ErrNoEntry:
var errNoEntry ledgercore.ErrNoEntry
switch {
case errors.As(err0, &errNoEntry):

Check warning on line 563 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L561-L563

Added lines #L561 - L563 were not covered by tests
// this is expected, ignore this one.
default:
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0)
Expand Down Expand Up @@ -658,7 +659,7 @@
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {

Check warning on line 662 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L662

Added line #L662 was not covered by tests
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
time.Sleep(noPeersAvailableSleepInterval)
Expand Down Expand Up @@ -718,7 +719,7 @@
// stopOrAbort is called when any of the stage processing function sees that cs.ctx has been canceled. It can be
// due to the end user attempting to abort the current catchpoint catchup operation or due to a node shutdown.
func (cs *CatchpointCatchupService) stopOrAbort() error {
if cs.abortCtx.Err() == context.Canceled {
if errors.Is(cs.abortCtx.Err(), context.Canceled) {

Check warning on line 722 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L722

Added line #L722 was not covered by tests
return cs.abort(context.Canceled)
}
return nil
Expand Down Expand Up @@ -749,7 +750,7 @@
return nil
}

// updateNodeCatchupMode requests the node to change it's operational mode from
// updateNodeCatchupMode requests the node to change its operational mode from
// catchup mode to normal mode and vice versa.
func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) {
newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
Expand Down Expand Up @@ -802,15 +803,7 @@
}

func (cs *CatchpointCatchupService) initDownloadPeerSelector() {
cs.blocksDownloadPeerSelector = cs.makeCatchpointPeerSelector()
}

func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() *peerSelector {
return makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
})
cs.blocksDownloadPeerSelector = makeCatchpointPeerSelector(cs.net)
}

// checkLedgerDownload sends a HEAD request to the ledger endpoint of peers to validate the catchpoint's availability
Expand All @@ -821,10 +814,9 @@
if err != nil {
return fmt.Errorf("failed to parse catchpoint label : %v", err)
}
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
for i := 0; i < cs.config.CatchupLedgerDownloadRetryAttempts; i++ {
psp, peerError := peerSelector.getNextPeer()
psp, peerError := cs.blocksDownloadPeerSelector.getNextPeer()

Check warning on line 819 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L819

Added line #L819 was not covered by tests
if peerError != nil {
return err
}
Expand Down
156 changes: 156 additions & 0 deletions catchup/classBasedPeerSelector.go
@@ -0,0 +1,156 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"errors"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-deadlock"
"time"
)

// classBasedPeerSelector is a rankPooledPeerSelector that tracks and ranks classes of peers based on their response behavior.
// It is used to select the most appropriate peers to download blocks from - this is most useful when catching up
// and needing to figure out whether the blocks can be retrieved from relay nodes or require archive nodes.
// The ordering of the peerSelectors directly determines the priority of the classes of peers.
type classBasedPeerSelector struct {
mu deadlock.Mutex
peerSelectors []*wrappedPeerSelector
}

func makeClassBasedPeerSelector(peerSelectors []*wrappedPeerSelector) *classBasedPeerSelector {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
return &classBasedPeerSelector{
peerSelectors: peerSelectors,
}
}

func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
c.mu.Lock()
defer c.mu.Unlock()

oldRank, newRank := -1, -1
for _, wp := range c.peerSelectors {
// See if the peer is in the class, ranking it appropriately if so
if psp.peerClass != wp.peerClass {
continue
}

oldRank, newRank = wp.peerSelector.rankPeer(psp, rank)
if oldRank < 0 || newRank < 0 {
// Peer not found in this selector
continue
}

// Peer was in this class, if there was any kind of download issue, we increment the failure count
if rank >= peerRankNoBlockForRound {
wp.downloadFailures++
}

break
}

return oldRank, newRank
}

func (c *classBasedPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
c.mu.Lock()
defer c.mu.Unlock()

for _, wp := range c.peerSelectors {
rank = wp.peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
// If rank is peerRankInvalidDownload, we check the next class's rankPooledPeerSelector
if rank >= peerRankInvalidDownload {
continue
}
// Should be a legit ranking, we return it
return rank
}
// If we reached here, we have exhausted all classes without finding the peer
return peerRankInvalidDownload
}

func (c *classBasedPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.internalGetNextPeer(0)
}

// internalGetNextPeer is a helper function that should be called with the lock held
func (c *classBasedPeerSelector) internalGetNextPeer(recurseCount int8) (psp *peerSelectorPeer, err error) {
// Safety check to prevent infinite recursion
if recurseCount > 1 {
return nil, errPeerSelectorNoPeerPoolsAvailable

Check warning on line 96 in catchup/classBasedPeerSelector.go

View check run for this annotation

Codecov / codecov/patch

catchup/classBasedPeerSelector.go#L96

Added line #L96 was not covered by tests
}
selectorDisabledCount := 0
for _, wp := range c.peerSelectors {
if wp.downloadFailures > wp.toleranceFactor {
// peerSelector is disabled for now, we move to the next one
selectorDisabledCount++
continue
}
psp, err = wp.peerSelector.getNextPeer()

if err != nil {
// This is mostly just future-proofing, as we don't expect any other errors from getNextPeer
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {
// We penalize this class the equivalent of one download failure (in case this is transient)
wp.downloadFailures++
}
continue
}
return psp, nil
}
// If we reached here, we have exhausted all classes and still have no peers
// IFF all classes are disabled, we reset the downloadFailures for all classes and start over
if len(c.peerSelectors) != 0 && selectorDisabledCount == len(c.peerSelectors) {
for _, wp := range c.peerSelectors {
wp.downloadFailures = 0
}
// Recurse to try again, we should have at least one class enabled now
return c.internalGetNextPeer(recurseCount + 1)
}
// If we reached here, we have exhausted all classes without finding a peer, not due to all classes being disabled
return nil, errPeerSelectorNoPeerPoolsAvailable
}

type wrappedPeerSelector struct {
peerSelector peerSelector // The underlying peerSelector for this class
peerClass network.PeerOption // The class of peers the peerSelector is responsible for
toleranceFactor int // The number of times we can net fail for any reason before we move to the next class's rankPooledPeerSelector
downloadFailures int // The number of times we have failed to download a block from this class's rankPooledPeerSelector since it was last reset
}

// makeCatchpointPeerSelector returns a classBasedPeerSelector that selects peers based on their class and response behavior.
// These are the preferred configurations for the catchpoint service.
func makeCatchpointPeerSelector(net peersRetriever) peerSelector {
wrappedPeerSelectors := []*wrappedPeerSelector{
{
peerClass: network.PeersPhonebookRelays,
peerSelector: makeRankPooledPeerSelector(net,
[]peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}),
toleranceFactor: 3,
},
{
peerClass: network.PeersPhonebookArchivalNodes,
peerSelector: makeRankPooledPeerSelector(net,
[]peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}),
toleranceFactor: 10,
},
}

return makeClassBasedPeerSelector(wrappedPeerSelectors)
}