Skip to content

Commit

Permalink
ledger: move catchpoint staging into a storage package (#4836)
Browse files Browse the repository at this point in the history
  • Loading branch information
icorderi committed Nov 30, 2022
1 parent d19f2aa commit 7aa2f8c
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 347 deletions.
337 changes: 33 additions & 304 deletions ledger/accountdb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ledger/accountdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,8 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
require.NoError(b, err)
b.StartTimer()
err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = writeCatchpointStagingBalances(ctx, tx, normalizedAccountBalances)
crw := store.NewCatchpointSQLReaderWriter(tx)
err = crw.WriteCatchpointStagingBalances(ctx, normalizedAccountBalances)
return
})

Expand Down
3 changes: 2 additions & 1 deletion ledger/catchpointwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess db.Acc
require.NoError(t, err)

err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err := applyCatchpointStagingBalances(ctx, tx, 0, 0)
crw := store.NewCatchpointSQLReaderWriter(tx)
err := crw.ApplyCatchpointStagingBalances(ctx, 0, 0)
return err
})
require.NoError(t, err)
Expand Down
78 changes: 46 additions & 32 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ type CatchpointCatchupAccessor interface {
}

type stagingWriter interface {
writeBalances(context.Context, []normalizedAccountBalance) error
writeCreatables(context.Context, []normalizedAccountBalance) error
writeHashes(context.Context, []normalizedAccountBalance) error
writeBalances(context.Context, []store.NormalizedAccountBalance) error
writeCreatables(context.Context, []store.NormalizedAccountBalance) error
writeHashes(context.Context, []store.NormalizedAccountBalance) error
writeKVs(context.Context, []encodedKVRecordV6) error
isShared() bool
}
Expand All @@ -104,27 +104,41 @@ type stagingWriterImpl struct {
wdb db.Accessor
}

func (w *stagingWriterImpl) writeBalances(ctx context.Context, balances []normalizedAccountBalance) error {
func (w *stagingWriterImpl) writeBalances(ctx context.Context, balances []store.NormalizedAccountBalance) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return writeCatchpointStagingBalances(ctx, tx, balances)
crw := store.NewCatchpointSQLReaderWriter(tx)
return crw.WriteCatchpointStagingBalances(ctx, balances)
})
}

func (w *stagingWriterImpl) writeKVs(ctx context.Context, kvrs []encodedKVRecordV6) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return writeCatchpointStagingKVs(ctx, tx, kvrs)
crw := store.NewCatchpointSQLReaderWriter(tx)

keys := make([][]byte, len(kvrs))
values := make([][]byte, len(kvrs))
hashes := make([][]byte, len(kvrs))
for i := 0; i < len(kvrs); i++ {
keys[i] = kvrs[i].Key
values[i] = kvrs[i].Value
hashes[i] = kvHashBuilderV6(string(keys[i]), values[i])
}

return crw.WriteCatchpointStagingKVs(ctx, keys, values, hashes)
})
}

func (w *stagingWriterImpl) writeCreatables(ctx context.Context, balances []normalizedAccountBalance) error {
func (w *stagingWriterImpl) writeCreatables(ctx context.Context, balances []store.NormalizedAccountBalance) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return writeCatchpointStagingCreatable(ctx, tx, balances)
crw := store.NewCatchpointSQLReaderWriter(tx)
return crw.WriteCatchpointStagingCreatable(ctx, balances)
})
}

