From 0f21cb47fbd246ded08278fb8ecba435399675ba Mon Sep 17 00:00:00 2001 From: JoeGruff Date: Wed, 25 Jan 2023 14:32:42 +0900 Subject: [PATCH] client/eth: Check provider header times. When fetching a new or cached header with a provider, do a basic check on the header's time to determine if the header, and so the provider, are up to date. --- client/asset/eth/eth.go | 12 +- client/asset/eth/eth_test.go | 20 +- client/asset/eth/multirpc.go | 251 ++++++++++++-------- client/asset/eth/multirpc_live_test.go | 25 +- client/asset/eth/nodeclient.go | 11 +- client/asset/eth/nodeclient_harness_test.go | 27 +-- 6 files changed, 191 insertions(+), 155 deletions(-) diff --git a/client/asset/eth/eth.go b/client/asset/eth/eth.go index 78dc878b89..81b7803f57 100644 --- a/client/asset/eth/eth.go +++ b/client/asset/eth/eth.go @@ -325,7 +325,7 @@ type ethFetcher interface { sendSignedTransaction(ctx context.Context, tx *types.Transaction) error sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte) (*types.Transaction, error) signData(data []byte) (sig, pubKey []byte, err error) - syncProgress(context.Context) (*ethereum.SyncProgress, error) + syncProgress(context.Context) (progress *ethereum.SyncProgress, tipTime uint64, err error) transactionConfirmations(context.Context, common.Hash) (uint32, error) getTransaction(context.Context, common.Hash) (*types.Transaction, int64, error) txOpts(ctx context.Context, val, maxGas uint64, maxFeeRate, nonce *big.Int) (*bind.TransactOpts, error) @@ -622,7 +622,7 @@ func createWallet(createWalletParams *asset.CreateWalletParams, skipConnect bool } if len(unknownEndpoints) > 0 && createWalletParams.Net == dex.Mainnet { - providers, err := connectProviders(ctx, unknownEndpoints, createWalletParams.Logger, big.NewInt(chainIDs[createWalletParams.Net])) + providers, err := connectProviders(ctx, unknownEndpoints, createWalletParams.Logger, big.NewInt(chainIDs[createWalletParams.Net]), createWalletParams.Net) if err != nil { return err } @@ -2885,17 +2885,13 @@ func (*baseWallet) ValidateSecret(secret, secretHash []byte) bool { // more, requesting the best block header starts to fail after a few tries // during initial sync. Investigate how to get correct sync progress. func (eth *baseWallet) SyncStatus() (bool, float32, error) { - prog, err := eth.node.syncProgress(eth.ctx) + prog, tipTime, err := eth.node.syncProgress(eth.ctx) if err != nil { return false, 0, err } checkHeaderTime := func() (bool, error) { - bh, err := eth.node.bestHeader(eth.ctx) - if err != nil { - return false, err - } // Time in the header is in seconds. - timeDiff := time.Now().Unix() - int64(bh.Time) + timeDiff := time.Now().Unix() - int64(tipTime) if timeDiff > dexeth.MaxBlockInterval && eth.net != dex.Simnet { eth.log.Infof("Time since last eth block (%d sec) exceeds %d sec."+ "Assuming not in sync. 
Ensure your computer's system clock "+ diff --git a/client/asset/eth/eth_test.go b/client/asset/eth/eth_test.go index fe6488dabb..7c756ffd31 100644 --- a/client/asset/eth/eth_test.go +++ b/client/asset/eth/eth_test.go @@ -97,6 +97,8 @@ type testNode struct { bestHdr *types.Header bestHdrErr error syncProg ethereum.SyncProgress + syncProgT uint64 + syncProgErr error bal *big.Int balErr error signDataErr error @@ -206,8 +208,8 @@ func (n *testNode) lock() error { func (n *testNode) locked() bool { return false } -func (n *testNode) syncProgress(context.Context) (*ethereum.SyncProgress, error) { - return &n.syncProg, nil +func (n *testNode) syncProgress(context.Context) (prog *ethereum.SyncProgress, bestBlockUNIXTime uint64, err error) { + return &n.syncProg, n.syncProgT, n.syncProgErr } func (n *testNode) peerCount() uint32 { return 1 @@ -465,8 +467,8 @@ func TestSyncStatus(t *testing.T) { tests := []struct { name string syncProg ethereum.SyncProgress + syncProgErr error subSecs uint64 - bestHdrErr error wantErr, wantSynced bool wantRatio float32 }{{ @@ -492,22 +494,22 @@ func TestSyncStatus(t *testing.T) { }, subSecs: dexeth.MaxBlockInterval + 1, }, { - name: "best header error", - bestHdrErr: errors.New(""), + name: "sync progress error", syncProg: ethereum.SyncProgress{ CurrentBlock: 25, HighestBlock: 0, }, - wantErr: true, + syncProgErr: errors.New(""), + wantErr: true, }} for _, test := range tests { nowInSecs := uint64(time.Now().Unix()) ctx, cancel := context.WithCancel(context.Background()) node := &testNode{ - syncProg: test.syncProg, - bestHdr: &types.Header{Time: nowInSecs - test.subSecs}, - bestHdrErr: test.bestHdrErr, + syncProg: test.syncProg, + syncProgT: nowInSecs - test.subSecs, + syncProgErr: test.syncProgErr, } eth := &baseWallet{ node: node, diff --git a/client/asset/eth/multirpc.go b/client/asset/eth/multirpc.go index 2a38d45f54..0b975ba474 100644 --- a/client/asset/eth/multirpc.go +++ b/client/asset/eth/multirpc.go @@ -42,6 +42,10 @@ const ( // failQuarantine is how long we will wait after a failed request before // trying a provider again. failQuarantine = time.Minute + // headerCheckInterval is the time between header checks. Slightly less + // than the fail quarantine to ensure providers with old headers stay + // quarantined. + headerCheckInterval = time.Second * 50 // receiptCacheExpiration is how long we will track a receipt after the // last request. There is no persistent storage, so all receipts are cached // in-memory. @@ -83,7 +87,9 @@ type provider struct { host string ec *combinedRPCClient ws bool + net dex.Network tipCapV atomic.Value // *cachedTipCap + stop func() // tip tracks the best known header as well as any error encountered tip struct { @@ -95,13 +101,22 @@ type provider struct { } } -func (p *provider) setTip(header *types.Header) { +func (p *provider) shutdown() { + p.stop() + p.ec.Close() +} + +func (p *provider) setTip(header *types.Header, log dex.Logger) { p.tip.Lock() p.tip.header = header p.tip.headerStamp = time.Now() p.tip.failStamp = time.Time{} + unfailed := p.tip.failCount != 0 p.tip.failCount = 0 p.tip.Unlock() + if unfailed { + log.Debugf("Provider at %s was failed but is now useable again.", p.host) + } } // cachedTip retrieves the last known best header. 
@@ -117,6 +132,7 @@ func (p *provider) cachedTip() *types.Header { p.tip.RLock() defer p.tip.RUnlock() + if time.Since(p.tip.failStamp) < failQuarantine || time.Since(p.tip.headerStamp) > stale { return nil } @@ -142,27 +158,30 @@ func (p *provider) failed() bool { // bestHeader get the best known header from the provider, cached if available, // otherwise a new RPC call is made. func (p *provider) bestHeader(ctx context.Context, log dex.Logger) (*types.Header, error) { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() // Check if we have a cached header. if tip := p.cachedTip(); tip != nil { - log.Tracef("using cached header from %q", p.host) + log.Tracef("Using cached header from %q", p.host) return tip, nil } - log.Tracef("fetching fresh header from %q", p.host) + log.Tracef("Fetching fresh header from %q", p.host) hdr, err := p.ec.HeaderByNumber(ctx, nil /* latest */) if err != nil { p.setFailed() return nil, fmt.Errorf("HeaderByNumber error: %w", err) } - p.setTip(hdr) + timeDiff := time.Now().Unix() - int64(hdr.Time) + if timeDiff > dexeth.MaxBlockInterval && p.net != dex.Simnet { + p.setFailed() + return nil, fmt.Errorf("time since last eth block (%d sec) exceeds %d sec. "+ + "Assuming provider %s is not in sync. Ensure your computer's system clock "+ + "is correct.", timeDiff, dexeth.MaxBlockInterval, p.host) + } + p.setTip(hdr, log) return hdr, nil } func (p *provider) headerByHash(ctx context.Context, h common.Hash) (*types.Header, error) { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() hdr, err := p.ec.HeaderByHash(ctx, h) if err != nil { p.setFailed() @@ -174,8 +193,6 @@ func (p *provider) headerByHash(ctx context.Context, h common.Hash) (*types.Head // suggestTipCap returns a tip cap suggestion, cached if available, otherwise a // new RPC call is made. func (p *provider) suggestTipCap(ctx context.Context, log dex.Logger) *big.Int { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() if cachedV := p.tipCapV.Load(); cachedV != nil { rec := cachedV.(*cachedTipCap) if time.Since(rec.stamp) < tipCapSuggestionExpiration { @@ -202,6 +219,32 @@ func (p *provider) suggestTipCap(ctx context.Context, log dex.Logger) *big.Int { return tipCap } +// refreshHeader fetches a header every headerCheckInterval. This keeps the +// cached header up to date or fails the provider if there is a problem getting +// the header. +func (p *provider) refreshHeader(ctx context.Context, log dex.Logger) { + log.Tracef("handling header refreshes for %q", p.host) + ticker := time.NewTicker(headerCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Fetching the best header will check that either the + // provider's cached header is not too old or that a + // newly fetched header is not too old. If it is too + // old that indicates the provider is not in sync and + // should not be used. + innerCtx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) + if _, err := p.bestHeader(innerCtx, log); err != nil { + log.Warnf("Problem getting best header from provider %s: %s.", p.host, err) + } + cancel() + case <-ctx.Done(): + return + } + } +} + // subscribeHeaders starts a listening loop for header updates for a provider. 
// The Subscription and header chan are passed in, because error-free // instantiation of these variable is necessary to accepting that a websocket @@ -256,7 +299,7 @@ func (p *provider) subscribeHeaders(ctx context.Context, sub ethereum.Subscripti select { case hdr := <-h: log.Tracef("%q reported new tip at height %s (%s)", p.host, hdr.Number, hdr.Hash()) - p.setTip(hdr) + p.setTip(hdr, log) case err, ok := <-sub.Err(): if !ok { // Subscription cancelled @@ -300,6 +343,7 @@ type multiRPCClient struct { creds *accountCredentials log dex.Logger chainID *big.Int + net dex.Network providerMtx sync.RWMutex endpoints []string @@ -336,6 +380,7 @@ func newMultiRPCClient(dir string, endpoints []string, log dex.Logger, cfg *para } m := &multiRPCClient{ + net: net, cfg: cfg, log: log, creds: creds, @@ -352,14 +397,14 @@ func newMultiRPCClient(dir string, endpoints []string, log dex.Logger, cfg *para // list of providers that were successfully connected. It is not an error for a // connection to fail. The caller can infer failed connections from the length // and contents of the returned provider list. -func connectProviders(ctx context.Context, endpoints []string, log dex.Logger, chainID *big.Int) ([]*provider, error) { +func connectProviders(ctx context.Context, endpoints []string, log dex.Logger, chainID *big.Int, net dex.Network) ([]*provider, error) { providers := make([]*provider, 0, len(endpoints)) var success bool defer func() { if !success { for _, p := range providers { - p.ec.Close() + p.shutdown() } } }() @@ -467,18 +512,38 @@ func connectProviders(ctx context.Context, endpoints []string, log dex.Logger, c p := &provider{ host: host, ws: sub != nil, + net: net, ec: &combinedRPCClient{ Client: ec, rpc: rpcClient, }, } - p.setTip(hdr) - providers = append(providers, p) + p.setTip(hdr, log) + + ctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup // Start websocket listen loop. if sub != nil { - go p.subscribeHeaders(ctx, sub, h, log) + wg.Add(1) + go func() { + p.subscribeHeaders(ctx, sub, h, log) + wg.Done() + }() } + wg.Add(1) + go func() { + p.refreshHeader(ctx, log) + wg.Done() + }() + + p.stop = func() { + cancel() + wg.Wait() + } + + providers = append(providers, p) + return nil } @@ -499,7 +564,7 @@ func connectProviders(ctx context.Context, endpoints []string, log dex.Logger, c } func (m *multiRPCClient) connect(ctx context.Context) (err error) { - providers, err := connectProviders(ctx, m.endpoints, m.log, m.chainID) + providers, err := connectProviders(ctx, m.endpoints, m.log, m.chainID, m.net) if err != nil { return err } @@ -524,7 +589,7 @@ func (m *multiRPCClient) connect(ctx context.Context) (err error) { go func() { <-ctx.Done() for _, p := range m.providerList() { - p.ec.Close() + p.shutdown() } }() @@ -575,7 +640,7 @@ func (m *multiRPCClient) reconfigure(ctx context.Context, settings map[string]st return errors.New("no providers specified") } endpoints := strings.Split(providerDef, " ") - providers, err := connectProviders(ctx, endpoints, m.log, m.chainID) + providers, err := connectProviders(ctx, endpoints, m.log, m.chainID, m.net) if err != nil { return err } @@ -585,7 +650,7 @@ func (m *multiRPCClient) reconfigure(ctx context.Context, settings map[string]st m.endpoints = endpoints m.providerMtx.Unlock() for _, p := range oldProviders { - p.ec.Close() + p.shutdown() } return nil } @@ -639,9 +704,7 @@ func (m *multiRPCClient) transactionReceipt(ctx context.Context, txHash common.H } // Fetch a fresh one. 
- if err = m.withPreferred(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + if err = m.withPreferred(ctx, func(ctx context.Context, p *provider) error { r, err = p.ec.TransactionReceipt(ctx, txHash) return err }); err != nil { @@ -690,8 +753,6 @@ func (tx *rpcTransaction) UnmarshalJSON(b []byte) error { } func getRPCTransaction(ctx context.Context, p *provider, txHash common.Hash) (*rpcTransaction, error) { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() var resp *rpcTransaction err := p.ec.rpc.CallContext(ctx, &resp, "eth_getTransactionByHash", txHash) if err != nil { @@ -708,7 +769,7 @@ func getRPCTransaction(ctx context.Context, p *provider, txHash common.Hash) (*r } func (m *multiRPCClient) getTransaction(ctx context.Context, txHash common.Hash) (tx *types.Transaction, h int64, err error) { - return tx, h, m.withPreferred(func(p *provider) error { + return tx, h, m.withPreferred(ctx, func(ctx context.Context, p *provider) error { resp, err := getRPCTransaction(ctx, p, txHash) if err != nil { if isNotFoundError(err) { @@ -731,9 +792,7 @@ func (m *multiRPCClient) getTransaction(ctx context.Context, txHash common.Hash) } func (m *multiRPCClient) getConfirmedNonce(ctx context.Context, blockNumber int64) (n uint64, err error) { - return n, m.withPreferred(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return n, m.withPreferred(ctx, func(ctx context.Context, p *provider) error { n, err = p.ec.PendingNonceAt(ctx, m.address()) return err }) @@ -789,8 +848,11 @@ func errorFilter(err error, matches ...interface{}) bool { } // withOne runs the provider function against the providers in order until one -// succeeds or all have failed. -func (m *multiRPCClient) withOne(providers []*provider, f func(*provider) error, acceptabilityFilters ...acceptabilityFilter) (superError error) { +// succeeds or all have failed. The context used to run functions has a time +// limit equal to defaultRequestTimeout for all requests to return. If +// operations are expected to run longer than that the calling function should +// not use the altered context. +func (m *multiRPCClient) withOne(ctx context.Context, providers []*provider, f func(context.Context, *provider) error, acceptabilityFilters ...acceptabilityFilter) (superError error) { readyProviders := make([]*provider, 0, len(providers)) for _, p := range providers { if !p.failed() { @@ -803,9 +865,11 @@ func (m *multiRPCClient) withOne(providers []*provider, f func(*provider) error, readyProviders = providers } for _, p := range readyProviders { - err := f(p) + ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) + err := f(ctx, p) + cancel() if err == nil { - break + return nil } if superError == nil { superError = err @@ -825,21 +889,24 @@ func (m *multiRPCClient) withOne(providers []*provider, f func(*provider) error, } } } + if superError == nil { + return errors.New("all providers in a failed state") + } return } // withAny runs the provider function against known providers in random order // until one succeeds or all have failed. 
-func (m *multiRPCClient) withAny(f func(*provider) error, acceptabilityFilters ...acceptabilityFilter) error { +func (m *multiRPCClient) withAny(ctx context.Context, f func(context.Context, *provider) error, acceptabilityFilters ...acceptabilityFilter) error { providers := m.providerList() shuffleProviders(providers) - return m.withOne(providers, f, acceptabilityFilters...) + return m.withOne(ctx, providers, f, acceptabilityFilters...) } // withPreferred is like withAny, but will prioritize recently used nonce // providers. -func (m *multiRPCClient) withPreferred(f func(*provider) error, acceptabilityFilters ...acceptabilityFilter) error { - return m.withOne(m.nonceProviderList(), f, acceptabilityFilters...) +func (m *multiRPCClient) withPreferred(ctx context.Context, f func(context.Context, *provider) error, acceptabilityFilters ...acceptabilityFilter) error { + return m.withOne(ctx, m.nonceProviderList(), f, acceptabilityFilters...) } // nonceProviderList returns the randomized provider list, but with any recent @@ -877,9 +944,7 @@ func (m *multiRPCClient) nextNonce(ctx context.Context) (nonce uint64, err error checkDelay := time.Second * 5 for i := 0; i < checks; i++ { var host string - err = m.withPreferred(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + err = m.withPreferred(ctx, func(ctx context.Context, p *provider) error { host = p.host nonce, err = p.ec.PendingNonceAt(ctx, m.creds.addr) return err @@ -909,9 +974,7 @@ func (m *multiRPCClient) address() common.Address { } func (m *multiRPCClient) addressBalance(ctx context.Context, addr common.Address) (bal *big.Int, err error) { - return bal, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return bal, m.withAny(ctx, func(ctx context.Context, p *provider) error { bal, err = p.ec.BalanceAt(ctx, addr, nil /* latest */) return err }) @@ -941,14 +1004,14 @@ func (m *multiRPCClient) bestHeader(ctx context.Context) (hdr *types.Header, err return bestHeader, nil } - return hdr, m.withAny(func(p *provider) error { + return hdr, m.withAny(ctx, func(ctx context.Context, p *provider) error { hdr, err = p.bestHeader(ctx, m.log) return err }, allRPCErrorsAreFails) } func (m *multiRPCClient) headerByHash(ctx context.Context, h common.Hash) (hdr *types.Header, err error) { - return hdr, m.withAny(func(p *provider) error { + return hdr, m.withAny(ctx, func(ctx context.Context, p *provider) error { hdr, err = p.headerByHash(ctx, h) return err }) @@ -988,16 +1051,14 @@ func (m *multiRPCClient) pendingTransactions() ([]*types.Transaction, error) { func (m *multiRPCClient) shutdown() { for _, p := range m.providerList() { - p.ec.Close() + p.shutdown() } } func (m *multiRPCClient) sendSignedTransaction(ctx context.Context, tx *types.Transaction) error { var lastProvider *provider - if err := m.withPreferred(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + if err := m.withPreferred(ctx, func(ctx context.Context, p *provider) error { lastProvider = p m.log.Tracef("Sending signed tx via %q", p.host) return p.ec.SendTransaction(ctx, tx) @@ -1036,14 +1097,16 @@ func (m *multiRPCClient) signData(data []byte) (sig, pubKey []byte, err error) { return signData(m.creds, data) } -// syncProgress: We're going to lie and just always say we're synced if we -// can get a header. 
-func (m *multiRPCClient) syncProgress(ctx context.Context) (prog *ethereum.SyncProgress, err error) { - return prog, m.withAny(func(p *provider) error { +// syncProgress: Current and Highest blocks are not very useful for the caller, +// but the best header's time in seconds can be used to determine if the +// provider is out of sync. +func (m *multiRPCClient) syncProgress(ctx context.Context) (prog *ethereum.SyncProgress, tipTime uint64, err error) { + return prog, tipTime, m.withAny(ctx, func(ctx context.Context, p *provider) error { tip, err := p.bestHeader(ctx, m.log) if err != nil { return err } + tipTime = tip.Time prog = ðereum.SyncProgress{ CurrentBlock: tip.Number.Uint64(), @@ -1056,10 +1119,8 @@ func (m *multiRPCClient) syncProgress(ctx context.Context) (prog *ethereum.SyncP func (m *multiRPCClient) transactionConfirmations(ctx context.Context, txHash common.Hash) (confs uint32, err error) { var r *types.Receipt var tip *types.Header - if err := m.withPreferred(func(p *provider) error { - ctxt, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - r, err = p.ec.TransactionReceipt(ctxt, txHash) - cancel() + if err := m.withPreferred(ctx, func(ctx context.Context, p *provider) error { + r, err = p.ec.TransactionReceipt(ctx, txHash) if err != nil { return err } @@ -1115,7 +1176,7 @@ func (m *multiRPCClient) txOpts(ctx context.Context, val, maxGas uint64, maxFeeR } func (m *multiRPCClient) currentFees(ctx context.Context) (baseFees, tipCap *big.Int, err error) { - return baseFees, tipCap, m.withAny(func(p *provider) error { + return baseFees, tipCap, m.withAny(ctx, func(ctx context.Context, p *provider) error { hdr, err := p.bestHeader(ctx, m.log) if err != nil { return err @@ -1142,70 +1203,56 @@ func (m *multiRPCClient) unlock(pw string) error { var _ bind.ContractBackend = (*multiRPCClient)(nil) func (m *multiRPCClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) (code []byte, err error) { - return code, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return code, m.withAny(ctx, func(ctx context.Context, p *provider) error { code, err = p.ec.CodeAt(ctx, contract, blockNumber) return err }) } func (m *multiRPCClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) (res []byte, err error) { - return res, m.withPreferred(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return res, m.withPreferred(ctx, func(ctx context.Context, p *provider) error { res, err = p.ec.CallContract(ctx, call, blockNumber) return err }) } func (m *multiRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (hdr *types.Header, err error) { - return hdr, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return hdr, m.withAny(ctx, func(ctx context.Context, p *provider) error { hdr, err = p.ec.HeaderByNumber(ctx, number) return err }) } func (m *multiRPCClient) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) { - return code, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return code, m.withAny(ctx, func(ctx context.Context, p *provider) error { code, err = p.ec.PendingCodeAt(ctx, account) return err }) } func (m *multiRPCClient) PendingNonceAt(ctx context.Context, account common.Address) (nonce uint64, err error) { - return 
nonce, m.withPreferred(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return nonce, m.withPreferred(ctx, func(ctx context.Context, p *provider) error { nonce, err = p.ec.PendingNonceAt(ctx, account) return err }) } func (m *multiRPCClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) { - return price, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return price, m.withAny(ctx, func(ctx context.Context, p *provider) error { price, err = p.ec.SuggestGasPrice(ctx) return err }) } func (m *multiRPCClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) { - return tipCap, m.withAny(func(p *provider) error { + return tipCap, m.withAny(ctx, func(ctx context.Context, p *provider) error { tipCap = p.suggestTipCap(ctx, m.log) return nil }) } func (m *multiRPCClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { - return gas, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return gas, m.withAny(ctx, func(ctx context.Context, p *provider) error { gas, err = p.ec.EstimateGas(ctx, call) return err }, func(err error) (discard, propagate, fail bool) { @@ -1219,18 +1266,14 @@ func (m *multiRPCClient) SendTransaction(ctx context.Context, tx *types.Transact } func (m *multiRPCClient) FilterLogs(ctx context.Context, query ethereum.FilterQuery) (logs []types.Log, err error) { - return logs, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return logs, m.withAny(ctx, func(ctx context.Context, p *provider) error { logs, err = p.ec.FilterLogs(ctx, query) return err }) } func (m *multiRPCClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error) { - return sub, m.withAny(func(p *provider) error { - ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) - defer cancel() + return sub, m.withAny(ctx, func(ctx context.Context, p *provider) error { sub, err = p.ec.SubscribeFilterLogs(ctx, query, ch) return err }) @@ -1247,10 +1290,11 @@ const ( providerInfura = "infura.io" providerRivetCloud = "rivet.cloud" providerAlchemy = "alchemy.com" + providerAnkr = "ankr.com" + providerBlast = "blastapi.io" // Non-compliant providers providerCloudflareETH = "cloudflare-eth.com" // "SuggestGasTipCap" error: Method not found - providerAnkr = "ankr.com" // "SyncProgress" error: the method eth_syncing does not exist/is not available ) var compliantProviders = map[string]struct{}{ @@ -1262,11 +1306,12 @@ var compliantProviders = map[string]struct{}{ providerInfura: {}, providerRivetCloud: {}, providerAlchemy: {}, + providerAnkr: {}, + providerBlast: {}, } var nonCompliantProviders = map[string]struct{}{ providerCloudflareETH: {}, - providerAnkr: {}, } func providerIsCompliant(addr string) (known, compliant bool) { @@ -1280,12 +1325,12 @@ func providerIsCompliant(addr string) (known, compliant bool) { type rpcTest struct { name string - f func(*provider) error + f func(context.Context, *provider) error } // newCompatibilityTests returns a list of RPC tests to run to determine API // compatibility. 
-func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slog.Logger) []*rpcTest { +func newCompatibilityTests(cb bind.ContractBackend, log slog.Logger) []*rpcTest { var ( // Vitalik's address from https://twitter.com/VitalikButerin/status/1050126908589887488 mainnetAddr = common.HexToAddress("0xab5801a7d398351b8be11c439e05c5b3259aec9b") @@ -1299,35 +1344,35 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo return []*rpcTest{ { name: "HeaderByNumber", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { _, err := p.ec.HeaderByNumber(ctx, nil /* latest */) return err }, }, { name: "HeaderByHash", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { _, err := p.ec.HeaderByHash(ctx, mainnetBlockHash) return err }, }, { name: "TransactionReceipt", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { _, err := p.ec.TransactionReceipt(ctx, mainnetTxHash) return err }, }, { name: "PendingNonceAt", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { _, err := p.ec.PendingNonceAt(ctx, mainnetAddr) return err }, }, { name: "SuggestGasTipCap", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { tipCap, err := p.ec.SuggestGasTipCap(ctx) if err != nil { return err @@ -1338,7 +1383,7 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo }, { name: "BalanceAt", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { bal, err := p.ec.BalanceAt(ctx, mainnetAddr, nil) if err != nil { return err @@ -1349,7 +1394,7 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo }, { name: "CodeAt", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { code, err := p.ec.CodeAt(ctx, mainnetUSDC, nil) if err != nil { return err @@ -1360,7 +1405,7 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo }, { name: "CallContract(balanceOf)", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { caller, err := erc20.NewIERC20(mainnetUSDC, cb) if err != nil { return err @@ -1381,7 +1426,7 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo }, { name: "ChainID", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { chainID, err := p.ec.ChainID(ctx) if err != nil { return err @@ -1392,7 +1437,7 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo }, { name: "PendingNonceAt", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { n, err := p.ec.PendingNonceAt(ctx, mainnetAddr) if err != nil { return err @@ -1403,7 +1448,7 @@ func newCompatibilityTests(ctx context.Context, cb bind.ContractBackend, log slo }, { name: "getRPCTransaction", - f: func(p *provider) error { + f: func(ctx context.Context, p *provider) error { rpcTx, err := getRPCTransaction(ctx, p, mainnetTxHash) if err != nil { return err @@ -1454,8 +1499,8 @@ func checkProvidersCompliance(ctx context.Context, walletDir string, providers [ if known, _ := providerIsCompliant(p.host); !known { // Need to run API tests on this endpoint. 
- for _, t := range newCompatibilityTests(ctx, p.ec, nil /* logger is for testing only */) { - if err := t.f(p); err != nil { + for _, t := range newCompatibilityTests(p.ec, nil /* logger is for testing only */) { + if err := t.f(ctx, p); err != nil { log.Errorf("RPC Provider @ %q has a non-compliant API: %v", err) return fmt.Errorf("RPC Provider @ %q has a non-compliant API", p.host) } diff --git a/client/asset/eth/multirpc_live_test.go b/client/asset/eth/multirpc_live_test.go index 49ef056fbd..fcfeb16b35 100644 --- a/client/asset/eth/multirpc_live_test.go +++ b/client/asset/eth/multirpc_live_test.go @@ -32,6 +32,7 @@ const ( var ( dextestDir = filepath.Join(os.Getenv("HOME"), "dextest") harnessDir = filepath.Join(dextestDir, "eth", "harness-ctl") + ctx = context.Background() ) func harnessCmd(ctx context.Context, exe string, args ...string) (string, error) { @@ -47,9 +48,6 @@ func mine(ctx context.Context) error { } func testEndpoint(endpoints []string, syncBlocks uint64, tFunc func(context.Context, *multiRPCClient)) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - dir, _ := os.MkdirTemp("", "") defer os.RemoveAll(dir) @@ -96,7 +94,6 @@ func testEndpoint(endpoints []string, syncBlocks uint64, tFunc func(context.Cont tFunc(ctx, cl) } - cancel() time.Sleep(time.Second) return nil @@ -245,7 +242,7 @@ func testMonitorNet(t *testing.T, net dex.Network) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Hour) + ctx, cancel := context.WithTimeout(ctx, time.Hour) defer cancel() if err := cl.connect(ctx); err != nil { @@ -266,15 +263,13 @@ func TestRPC(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() if err := cl.connect(ctx); err != nil { t.Fatalf("connect error: %v", err) } - for _, tt := range newCompatibilityTests(ctx, cl, cl.log) { + for _, tt := range newCompatibilityTests(cl, cl.log) { tStart := time.Now() - if err := cl.withAny(tt.f); err != nil { + if err := cl.withAny(ctx, tt.f); err != nil { t.Fatalf("%q: %v", tt.name, err) } fmt.Printf("### %q: %s \n", tt.name, time.Since(tStart)) @@ -295,8 +290,6 @@ func TestFreeServers(t *testing.T) { runTest := func(endpoint string) error { dir, _ := os.MkdirTemp("", "") defer os.RemoveAll(dir) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() cl, err := tRPCClient(dir, encode.RandomBytes(32), []string{endpoint}, dex.Mainnet, true) if err != nil { return fmt.Errorf("tRPCClient error: %v", err) @@ -304,9 +297,9 @@ func TestFreeServers(t *testing.T) { if err := cl.connect(ctx); err != nil { return fmt.Errorf("connect error: %v", err) } - return cl.withAny(func(p *provider) error { - for _, tt := range newCompatibilityTests(ctx, cl, cl.log) { - if err := tt.f(p); err != nil { + return cl.withAny(ctx, func(ctx context.Context, p *provider) error { + for _, tt := range newCompatibilityTests(cl, cl.log) { + if err := tt.f(ctx, p); err != nil { return fmt.Errorf("%q error: %v", tt.name, err) } fmt.Printf("#### %q passed %q \n", endpoint, tt.name) @@ -334,11 +327,9 @@ func TestFreeServers(t *testing.T) { func TestMainnetCompliance(t *testing.T) { providerFile := readProviderFile(t, dex.Mainnet) dir, _ := os.MkdirTemp("", "") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() log := dex.StdOutLogger("T", dex.LevelTrace) - providers, err := connectProviders(ctx, providerFile.Providers, log, big.NewInt(chainIDs[dex.Mainnet])) + providers, err := connectProviders(ctx, 
providerFile.Providers, log, big.NewInt(chainIDs[dex.Mainnet]), dex.Mainnet) if err != nil { t.Fatal(err) } diff --git a/client/asset/eth/nodeclient.go b/client/asset/eth/nodeclient.go index 999b3cf88e..49e18e51f7 100644 --- a/client/asset/eth/nodeclient.go +++ b/client/asset/eth/nodeclient.go @@ -248,10 +248,15 @@ func (n *nodeClient) sendTransaction(ctx context.Context, txOpts *bind.TransactO return tx, n.leth.ApiBackend.SendTx(ctx, tx) } -// syncProgress return the current sync progress. Returns no error and nil when not syncing. -func (n *nodeClient) syncProgress(_ context.Context) (*ethereum.SyncProgress, error) { +// syncProgress return the current sync progress and the best block's header +// time in seconds. Returns no error and nil when not syncing. +func (n *nodeClient) syncProgress(ctx context.Context) (*ethereum.SyncProgress, uint64, error) { + hdr, err := n.bestHeader(ctx) + if err != nil { + return nil, 0, err + } p := n.leth.ApiBackend.SyncProgress() - return &p, nil + return &p, hdr.Time, nil } // signData uses the private key of the address to sign a piece of data. diff --git a/client/asset/eth/nodeclient_harness_test.go b/client/asset/eth/nodeclient_harness_test.go index 352a064bab..3cf6b049d1 100644 --- a/client/asset/eth/nodeclient_harness_test.go +++ b/client/asset/eth/nodeclient_harness_test.go @@ -438,7 +438,10 @@ func runSimnet(m *testing.M) (int, error) { } // Fund the wallets. - homeDir, _ := os.UserHomeDir() + homeDir, err := os.UserHomeDir() + if err != nil { + return 1, err + } harnessCtlDir := filepath.Join(homeDir, "dextest", "eth", "harness-ctl") send := func(exe, addr, amt string) error { cmd := exec.CommandContext(ctx, exe, addr, amt) @@ -737,22 +740,13 @@ func syncClient(cl ethFetcher) error { if err := ctx.Err(); err != nil { return err } - prog, err := cl.syncProgress(ctx) + prog, tipTime, err := cl.syncProgress(ctx) if err != nil { return err } if isTestnet { - if prog.HighestBlock == 0 { - bh, err := cl.bestHeader(ctx) - if err != nil { - return err - } - // Time in the header is in seconds. - timeDiff := time.Now().Unix() - int64(bh.Time) - if timeDiff < dexeth.MaxBlockInterval { - return nil - } - } else if prog.CurrentBlock >= prog.HighestBlock { + timeDiff := time.Now().Unix() - int64(tipTime) + if timeDiff < dexeth.MaxBlockInterval { return nil } } else { @@ -1111,7 +1105,7 @@ func testSwap(t *testing.T, assetID uint32) { } func testSyncProgress(t *testing.T) { - p, err := ethClient.syncProgress(ctx) + p, _, err := ethClient.syncProgress(ctx) if err != nil { t.Fatal(err) } @@ -2016,7 +2010,10 @@ func testRefund(t *testing.T, assetID uint32) { t.Fatalf("%s: pre-redeem mining error: %v", test.name, err) } - txOpts, _ = participantEthClient.txOpts(ctx, 0, gases.RedeemN(1), nil, nil) + txOpts, err = participantEthClient.txOpts(ctx, 0, gases.RedeemN(1), nil, nil) + if err != nil { + t.Fatalf("%s: txOpts error: %v", test.name, err) + } _, err := pc.redeem(txOpts, []*asset.Redemption{newRedeem(secret, secretHash)}) if err != nil { t.Fatalf("%s: redeem error: %v", test.name, err)
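
Note on the core check this patch introduces: in provider.bestHeader, the refreshHeader loop, and baseWallet.SyncStatus, the decision reduces to comparing the header's UNIX timestamp against dexeth.MaxBlockInterval (simnet is exempted, and a failing provider is quarantined via setFailed). A minimal standalone sketch of that rule, assuming an illustrative maxBlockInterval constant and helper name that are not part of the patch:

package main

import (
	"fmt"
	"time"
)

// maxBlockInterval stands in for dexeth.MaxBlockInterval (in seconds); the
// real value comes from the dex/networks/eth package.
const maxBlockInterval = 180

// headerIsFresh reports whether a header timestamp (UNIX seconds) is recent
// enough for the provider that served it to be considered in sync.
func headerIsFresh(headerTime uint64) (bool, error) {
	// Time in the header is in seconds.
	timeDiff := time.Now().Unix() - int64(headerTime)
	if timeDiff > maxBlockInterval {
		return false, fmt.Errorf("time since last block (%d sec) exceeds %d sec; "+
			"assuming the provider is not in sync", timeDiff, maxBlockInterval)
	}
	return true, nil
}

func main() {
	// A header stamped ten minutes ago should be reported as stale.
	fresh, err := headerIsFresh(uint64(time.Now().Add(-10 * time.Minute).Unix()))
	fmt.Println(fresh, err)
}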
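The other structural change worth calling out is how request deadlines move out of the individual callbacks and into withOne: each provider attempt now runs under its own defaultRequestTimeout-bounded context, and a success returns immediately instead of breaking out into the error-filter logic. A simplified sketch of that dispatch pattern, assuming stand-in provider and timeout definitions rather than the package's real types:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// defaultRequestTimeout mirrors the constant the patch relies on; the value
// here is only illustrative.
const defaultRequestTimeout = 10 * time.Second

// provider is a stand-in for the package's provider type.
type provider struct{ host string }

// withOne tries f against each provider in order, giving every attempt its
// own request-scoped deadline, and returns nil on the first success. The
// error filtering and fail-quarantine bookkeeping of the real method are
// omitted.
func withOne(ctx context.Context, providers []*provider, f func(context.Context, *provider) error) (superError error) {
	for _, p := range providers {
		callCtx, cancel := context.WithTimeout(ctx, defaultRequestTimeout)
		err := f(callCtx, p)
		cancel()
		if err == nil {
			return nil
		}
		if superError == nil {
			superError = err
		}
	}
	if superError == nil {
		superError = errors.New("no usable providers")
	}
	return superError
}

func main() {
	providers := []*provider{{host: "alpha"}, {host: "beta"}}
	err := withOne(context.Background(), providers, func(ctx context.Context, p *provider) error {
		fmt.Println("trying", p.host)
		return errors.New("unreachable")
	})
	fmt.Println(err)
}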