Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catchpoint: store certs with blocks during catchpoint restore #5798

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 17 additions & 14 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
Expand Down Expand Up @@ -370,6 +371,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error

attemptsCount := 0
var blk *bookkeeping.Block
var cert *agreement.Certificate
// check to see if the current ledger might have this block. If so, we should try this first instead of downloading anything.
if ledgerBlock, err := cs.ledger.Block(blockRound); err == nil {
blk = &ledgerBlock
Expand All @@ -384,7 +386,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
if stop {
return err
} else if blk == nil {
Expand Down Expand Up @@ -462,7 +464,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreBalancesRound : %v", err))
}

err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk)
err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk, cert)
if err != nil {
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
// try again.
Expand Down Expand Up @@ -542,6 +544,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
prevBlock := &topBlock
blocksFetched := uint64(1) // we already got the first block in the previous step.
var blk *bookkeeping.Block
var cert *agreement.Certificate
for retryCount := uint64(1); blocksFetched <= lookback; {
if err := cs.ctx.Err(); err != nil {
return cs.stopOrAbort()
Expand All @@ -564,7 +567,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
if stop {
return err
} else if blk == nil {
Expand Down Expand Up @@ -624,7 +627,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}

// all good, persist and move on.
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk, cert)
if err != nil {
cs.log.Warnf("processStageBlocksDownload failed to store downloaded staging block for round %d", blk.Round())
cs.updateBlockRetrievalStatistics(-1, -1)
Expand All @@ -649,17 +652,17 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
// fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer.
// The method returns stop=true if the caller should exit the current operation.
// If the method returns a nil block, the caller is expected to retry the operation, increasing the retry counter as needed.
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is possible on startup, since the network package might not have retrieved the list of peers yet.
time.Sleep(noPeersAvailableSleepInterval)
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from : %w", err)
return nil, time.Duration(0), psp, true, cs.abort(err)
return nil, nil, time.Duration(0), psp, true, cs.abort(err)
}
peer := psp.Peer

Expand All @@ -669,26 +672,26 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
}
fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config)
blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
blk, cert, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
if err != nil {
if cs.ctx.Err() != nil {
return nil, time.Duration(0), psp, true, cs.stopOrAbort()
return nil, nil, time.Duration(0), psp, true, cs.stopOrAbort()
}
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
}
// success
return blk, downloadDuration, psp, false, nil
return blk, cert, downloadDuration, psp, false, nil
}

// processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swaps the new tables and restarts the node functionality.
Expand Down
5 changes: 3 additions & 2 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mocks
import (
"context"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
Expand Down Expand Up @@ -86,12 +87,12 @@ func (m *MockCatchpointCatchupAccessor) StoreBalancesRound(ctx context.Context,
}

// StoreFirstBlock is the mock stand-in for the accessor method that stores the
// first block to the blocks database. The mock ignores its arguments, stores
// nothing, and always returns a nil error.
func (m *MockCatchpointCatchupAccessor) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (m *MockCatchpointCatchupAccessor) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
return nil
}

// StoreBlock is the mock stand-in for the accessor method that stores a single
// block to the blocks database. The mock ignores its arguments, stores nothing,
// and always returns a nil error.
func (m *MockCatchpointCatchupAccessor) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (m *MockCatchpointCatchupAccessor) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
return nil
}

Expand Down
13 changes: 7 additions & 6 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merkletrie"
Expand Down Expand Up @@ -78,10 +79,10 @@ type CatchpointCatchupAccessor interface {
StoreBalancesRound(ctx context.Context, blk *bookkeeping.Block) (err error)

// StoreFirstBlock stores a single block to the blocks database.
StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error)
StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error)

// StoreBlock stores a single block to the blocks database.
StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error)
StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error)

// FinishBlocks concludes the catchup of the blocks database.
FinishBlocks(ctx context.Context, applyChanges bool) (err error)
Expand Down Expand Up @@ -1055,12 +1056,12 @@ func (c *catchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context,
}

