Skip to content

Commit

Permalink
multi: fix chain notif. blocking by pmt processor.
Browse files Browse the repository at this point in the history
This fixes chain notification messages being
blocked by payment processing calls when
tx confirmations are not accurately
being reported.

The fix includes:
- ensuring a payment processing call concludes
before starting another one via the processing flag.

- calling the payment process in a goroutine to
avoid blocking the chainstate process.

- Keeping track of tx confirmation failures and
starting a wallet rescan if the failures exceed
threshold.

Associated tests have been updated.
  • Loading branch information
dnldd committed Apr 25, 2021
1 parent 3260b64 commit d86e97e
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 26 deletions.
3 changes: 3 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ const (

// CreateAmount indicates an amount creation error.
CreateAmount = ErrorKind("CreateAmount")

// Rescan indicates an wallet rescan error.
Rescan = ErrorKind("Rescan")
)

// Error satisfies the error interface and prints human-readable errors.
Expand Down
1 change: 1 addition & 0 deletions errors/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestErrorKindStringer(t *testing.T) {
{TxIn, "TxIn"},
{ContextCancelled, "ContextCancelled"},
{CreateAmount, "CreateAmount"},
{Rescan, "Rescan"},
}

for i, test := range tests {
Expand Down
24 changes: 12 additions & 12 deletions pool/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,18 +286,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
coinbaseTx := block.Transactions[0]
treasuryActive := isTreasuryActive(coinbaseTx)

// Process mature payments.
err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive)
if err != nil {
log.Errorf("unable to process payments: %v", err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

// Signal the gui cache of paid dividends.
cs.cfg.SignalCache(DividendsPaid)

// Check if the parent of the connected block is an accepted work
// of the pool.
parentHeight := header.Height - 1
Expand Down Expand Up @@ -398,6 +386,18 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
cs.cfg.Cancel()
continue
}

// Process mature payments without blocking.
go func(cfg *ChainStateConfig, height uint32, treasuryActive bool) {
err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive)
if err != nil {
log.Errorf("unable to process payments: %v", err)
cfg.Cancel()
}

// Signal the gui cache of paid dividends.
cfg.SignalCache(DividendsPaid)
}(cs.cfg, header.Height, treasuryActive)
}

close(msg.Done)
Expand Down
1 change: 0 additions & 1 deletion pool/chainstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ func testChainState(t *testing.T) {
}
cs.discCh <- discConfMsg
<-discConfMsg.Done
cs.cfg.PayDividends = payDividends

// Ensure the last work height can be updated.
initialLastWorkHeight := cs.fetchLastWorkHeight()
Expand Down
1 change: 1 addition & 0 deletions pool/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var (
type WalletConnection interface {
SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error)
PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error)
Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error)
}

// NodeConnection defines the functionality needed by a mining node
Expand Down
44 changes: 44 additions & 0 deletions pool/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,43 @@ import (
chainjson "github.com/decred/dcrd/rpc/jsonrpc/types/v2"
"github.com/decred/dcrd/wire"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type tRescanClient struct {
resp *walletrpc.RescanResponse
err error
grpc.ClientStream
}

func (r *tRescanClient) Recv() (*walletrpc.RescanResponse, error) {
return r.resp, r.err
}

func (r *tRescanClient) Header() (metadata.MD, error) {
return nil, nil
}

func (r *tRescanClient) Trailer() metadata.MD {
return nil
}

func (r *tRescanClient) CloseSend() error {
return nil
}

func (r *tRescanClient) Context() context.Context {
return nil
}

func (r *tRescanClient) SendMsg(m interface{}) error {
return nil
}

func (r *tRescanClient) RecvMsg(m interface{}) error {
return nil
}

type tWalletConnection struct {
}

Expand Down Expand Up @@ -109,6 +144,15 @@ func (t *tWalletConnection) PublishTransaction(context.Context, *walletrpc.Publi
}, nil
}

func (t *tWalletConnection) Rescan(context.Context, *walletrpc.RescanRequest, ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) {
client := new(tRescanClient)
client.resp = &walletrpc.RescanResponse{
RescannedThrough: 10000,
}

return client, nil
}

