From d86e97e7fb984f6b7c1f55909d3b3257a089b92d Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Sun, 25 Apr 2021 11:47:50 +0000 Subject: [PATCH] multi: fix chain notif. blocking by pmt processor. This fixes chain notification messages being blocked by payment processing calls when tx confirmations are not accurately being reported. The fix includes: - ensuring a payment processing call concludes before starting another one via the processing flag. - calling the payment process in a goroutine to avoid blocking the chainstate process. - Keeping track of tx confirmation failures and starting a wallet rescan if the failures exceed threshold. Associated tests have been updated. --- errors/error.go | 3 + errors/error_test.go | 1 + pool/chainstate.go | 24 +++---- pool/chainstate_test.go | 1 - pool/hub.go | 1 + pool/hub_test.go | 44 ++++++++++++ pool/paymentmgr.go | 147 ++++++++++++++++++++++++++++++++++++---- pool/paymentmgr_test.go | 131 +++++++++++++++++++++++++++++++++++ 8 files changed, 326 insertions(+), 26 deletions(-) diff --git a/errors/error.go b/errors/error.go index 4a516f24..9a69baf0 100644 --- a/errors/error.go +++ b/errors/error.go @@ -143,6 +143,9 @@ const ( // CreateAmount indicates an amount creation error. CreateAmount = ErrorKind("CreateAmount") + + // Rescan indicates an wallet rescan error. + Rescan = ErrorKind("Rescan") ) // Error satisfies the error interface and prints human-readable errors. diff --git a/errors/error_test.go b/errors/error_test.go index cb72f986..94f02530 100644 --- a/errors/error_test.go +++ b/errors/error_test.go @@ -58,6 +58,7 @@ func TestErrorKindStringer(t *testing.T) { {TxIn, "TxIn"}, {ContextCancelled, "ContextCancelled"}, {CreateAmount, "CreateAmount"}, + {Rescan, "Rescan"}, } for i, test := range tests { diff --git a/pool/chainstate.go b/pool/chainstate.go index 335d4f42..6221b3e2 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -286,18 +286,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { coinbaseTx := block.Transactions[0] treasuryActive := isTreasuryActive(coinbaseTx) - // Process mature payments. - err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive) - if err != nil { - log.Errorf("unable to process payments: %v", err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - // Signal the gui cache of paid dividends. - cs.cfg.SignalCache(DividendsPaid) - // Check if the parent of the connected block is an accepted work // of the pool. parentHeight := header.Height - 1 @@ -398,6 +386,18 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { cs.cfg.Cancel() continue } + + // Process mature payments without blocking. + go func(cfg *ChainStateConfig, height uint32, treasuryActive bool) { + err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive) + if err != nil { + log.Errorf("unable to process payments: %v", err) + cfg.Cancel() + } + + // Signal the gui cache of paid dividends. + cfg.SignalCache(DividendsPaid) + }(cs.cfg, header.Height, treasuryActive) } close(msg.Done) diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go index 75cf76d1..32e17ed8 100644 --- a/pool/chainstate_test.go +++ b/pool/chainstate_test.go @@ -339,7 +339,6 @@ func testChainState(t *testing.T) { } cs.discCh <- discConfMsg <-discConfMsg.Done - cs.cfg.PayDividends = payDividends // Ensure the last work height can be updated. initialLastWorkHeight := cs.fetchLastWorkHeight() diff --git a/pool/hub.go b/pool/hub.go index 41a7a633..7677f8b3 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -100,6 +100,7 @@ var ( type WalletConnection interface { SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // NodeConnection defines the functionality needed by a mining node diff --git a/pool/hub_test.go b/pool/hub_test.go index c3b46bad..94db148a 100644 --- a/pool/hub_test.go +++ b/pool/hub_test.go @@ -23,8 +23,43 @@ import ( chainjson "github.com/decred/dcrd/rpc/jsonrpc/types/v2" "github.com/decred/dcrd/wire" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) +type tRescanClient struct { + resp *walletrpc.RescanResponse + err error + grpc.ClientStream +} + +func (r *tRescanClient) Recv() (*walletrpc.RescanResponse, error) { + return r.resp, r.err +} + +func (r *tRescanClient) Header() (metadata.MD, error) { + return nil, nil +} + +func (r *tRescanClient) Trailer() metadata.MD { + return nil +} + +func (r *tRescanClient) CloseSend() error { + return nil +} + +func (r *tRescanClient) Context() context.Context { + return nil +} + +func (r *tRescanClient) SendMsg(m interface{}) error { + return nil +} + +func (r *tRescanClient) RecvMsg(m interface{}) error { + return nil +} + type tWalletConnection struct { } @@ -109,6 +144,15 @@ func (t *tWalletConnection) PublishTransaction(context.Context, *walletrpc.Publi }, nil } +func (t *tWalletConnection) Rescan(context.Context, *walletrpc.RescanRequest, ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + client := new(tRescanClient) + client.resp = &walletrpc.RescanResponse{ + RescannedThrough: 10000, + } + + return client, nil +} + type tNodeConnection struct{} func (t *tNodeConnection) CreateRawTransaction(context.Context, []chainjson.TransactionInput, map[dcrutil.Address]dcrutil.Amount, *int64, *int64) (*wire.MsgTx, error) { diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 3a97ac13..94463cb0 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -10,6 +10,8 @@ import ( "fmt" "math/big" "math/rand" + "sync" + "sync/atomic" "time" "decred.org/dcrwallet/rpc/walletrpc" @@ -36,6 +38,10 @@ const ( // output value of a transaction is allowed to be short of the // provided input due to rounding errors. maxRoundingDiff = dcrutil.Amount(500) + + // maxTxConfThreshold is the total number of coinbase confirmation + // failures before a wallet rescan is requested. + maxTxConfThreshold = uint32(3) ) // TxCreator defines the functionality needed by a transaction creator for the @@ -57,6 +63,8 @@ type TxBroadcaster interface { SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) // PublishTransaction broadcasts the transaction unto the network. PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + // Rescan requests a wallet utxo rescan. + Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // confNotifMsg represents a tx confirmation notification message. @@ -65,6 +73,12 @@ type confNotifMsg struct { err error } +// rescanMsg represents a rescan response. +type rescanMsg struct { + resp *walletrpc.RescanResponse + err error +} + // PaymentMgrConfig contains all of the configuration values which should be // provided when creating a new instance of PaymentMgr. type PaymentMgrConfig struct { @@ -107,7 +121,11 @@ type PaymentMgrConfig struct { // PaymentMgr handles generating shares and paying out dividends to // participating accounts. type PaymentMgr struct { - cfg *PaymentMgrConfig + failedTxConfs uint32 // update atomically. + + processing bool + cfg *PaymentMgrConfig + mtx sync.Mutex } // NewPaymentMgr creates a new payment manager. @@ -148,6 +166,21 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { return pm, nil } +// isProcessing returns whether the payment manager is in the process of +// paying out dividends. +func (pm *PaymentMgr) isProcessing() bool { + pm.mtx.Lock() + defer pm.mtx.Unlock() + return pm.processing +} + +// setProcessing sets the processing flag to the provided boolean. +func (pm *PaymentMgr) setProcessing(status bool) { + pm.mtx.Lock() + pm.processing = status + pm.mtx.Unlock() +} + // sharePercentages calculates the percentages due each participating account // according to their weighted shares. func (pm *PaymentMgr) sharePercentages(shares []*Share) (map[string]*big.Rat, error) { @@ -452,8 +485,8 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp select { case <-ctx.Done(): - log.Tracef("%s: unable to fetch tx confirmation notifications", - funcName) + log.Tracef("%s: context cancelled fetching tx confirmation "+ + "notifications", funcName) return nil, errs.ContextCancelled case notif := <-notifCh: close(notifCh) @@ -470,7 +503,7 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp // transaction hashes are spendable by the expected maximum spendable height. // // The context passed to this function must have a corresponding -// cancellation to allow for a clean shutdown process +// cancellation to allow for a clean shutdown process. func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]*chainhash.Hash, spendableHeight uint32) error { funcName := "confirmCoinbases" hashes := make([]*chainhash.Hash, 0, len(txHashes)) @@ -520,6 +553,59 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string] } } +// fetchRecanReponse is a helper function for fetching rescan response. +// It will return when either a response or error is received from the +// provided rescan source, or when the provided context is cancelled. +func fetchRescanResponse(ctx context.Context, rescanSource func() (*walletrpc.RescanResponse, error)) (*walletrpc.RescanResponse, error) { + funcName := "fetchRescanResponse" + respCh := make(chan *rescanMsg) + go func(ch chan *rescanMsg) { + resp, err := rescanSource() + ch <- &rescanMsg{ + resp: resp, + err: err, + } + }(respCh) + + select { + case <-ctx.Done(): + log.Tracef("%s: context cancelled fetching rescan response", funcName) + return nil, errs.ContextCancelled + case msg := <-respCh: + close(respCh) + if msg.err != nil { + desc := fmt.Sprintf("%s: unable fetch wallet rescan response, %s", + funcName, msg.err) + return nil, errs.PoolError(errs.Rescan, desc) + } + return msg.resp, nil + } +} + +// monitorRescan ensures the wallet rescans up to the provided block height. +// +// The context passed to this function must have a corresponding +// cancellation to allow for a clean shutdown process. +func (pm *PaymentMgr) monitorRescan(ctx context.Context, rescanSource walletrpc.WalletService_RescanClient, height int32) error { + funcName := "monitorRescan" + for { + resp, err := fetchRescanResponse(ctx, rescanSource.Recv) + if err != nil { + if errors.Is(err, errs.ContextCancelled) { + desc := fmt.Sprintf("%s: cancelled wallet rescan", funcName) + return errs.PoolError(errs.ContextCancelled, desc) + } + return err + } + + // Stop monitoring once the most recent block height has been rescanned. + if resp.RescannedThrough >= height { + log.Infof("wallet rescanned through height #%d", height) + return nil + } + } +} + // generatePayoutTxDetails creates the payout transaction inputs and outputs // from the provided payments func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, feeAddr dcrutil.Address, payments map[string][]*Payment, treasuryActive bool) ([]chainjson.TransactionInput, @@ -644,13 +730,51 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return nil } + if pm.isProcessing() { + log.Info("Waiting for in-progress payment processing to " + + "complete before starting another") + return nil + } + + pm.setProcessing(true) + defer pm.setProcessing(false) + + txB := pm.cfg.FetchTxBroadcaster() + if txB == nil { + desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) + return errs.PoolError(errs.Disconnected, desc) + } + + // Request a wallet rescan if tx confirmation failures are + // at threshold. + pCtx, pCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) + defer pCancel() + + txConfCount := atomic.LoadUint32(&pm.failedTxConfs) + if txConfCount == maxTxConfThreshold { + beginHeight := int32(height) - (int32(pm.cfg.ActiveNet.CoinbaseMaturity) * 2) + rescanReq := &walletrpc.RescanRequest{ + BeginHeight: beginHeight, + } + rescanSource, err := txB.Rescan(pCtx, rescanReq) + if err != nil { + desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) + return errs.PoolError(errs.Rescan, desc) + } + + err = pm.monitorRescan(pCtx, rescanSource, int32(height)) + if err != nil { + return err + } + } + txC := pm.cfg.FetchTxCreator() if txC == nil { desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) return errs.PoolError(errs.Disconnected, desc) } - // remove all matured orphaned payments. Since the associated blocks + // Remove all matured orphaned payments. Since the associated blocks // to these payments are not part of the main chain they will not be // paid out. pmts, err := pm.pruneOrphanedPayments(ctx, mPmts) @@ -700,10 +824,10 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA maxSpendableHeight = height } - tCtx, tCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) - defer tCancel() - err = pm.confirmCoinbases(tCtx, inputTxHashes, maxSpendableHeight) + err = pm.confirmCoinbases(pCtx, inputTxHashes, maxSpendableHeight) if err != nil { + atomic.AddUint32(&pm.failedTxConfs, 1) + // Do not error if coinbase spendable confirmation requests are // terminated by the context cancellation. if !errors.Is(err, errs.ContextCancelled) { @@ -725,11 +849,6 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } - txB := pm.cfg.FetchTxBroadcaster() - if txB == nil { - desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) - return errs.PoolError(errs.Disconnected, desc) - } signTxReq := &walletrpc.SignTransactionRequest{ SerializedTransaction: txBytes, Passphrase: []byte(pm.cfg.WalletPass), @@ -788,5 +907,7 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } + atomic.StoreUint32(&pm.failedTxConfs, 0) + return nil } diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 9916f827..1d6309f4 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -59,6 +60,7 @@ func (txC *txCreatorImpl) CreateRawTransaction(ctx context.Context, inputs []cha type txBroadcasterImpl struct { signTransaction func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) publishTransaction func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + rescan func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // SignTransaction signs transaction inputs, unlocking them for use. @@ -71,6 +73,11 @@ func (txB *txBroadcasterImpl) PublishTransaction(ctx context.Context, req *walle return txB.publishTransaction(ctx, req, options...) } +// Rescan begins a rescan of all transactions related to the wallet. +func (txB *txBroadcasterImpl) Rescan(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return txB.rescan(ctx, req, options...) +} + func TestSharePercentages(t *testing.T) { mgr := PaymentMgr{} @@ -931,6 +938,13 @@ func testPaymentMgrPayment(t *testing.T) { mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { return 16, nil } + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return nil, fmt.Errorf("unable to sign transaction") + }, + } + } err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, errs.TxOut) { @@ -1128,6 +1142,109 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected a publish error, got %v", err) } + // Ensure dividend payment returns an error if fetching the wallet client + // fails. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return nil, fmt.Errorf("unable to create rescan client") + }, + } + } + + atomic.StoreUint32(&mgr.failedTxConfs, maxTxConfThreshold) + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure dividend payment returns an error if fetching rescan responses fail. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + err: fmt.Errorf("internal error"), + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure dividend payment returns an error if fetching rescan responses fail. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + err: fmt.Errorf("internal error"), + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure wallet rescan succeeds when it scans through the current height. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.PublishTx) { + cancel() + t.Fatalf("expected a publish error, got %v", err) + } + // Ensure paying dividend payment succeeds with valid inputs. txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { @@ -1142,6 +1259,13 @@ func testPaymentMgrPayment(t *testing.T) { TransactionHash: txHash, }, nil }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, } } @@ -1151,6 +1275,13 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("unexpected dividend payment error, got %v", err) } + // Ensure the tx confirmation failure count reset to zero on a sucessful + // dividend payment. + txConfCount := atomic.LoadUint32(&mgr.failedTxConfs) + if txConfCount != 0 { + t.Fatalf("expected tx conf failure count to be %d, got %d", 0, txConfCount) + } + cancel() // Reset backed up values to their defaults.