Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: use single SP verification hash/data query for catchpoint tracking & generation #5579

Merged
merged 5 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 28 additions & 6 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,20 @@
return ct.lastCatchpointLabel
}

func (ct *catchpointTracker) getSPVerificationData() (encodedData []byte, spVerificationHash crypto.Digest, err error) {
err = ct.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error {

Check failure on line 205 in ledger/catchpointtracker.go

View workflow job for this annotation

GitHub Actions / reviewdog-errors

[Lint Errors] reported by reviewdog 🐶 ineffectual assignment to err (ineffassign) Raw Output: ledger/catchpointtracker.go:205:2: ineffectual assignment to err (ineffassign) err = ct.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error { ^
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need a Transaction scope if you're just reading?

Copy link
Contributor Author

@cce cce Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good point, the place I pulled this out of was inside a Transaction, but now it is read only

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #5592

rawData, err := tx.MakeSpVerificationCtxReader().GetAllSPContexts(ctx)
if err != nil {
return err

Check warning on line 208 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L208

Added line #L208 was not covered by tests
}

wrappedData := catchpointStateProofVerificationContext{Data: rawData}
spVerificationHash, encodedData = crypto.EncodeAndHash(wrappedData)
return nil
})
return encodedData, spVerificationHash, nil
cce marked this conversation as resolved.
Show resolved Hide resolved
}

func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, updatingBalancesDuration time.Duration) error {
ct.log.Infof("finishing catchpoint's first stage dbRound: %d", dbRound)

Expand All @@ -209,8 +223,16 @@
var totalChunks uint64
var biggestChunkLen uint64
var spVerificationHash crypto.Digest
var spVerificationEncodedData []byte
var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails

// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
spVerificationEncodedData, spVerificationHash, err := ct.getSPVerificationData()
if err != nil {
return err

Check warning on line 233 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L233

Added line #L233 was not covered by tests
}

if ct.enableGeneratingCatchpointFiles {
// Generate the catchpoint file. This is done inline so that it will
// block any new accounts from being written. generateCatchpointData()
Expand All @@ -219,8 +241,8 @@
var err error

catchpointGenerationStats.BalancesWriteTime = uint64(updatingBalancesDuration.Nanoseconds())
totalKVs, totalAccounts, totalChunks, biggestChunkLen, spVerificationHash, err = ct.generateCatchpointData(
ctx, dbRound, &catchpointGenerationStats)
totalKVs, totalAccounts, totalChunks, biggestChunkLen, err = ct.generateCatchpointData(
ctx, dbRound, &catchpointGenerationStats, spVerificationEncodedData)
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
if err != nil {
return err
Expand Down Expand Up @@ -1102,7 +1124,7 @@
// - Balance and KV chunk (named balances.x.msgpack).
// ...
// - Balance and KV chunk (named balances.x.msgpack).
func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails) (totalKVs, totalAccounts, totalChunks, biggestChunkLen uint64, spVerificationHash crypto.Digest, err error) {
func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, encodedSPData []byte) (totalKVs, totalAccounts, totalChunks, biggestChunkLen uint64, err error) {
ct.log.Debugf("catchpointTracker.generateCatchpointData() writing catchpoint accounts for round %d", accountsRound)

startTime := time.Now()
Expand Down Expand Up @@ -1132,7 +1154,7 @@
return
}

spVerificationHash, err = catchpointWriter.WriteStateProofVerificationContext()
err = catchpointWriter.WriteStateProofVerificationContext(encodedSPData)
if err != nil {
return
}
Expand Down Expand Up @@ -1187,7 +1209,7 @@
ledgerGeneratecatchpointMicros.AddMicrosecondsSince(start, nil)
if err != nil {
ct.log.Warnf("catchpointTracker.generateCatchpointData() %v", err)
return 0, 0, 0, 0, crypto.Digest{}, err
return 0, 0, 0, 0, err

Check warning on line 1212 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L1212

Added line #L1212 was not covered by tests
}

catchpointGenerationStats.FileSize = uint64(catchpointWriter.writtenBytes)
Expand All @@ -1196,7 +1218,7 @@
catchpointGenerationStats.KVsCount = catchpointWriter.totalKVs
catchpointGenerationStats.AccountsRound = uint64(accountsRound)

return catchpointWriter.totalKVs, catchpointWriter.totalAccounts, catchpointWriter.chunkNum, catchpointWriter.biggestChunkLen, spVerificationHash, nil
return catchpointWriter.totalKVs, catchpointWriter.totalAccounts, catchpointWriter.chunkNum, catchpointWriter.biggestChunkLen, nil
}

func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx trackerdb.TransactionScope, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, accountsRound basics.Round, totalKVs uint64, totalAccounts uint64, totalChunks uint64, biggestChunkLen uint64, stateProofVerificationHash crypto.Digest) error {
Expand Down
20 changes: 15 additions & 5 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,12 @@ func TestRecordCatchpointFile(t *testing.T) {
}

func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.Round, ml *mockLedgerForTracker, round basics.Round) {
spVerificationEncodedData, stateProofVerificationHash, err := ct.getSPVerificationData()
require.NoError(t, err)

var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails
_, _, _, biggestChunkLen, stateProofVerificationHash, err := ct.generateCatchpointData(
context.Background(), accountsRound, &catchpointGenerationStats)
_, _, _, biggestChunkLen, err := ct.generateCatchpointData(
context.Background(), accountsRound, &catchpointGenerationStats, spVerificationEncodedData)
require.NoError(t, err)

require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash)
Expand Down Expand Up @@ -460,16 +463,18 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) {
require.NoError(b, err)

var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails
encodedSPData, _, err := ct.getSPVerificationData()
require.NoError(b, err)
b.ResetTimer()
ct.generateCatchpointData(context.Background(), basics.Round(0), &catchpointGenerationStats)
ct.generateCatchpointData(context.Background(), basics.Round(0), &catchpointGenerationStats, encodedSPData)
b.StopTimer()
b.ReportMetric(float64(accountsNumber), "accounts")
}

