From a05cedf74b065328124cf24c380add6d0d6136ab Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Fri, 30 Apr 2021 14:46:38 +0000 Subject: [PATCH 1/6] multi: allow rescan after multiple conf failures. This incorporates wallet rescanning into the payment process to restore tx conf functionality when the wallet gets into an error state. Associated tests have been added. --- errors/error.go | 3 + errors/error_test.go | 1 + pool/hub.go | 1 + pool/hub_test.go | 44 ++++++++++++ pool/paymentmgr.go | 146 ++++++++++++++++++++++++++++++++++++---- pool/paymentmgr_test.go | 131 +++++++++++++++++++++++++++++++++++ 6 files changed, 313 insertions(+), 13 deletions(-) diff --git a/errors/error.go b/errors/error.go index 4a516f24..9a69baf0 100644 --- a/errors/error.go +++ b/errors/error.go @@ -143,6 +143,9 @@ const ( // CreateAmount indicates an amount creation error. CreateAmount = ErrorKind("CreateAmount") + + // Rescan indicates an wallet rescan error. + Rescan = ErrorKind("Rescan") ) // Error satisfies the error interface and prints human-readable errors. diff --git a/errors/error_test.go b/errors/error_test.go index cb72f986..94f02530 100644 --- a/errors/error_test.go +++ b/errors/error_test.go @@ -58,6 +58,7 @@ func TestErrorKindStringer(t *testing.T) { {TxIn, "TxIn"}, {ContextCancelled, "ContextCancelled"}, {CreateAmount, "CreateAmount"}, + {Rescan, "Rescan"}, } for i, test := range tests { diff --git a/pool/hub.go b/pool/hub.go index 41a7a633..7677f8b3 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -100,6 +100,7 @@ var ( type WalletConnection interface { SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // NodeConnection defines the functionality needed by a mining node diff --git a/pool/hub_test.go b/pool/hub_test.go index c3b46bad..94db148a 100644 --- a/pool/hub_test.go +++ b/pool/hub_test.go @@ -23,8 +23,43 @@ import ( chainjson "github.com/decred/dcrd/rpc/jsonrpc/types/v2" "github.com/decred/dcrd/wire" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) +type tRescanClient struct { + resp *walletrpc.RescanResponse + err error + grpc.ClientStream +} + +func (r *tRescanClient) Recv() (*walletrpc.RescanResponse, error) { + return r.resp, r.err +} + +func (r *tRescanClient) Header() (metadata.MD, error) { + return nil, nil +} + +func (r *tRescanClient) Trailer() metadata.MD { + return nil +} + +func (r *tRescanClient) CloseSend() error { + return nil +} + +func (r *tRescanClient) Context() context.Context { + return nil +} + +func (r *tRescanClient) SendMsg(m interface{}) error { + return nil +} + +func (r *tRescanClient) RecvMsg(m interface{}) error { + return nil +} + type tWalletConnection struct { } @@ -109,6 +144,15 @@ func (t *tWalletConnection) PublishTransaction(context.Context, *walletrpc.Publi }, nil } +func (t *tWalletConnection) Rescan(context.Context, *walletrpc.RescanRequest, ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + client := new(tRescanClient) + client.resp = &walletrpc.RescanResponse{ + RescannedThrough: 10000, + } + + return client, nil +} + type tNodeConnection struct{} func (t *tNodeConnection) CreateRawTransaction(context.Context, []chainjson.TransactionInput, map[dcrutil.Address]dcrutil.Amount, *int64, *int64) (*wire.MsgTx, error) { diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 3a97ac13..7148f674 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -10,6 +10,8 @@ import ( "fmt" "math/big" "math/rand" + "sync" + "sync/atomic" "time" "decred.org/dcrwallet/rpc/walletrpc" @@ -36,6 +38,10 @@ const ( // output value of a transaction is allowed to be short of the // provided input due to rounding errors. maxRoundingDiff = dcrutil.Amount(500) + + // maxTxConfThreshold is the total number of coinbase confirmation + // failures before a wallet rescan is requested. + maxTxConfThreshold = uint32(3) ) // TxCreator defines the functionality needed by a transaction creator for the @@ -57,6 +63,8 @@ type TxBroadcaster interface { SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) // PublishTransaction broadcasts the transaction unto the network. PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + // Rescan requests a wallet utxo rescan. + Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // confNotifMsg represents a tx confirmation notification message. @@ -65,6 +73,12 @@ type confNotifMsg struct { err error } +// rescanMsg represents a rescan response. +type rescanMsg struct { + resp *walletrpc.RescanResponse + err error +} + // PaymentMgrConfig contains all of the configuration values which should be // provided when creating a new instance of PaymentMgr. type PaymentMgrConfig struct { @@ -107,7 +121,11 @@ type PaymentMgrConfig struct { // PaymentMgr handles generating shares and paying out dividends to // participating accounts. type PaymentMgr struct { - cfg *PaymentMgrConfig + failedTxConfs uint32 // update atomically. + + processing bool + cfg *PaymentMgrConfig + mtx sync.Mutex } // NewPaymentMgr creates a new payment manager. @@ -148,6 +166,21 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { return pm, nil } +// isProcessing returns whether the payment manager is in the process of +// paying out dividends. +func (pm *PaymentMgr) isProcessing() bool { + pm.mtx.Lock() + defer pm.mtx.Unlock() + return pm.processing +} + +// setProcessing sets the processing flag to the provided boolean. +func (pm *PaymentMgr) setProcessing(status bool) { + pm.mtx.Lock() + pm.processing = status + pm.mtx.Unlock() +} + // sharePercentages calculates the percentages due each participating account // according to their weighted shares. func (pm *PaymentMgr) sharePercentages(shares []*Share) (map[string]*big.Rat, error) { @@ -452,8 +485,8 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp select { case <-ctx.Done(): - log.Tracef("%s: unable to fetch tx confirmation notifications", - funcName) + log.Tracef("%s: context cancelled fetching tx confirmation "+ + "notifications", funcName) return nil, errs.ContextCancelled case notif := <-notifCh: close(notifCh) @@ -470,7 +503,7 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp // transaction hashes are spendable by the expected maximum spendable height. // // The context passed to this function must have a corresponding -// cancellation to allow for a clean shutdown process +// cancellation to allow for a clean shutdown process. func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]*chainhash.Hash, spendableHeight uint32) error { funcName := "confirmCoinbases" hashes := make([]*chainhash.Hash, 0, len(txHashes)) @@ -520,6 +553,59 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string] } } +// fetchRecanReponse is a helper function for fetching rescan response. +// It will return when either a response or error is received from the +// provided rescan source, or when the provided context is cancelled. +func fetchRescanResponse(ctx context.Context, rescanSource func() (*walletrpc.RescanResponse, error)) (*walletrpc.RescanResponse, error) { + funcName := "fetchRescanResponse" + respCh := make(chan *rescanMsg) + go func(ch chan *rescanMsg) { + resp, err := rescanSource() + ch <- &rescanMsg{ + resp: resp, + err: err, + } + }(respCh) + + select { + case <-ctx.Done(): + log.Tracef("%s: context cancelled fetching rescan response", funcName) + return nil, errs.ContextCancelled + case msg := <-respCh: + close(respCh) + if msg.err != nil { + desc := fmt.Sprintf("%s: unable fetch wallet rescan response, %s", + funcName, msg.err) + return nil, errs.PoolError(errs.Rescan, desc) + } + return msg.resp, nil + } +} + +// monitorRescan ensures the wallet rescans up to the provided block height. +// +// The context passed to this function must have a corresponding +// cancellation to allow for a clean shutdown process. +func (pm *PaymentMgr) monitorRescan(ctx context.Context, rescanSource walletrpc.WalletService_RescanClient, height int32) error { + funcName := "monitorRescan" + for { + resp, err := fetchRescanResponse(ctx, rescanSource.Recv) + if err != nil { + if errors.Is(err, errs.ContextCancelled) { + desc := fmt.Sprintf("%s: cancelled wallet rescan", funcName) + return errs.PoolError(errs.ContextCancelled, desc) + } + return err + } + + // Stop monitoring once the most recent block height has been rescanned. + if resp.RescannedThrough >= height { + log.Infof("wallet rescanned through height #%d", height) + return nil + } + } +} + // generatePayoutTxDetails creates the payout transaction inputs and outputs // from the provided payments func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, feeAddr dcrutil.Address, payments map[string][]*Payment, treasuryActive bool) ([]chainjson.TransactionInput, @@ -644,13 +730,50 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return nil } + if pm.isProcessing() { + log.Info("payment processing already in progress, terminating") + return nil + } + + pm.setProcessing(true) + defer pm.setProcessing(false) + + txB := pm.cfg.FetchTxBroadcaster() + if txB == nil { + desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) + return errs.PoolError(errs.Disconnected, desc) + } + + // Request a wallet rescan if tx confirmation failures are + // at threshold. + pCtx, pCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) + defer pCancel() + + txConfCount := atomic.LoadUint32(&pm.failedTxConfs) + if txConfCount == maxTxConfThreshold { + beginHeight := int32(height) - (int32(pm.cfg.ActiveNet.CoinbaseMaturity) * 2) + rescanReq := &walletrpc.RescanRequest{ + BeginHeight: beginHeight, + } + rescanSource, err := txB.Rescan(pCtx, rescanReq) + if err != nil { + desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) + return errs.PoolError(errs.Rescan, desc) + } + + err = pm.monitorRescan(pCtx, rescanSource, int32(height)) + if err != nil { + return err + } + } + txC := pm.cfg.FetchTxCreator() if txC == nil { desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) return errs.PoolError(errs.Disconnected, desc) } - // remove all matured orphaned payments. Since the associated blocks + // Remove all matured orphaned payments. Since the associated blocks // to these payments are not part of the main chain they will not be // paid out. pmts, err := pm.pruneOrphanedPayments(ctx, mPmts) @@ -700,10 +823,10 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA maxSpendableHeight = height } - tCtx, tCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) - defer tCancel() - err = pm.confirmCoinbases(tCtx, inputTxHashes, maxSpendableHeight) + err = pm.confirmCoinbases(pCtx, inputTxHashes, maxSpendableHeight) if err != nil { + atomic.AddUint32(&pm.failedTxConfs, 1) + // Do not error if coinbase spendable confirmation requests are // terminated by the context cancellation. if !errors.Is(err, errs.ContextCancelled) { @@ -725,11 +848,6 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } - txB := pm.cfg.FetchTxBroadcaster() - if txB == nil { - desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) - return errs.PoolError(errs.Disconnected, desc) - } signTxReq := &walletrpc.SignTransactionRequest{ SerializedTransaction: txBytes, Passphrase: []byte(pm.cfg.WalletPass), @@ -788,5 +906,7 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } + atomic.StoreUint32(&pm.failedTxConfs, 0) + return nil } diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 9916f827..1d6309f4 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -59,6 +60,7 @@ func (txC *txCreatorImpl) CreateRawTransaction(ctx context.Context, inputs []cha type txBroadcasterImpl struct { signTransaction func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) publishTransaction func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + rescan func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // SignTransaction signs transaction inputs, unlocking them for use. @@ -71,6 +73,11 @@ func (txB *txBroadcasterImpl) PublishTransaction(ctx context.Context, req *walle return txB.publishTransaction(ctx, req, options...) } +// Rescan begins a rescan of all transactions related to the wallet. +func (txB *txBroadcasterImpl) Rescan(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return txB.rescan(ctx, req, options...) +} + func TestSharePercentages(t *testing.T) { mgr := PaymentMgr{} @@ -931,6 +938,13 @@ func testPaymentMgrPayment(t *testing.T) { mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { return 16, nil } + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return nil, fmt.Errorf("unable to sign transaction") + }, + } + } err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, errs.TxOut) { @@ -1128,6 +1142,109 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected a publish error, got %v", err) } + // Ensure dividend payment returns an error if fetching the wallet client + // fails. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return nil, fmt.Errorf("unable to create rescan client") + }, + } + } + + atomic.StoreUint32(&mgr.failedTxConfs, maxTxConfThreshold) + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure dividend payment returns an error if fetching rescan responses fail. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + err: fmt.Errorf("internal error"), + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure dividend payment returns an error if fetching rescan responses fail. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + err: fmt.Errorf("internal error"), + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure wallet rescan succeeds when it scans through the current height. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.PublishTx) { + cancel() + t.Fatalf("expected a publish error, got %v", err) + } + // Ensure paying dividend payment succeeds with valid inputs. txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { @@ -1142,6 +1259,13 @@ func testPaymentMgrPayment(t *testing.T) { TransactionHash: txHash, }, nil }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, } } @@ -1151,6 +1275,13 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("unexpected dividend payment error, got %v", err) } + // Ensure the tx confirmation failure count reset to zero on a sucessful + // dividend payment. + txConfCount := atomic.LoadUint32(&mgr.failedTxConfs) + if txConfCount != 0 { + t.Fatalf("expected tx conf failure count to be %d, got %d", 0, txConfCount) + } + cancel() // Reset backed up values to their defaults. From 0749d75ffc1b6dca774341e06dbaf4517c22cfd1 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Sat, 1 May 2021 20:46:57 +0000 Subject: [PATCH 2/6] pool: avoid blocking chain ntfns on pmt processing. This dedicates a lifecycle process for payment processing and updates the necessary configs. Payment processing is now triggered via signals from the chainstate to the payment manager. Associated tests have been added. --- pool/chainstate.go | 57 ++++++----- pool/chainstate_test.go | 7 +- pool/hub.go | 7 +- pool/paymentmgr.go | 52 +++++++++- pool/paymentmgr_test.go | 209 ++++++++++++++++++++++++++++++++++++++-- pool/pool_test.go | 1 + 6 files changed, 288 insertions(+), 45 deletions(-) diff --git a/pool/chainstate.go b/pool/chainstate.go index 335d4f42..e837e91f 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -31,8 +31,8 @@ type ChainStateConfig struct { db Database // SoloPool represents the solo pool mining mode. SoloPool bool - // PayDividends pays mature mining rewards to participating accounts. - PayDividends func(context.Context, uint32, bool) error + // ProcessPayments relays payment signals for Processing. + ProcessPayments func(msg *paymentMsg) // GeneratePayments creates payments for participating accounts in pool // mining mode based on the configured payment scheme. GeneratePayments func(uint32, *PaymentSource, dcrutil.Amount, int64) error @@ -211,6 +211,31 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { continue } + block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock) + if err != nil { + // Errors generated fetching blocks of confirmed mined + // work are curently fatal because payments are + // sourced from coinbases. The chainstate process will be + // terminated as a result. + log.Errorf("unable to fetch block with hash %x: %v", + header.PrevBlock, err) + close(msg.Done) + cs.cfg.Cancel() + continue + } + + coinbaseTx := block.Transactions[0] + treasuryActive := isTreasuryActive(coinbaseTx) + + soloPool := cs.cfg.SoloPool + if !soloPool { + go cs.cfg.ProcessPayments(&paymentMsg{ + CurrentHeight: header.Height, + TreasuryActive: treasuryActive, + Done: make(chan bool), + }) + } + // Prune invalidated jobs and accepted work. if header.Height > MaxReorgLimit { pruneLimit := header.Height - MaxReorgLimit @@ -270,34 +295,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { } } - block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock) - if err != nil { - // Errors generated fetching blocks of confirmed mined - // work are curently fatal because payments are - // sourced from coinbases. The chainstate process will be - // terminated as a result. - log.Errorf("unable to fetch block with hash %x: %v", - header.PrevBlock, err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - coinbaseTx := block.Transactions[0] - treasuryActive := isTreasuryActive(coinbaseTx) - - // Process mature payments. - err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive) - if err != nil { - log.Errorf("unable to process payments: %v", err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - // Signal the gui cache of paid dividends. - cs.cfg.SignalCache(DividendsPaid) - // Check if the parent of the connected block is an accepted work // of the pool. parentHeight := header.Height - 1 diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go index 75cf76d1..101431c6 100644 --- a/pool/chainstate_test.go +++ b/pool/chainstate_test.go @@ -44,9 +44,7 @@ func testChainState(t *testing.T) { t.Fatalf("unexpected serialization error: %v", err) } - payDividends := func(context.Context, uint32, bool) error { - return nil - } + processPayments := func(*paymentMsg) {} generatePayments := func(uint32, *PaymentSource, dcrutil.Amount, int64) error { return nil } @@ -78,7 +76,7 @@ func testChainState(t *testing.T) { cCfg := &ChainStateConfig{ db: db, SoloPool: false, - PayDividends: payDividends, + ProcessPayments: processPayments, GeneratePayments: generatePayments, GetBlock: getBlock, GetBlockConfirmations: getBlockConfirmations, @@ -339,7 +337,6 @@ func testChainState(t *testing.T) { } cs.discCh <- discConfMsg <-discConfMsg.Done - cs.cfg.PayDividends = payDividends // Ensure the last work height can be updated. initialLastWorkHeight := cs.fetchLastWorkHeight() diff --git a/pool/hub.go b/pool/hub.go index 7677f8b3..049a2096 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -258,6 +258,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { FetchTxCreator: func() TxCreator { return h.nodeConn }, FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn }, CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout, + SignalCache: h.SignalCache, + HubWg: h.wg, } var err error @@ -269,7 +271,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { sCfg := &ChainStateConfig{ db: h.cfg.DB, SoloPool: h.cfg.SoloPool, - PayDividends: h.paymentMgr.payDividends, + ProcessPayments: h.paymentMgr.processPayments, GeneratePayments: h.paymentMgr.generatePayments, GetBlock: h.getBlock, GetBlockConfirmations: h.getBlockConfirmations, @@ -642,9 +644,10 @@ func (h *Hub) shutdown() { // Run handles the process lifecycles of the pool hub. func (h *Hub) Run(ctx context.Context) { - h.wg.Add(2) + h.wg.Add(3) go h.endpoint.run(ctx) go h.chainState.handleChainUpdates(ctx) + go h.paymentMgr.handlePayments(ctx) // Wait until all hub processes have terminated, and then shutdown. h.wg.Wait() diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 7148f674..d17e11b6 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -42,6 +42,9 @@ const ( // maxTxConfThreshold is the total number of coinbase confirmation // failures before a wallet rescan is requested. maxTxConfThreshold = uint32(3) + + // paymentBufferSize repreents the buffering on the payment channel. + paymentBufferSize = uint32(30) ) // TxCreator defines the functionality needed by a transaction creator for the @@ -116,6 +119,19 @@ type PaymentMgrConfig struct { // CoinbaseConfTimeout is the duration to wait for coinbase confirmations // when generating a payout transaction. CoinbaseConfTimeout time.Duration + // Cancel represents the pool's context cancellation function. + Cancel context.CancelFunc + // SignalCache sends the provided cache update event to the gui cache. + SignalCache func(event CacheUpdateEvent) + // HubWg represents the hub's waitgroup. + HubWg *sync.WaitGroup +} + +// paymentMsg represents a payment processing signal. +type paymentMsg struct { + CurrentHeight uint32 + TreasuryActive bool + Done chan bool } // PaymentMgr handles generating shares and paying out dividends to @@ -123,6 +139,7 @@ type PaymentMgrConfig struct { type PaymentMgr struct { failedTxConfs uint32 // update atomically. + paymentCh chan *paymentMsg processing bool cfg *PaymentMgrConfig mtx sync.Mutex @@ -131,7 +148,8 @@ type PaymentMgr struct { // NewPaymentMgr creates a new payment manager. func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { pm := &PaymentMgr{ - cfg: pCfg, + cfg: pCfg, + paymentCh: make(chan *paymentMsg, paymentBufferSize), } rand.Seed(time.Now().UnixNano()) @@ -910,3 +928,35 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return nil } + +// processPayments relays payment signals for processing. +func (pm *PaymentMgr) processPayments(msg *paymentMsg) { + pm.paymentCh <- msg +} + +// handlePayments processes dividend payment signals. +func (pm *PaymentMgr) handlePayments(ctx context.Context) { + for { + select { + case <-ctx.Done(): + pm.cfg.HubWg.Done() + return + + case msg := <-pm.paymentCh: + if !pm.cfg.SoloPool { + err := pm.payDividends(ctx, msg.CurrentHeight, msg.TreasuryActive) + if err != nil { + log.Errorf("unable to process payments: %v", err) + close(msg.Done) + pm.cfg.Cancel() + continue + } + + // Signal the gui cache of paid dividends. + pm.cfg.SignalCache(DividendsPaid) + + close(msg.Done) + } + } + } +} diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 1d6309f4..48671e46 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/big" + "sync" "sync/atomic" "testing" "time" @@ -150,7 +151,7 @@ func TestSharePercentages(t *testing.T) { } } -func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { +func createPaymentMgr(paymentMethod string) (*PaymentMgr, context.Context, context.CancelFunc, error) { activeNet := chaincfg.SimNetParams() getBlockConfirmations := func(context.Context, *chainhash.Hash) (int64, error) { @@ -165,6 +166,11 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { return nil } + signalCache := func(CacheUpdateEvent) { + // Do nothing. + } + + ctx, cancel := context.WithCancel(context.Background()) pCfg := &PaymentMgrConfig{ db: db, ActiveNet: activeNet, @@ -176,12 +182,21 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { FetchTxCreator: fetchTxCreator, FetchTxBroadcaster: fetchTxBroadcaster, PoolFeeAddrs: []dcrutil.Address{poolFeeAddrs}, + Cancel: cancel, + SignalCache: signalCache, + HubWg: new(sync.WaitGroup), + } + + mgr, err := NewPaymentMgr(pCfg) + if err != nil { + return nil, nil, nil, err } - return NewPaymentMgr(pCfg) + + return mgr, ctx, cancel, err } func testPaymentMgrPPS(t *testing.T) { - mgr, err := createPaymentMgr(PPS) + mgr, _, _, err := createPaymentMgr(PPS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -278,7 +293,7 @@ func testPaymentMgrPPS(t *testing.T) { } func testPaymentMgrPPLNS(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) + mgr, _, _, err := createPaymentMgr(PPLNS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -376,7 +391,7 @@ func testPaymentMgrPPLNS(t *testing.T) { } func testPaymentMgrMaturity(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) + mgr, _, _, err := createPaymentMgr(PPLNS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -472,7 +487,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("failed to insert account: %v", err) } - mgr, err := createPaymentMgr(PPS) + mgr, _, _, err := createPaymentMgr(PPS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -1296,7 +1311,7 @@ func testPaymentMgrPayment(t *testing.T) { } func testPaymentMgrDust(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) + mgr, _, _, err := createPaymentMgr(PPLNS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -1375,3 +1390,183 @@ func testPaymentMgrDust(t *testing.T) { "than the initial (%v)", ft, expectedFeeAmt) } } + +func testPaymentMgrSignals(t *testing.T) { + // Insert some test accounts. + accountX := NewAccount(xAddr) + err := db.persistAccount(accountX) + if err != nil { + t.Fatalf("failed to insert account: %v", err) + } + + accountY := NewAccount(yAddr) + err = db.persistAccount(accountY) + if err != nil { + t.Fatalf("failed to insert account: %v", err) + } + + mgr, ctx, cancel, err := createPaymentMgr(PPLNS) + if err != nil { + t.Fatalf("[createPaymentMgr] unexpected error: %v", err) + } + + var randBytes [chainhash.HashSize + 1]byte + _, err = rand.Read(randBytes[:]) + if err != nil { + t.Fatalf("unable to generate random bytes: %v", err) + } + + randHash := chainhash.HashH(randBytes[:]) + randSource := &PaymentSource{ + BlockHash: randHash.String(), + Coinbase: randHash.String(), + } + + height := uint32(10) + estMaturity := uint32(26) + amt, _ := dcrutil.NewAmount(5) + + pmtX := NewPayment(xID, zeroSource, amt, height, estMaturity) + err = db.PersistPayment(pmtX) + if err != nil { + cancel() + t.Fatal(err) + } + + pmtY := NewPayment(yID, randSource, amt, height, estMaturity) + err = db.PersistPayment(pmtY) + if err != nil { + cancel() + t.Fatal(err) + } + + txHashes := make(map[string]*chainhash.Hash) + hashA := chainhash.Hash{'a'} + txHashes[hashA.String()] = &hashA + hashB := chainhash.Hash{'b'} + txHashes[hashB.String()] = &hashB + hashC := chainhash.Hash{'c'} + txHashes[hashC.String()] = &hashC + + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + txConfs := make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) + confA := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: zeroHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confA) + confB := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: randHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confB) + + mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{ + Confirmations: txConfs, + }, nil + }, nil + } + + txBytes := []byte("01000000018e17619f0d627c2769ee3f957582691aea59c2" + + "e79cc45b8ba1f08485dd88d75c0300000001ffffffff017a64e43703000000" + + "00001976a914978fa305bd66f63f0de847338bb56ff65fa8e27288ac000000" + + "000000000001f46ce43703000000846c0700030000006b483045022100d668" + + "5812801db991b72e80863eba7058466dfebb4aba0af75ab47bade177325102" + + "205f466fc47435c1a177482e527ff0e76f3c2c613940b358e57f0f0d78d5f2" + + "ffcb012102d040a4c34ae65a2b87ea8e9df7413e6504e5f27c6bde019a78ee" + + "96145b27c517") + txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return &walletrpc.PublishTransactionResponse{ + TransactionHash: txHash, + }, nil + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, + } + } + + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return &wire.MsgTx{}, nil + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + // Ensure the payment lifecycle process recieves the payment signal and + // processes mature payments. + msgA := paymentMsg{ + CurrentHeight: estMaturity + 1, + TreasuryActive: false, + Done: make(chan bool), + } + + mgr.cfg.HubWg.Add(1) + go mgr.handlePayments(ctx) + + mgr.processPayments(&msgA) + <-msgA.Done + + // Esure the payment lifecycle process cancels the context when an + // error is encountered. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return nil, fmt.Errorf("unable to create raw transactions") + }, + } + } + + msgB := paymentMsg{ + CurrentHeight: estMaturity + 1, + TreasuryActive: false, + Done: make(chan bool), + } + mgr.processPayments(&msgB) + <-msgB.Done + + cancel() + mgr.cfg.HubWg.Wait() +} diff --git a/pool/pool_test.go b/pool/pool_test.go index e3e4a3b9..265dd242 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -103,6 +103,7 @@ func TestPool(t *testing.T) { "testPaymentMgrMaturity": testPaymentMgrMaturity, "testPaymentMgrPayment": testPaymentMgrPayment, "testPaymentMgrDust": testPaymentMgrDust, + "testPaymentSignals": testPaymentMgrSignals, "testChainState": testChainState, "testHub": testHub, } From 216be09326474fe148b62c800ba97f96ed843edc Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Tue, 4 May 2021 19:54:28 +0000 Subject: [PATCH 3/6] pool: ensure rescans cover erring tx conf hashes. This updates the dividend payment process to get the lowest failing tx conf height to use for the start of a wallet rescan to ensure all failing tx confs are covered. --- pool/hub.go | 2 +- pool/paymentmgr.go | 66 +++++++++++++++++++++++++++++++++-------- pool/paymentmgr_test.go | 54 ++++++++++++++++++++++----------- 3 files changed, 91 insertions(+), 31 deletions(-) diff --git a/pool/hub.go b/pool/hub.go index 049a2096..c3f93e81 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -445,7 +445,7 @@ func (h *Hub) getWork(ctx context.Context) (string, string, error) { // getTxConfNotifications streams transaction confirmation notifications for // the provided transaction hashes. -func (h *Hub) getTxConfNotifications(txHashes []*chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { +func (h *Hub) getTxConfNotifications(txHashes []chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { hashes := make([][]byte, 0, len(txHashes)) for _, hash := range txHashes { hashes = append(hashes, hash[:]) diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index d17e11b6..f8e0e9d7 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -109,7 +109,7 @@ type PaymentMgrConfig struct { GetBlockConfirmations func(context.Context, *chainhash.Hash) (int64, error) // GetTxConfNotifications streams transaction confirmation notifications on // the provided hashes. - GetTxConfNotifications func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) + GetTxConfNotifications func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) // FetchTxCreator returns a transaction creator that allows coinbase lookups // and payment transaction creation. FetchTxCreator func() TxCreator @@ -138,9 +138,10 @@ type paymentMsg struct { // participating accounts. type PaymentMgr struct { failedTxConfs uint32 // update atomically. + txConfHashes map[chainhash.Hash]uint32 - paymentCh chan *paymentMsg processing bool + paymentCh chan *paymentMsg cfg *PaymentMgrConfig mtx sync.Mutex } @@ -148,8 +149,9 @@ type PaymentMgr struct { // NewPaymentMgr creates a new payment manager. func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { pm := &PaymentMgr{ - cfg: pCfg, - paymentCh: make(chan *paymentMsg, paymentBufferSize), + cfg: pCfg, + txConfHashes: make(map[chainhash.Hash]uint32), + paymentCh: make(chan *paymentMsg, paymentBufferSize), } rand.Seed(time.Now().UnixNano()) @@ -522,10 +524,10 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp // // The context passed to this function must have a corresponding // cancellation to allow for a clean shutdown process. -func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]*chainhash.Hash, spendableHeight uint32) error { +func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[chainhash.Hash]uint32, spendableHeight uint32) error { funcName := "confirmCoinbases" - hashes := make([]*chainhash.Hash, 0, len(txHashes)) - for _, hash := range txHashes { + hashes := make([]chainhash.Hash, 0, len(txHashes)) + for hash := range txHashes { hashes = append(hashes, hash) } @@ -561,7 +563,7 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string] // Remove spendable coinbase from the tx hash set. All // coinbases are spendable when the tx hash set is empty. - delete(txHashes, hash.String()) + delete(txHashes, *hash) } } @@ -627,7 +629,7 @@ func (pm *PaymentMgr) monitorRescan(ctx context.Context, rescanSource walletrpc. // generatePayoutTxDetails creates the payout transaction inputs and outputs // from the provided payments func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, feeAddr dcrutil.Address, payments map[string][]*Payment, treasuryActive bool) ([]chainjson.TransactionInput, - map[string]*chainhash.Hash, map[string]dcrutil.Amount, dcrutil.Amount, error) { + map[chainhash.Hash]uint32, map[string]dcrutil.Amount, dcrutil.Amount, error) { funcName := "generatePayoutTxDetails" // The coinbase output prior to @@ -641,7 +643,7 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator var tIn, tOut dcrutil.Amount inputs := make([]chainjson.TransactionInput, 0) - inputTxHashes := make(map[string]*chainhash.Hash) + inputTxHashes := make(map[chainhash.Hash]uint32) outputs := make(map[string]dcrutil.Amount) for _, pmtSet := range payments { coinbaseTx := pmtSet[0].Source.Coinbase @@ -675,7 +677,7 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator Tree: wire.TxTreeRegular, } inputs = append(inputs, in) - inputTxHashes[txHash.String()] = txHash + inputTxHashes[*txHash] = pmtSet[0].Height prevOutV, err := dcrutil.NewAmount(in.Amount) if err != nil { @@ -769,9 +771,35 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA txConfCount := atomic.LoadUint32(&pm.failedTxConfs) if txConfCount == maxTxConfThreshold { - beginHeight := int32(height) - (int32(pm.cfg.ActiveNet.CoinbaseMaturity) * 2) + beginHeight := uint32(0) + + // Having no tx conf hashes at threshold indicates an + // underlining error. + pm.mtx.Lock() + if len(pm.txConfHashes) == 0 { + pm.mtx.Unlock() + desc := fmt.Sprintf("%s: no tx conf hashes to rescan for", + funcName) + return errs.PoolError(errs.TxConf, desc) + } + + // Find the lowest height to start the rescan from. + for _, height := range pm.txConfHashes { + if beginHeight == 0 { + beginHeight = height + } + + if beginHeight > height { + beginHeight = height + } + } + pm.mtx.Unlock() + + // Start the rescan a block height below the lowest reported block + // having inaccurate confirmation information. + log.Infof("wallet rescanning from height #%d", beginHeight-1) rescanReq := &walletrpc.RescanRequest{ - BeginHeight: beginHeight, + BeginHeight: int32(beginHeight - 1), } rescanSource, err := txB.Rescan(pCtx, rescanReq) if err != nil { @@ -845,6 +873,13 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA if err != nil { atomic.AddUint32(&pm.failedTxConfs, 1) + // Track the transactions with inaccurate confirmation data. + pm.mtx.Lock() + for k, v := range inputTxHashes { + pm.txConfHashes[k] = v + } + pm.mtx.Unlock() + // Do not error if coinbase spendable confirmation requests are // terminated by the context cancellation. if !errors.Is(err, errs.ContextCancelled) { @@ -924,8 +959,13 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } + // Reset the failed tx conf counter and clear the hashes. atomic.StoreUint32(&pm.failedTxConfs, 0) + pm.mtx.Lock() + pm.txConfHashes = make(map[chainhash.Hash]uint32) + pm.mtx.Unlock() + return nil } diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 48671e46..e5d610b5 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -620,16 +620,16 @@ func testPaymentMgrPayment(t *testing.T) { } // confirmCoinbases tests. - txHashes := make(map[string]*chainhash.Hash) + txHashes := make(map[chainhash.Hash]uint32) hashA := chainhash.Hash{'a'} - txHashes[hashA.String()] = &hashA + txHashes[hashA] = height hashB := chainhash.Hash{'b'} - txHashes[hashB.String()] = &hashB + txHashes[hashB] = height hashC := chainhash.Hash{'c'} - txHashes[hashC.String()] = &hashC + txHashes[hashC] = height spendableHeight := uint32(10) - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return nil, fmt.Errorf("unable to fetch tx conf notification source") } @@ -641,7 +641,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected tx conf notification source error") } - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{}, nil }, nil @@ -661,7 +661,7 @@ func testPaymentMgrPayment(t *testing.T) { // The context here needs to be recreated after the previous test. ctx, cancel = context.WithCancel(context.Background()) - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return nil, fmt.Errorf("unable to confirm transactions") }, nil @@ -698,7 +698,7 @@ func testPaymentMgrPayment(t *testing.T) { } txConfs = append(txConfs, &confC) - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, @@ -856,7 +856,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected %d inputs, got %d", expectedInputs, len(inputs)) } - for _, hash := range inputTxHashes { + for hash := range inputTxHashes { txHash := hash.String() var match bool for _, in := range inputs { @@ -997,7 +997,7 @@ func testPaymentMgrPayment(t *testing.T) { mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { return int64(estMaturity) + 1, nil } - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return nil, fmt.Errorf("unable to fetch tx conf notification source") } @@ -1029,23 +1029,23 @@ func testPaymentMgrPayment(t *testing.T) { } txConfs = make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) - confA = walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + confD := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ TxHash: zeroHash[:], Confirmations: 50, BlockHash: []byte(zeroSource.BlockHash), BlockHeight: 60, } - txConfs = append(txConfs, &confA) - confB = walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + txConfs = append(txConfs, &confD) + confE := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ TxHash: randHash[:], Confirmations: 50, BlockHash: []byte(zeroSource.BlockHash), BlockHeight: 60, } - txConfs = append(txConfs, &confB) + txConfs = append(txConfs, &confE) mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, @@ -1083,7 +1083,7 @@ func testPaymentMgrPayment(t *testing.T) { return nil } mgr.cfg.WalletPass = "123" - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, @@ -1233,6 +1233,26 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected a rescan error, got %v", err) } + // Clear out the tx confirmation hashes to be rescanned for. + var confHashes map[chainhash.Hash]uint32 + mgr.mtx.Lock() + confHashes = mgr.txConfHashes + mgr.txConfHashes = make(map[chainhash.Hash]uint32) + mgr.mtx.Unlock() + + // Ensure dividend payment returns an error when there are no tx + // confirmation hashes to rescan. + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.TxConf) { + cancel() + t.Fatalf("expected a no tx conf error, got %v", err) + } + + // Restore the tx confirmation hashes to be rescanned for. + mgr.mtx.Lock() + mgr.txConfHashes = confHashes + mgr.mtx.Unlock() + // Ensure wallet rescan succeeds when it scans through the current height. mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { return &txBroadcasterImpl{ @@ -1469,7 +1489,7 @@ func testPaymentMgrSignals(t *testing.T) { txConfs = append(txConfs, &confB) mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, From f734f319a4184e956f661aa3c61228fbadad2a3e Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Tue, 4 May 2021 19:56:33 +0000 Subject: [PATCH 4/6] pool: add trace logs for tx confirmations. --- pool/hub.go | 3 +++ pool/paymentmgr.go | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/pool/hub.go b/pool/hub.go index c3f93e81..344aede8 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -447,8 +447,11 @@ func (h *Hub) getWork(ctx context.Context) (string, string, error) { // the provided transaction hashes. func (h *Hub) getTxConfNotifications(txHashes []chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { hashes := make([][]byte, 0, len(txHashes)) + log.Tracef("Requesting tx conf notifications for %d "+ + "transactions (stop after #%d)", len(txHashes), stopAfter) for _, hash := range txHashes { hashes = append(hashes, hash[:]) + log.Tracef(" %s", hash) } req := &walletrpc.ConfirmationNotificationsRequest{ diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index f8e0e9d7..6eb7f065 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -501,6 +501,20 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp resp: resp, err: err, } + + if resp != nil { + log.Tracef("Got notification with %d confirmation(s)", + len(resp.Confirmations)) + for _, conf := range resp.Confirmations { + txHash, err := chainhash.NewHash(conf.TxHash) + if err != nil { + log.Tracef("Could not decode txhash: %b", conf.TxHash) + } else { + log.Tracef(" tx=%s, confs=%d, height=%d", + txHash, conf.Confirmations, conf.BlockHeight) + } + } + } }(notifCh) select { @@ -567,6 +581,11 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[chainha } } + log.Tracef("%d coinbase(s) are not confirmed:", len(txHashes)) + for coinbase := range txHashes { + log.Tracef(" %v", coinbase) + } + if len(txHashes) == 0 { return nil } From d86a5fab52e568281f5b471f5af5ceee3497786c Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Wed, 5 May 2021 11:25:41 +0000 Subject: [PATCH 5/6] pool: fix ConfirmationNotificationsRequest params. This fixes an issue where the tx hashes sent in the notification request were duplicates of each other because of taking slice of the iterator variable. The stopAfter param was also updated to send over the correct parameter (conf count) instead of a block height. --- pool/hub.go | 2 +- pool/paymentmgr.go | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pool/hub.go b/pool/hub.go index 344aede8..af64179e 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -450,7 +450,7 @@ func (h *Hub) getTxConfNotifications(txHashes []chainhash.Hash, stopAfter int32) log.Tracef("Requesting tx conf notifications for %d "+ "transactions (stop after #%d)", len(txHashes), stopAfter) for _, hash := range txHashes { - hashes = append(hashes, hash[:]) + hashes = append(hashes, hash.CloneBytes()) log.Tracef(" %s", hash) } diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 6eb7f065..967633d9 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -884,11 +884,16 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA maxSpendableHeight = spendableHeight } } - if maxSpendableHeight < height { - maxSpendableHeight = height + + var stopAfter uint32 + switch { + case maxSpendableHeight > height: + stopAfter = maxSpendableHeight - height + default: + stopAfter = 1 } - err = pm.confirmCoinbases(pCtx, inputTxHashes, maxSpendableHeight) + err = pm.confirmCoinbases(pCtx, inputTxHashes, stopAfter) if err != nil { atomic.AddUint32(&pm.failedTxConfs, 1) From e6737946bcde5014bc57c48a702cfd95bf3d4d81 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Wed, 12 May 2021 18:35:59 +0000 Subject: [PATCH 6/6] pool: resolve review issues (1 of x). --- pool/chainstate.go | 2 +- pool/paymentmgr.go | 12 +++---- pool/paymentmgr_test.go | 72 ++++++++--------------------------------- 3 files changed, 20 insertions(+), 66 deletions(-) diff --git a/pool/chainstate.go b/pool/chainstate.go index e837e91f..d90a688e 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -31,7 +31,7 @@ type ChainStateConfig struct { db Database // SoloPool represents the solo pool mining mode. SoloPool bool - // ProcessPayments relays payment signals for Processing. + // ProcessPayments relays payment signals for processing. ProcessPayments func(msg *paymentMsg) // GeneratePayments creates payments for participating accounts in pool // mining mode based on the configured payment scheme. diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 967633d9..c11439a7 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -43,7 +43,7 @@ const ( // failures before a wallet rescan is requested. maxTxConfThreshold = uint32(3) - // paymentBufferSize repreents the buffering on the payment channel. + // paymentBufferSize is the size of the buffer on the payment channel. paymentBufferSize = uint32(30) ) @@ -783,17 +783,17 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return errs.PoolError(errs.Disconnected, desc) } - // Request a wallet rescan if tx confirmation failures are - // at threshold. pCtx, pCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) defer pCancel() + // Request a wallet rescan if tx confirmation failures are + // at threshold. txConfCount := atomic.LoadUint32(&pm.failedTxConfs) - if txConfCount == maxTxConfThreshold { + if txConfCount >= maxTxConfThreshold { beginHeight := uint32(0) // Having no tx conf hashes at threshold indicates an - // underlining error. + // underlying error. pm.mtx.Lock() if len(pm.txConfHashes) == 0 { pm.mtx.Unlock() @@ -822,7 +822,7 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA } rescanSource, err := txB.Rescan(pCtx, rescanReq) if err != nil { - desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) + desc := fmt.Sprintf("%s: rescan source cannot be nil", funcName) return errs.PoolError(errs.Rescan, desc) } diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index e5d610b5..d1e9a7e7 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -151,7 +151,7 @@ func TestSharePercentages(t *testing.T) { } } -func createPaymentMgr(paymentMethod string) (*PaymentMgr, context.Context, context.CancelFunc, error) { +func createPaymentMgr(t *testing.T, paymentMethod string) (*PaymentMgr, context.Context, context.CancelFunc) { activeNet := chaincfg.SimNetParams() getBlockConfirmations := func(context.Context, *chainhash.Hash) (int64, error) { @@ -189,17 +189,14 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, context.Context, conte mgr, err := NewPaymentMgr(pCfg) if err != nil { - return nil, nil, nil, err + t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } - return mgr, ctx, cancel, err + return mgr, ctx, cancel } func testPaymentMgrPPS(t *testing.T) { - mgr, _, _, err := createPaymentMgr(PPS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } + mgr, _, _ := createPaymentMgr(t, PPS) // Ensure Pay-Per-Share (PPS) works as expected. now := time.Now() @@ -293,10 +290,7 @@ func testPaymentMgrPPS(t *testing.T) { } func testPaymentMgrPPLNS(t *testing.T) { - mgr, _, _, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } + mgr, _, _ := createPaymentMgr(t, PPLNS) // Ensure Pay-Per-Last-N-Shares (PPLNS) works as expected. now := time.Now() @@ -391,10 +385,7 @@ func testPaymentMgrPPLNS(t *testing.T) { } func testPaymentMgrMaturity(t *testing.T) { - mgr, _, _, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } + mgr, _, _ := createPaymentMgr(t, PPLNS) now := time.Now() shareCount := 3 @@ -407,7 +398,7 @@ func testPaymentMgrMaturity(t *testing.T) { // Ensure payment maturity works as expected. for i := 0; i < shareCount; i++ { // Create readily available shares for account X. - err = persistShare(db, xID, weight, thirtyBefore+int64(i)) + err := persistShare(db, xID, weight, thirtyBefore+int64(i)) if err != nil { t.Fatal(err) } @@ -415,7 +406,7 @@ func testPaymentMgrMaturity(t *testing.T) { sixtyAfter := time.Now().Add((time.Second * 60)).UnixNano() for i := 0; i < shareCount; i++ { // Create future shares for account Y. - err = persistShare(db, yID, weight, sixtyAfter+int64(i)) + err := persistShare(db, yID, weight, sixtyAfter+int64(i)) if err != nil { t.Fatal(err) } @@ -487,11 +478,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("failed to insert account: %v", err) } - mgr, _, _, err := createPaymentMgr(PPS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } - + mgr, _, _ := createPaymentMgr(t, PPS) height := uint32(20) // pruneOrphanedPayments tests. @@ -1208,31 +1195,6 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected a rescan error, got %v", err) } - // Ensure dividend payment returns an error if fetching rescan responses fail. - mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { - return &txBroadcasterImpl{ - signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { - return &walletrpc.SignTransactionResponse{ - Transaction: txBytes, - }, nil - }, - publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { - return nil, fmt.Errorf("unable to publish transaction") - }, - rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { - return &tRescanClient{ - err: fmt.Errorf("internal error"), - }, nil - }, - } - } - - err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) - if !errors.Is(err, errs.Rescan) { - cancel() - t.Fatalf("expected a rescan error, got %v", err) - } - // Clear out the tx confirmation hashes to be rescanned for. var confHashes map[chainhash.Hash]uint32 mgr.mtx.Lock() @@ -1331,11 +1293,7 @@ func testPaymentMgrPayment(t *testing.T) { } func testPaymentMgrDust(t *testing.T) { - mgr, _, _, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } - + mgr, _, _ := createPaymentMgr(t, PPLNS) height := uint32(20) // Ensure dust payments are forfeited by their originating accounts and @@ -1347,7 +1305,7 @@ func testPaymentMgrDust(t *testing.T) { yWeight := new(big.Rat).Mul(weight, new(big.Rat).SetInt64(int64(mul))) // Create shares for account x and y. - err = persistShare(db, xID, weight, now.UnixNano()) + err := persistShare(db, xID, weight, now.UnixNano()) if err != nil { t.Fatal(err) } @@ -1425,11 +1383,7 @@ func testPaymentMgrSignals(t *testing.T) { t.Fatalf("failed to insert account: %v", err) } - mgr, ctx, cancel, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } - + mgr, ctx, cancel := createPaymentMgr(t, PPLNS) var randBytes [chainhash.HashSize + 1]byte _, err = rand.Read(randBytes[:]) if err != nil { @@ -1547,7 +1501,7 @@ func testPaymentMgrSignals(t *testing.T) { return int64(estMaturity) + 1, nil } - // Ensure the payment lifecycle process recieves the payment signal and + // Ensure the payment lifecycle process receives the payment signal and // processes mature payments. msgA := paymentMsg{ CurrentHeight: estMaturity + 1,