Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: make catchpoint generation backward compatible #5598

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 69 additions & 32 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@
// enableGeneratingCatchpointFiles determines whether catchpoints files should be generated by the trackers.
enableGeneratingCatchpointFiles bool

// Prepared SQL statements for fast accounts DB lookups.
accountsq trackerdb.AccountsReader

// log copied from ledger
log logging.Logger

Expand Down Expand Up @@ -138,6 +135,9 @@
// roundDigest stores the digest of the block for every round starting with dbRound+1 and every round after it.
roundDigest []crypto.Digest

// consensusVersion stores the consensus versions for every round starting with dbRound+1 and every round after it.
consensusVersion []protocol.ConsensusVersion

// reenableCatchpointsRound is a round where the EnableCatchpointsWithSPContexts feature was enabled via the consensus.
// we avoid generating catchpoints before that round in order to ensure the network remain consistent in the catchpoint
// label being produced. This variable could be "wrong" in two cases -
Expand All @@ -151,9 +151,13 @@
// catchpoint files even before the protocol upgrade took place.
forceCatchpointFileWriting bool

// catchpointsMu protects `roundDigest`, `reenableCatchpointsRound` and
// catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound and
// `lastCatchpointLabel`.
catchpointsMu deadlock.RWMutex

// cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()),
// cached to use in lookup functions
cachedDBRound basics.Round
}

// initialize initializes the catchpointTracker structure
Expand Down Expand Up @@ -205,7 +209,7 @@
return encodedData, spVerificationHash, nil
}

func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, updatingBalancesDuration time.Duration) error {
func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, blockProto protocol.ConsensusVersion, updatingBalancesDuration time.Duration) error {
ct.log.Infof("finishing catchpoint's first stage dbRound: %d", dbRound)

var totalKVs uint64
Expand All @@ -216,11 +220,15 @@
var spVerificationEncodedData []byte
var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails

// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
spVerificationEncodedData, spVerificationHash, err := ct.getSPVerificationData()
if err != nil {
return err
params := config.Consensus[blockProto]
if params.EnableCatchpointsWithSPContexts {
jannotti marked this conversation as resolved.
Show resolved Hide resolved
// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
var err error
spVerificationEncodedData, spVerificationHash, err = ct.getSPVerificationData()
if err != nil {
return err

Check warning on line 230 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L230

Added line #L230 was not covered by tests
}
}

if ct.enableGeneratingCatchpointFiles {
Expand Down Expand Up @@ -257,7 +265,7 @@

// Possibly finish generating first stage catchpoint db record and data file after
// a crash.
func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) error {
func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
v, err := ct.catchpointStore.ReadCatchpointStateUint64(
context.Background(), trackerdb.CatchpointStateWritingFirstStageInfo)
if err != nil {
Expand All @@ -274,10 +282,10 @@
return err
}

return ct.finishFirstStage(context.Background(), dbRound, 0)
return ct.finishFirstStage(context.Background(), dbRound, blockProto, 0)
}

func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint64) error {
func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
records, err := ct.catchpointStore.SelectUnfinishedCatchpoints(context.Background())
if err != nil {
return err
Expand All @@ -292,7 +300,7 @@
}

err = ct.finishCatchpoint(
context.Background(), record.Round, record.BlockHash, catchpointLookback)
context.Background(), record.Round, record.BlockHash, blockProto, catchpointLookback)
if err != nil {
return err
}
Expand All @@ -301,8 +309,8 @@
return nil
}

func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error {
err := ct.finishFirstStageAfterCrash(dbRound)
func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error {
err := ct.finishFirstStageAfterCrash(dbRound, blockProto)
if err != nil {
return err
}
Expand All @@ -316,7 +324,7 @@
}

if catchpointLookback != 0 {
err = ct.finishCatchpointsAfterCrash(catchpointLookback)
err = ct.finishCatchpointsAfterCrash(blockProto, catchpointLookback)
if err != nil {
return err
}
Expand Down Expand Up @@ -346,11 +354,15 @@
return err
}

ct.catchpointsMu.Lock()
ct.cachedDBRound = dbRound
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
ct.roundDigest = nil
ct.consensusVersion = nil
ct.catchpointDataWriting.Store(0)
// keep these channel closed if we're not generating catchpoint
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
close(ct.catchpointDataSlowWriting)
ct.catchpointsMu.Unlock()
algogm marked this conversation as resolved.
Show resolved Hide resolved

err = ct.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error {
return ct.initializeHashes(ctx, tx, dbRound)
Expand All @@ -359,18 +371,18 @@
return err
}

ct.accountsq, err = ct.dbs.MakeAccountsOptimizedReader()
ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString(
context.Background(), trackerdb.CatchpointStateLastCatchpoint)
if err != nil {
return
}

ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString(
context.Background(), trackerdb.CatchpointStateLastCatchpoint)
hdr, err := l.BlockHdr(dbRound)
if err != nil {
return
}

return ct.recoverFromCrash(dbRound)
return ct.recoverFromCrash(dbRound, hdr.CurrentProtocol)
}

