Skip to content

Commit

Permalink
ledger: new db methods to abstract queries needed by the deltas (#4864)
Browse files Browse the repository at this point in the history
  • Loading branch information
icorderi committed Dec 8, 2022
1 parent b0ee6be commit ebe4e80
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 38 deletions.
47 changes: 11 additions & 36 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map
if len(a.misses) == 0 {
return nil
}
selectStmt, err := tx.Prepare("SELECT data FROM resources WHERE addrid = ? AND aidx = ?")
if err != nil {
return
}
defer selectStmt.Close()

addrRowidStmt, err := tx.Prepare("SELECT rowid FROM accountbase WHERE address=?")
if err != nil {
return
}
defer addrRowidStmt.Close()
arw := store.NewAccountsSQLReaderWriter(tx)

defer func() {
a.misses = nil
Expand All @@ -327,7 +317,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map
if delta.oldResource.Addrid != 0 {
addrid = delta.oldResource.Addrid
} else if addrid, ok = knownAddresses[addr]; !ok {
err = addrRowidStmt.QueryRow(addr[:]).Scan(&addrid)
addrid, err = arw.LookupAccountRowID(addr)
if err != nil {
if err != sql.ErrNoRows {
err = fmt.Errorf("base account cannot be read while processing resource for addr=%s, aidx=%d: %w", addr.String(), aidx, err)
Expand All @@ -340,8 +330,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map
continue
}
}
resDataBuf = nil
err = selectStmt.QueryRow(addrid, aidx).Scan(&resDataBuf)
resDataBuf, err = arw.LookupResourceDataByAddrID(addrid, aidx)
switch err {
case nil:
if len(resDataBuf) > 0 {
Expand Down Expand Up @@ -476,31 +465,25 @@ func (a *compactAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) {
if len(a.misses) == 0 {
return nil
}
selectStmt, err := tx.Prepare("SELECT rowid, data FROM accountbase WHERE address=?")
if err != nil {
return
}
defer selectStmt.Close()
arw := store.NewAccountsSQLReaderWriter(tx)
defer func() {
a.misses = nil
}()
var rowid sql.NullInt64
var acctDataBuf []byte
for _, idx := range a.misses {
addr := a.deltas[idx].address
err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &acctDataBuf)
rowid, acctDataBuf, err := arw.LookupAccountDataByAddress(addr)
switch err {
case nil:
if len(acctDataBuf) > 0 {
persistedAcctData := &store.PersistedAccountData{Addr: addr, Rowid: rowid.Int64}
persistedAcctData := &store.PersistedAccountData{Addr: addr, Rowid: rowid}
err = protocol.Decode(acctDataBuf, &persistedAcctData.AccountData)
if err != nil {
return err
}
a.updateOld(idx, *persistedAcctData)
} else {
// to retain backward compatibility, we will treat this condition as if we don't have the account.
a.updateOld(idx, store.PersistedAccountData{Addr: addr, Rowid: rowid.Int64})
a.updateOld(idx, store.PersistedAccountData{Addr: addr, Rowid: rowid})
}
case sql.ErrNoRows:
// we don't have that account, just return an empty record.
Expand Down Expand Up @@ -618,37 +601,29 @@ func (a *compactOnlineAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) {
if len(a.misses) == 0 {
return nil
}
// fetch the latest entry
selectStmt, err := tx.Prepare("SELECT rowid, data FROM onlineaccounts WHERE address=? ORDER BY updround DESC LIMIT 1")
if err != nil {
return
}
defer selectStmt.Close()
arw := store.NewAccountsSQLReaderWriter(tx)
defer func() {
a.misses = nil
}()
var rowid sql.NullInt64
var acctDataBuf []byte
for _, idx := range a.misses {
addr := a.deltas[idx].address
err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &acctDataBuf)
rowid, acctDataBuf, err := arw.LookupOnlineAccountDataByAddress(addr)
switch err {
case nil:
if len(acctDataBuf) > 0 {
persistedAcctData := &store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid.Int64}
persistedAcctData := &store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid}
err = protocol.Decode(acctDataBuf, &persistedAcctData.AccountData)
if err != nil {
return err
}
a.updateOld(idx, *persistedAcctData)
} else {
// empty data means offline account
a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid.Int64})
a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid})
}
case sql.ErrNoRows:
// we don't have that account, just return an empty record.
a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr})
err = nil
default:
// unexpected error - let the caller know that we couldn't complete the operation.
return err
Expand Down
80 changes: 78 additions & 2 deletions ledger/store/accountsV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

type accountsV2Reader struct {
q db.Queryable
q db.Queryable
preparedStatements map[string]*sql.Stmt
}

type accountsV2Writer struct {
Expand All @@ -47,11 +48,27 @@ type accountsV2ReaderWriter struct {
// NewAccountsSQLReaderWriter creates a Catchpoint SQL reader+writer
func NewAccountsSQLReaderWriter(e db.Executable) *accountsV2ReaderWriter {
return &accountsV2ReaderWriter{
accountsV2Reader{q: e},
accountsV2Reader{q: e, preparedStatements: make(map[string]*sql.Stmt)},
accountsV2Writer{e: e},
}
}

func (r *accountsV2Reader) getOrPrepare(queryString string) (stmt *sql.Stmt, err error) {
// fetch statement (use the query as the key)
if stmt, ok := r.preparedStatements[queryString]; ok {
return stmt, nil
}
// we do not have it, prepare it
stmt, err = r.q.Prepare(queryString)
if err != nil {
return
}
// cache the statement
r.preparedStatements[queryString] = stmt

return stmt, nil
}

// AccountsTotals returns account totals
func (r *accountsV2Reader) AccountsTotals(ctx context.Context, catchpointStaging bool) (totals ledgercore.AccountTotals, err error) {
id := ""
Expand Down Expand Up @@ -269,6 +286,65 @@ func (r *accountsV2Reader) LookupAccountAddressFromAddressID(ctx context.Context
return
}

func (r *accountsV2Reader) LookupAccountDataByAddress(addr basics.Address) (rowid int64, data []byte, err error) {
// optimize this query for repeated usage
selectStmt, err := r.getOrPrepare("SELECT rowid, data FROM accountbase WHERE address=?")
if err != nil {
return
}

err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &data)
if err != nil {
return
}
return rowid, data, err
}

// LookupOnlineAccountDataByAddress looks up online account data by address.
func (r *accountsV2Reader) LookupOnlineAccountDataByAddress(addr basics.Address) (rowid int64, data []byte, err error) {
// optimize this query for repeated usage
selectStmt, err := r.getOrPrepare("SELECT rowid, data FROM onlineaccounts WHERE address=? ORDER BY updround DESC LIMIT 1")
if err != nil {
return
}

err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &data)
if err != nil {
return
}
return rowid, data, err
}

// LookupAccountRowID looks up the rowid of an account based on its address.
func (r *accountsV2Reader) LookupAccountRowID(addr basics.Address) (rowid int64, err error) {
// optimize this query for repeated usage
addrRowidStmt, err := r.getOrPrepare("SELECT rowid FROM accountbase WHERE address=?")
if err != nil {
return
}

err = addrRowidStmt.QueryRow(addr[:]).Scan(&rowid)
if err != nil {
return
}
return rowid, err
}

// LookupResourceDataByAddrID looks up the resource data by account rowid + resource aidx.
func (r *accountsV2Reader) LookupResourceDataByAddrID(addrid int64, aidx basics.CreatableIndex) (data []byte, err error) {
// optimize this query for repeated usage
selectStmt, err := r.getOrPrepare("SELECT data FROM resources WHERE addrid = ? AND aidx = ?")
if err != nil {
return
}

err = selectStmt.QueryRow(addrid, aidx).Scan(&data)
if err != nil {
return
}
return data, err
}

// LoadAllFullAccounts loads all accounts from balancesTable and resourcesTable.
// On every account full load it invokes acctCb callback to report progress and data.
func (r *accountsV2Reader) LoadAllFullAccounts(
Expand Down

0 comments on commit ebe4e80

Please sign in to comment.