Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: new db methods to abstract queries needed by the deltas (9 of N) #4864

Merged
merged 4 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 11 additions & 36 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map
if len(a.misses) == 0 {
return nil
}
selectStmt, err := tx.Prepare("SELECT data FROM resources WHERE addrid = ? AND aidx = ?")
if err != nil {
return
}
defer selectStmt.Close()

addrRowidStmt, err := tx.Prepare("SELECT rowid FROM accountbase WHERE address=?")
if err != nil {
return
}
defer addrRowidStmt.Close()
arw := store.NewAccountsSQLReaderWriter(tx)

defer func() {
a.misses = nil
Expand All @@ -327,7 +317,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map
if delta.oldResource.Addrid != 0 {
addrid = delta.oldResource.Addrid
} else if addrid, ok = knownAddresses[addr]; !ok {
err = addrRowidStmt.QueryRow(addr[:]).Scan(&addrid)
addrid, err = arw.LookupAccountRowID(addr)
if err != nil {
if err != sql.ErrNoRows {
err = fmt.Errorf("base account cannot be read while processing resource for addr=%s, aidx=%d: %w", addr.String(), aidx, err)
Expand All @@ -340,8 +330,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map
continue
}
}
resDataBuf = nil
err = selectStmt.QueryRow(addrid, aidx).Scan(&resDataBuf)
resDataBuf, err = arw.LookupResourceDataByAddrID(addrid, aidx)
switch err {
case nil:
if len(resDataBuf) > 0 {
Expand Down Expand Up @@ -476,31 +465,25 @@ func (a *compactAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) {
if len(a.misses) == 0 {
return nil
}
selectStmt, err := tx.Prepare("SELECT rowid, data FROM accountbase WHERE address=?")
if err != nil {
return
}
defer selectStmt.Close()
arw := store.NewAccountsSQLReaderWriter(tx)
defer func() {
a.misses = nil
}()
var rowid sql.NullInt64
var acctDataBuf []byte
for _, idx := range a.misses {
addr := a.deltas[idx].address
err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &acctDataBuf)
rowid, acctDataBuf, err := arw.LookupAccountDataByAddress(addr)
switch err {
case nil:
if len(acctDataBuf) > 0 {
persistedAcctData := &store.PersistedAccountData{Addr: addr, Rowid: rowid.Int64}
persistedAcctData := &store.PersistedAccountData{Addr: addr, Rowid: rowid}
err = protocol.Decode(acctDataBuf, &persistedAcctData.AccountData)
if err != nil {
return err
}
a.updateOld(idx, *persistedAcctData)
} else {
// to retain backward compatibility, we will treat this condition as if we don't have the account.
a.updateOld(idx, store.PersistedAccountData{Addr: addr, Rowid: rowid.Int64})
a.updateOld(idx, store.PersistedAccountData{Addr: addr, Rowid: rowid})
}
case sql.ErrNoRows:
// we don't have that account, just return an empty record.
Expand Down Expand Up @@ -618,37 +601,29 @@ func (a *compactOnlineAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) {
if len(a.misses) == 0 {
return nil
}
// fetch the latest entry
selectStmt, err := tx.Prepare("SELECT rowid, data FROM onlineaccounts WHERE address=? ORDER BY updround DESC LIMIT 1")
if err != nil {
return
}
defer selectStmt.Close()
arw := store.NewAccountsSQLReaderWriter(tx)
defer func() {
a.misses = nil
}()
var rowid sql.NullInt64
var acctDataBuf []byte
for _, idx := range a.misses {
addr := a.deltas[idx].address
err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &acctDataBuf)
rowid, acctDataBuf, err := arw.LookupOnlineAccountDataByAddress(addr)
switch err {
case nil:
if len(acctDataBuf) > 0 {
persistedAcctData := &store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid.Int64}
persistedAcctData := &store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid}
err = protocol.Decode(acctDataBuf, &persistedAcctData.AccountData)
if err != nil {
return err
}
a.updateOld(idx, *persistedAcctData)
} else {
// empty data means offline account
a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid.Int64})
a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid})
}
case sql.ErrNoRows:
// we don't have that account, just return an empty record.
a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr})
err = nil
default:
// unexpected error - let the caller know that we couldn't complete the operation.
return err
Expand Down
80 changes: 78 additions & 2 deletions ledger/store/accountsV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