type tNodeConnection struct{}

func (t *tNodeConnection) CreateRawTransaction(context.Context, []chainjson.TransactionInput, map[dcrutil.Address]dcrutil.Amount, *int64, *int64) (*wire.MsgTx, error) {
Expand Down
147 changes: 134 additions & 13 deletions pool/paymentmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"fmt"
"math/big"
"math/rand"
"sync"
"sync/atomic"
"time"

"decred.org/dcrwallet/rpc/walletrpc"
Expand All @@ -36,6 +38,10 @@ const (
// output value of a transaction is allowed to be short of the
// provided input due to rounding errors.
maxRoundingDiff = dcrutil.Amount(500)

// maxTxConfThreshold is the total number of coinbase confirmation
// failures before a wallet rescan is requested.
maxTxConfThreshold = uint32(3)
)

// TxCreator defines the functionality needed by a transaction creator for the
Expand All @@ -57,6 +63,8 @@ type TxBroadcaster interface {
SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error)
// PublishTransaction broadcasts the transaction unto the network.
PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error)
// Rescan requests a wallet utxo rescan.
Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error)
}

// confNotifMsg represents a tx confirmation notification message.
Expand All @@ -65,6 +73,12 @@ type confNotifMsg struct {
err error
}

// rescanMsg represents a rescan response.
type rescanMsg struct {
resp *walletrpc.RescanResponse
err error
}

// PaymentMgrConfig contains all of the configuration values which should be
// provided when creating a new instance of PaymentMgr.
type PaymentMgrConfig struct {
Expand Down Expand Up @@ -107,7 +121,11 @@ type PaymentMgrConfig struct {
// PaymentMgr handles generating shares and paying out dividends to
// participating accounts.
type PaymentMgr struct {
cfg *PaymentMgrConfig
failedTxConfs uint32 // update atomically.

processing bool
cfg *PaymentMgrConfig
mtx sync.Mutex
}

// NewPaymentMgr creates a new payment manager.
Expand Down Expand Up @@ -148,6 +166,21 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) {
return pm, nil
}

// isProcessing returns whether the payment manager is in the process of
// paying out dividends.
func (pm *PaymentMgr) isProcessing() bool {
pm.mtx.Lock()
defer pm.mtx.Unlock()
return pm.processing
}

// setProcessing sets the processing flag to the provided boolean.
func (pm *PaymentMgr) setProcessing(status bool) {
pm.mtx.Lock()
pm.processing = status
pm.mtx.Unlock()
}

// sharePercentages calculates the percentages due each participating account
// according to their weighted shares.
func (pm *PaymentMgr) sharePercentages(shares []*Share) (map[string]*big.Rat, error) {
Expand Down Expand Up @@ -452,8 +485,8 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp

select {
case <-ctx.Done():
log.Tracef("%s: unable to fetch tx confirmation notifications",
funcName)
log.Tracef("%s: context cancelled fetching tx confirmation "+
"notifications", funcName)
return nil, errs.ContextCancelled
case notif := <-notifCh:
close(notifCh)
Expand All @@ -470,7 +503,7 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp
// transaction hashes are spendable by the expected maximum spendable height.
//
// The context passed to this function must have a corresponding
// cancellation to allow for a clean shutdown process
// cancellation to allow for a clean shutdown process.
func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]*chainhash.Hash, spendableHeight uint32) error {
funcName := "confirmCoinbases"
hashes := make([]*chainhash.Hash, 0, len(txHashes))
Expand Down Expand Up @@ -520,6 +553,59 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]
}
}

// fetchRecanReponse is a helper function for fetching rescan response.
// It will return when either a response or error is received from the
// provided rescan source, or when the provided context is cancelled.
func fetchRescanResponse(ctx context.Context, rescanSource func() (*walletrpc.RescanResponse, error)) (*walletrpc.RescanResponse, error) {
funcName := "fetchRescanResponse"
respCh := make(chan *rescanMsg)
go func(ch chan *rescanMsg) {
resp, err := rescanSource()
ch <- &rescanMsg{
resp: resp,
err: err,
}
}(respCh)

select {
case <-ctx.Done():
log.Tracef("%s: context cancelled fetching rescan response", funcName)
return nil, errs.ContextCancelled
case msg := <-respCh:
close(respCh)
if msg.err != nil {
desc := fmt.Sprintf("%s: unable fetch wallet rescan response, %s",
funcName, msg.err)
return nil, errs.PoolError(errs.Rescan, desc)
}
return msg.resp, nil
}
}