// newBlock informs the tracker of a new block from round
Expand All @@ -380,6 +392,7 @@
defer ct.catchpointsMu.Unlock()

ct.roundDigest = append(ct.roundDigest, blk.Digest())
ct.consensusVersion = append(ct.consensusVersion, blk.CurrentProtocol)

if (config.Consensus[blk.CurrentProtocol].EnableCatchpointsWithSPContexts || ct.forceCatchpointFileWriting) && ct.reenableCatchpointsRound == 0 {
catchpointLookback := config.Consensus[blk.CurrentProtocol].CatchpointLookback
Expand All @@ -396,7 +409,10 @@
// number that can be removed from the blocks database as well as the lookback that this
// tracker maintains.
func (ct *catchpointTracker) committedUpTo(rnd basics.Round) (retRound, lookback basics.Round) {
return rnd, basics.Round(0)
ct.catchpointsMu.RLock()
defer ct.catchpointsMu.RUnlock()
retRound = ct.cachedDBRound
return retRound, basics.Round(0)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// Calculate whether we have intermediate first stage catchpoint rounds and the
Expand Down Expand Up @@ -505,6 +521,8 @@

dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
copy(dcc.committedRoundDigests, ct.roundDigest[:dcc.offset])
dcc.committedProtocolVersion = make([]protocol.ConsensusVersion, dcc.offset)
copy(dcc.committedProtocolVersion, ct.consensusVersion[:dcc.offset])

return nil
}
Expand Down Expand Up @@ -601,6 +619,8 @@

ct.catchpointsMu.Lock()
ct.roundDigest = ct.roundDigest[dcc.offset:]
ct.consensusVersion = ct.consensusVersion[dcc.offset:]
ct.cachedDBRound = dcc.newBase()
ct.catchpointsMu.Unlock()

dcc.updatingBalancesDuration = time.Since(dcc.flushTime)
Expand Down Expand Up @@ -736,9 +756,18 @@

// Create a catchpoint (a label and possibly a file with db record) and remove
// the unfinished catchpoint record.
func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest) error {
func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest, blockProto protocol.ConsensusVersion) error {
startTime := time.Now()
labelMaker := ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash)
var labelMaker ledgercore.CatchpointLabelMaker
var version uint64
params := config.Consensus[blockProto]
if params.EnableCatchpointsWithSPContexts {
labelMaker = ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash)
version = CatchpointFileVersionV7
} else {
labelMaker = ledgercore.MakeCatchpointLabelMakerV6(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
version = CatchpointFileVersionV6
}
label := ledgercore.MakeLabel(labelMaker)

ct.log.Infof(
Expand Down Expand Up @@ -774,7 +803,7 @@

// Make a catchpoint file.
header := CatchpointFileHeader{
Version: CatchpointFileVersionV7,
Version: version,
BalancesRound: accountsRound,
BlocksRound: round,
Totals: dataInfo.Totals,
Expand Down Expand Up @@ -834,7 +863,7 @@

// Try create a catchpoint (a label and possibly a file with db record) and remove
// the unfinished catchpoint record.
func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, catchpointLookback uint64) error {
func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
accountsRound := round - basics.Round(catchpointLookback)

ct.log.Infof("finishing catchpoint round: %d accountsRound: %d", round, accountsRound)
Expand All @@ -847,7 +876,7 @@
if !exists {
return ct.catchpointStore.DeleteUnfinishedCatchpoint(ctx, round)
}
return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash)
return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash, blockProto)
}

// Calculate catchpoint round numbers in [min, max]. `catchpointInterval` must be
Expand Down Expand Up @@ -908,7 +937,9 @@

