Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [cheqd] add parse cmd to top accounts module #511

Merged
merged 15 commits into from
Feb 3, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#### [cheqd] Top Accounts Module
- ([\#510](https://github.com/forbole/bdjuno/pull/510)) Implemented `top_accounts` module to store chain native token's balance for ranking
- ([\#511](https://github.com/forbole/bdjuno/pull/511)) Implemented parse cmd for `top_accounts` module


#### CI
Expand Down
2 changes: 2 additions & 0 deletions cmd/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
parsemint "github.com/forbole/bdjuno/v3/cmd/parse/mint"
parsepricefeed "github.com/forbole/bdjuno/v3/cmd/parse/pricefeed"
parsestaking "github.com/forbole/bdjuno/v3/cmd/parse/staking"
parsetopaccounts "github.com/forbole/bdjuno/v3/cmd/parse/top_accounts"
parsetransaction "github.com/forbole/juno/v3/cmd/parse/transactions"
)

Expand All @@ -38,6 +39,7 @@ func NewParseCmd(parseCfg *parse.Config) *cobra.Command {
parsemint.NewMintCmd(parseCfg),
parsepricefeed.NewPricefeedCmd(parseCfg),
parsestaking.NewStakingCmd(parseCfg),
parsetopaccounts.NewTopAccountsCmd(parseCfg),
parsetransaction.NewTransactionsCmd(parseCfg),
)

Expand Down
122 changes: 122 additions & 0 deletions cmd/parse/top_accounts/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package top_accounts

import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/forbole/bdjuno/v3/modules/bank"
"github.com/forbole/bdjuno/v3/modules/distribution"
"github.com/forbole/bdjuno/v3/modules/staking"
topaccounts "github.com/forbole/bdjuno/v3/modules/top_accounts"
modulestypes "github.com/forbole/bdjuno/v3/modules/types"
"github.com/forbole/bdjuno/v3/types"
"github.com/rs/zerolog/log"

parsecmdtypes "github.com/forbole/juno/v3/cmd/parse/types"
"github.com/forbole/juno/v3/parser"
"github.com/forbole/juno/v3/types/config"
"github.com/spf13/cobra"

"github.com/forbole/bdjuno/v3/database"
"github.com/forbole/bdjuno/v3/modules/auth"
)

var (
waitGroup sync.WaitGroup
)

const (
flagWorker = "worker"
)

func allCmd(parseConfig *parsecmdtypes.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "all",
RunE: func(cmd *cobra.Command, args []string) error {
parseCtx, err := parsecmdtypes.GetParserContext(config.Cfg, parseConfig)
if err != nil {
return err
}

sources, err := modulestypes.BuildSources(config.Cfg.Node, parseCtx.EncodingConfig)
if err != nil {
return err
}

// Get the database
db := database.Cast(parseCtx.Database)

// Build modules
authModule := auth.NewModule(sources.AuthSource, nil, parseCtx.EncodingConfig.Marshaler, db)
bankModule := bank.NewModule(nil, sources.BankSource, parseCtx.EncodingConfig.Marshaler, db)
distriModule := distribution.NewModule(sources.DistrSource, parseCtx.EncodingConfig.Marshaler, db)
stakingModule := staking.NewModule(sources.StakingSource, parseCtx.EncodingConfig.Marshaler, db)
topaccountsModule := topaccounts.NewModule(bankModule, distriModule, stakingModule, nil, parseCtx.EncodingConfig.Marshaler, db)

// Get workers
exportQueue := NewQueue(5)
workerCount, _ := cmd.Flags().GetInt64(flagWorker)
workers := make([]Worker, workerCount)
for i := range workers {
workers[i] = NewWorker(exportQueue, topaccountsModule)
}

waitGroup.Add(1)

// Get all base accounts, height set to 0 for querying the latest data on chain
accounts, err := authModule.GetAllBaseAccounts(0)
if err != nil {
return fmt.Errorf("error while getting base accounts: %s", err)
}

log.Debug().Int("total", len(accounts)).Msg("saving accounts...")
// Store accounts
err = db.SaveAccounts(accounts)
if err != nil {
return err
}

for i, w := range workers {
log.Debug().Int("number", i+1).Msg("starting worker...")
go w.start()
}

trapSignal(parseCtx)

go enqueueAddresses(exportQueue, accounts)

waitGroup.Wait()
return nil
},
}

cmd.Flags().Int64(flagWorker, 1, "worker count")

return cmd
}

func enqueueAddresses(exportQueue AddressQueue, accounts []types.Account) {
for _, account := range accounts {
exportQueue <- account.Address
}
}

// trapSignal will listen for any OS signal and invoke Done on the main
// WaitGroup allowing the main process to gracefully exit.
func trapSignal(ctx *parser.Context) {
var sigCh = make(chan os.Signal, 1)

signal.Notify(sigCh, syscall.SIGTERM)
signal.Notify(sigCh, syscall.SIGINT)

go func() {
sig := <-sigCh
log.Info().Str("signal", sig.String()).Msg("caught signal; shutting down...")
defer ctx.Node.Stop()
defer ctx.Database.Close()
defer waitGroup.Done()
}()
}
18 changes: 18 additions & 0 deletions cmd/parse/top_accounts/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package top_accounts

import (
parsecmdtypes "github.com/forbole/juno/v3/cmd/parse/types"
"github.com/spf13/cobra"
)

func NewTopAccountsCmd(parseConfig *parsecmdtypes.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "top-accounts",
}

cmd.AddCommand(
allCmd(parseConfig),
)

return cmd
}
38 changes: 38 additions & 0 deletions cmd/parse/top_accounts/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package top_accounts

