Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: fix chain notif. blocking by pmt processor. #324

Merged
merged 6 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ const (

// CreateAmount indicates an amount creation error.
CreateAmount = ErrorKind("CreateAmount")

// Rescan indicates an wallet rescan error.
Rescan = ErrorKind("Rescan")
)

// Error satisfies the error interface and prints human-readable errors.
Expand Down
1 change: 1 addition & 0 deletions errors/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestErrorKindStringer(t *testing.T) {
{TxIn, "TxIn"},
{ContextCancelled, "ContextCancelled"},
{CreateAmount, "CreateAmount"},
{Rescan, "Rescan"},
}

for i, test := range tests {
Expand Down
57 changes: 27 additions & 30 deletions pool/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type ChainStateConfig struct {
db Database
// SoloPool represents the solo pool mining mode.
SoloPool bool
// PayDividends pays mature mining rewards to participating accounts.
PayDividends func(context.Context, uint32, bool) error
// ProcessPayments relays payment signals for processing.
ProcessPayments func(msg *paymentMsg)
// GeneratePayments creates payments for participating accounts in pool
// mining mode based on the configured payment scheme.
GeneratePayments func(uint32, *PaymentSource, dcrutil.Amount, int64) error
Expand Down Expand Up @@ -211,6 +211,31 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
continue
}

block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock)
if err != nil {
// Errors generated fetching blocks of confirmed mined
// work are curently fatal because payments are
// sourced from coinbases. The chainstate process will be
// terminated as a result.
log.Errorf("unable to fetch block with hash %x: %v",
header.PrevBlock, err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

coinbaseTx := block.Transactions[0]
treasuryActive := isTreasuryActive(coinbaseTx)

soloPool := cs.cfg.SoloPool
if !soloPool {
go cs.cfg.ProcessPayments(&paymentMsg{
CurrentHeight: header.Height,
TreasuryActive: treasuryActive,
Done: make(chan bool),
})
}

// Prune invalidated jobs and accepted work.
if header.Height > MaxReorgLimit {
pruneLimit := header.Height - MaxReorgLimit
Expand Down Expand Up @@ -270,34 +295,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
}
}

block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock)
if err != nil {
// Errors generated fetching blocks of confirmed mined
// work are curently fatal because payments are
// sourced from coinbases. The chainstate process will be
// terminated as a result.
log.Errorf("unable to fetch block with hash %x: %v",
header.PrevBlock, err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

coinbaseTx := block.Transactions[0]
treasuryActive := isTreasuryActive(coinbaseTx)

// Process mature payments.
err = cs.cfg.PayDividends(ctx, header.Height, treasuryActive)
if err != nil {
log.Errorf("unable to process payments: %v", err)
close(msg.Done)
cs.cfg.Cancel()
continue
}

// Signal the gui cache of paid dividends.
cs.cfg.SignalCache(DividendsPaid)

// Check if the parent of the connected block is an accepted work
// of the pool.
parentHeight := header.Height - 1
Expand Down
7 changes: 2 additions & 5 deletions pool/chainstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ func testChainState(t *testing.T) {
t.Fatalf("unexpected serialization error: %v", err)
}

payDividends := func(context.Context, uint32, bool) error {
return nil
}
processPayments := func(*paymentMsg) {}
generatePayments := func(uint32, *PaymentSource, dcrutil.Amount, int64) error {
return nil
}
Expand Down Expand Up @@ -78,7 +76,7 @@ func testChainState(t *testing.T) {
cCfg := &ChainStateConfig{
db: db,
SoloPool: false,
PayDividends: payDividends,
ProcessPayments: processPayments,
GeneratePayments: generatePayments,
GetBlock: getBlock,
GetBlockConfirmations: getBlockConfirmations,
Expand Down Expand Up @@ -339,7 +337,6 @@ func testChainState(t *testing.T) {
}
cs.discCh <- discConfMsg
<-discConfMsg.Done
cs.cfg.PayDividends = payDividends

// Ensure the last work height can be updated.
initialLastWorkHeight := cs.fetchLastWorkHeight()
Expand Down
15 changes: 11 additions & 4 deletions pool/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var (
type WalletConnection interface {
SignTransaction(context.Context, *walletrpc.SignTransactionRequest, ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error)
PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error)
Rescan(ctx context.Context, in *walletrpc.RescanRequest, opts ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error)
}

// NodeConnection defines the functionality needed by a mining node
Expand Down Expand Up @@ -257,6 +258,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
FetchTxCreator: func() TxCreator { return h.nodeConn },
FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn },
CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout,
SignalCache: h.SignalCache,
HubWg: h.wg,
}

var err error
Expand All @@ -268,7 +271,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
sCfg := &ChainStateConfig{
db: h.cfg.DB,
SoloPool: h.cfg.SoloPool,
PayDividends: h.paymentMgr.payDividends,
ProcessPayments: h.paymentMgr.processPayments,
GeneratePayments: h.paymentMgr.generatePayments,
GetBlock: h.getBlock,
GetBlockConfirmations: h.getBlockConfirmations,
Expand Down Expand Up @@ -442,10 +445,13 @@ func (h *Hub) getWork(ctx context.Context) (string, string, error) {

// getTxConfNotifications streams transaction confirmation notifications for
// the provided transaction hashes.
func (h *Hub) getTxConfNotifications(txHashes []*chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) {
func (h *Hub) getTxConfNotifications(txHashes []chainhash.Hash, stopAfter int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) {
hashes := make([][]byte, 0, len(txHashes))
log.Tracef("Requesting tx conf notifications for %d "+
"transactions (stop after #%d)", len(txHashes), stopAfter)
jholdstock marked this conversation as resolved.
Show resolved Hide resolved
for _, hash := range txHashes {
hashes = append(hashes, hash[:])
hashes = append(hashes, hash.CloneBytes())
log.Tracef(" %s", hash)
}

req := &walletrpc.ConfirmationNotificationsRequest{
Expand Down Expand Up @@ -641,9 +647,10 @@ func (h *Hub) shutdown() {

// Run handles the process lifecycles of the pool hub.
func (h *Hub) Run(ctx context.Context) {
h.wg.Add(2)
h.wg.Add(3)
go h.endpoint.run(ctx)
go h.chainState.handleChainUpdates(ctx)
go h.paymentMgr.handlePayments(ctx)

// Wait until all hub processes have terminated, and then shutdown.
h.wg.Wait()
Expand Down
44 changes: 44 additions & 0 deletions pool/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,43 @@ import (
chainjson "github.com/decred/dcrd/rpc/jsonrpc/types/v2"
"github.com/decred/dcrd/wire"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type tRescanClient struct {
resp *walletrpc.RescanResponse
err error
grpc.ClientStream
}

func (r *tRescanClient) Recv() (*walletrpc.RescanResponse, error) {
return r.resp, r.err
}

func (r *tRescanClient) Header() (metadata.MD, error) {
return nil, nil
}

func (r *tRescanClient) Trailer() metadata.MD {
return nil
}

func (r *tRescanClient) CloseSend() error {
return nil
}

func (r *tRescanClient) Context() context.Context {
return nil
}

func (r *tRescanClient) SendMsg(m interface{}) error {
return nil
}

func (r *tRescanClient) RecvMsg(m interface{}) error {
return nil
}

type tWalletConnection struct {
}

Expand Down Expand Up @@ -109,6 +144,15 @@ func (t *tWalletConnection) PublishTransaction(context.Context, *walletrpc.Publi
}, nil
}

func (t *tWalletConnection) Rescan(context.Context, *walletrpc.RescanRequest, ...grpc.CallOption) (walletrpc.WalletService_RescanClient, error) {
client := new(tRescanClient)
client.resp = &walletrpc.RescanResponse{
RescannedThrough: 10000,
}

return client, nil
}

type tNodeConnection struct{}

func (t *tNodeConnection) CreateRawTransaction(context.Context, []chainjson.TransactionInput, map[dcrutil.Address]dcrutil.Amount, *int64, *int64) (*wire.MsgTx, error) {
Expand Down