func TestCatchpointReproducibleLabels(t *testing.T) {
partitiontest.PartitionTest(t)

if runtime.GOARCH == "arm" || runtime.GOARCH == "arm64" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems not related to the PR so just curious what the purpose of the chang eis.

if runtime.GOARCH == "arm" {
t.Skip("This test is too slow on ARM and causes CI builds to time out")
}

Expand Down Expand Up @@ -592,7 +597,12 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
ml2 := ledgerHistory[rnd]
require.NotNil(t, ml2)

ct2 := newCatchpointTracker(t, ml2, cfg, ".")
cfg2 := cfg
// every other iteration modify CatchpointTracking to ensure labels generation does not depends on catchpoint file creation
if rnd%2 == 0 {
cfg2.CatchpointTracking = int64(crypto.RandUint63())%2 + 1 //values 1 or 2
}
Comment on lines +601 to +604
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not guaranteed to modify the tracking value, for what looks like 2 reasons:

  1. the 50% chance assignment, depending on the duration of this loop, might fail to modify the value (rare but flakey behavior)

  2. the loop modifies this rnd value by rnd -= cfg.CatchpointInterval, which in this test appears to be 50. So I would expect rnd%2 to always be true/false for a given test

why not just unconditionally set CatchpointTracking = (CatchpointTracking % 2) + 1 on every loop?

ct2 := newCatchpointTracker(t, ml2, cfg2, ".")
defer ct2.close()
for i := rnd + 1; i <= lastRound; i++ {
blk := bookkeeping.Block{
Expand Down
19 changes: 5 additions & 14 deletions ledger/catchpointwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"os"
"path/filepath"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/ledger/encoded"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/trackerdb"
Expand Down Expand Up @@ -160,35 +159,27 @@
return os.Remove(cw.filePath)
}

func (cw *catchpointWriter) WriteStateProofVerificationContext() (crypto.Digest, error) {
rawData, err := cw.tx.MakeSpVerificationCtxReader().GetAllSPContexts(cw.ctx)
if err != nil {
return crypto.Digest{}, err
}

wrappedData := catchpointStateProofVerificationContext{Data: rawData}
dataHash, encodedData := crypto.EncodeAndHash(wrappedData)

err = cw.tar.WriteHeader(&tar.Header{
func (cw *catchpointWriter) WriteStateProofVerificationContext(encodedData []byte) error {
err := cw.tar.WriteHeader(&tar.Header{
Name: catchpointSPVerificationFileName,
Mode: 0600,
Size: int64(len(encodedData)),
})

if err != nil {
return crypto.Digest{}, err
return err

Check warning on line 170 in ledger/catchpointwriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointwriter.go#L170

Added line #L170 was not covered by tests
}

_, err = cw.tar.Write(encodedData)
if err != nil {
return crypto.Digest{}, err
return err

Check warning on line 175 in ledger/catchpointwriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointwriter.go#L175

Added line #L175 was not covered by tests
}

if chunkLen := uint64(len(encodedData)); cw.biggestChunkLen < chunkLen {
cw.biggestChunkLen = chunkLen
}

return dataHash, nil
return nil
}

// WriteStep works for a short period of time (determined by stepCtx) to get
Expand Down
22 changes: 19 additions & 3 deletions ledger/catchpointwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,13 @@ func verifyStateProofVerificationContextWrite(t *testing.T, data []ledgercore.St
if err != nil {
return err
}
_, err = writer.WriteStateProofVerificationContext()
rawData, err := tx.MakeSpVerificationCtxReader().GetAllSPContexts(ctx)
if err != nil {
return err
}
_, encodedData := crypto.EncodeAndHash(catchpointStateProofVerificationContext{Data: rawData})

err = writer.WriteStateProofVerificationContext(encodedData)
if err != nil {
return err
}
Expand Down Expand Up @@ -260,7 +266,12 @@ func TestBasicCatchpointWriter(t *testing.T) {
if err != nil {
return err
}
_, err = writer.WriteStateProofVerificationContext()
rawData, err := tx.MakeSpVerificationCtxReader().GetAllSPContexts(ctx)
if err != nil {
return err
}
_, encodedData := crypto.EncodeAndHash(catchpointStateProofVerificationContext{Data: rawData})
err = writer.WriteStateProofVerificationContext(encodedData)
if err != nil {
return err
}
Expand Down Expand Up @@ -304,7 +315,12 @@ func testWriteCatchpoint(t *testing.T, rdb trackerdb.Store, datapath string, fil
if err != nil {
return err
}
_, err = writer.WriteStateProofVerificationContext()
rawData, err := tx.MakeSpVerificationCtxReader().GetAllSPContexts(ctx)
if err != nil {
return err
}
_, encodedData := crypto.EncodeAndHash(catchpointStateProofVerificationContext{Data: rawData})
err = writer.WriteStateProofVerificationContext(encodedData)
if err != nil {
return err
}
Expand Down