func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.catchpointFirstStage {
err := ct.finishFirstStage(ctx, dcc.newBase(), dcc.updatingBalancesDuration)
round := dcc.newBase()
blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1]
err := ct.finishFirstStage(ctx, round, blockProto, dcc.updatingBalancesDuration)
if err != nil {
ct.log.Warnf(
"error finishing catchpoint's first stage dcc.newBase: %d err: %v",
Expand All @@ -918,8 +949,10 @@

// Generate catchpoints for rounds in (dcc.oldBase, dcc.newBase].
for _, round := range ct.calculateCatchpointRounds(&dcc.deferredCommitRange) {
blockHash := dcc.committedRoundDigests[round-dcc.oldBase-1]
blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1]
err := ct.finishCatchpoint(
ctx, round, dcc.committedRoundDigests[round-dcc.oldBase-1], dcc.catchpointLookback)
ctx, round, blockHash, blockProto, dcc.catchpointLookback)
if err != nil {
ct.log.Warnf("error creating catchpoint round: %d err: %v", round, err)
}
Expand Down Expand Up @@ -1157,9 +1190,13 @@
return
}

err = catchpointWriter.FileWriteSPVerificationContext(encodedSPData)
if err != nil {
return
// do not write encodedSPData if not provided,
// this is an indication the older catchpoint file is being generated.
if encodedSPData != nil {
err = catchpointWriter.FileWriteSPVerificationContext(encodedSPData)
if err != nil {
return

Check warning on line 1198 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L1198

Added line #L1198 was not covered by tests
}
}

for more {
Expand Down
57 changes: 56 additions & 1 deletion ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.

require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash)

err = ct.createCatchpoint(context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, crypto.Digest{})
err = ct.createCatchpoint(
context.Background(), accountsRound, round,
trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen},
crypto.Digest{}, protocol.ConsensusCurrentVersion)
require.NoError(t, err)
}

Expand Down Expand Up @@ -760,8 +763,10 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// test to see that after loadFromDisk, all the tracker content is lost ( as expected )
require.NotZero(t, len(ct.roundDigest))
require.NotZero(t, len(ct.consensusVersion))
require.NoError(t, ct.loadFromDisk(ml, ml.Latest()))
require.Zero(t, len(ct.roundDigest))
require.Zero(t, len(ct.consensusVersion))
require.Zero(t, ct.catchpointDataWriting.Load())
select {
case _, closed := <-ct.catchpointDataSlowWriting:
Expand All @@ -771,6 +776,56 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
}
}

// TestCatchpointBackwardCompatibleLabels checks labels before and after EnableCatchpointsWithSPContexts was introduced.
func TestCatchpointBackwardCompatibleLabels(t *testing.T) {
partitiontest.PartitionTest(t)

temporaryDirectory := t.TempDir()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()

ct := &catchpointTracker{enableGeneratingCatchpointFiles: false}
conf := config.GetDefaultLocal()

conf.Archival = true
paths := DirsAndPrefix{
ResolvedGenesisDirs: config.ResolvedGenesisDirs{
CatchpointGenesisDir: ".",
HotGenesisDir: ".",
},
}
ct.initialize(conf, paths)

defer ct.close()
ct.dbDirectory = temporaryDirectory
ct.tmpDir = temporaryDirectory

_, err := trackerDBInitialize(ml, true, ct.dbDirectory)
require.NoError(t, err)

err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)

// create catpoint with the latest version of the code
round := basics.Round(2000)

protos := []protocol.ConsensusVersion{protocol.ConsensusCurrentVersion, protocol.ConsensusV37, protocol.ConsensusV36}
labels := make([]string, len(protos))
for i, proto := range protos {
err = ct.createCatchpoint(
context.Background(), round-1, round,
trackerdb.CatchpointFirstStageInfo{},
crypto.Digest{}, proto)
require.NoError(t, err)
require.NotEmpty(t, ct.lastCatchpointLabel)
labels[i] = ct.lastCatchpointLabel
}
require.NotEqual(t, labels[0], labels[1])
require.Equal(t, labels[1], labels[2])
}

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
emptyTracker
Expand Down
3 changes: 3 additions & 0 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ type deferredCommitContext struct {
// Block hashes for the committed rounds range.
committedRoundDigests []crypto.Digest

// Consensus versions for the committed rounds range.
committedProtocolVersion []protocol.ConsensusVersion
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved

// on catchpoint rounds, the transaction tail would fill up this field with the hash of the recent 1001 rounds
// of the txtail data. The catchpointTracker would be able to use that for calculating the catchpoint label.
txTailHash crypto.Digest
Expand Down