diff --git a/ledger/accountdb.go b/ledger/accountdb.go index 17685b919d..c79fc45139 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -81,26 +81,16 @@ var creatablesMigration = []string{ `ALTER TABLE assetcreators ADD COLUMN ctype INTEGER DEFAULT 0`, } -// createNormalizedOnlineBalanceIndex handles accountbase/catchpointbalances tables -func createNormalizedOnlineBalanceIndex(idxname string, tablename string) string { - return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s - ON %s ( normalizedonlinebalance, address, data ) WHERE normalizedonlinebalance>0`, idxname, tablename) -} - // createNormalizedOnlineBalanceIndexOnline handles onlineaccounts/catchpointonlineaccounts tables func createNormalizedOnlineBalanceIndexOnline(idxname string, tablename string) string { return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s ( normalizedonlinebalance, address )`, idxname, tablename) } -func createUniqueAddressBalanceIndex(idxname string, tablename string) string { - return fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (address)`, idxname, tablename) -} - var createOnlineAccountIndex = []string{ `ALTER TABLE accountbase ADD COLUMN normalizedonlinebalance INTEGER`, - createNormalizedOnlineBalanceIndex("onlineaccountbals", "accountbase"), + store.CreateNormalizedOnlineBalanceIndex("onlineaccountbals", "accountbase"), } var createResourcesTable = []string{ @@ -271,39 +261,18 @@ const MaxEncodedBaseAccountDataSize = 350 // this number is verified by the TestEncodedBaseResourceSize function. const MaxEncodedBaseResourceDataSize = 20000 -// normalizedAccountBalance is a staging area for a catchpoint file account information before it's being added to the catchpoint staging tables. -type normalizedAccountBalance struct { - // The public key address to which the account belongs. - address basics.Address - // accountData contains the baseAccountData for that account. - accountData store.BaseAccountData - // resources is a map, where the key is the creatable index, and the value is the resource data. - resources map[basics.CreatableIndex]store.ResourcesData - // encodedAccountData contains the baseAccountData encoded bytes that are going to be written to the accountbase table. - encodedAccountData []byte - // accountHashes contains a list of all the hashes that would need to be added to the merkle trie for that account. - // on V6, we could have multiple hashes, since we have separate account/resource hashes. - accountHashes [][]byte - // normalizedBalance contains the normalized balance for the account. - normalizedBalance uint64 - // encodedResources provides the encoded form of the resources - encodedResources map[basics.CreatableIndex][]byte - // partial balance indicates that the original account balance was split into multiple parts in catchpoint creation time - partialBalance bool -} - // prepareNormalizedBalancesV5 converts an array of encodedBalanceRecordV5 into an equal size array of normalizedAccountBalances. -func prepareNormalizedBalancesV5(bals []encodedBalanceRecordV5, proto config.ConsensusParams) (normalizedAccountBalances []normalizedAccountBalance, err error) { - normalizedAccountBalances = make([]normalizedAccountBalance, len(bals)) +func prepareNormalizedBalancesV5(bals []encodedBalanceRecordV5, proto config.ConsensusParams) (normalizedAccountBalances []store.NormalizedAccountBalance, err error) { + normalizedAccountBalances = make([]store.NormalizedAccountBalance, len(bals)) for i, balance := range bals { - normalizedAccountBalances[i].address = balance.Address + normalizedAccountBalances[i].Address = balance.Address var accountDataV5 basics.AccountData err = protocol.Decode(balance.AccountData, &accountDataV5) if err != nil { return nil, err } - normalizedAccountBalances[i].accountData.SetAccountData(&accountDataV5) - normalizedAccountBalances[i].normalizedBalance = accountDataV5.NormalizedOnlineBalance(proto) + normalizedAccountBalances[i].AccountData.SetAccountData(&accountDataV5) + normalizedAccountBalances[i].NormalizedBalance = accountDataV5.NormalizedOnlineBalance(proto) type resourcesRow struct { aidx basics.CreatableIndex store.ResourcesData @@ -316,64 +285,64 @@ func prepareNormalizedBalancesV5(bals []encodedBalanceRecordV5, proto config.Con if err = accountDataResources(context.Background(), &accountDataV5, 0, addResourceRow); err != nil { return nil, err } - normalizedAccountBalances[i].accountHashes = make([][]byte, 1) - normalizedAccountBalances[i].accountHashes[0] = accountHashBuilder(balance.Address, accountDataV5, balance.AccountData) + normalizedAccountBalances[i].AccountHashes = make([][]byte, 1) + normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilder(balance.Address, accountDataV5, balance.AccountData) if len(resources) > 0 { - normalizedAccountBalances[i].resources = make(map[basics.CreatableIndex]store.ResourcesData, len(resources)) - normalizedAccountBalances[i].encodedResources = make(map[basics.CreatableIndex][]byte, len(resources)) + normalizedAccountBalances[i].Resources = make(map[basics.CreatableIndex]store.ResourcesData, len(resources)) + normalizedAccountBalances[i].EncodedResources = make(map[basics.CreatableIndex][]byte, len(resources)) } for _, resource := range resources { - normalizedAccountBalances[i].resources[resource.aidx] = resource.ResourcesData - normalizedAccountBalances[i].encodedResources[resource.aidx] = protocol.Encode(&resource.ResourcesData) + normalizedAccountBalances[i].Resources[resource.aidx] = resource.ResourcesData + normalizedAccountBalances[i].EncodedResources[resource.aidx] = protocol.Encode(&resource.ResourcesData) } - normalizedAccountBalances[i].encodedAccountData = protocol.Encode(&normalizedAccountBalances[i].accountData) + normalizedAccountBalances[i].EncodedAccountData = protocol.Encode(&normalizedAccountBalances[i].AccountData) } return } // prepareNormalizedBalancesV6 converts an array of encodedBalanceRecordV6 into an equal size array of normalizedAccountBalances. -func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.ConsensusParams) (normalizedAccountBalances []normalizedAccountBalance, err error) { - normalizedAccountBalances = make([]normalizedAccountBalance, len(bals)) +func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.ConsensusParams) (normalizedAccountBalances []store.NormalizedAccountBalance, err error) { + normalizedAccountBalances = make([]store.NormalizedAccountBalance, len(bals)) for i, balance := range bals { - normalizedAccountBalances[i].address = balance.Address - err = protocol.Decode(balance.AccountData, &(normalizedAccountBalances[i].accountData)) + normalizedAccountBalances[i].Address = balance.Address + err = protocol.Decode(balance.AccountData, &(normalizedAccountBalances[i].AccountData)) if err != nil { return nil, err } - normalizedAccountBalances[i].normalizedBalance = basics.NormalizedOnlineAccountBalance( - normalizedAccountBalances[i].accountData.Status, - normalizedAccountBalances[i].accountData.RewardsBase, - normalizedAccountBalances[i].accountData.MicroAlgos, + normalizedAccountBalances[i].NormalizedBalance = basics.NormalizedOnlineAccountBalance( + normalizedAccountBalances[i].AccountData.Status, + normalizedAccountBalances[i].AccountData.RewardsBase, + normalizedAccountBalances[i].AccountData.MicroAlgos, proto) - normalizedAccountBalances[i].encodedAccountData = balance.AccountData + normalizedAccountBalances[i].EncodedAccountData = balance.AccountData curHashIdx := 0 if balance.ExpectingMoreEntries { // There is a single chunk in the catchpoint file with ExpectingMoreEntries // set to false for this account. There may be multiple chunks with // ExpectingMoreEntries set to true. In this case, we do not have to add the // account's own hash to accountHashes. - normalizedAccountBalances[i].accountHashes = make([][]byte, len(balance.Resources)) - normalizedAccountBalances[i].partialBalance = true + normalizedAccountBalances[i].AccountHashes = make([][]byte, len(balance.Resources)) + normalizedAccountBalances[i].PartialBalance = true } else { - normalizedAccountBalances[i].accountHashes = make([][]byte, 1+len(balance.Resources)) - normalizedAccountBalances[i].accountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].accountData, balance.AccountData) + normalizedAccountBalances[i].AccountHashes = make([][]byte, 1+len(balance.Resources)) + normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData) curHashIdx++ } if len(balance.Resources) > 0 { - normalizedAccountBalances[i].resources = make(map[basics.CreatableIndex]store.ResourcesData, len(balance.Resources)) - normalizedAccountBalances[i].encodedResources = make(map[basics.CreatableIndex][]byte, len(balance.Resources)) + normalizedAccountBalances[i].Resources = make(map[basics.CreatableIndex]store.ResourcesData, len(balance.Resources)) + normalizedAccountBalances[i].EncodedResources = make(map[basics.CreatableIndex][]byte, len(balance.Resources)) for cidx, res := range balance.Resources { var resData store.ResourcesData err = protocol.Decode(res, &resData) if err != nil { return nil, err } - normalizedAccountBalances[i].accountHashes[curHashIdx], err = resourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res) + normalizedAccountBalances[i].AccountHashes[curHashIdx], err = resourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res) if err != nil { return nil, err } - normalizedAccountBalances[i].resources[basics.CreatableIndex(cidx)] = resData - normalizedAccountBalances[i].encodedResources[basics.CreatableIndex(cidx)] = res + normalizedAccountBalances[i].Resources[basics.CreatableIndex(cidx)] = resData + normalizedAccountBalances[i].EncodedResources[basics.CreatableIndex(cidx)] = res curHashIdx++ } } @@ -889,99 +858,6 @@ func (a *compactOnlineAccountDeltas) updateOld(idx int, old store.PersistedOnlin a.deltas[idx].oldAcct = old } -// writeCatchpointStagingBalances inserts all the account balances in the provided array into the catchpoint balance staging table catchpointbalances. -func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []normalizedAccountBalance) error { - selectAcctStmt, err := tx.PrepareContext(ctx, "SELECT rowid FROM catchpointbalances WHERE address = ?") - if err != nil { - return err - } - - insertAcctStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, normalizedonlinebalance, data) VALUES(?, ?, ?)") - if err != nil { - return err - } - - insertRscStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointresources(addrid, aidx, data) VALUES(?, ?, ?)") - if err != nil { - return err - } - - var result sql.Result - var rowID int64 - for _, balance := range bals { - result, err = insertAcctStmt.ExecContext(ctx, balance.address[:], balance.normalizedBalance, balance.encodedAccountData) - if err == nil { - var aff int64 - aff, err = result.RowsAffected() - if err != nil { - return err - } - if aff != 1 { - return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) - } - rowID, err = result.LastInsertId() - if err != nil { - return err - } - } else { - var sqliteErr sqlite3.Error - if errors.As(err, &sqliteErr) && sqliteErr.Code == sqlite3.ErrConstraint && sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique { - // address exists: overflowed account record: find addrid - err = selectAcctStmt.QueryRowContext(ctx, balance.address[:]).Scan(&rowID) - if err != nil { - return err - } - } else { - return err - } - } - - // write resources - for aidx := range balance.resources { - var result sql.Result - result, err = insertRscStmt.ExecContext(ctx, rowID, aidx, balance.encodedResources[aidx]) - if err != nil { - return err - } - var aff int64 - aff, err = result.RowsAffected() - if err != nil { - return err - } - if aff != 1 { - return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) - } - } - } - return nil -} - -// writeCatchpointStagingHashes inserts all the account hashes in the provided array into the catchpoint pending hashes table catchpointpendinghashes. -func writeCatchpointStagingHashes(ctx context.Context, tx *sql.Tx, bals []normalizedAccountBalance) error { - insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointpendinghashes(data) VALUES(?)") - if err != nil { - return err - } - - for _, balance := range bals { - for _, hash := range balance.accountHashes { - result, err := insertStmt.ExecContext(ctx, hash[:]) - if err != nil { - return err - } - - aff, err := result.RowsAffected() - if err != nil { - return err - } - if aff != 1 { - return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) - } - } - } - return nil -} - // createCatchpointStagingHashesIndex creates an index on catchpointpendinghashes to allow faster scanning according to the hash order func createCatchpointStagingHashesIndex(ctx context.Context, tx *sql.Tx) (err error) { _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS catchpointpendinghashesidx ON catchpointpendinghashes(data)") @@ -991,153 +867,6 @@ func createCatchpointStagingHashesIndex(ctx context.Context, tx *sql.Tx) (err er return } -// writeCatchpointStagingCreatable inserts all the creatables in the provided array into the catchpoint asset creator staging table catchpointassetcreators. -// note that we cannot insert the resources here : in order to insert the resources, we need the rowid of the accountbase entry. This is being inserted by -// writeCatchpointStagingBalances via a separate go-routine. -func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, bals []normalizedAccountBalance) error { - var insertCreatorsStmt *sql.Stmt - var err error - insertCreatorsStmt, err = tx.PrepareContext(ctx, "INSERT INTO catchpointassetcreators(asset, creator, ctype) VALUES(?, ?, ?)") - if err != nil { - return err - } - defer insertCreatorsStmt.Close() - - for _, balance := range bals { - for aidx, resData := range balance.resources { - if resData.IsOwning() { - // determine if it's an asset - if resData.IsAsset() { - _, err := insertCreatorsStmt.ExecContext(ctx, aidx, balance.address[:], basics.AssetCreatable) - if err != nil { - return err - } - } - // determine if it's an application - if resData.IsApp() { - _, err := insertCreatorsStmt.ExecContext(ctx, aidx, balance.address[:], basics.AppCreatable) - if err != nil { - return err - } - } - } - } - } - return nil -} - -// writeCatchpointStagingKVs inserts all the KVs in the provided array into the -// catchpoint kvstore staging table catchpointkvstore, and their hashes to the pending -func writeCatchpointStagingKVs(ctx context.Context, tx *sql.Tx, kvrs []encodedKVRecordV6) error { - insertKV, err := tx.PrepareContext(ctx, "INSERT INTO catchpointkvstore(key, value) VALUES(?, ?)") - if err != nil { - return err - } - defer insertKV.Close() - - insertHash, err := tx.PrepareContext(ctx, "INSERT INTO catchpointpendinghashes(data) VALUES(?)") - if err != nil { - return err - } - defer insertHash.Close() - - for _, kvr := range kvrs { - _, err := insertKV.ExecContext(ctx, kvr.Key, kvr.Value) - if err != nil { - return err - } - - hash := kvHashBuilderV6(string(kvr.Key), kvr.Value) - _, err = insertHash.ExecContext(ctx, hash) - if err != nil { - return err - } - } - return nil -} - -func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup bool) (err error) { - s := []string{ - "DROP TABLE IF EXISTS catchpointbalances", - "DROP TABLE IF EXISTS catchpointassetcreators", - "DROP TABLE IF EXISTS catchpointaccounthashes", - "DROP TABLE IF EXISTS catchpointpendinghashes", - "DROP TABLE IF EXISTS catchpointresources", - "DROP TABLE IF EXISTS catchpointkvstore", - "DELETE FROM accounttotals where id='catchpointStaging'", - } - - if newCatchup { - // SQLite has no way to rename an existing index. So, we need - // to cook up a fresh name for the index, which will be kept - // around after we rename the table from "catchpointbalances" - // to "accountbase". To construct a unique index name, we - // use the current time. - // Apply the same logic to - now := time.Now().UnixNano() - idxnameBalances := fmt.Sprintf("onlineaccountbals_idx_%d", now) - idxnameAddress := fmt.Sprintf("accountbase_address_idx_%d", now) - - s = append(s, - "CREATE TABLE IF NOT EXISTS catchpointassetcreators (asset integer primary key, creator blob, ctype integer)", - "CREATE TABLE IF NOT EXISTS catchpointbalances (addrid INTEGER PRIMARY KEY NOT NULL, address blob NOT NULL, data blob, normalizedonlinebalance INTEGER)", - "CREATE TABLE IF NOT EXISTS catchpointpendinghashes (data blob)", - "CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)", - "CREATE TABLE IF NOT EXISTS catchpointresources (addrid INTEGER NOT NULL, aidx INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY (addrid, aidx) ) WITHOUT ROWID", - "CREATE TABLE IF NOT EXISTS catchpointkvstore (key blob primary key, value blob)", - - createNormalizedOnlineBalanceIndex(idxnameBalances, "catchpointbalances"), // should this be removed ? - createUniqueAddressBalanceIndex(idxnameAddress, "catchpointbalances"), - ) - } - - for _, stmt := range s { - _, err = tx.Exec(stmt) - if err != nil { - return err - } - } - - return nil -} - -// applyCatchpointStagingBalances switches the staged catchpoint catchup tables onto the actual -// tables and update the correct balance round. This is the final step in switching onto the new catchpoint round. -func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRound basics.Round, merkleRootRound basics.Round) (err error) { - stmts := []string{ - "DROP TABLE IF EXISTS accountbase", - "DROP TABLE IF EXISTS assetcreators", - "DROP TABLE IF EXISTS accounthashes", - "DROP TABLE IF EXISTS resources", - "DROP TABLE IF EXISTS kvstore", - - "ALTER TABLE catchpointbalances RENAME TO accountbase", - "ALTER TABLE catchpointassetcreators RENAME TO assetcreators", - "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes", - "ALTER TABLE catchpointresources RENAME TO resources", - "ALTER TABLE catchpointkvstore RENAME TO kvstore", - } - - for _, stmt := range stmts { - _, err = tx.Exec(stmt) - if err != nil { - return err - } - } - - _, err = tx.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('acctbase', ?)", balancesRound) - if err != nil { - return err - } - - _, err = tx.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('hashbase', ?)", merkleRootRound) - if err != nil { - return err - } - - return -} - // accountsInit fills the database using tx with initAccounts if the // database has not been initialized yet. // @@ -1410,8 +1139,8 @@ func performResourceTableMigration(ctx context.Context, tx *sql.Tx, log func(pro address blob NOT NULL, data blob, normalizedonlinebalance INTEGER )`, - createNormalizedOnlineBalanceIndex(idxnameBalances, "accountbase_resources_migration"), - createUniqueAddressBalanceIndex(idxnameAddress, "accountbase_resources_migration"), + store.CreateNormalizedOnlineBalanceIndex(idxnameBalances, "accountbase_resources_migration"), + store.CreateUniqueAddressBalanceIndex(idxnameAddress, "accountbase_resources_migration"), } applyNewAcctBase := []string{ diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go index 373ce91e11..b1a027f161 100644 --- a/ledger/accountdb_test.go +++ b/ledger/accountdb_test.go @@ -1053,7 +1053,8 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo require.NoError(b, err) b.StartTimer() err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - err = writeCatchpointStagingBalances(ctx, tx, normalizedAccountBalances) + crw := store.NewCatchpointSQLReaderWriter(tx) + err = crw.WriteCatchpointStagingBalances(ctx, normalizedAccountBalances) return }) diff --git a/ledger/catchpointwriter_test.go b/ledger/catchpointwriter_test.go index f3428b7f34..02742c7f1e 100644 --- a/ledger/catchpointwriter_test.go +++ b/ledger/catchpointwriter_test.go @@ -574,7 +574,8 @@ func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess db.Acc require.NoError(t, err) err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - err := applyCatchpointStagingBalances(ctx, tx, 0, 0) + crw := store.NewCatchpointSQLReaderWriter(tx) + err := crw.ApplyCatchpointStagingBalances(ctx, 0, 0) return err }) require.NoError(t, err) diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index 0d32ff0bd8..36d04ad8ac 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -93,9 +93,9 @@ type CatchpointCatchupAccessor interface { } type stagingWriter interface { - writeBalances(context.Context, []normalizedAccountBalance) error - writeCreatables(context.Context, []normalizedAccountBalance) error - writeHashes(context.Context, []normalizedAccountBalance) error + writeBalances(context.Context, []store.NormalizedAccountBalance) error + writeCreatables(context.Context, []store.NormalizedAccountBalance) error + writeHashes(context.Context, []store.NormalizedAccountBalance) error writeKVs(context.Context, []encodedKVRecordV6) error isShared() bool } @@ -104,27 +104,41 @@ type stagingWriterImpl struct { wdb db.Accessor } -func (w *stagingWriterImpl) writeBalances(ctx context.Context, balances []normalizedAccountBalance) error { +func (w *stagingWriterImpl) writeBalances(ctx context.Context, balances []store.NormalizedAccountBalance) error { return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - return writeCatchpointStagingBalances(ctx, tx, balances) + crw := store.NewCatchpointSQLReaderWriter(tx) + return crw.WriteCatchpointStagingBalances(ctx, balances) }) } func (w *stagingWriterImpl) writeKVs(ctx context.Context, kvrs []encodedKVRecordV6) error { return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - return writeCatchpointStagingKVs(ctx, tx, kvrs) + crw := store.NewCatchpointSQLReaderWriter(tx) + + keys := make([][]byte, len(kvrs)) + values := make([][]byte, len(kvrs)) + hashes := make([][]byte, len(kvrs)) + for i := 0; i < len(kvrs); i++ { + keys[i] = kvrs[i].Key + values[i] = kvrs[i].Value + hashes[i] = kvHashBuilderV6(string(keys[i]), values[i]) + } + + return crw.WriteCatchpointStagingKVs(ctx, keys, values, hashes) }) } -func (w *stagingWriterImpl) writeCreatables(ctx context.Context, balances []normalizedAccountBalance) error { +func (w *stagingWriterImpl) writeCreatables(ctx context.Context, balances []store.NormalizedAccountBalance) error { return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - return writeCatchpointStagingCreatable(ctx, tx, balances) + crw := store.NewCatchpointSQLReaderWriter(tx) + return crw.WriteCatchpointStagingCreatable(ctx, balances) }) } -func (w *stagingWriterImpl) writeHashes(ctx context.Context, balances []normalizedAccountBalance) error { +func (w *stagingWriterImpl) writeHashes(ctx context.Context, balances []store.NormalizedAccountBalance) error { return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - err := writeCatchpointStagingHashes(ctx, tx, balances) + crw := store.NewCatchpointSQLReaderWriter(tx) + err := crw.WriteCatchpointStagingHashes(ctx, balances) return err }) } @@ -244,7 +258,7 @@ func (c *catchpointCatchupAccessorImpl) ResetStagingBalances(ctx context.Context ledgerResetstagingbalancesCount.Inc(nil) err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) - err = resetCatchpointStagingBalances(ctx, tx, newCatchup) + err = crw.ResetCatchpointStagingBalances(ctx, newCatchup) if err != nil { return fmt.Errorf("unable to reset catchpoint catchup balances : %v", err) } @@ -371,7 +385,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte start := time.Now() ledgerProcessstagingbalancesCount.Inc(nil) - var normalizedAccountBalances []normalizedAccountBalance + var normalizedAccountBalances []store.NormalizedAccountBalance var expectingMoreEntries []bool var chunkKVs []encodedKVRecordV6 @@ -423,11 +437,11 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte // keep track of number of resources processed for each account for i, balance := range normalizedAccountBalances { // missing resources for this account - if expectingSpecificAccount && balance.address != nextExpectedAccount { + if expectingSpecificAccount && balance.Address != nextExpectedAccount { return fmt.Errorf("processStagingBalances received incomplete chunks for account %v", nextExpectedAccount) } - for _, resData := range balance.resources { + for _, resData := range balance.Resources { if resData.IsApp() && resData.IsOwning() { c.acctResCnt.totalAppParams++ } @@ -443,43 +457,43 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte } // check that counted resources adds up for this account if !expectingMoreEntries[i] { - if c.acctResCnt.totalAppParams != balance.accountData.TotalAppParams { + if c.acctResCnt.totalAppParams != balance.AccountData.TotalAppParams { return fmt.Errorf( "processStagingBalances received %d appParams for account %v, expected %d", c.acctResCnt.totalAppParams, - balance.address, - balance.accountData.TotalAppParams, + balance.Address, + balance.AccountData.TotalAppParams, ) } - if c.acctResCnt.totalAppLocalStates != balance.accountData.TotalAppLocalStates { + if c.acctResCnt.totalAppLocalStates != balance.AccountData.TotalAppLocalStates { return fmt.Errorf( "processStagingBalances received %d appLocalStates for account %v, expected %d", c.acctResCnt.totalAppParams, - balance.address, - balance.accountData.TotalAppLocalStates, + balance.Address, + balance.AccountData.TotalAppLocalStates, ) } - if c.acctResCnt.totalAssetParams != balance.accountData.TotalAssetParams { + if c.acctResCnt.totalAssetParams != balance.AccountData.TotalAssetParams { return fmt.Errorf( "processStagingBalances received %d assetParams for account %v, expected %d", c.acctResCnt.totalAppParams, - balance.address, - balance.accountData.TotalAssetParams, + balance.Address, + balance.AccountData.TotalAssetParams, ) } - if c.acctResCnt.totalAssets != balance.accountData.TotalAssets { + if c.acctResCnt.totalAssets != balance.AccountData.TotalAssets { return fmt.Errorf( "processStagingBalances received %d assets for account %v, expected %d", c.acctResCnt.totalAppParams, - balance.address, - balance.accountData.TotalAssets, + balance.Address, + balance.AccountData.TotalAssets, ) } c.acctResCnt = catchpointAccountResourceCounter{} nextExpectedAccount = basics.Address{} expectingSpecificAccount = false } else { - nextExpectedAccount = balance.address + nextExpectedAccount = balance.Address expectingSpecificAccount = true } } @@ -515,7 +529,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte defer wg.Done() hasCreatables := false for _, accBal := range normalizedAccountBalances { - for _, res := range accBal.resources { + for _, res := range accBal.Resources { if res.IsOwning() { hasCreatables = true break @@ -582,8 +596,8 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte progress.ProcessedBytes += uint64(len(bytes)) progress.ProcessedKVs += uint64(len(chunkKVs)) for _, acctBal := range normalizedAccountBalances { - progress.TotalAccountHashes += uint64(len(acctBal.accountHashes)) - if !acctBal.partialBalance { + progress.TotalAccountHashes += uint64(len(acctBal.AccountHashes)) + if !acctBal.PartialBalance { progress.ProcessedAccounts++ } } @@ -1035,7 +1049,7 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err } } - err = applyCatchpointStagingBalances(ctx, tx, basics.Round(balancesRound), basics.Round(hashRound)) + err = crw.ApplyCatchpointStagingBalances(ctx, basics.Round(balancesRound), basics.Round(hashRound)) if err != nil { return err } @@ -1045,7 +1059,7 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err return err } - err = resetCatchpointStagingBalances(ctx, tx, false) + err = crw.ResetCatchpointStagingBalances(ctx, false) if err != nil { return err } diff --git a/ledger/catchupaccessor_test.go b/ledger/catchupaccessor_test.go index f6283f9b70..d79d636e3c 100644 --- a/ledger/catchupaccessor_test.go +++ b/ledger/catchupaccessor_test.go @@ -427,11 +427,11 @@ type testStagingWriter struct { hashes map[[4 + crypto.DigestSize]byte]int } -func (w *testStagingWriter) writeBalances(ctx context.Context, balances []normalizedAccountBalance) error { +func (w *testStagingWriter) writeBalances(ctx context.Context, balances []store.NormalizedAccountBalance) error { return nil } -func (w *testStagingWriter) writeCreatables(ctx context.Context, balances []normalizedAccountBalance) error { +func (w *testStagingWriter) writeCreatables(ctx context.Context, balances []store.NormalizedAccountBalance) error { return nil } @@ -439,9 +439,9 @@ func (w *testStagingWriter) writeKVs(ctx context.Context, kvrs []encodedKVRecord return nil } -func (w *testStagingWriter) writeHashes(ctx context.Context, balances []normalizedAccountBalance) error { +func (w *testStagingWriter) writeHashes(ctx context.Context, balances []store.NormalizedAccountBalance) error { for _, bal := range balances { - for _, hash := range bal.accountHashes { + for _, hash := range bal.AccountHashes { var key [4 + crypto.DigestSize]byte require.Len(w.t, hash, 4+crypto.DigestSize) copy(key[:], hash) diff --git a/ledger/store/accountsV2.go b/ledger/store/accountsV2.go index ac9864b931..4816c6b127 100644 --- a/ledger/store/accountsV2.go +++ b/ledger/store/accountsV2.go @@ -321,7 +321,7 @@ func (r *accountsV2Reader) LoadAllFullAccounts( return } -// LoadFullAccount converts baseAccountData into basics.AccountData and loads all resources as needed +// LoadFullAccount converts BaseAccountData into basics.AccountData and loads all resources as needed func (r *accountsV2Reader) LoadFullAccount(ctx context.Context, resourcesTable string, addr basics.Address, addrid int64, data BaseAccountData) (ad basics.AccountData, err error) { ad = data.GetAccountData() @@ -403,11 +403,8 @@ func (r *accountsV2Reader) LoadFullAccount(ctx context.Context, resourcesTable s if err == nil && uint64(len(ad.AppLocalStates)) != data.TotalAppLocalStates { err = fmt.Errorf("%s app local states mismatch: %d != %d", addr.String(), len(ad.AppLocalStates), data.TotalAppLocalStates) } - if err != nil { - return - } - return + return ad, err } func (r *accountsV2Reader) AccountsOnlineRoundParams() (onlineRoundParamsData []ledgercore.OnlineRoundParamsData, endRound basics.Round, err error) { diff --git a/ledger/store/catchpoint.go b/ledger/store/catchpoint.go index f484661493..bf071cd16b 100644 --- a/ledger/store/catchpoint.go +++ b/ledger/store/catchpoint.go @@ -19,12 +19,16 @@ package store import ( "context" "database/sql" + "errors" + "fmt" + "time" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" + "github.com/mattn/go-sqlite3" ) // CatchpointState is used to store catchpoint related variables into the catchpointstate table. @@ -38,6 +42,27 @@ type UnfinishedCatchpointRecord struct { BlockHash crypto.Digest } +// NormalizedAccountBalance is a staging area for a catchpoint file account information before it's being added to the catchpoint staging tables. +type NormalizedAccountBalance struct { + // The public key address to which the account belongs. + Address basics.Address + // accountData contains the baseAccountData for that account. + AccountData BaseAccountData + // resources is a map, where the key is the creatable index, and the value is the resource data. + Resources map[basics.CreatableIndex]ResourcesData + // encodedAccountData contains the baseAccountData encoded bytes that are going to be written to the accountbase table. + EncodedAccountData []byte + // accountHashes contains a list of all the hashes that would need to be added to the merkle trie for that account. + // on V6, we could have multiple hashes, since we have separate account/resource hashes. + AccountHashes [][]byte + // normalizedBalance contains the normalized balance for the account. + NormalizedBalance uint64 + // encodedResources provides the encoded form of the resources + EncodedResources map[basics.CreatableIndex][]byte + // partial balance indicates that the original account balance was split into multiple parts in catchpoint creation time + PartialBalance bool +} + type catchpointReader struct { q db.Queryable } @@ -330,3 +355,255 @@ func (cw *catchpointWriter) DeleteOldCatchpointFirstStageInfo(ctx context.Contex } return db.Retry(f) } + +// WriteCatchpointStagingBalances inserts all the account balances in the provided array into the catchpoint balance staging table catchpointbalances. +func (cw *catchpointWriter) WriteCatchpointStagingBalances(ctx context.Context, bals []NormalizedAccountBalance) error { + selectAcctStmt, err := cw.e.PrepareContext(ctx, "SELECT rowid FROM catchpointbalances WHERE address = ?") + if err != nil { + return err + } + + insertAcctStmt, err := cw.e.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, normalizedonlinebalance, data) VALUES(?, ?, ?)") + if err != nil { + return err + } + + insertRscStmt, err := cw.e.PrepareContext(ctx, "INSERT INTO catchpointresources(addrid, aidx, data) VALUES(?, ?, ?)") + if err != nil { + return err + } + + var result sql.Result + var rowID int64 + for _, balance := range bals { + result, err = insertAcctStmt.ExecContext(ctx, balance.Address[:], balance.NormalizedBalance, balance.EncodedAccountData) + if err == nil { + var aff int64 + aff, err = result.RowsAffected() + if err != nil { + return err + } + if aff != 1 { + return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) + } + rowID, err = result.LastInsertId() + if err != nil { + return err + } + } else { + var sqliteErr sqlite3.Error + if errors.As(err, &sqliteErr) && sqliteErr.Code == sqlite3.ErrConstraint && sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique { + // address exists: overflowed account record: find addrid + err = selectAcctStmt.QueryRowContext(ctx, balance.Address[:]).Scan(&rowID) + if err != nil { + return err + } + } else { + return err + } + } + + // write resources + for aidx := range balance.Resources { + var result sql.Result + result, err = insertRscStmt.ExecContext(ctx, rowID, aidx, balance.EncodedResources[aidx]) + if err != nil { + return err + } + var aff int64 + aff, err = result.RowsAffected() + if err != nil { + return err + } + if aff != 1 { + return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) + } + } + } + return nil +} + +// WriteCatchpointStagingHashes inserts all the account hashes in the provided array into the catchpoint pending hashes table catchpointpendinghashes. +func (cw *catchpointWriter) WriteCatchpointStagingHashes(ctx context.Context, bals []NormalizedAccountBalance) error { + insertStmt, err := cw.e.PrepareContext(ctx, "INSERT INTO catchpointpendinghashes(data) VALUES(?)") + if err != nil { + return err + } + + for _, balance := range bals { + for _, hash := range balance.AccountHashes { + result, err := insertStmt.ExecContext(ctx, hash[:]) + if err != nil { + return err + } + + aff, err := result.RowsAffected() + if err != nil { + return err + } + if aff != 1 { + return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) + } + } + } + return nil +} + +// WriteCatchpointStagingCreatable inserts all the creatables in the provided array into the catchpoint asset creator staging table catchpointassetcreators. +// note that we cannot insert the resources here : in order to insert the resources, we need the rowid of the accountbase entry. This is being inserted by +// writeCatchpointStagingBalances via a separate go-routine. +func (cw *catchpointWriter) WriteCatchpointStagingCreatable(ctx context.Context, bals []NormalizedAccountBalance) error { + var insertCreatorsStmt *sql.Stmt + var err error + insertCreatorsStmt, err = cw.e.PrepareContext(ctx, "INSERT INTO catchpointassetcreators(asset, creator, ctype) VALUES(?, ?, ?)") + if err != nil { + return err + } + defer insertCreatorsStmt.Close() + + for _, balance := range bals { + for aidx, resData := range balance.Resources { + if resData.IsOwning() { + // determine if it's an asset + if resData.IsAsset() { + _, err := insertCreatorsStmt.ExecContext(ctx, aidx, balance.Address[:], basics.AssetCreatable) + if err != nil { + return err + } + } + // determine if it's an application + if resData.IsApp() { + _, err := insertCreatorsStmt.ExecContext(ctx, aidx, balance.Address[:], basics.AppCreatable) + if err != nil { + return err + } + } + } + } + } + return nil +} + +// WriteCatchpointStagingKVs inserts all the KVs in the provided array into the +// catchpoint kvstore staging table catchpointkvstore, and their hashes to the pending +func (cw *catchpointWriter) WriteCatchpointStagingKVs(ctx context.Context, keys [][]byte, values [][]byte, hashes [][]byte) error { + insertKV, err := cw.e.PrepareContext(ctx, "INSERT INTO catchpointkvstore(key, value) VALUES(?, ?)") + if err != nil { + return err + } + defer insertKV.Close() + + insertHash, err := cw.e.PrepareContext(ctx, "INSERT INTO catchpointpendinghashes(data) VALUES(?)") + if err != nil { + return err + } + defer insertHash.Close() + + for i := 0; i < len(keys); i++ { + _, err := insertKV.ExecContext(ctx, keys[i], values[i]) + if err != nil { + return err + } + + _, err = insertHash.ExecContext(ctx, hashes[i]) + if err != nil { + return err + } + } + return nil +} + +func (cw *catchpointWriter) ResetCatchpointStagingBalances(ctx context.Context, newCatchup bool) (err error) { + s := []string{ + "DROP TABLE IF EXISTS catchpointbalances", + "DROP TABLE IF EXISTS catchpointassetcreators", + "DROP TABLE IF EXISTS catchpointaccounthashes", + "DROP TABLE IF EXISTS catchpointpendinghashes", + "DROP TABLE IF EXISTS catchpointresources", + "DROP TABLE IF EXISTS catchpointkvstore", + "DELETE FROM accounttotals where id='catchpointStaging'", + } + + if newCatchup { + // SQLite has no way to rename an existing index. So, we need + // to cook up a fresh name for the index, which will be kept + // around after we rename the table from "catchpointbalances" + // to "accountbase". To construct a unique index name, we + // use the current time. + // Apply the same logic to + now := time.Now().UnixNano() + idxnameBalances := fmt.Sprintf("onlineaccountbals_idx_%d", now) + idxnameAddress := fmt.Sprintf("accountbase_address_idx_%d", now) + + s = append(s, + "CREATE TABLE IF NOT EXISTS catchpointassetcreators (asset integer primary key, creator blob, ctype integer)", + "CREATE TABLE IF NOT EXISTS catchpointbalances (addrid INTEGER PRIMARY KEY NOT NULL, address blob NOT NULL, data blob, normalizedonlinebalance INTEGER)", + "CREATE TABLE IF NOT EXISTS catchpointpendinghashes (data blob)", + "CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)", + "CREATE TABLE IF NOT EXISTS catchpointresources (addrid INTEGER NOT NULL, aidx INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY (addrid, aidx) ) WITHOUT ROWID", + "CREATE TABLE IF NOT EXISTS catchpointkvstore (key blob primary key, value blob)", + + CreateNormalizedOnlineBalanceIndex(idxnameBalances, "catchpointbalances"), // should this be removed ? + CreateUniqueAddressBalanceIndex(idxnameAddress, "catchpointbalances"), + ) + } + + for _, stmt := range s { + _, err = cw.e.Exec(stmt) + if err != nil { + return err + } + } + + return nil +} + +// TODO: will make private on the next PR once the migration/creations are moved out of `ledger`. + +// CreateUniqueAddressBalanceIndex is sql query to create a uninque index on `address`. +func CreateUniqueAddressBalanceIndex(idxname string, tablename string) string { + return fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (address)`, idxname, tablename) +} + +// CreateNormalizedOnlineBalanceIndex handles accountbase/catchpointbalances tables +func CreateNormalizedOnlineBalanceIndex(idxname string, tablename string) string { + return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s + ON %s ( normalizedonlinebalance, address, data ) WHERE normalizedonlinebalance>0`, idxname, tablename) +} + +// ApplyCatchpointStagingBalances switches the staged catchpoint catchup tables onto the actual +// tables and update the correct balance round. This is the final step in switching onto the new catchpoint round. +func (cw *catchpointWriter) ApplyCatchpointStagingBalances(ctx context.Context, balancesRound basics.Round, merkleRootRound basics.Round) (err error) { + stmts := []string{ + "DROP TABLE IF EXISTS accountbase", + "DROP TABLE IF EXISTS assetcreators", + "DROP TABLE IF EXISTS accounthashes", + "DROP TABLE IF EXISTS resources", + "DROP TABLE IF EXISTS kvstore", + + "ALTER TABLE catchpointbalances RENAME TO accountbase", + "ALTER TABLE catchpointassetcreators RENAME TO assetcreators", + "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes", + "ALTER TABLE catchpointresources RENAME TO resources", + "ALTER TABLE catchpointkvstore RENAME TO kvstore", + } + + for _, stmt := range stmts { + _, err = cw.e.Exec(stmt) + if err != nil { + return err + } + } + + _, err = cw.e.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('acctbase', ?)", balancesRound) + if err != nil { + return err + } + + _, err = cw.e.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('hashbase', ?)", merkleRootRound) + if err != nil { + return err + } + + return +}