diff --git a/errors/error.go b/errors/error.go index 4a516f24..9a69baf0 100644 --- a/errors/error.go +++ b/errors/error.go @@ -143,6 +143,9 @@ const ( // CreateAmount indicates an amount creation error. CreateAmount = ErrorKind("CreateAmount") + + // Rescan indicates an wallet rescan error. + Rescan = ErrorKind("Rescan") ) // Error satisfies the error interface and prints human-readable errors. diff --git a/errors/error_test.go b/errors/error_test.go index cb72f986..94f02530 100644 --- a/errors/error_test.go +++ b/errors/error_test.go @@ -58,6 +58,7 @@ func TestErrorKindStringer(t *testing.T) { {TxIn, "TxIn"}, {ContextCancelled, "ContextCancelled"}, {CreateAmount, "CreateAmount"}, + {Rescan, "Rescan"}, } for i, test := range tests { diff --git a/pool/chainstate.go b/pool/chainstate.go index 335d4f42..d90a688e 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -31,8 +31,8 @@ type ChainStateConfig struct { db Database // SoloPool represents the solo pool mining mode. SoloPool bool - // PayDividends pays mature mining rewards to participating accounts. - PayDividends func(context.Context, uint32, bool) error + // ProcessPayments relays payment signals for processing. + ProcessPayments func(msg *paymentMsg) // GeneratePayments creates payments for participating accounts in pool // mining mode based on the configured payment scheme. GeneratePayments func(uint32, *PaymentSource, dcrutil.Amount, int64) error @@ -211,6 +211,31 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { continue } + block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock) + if err != nil { + // Errors generated fetching blocks of confirmed mined + // work are curently fatal because payments are + // sourced from coinbases. The chainstate process will be + // terminated as a result. + log.Errorf("unable to fetch block with hash %x: %v", + header.PrevBlock, err) + close(msg.Done) + cs.cfg.Cancel() + continue + } + + coinbaseTx := block.Transactions[0] + treasuryActive := isTreasuryActive(coinbaseTx) + + soloPool := cs.cfg.SoloPool + if !soloPool { + go cs.cfg.ProcessPayments(&paymentMsg{ + CurrentHeight: header.Height, + TreasuryActive: treasuryActive, + Done: make(chan bool), + }) + } + // Prune invalidated jobs and accepted work. if header.Height > MaxReorgLimit { pruneLimit := header.Height - MaxReorgLimit @@ -270,34 +295,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { } } - block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock) - if err != nil { - // Errors generated fetching blocks of confirmed mined - // work are curently fatal because payments are - // sourced from coinbases. The chainstate process will be - // terminated as a result. - log.Errorf("unable to fetch block with hash %x: %v", - header.PrevBlock, err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - coinbaseTx := block.Transactions[0] - treasuryActive := isTreasuryActive(coinbaseTx) - - // Process mature payments. - err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive) - if err != nil { - log.Errorf("unable to process payments: %v", err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - // Signal the gui cache of paid dividends. - cs.cfg.SignalCache(DividendsPaid) - // Check if the parent of the connected block is an accepted work // of the pool. parentHeight := header.Height - 1 diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go index 75cf76d1..101431c6 100644 --- a/pool/chainstate_test.go +++ b/pool/chainstate_test.go @@ -44,9 +44,7 @@ func testChainState(t *testing.T) { t.Fatalf("unexpected serialization error: %v", err) } - payDividends := func(context.Context, uint32, bool) error { - return nil - } + processPayments := func(*paymentMsg) {} generatePayments := func(uint32, *PaymentSource, dcrutil.Amount, int64) error { return nil } @@ -78,7 +76,7 @@ func testChainState(t *testing.T) { cCfg := &ChainStateConfig{ db: db, SoloPool: false, - PayDividends: payDividends, + ProcessPayments: processPayments, GeneratePayments: generatePayments, GetBlock: getBlock, GetBlockConfirmations: getBlockConfirmations, @@ -339,7 +337,6 @@ func testChainState(t *testing.T) { } cs.discCh <- discConfMsg <-discConfMsg.Done - cs.cfg.PayDividends = payDividends // Ensure the last work height can be updated. initialLastWorkHeight := cs.fetchLastWorkHeight() diff --git a/pool/hub.go b/pool/hub.go index 41a7a633..af64179e 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -100,6 +100,7 @@ var ( type WalletConnection interface { SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // NodeConnection defines the functionality needed by a mining node @@ -257,6 +258,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { FetchTxCreator: func() TxCreator { return h.nodeConn }, FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn }, CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout, + SignalCache: h.SignalCache, + HubWg: h.wg, } var err error @@ -268,7 +271,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { sCfg := &ChainStateConfig{ db: h.cfg.DB, SoloPool: h.cfg.SoloPool, - PayDividends: h.paymentMgr.payDividends, + ProcessPayments: h.paymentMgr.processPayments, GeneratePayments: h.paymentMgr.generatePayments, GetBlock: h.getBlock, GetBlockConfirmations: h.getBlockConfirmations, @@ -442,10 +445,13 @@ func (h *Hub) getWork(ctx context.Context) (string, string, error) { // getTxConfNotifications streams transaction confirmation notifications for // the provided transaction hashes. -func (h *Hub) getTxConfNotifications(txHashes []*chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { +func (h *Hub) getTxConfNotifications(txHashes []chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { hashes := make([][]byte, 0, len(txHashes)) + log.Tracef("Requesting tx conf notifications for %d "+ + "transactions (stop after #%d)", len(txHashes), stopAfter) for _, hash := range txHashes { - hashes = append(hashes, hash[:]) + hashes = append(hashes, hash.CloneBytes()) + log.Tracef(" %s", hash) } req := &walletrpc.ConfirmationNotificationsRequest{ @@ -641,9 +647,10 @@ func (h *Hub) shutdown() { // Run handles the process lifecycles of the pool hub. func (h *Hub) Run(ctx context.Context) { - h.wg.Add(2) + h.wg.Add(3) go h.endpoint.run(ctx) go h.chainState.handleChainUpdates(ctx) + go h.paymentMgr.handlePayments(ctx) // Wait until all hub processes have terminated, and then shutdown. h.wg.Wait() diff --git a/pool/hub_test.go b/pool/hub_test.go index c3b46bad..94db148a 100644 --- a/pool/hub_test.go +++ b/pool/hub_test.go @@ -23,8 +23,43 @@ import ( chainjson "github.com/decred/dcrd/rpc/jsonrpc/types/v2" "github.com/decred/dcrd/wire" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) +type tRescanClient struct { + resp *walletrpc.RescanResponse + err error + grpc.ClientStream +} + +func (r *tRescanClient) Recv() (*walletrpc.RescanResponse, error) { + return r.resp, r.err +} + +func (r *tRescanClient) Header() (metadata.MD, error) { + return nil, nil +} + +func (r *tRescanClient) Trailer() metadata.MD { + return nil +} + +func (r *tRescanClient) CloseSend() error { + return nil +} + +func (r *tRescanClient) Context() context.Context { + return nil +} + +func (r *tRescanClient) SendMsg(m interface{}) error { + return nil +} + +func (r *tRescanClient) RecvMsg(m interface{}) error { + return nil +} + type tWalletConnection struct { } @@ -109,6 +144,15 @@ func (t *tWalletConnection) PublishTransaction(context.Context, *walletrpc.Publi }, nil } +func (t *tWalletConnection) Rescan(context.Context, *walletrpc.RescanRequest, ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + client := new(tRescanClient) + client.resp = &walletrpc.RescanResponse{ + RescannedThrough: 10000, + } + + return client, nil +} + type tNodeConnection struct{} func (t *tNodeConnection) CreateRawTransaction(context.Context, []chainjson.TransactionInput, map[dcrutil.Address]dcrutil.Amount, *int64, *int64) (*wire.MsgTx, error) { diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 3a97ac13..c11439a7 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -10,6 +10,8 @@ import ( "fmt" "math/big" "math/rand" + "sync" + "sync/atomic" "time" "decred.org/dcrwallet/rpc/walletrpc" @@ -36,6 +38,13 @@ const ( // output value of a transaction is allowed to be short of the // provided input due to rounding errors. maxRoundingDiff = dcrutil.Amount(500) + + // maxTxConfThreshold is the total number of coinbase confirmation + // failures before a wallet rescan is requested. + maxTxConfThreshold = uint32(3) + + // paymentBufferSize is the size of the buffer on the payment channel. + paymentBufferSize = uint32(30) ) // TxCreator defines the functionality needed by a transaction creator for the @@ -57,6 +66,8 @@ type TxBroadcaster interface { SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) // PublishTransaction broadcasts the transaction unto the network. PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + // Rescan requests a wallet utxo rescan. + Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // confNotifMsg represents a tx confirmation notification message. @@ -65,6 +76,12 @@ type confNotifMsg struct { err error } +// rescanMsg represents a rescan response. +type rescanMsg struct { + resp *walletrpc.RescanResponse + err error +} + // PaymentMgrConfig contains all of the configuration values which should be // provided when creating a new instance of PaymentMgr. type PaymentMgrConfig struct { @@ -92,7 +109,7 @@ type PaymentMgrConfig struct { GetBlockConfirmations func(context.Context, *chainhash.Hash) (int64, error) // GetTxConfNotifications streams transaction confirmation notifications on // the provided hashes. - GetTxConfNotifications func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) + GetTxConfNotifications func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) // FetchTxCreator returns a transaction creator that allows coinbase lookups // and payment transaction creation. FetchTxCreator func() TxCreator @@ -102,18 +119,39 @@ type PaymentMgrConfig struct { // CoinbaseConfTimeout is the duration to wait for coinbase confirmations // when generating a payout transaction. CoinbaseConfTimeout time.Duration + // Cancel represents the pool's context cancellation function. + Cancel context.CancelFunc + // SignalCache sends the provided cache update event to the gui cache. + SignalCache func(event CacheUpdateEvent) + // HubWg represents the hub's waitgroup. + HubWg *sync.WaitGroup +} + +// paymentMsg represents a payment processing signal. +type paymentMsg struct { + CurrentHeight uint32 + TreasuryActive bool + Done chan bool } // PaymentMgr handles generating shares and paying out dividends to // participating accounts. type PaymentMgr struct { - cfg *PaymentMgrConfig + failedTxConfs uint32 // update atomically. + txConfHashes map[chainhash.Hash]uint32 + + processing bool + paymentCh chan *paymentMsg + cfg *PaymentMgrConfig + mtx sync.Mutex } // NewPaymentMgr creates a new payment manager. func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { pm := &PaymentMgr{ - cfg: pCfg, + cfg: pCfg, + txConfHashes: make(map[chainhash.Hash]uint32), + paymentCh: make(chan *paymentMsg, paymentBufferSize), } rand.Seed(time.Now().UnixNano()) @@ -148,6 +186,21 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { return pm, nil } +// isProcessing returns whether the payment manager is in the process of +// paying out dividends. +func (pm *PaymentMgr) isProcessing() bool { + pm.mtx.Lock() + defer pm.mtx.Unlock() + return pm.processing +} + +// setProcessing sets the processing flag to the provided boolean. +func (pm *PaymentMgr) setProcessing(status bool) { + pm.mtx.Lock() + pm.processing = status + pm.mtx.Unlock() +} + // sharePercentages calculates the percentages due each participating account // according to their weighted shares. func (pm *PaymentMgr) sharePercentages(shares []*Share) (map[string]*big.Rat, error) { @@ -448,12 +501,26 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp resp: resp, err: err, } + + if resp != nil { + log.Tracef("Got notification with %d confirmation(s)", + len(resp.Confirmations)) + for _, conf := range resp.Confirmations { + txHash, err := chainhash.NewHash(conf.TxHash) + if err != nil { + log.Tracef("Could not decode txhash: %b", conf.TxHash) + } else { + log.Tracef(" tx=%s, confs=%d, height=%d", + txHash, conf.Confirmations, conf.BlockHeight) + } + } + } }(notifCh) select { case <-ctx.Done(): - log.Tracef("%s: unable to fetch tx confirmation notifications", - funcName) + log.Tracef("%s: context cancelled fetching tx confirmation "+ + "notifications", funcName) return nil, errs.ContextCancelled case notif := <-notifCh: close(notifCh) @@ -470,11 +537,11 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp // transaction hashes are spendable by the expected maximum spendable height. // // The context passed to this function must have a corresponding -// cancellation to allow for a clean shutdown process -func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]*chainhash.Hash, spendableHeight uint32) error { +// cancellation to allow for a clean shutdown process. +func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[chainhash.Hash]uint32, spendableHeight uint32) error { funcName := "confirmCoinbases" - hashes := make([]*chainhash.Hash, 0, len(txHashes)) - for _, hash := range txHashes { + hashes := make([]chainhash.Hash, 0, len(txHashes)) + for hash := range txHashes { hashes = append(hashes, hash) } @@ -510,20 +577,78 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string] // Remove spendable coinbase from the tx hash set. All // coinbases are spendable when the tx hash set is empty. - delete(txHashes, hash.String()) + delete(txHashes, *hash) } } + log.Tracef("%d coinbase(s) are not confirmed:", len(txHashes)) + for coinbase := range txHashes { + log.Tracef(" %v", coinbase) + } + if len(txHashes) == 0 { return nil } } } +// fetchRecanReponse is a helper function for fetching rescan response. +// It will return when either a response or error is received from the +// provided rescan source, or when the provided context is cancelled. +func fetchRescanResponse(ctx context.Context, rescanSource func() (*walletrpc.RescanResponse, error)) (*walletrpc.RescanResponse, error) { + funcName := "fetchRescanResponse" + respCh := make(chan *rescanMsg) + go func(ch chan *rescanMsg) { + resp, err := rescanSource() + ch <- &rescanMsg{ + resp: resp, + err: err, + } + }(respCh) + + select { + case <-ctx.Done(): + log.Tracef("%s: context cancelled fetching rescan response", funcName) + return nil, errs.ContextCancelled + case msg := <-respCh: + close(respCh) + if msg.err != nil { + desc := fmt.Sprintf("%s: unable fetch wallet rescan response, %s", + funcName, msg.err) + return nil, errs.PoolError(errs.Rescan, desc) + } + return msg.resp, nil + } +} + +// monitorRescan ensures the wallet rescans up to the provided block height. +// +// The context passed to this function must have a corresponding +// cancellation to allow for a clean shutdown process. +func (pm *PaymentMgr) monitorRescan(ctx context.Context, rescanSource walletrpc.WalletService_RescanClient, height int32) error { + funcName := "monitorRescan" + for { + resp, err := fetchRescanResponse(ctx, rescanSource.Recv) + if err != nil { + if errors.Is(err, errs.ContextCancelled) { + desc := fmt.Sprintf("%s: cancelled wallet rescan", funcName) + return errs.PoolError(errs.ContextCancelled, desc) + } + return err + } + + // Stop monitoring once the most recent block height has been rescanned. + if resp.RescannedThrough >= height { + log.Infof("wallet rescanned through height #%d", height) + return nil + } + } +} + // generatePayoutTxDetails creates the payout transaction inputs and outputs // from the provided payments func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, feeAddr dcrutil.Address, payments map[string][]*Payment, treasuryActive bool) ([]chainjson.TransactionInput, - map[string]*chainhash.Hash, map[string]dcrutil.Amount, dcrutil.Amount, error) { + map[chainhash.Hash]uint32, map[string]dcrutil.Amount, dcrutil.Amount, error) { funcName := "generatePayoutTxDetails" // The coinbase output prior to @@ -537,7 +662,7 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator var tIn, tOut dcrutil.Amount inputs := make([]chainjson.TransactionInput, 0) - inputTxHashes := make(map[string]*chainhash.Hash) + inputTxHashes := make(map[chainhash.Hash]uint32) outputs := make(map[string]dcrutil.Amount) for _, pmtSet := range payments { coinbaseTx := pmtSet[0].Source.Coinbase @@ -571,7 +696,7 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator Tree: wire.TxTreeRegular, } inputs = append(inputs, in) - inputTxHashes[txHash.String()] = txHash + inputTxHashes[*txHash] = pmtSet[0].Height prevOutV, err := dcrutil.NewAmount(in.Amount) if err != nil { @@ -644,13 +769,76 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return nil } + if pm.isProcessing() { + log.Info("payment processing already in progress, terminating") + return nil + } + + pm.setProcessing(true) + defer pm.setProcessing(false) + + txB := pm.cfg.FetchTxBroadcaster() + if txB == nil { + desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) + return errs.PoolError(errs.Disconnected, desc) + } + + pCtx, pCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) + defer pCancel() + + // Request a wallet rescan if tx confirmation failures are + // at threshold. + txConfCount := atomic.LoadUint32(&pm.failedTxConfs) + if txConfCount >= maxTxConfThreshold { + beginHeight := uint32(0) + + // Having no tx conf hashes at threshold indicates an + // underlying error. + pm.mtx.Lock() + if len(pm.txConfHashes) == 0 { + pm.mtx.Unlock() + desc := fmt.Sprintf("%s: no tx conf hashes to rescan for", + funcName) + return errs.PoolError(errs.TxConf, desc) + } + + // Find the lowest height to start the rescan from. + for _, height := range pm.txConfHashes { + if beginHeight == 0 { + beginHeight = height + } + + if beginHeight > height { + beginHeight = height + } + } + pm.mtx.Unlock() + + // Start the rescan a block height below the lowest reported block + // having inaccurate confirmation information. + log.Infof("wallet rescanning from height #%d", beginHeight-1) + rescanReq := &walletrpc.RescanRequest{ + BeginHeight: int32(beginHeight - 1), + } + rescanSource, err := txB.Rescan(pCtx, rescanReq) + if err != nil { + desc := fmt.Sprintf("%s: rescan source cannot be nil", funcName) + return errs.PoolError(errs.Rescan, desc) + } + + err = pm.monitorRescan(pCtx, rescanSource, int32(height)) + if err != nil { + return err + } + } + txC := pm.cfg.FetchTxCreator() if txC == nil { desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) return errs.PoolError(errs.Disconnected, desc) } - // remove all matured orphaned payments. Since the associated blocks + // Remove all matured orphaned payments. Since the associated blocks // to these payments are not part of the main chain they will not be // paid out. pmts, err := pm.pruneOrphanedPayments(ctx, mPmts) @@ -696,14 +884,26 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA maxSpendableHeight = spendableHeight } } - if maxSpendableHeight < height { - maxSpendableHeight = height + + var stopAfter uint32 + switch { + case maxSpendableHeight > height: + stopAfter = maxSpendableHeight - height + default: + stopAfter = 1 } - tCtx, tCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) - defer tCancel() - err = pm.confirmCoinbases(tCtx, inputTxHashes, maxSpendableHeight) + err = pm.confirmCoinbases(pCtx, inputTxHashes, stopAfter) if err != nil { + atomic.AddUint32(&pm.failedTxConfs, 1) + + // Track the transactions with inaccurate confirmation data. + pm.mtx.Lock() + for k, v := range inputTxHashes { + pm.txConfHashes[k] = v + } + pm.mtx.Unlock() + // Do not error if coinbase spendable confirmation requests are // terminated by the context cancellation. if !errors.Is(err, errs.ContextCancelled) { @@ -725,11 +925,6 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } - txB := pm.cfg.FetchTxBroadcaster() - if txB == nil { - desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) - return errs.PoolError(errs.Disconnected, desc) - } signTxReq := &walletrpc.SignTransactionRequest{ SerializedTransaction: txBytes, Passphrase: []byte(pm.cfg.WalletPass), @@ -788,5 +983,44 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return err } + // Reset the failed tx conf counter and clear the hashes. + atomic.StoreUint32(&pm.failedTxConfs, 0) + + pm.mtx.Lock() + pm.txConfHashes = make(map[chainhash.Hash]uint32) + pm.mtx.Unlock() + return nil } + +// processPayments relays payment signals for processing. +func (pm *PaymentMgr) processPayments(msg *paymentMsg) { + pm.paymentCh <- msg +} + +// handlePayments processes dividend payment signals. +func (pm *PaymentMgr) handlePayments(ctx context.Context) { + for { + select { + case <-ctx.Done(): + pm.cfg.HubWg.Done() + return + + case msg := <-pm.paymentCh: + if !pm.cfg.SoloPool { + err := pm.payDividends(ctx, msg.CurrentHeight, msg.TreasuryActive) + if err != nil { + log.Errorf("unable to process payments: %v", err) + close(msg.Done) + pm.cfg.Cancel() + continue + } + + // Signal the gui cache of paid dividends. + pm.cfg.SignalCache(DividendsPaid) + + close(msg.Done) + } + } + } +} diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 9916f827..d1e9a7e7 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -11,6 +11,8 @@ import ( "errors" "fmt" "math/big" + "sync" + "sync/atomic" "testing" "time" @@ -59,6 +61,7 @@ func (txC *txCreatorImpl) CreateRawTransaction(ctx context.Context, inputs []cha type txBroadcasterImpl struct { signTransaction func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) publishTransaction func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) + rescan func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) } // SignTransaction signs transaction inputs, unlocking them for use. @@ -71,6 +74,11 @@ func (txB *txBroadcasterImpl) PublishTransaction(ctx context.Context, req *walle return txB.publishTransaction(ctx, req, options...) } +// Rescan begins a rescan of all transactions related to the wallet. +func (txB *txBroadcasterImpl) Rescan(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return txB.rescan(ctx, req, options...) +} + func TestSharePercentages(t *testing.T) { mgr := PaymentMgr{} @@ -143,7 +151,7 @@ func TestSharePercentages(t *testing.T) { } } -func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { +func createPaymentMgr(t *testing.T, paymentMethod string) (*PaymentMgr, context.Context, context.CancelFunc) { activeNet := chaincfg.SimNetParams() getBlockConfirmations := func(context.Context, *chainhash.Hash) (int64, error) { @@ -158,6 +166,11 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { return nil } + signalCache := func(CacheUpdateEvent) { + // Do nothing. + } + + ctx, cancel := context.WithCancel(context.Background()) pCfg := &PaymentMgrConfig{ db: db, ActiveNet: activeNet, @@ -169,16 +182,22 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { FetchTxCreator: fetchTxCreator, FetchTxBroadcaster: fetchTxBroadcaster, PoolFeeAddrs: []dcrutil.Address{poolFeeAddrs}, + Cancel: cancel, + SignalCache: signalCache, + HubWg: new(sync.WaitGroup), } - return NewPaymentMgr(pCfg) -} -func testPaymentMgrPPS(t *testing.T) { - mgr, err := createPaymentMgr(PPS) + mgr, err := NewPaymentMgr(pCfg) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } + return mgr, ctx, cancel +} + +func testPaymentMgrPPS(t *testing.T) { + mgr, _, _ := createPaymentMgr(t, PPS) + // Ensure Pay-Per-Share (PPS) works as expected. now := time.Now() sixtyBefore := now.Add(-(time.Second * 60)).UnixNano() @@ -271,10 +290,7 @@ func testPaymentMgrPPS(t *testing.T) { } func testPaymentMgrPPLNS(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } + mgr, _, _ := createPaymentMgr(t, PPLNS) // Ensure Pay-Per-Last-N-Shares (PPLNS) works as expected. now := time.Now() @@ -369,10 +385,7 @@ func testPaymentMgrPPLNS(t *testing.T) { } func testPaymentMgrMaturity(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } + mgr, _, _ := createPaymentMgr(t, PPLNS) now := time.Now() shareCount := 3 @@ -385,7 +398,7 @@ func testPaymentMgrMaturity(t *testing.T) { // Ensure payment maturity works as expected. for i := 0; i < shareCount; i++ { // Create readily available shares for account X. - err = persistShare(db, xID, weight, thirtyBefore+int64(i)) + err := persistShare(db, xID, weight, thirtyBefore+int64(i)) if err != nil { t.Fatal(err) } @@ -393,7 +406,7 @@ func testPaymentMgrMaturity(t *testing.T) { sixtyAfter := time.Now().Add((time.Second * 60)).UnixNano() for i := 0; i < shareCount; i++ { // Create future shares for account Y. - err = persistShare(db, yID, weight, sixtyAfter+int64(i)) + err := persistShare(db, yID, weight, sixtyAfter+int64(i)) if err != nil { t.Fatal(err) } @@ -465,11 +478,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("failed to insert account: %v", err) } - mgr, err := createPaymentMgr(PPS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } - + mgr, _, _ := createPaymentMgr(t, PPS) height := uint32(20) // pruneOrphanedPayments tests. @@ -598,16 +607,16 @@ func testPaymentMgrPayment(t *testing.T) { } // confirmCoinbases tests. - txHashes := make(map[string]*chainhash.Hash) + txHashes := make(map[chainhash.Hash]uint32) hashA := chainhash.Hash{'a'} - txHashes[hashA.String()] = &hashA + txHashes[hashA] = height hashB := chainhash.Hash{'b'} - txHashes[hashB.String()] = &hashB + txHashes[hashB] = height hashC := chainhash.Hash{'c'} - txHashes[hashC.String()] = &hashC + txHashes[hashC] = height spendableHeight := uint32(10) - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return nil, fmt.Errorf("unable to fetch tx conf notification source") } @@ -619,7 +628,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected tx conf notification source error") } - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{}, nil }, nil @@ -639,7 +648,7 @@ func testPaymentMgrPayment(t *testing.T) { // The context here needs to be recreated after the previous test. ctx, cancel = context.WithCancel(context.Background()) - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return nil, fmt.Errorf("unable to confirm transactions") }, nil @@ -676,7 +685,7 @@ func testPaymentMgrPayment(t *testing.T) { } txConfs = append(txConfs, &confC) - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, @@ -834,7 +843,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected %d inputs, got %d", expectedInputs, len(inputs)) } - for _, hash := range inputTxHashes { + for hash := range inputTxHashes { txHash := hash.String() var match bool for _, in := range inputs { @@ -931,6 +940,13 @@ func testPaymentMgrPayment(t *testing.T) { mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { return 16, nil } + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return nil, fmt.Errorf("unable to sign transaction") + }, + } + } err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, errs.TxOut) { @@ -968,7 +984,7 @@ func testPaymentMgrPayment(t *testing.T) { mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { return int64(estMaturity) + 1, nil } - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return nil, fmt.Errorf("unable to fetch tx conf notification source") } @@ -1000,23 +1016,23 @@ func testPaymentMgrPayment(t *testing.T) { } txConfs = make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) - confA = walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + confD := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ TxHash: zeroHash[:], Confirmations: 50, BlockHash: []byte(zeroSource.BlockHash), BlockHeight: 60, } - txConfs = append(txConfs, &confA) - confB = walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + txConfs = append(txConfs, &confD) + confE := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ TxHash: randHash[:], Confirmations: 50, BlockHash: []byte(zeroSource.BlockHash), BlockHeight: 60, } - txConfs = append(txConfs, &confB) + txConfs = append(txConfs, &confE) mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, @@ -1054,7 +1070,7 @@ func testPaymentMgrPayment(t *testing.T) { return nil } mgr.cfg.WalletPass = "123" - mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { return func() (*walletrpc.ConfirmationNotificationsResponse, error) { return &walletrpc.ConfirmationNotificationsResponse{ Confirmations: txConfs, @@ -1128,6 +1144,104 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("expected a publish error, got %v", err) } + // Ensure dividend payment returns an error if fetching the wallet client + // fails. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return nil, fmt.Errorf("unable to create rescan client") + }, + } + } + + atomic.StoreUint32(&mgr.failedTxConfs, maxTxConfThreshold) + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Ensure dividend payment returns an error if fetching rescan responses fail. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + err: fmt.Errorf("internal error"), + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.Rescan) { + cancel() + t.Fatalf("expected a rescan error, got %v", err) + } + + // Clear out the tx confirmation hashes to be rescanned for. + var confHashes map[chainhash.Hash]uint32 + mgr.mtx.Lock() + confHashes = mgr.txConfHashes + mgr.txConfHashes = make(map[chainhash.Hash]uint32) + mgr.mtx.Unlock() + + // Ensure dividend payment returns an error when there are no tx + // confirmation hashes to rescan. + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.TxConf) { + cancel() + t.Fatalf("expected a no tx conf error, got %v", err) + } + + // Restore the tx confirmation hashes to be rescanned for. + mgr.mtx.Lock() + mgr.txConfHashes = confHashes + mgr.mtx.Unlock() + + // Ensure wallet rescan succeeds when it scans through the current height. + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) + if !errors.Is(err, errs.PublishTx) { + cancel() + t.Fatalf("expected a publish error, got %v", err) + } + // Ensure paying dividend payment succeeds with valid inputs. txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { @@ -1142,6 +1256,13 @@ func testPaymentMgrPayment(t *testing.T) { TransactionHash: txHash, }, nil }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, } } @@ -1151,6 +1272,13 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("unexpected dividend payment error, got %v", err) } + // Ensure the tx confirmation failure count reset to zero on a sucessful + // dividend payment. + txConfCount := atomic.LoadUint32(&mgr.failedTxConfs) + if txConfCount != 0 { + t.Fatalf("expected tx conf failure count to be %d, got %d", 0, txConfCount) + } + cancel() // Reset backed up values to their defaults. @@ -1165,11 +1293,7 @@ func testPaymentMgrPayment(t *testing.T) { } func testPaymentMgrDust(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) - if err != nil { - t.Fatalf("[createPaymentMgr] unexpected error: %v", err) - } - + mgr, _, _ := createPaymentMgr(t, PPLNS) height := uint32(20) // Ensure dust payments are forfeited by their originating accounts and @@ -1181,7 +1305,7 @@ func testPaymentMgrDust(t *testing.T) { yWeight := new(big.Rat).Mul(weight, new(big.Rat).SetInt64(int64(mul))) // Create shares for account x and y. - err = persistShare(db, xID, weight, now.UnixNano()) + err := persistShare(db, xID, weight, now.UnixNano()) if err != nil { t.Fatal(err) } @@ -1244,3 +1368,179 @@ func testPaymentMgrDust(t *testing.T) { "than the initial (%v)", ft, expectedFeeAmt) } } + +func testPaymentMgrSignals(t *testing.T) { + // Insert some test accounts. + accountX := NewAccount(xAddr) + err := db.persistAccount(accountX) + if err != nil { + t.Fatalf("failed to insert account: %v", err) + } + + accountY := NewAccount(yAddr) + err = db.persistAccount(accountY) + if err != nil { + t.Fatalf("failed to insert account: %v", err) + } + + mgr, ctx, cancel := createPaymentMgr(t, PPLNS) + var randBytes [chainhash.HashSize + 1]byte + _, err = rand.Read(randBytes[:]) + if err != nil { + t.Fatalf("unable to generate random bytes: %v", err) + } + + randHash := chainhash.HashH(randBytes[:]) + randSource := &PaymentSource{ + BlockHash: randHash.String(), + Coinbase: randHash.String(), + } + + height := uint32(10) + estMaturity := uint32(26) + amt, _ := dcrutil.NewAmount(5) + + pmtX := NewPayment(xID, zeroSource, amt, height, estMaturity) + err = db.PersistPayment(pmtX) + if err != nil { + cancel() + t.Fatal(err) + } + + pmtY := NewPayment(yID, randSource, amt, height, estMaturity) + err = db.PersistPayment(pmtY) + if err != nil { + cancel() + t.Fatal(err) + } + + txHashes := make(map[string]*chainhash.Hash) + hashA := chainhash.Hash{'a'} + txHashes[hashA.String()] = &hashA + hashB := chainhash.Hash{'b'} + txHashes[hashB.String()] = &hashB + hashC := chainhash.Hash{'c'} + txHashes[hashC.String()] = &hashC + + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + txConfs := make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) + confA := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: zeroHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confA) + confB := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: randHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confB) + + mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 + mgr.cfg.GetTxConfNotifications = func([]chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{ + Confirmations: txConfs, + }, nil + }, nil + } + + txBytes := []byte("01000000018e17619f0d627c2769ee3f957582691aea59c2" + + "e79cc45b8ba1f08485dd88d75c0300000001ffffffff017a64e43703000000" + + "00001976a914978fa305bd66f63f0de847338bb56ff65fa8e27288ac000000" + + "000000000001f46ce43703000000846c0700030000006b483045022100d668" + + "5812801db991b72e80863eba7058466dfebb4aba0af75ab47bade177325102" + + "205f466fc47435c1a177482e527ff0e76f3c2c613940b358e57f0f0d78d5f2" + + "ffcb012102d040a4c34ae65a2b87ea8e9df7413e6504e5f27c6bde019a78ee" + + "96145b27c517") + txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return &walletrpc.PublishTransactionResponse{ + TransactionHash: txHash, + }, nil + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, + } + } + + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return &wire.MsgTx{}, nil + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + // Ensure the payment lifecycle process receives the payment signal and + // processes mature payments. + msgA := paymentMsg{ + CurrentHeight: estMaturity + 1, + TreasuryActive: false, + Done: make(chan bool), + } + + mgr.cfg.HubWg.Add(1) + go mgr.handlePayments(ctx) + + mgr.processPayments(&msgA) + <-msgA.Done + + // Esure the payment lifecycle process cancels the context when an + // error is encountered. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return nil, fmt.Errorf("unable to create raw transactions") + }, + } + } + + msgB := paymentMsg{ + CurrentHeight: estMaturity + 1, + TreasuryActive: false, + Done: make(chan bool), + } + mgr.processPayments(&msgB) + <-msgB.Done + + cancel() + mgr.cfg.HubWg.Wait() +} diff --git a/pool/pool_test.go b/pool/pool_test.go index e3e4a3b9..265dd242 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -103,6 +103,7 @@ func TestPool(t *testing.T) { "testPaymentMgrMaturity": testPaymentMgrMaturity, "testPaymentMgrPayment": testPaymentMgrPayment, "testPaymentMgrDust": testPaymentMgrDust, + "testPaymentSignals": testPaymentMgrSignals, "testChainState": testChainState, "testHub": testHub, }