diff --git a/pool/chainstate.go b/pool/chainstate.go index 335d4f42..e837e91f 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -31,8 +31,8 @@ type ChainStateConfig struct { db Database // SoloPool represents the solo pool mining mode. SoloPool bool - // PayDividends pays mature mining rewards to participating accounts. - PayDividends func(context.Context, uint32, bool) error + // ProcessPayments relays payment signals for Processing. + ProcessPayments func(msg *paymentMsg) // GeneratePayments creates payments for participating accounts in pool // mining mode based on the configured payment scheme. GeneratePayments func(uint32, *PaymentSource, dcrutil.Amount, int64) error @@ -211,6 +211,31 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { continue } + block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock) + if err != nil { + // Errors generated fetching blocks of confirmed mined + // work are curently fatal because payments are + // sourced from coinbases. The chainstate process will be + // terminated as a result. + log.Errorf("unable to fetch block with hash %x: %v", + header.PrevBlock, err) + close(msg.Done) + cs.cfg.Cancel() + continue + } + + coinbaseTx := block.Transactions[0] + treasuryActive := isTreasuryActive(coinbaseTx) + + soloPool := cs.cfg.SoloPool + if !soloPool { + go cs.cfg.ProcessPayments(&paymentMsg{ + CurrentHeight: header.Height, + TreasuryActive: treasuryActive, + Done: make(chan bool), + }) + } + // Prune invalidated jobs and accepted work. if header.Height > MaxReorgLimit { pruneLimit := header.Height - MaxReorgLimit @@ -270,34 +295,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { } } - block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock) - if err != nil { - // Errors generated fetching blocks of confirmed mined - // work are curently fatal because payments are - // sourced from coinbases. The chainstate process will be - // terminated as a result. - log.Errorf("unable to fetch block with hash %x: %v", - header.PrevBlock, err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - coinbaseTx := block.Transactions[0] - treasuryActive := isTreasuryActive(coinbaseTx) - - // Process mature payments. - err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive) - if err != nil { - log.Errorf("unable to process payments: %v", err) - close(msg.Done) - cs.cfg.Cancel() - continue - } - - // Signal the gui cache of paid dividends. - cs.cfg.SignalCache(DividendsPaid) - // Check if the parent of the connected block is an accepted work // of the pool. parentHeight := header.Height - 1 diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go index 75cf76d1..101431c6 100644 --- a/pool/chainstate_test.go +++ b/pool/chainstate_test.go @@ -44,9 +44,7 @@ func testChainState(t *testing.T) { t.Fatalf("unexpected serialization error: %v", err) } - payDividends := func(context.Context, uint32, bool) error { - return nil - } + processPayments := func(*paymentMsg) {} generatePayments := func(uint32, *PaymentSource, dcrutil.Amount, int64) error { return nil } @@ -78,7 +76,7 @@ func testChainState(t *testing.T) { cCfg := &ChainStateConfig{ db: db, SoloPool: false, - PayDividends: payDividends, + ProcessPayments: processPayments, GeneratePayments: generatePayments, GetBlock: getBlock, GetBlockConfirmations: getBlockConfirmations, @@ -339,7 +337,6 @@ func testChainState(t *testing.T) { } cs.discCh <- discConfMsg <-discConfMsg.Done - cs.cfg.PayDividends = payDividends // Ensure the last work height can be updated. initialLastWorkHeight := cs.fetchLastWorkHeight() diff --git a/pool/hub.go b/pool/hub.go index 7677f8b3..049a2096 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -258,6 +258,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { FetchTxCreator: func() TxCreator { return h.nodeConn }, FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn }, CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout, + SignalCache: h.SignalCache, + HubWg: h.wg, } var err error @@ -269,7 +271,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { sCfg := &ChainStateConfig{ db: h.cfg.DB, SoloPool: h.cfg.SoloPool, - PayDividends: h.paymentMgr.payDividends, + ProcessPayments: h.paymentMgr.processPayments, GeneratePayments: h.paymentMgr.generatePayments, GetBlock: h.getBlock, GetBlockConfirmations: h.getBlockConfirmations, @@ -642,9 +644,10 @@ func (h *Hub) shutdown() { // Run handles the process lifecycles of the pool hub. func (h *Hub) Run(ctx context.Context) { - h.wg.Add(2) + h.wg.Add(3) go h.endpoint.run(ctx) go h.chainState.handleChainUpdates(ctx) + go h.paymentMgr.handlePayments(ctx) // Wait until all hub processes have terminated, and then shutdown. h.wg.Wait() diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 7148f674..d17e11b6 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -42,6 +42,9 @@ const ( // maxTxConfThreshold is the total number of coinbase confirmation // failures before a wallet rescan is requested. maxTxConfThreshold = uint32(3) + + // paymentBufferSize repreents the buffering on the payment channel. + paymentBufferSize = uint32(30) ) // TxCreator defines the functionality needed by a transaction creator for the @@ -116,6 +119,19 @@ type PaymentMgrConfig struct { // CoinbaseConfTimeout is the duration to wait for coinbase confirmations // when generating a payout transaction. CoinbaseConfTimeout time.Duration + // Cancel represents the pool's context cancellation function. + Cancel context.CancelFunc + // SignalCache sends the provided cache update event to the gui cache. + SignalCache func(event CacheUpdateEvent) + // HubWg represents the hub's waitgroup. + HubWg *sync.WaitGroup +} + +// paymentMsg represents a payment processing signal. +type paymentMsg struct { + CurrentHeight uint32 + TreasuryActive bool + Done chan bool } // PaymentMgr handles generating shares and paying out dividends to @@ -123,6 +139,7 @@ type PaymentMgrConfig struct { type PaymentMgr struct { failedTxConfs uint32 // update atomically. + paymentCh chan *paymentMsg processing bool cfg *PaymentMgrConfig mtx sync.Mutex @@ -131,7 +148,8 @@ type PaymentMgr struct { // NewPaymentMgr creates a new payment manager. func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { pm := &PaymentMgr{ - cfg: pCfg, + cfg: pCfg, + paymentCh: make(chan *paymentMsg, paymentBufferSize), } rand.Seed(time.Now().UnixNano()) @@ -910,3 +928,35 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA return nil } + +// processPayments relays payment signals for processing. +func (pm *PaymentMgr) processPayments(msg *paymentMsg) { + pm.paymentCh <- msg +} + +// handlePayments processes dividend payment signals. +func (pm *PaymentMgr) handlePayments(ctx context.Context) { + for { + select { + case <-ctx.Done(): + pm.cfg.HubWg.Done() + return + + case msg := <-pm.paymentCh: + if !pm.cfg.SoloPool { + err := pm.payDividends(ctx, msg.CurrentHeight, msg.TreasuryActive) + if err != nil { + log.Errorf("unable to process payments: %v", err) + close(msg.Done) + pm.cfg.Cancel() + continue + } + + // Signal the gui cache of paid dividends. + pm.cfg.SignalCache(DividendsPaid) + + close(msg.Done) + } + } + } +} diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 1d6309f4..48671e46 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/big" + "sync" "sync/atomic" "testing" "time" @@ -150,7 +151,7 @@ func TestSharePercentages(t *testing.T) { } } -func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { +func createPaymentMgr(paymentMethod string) (*PaymentMgr, context.Context, context.CancelFunc, error) { activeNet := chaincfg.SimNetParams() getBlockConfirmations := func(context.Context, *chainhash.Hash) (int64, error) { @@ -165,6 +166,11 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { return nil } + signalCache := func(CacheUpdateEvent) { + // Do nothing. + } + + ctx, cancel := context.WithCancel(context.Background()) pCfg := &PaymentMgrConfig{ db: db, ActiveNet: activeNet, @@ -176,12 +182,21 @@ func createPaymentMgr(paymentMethod string) (*PaymentMgr, error) { FetchTxCreator: fetchTxCreator, FetchTxBroadcaster: fetchTxBroadcaster, PoolFeeAddrs: []dcrutil.Address{poolFeeAddrs}, + Cancel: cancel, + SignalCache: signalCache, + HubWg: new(sync.WaitGroup), + } + + mgr, err := NewPaymentMgr(pCfg) + if err != nil { + return nil, nil, nil, err } - return NewPaymentMgr(pCfg) + + return mgr, ctx, cancel, err } func testPaymentMgrPPS(t *testing.T) { - mgr, err := createPaymentMgr(PPS) + mgr, _, _, err := createPaymentMgr(PPS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -278,7 +293,7 @@ func testPaymentMgrPPS(t *testing.T) { } func testPaymentMgrPPLNS(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) + mgr, _, _, err := createPaymentMgr(PPLNS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -376,7 +391,7 @@ func testPaymentMgrPPLNS(t *testing.T) { } func testPaymentMgrMaturity(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) + mgr, _, _, err := createPaymentMgr(PPLNS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -472,7 +487,7 @@ func testPaymentMgrPayment(t *testing.T) { t.Fatalf("failed to insert account: %v", err) } - mgr, err := createPaymentMgr(PPS) + mgr, _, _, err := createPaymentMgr(PPS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -1296,7 +1311,7 @@ func testPaymentMgrPayment(t *testing.T) { } func testPaymentMgrDust(t *testing.T) { - mgr, err := createPaymentMgr(PPLNS) + mgr, _, _, err := createPaymentMgr(PPLNS) if err != nil { t.Fatalf("[createPaymentMgr] unexpected error: %v", err) } @@ -1375,3 +1390,183 @@ func testPaymentMgrDust(t *testing.T) { "than the initial (%v)", ft, expectedFeeAmt) } } + +func testPaymentMgrSignals(t *testing.T) { + // Insert some test accounts. + accountX := NewAccount(xAddr) + err := db.persistAccount(accountX) + if err != nil { + t.Fatalf("failed to insert account: %v", err) + } + + accountY := NewAccount(yAddr) + err = db.persistAccount(accountY) + if err != nil { + t.Fatalf("failed to insert account: %v", err) + } + + mgr, ctx, cancel, err := createPaymentMgr(PPLNS) + if err != nil { + t.Fatalf("[createPaymentMgr] unexpected error: %v", err) + } + + var randBytes [chainhash.HashSize + 1]byte + _, err = rand.Read(randBytes[:]) + if err != nil { + t.Fatalf("unable to generate random bytes: %v", err) + } + + randHash := chainhash.HashH(randBytes[:]) + randSource := &PaymentSource{ + BlockHash: randHash.String(), + Coinbase: randHash.String(), + } + + height := uint32(10) + estMaturity := uint32(26) + amt, _ := dcrutil.NewAmount(5) + + pmtX := NewPayment(xID, zeroSource, amt, height, estMaturity) + err = db.PersistPayment(pmtX) + if err != nil { + cancel() + t.Fatal(err) + } + + pmtY := NewPayment(yID, randSource, amt, height, estMaturity) + err = db.PersistPayment(pmtY) + if err != nil { + cancel() + t.Fatal(err) + } + + txHashes := make(map[string]*chainhash.Hash) + hashA := chainhash.Hash{'a'} + txHashes[hashA.String()] = &hashA + hashB := chainhash.Hash{'b'} + txHashes[hashB.String()] = &hashB + hashC := chainhash.Hash{'c'} + txHashes[hashC.String()] = &hashC + + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + txConfs := make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) + confA := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: zeroHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confA) + confB := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: randHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confB) + + mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{ + Confirmations: txConfs, + }, nil + }, nil + } + + txBytes := []byte("01000000018e17619f0d627c2769ee3f957582691aea59c2" + + "e79cc45b8ba1f08485dd88d75c0300000001ffffffff017a64e43703000000" + + "00001976a914978fa305bd66f63f0de847338bb56ff65fa8e27288ac000000" + + "000000000001f46ce43703000000846c0700030000006b483045022100d668" + + "5812801db991b72e80863eba7058466dfebb4aba0af75ab47bade177325102" + + "205f466fc47435c1a177482e527ff0e76f3c2c613940b358e57f0f0d78d5f2" + + "ffcb012102d040a4c34ae65a2b87ea8e9df7413e6504e5f27c6bde019a78ee" + + "96145b27c517") + txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return &walletrpc.PublishTransactionResponse{ + TransactionHash: txHash, + }, nil + }, + rescan: func(ctx context.Context, req *walletrpc.RescanRequest, options ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) { + return &tRescanClient{ + resp: &walletrpc.RescanResponse{ + RescannedThrough: 30, + }, + }, nil + }, + } + } + + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return &wire.MsgTx{}, nil + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + // Ensure the payment lifecycle process recieves the payment signal and + // processes mature payments. + msgA := paymentMsg{ + CurrentHeight: estMaturity + 1, + TreasuryActive: false, + Done: make(chan bool), + } + + mgr.cfg.HubWg.Add(1) + go mgr.handlePayments(ctx) + + mgr.processPayments(&msgA) + <-msgA.Done + + // Esure the payment lifecycle process cancels the context when an + // error is encountered. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return nil, fmt.Errorf("unable to create raw transactions") + }, + } + } + + msgB := paymentMsg{ + CurrentHeight: estMaturity + 1, + TreasuryActive: false, + Done: make(chan bool), + } + mgr.processPayments(&msgB) + <-msgB.Done + + cancel() + mgr.cfg.HubWg.Wait() +} diff --git a/pool/pool_test.go b/pool/pool_test.go index e3e4a3b9..265dd242 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -103,6 +103,7 @@ func TestPool(t *testing.T) { "testPaymentMgrMaturity": testPaymentMgrMaturity, "testPaymentMgrPayment": testPaymentMgrPayment, "testPaymentMgrDust": testPaymentMgrDust, + "testPaymentSignals": testPaymentMgrSignals, "testChainState": testChainState, "testHub": testHub, }