func (w *stagingWriterImpl) writeHashes(ctx context.Context, balances []normalizedAccountBalance) error {
func (w *stagingWriterImpl) writeHashes(ctx context.Context, balances []store.NormalizedAccountBalance) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err := writeCatchpointStagingHashes(ctx, tx, balances)
crw := store.NewCatchpointSQLReaderWriter(tx)
err := crw.WriteCatchpointStagingHashes(ctx, balances)
return err
})
}
Expand Down Expand Up @@ -244,7 +258,7 @@ func (c *catchpointCatchupAccessorImpl) ResetStagingBalances(ctx context.Context
ledgerResetstagingbalancesCount.Inc(nil)
err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
crw := store.NewCatchpointSQLReaderWriter(tx)
err = resetCatchpointStagingBalances(ctx, tx, newCatchup)
err = crw.ResetCatchpointStagingBalances(ctx, newCatchup)
if err != nil {
return fmt.Errorf("unable to reset catchpoint catchup balances : %v", err)
}
Expand Down Expand Up @@ -371,7 +385,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
start := time.Now()
ledgerProcessstagingbalancesCount.Inc(nil)

var normalizedAccountBalances []normalizedAccountBalance
var normalizedAccountBalances []store.NormalizedAccountBalance
var expectingMoreEntries []bool
var chunkKVs []encodedKVRecordV6

Expand Down Expand Up @@ -423,11 +437,11 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
// keep track of number of resources processed for each account
for i, balance := range normalizedAccountBalances {
// missing resources for this account
if expectingSpecificAccount && balance.address != nextExpectedAccount {
if expectingSpecificAccount && balance.Address != nextExpectedAccount {
return fmt.Errorf("processStagingBalances received incomplete chunks for account %v", nextExpectedAccount)
}

for _, resData := range balance.resources {
for _, resData := range balance.Resources {
if resData.IsApp() && resData.IsOwning() {
c.acctResCnt.totalAppParams++
}
Expand All @@ -443,43 +457,43 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
}
// check that counted resources adds up for this account
if !expectingMoreEntries[i] {
if c.acctResCnt.totalAppParams != balance.accountData.TotalAppParams {
if c.acctResCnt.totalAppParams != balance.AccountData.TotalAppParams {
return fmt.Errorf(
"processStagingBalances received %d appParams for account %v, expected %d",
c.acctResCnt.totalAppParams,
balance.address,
balance.accountData.TotalAppParams,
balance.Address,
balance.AccountData.TotalAppParams,
)
}
if c.acctResCnt.totalAppLocalStates != balance.accountData.TotalAppLocalStates {
if c.acctResCnt.totalAppLocalStates != balance.AccountData.TotalAppLocalStates {
return fmt.Errorf(
"processStagingBalances received %d appLocalStates for account %v, expected %d",
c.acctResCnt.totalAppParams,
balance.address,
balance.accountData.TotalAppLocalStates,
balance.Address,
balance.AccountData.TotalAppLocalStates,
)
}
if c.acctResCnt.totalAssetParams != balance.accountData.TotalAssetParams {
if c.acctResCnt.totalAssetParams != balance.AccountData.TotalAssetParams {
return fmt.Errorf(
"processStagingBalances received %d assetParams for account %v, expected %d",
c.acctResCnt.totalAppParams,
balance.address,
balance.accountData.TotalAssetParams,
balance.Address,
balance.AccountData.TotalAssetParams,
)
}
if c.acctResCnt.totalAssets != balance.accountData.TotalAssets {
if c.acctResCnt.totalAssets != balance.AccountData.TotalAssets {
return fmt.Errorf(
"processStagingBalances received %d assets for account %v, expected %d",
c.acctResCnt.totalAppParams,
balance.address,
balance.accountData.TotalAssets,
balance.Address,
balance.AccountData.TotalAssets,
)
}
c.acctResCnt = catchpointAccountResourceCounter{}
nextExpectedAccount = basics.Address{}
expectingSpecificAccount = false
} else {
nextExpectedAccount = balance.address
nextExpectedAccount = balance.Address
expectingSpecificAccount = true
}
}
Expand Down Expand Up @@ -515,7 +529,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
defer wg.Done()
hasCreatables := false
for _, accBal := range normalizedAccountBalances {
for _, res := range accBal.resources {
for _, res := range accBal.Resources {
if res.IsOwning() {
hasCreatables = true
break
Expand Down Expand Up @@ -582,8 +596,8 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
progress.ProcessedBytes += uint64(len(bytes))
progress.ProcessedKVs += uint64(len(chunkKVs))
for _, acctBal := range normalizedAccountBalances {
progress.TotalAccountHashes += uint64(len(acctBal.accountHashes))
if !acctBal.partialBalance {
progress.TotalAccountHashes += uint64(len(acctBal.AccountHashes))
if !acctBal.PartialBalance {
progress.ProcessedAccounts++
}
}
Expand Down Expand Up @@ -1035,7 +1049,7 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
}
}

err = applyCatchpointStagingBalances(ctx, tx, basics.Round(balancesRound), basics.Round(hashRound))
err = crw.ApplyCatchpointStagingBalances(ctx, basics.Round(balancesRound), basics.Round(hashRound))
if err != nil {
return err
}
Expand All @@ -1045,7 +1059,7 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
return err
}

err = resetCatchpointStagingBalances(ctx, tx, false)
err = crw.ResetCatchpointStagingBalances(ctx, false)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,21 +427,21 @@ type testStagingWriter struct {
hashes map[[4 + crypto.DigestSize]byte]int
}

func (w *testStagingWriter) writeBalances(ctx context.Context, balances []normalizedAccountBalance) error {
func (w *testStagingWriter) writeBalances(ctx context.Context, balances []store.NormalizedAccountBalance) error {
return nil
}

func (w *testStagingWriter) writeCreatables(ctx context.Context, balances []normalizedAccountBalance) error {
func (w *testStagingWriter) writeCreatables(ctx context.Context, balances []store.NormalizedAccountBalance) error {
return nil
}

func (w *testStagingWriter) writeKVs(ctx context.Context, kvrs []encodedKVRecordV6) error {
return nil
}

func (w *testStagingWriter) writeHashes(ctx context.Context, balances []normalizedAccountBalance) error {
func (w *testStagingWriter) writeHashes(ctx context.Context, balances []store.NormalizedAccountBalance) error {
for _, bal := range balances {
for _, hash := range bal.accountHashes {
for _, hash := range bal.AccountHashes {
var key [4 + crypto.DigestSize]byte
require.Len(w.t, hash, 4+crypto.DigestSize)
copy(key[:], hash)
Expand Down
7 changes: 2 additions & 5 deletions ledger/store/accountsV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (r *accountsV2Reader) LoadAllFullAccounts(
return
}

// LoadFullAccount converts baseAccountData into basics.AccountData and loads all resources as needed
// LoadFullAccount converts BaseAccountData into basics.AccountData and loads all resources as needed
func (r *accountsV2Reader) LoadFullAccount(ctx context.Context, resourcesTable string, addr basics.Address, addrid int64, data BaseAccountData) (ad basics.AccountData, err error) {
ad = data.GetAccountData()

Expand Down Expand Up @@ -403,11 +403,8 @@ func (r *accountsV2Reader) LoadFullAccount(ctx context.Context, resourcesTable s
if err == nil && uint64(len(ad.AppLocalStates)) != data.TotalAppLocalStates {
err = fmt.Errorf("%s app local states mismatch: %d != %d", addr.String(), len(ad.AppLocalStates), data.TotalAppLocalStates)
}
if err != nil {
return
}

return
return ad, err
}

func (r *accountsV2Reader) AccountsOnlineRoundParams() (onlineRoundParamsData []ledgercore.OnlineRoundParamsData, endRound basics.Round, err error) {
Expand Down

0 comments on commit 7aa2f8c

Please sign in to comment.