type accountsV2Reader struct {
q db.Queryable
q db.Queryable
preparedStatements map[string]*sql.Stmt
}

type accountsV2Writer struct {
Expand All @@ -47,11 +48,27 @@ type accountsV2ReaderWriter struct {
// NewAccountsSQLReaderWriter creates a Catchpoint SQL reader+writer
func NewAccountsSQLReaderWriter(e db.Executable) *accountsV2ReaderWriter {
return &accountsV2ReaderWriter{
accountsV2Reader{q: e},
accountsV2Reader{q: e, preparedStatements: make(map[string]*sql.Stmt)},
accountsV2Writer{e: e},
}
}

func (r *accountsV2Reader) getOrPrepare(queryString string) (stmt *sql.Stmt, err error) {
// fetch statement (use the query as the key)
if stmt, ok := r.preparedStatements[queryString]; ok {
return stmt, nil
}
// we do not have it, prepare it
stmt, err = r.q.Prepare(queryString)
if err != nil {
return
}
// cache the statement
r.preparedStatements[queryString] = stmt

return stmt, nil
}

// AccountsTotals returns account totals
func (r *accountsV2Reader) AccountsTotals(ctx context.Context, catchpointStaging bool) (totals ledgercore.AccountTotals, err error) {
id := ""
Expand Down Expand Up @@ -269,6 +286,65 @@ func (r *accountsV2Reader) LookupAccountAddressFromAddressID(ctx context.Context
return
}

func (r *accountsV2Reader) LookupAccountDataByAddress(addr basics.Address) (rowid int64, data []byte, err error) {
// optimize this query for repeated usage
selectStmt, err := r.getOrPrepare("SELECT rowid, data FROM accountbase WHERE address=?")
if err != nil {
return
}

err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &data)
if err != nil {
return
}
return rowid, data, err
}

// LookupOnlineAccountDataByAddress looks up online account data by address.
func (r *accountsV2Reader) LookupOnlineAccountDataByAddress(addr basics.Address) (rowid int64, data []byte, err error) {
// optimize this query for repeated usage
selectStmt, err := r.getOrPrepare("SELECT rowid, data FROM onlineaccounts WHERE address=? ORDER BY updround DESC LIMIT 1")
if err != nil {
return
}

err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &data)
if err != nil {
return
}
return rowid, data, err
}

// LookupAccountRowID looks up the rowid of an account based on its address.
func (r *accountsV2Reader) LookupAccountRowID(addr basics.Address) (rowid int64, err error) {
// optimize this query for repeated usage
addrRowidStmt, err := r.getOrPrepare("SELECT rowid FROM accountbase WHERE address=?")
if err != nil {
return
}

err = addrRowidStmt.QueryRow(addr[:]).Scan(&rowid)
if err != nil {
return
}
return rowid, err
}

// LookupResourceDataByAddrID looks up the resource data by account rowid + resource aidx.
func (r *accountsV2Reader) LookupResourceDataByAddrID(addrid int64, aidx basics.CreatableIndex) (data []byte, err error) {
// optimize this query for repeated usage
selectStmt, err := r.getOrPrepare("SELECT data FROM resources WHERE addrid = ? AND aidx = ?")
if err != nil {
return
}

err = selectStmt.QueryRow(addrid, aidx).Scan(&data)
if err != nil {
return
}
return data, err
}

// LoadAllFullAccounts loads all accounts from balancesTable and resourcesTable.
// On every account full load it invokes acctCb callback to report progress and data.
func (r *accountsV2Reader) LoadAllFullAccounts(
Expand Down