Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network: Class-based Peer Selector #5937

Merged
merged 15 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
99 changes: 58 additions & 41 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -69,7 +70,7 @@
type CatchpointCatchupService struct {
// stats is the statistics object, updated async while downloading the ledger
stats CatchpointCatchupStats
// statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state
// statsMu synchronizes access to stats, as we could attempt to update it while querying for its current state
statsMu deadlock.Mutex
node CatchpointCatchupNodeServices
// ctx is the node cancellation context, used when the node is being stopped.
Expand Down Expand Up @@ -98,7 +99,7 @@
abortCtx context.Context
abortCtxFunc context.CancelFunc
// blocksDownloadPeerSelector is the peer selector used for downloading blocks.
blocksDownloadPeerSelector *peerSelector
blocksDownloadPeerSelector peerSelector
}

// MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode
Expand Down Expand Up @@ -280,7 +281,7 @@
}

// processStageLedgerDownload is the second catchpoint catchup stage. It downloads the ledger.
func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
func (cs *CatchpointCatchupService) processStageLedgerDownload() error {

Check warning on line 284 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L284

Added line #L284 was not covered by tests
cs.statsMu.Lock()
label := cs.stats.CatchpointLabel
cs.statsMu.Unlock()
Expand All @@ -291,40 +292,40 @@
}

// download balances file.
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
ps := cs.makeCatchpointPeerSelector()
lf := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)

Check warning on line 296 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L295-L296

Added lines #L295 - L296 were not covered by tests
attemptsCount := 0

for {
attemptsCount++

err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err != nil {
err1 := cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err1 != nil {

Check warning on line 303 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L302-L303

Added lines #L302 - L303 were not covered by tests
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err1))

Check warning on line 307 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L307

Added line #L307 was not covered by tests
}
psp, err := peerSelector.getNextPeer()
if err != nil {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err)
psp, err1 := ps.getNextPeer()
if err1 != nil {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
err1 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err1)

Check warning on line 312 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L309-L312

Added lines #L309 - L312 were not covered by tests
}
peer := psp.Peer
start := time.Now()
err = ledgerFetcher.downloadLedger(cs.ctx, peer, round)
if err == nil {
err1 = lf.downloadLedger(cs.ctx, peer, round)
if err1 == nil {

Check warning on line 317 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L316-L317

Added lines #L316 - L317 were not covered by tests
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err == nil {
err1 = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err1 == nil {

Check warning on line 321 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L320-L321

Added lines #L320 - L321 were not covered by tests
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
}
// failed to build the merkle trie for the above catchpoint file.
peerSelector.rankPeer(psp, peerRankInvalidDownload)
ps.rankPeer(psp, peerRankInvalidDownload)

Check warning on line 326 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L326

Added line #L326 was not covered by tests
} else {
peerSelector.rankPeer(psp, peerRankDownloadFailed)
ps.rankPeer(psp, peerRankDownloadFailed)

Check warning on line 328 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L328

Added line #L328 was not covered by tests
}

// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
Expand All @@ -335,15 +336,15 @@
}

if attemptsCount >= cs.config.CatchupLedgerDownloadRetryAttempts {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err)
err1 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err1)

Check warning on line 340 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L339-L340

Added lines #L339 - L340 were not covered by tests
}
cs.log.Warnf("unable to download ledger : %v", err)
cs.log.Warnf("unable to download ledger : %v", err1)

Check warning on line 342 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L342

Added line #L342 was not covered by tests
}

err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload)
if err != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to update stage to CatchpointCatchupStateLatestBlockDownload : %v", err))
err0 = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload)
if err0 != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to update stage to CatchpointCatchupStateLatestBlockDownload : %v", err0))

Check warning on line 347 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L345-L347

Added lines #L345 - L347 were not covered by tests
}
return nil
}
Expand Down Expand Up @@ -506,14 +507,14 @@
return uint64(topBlock.Round().SubSaturate(lowestStateProofRound))
}

// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against it's predecessor.
// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against its predecessor.
func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
topBlock, err := cs.ledgerAccessor.EnsureFirstBlock(cs.ctx)
if err != nil {
return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err))
}

// pick the lookback with the greater of
// pick the lookback with the greatest of
// either (MaxTxnLife+DeeperBlockHeaderHistory+CatchpointLookback) or MaxBalLookback
// Explanation:
// 1. catchpoint snapshots accounts at round X-CatchpointLookback
Expand All @@ -531,13 +532,13 @@
}

