Skip to content

Commit

Permalink
ledger: make catchpoint generation backward compat
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Jul 24, 2023
1 parent 197bfb6 commit d33eb05
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 31 deletions.
86 changes: 56 additions & 30 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ type catchpointTracker struct {
// enableGeneratingCatchpointFiles determines whether catchpoints files should be generated by the trackers.
enableGeneratingCatchpointFiles bool

// Prepared SQL statements for fast accounts DB lookups.
accountsq trackerdb.AccountsReader

// log copied from ledger
log logging.Logger

Expand Down Expand Up @@ -136,6 +133,9 @@ type catchpointTracker struct {
// roundDigest stores the digest of the block for every round starting with dbRound+1 and every round after it.
roundDigest []crypto.Digest

// consensusVersion stores the consensus versions for every round starting with dbRound+1 and every round after it.
consensusVersion []protocol.ConsensusVersion

// reenableCatchpointsRound is a round where the EnableCatchpointsWithSPContexts feature was enabled via the consensus.
// we avoid generating catchpoints before that round in order to ensure the network remain consistent in the catchpoint
// label being produced. This variable could be "wrong" in two cases -
Expand Down Expand Up @@ -218,7 +218,7 @@ func (ct *catchpointTracker) getSPVerificationData() (encodedData []byte, spVeri
return encodedData, spVerificationHash, nil
}

func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, updatingBalancesDuration time.Duration) error {
func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, blockProto protocol.ConsensusVersion, updatingBalancesDuration time.Duration) error {
ct.log.Infof("finishing catchpoint's first stage dbRound: %d", dbRound)

var totalKVs uint64
Expand All @@ -229,11 +229,15 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
var spVerificationEncodedData []byte
var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails

// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
spVerificationEncodedData, spVerificationHash, err := ct.getSPVerificationData()
if err != nil {
return err
params := config.Consensus[blockProto]
if params.EnableCatchpointsWithSPContexts {
// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
var err error
spVerificationEncodedData, spVerificationHash, err = ct.getSPVerificationData()
if err != nil {
return err
}
}

if ct.enableGeneratingCatchpointFiles {
Expand Down Expand Up @@ -270,7 +274,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic

// Possibly finish generating first stage catchpoint db record and data file after
// a crash.
func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) error {
func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error {
v, err := ct.catchpointStore.ReadCatchpointStateUint64(
context.Background(), trackerdb.CatchpointStateWritingFirstStageInfo)
if err != nil {
Expand All @@ -289,10 +293,10 @@ func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) er
return err
}

return ct.finishFirstStage(context.Background(), dbRound, 0)
return ct.finishFirstStage(context.Background(), dbRound, blockProto, 0)
}

func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint64) error {
func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
records, err := ct.catchpointStore.SelectUnfinishedCatchpoints(context.Background())
if err != nil {
return err
Expand All @@ -309,7 +313,7 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint
}

err = ct.finishCatchpoint(
context.Background(), record.Round, record.BlockHash, catchpointLookback)
context.Background(), record.Round, record.BlockHash, blockProto, catchpointLookback)
if err != nil {
return err
}
Expand All @@ -318,8 +322,8 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint
return nil
}

func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error {
err := ct.finishFirstStageAfterCrash(dbRound)
func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error {
err := ct.finishFirstStageAfterCrash(dbRound, blockProto)
if err != nil {
return err
}
Expand All @@ -333,7 +337,7 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error {
}

if catchpointLookback != 0 {
err = ct.finishCatchpointsAfterCrash(catchpointLookback)
err = ct.finishCatchpointsAfterCrash(blockProto, catchpointLookback)
if err != nil {
return err
}
Expand Down Expand Up @@ -364,6 +368,7 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou
}

ct.roundDigest = nil
ct.consensusVersion = nil
ct.catchpointDataWriting = 0
// keep these channel closed if we're not generating catchpoint
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
Expand All @@ -376,18 +381,18 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou
return err
}

ct.accountsq, err = ct.dbs.MakeAccountsOptimizedReader()
ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString(
context.Background(), trackerdb.CatchpointStateLastCatchpoint)
if err != nil {
return
}

ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString(
context.Background(), trackerdb.CatchpointStateLastCatchpoint)
hdr, err := l.BlockHdr(dbRound)
if err != nil {
return
}

return ct.recoverFromCrash(dbRound)
return ct.recoverFromCrash(dbRound, hdr.CurrentProtocol)
}

// newBlock informs the tracker of a new block from round
Expand All @@ -397,6 +402,7 @@ func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.St
defer ct.catchpointsMu.Unlock()

ct.roundDigest = append(ct.roundDigest, blk.Digest())
ct.consensusVersion = append(ct.consensusVersion, blk.CurrentProtocol)

if (config.Consensus[blk.CurrentProtocol].EnableCatchpointsWithSPContexts || ct.forceCatchpointFileWriting) && ct.reenableCatchpointsRound == 0 {
catchpointLookback := config.Consensus[blk.CurrentProtocol].CatchpointLookback
Expand Down Expand Up @@ -522,6 +528,8 @@ func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {

dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
copy(dcc.committedRoundDigests, ct.roundDigest[:dcc.offset])
dcc.committedProtocolVersion = make([]protocol.ConsensusVersion, dcc.offset)
copy(dcc.committedProtocolVersion, ct.consensusVersion[:dcc.offset])

return nil
}
Expand Down Expand Up @@ -618,6 +626,7 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit

ct.catchpointsMu.Lock()
ct.roundDigest = ct.roundDigest[dcc.offset:]
ct.consensusVersion = ct.consensusVersion[dcc.offset:]
ct.catchpointsMu.Unlock()

dcc.updatingBalancesDuration = time.Since(dcc.flushTime)
Expand Down Expand Up @@ -753,9 +762,18 @@ func repackCatchpoint(ctx context.Context, header CatchpointFileHeader, biggestC

// Create a catchpoint (a label and possibly a file with db record) and remove
// the unfinished catchpoint record.
func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest) error {
func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest, blockProto protocol.ConsensusVersion) error {
startTime := time.Now()
labelMaker := ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash)
var labelMaker ledgercore.CatchpointLabelMaker
var version uint64
params := config.Consensus[blockProto]
if params.EnableCatchpointsWithSPContexts {
labelMaker = ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash)
version = CatchpointFileVersionV7
} else {
labelMaker = ledgercore.MakeCatchpointLabelMakerV6(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals)
version = CatchpointFileVersionV6
}
label := ledgercore.MakeLabel(labelMaker)

ct.log.Infof(
Expand Down Expand Up @@ -791,7 +809,7 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound

// Make a catchpoint file.
header := CatchpointFileHeader{
Version: CatchpointFileVersionV7,
Version: version,
BalancesRound: accountsRound,
BlocksRound: round,
Totals: dataInfo.Totals,
Expand Down Expand Up @@ -850,7 +868,7 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound

// Try create a catchpoint (a label and possibly a file with db record) and remove
// the unfinished catchpoint record.
func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, catchpointLookback uint64) error {
func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
accountsRound := round - basics.Round(catchpointLookback)

ct.log.Infof("finishing catchpoint round: %d accountsRound: %d", round, accountsRound)
Expand All @@ -863,7 +881,7 @@ func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.
if !exists {
return ct.catchpointStore.DeleteUnfinishedCatchpoint(ctx, round)
}
return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash)
return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash, blockProto)
}