// monitorRescan ensures the wallet rescans up to the provided block height.
//
// The context passed to this function must have a corresponding
// cancellation to allow for a clean shutdown process.
func (pm *PaymentMgr) monitorRescan(ctx context.Context, rescanSource walletrpc.WalletService_RescanClient, height int32) error {
funcName := "monitorRescan"
for {
resp, err := fetchRescanResponse(ctx, rescanSource.Recv)
if err != nil {
if errors.Is(err, errs.ContextCancelled) {
desc := fmt.Sprintf("%s: cancelled wallet rescan", funcName)
return errs.PoolError(errs.ContextCancelled, desc)
}
return err
}

// Stop monitoring once the most recent block height has been rescanned.
if resp.RescannedThrough >= height {
log.Infof("wallet rescanned through height #%d", height)
return nil
}
}
}

// generatePayoutTxDetails creates the payout transaction inputs and outputs
// from the provided payments
func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, feeAddr dcrutil.Address, payments map[string][]*Payment, treasuryActive bool) ([]chainjson.TransactionInput,
Expand Down Expand Up @@ -644,13 +730,51 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA
return nil
}

if pm.isProcessing() {
log.Info("Waiting for in-progress payment processing to " +
"complete before starting another")
return nil
}

pm.setProcessing(true)
defer pm.setProcessing(false)

txB := pm.cfg.FetchTxBroadcaster()
if txB == nil {
desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName)
return errs.PoolError(errs.Disconnected, desc)
}

// Request a wallet rescan if tx confirmation failures are
// at threshold.
pCtx, pCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout)
defer pCancel()

txConfCount := atomic.LoadUint32(&pm.failedTxConfs)
if txConfCount == maxTxConfThreshold {
beginHeight := int32(height) - (int32(pm.cfg.ActiveNet.CoinbaseMaturity) * 2)
rescanReq := &walletrpc.RescanRequest{
BeginHeight: beginHeight,
}
rescanSource, err := txB.Rescan(pCtx, rescanReq)
if err != nil {
desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName)
return errs.PoolError(errs.Rescan, desc)
}

err = pm.monitorRescan(pCtx, rescanSource, int32(height))
if err != nil {
return err
}
}

txC := pm.cfg.FetchTxCreator()
if txC == nil {
desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName)
return errs.PoolError(errs.Disconnected, desc)
}

// remove all matured orphaned payments. Since the associated blocks
// Remove all matured orphaned payments. Since the associated blocks
// to these payments are not part of the main chain they will not be
// paid out.
pmts, err := pm.pruneOrphanedPayments(ctx, mPmts)
Expand Down Expand Up @@ -700,10 +824,10 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA
maxSpendableHeight = height
}

tCtx, tCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout)
defer tCancel()
err = pm.confirmCoinbases(tCtx, inputTxHashes, maxSpendableHeight)
err = pm.confirmCoinbases(pCtx, inputTxHashes, maxSpendableHeight)
if err != nil {
atomic.AddUint32(&pm.failedTxConfs, 1)

// Do not error if coinbase spendable confirmation requests are
// terminated by the context cancellation.
if !errors.Is(err, errs.ContextCancelled) {
Expand All @@ -725,11 +849,6 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA
return err
}

txB := pm.cfg.FetchTxBroadcaster()
if txB == nil {
desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName)
return errs.PoolError(errs.Disconnected, desc)
}
signTxReq := &walletrpc.SignTransactionRequest{
SerializedTransaction: txBytes,
Passphrase: []byte(pm.cfg.WalletPass),
Expand Down Expand Up @@ -788,5 +907,7 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA
return err
}

atomic.StoreUint32(&pm.failedTxConfs, 0)

return nil
}
Loading

0 comments on commit d86e97e

Please sign in to comment.