// in case the effective lookback is going before our rounds count, trim it there.
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife)
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback...MaxTxnLife)
if lookback >= uint64(topBlock.Round()) {
lookback = uint64(topBlock.Round() - 1)
}

cs.statsMu.Lock()
cs.stats.TotalBlocks = uint64(lookback)
cs.stats.TotalBlocks = lookback
cs.stats.AcquiredBlocks = 0
cs.stats.VerifiedBlocks = 0
cs.statsMu.Unlock()
Expand All @@ -558,8 +559,9 @@
blk = &ledgerBlock
cert = &ledgerCert
} else {
switch err0.(type) {
case ledgercore.ErrNoEntry:
var errNoEntry ledgercore.ErrNoEntry
switch {
case errors.As(err0, &errNoEntry):

Check warning on line 564 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L562-L564

Added lines #L562 - L564 were not covered by tests
// this is expected, ignore this one.
default:
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0)
Expand Down Expand Up @@ -658,7 +660,7 @@
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {

Check warning on line 663 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L663

Added line #L663 was not covered by tests
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
time.Sleep(noPeersAvailableSleepInterval)
Expand Down Expand Up @@ -718,7 +720,7 @@
// stopOrAbort is called when any of the stage processing function sees that cs.ctx has been canceled. It can be
// due to the end user attempting to abort the current catchpoint catchup operation or due to a node shutdown.
func (cs *CatchpointCatchupService) stopOrAbort() error {
if cs.abortCtx.Err() == context.Canceled {
if errors.Is(cs.abortCtx.Err(), context.Canceled) {

Check warning on line 723 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L723

Added line #L723 was not covered by tests
return cs.abort(context.Canceled)
}
return nil
Expand Down Expand Up @@ -749,7 +751,7 @@
return nil
}

// updateNodeCatchupMode requests the node to change it's operational mode from
// updateNodeCatchupMode requests the node to change its operational mode from
// catchup mode to normal mode and vice versa.
func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) {
newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
Expand Down Expand Up @@ -805,12 +807,27 @@
cs.blocksDownloadPeerSelector = cs.makeCatchpointPeerSelector()
}

func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() *peerSelector {
return makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
})
func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() peerSelector {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
wrappedPeerSelectors := []*wrappedPeerSelector{
{
peerClass: network.PeersPhonebookRelays,
peerSelector: makeRankPooledPeerSelector(cs.net,
[]peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}),
priority: peerRankInitialFirstPriority,
toleranceFactor: 3,
lastCheckedTime: time.Now(),
},
{
peerClass: network.PeersPhonebookArchivalNodes,
peerSelector: makeRankPooledPeerSelector(cs.net,
[]peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}),
priority: peerRankInitialSecondPriority,
toleranceFactor: 10,
lastCheckedTime: time.Now(),
},
}

return makeClassBasedPeerSelector(wrappedPeerSelectors)
}

// checkLedgerDownload sends a HEAD request to the ledger endpoint of peers to validate the catchpoint's availability
Expand All @@ -821,10 +838,10 @@
if err != nil {
return fmt.Errorf("failed to parse catchpoint label : %v", err)
}
peerSelector := cs.makeCatchpointPeerSelector()
ps := cs.makeCatchpointPeerSelector()

Check warning on line 841 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L841

Added line #L841 was not covered by tests
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
for i := 0; i < cs.config.CatchupLedgerDownloadRetryAttempts; i++ {
psp, peerError := peerSelector.getNextPeer()
psp, peerError := ps.getNextPeer()

Check warning on line 844 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L844

Added line #L844 was not covered by tests
if peerError != nil {
return err
}
Expand Down
158 changes: 158 additions & 0 deletions catchup/classBasedPeerSelector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"errors"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-deadlock"
"sort"
"time"
)

// The duration after which we reset the downloadFailures for a rankPooledPeerSelector
const lastCheckedDuration = 10 * time.Minute

// classBasedPeerSelector is a rankPooledPeerSelector that tracks and ranks classes of peers based on their response behavior.
// It is used to select the most appropriate peers to download blocks from - this is most useful when catching up
// and needing to figure out whether the blocks can be retrieved from relay nodes or require archive nodes.
type classBasedPeerSelector struct {
mu deadlock.Mutex
peerSelectors []*wrappedPeerSelector
}

func makeClassBasedPeerSelector(peerSelectors []*wrappedPeerSelector) *classBasedPeerSelector {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// Sort the peerSelectors by priority
sort.SliceStable(peerSelectors, func(i, j int) bool {
return peerSelectors[i].priority < peerSelectors[j].priority
})
return &classBasedPeerSelector{
peerSelectors: peerSelectors,
}
}