// StoreFirstBlock stores a single block to the blocks database.
func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerStorefirstblockCount.Inc(nil)
err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return blockdb.BlockStartCatchupStaging(tx, *blk)
return blockdb.BlockStartCatchupStaging(tx, *blk, *cert)
})
ledgerStorefirstblockMicros.AddMicrosecondsSince(start, nil)
if err != nil {
Expand All @@ -1070,12 +1071,12 @@ func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk
}

// StoreBlock stores a single block to the blocks database.
func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerCatchpointStoreblockCount.Inc(nil)
err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return blockdb.BlockPutStaging(tx, *blk)
return blockdb.BlockPutStaging(tx, *blk, *cert)
})
ledgerCatchpointStoreblockMicros.AddMicrosecondsSince(start, nil)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
Expand Down Expand Up @@ -162,7 +163,7 @@ func initializeTestCatchupAccessor(t *testing.T, l *Ledger, accountsCount uint64
require.NoError(t, err)

// We do this to initialize the catchpointblocks table. Needed to be able to use CompleteCatchup.
err = catchpointAccessor.StoreFirstBlock(ctx, &bookkeeping.Block{})
err = catchpointAccessor.StoreFirstBlock(ctx, &bookkeeping.Block{}, &agreement.Certificate{})
require.NoError(t, err)

// We do this to initialize the accounttotals table. Needed to be able to use CompleteCatchup.
Expand Down Expand Up @@ -441,6 +442,7 @@ func TestVerifyCatchpoint(t *testing.T) {

// actual testing...
var blk bookkeeping.Block
var cert agreement.Certificate
err = catchpointAccessor.VerifyCatchpoint(ctx, &blk)
require.Error(t, err)

Expand All @@ -455,14 +457,14 @@ func TestVerifyCatchpoint(t *testing.T) {
err = catchpointAccessor.StoreBalancesRound(ctx, &blk)
require.NoError(t, err)
// StoreFirstBlock is a dumb wrapper on some db logic
err = catchpointAccessor.StoreFirstBlock(ctx, &blk)
err = catchpointAccessor.StoreFirstBlock(ctx, &blk, &cert)
require.NoError(t, err)

_, err = catchpointAccessor.EnsureFirstBlock(ctx)
require.NoError(t, err)

blk.BlockHeader.Round++
err = catchpointAccessor.StoreBlock(ctx, &blk)
err = catchpointAccessor.StoreBlock(ctx, &blk, &cert)
require.NoError(t, err)

// TODO: write a case with working no-err
Expand Down
10 changes: 6 additions & 4 deletions ledger/store/blockdb/blockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func BlockForgetBefore(tx *sql.Tx, rnd basics.Round) error {
}

// BlockStartCatchupStaging initializes catchup for catchpoint
func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) error {
// delete the old catchpointblocks table, if one exists.
for _, stmt := range blockResetExprs {
stmt = strings.Replace(stmt, "blocks", "catchpointblocks", 1)
Expand All @@ -262,11 +262,12 @@ func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
}

// insert the top entry to the blocks table.
_, err := tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)",
_, err := tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata, certdata) VALUES (?, ?, ?, ?)",
blk.Round(),
blk.CurrentProtocol,
protocol.Encode(&blk.BlockHeader),
protocol.Encode(&blk),
protocol.Encode(&cert),
)
if err != nil {
return err
Expand Down Expand Up @@ -305,13 +306,14 @@ func BlockAbortCatchup(tx *sql.Tx) error {
}

// BlockPutStaging stores a block into the catchpoint staging table
func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block) (err error) {
func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) (err error) {
// insert the new entry
_, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)",
_, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata, certdata) VALUES (?, ?, ?, ?)",
blk.Round(),
blk.CurrentProtocol,
protocol.Encode(&blk.BlockHeader),
protocol.Encode(&blk),
protocol.Encode(&cert),
)
if err != nil {
return err
Expand Down