From 471d74f1cc2a2c0588e0f3b1463ba934f2021a61 Mon Sep 17 00:00:00 2001 From: Sergey <83376337+freak12techno@users.noreply.github.com> Date: Sun, 28 Apr 2024 00:47:53 +0300 Subject: [PATCH] feat: store proposals fetch height (#76) * feat: store proposals fetch height * chore: preserve older height --- pkg/fetchers/cosmos/fetcher.go | 6 ++-- pkg/fetchers/cosmos/proposals_v1.go | 24 +++++++++++----- pkg/fetchers/cosmos/proposals_v1beta1.go | 28 +++++++++++++++---- pkg/fetchers/cosmos/tally.go | 2 +- pkg/fetchers/fetcher.go | 2 +- pkg/fetchers/neutron/proposals.go | 17 +++++++----- pkg/state/generator.go | 10 ++++++- pkg/state/state.go | 35 +++++++++++++++++++++--- pkg/state/state_test.go | 19 +++++++++++++ 9 files changed, 114 insertions(+), 29 deletions(-) diff --git a/pkg/fetchers/cosmos/fetcher.go b/pkg/fetchers/cosmos/fetcher.go index fa1aa13..5e67e7c 100644 --- a/pkg/fetchers/cosmos/fetcher.go +++ b/pkg/fetchers/cosmos/fetcher.go @@ -26,12 +26,12 @@ func NewRPC(chainConfig *types.Chain, logger zerolog.Logger) *RPC { } } -func (rpc *RPC) GetAllProposals() ([]types.Proposal, *types.QueryError) { +func (rpc *RPC) GetAllProposals(prevHeight int64) ([]types.Proposal, int64, *types.QueryError) { if rpc.ProposalsType == "v1" { - return rpc.GetAllV1Proposals() + return rpc.GetAllV1Proposals(prevHeight) } - return rpc.GetAllV1beta1Proposals() + return rpc.GetAllV1beta1Proposals(prevHeight) } func (rpc *RPC) GetStakingPool() (*responses.PoolRPCResponse, *types.QueryError) { diff --git a/pkg/fetchers/cosmos/proposals_v1.go b/pkg/fetchers/cosmos/proposals_v1.go index 300b7a4..afed19f 100644 --- a/pkg/fetchers/cosmos/proposals_v1.go +++ b/pkg/fetchers/cosmos/proposals_v1.go @@ -8,10 +8,14 @@ import ( "main/pkg/utils" ) -func (rpc *RPC) GetAllV1Proposals() ([]types.Proposal, *types.QueryError) { +func (rpc *RPC) GetAllV1Proposals( + prevHeight int64, +) ([]types.Proposal, int64, *types.QueryError) { proposals := []types.Proposal{} offset := 0 + lastHeight := prevHeight + for { url := fmt.Sprintf( // 2 is for PROPOSAL_STATUS_VOTING_PERIOD @@ -21,23 +25,29 @@ func (rpc *RPC) GetAllV1Proposals() ([]types.Proposal, *types.QueryError) { ) var batchProposals responses.V1ProposalsRPCResponse - errs, header := rpc.Client.GetWithPredicate(url, &batchProposals, types.HTTPPredicateAlwaysPass()) + errs, header := rpc.Client.GetWithPredicate( + url, + &batchProposals, + types.HTTPPredicateCheckHeightAfter(lastHeight), + ) if len(errs) > 0 { - return nil, &types.QueryError{ + return nil, 0, &types.QueryError{ QueryError: nil, NodeErrors: errs, } } - _, err := utils.GetBlockHeightFromHeader(header) + height, err := utils.GetBlockHeightFromHeader(header) if err != nil { - return nil, &types.QueryError{ + return nil, 0, &types.QueryError{ QueryError: errors.New("got error when parsing proposal height"), } } + lastHeight = height + if batchProposals.Message != "" { - return nil, &types.QueryError{ + return nil, height, &types.QueryError{ QueryError: errors.New(batchProposals.Message), } } @@ -53,5 +63,5 @@ func (rpc *RPC) GetAllV1Proposals() ([]types.Proposal, *types.QueryError) { offset += PaginationLimit } - return proposals, nil + return proposals, lastHeight, nil } diff --git a/pkg/fetchers/cosmos/proposals_v1beta1.go b/pkg/fetchers/cosmos/proposals_v1beta1.go index 8eb4089..3cd5cea 100644 --- a/pkg/fetchers/cosmos/proposals_v1beta1.go +++ b/pkg/fetchers/cosmos/proposals_v1beta1.go @@ -8,10 +8,14 @@ import ( "main/pkg/utils" ) -func (rpc *RPC) GetAllV1beta1Proposals() ([]types.Proposal, *types.QueryError) { +func (rpc *RPC) GetAllV1beta1Proposals( + prevHeight int64, +) ([]types.Proposal, int64, *types.QueryError) { proposals := []types.Proposal{} offset := 0 + lastHeight := prevHeight + for { url := fmt.Sprintf( // 2 is for PROPOSAL_STATUS_VOTING_PERIOD @@ -21,19 +25,33 @@ func (rpc *RPC) GetAllV1beta1Proposals() ([]types.Proposal, *types.QueryError) { ) var batchProposals responses.V1Beta1ProposalsRPCResponse - if errs := rpc.Client.Get(url, &batchProposals); len(errs) > 0 { - return nil, &types.QueryError{ + errs, header := rpc.Client.GetWithPredicate( + url, + &batchProposals, + types.HTTPPredicateCheckHeightAfter(lastHeight), + ) + if len(errs) > 0 { + return nil, 0, &types.QueryError{ QueryError: nil, NodeErrors: errs, } } + height, err := utils.GetBlockHeightFromHeader(header) + if err != nil { + return nil, 0, &types.QueryError{ + QueryError: errors.New("got error when parsing proposals height"), + } + } + if batchProposals.Message != "" { - return nil, &types.QueryError{ + return nil, height, &types.QueryError{ QueryError: errors.New(batchProposals.Message), } } + lastHeight = height + parsedProposals := utils.Map(batchProposals.Proposals, func(p responses.V1beta1Proposal) types.Proposal { return p.ToProposal() }) @@ -45,5 +63,5 @@ func (rpc *RPC) GetAllV1beta1Proposals() ([]types.Proposal, *types.QueryError) { offset += PaginationLimit } - return proposals, nil + return proposals, lastHeight, nil } diff --git a/pkg/fetchers/cosmos/tally.go b/pkg/fetchers/cosmos/tally.go index 9233429..b31ee12 100644 --- a/pkg/fetchers/cosmos/tally.go +++ b/pkg/fetchers/cosmos/tally.go @@ -61,7 +61,7 @@ func (rpc *RPC) GetTallies() (types.ChainTallyInfos, error) { go func() { defer wg.Done() - chainProposals, err := rpc.GetAllProposals() + chainProposals, _, err := rpc.GetAllProposals(0) mutex.Lock() diff --git a/pkg/fetchers/fetcher.go b/pkg/fetchers/fetcher.go index 1bb6c00..4a7dddf 100644 --- a/pkg/fetchers/fetcher.go +++ b/pkg/fetchers/fetcher.go @@ -9,7 +9,7 @@ import ( ) type Fetcher interface { - GetAllProposals() ([]types.Proposal, *types.QueryError) + GetAllProposals(prevHeight int64) ([]types.Proposal, int64, *types.QueryError) GetVote(proposal, voter string, prevHeight int64) (*types.Vote, int64, *types.QueryError) GetTallies() (types.ChainTallyInfos, error) diff --git a/pkg/fetchers/neutron/proposals.go b/pkg/fetchers/neutron/proposals.go index 730fec7..57632cb 100644 --- a/pkg/fetchers/neutron/proposals.go +++ b/pkg/fetchers/neutron/proposals.go @@ -5,21 +5,24 @@ import ( "main/pkg/types" ) -func (fetcher *Fetcher) GetAllProposals() ([]types.Proposal, *types.QueryError) { +func (fetcher *Fetcher) GetAllProposals( + prevHeight int64, +) ([]types.Proposal, int64, *types.QueryError) { query := "{\"list_proposals\": {}}" var proposals responses.ProposalsResponse - if _, err := fetcher.GetSmartContractState(query, &proposals, 0); err != nil { - return nil, err + height, err := fetcher.GetSmartContractState(query, &proposals, prevHeight) + if err != nil { + return nil, height, err } - proposalsParsed, err := proposals.ToProposals() - if err != nil { - return nil, &types.QueryError{ + proposalsParsed, parseErr := proposals.ToProposals() + if parseErr != nil { + return nil, height, &types.QueryError{ QueryError: err, NodeErrors: nil, } } - return proposalsParsed, nil + return proposalsParsed, height, nil } diff --git a/pkg/state/generator.go b/pkg/state/generator.go index dadafa6..7aa16e3 100644 --- a/pkg/state/generator.go +++ b/pkg/state/generator.go @@ -46,13 +46,15 @@ func (g *Generator) ProcessChain( ) { fetcher := fetchers.GetFetcher(chain, g.Logger) - proposals, err := fetcher.GetAllProposals() + prevHeight := oldState.GetLastProposalsHeight(chain) + proposals, proposalsHeight, err := fetcher.GetAllProposals(prevHeight) if err != nil { g.Logger.Warn().Err(err).Msg("Error processing proposals") g.Mutex.Lock() defer g.Mutex.Unlock() state.SetChainProposalsError(chain, err) + state.SetChainProposalsHeight(chain, prevHeight) stateChain, found := oldState.ChainInfos[chain.Name] if found { @@ -66,8 +68,11 @@ func (g *Generator) ProcessChain( g.Logger.Info(). Str("chain", chain.Name). Int("len", len(proposals)). + Int64("height", proposalsHeight). Msg("Got proposals") + state.SetChainProposalsHeight(chain, proposalsHeight) + var wg sync.WaitGroup for _, proposal := range proposals { @@ -128,6 +133,9 @@ func (g *Generator) ProcessProposalAndWallet( if err != nil { proposalVote.Error = err + if found { + proposalVote.Height = oldVote.Height + } } else { proposalVote.Vote = vote proposalVote.Height = voteHeight diff --git a/pkg/state/state.go b/pkg/state/state.go index 94abc0a..08ff2a3 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -25,9 +25,10 @@ type WalletVotes struct { } type ChainInfo struct { - Chain *types.Chain - ProposalVotes map[string]WalletVotes - ProposalsError *types.QueryError + Chain *types.Chain + ProposalVotes map[string]WalletVotes + ProposalsError *types.QueryError + ProposalsHeight int64 } func (c ChainInfo) HasProposalsError() bool { @@ -44,6 +45,14 @@ func NewState() State { } } +func (s *State) GetLastProposalsHeight(chain *types.Chain) int64 { + if chainInfo, ok := s.ChainInfos[chain.Name]; !ok { + return 0 + } else { + return chainInfo.ProposalsHeight + } +} + func (s *State) SetVote(chain *types.Chain, proposal types.Proposal, wallet *types.Wallet, vote ProposalVote) { if _, ok := s.ChainInfos[chain.Name]; !ok { s.ChainInfos[chain.Name] = &ChainInfo{ @@ -69,7 +78,25 @@ func (s *State) SetChainProposalsError(chain *types.Chain, err *types.QueryError } } -func (s *State) SetChainVotes(chain *types.Chain, votes map[string]WalletVotes) { +func (s *State) SetChainProposalsHeight( + chain *types.Chain, + height int64, +) { + if _, ok := s.ChainInfos[chain.Name]; !ok { + s.ChainInfos[chain.Name] = &ChainInfo{ + Chain: chain, + ProposalVotes: make(map[string]WalletVotes), + } + } + + stateChain := s.ChainInfos[chain.Name] + stateChain.ProposalsHeight = height +} + +func (s *State) SetChainVotes( + chain *types.Chain, + votes map[string]WalletVotes, +) { stateChain := s.ChainInfos[chain.Name] stateChain.ProposalVotes = votes } diff --git a/pkg/state/state_test.go b/pkg/state/state_test.go index 5588a4f..99df942 100644 --- a/pkg/state/state_test.go +++ b/pkg/state/state_test.go @@ -98,6 +98,25 @@ func TestSetProposalErrorWithChainInfo(t *testing.T) { assert.Equal(t, "test error", err.QueryError.Error(), "Errors text should match!") } +func TestSetProposalLastHeight(t *testing.T) { + t.Parallel() + + state := State{ + ChainInfos: map[string]*ChainInfo{ + "chain1": {ProposalsHeight: 123}, + }, + } + + assert.Equal(t, int64(123), state.GetLastProposalsHeight(&types.Chain{Name: "chain1"})) + assert.Equal(t, int64(0), state.GetLastProposalsHeight(&types.Chain{Name: "chain2"})) + + state.SetChainProposalsHeight(&types.Chain{Name: "chain1"}, 456) + state.SetChainProposalsHeight(&types.Chain{Name: "chain2"}, 789) + + assert.Equal(t, int64(456), state.GetLastProposalsHeight(&types.Chain{Name: "chain1"})) + assert.Equal(t, int64(789), state.GetLastProposalsHeight(&types.Chain{Name: "chain2"})) +} + func TestGetVoteWithoutChainInfo(t *testing.T) { t.Parallel()