import (
topaccounts "github.com/forbole/bdjuno/v3/modules/top_accounts"
"github.com/rs/zerolog/log"
)

type AddressQueue chan string

func NewQueue(size int) AddressQueue {
return make(chan string, size)
}

type Worker struct {
queue AddressQueue
topaccountsModule *topaccounts.Module
}

func NewWorker(queue AddressQueue, topaccountsModule *topaccounts.Module) Worker {
return Worker{
queue: queue,
topaccountsModule: topaccountsModule,
}
}

func (w Worker) start() {
for address := range w.queue {
err := w.topaccountsModule.RefreshAll(address)
if err != nil {
log.Error().Str("account", address).Err(err).Msg("re-enqueueing failed address")

go func(address string) {
w.queue <- address
}(address)
}

}
}
5 changes: 4 additions & 1 deletion modules/auth/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec"

"github.com/forbole/bdjuno/v3/database"
"github.com/forbole/bdjuno/v3/modules/auth/source"

"github.com/forbole/juno/v3/modules"
"github.com/forbole/juno/v3/modules/messages"
Expand All @@ -20,12 +21,14 @@ type Module struct {
cdc codec.Codec
db *database.Db
messagesParser messages.MessageAddressesParser
source source.Source
}

// NewModule builds a new Module instance
func NewModule(messagesParser messages.MessageAddressesParser, cdc codec.Codec, db *database.Db) *Module {
func NewModule(source source.Source, messagesParser messages.MessageAddressesParser, cdc codec.Codec, db *database.Db) *Module {
return &Module{
messagesParser: messagesParser,
source: source,
cdc: cdc,
db: db,
}
Expand Down
78 changes: 78 additions & 0 deletions modules/auth/source/local/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package local

import (
"fmt"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/types/query"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/rs/zerolog/log"

"github.com/forbole/juno/v3/node/local"

sdk "github.com/cosmos/cosmos-sdk/types"
source "github.com/forbole/bdjuno/v3/modules/auth/source"
)

var (
_ source.Source = &Source{}
)

// Source implements authsource.Source by using a local node
type Source struct {
*local.Source
q authtypes.QueryServer
}

// NewSource builds a new Source instance
func NewSource(source *local.Source, q authtypes.QueryServer) *Source {
return &Source{
Source: source,
q: q,
}
}

func (s Source) GetAllAnyAccounts(height int64) ([]*codectypes.Any, error) {
log.Debug().Msg("getting all accounts")
ctx, err := s.LoadHeight(height)
if err != nil {
return nil, fmt.Errorf("error while loading height: %s", err)
}

var accounts []*codectypes.Any
var nextKey []byte
var stop = false
var counter uint64
var totalCounts uint64

// Get 1000 accounts per query
var pageLimit uint64 = 1000

for !stop {
// Get accounts
res, err := s.q.Accounts(
sdk.WrapSDKContext(ctx),
&authtypes.QueryAccountsRequest{
Pagination: &query.PageRequest{
Key: nextKey,
Limit: pageLimit,
CountTotal: true,
},
})
if err != nil {
return nil, fmt.Errorf("error while getting any accounts from source: %s", err)
}
nextKey = res.Pagination.NextKey
stop = len(res.Pagination.NextKey) == 0
accounts = append(accounts, res.Accounts...)

// Log getting accounts progress
if res.Pagination.GetTotal() != 0 {
totalCounts = res.Pagination.GetTotal()
}
counter += uint64(len(res.Accounts))
log.Debug().Uint64("total accounts", totalCounts).Uint64("current counter", counter).Msg("getting accounts...")
}

return accounts, nil
}
72 changes: 72 additions & 0 deletions modules/auth/source/remote/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package remote

import (
"fmt"

"github.com/cosmos/cosmos-sdk/types/query"
"github.com/forbole/juno/v3/node/remote"
"github.com/rs/zerolog/log"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
source "github.com/forbole/bdjuno/v3/modules/auth/source"
)

var (
_ source.Source = &Source{}
)

type Source struct {
*remote.Source
authClient authtypes.QueryClient
}

// NewSource builds a new Source instance
func NewSource(source *remote.Source, authClient authtypes.QueryClient) *Source {
return &Source{
Source: source,
authClient: authClient,
}
}

func (s Source) GetAllAnyAccounts(height int64) ([]*codectypes.Any, error) {
log.Debug().Msg("getting all accounts")
ctx := remote.GetHeightRequestContext(s.Ctx, height)

var accounts []*codectypes.Any
var nextKey []byte
var stop = false
var counter uint64
var totalCounts uint64

// Get 1000 accounts per query
var pageLimit uint64 = 1000

for !stop {
// Get accounts
res, err := s.authClient.Accounts(
ctx,
&authtypes.QueryAccountsRequest{
Pagination: &query.PageRequest{
Key: nextKey,
Limit: pageLimit,
CountTotal: true,
},
})
if err != nil {
return nil, fmt.Errorf("error while getting any accounts from source: %s", err)
}
nextKey = res.Pagination.NextKey
stop = len(res.Pagination.NextKey) == 0
accounts = append(accounts, res.Accounts...)

// Log getting accounts progress
if res.Pagination.GetTotal() != 0 {
totalCounts = res.Pagination.GetTotal()
}
counter += uint64(len(res.Accounts))
log.Debug().Uint64("total accounts", totalCounts).Uint64("current counter", counter).Msg("getting accounts...")
}

return accounts, nil
}
7 changes: 7 additions & 0 deletions modules/auth/source/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package source

import codectypes "github.com/cosmos/cosmos-sdk/codec/types"

type Source interface {
GetAllAnyAccounts(height int64) ([]*codectypes.Any, error)
}
Loading