func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
c.mu.Lock()
defer c.mu.Unlock()

peerSelectorSortNeeded := false
poolIdx, peerIdx := -1, -1
for _, wp := range c.peerSelectors {
// See if the peer is in the class, ranking it appropriately if so
poolIdx, peerIdx = wp.peerSelector.rankPeer(psp, rank)
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
if poolIdx < 0 || peerIdx < 0 {
// Peer not found in this class
continue
}
// Peer was in this class, if there was any kind of download issue, we increment the failure count
if rank >= peerRankNoBlockForRound {
wp.downloadFailures++
}

// If we have failed more than the tolerance factor, we re-sort the slice of peerSelectors
if wp.downloadFailures > wp.toleranceFactor {
peerSelectorSortNeeded = true
}
break
}

if peerSelectorSortNeeded {
c.sortPeerSelectors()
}

return poolIdx, peerIdx
}

// sortPeerSelectors sorts the peerSelectors by tolerance factor violation, and then by priority
// It should only be called within a locked context
func (c *classBasedPeerSelector) sortPeerSelectors() {
psUnderTolerance := make([]*wrappedPeerSelector, 0, len(c.peerSelectors))
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
psOverTolerance := make([]*wrappedPeerSelector, 0, len(c.peerSelectors))
for _, wp := range c.peerSelectors {
// If the rankPooledPeerSelector's download failures have not been reset in a while, we reset them
if time.Since(wp.lastCheckedTime) > lastCheckedDuration {
wp.downloadFailures = 0

Check warning on line 88 in catchup/classBasedPeerSelector.go

View check run for this annotation

Codecov / codecov/patch

catchup/classBasedPeerSelector.go#L88

Added line #L88 was not covered by tests
// Reset again here, so we don't keep resetting the same rankPooledPeerSelector
wp.lastCheckedTime = time.Now()

Check warning on line 90 in catchup/classBasedPeerSelector.go

View check run for this annotation

Codecov / codecov/patch

catchup/classBasedPeerSelector.go#L90

Added line #L90 was not covered by tests
}

if wp.downloadFailures <= wp.toleranceFactor {
psUnderTolerance = append(psUnderTolerance, wp)
} else {
psOverTolerance = append(psOverTolerance, wp)
}

}

// Sort the two groups by priority
sortByPriority := func(ps []*wrappedPeerSelector) {
sort.SliceStable(ps, func(i, j int) bool {
return ps[i].priority < ps[j].priority
})
}

sortByPriority(psUnderTolerance)
sortByPriority(psOverTolerance)

//Append the two groups back together
c.peerSelectors = append(psUnderTolerance, psOverTolerance...)
}

func (c *classBasedPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
c.mu.Lock()
defer c.mu.Unlock()

for _, wp := range c.peerSelectors {
rank = wp.peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
// If rank is peerRankInvalidDownload, we check the next class's rankPooledPeerSelector
if rank >= peerRankInvalidDownload {
continue
}
// Should be a legit ranking, we return it
return rank
}
// If we reached here, we have exhausted all classes without finding the peer
return peerRankInvalidDownload

Check warning on line 129 in catchup/classBasedPeerSelector.go

View check run for this annotation

Codecov / codecov/patch

catchup/classBasedPeerSelector.go#L129

Added line #L129 was not covered by tests
}

func (c *classBasedPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
c.mu.Lock()
defer c.mu.Unlock()
for _, wp := range c.peerSelectors {
psp, err = wp.peerSelector.getNextPeer()
wp.lastCheckedTime = time.Now()
if err != nil {
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {
// We penalize this class the equivalent of one download failure (in case this is transient)
wp.downloadFailures++
}
continue
}
return psp, nil
}
// If we reached here, we have exhausted all classes and still have no peers
return nil, err
}

type wrappedPeerSelector struct {
peerSelector peerSelector // The underlying rankPooledPeerSelector for this class
peerClass network.PeerOption // The class of peers the rankPooledPeerSelector is responsible for
toleranceFactor int // The number of times we can net fail for any reason before we move to the next class's rankPooledPeerSelector
downloadFailures int // The number of times we have failed to download a block from this class's rankPooledPeerSelector since it was last reset
priority int // The original priority of the rankPooledPeerSelector, used for sorting
lastCheckedTime time.Time // The last time we tried to use the rankPooledPeerSelector
}