Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [cheqd] store latest height when updating top accounts records #560

Merged
merged 14 commits into from
Apr 15, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- ([\#547](https://github.com/forbole/bdjuno/pull/547)) Upgraded `cheqd-node` to `v1.3.0` and added MsgDeactivateDidDoc handling
- ([\#552](https://github.com/forbole/bdjuno/pull/552)) Upgraded `cheqd-node` to `v1.4.0`
- ([\#555](https://github.com/forbole/bdjuno/pull/555)) Added `height` column to top accounts table
- ([\#560](https://github.com/forbole/bdjuno/pull/560)) Stored latest block height instead of height `0` when updating top accounts and added RefreshAvailableBalance to periodic ops


#### CI
Expand Down
10 changes: 8 additions & 2 deletions cmd/parse/top_accounts/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func allCmd(parseConfig *parsecmdtypes.Config) *cobra.Command {
bankModule := bank.NewModule(nil, sources.BankSource, parseCtx.EncodingConfig.Codec, db)
distriModule := distribution.NewModule(sources.DistrSource, parseCtx.EncodingConfig.Codec, db)
stakingModule := staking.NewModule(sources.StakingSource, parseCtx.EncodingConfig.Codec, db)
topaccountsModule := topaccounts.NewModule(sources.AuthSource, bankModule, distriModule, stakingModule, nil, parseCtx.EncodingConfig.Codec, db)
topaccountsModule := topaccounts.NewModule(authModule, sources.AuthSource, bankModule, distriModule, stakingModule, nil, parseCtx.EncodingConfig.Codec, parseCtx.Node, db)

// Get workers
exportQueue := NewQueue(5)
Expand All @@ -66,8 +66,14 @@ func allCmd(parseConfig *parsecmdtypes.Config) *cobra.Command {

waitGroup.Add(1)

// Query the latest chain height
height, err := parseCtx.Node.LatestHeight()
if err != nil {
return fmt.Errorf("error while getting chain latest block height: %s", err)
}

// Get all base accounts, height set to 0 for querying the latest data on chain
accounts, err := authModule.GetAllBaseAccounts(0)
accounts, err := authModule.GetAllBaseAccounts(height)
if err != nil {
return fmt.Errorf("error while getting base accounts: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion database/top_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (db *Db) SaveTopAccountsBalance(column string, bals []types.NativeTokenAmou
}

stmt = stmt[:len(stmt)-1]
stmt += fmt.Sprintf("ON CONFLICT (address) DO UPDATE SET %s = excluded.%s,height = excluded.height WHERE top_accounts.height <= excluded.height", column, column)
stmt += fmt.Sprintf("ON CONFLICT (address) DO UPDATE SET %s = excluded.%s, height = excluded.height WHERE top_accounts.height <= excluded.height", column, column)

_, err := db.SQL.Exec(stmt, params...)
return err
Expand Down
2 changes: 1 addition & 1 deletion modules/distribution/utils_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/rs/zerolog/log"
)

func (m *Module) RefreshDelegatorRewards(height int64, delegators []string) error {
func (m *Module) RefreshDelegatorRewards(delegators []string, height int64) error {
log.Debug().
Str("module", "distribution").
Int64("height", height).Msg("updating rewards")
Expand Down
2 changes: 1 addition & 1 deletion modules/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *Registrar) BuildModules(ctx registrar.Context) jmodules.Modules {
stakingModule := staking.NewModule(sources.StakingSource, cdc, db)
govModule := gov.NewModule(sources.GovSource, authModule, distrModule, mintModule, slashingModule, stakingModule, cdc, db)
upgradeModule := upgrade.NewModule(db, stakingModule)
topAccountsModule := topaccounts.NewModule(sources.AuthSource, bankModule, distrModule, stakingModule, r.parser, cdc, db)
topAccountsModule := topaccounts.NewModule(authModule, sources.AuthSource, bankModule, distrModule, stakingModule, r.parser, cdc, ctx.Proxy, db)

return []jmodules.Module{
messages.NewModule(r.parser, cdc, ctx.Database),
Expand Down
2 changes: 1 addition & 1 deletion modules/staking/utils_delegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/rs/zerolog/log"
)

func (m *Module) RefreshDelegations(height int64, delegatorAddr string) error {
func (m *Module) RefreshDelegations(delegatorAddr string, height int64) error {
log.Debug().
Str("module", "staking").
Int64("height", height).Msg("updating delegations")
Expand Down
2 changes: 1 addition & 1 deletion modules/staking/utils_redelegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/rs/zerolog/log"
)

func (m *Module) RefreshRedelegations(height int64, delegatorAddr string) error {
func (m *Module) RefreshRedelegations(delegatorAddr string, height int64) error {
log.Debug().
Str("module", "staking").
Int64("height", height).Msg("updating redelegations")
Expand Down
2 changes: 1 addition & 1 deletion modules/staking/utils_unbondings.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/rs/zerolog/log"
)

func (m *Module) RefreshUnbondings(height int64, delegatorAddr string) error {
func (m *Module) RefreshUnbondings(delegatorAddr string, height int64) error {
log.Debug().
Str("module", "staking").
Int64("height", height).Msg("updating unbonding delegations")
Expand Down
17 changes: 13 additions & 4 deletions modules/top_accounts/expected_modules.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package top_accounts

import (
"github.com/forbole/bdjuno/v4/types"
)

type AuthModule interface {
GetAllBaseAccounts(height int64) ([]types.Account, error)
}

type AuthSource interface {
GetTotalNumberOfAccounts(height int64) (uint64, error)
}

type BankModule interface {
UpdateBalances(addresses []string, height int64) error
}

type DistrModule interface {
RefreshDelegatorRewards(height int64, delegators []string) error
RefreshDelegatorRewards(delegators []string, height int64) error
}

type StakingModule interface {
RefreshDelegations(height int64, delegatorAddr string) error
RefreshRedelegations(height int64, delegatorAddr string) error
RefreshUnbondings(height int64, delegatorAddr string) error
RefreshDelegations(delegatorAddr string, height int64) error
RefreshRedelegations(delegatorAddr string, height int64) error
RefreshUnbondings(delegatorAddr string, height int64) error
}
24 changes: 12 additions & 12 deletions modules/top_accounts/handle_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *Module) HandleMsg(index int, msg sdk.Msg, tx *juno.Tx) error {
switch cosmosMsg := msg.(type) {

case *stakingtypes.MsgDelegate:
return m.handleMsgDelegate(tx.Height, cosmosMsg.DelegatorAddress)
return m.handleMsgDelegate(cosmosMsg.DelegatorAddress, tx.Height)

case *stakingtypes.MsgBeginRedelegate:
return m.handleMsgBeginRedelegate(tx, index, cosmosMsg.DelegatorAddress)
Expand All @@ -49,15 +49,15 @@ func (m *Module) HandleMsg(index int, msg sdk.Msg, tx *juno.Tx) error {

// Handle x/distribution delegator rewards
case *distritypes.MsgWithdrawDelegatorReward:
return m.handleMsgWithdrawDelegatorReward(tx.Height, cosmosMsg.DelegatorAddress)
return m.handleMsgWithdrawDelegatorReward(cosmosMsg.DelegatorAddress, tx.Height)

}

return nil
}

func (m *Module) handleMsgDelegate(height int64, delAddr string) error {
err := m.stakingModule.RefreshDelegations(height, delAddr)
func (m *Module) handleMsgDelegate(delAddr string, height int64) error {
err := m.stakingModule.RefreshDelegations(delAddr, height)
if err != nil {
return fmt.Errorf("error while refreshing delegations while handling MsgDelegate: %s", err)
}
Expand All @@ -73,7 +73,7 @@ func (m *Module) handleMsgDelegate(height int64, delAddr string) error {
func (m *Module) handleMsgBeginRedelegate(
tx *juno.Tx, index int, delAddr string) error {

err := m.stakingModule.RefreshRedelegations(tx.Height, delAddr)
err := m.stakingModule.RefreshRedelegations(delAddr, tx.Height)
if err != nil {
return fmt.Errorf("error while refreshing redelegations while handling MsgBeginRedelegate: %s", err)
}
Expand All @@ -99,15 +99,15 @@ func (m *Module) handleMsgBeginRedelegate(
}

// When the time expires, refresh the delegations & redelegations
time.AfterFunc(time.Until(completionTime), m.refreshDelegations(tx.Height, delAddr))
time.AfterFunc(time.Until(completionTime), m.refreshDelegations(delAddr, tx.Height))
time.AfterFunc(time.Until(completionTime), m.refreshRedelegations(tx, delAddr))

return nil
}

// handleMsgUndelegate handles a MsgUndelegate storing the data inside the database
func (m *Module) handleMsgUndelegate(tx *juno.Tx, index int, delAddr string) error {
err := m.stakingModule.RefreshUnbondings(tx.Height, delAddr)
err := m.stakingModule.RefreshUnbondings(delAddr, tx.Height)
if err != nil {
return fmt.Errorf("error while refreshing undelegations while handling MsgUndelegate: %s", err)
}
Expand All @@ -133,15 +133,15 @@ func (m *Module) handleMsgUndelegate(tx *juno.Tx, index int, delAddr string) err
}

// When the time expires, refresh the delegations & unbondings & available balance
time.AfterFunc(time.Until(completionTime), m.refreshDelegations(tx.Height, delAddr))
time.AfterFunc(time.Until(completionTime), m.refreshUnbondings(tx.Height, delAddr))
time.AfterFunc(time.Until(completionTime), m.refreshBalance(tx.Height, delAddr))
time.AfterFunc(time.Until(completionTime), m.refreshDelegations(delAddr, tx.Height))
time.AfterFunc(time.Until(completionTime), m.refreshUnbondings(delAddr, tx.Height))
time.AfterFunc(time.Until(completionTime), m.refreshBalance(delAddr, tx.Height))

return nil
}

func (m *Module) handleMsgWithdrawDelegatorReward(height int64, delAddr string) error {
err := m.distrModule.RefreshDelegatorRewards(height, []string{delAddr})
func (m *Module) handleMsgWithdrawDelegatorReward(delAddr string, height int64) error {
err := m.distrModule.RefreshDelegatorRewards([]string{delAddr}, height)
if err != nil {
return fmt.Errorf("error while refreshing delegator rewards: %s", err)
}
Expand Down
54 changes: 52 additions & 2 deletions modules/top_accounts/handle_periodic_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ func (m *Module) RegisterPeriodicOperations(scheduler *gocron.Scheduler) error {
return fmt.Errorf("error while setting up total top accounts periodic operation: %s", err)
}

if _, err := scheduler.Every(1).Day().At("00:00").Do(func() {
utils.WatchMethod(m.RefreshAvailableBalance)
}); err != nil {
return fmt.Errorf("error while setting up top accounts periodic operation: %s", err)
}

if _, err := scheduler.Every(1).Day().At("00:00").Do(func() {
utils.WatchMethod(m.RefreshRewards)
}); err != nil {
Expand All @@ -38,7 +44,7 @@ func (m *Module) RefreshTotalAccounts() error {
return fmt.Errorf("error while getting latest block height: %s", err)
}

totalAccountsNumber, err := m.authModule.GetTotalNumberOfAccounts(height)
totalAccountsNumber, err := m.authSource.GetTotalNumberOfAccounts(height)
if err != nil {
return fmt.Errorf("error while getting total number of accounts: %s", err)
}
Expand All @@ -51,6 +57,50 @@ func (m *Module) RefreshTotalAccounts() error {
return nil
}

// RefreshAvailableBalance refreshes latest available balance in db
func (m *Module) RefreshAvailableBalance() error {
log.Trace().Str("module", "top accounts").Str("operation", "refresh available balance").
Msg("refreshing available balance")

height, err := m.db.GetLastBlockHeight()
if err != nil {
return fmt.Errorf("error while getting latest block height: %s", err)
}

accounts, err := m.authModule.GetAllBaseAccounts(height)
if err != nil {
return fmt.Errorf("error while getting base accounts: %s", err)
}

if len(accounts) == 0 {
return nil
}

// Store accounts
err = m.db.SaveAccounts(accounts)
if err != nil {
return err
}

// Parse addresses to []string
var addresses []string
for _, a := range accounts {
addresses = append(addresses, a.Address)
}

err = m.bankModule.UpdateBalances(addresses, height)
if err != nil {
return fmt.Errorf("error while refreshing top accounts balances, error: %s", err)
}

err = m.refreshTopAccountsSum(addresses, height)
if err != nil {
return fmt.Errorf("error while refreshing top accounts sum value: %s", err)
}

return nil
}

// RefreshRewards refreshes the rewards for all delegators
func (m *Module) RefreshRewards() error {
log.Trace().Str("module", "top accounts").Str("operation", "refresh rewards").
Expand All @@ -72,7 +122,7 @@ func (m *Module) RefreshRewards() error {
}

// Refresh rewards
err = m.distrModule.RefreshDelegatorRewards(height, delegators)
err = m.distrModule.RefreshDelegatorRewards(delegators, height)
if err != nil {
return fmt.Errorf("error while refreshing delegators rewards: %s", err)
}
Expand Down
9 changes: 8 additions & 1 deletion modules/top_accounts/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec"

"github.com/forbole/bdjuno/v4/database"
"github.com/forbole/juno/v4/node"

"github.com/forbole/juno/v4/modules"
junomessages "github.com/forbole/juno/v4/modules/messages"
Expand All @@ -15,12 +16,14 @@ var (
_ modules.PeriodicOperationsModule = &Module{}
)

// Module represent x/gov module
// Module represent x/top_accounts module
type Module struct {
cdc codec.Codec
node node.Node
db *database.Db
messageParser junomessages.MessageAddressesParser
authModule AuthModule
authSource AuthSource
bankModule BankModule
distrModule DistrModule
stakingModule StakingModule
Expand All @@ -29,16 +32,20 @@ type Module struct {
// NewModule returns a new Module instance
func NewModule(
authModule AuthModule,
authSource AuthSource,
bankModule BankModule,
distrModule DistrModule,
stakingModule StakingModule,
messageParser junomessages.MessageAddressesParser,
cdc codec.Codec,
node node.Node,
db *database.Db,
) *Module {
return &Module{
cdc: cdc,
node: node,
authModule: authModule,
authSource: authSource,
bankModule: bankModule,
distrModule: distrModule,
messageParser: messageParser,
Expand Down
12 changes: 6 additions & 6 deletions modules/top_accounts/utils_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func (m *Module) refreshTopAccountsSum(addresses []string, height int64) error {
return nil
}

func (m *Module) refreshDelegations(height int64, delegator string) func() {
func (m *Module) refreshDelegations(delegator string, height int64) func() {
return func() {
err := m.stakingModule.RefreshDelegations(height, delegator)
err := m.stakingModule.RefreshDelegations(delegator, height)
if err != nil {
log.Error().Str("module", "top_accounts").Err(err).
Str("operation", "refresh delegations").Msg("error while refreshing delegations")
Expand All @@ -34,25 +34,25 @@ func (m *Module) refreshDelegations(height int64, delegator string) func() {

func (m *Module) refreshRedelegations(tx *juno.Tx, delegatorAddr string) func() {
return func() {
err := m.stakingModule.RefreshRedelegations(tx.Height, delegatorAddr)
err := m.stakingModule.RefreshRedelegations(delegatorAddr, tx.Height)
if err != nil {
log.Error().Str("module", "top_accounts").Err(err).
Str("operation", "refresh redelegations").Msg("error while refreshing redelegations")
}
}
}

func (m *Module) refreshUnbondings(height int64, delegatorAddr string) func() {
func (m *Module) refreshUnbondings(delegatorAddr string, height int64) func() {
return func() {
err := m.stakingModule.RefreshUnbondings(height, delegatorAddr)
err := m.stakingModule.RefreshUnbondings(delegatorAddr, height)
if err != nil {
log.Error().Str("module", "top_accounts").Err(err).
Str("operation", "refresh unbondings").Msg("error while refreshing unbonding delegations")
}
}
}

func (m *Module) refreshBalance(height int64, address string) func() {
func (m *Module) refreshBalance(address string, height int64) func() {
return func() {
err := m.bankModule.UpdateBalances([]string{address}, height)
if err != nil {
Expand Down
Loading