// Calculate catchpoint round numbers in [min, max]. `catchpointInterval` must be
Expand Down Expand Up @@ -924,7 +942,9 @@ func (ct *catchpointTracker) pruneFirstStageRecordsData(ctx context.Context, max

func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.catchpointFirstStage {
err := ct.finishFirstStage(ctx, dcc.newBase(), dcc.updatingBalancesDuration)
round := dcc.newBase()
blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1]
err := ct.finishFirstStage(ctx, round, blockProto, dcc.updatingBalancesDuration)
if err != nil {
ct.log.Warnf(
"error finishing catchpoint's first stage dcc.newBase: %d err: %v",
Expand All @@ -934,8 +954,10 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr

// Generate catchpoints for rounds in (dcc.oldBase, dcc.newBase].
for _, round := range ct.calculateCatchpointRounds(&dcc.deferredCommitRange) {
blockHash := dcc.committedRoundDigests[round-dcc.oldBase-1]
blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1]
err := ct.finishCatchpoint(
ctx, round, dcc.committedRoundDigests[round-dcc.oldBase-1], dcc.catchpointLookback)
ctx, round, blockHash, blockProto, dcc.catchpointLookback)
if err != nil {
ct.log.Warnf("error creating catchpoint round: %d err: %v", round, err)
}
Expand Down Expand Up @@ -1157,9 +1179,13 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
return
}

err = catchpointWriter.WriteStateProofVerificationContext(encodedSPData)
if err != nil {
return
// do not write encodedSPData if not provided,
// this is an indication the older catchpoint file is being generated.
if encodedSPData != nil {
err = catchpointWriter.WriteStateProofVerificationContext(encodedSPData)
if err != nil {
return
}
}

for more {
Expand Down
7 changes: 6 additions & 1 deletion ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,10 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.

require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash)

err = ct.createCatchpoint(context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, crypto.Digest{})
err = ct.createCatchpoint(
context.Background(), accountsRound, round,
trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen},
crypto.Digest{}, protocol.ConsensusCurrentVersion)
require.NoError(t, err)
}

Expand Down Expand Up @@ -633,8 +636,10 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// test to see that after loadFromDisk, all the tracker content is lost ( as expected )
require.NotZero(t, len(ct.roundDigest))
require.NotZero(t, len(ct.consensusVersion))
require.NoError(t, ct.loadFromDisk(ml, ml.Latest()))
require.Zero(t, len(ct.roundDigest))
require.Zero(t, len(ct.consensusVersion))
require.Zero(t, ct.catchpointDataWriting)
select {
case _, closed := <-ct.catchpointDataSlowWriting:
Expand Down
3 changes: 3 additions & 0 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ type deferredCommitContext struct {
// Block hashes for the committed rounds range.
committedRoundDigests []crypto.Digest

// Consensus versions for the committed rounds range.
committedProtocolVersion []protocol.ConsensusVersion

// on catchpoint rounds, the transaction tail would fill up this field with the hash of the recent 1001 rounds
// of the txtail data. The catchpointTracker would be able to use that for calculating the catchpoint label.
txTailHash crypto.Digest
Expand Down

0 comments on commit d33eb05

Please sign in to comment.