Skip to content

Commit

Permalink
pool: avoid blocking chain ntfns on pmt processing.
Browse files Browse the repository at this point in the history
This dedicates a lifecycle process for payment
processing and updates the necessary configs.
Payment processing is now triggered via signals
from the chainstate to the payment manager.
Associated tests have been added.
  • Loading branch information
dnldd committed Apr 30, 2021
1 parent a05cedf commit cd7a037
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 45 deletions.
57 changes: 27 additions & 30 deletions pool/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type ChainStateConfig struct {
db Database
// SoloPool represents the solo pool mining mode.
SoloPool bool
// PayDividends pays mature mining rewards to participating accounts.
PayDividends func(context.Context, uint32, bool) error
// ProcessPayments relays payment signals for Processing.
ProcessPayments func(msg *paymentMsg)
// GeneratePayments creates payments for participating accounts in pool
// mining mode based on the configured payment scheme.
GeneratePayments func(uint32, *PaymentSource, dcrutil.Amount, int64) error
Expand Down Expand Up @@ -211,6 +211,31 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
continue
}

block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock)
if err != nil {
// Errors generated fetching blocks of confirmed mined
// work are curently fatal because payments are
// sourced from coinbases. The chainstate process will be
// terminated as a result.
log.Errorf("unable to fetch block with hash %x: %v",
header.PrevBlock, err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

coinbaseTx := block.Transactions[0]
treasuryActive := isTreasuryActive(coinbaseTx)

soloPool := cs.cfg.SoloPool
if !soloPool {
go cs.cfg.ProcessPayments(&paymentMsg{
CurrentHeight: header.Height,
TreasuryActive: treasuryActive,
Done: make(chan bool),
})
}

// Prune invalidated jobs and accepted work.
if header.Height > MaxReorgLimit {
pruneLimit := header.Height - MaxReorgLimit
Expand Down Expand Up @@ -270,34 +295,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
}
}

block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock)
if err != nil {
// Errors generated fetching blocks of confirmed mined
// work are curently fatal because payments are
// sourced from coinbases. The chainstate process will be
// terminated as a result.
log.Errorf("unable to fetch block with hash %x: %v",
header.PrevBlock, err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

coinbaseTx := block.Transactions[0]
treasuryActive := isTreasuryActive(coinbaseTx)

// Process mature payments.
err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive)
if err != nil {
log.Errorf("unable to process payments: %v", err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

// Signal the gui cache of paid dividends.
cs.cfg.SignalCache(DividendsPaid)

// Check if the parent of the connected block is an accepted work
// of the pool.
parentHeight := header.Height - 1
Expand Down
7 changes: 2 additions & 5 deletions pool/chainstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ func testChainState(t *testing.T) {
t.Fatalf("unexpected serialization error: %v", err)
}

payDividends := func(context.Context, uint32, bool) error {
return nil
}
processPayments := func(*paymentMsg) {}
generatePayments := func(uint32, *PaymentSource, dcrutil.Amount, int64) error {
return nil
}
Expand Down Expand Up @@ -78,7 +76,7 @@ func testChainState(t *testing.T) {
cCfg := &ChainStateConfig{
db: db,
SoloPool: false,
PayDividends: payDividends,
ProcessPayments: processPayments,
GeneratePayments: generatePayments,
GetBlock: getBlock,
GetBlockConfirmations: getBlockConfirmations,
Expand Down Expand Up @@ -339,7 +337,6 @@ func testChainState(t *testing.T) {
}
cs.discCh <- discConfMsg
<-discConfMsg.Done
cs.cfg.PayDividends = payDividends

// Ensure the last work height can be updated.
initialLastWorkHeight := cs.fetchLastWorkHeight()
Expand Down
7 changes: 5 additions & 2 deletions pool/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
FetchTxCreator: func() TxCreator { return h.nodeConn },
FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn },
CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout,
SignalCache: h.SignalCache,
HubWg: h.wg,
}

var err error
Expand All @@ -269,7 +271,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
sCfg := &ChainStateConfig{
db: h.cfg.DB,
SoloPool: h.cfg.SoloPool,
PayDividends: h.paymentMgr.payDividends,
ProcessPayments: h.paymentMgr.processPayments,
GeneratePayments: h.paymentMgr.generatePayments,
GetBlock: h.getBlock,
GetBlockConfirmations: h.getBlockConfirmations,
Expand Down Expand Up @@ -642,9 +644,10 @@ func (h *Hub) shutdown() {

// Run handles the process lifecycles of the pool hub.
func (h *Hub) Run(ctx context.Context) {
h.wg.Add(2)
h.wg.Add(3)
go h.endpoint.run(ctx)
go h.chainState.handleChainUpdates(ctx)
go h.paymentMgr.handlePayments(ctx)

// Wait until all hub processes have terminated, and then shutdown.
h.wg.Wait()
Expand Down
52 changes: 51 additions & 1 deletion pool/paymentmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
// maxTxConfThreshold is the total number of coinbase confirmation
// failures before a wallet rescan is requested.
maxTxConfThreshold = uint32(3)

// paymentBufferSize repreents the buffering on the payment channel.
paymentBufferSize = uint32(30)
)

// TxCreator defines the functionality needed by a transaction creator for the
Expand Down Expand Up @@ -116,13 +119,27 @@ type PaymentMgrConfig struct {
// CoinbaseConfTimeout is the duration to wait for coinbase confirmations
// when generating a payout transaction.
CoinbaseConfTimeout time.Duration
// Cancel represents the pool's context cancellation function.
Cancel context.CancelFunc
// SignalCache sends the provided cache update event to the gui cache.
SignalCache func(event CacheUpdateEvent)
// HubWg represents the hub's waitgroup.
HubWg *sync.WaitGroup
}

// paymentMsg represents a payment processing signal.
type paymentMsg struct {
CurrentHeight uint32
TreasuryActive bool
Done chan bool
}

// PaymentMgr handles generating shares and paying out dividends to
// participating accounts.
type PaymentMgr struct {
failedTxConfs uint32 // update atomically.

paymentCh chan *paymentMsg
processing bool
cfg *PaymentMgrConfig
mtx sync.Mutex
Expand All @@ -131,7 +148,8 @@ type PaymentMgr struct {
// NewPaymentMgr creates a new payment manager.
func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) {
pm := &PaymentMgr{
cfg: pCfg,
cfg: pCfg,
paymentCh: make(chan *paymentMsg, paymentBufferSize),
}
rand.Seed(time.Now().UnixNano())

Expand Down Expand Up @@ -910,3 +928,35 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA

return nil
}

// processPayments relays payment signals for processing.
func (pm *PaymentMgr) processPayments(msg *paymentMsg) {
pm.paymentCh <- msg
}

// handlePayments processes dividend payment signals.
func (pm *PaymentMgr) handlePayments(ctx context.Context) {
for {
select {
case <-ctx.Done():
pm.cfg.HubWg.Done()
return

case msg := <-pm.paymentCh:
if !pm.cfg.SoloPool {
err := pm.payDividends(ctx, msg.CurrentHeight, msg.TreasuryActive)
if err != nil {
log.Errorf("unable to process payments: %v", err)
close(msg.Done)
pm.cfg.Cancel()
continue
}

// Signal the gui cache of paid dividends.
pm.cfg.SignalCache(DividendsPaid)

close(msg.Done)
}
}
}
}

0 comments on commit cd7a037

Please sign in to comment.