Skip to content

Commit

Permalink
mempool: Introduce mempoolConfig.
Browse files Browse the repository at this point in the history
Contains the following upstream commits:
- 83bcfea
- 2f6aeac
  - Cherry-picked because it fixes an issue originally introduced by the
    commit being merged

In addition to the normal required changes for syncing, the following
changes have been made in order to facilitate integration into Decred:

- Configure the NewestSha function to the new mempool config as a
  closure over the block manager state instead of using the database
  func directly
- Add a new NextStakeDifficulty callback to the new mempool config for
  obtaining the next stake difficulty from the block manager and
  configure it as a closure over the block manager state.
  • Loading branch information
davecgh committed May 26, 2016
2 parents 9031d85 + 83bcfea commit d73e576
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 46 deletions.
152 changes: 113 additions & 39 deletions mempool.go
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/chaincfg"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/database"
"github.com/decred/dcrd/txscript"
Expand Down Expand Up @@ -91,6 +92,51 @@ type mempoolTxDesc struct {
StartingPriority float64
}

// mempoolConfig is a descriptor containing the memory pool configuration.
type mempoolConfig struct {
// ChainParams identifies which chain parameters the mempool is
// associated with.
ChainParams *chaincfg.Params

// DisableRelayPriority defines whether to relay free or low-fee
// transactions that do not have enough priority to be relayed.
DisableRelayPriority bool

// EnableAddrIndex defines whether the address index should be enabled.
EnableAddrIndex bool

// FetchTransactionStore defines the function to use to fetch
// transacation information.
FetchTransactionStore func(*dcrutil.Tx, bool, bool) (blockchain.TxStore, error)

// FreeTxRelayLimit defines the given amount in thousands of bytes
// per minute that transactions with no fee are rate limited to.
FreeTxRelayLimit float64

// MaxOrphanTxs defines the maximum number of orphan transactions to
// keep in memory.
MaxOrphanTxs int

// MinRelayTxFee defines the minimum transaction fee in BTC/kB to be
// considered a non-zero fee.
MinRelayTxFee dcrutil.Amount

// NewestSha defines the function to retrieve the newest sha.
NewestSha func() (*chainhash.Hash, int64, error)

// NextStakeDifficulty defines the function to retrieve the stake
// difficulty for the block after the current best block.
//
// This function must be safe for concurrent access.
NextStakeDifficulty func() (int64, error)

// SigCache defines a signature cache to use.
SigCache *txscript.SigCache

// TimeSource defines the timesource to use.
TimeSource blockchain.MedianTimeSource
}

// txMemPool is used as a source of transactions that need to be mined into
// blocks and relayed to other peers. It is safe for concurrent access from
// multiple peers.
Expand All @@ -99,7 +145,7 @@ type txMemPool struct {
lastUpdated int64 // last time pool was updated.

sync.RWMutex
server *server
cfg mempoolConfig
pool map[chainhash.Hash]*mempoolTxDesc
orphans map[chainhash.Hash]*dcrutil.Tx
orphansByPrev map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx
Expand Down Expand Up @@ -140,8 +186,8 @@ func (mp *txMemPool) insertVote(ssgen *dcrutil.Tx) error {
"voting %v on the transaction tree",
voteHash, blockHash, blockHeight, vote)

slice := make([]*VoteTx, int(mp.server.chainParams.TicketsPerBlock),
int(mp.server.chainParams.TicketsPerBlock))
slice := make([]*VoteTx, int(mp.cfg.ChainParams.TicketsPerBlock),
int(mp.cfg.ChainParams.TicketsPerBlock))
slice[0] = voteTx
mp.votes[blockHash] = slice
return nil
Expand Down Expand Up @@ -291,7 +337,7 @@ func (mp *txMemPool) sortParentsByVotes(currentTopBlock chainhash.Hash,
sort.Sort(sort.Reverse(ByNumberOfVotes(bwlvs)))

var sortedUsefulBlocks []chainhash.Hash
minimumVotesRequired := uint16((mp.server.chainParams.TicketsPerBlock / 2) + 1)
minimumVotesRequired := uint16((mp.cfg.ChainParams.TicketsPerBlock / 2) + 1)
for _, bwlv := range bwlvs {
if bwlv.Votes >= minimumVotesRequired {
sortedUsefulBlocks = append(sortedUsefulBlocks, bwlv.Block)
Expand Down Expand Up @@ -401,7 +447,7 @@ func (mp *txMemPool) RemoveOrphan(txHash *chainhash.Hash) {
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *txMemPool) limitNumOrphans() error {
if len(mp.orphans)+1 > cfg.MaxOrphanTxs && cfg.MaxOrphanTxs > 0 {
if len(mp.orphans)+1 > mp.cfg.MaxOrphanTxs && mp.cfg.MaxOrphanTxs > 0 {
// Generate a cryptographically random hash.
randHashBytes := make([]byte, chainhash.HashSize)
_, err := rand.Read(randHashBytes)
Expand Down Expand Up @@ -467,7 +513,7 @@ func (mp *txMemPool) maybeAddOrphan(tx *dcrutil.Tx) error {
//
// Note that the number of orphan transactions in the orphan pool is
// also limited, so this equates to a maximum memory used of
// maxOrphanTxSize * cfg.MaxOrphanTxs (which is ~5MB using the default
// maxOrphanTxSize * mp.cfg.MaxOrphanTxs (which is ~5MB using the default
// values at the time this comment was written).
serializedLen := tx.MsgTx().SerializeSize()
if serializedLen > maxOrphanTxSize {
Expand Down Expand Up @@ -602,15 +648,18 @@ func (mp *txMemPool) removeTransaction(tx *dcrutil.Tx, removeRedeemers bool) {
// Remove the transaction and mark the referenced outpoints as unspent
// by the pool.
if txDesc, exists := mp.pool[*txHash]; exists {
// Remove the transaction and its addresses from the address
// index if it's enabled.
if mp.cfg.EnableAddrIndex {
mp.pruneTxFromAddrIndex(tx, txType)
}

for _, txIn := range txDesc.Tx.MsgTx().TxIn {
delete(mp.outpoints, txIn.PreviousOutPoint)
}
delete(mp.pool, *txHash)
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
}

// Remove the transaction and its addresses from the address index.
mp.pruneTxFromAddrIndex(tx, txType)
}

// RemoveTransaction removes the passed transaction from the mempool. If
Expand Down Expand Up @@ -672,6 +721,32 @@ func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *dcrutil.Tx,
mp.outpoints[txIn.PreviousOutPoint] = tx
}
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())

// Add the addresses associated with the transaction to the address
// index if it's enabled.
if mp.cfg.EnableAddrIndex {
mp.addTransactionToAddrIndex(tx, txType)
}
}

// addTransactionToAddrIndex adds all addresses related to the transaction to
// our in-memory address index. Note that this address is only populated when
// we're running with the optional address index activated.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *txMemPool) addTransactionToAddrIndex(tx *dcrutil.Tx,
txType stake.TxType) error {

// Insert the addresses into the mempool address index.
for _, txOut := range tx.MsgTx().TxOut {
err := mp.indexScriptAddressToTx(txOut.Version, txOut.PkScript,
tx, txType)
if err != nil {
return err
}
}

return nil
}

// fetchReferencedOutputScripts looks up and returns all the scriptPubKeys
Expand Down Expand Up @@ -715,7 +790,7 @@ func (mp *txMemPool) indexScriptAddressToTx(pkVersion uint16, pkScript []byte,
// An exception is SStx commitments. Handle these manually.
if txType == stake.TxTypeSStx && class == txscript.NullDataTy {
addr, err := stake.AddrFromSStxPkScrCommitment(pkScript,
mp.server.chainParams)
mp.cfg.ChainParams)
if err != nil {
txmpLog.Tracef("Unable to extract encoded addresses "+
"from sstx commitment script for addrindex: %v", err)
Expand Down Expand Up @@ -754,7 +829,7 @@ func (mp *txMemPool) pruneTxFromAddrIndex(tx *dcrutil.Tx, txType stake.TxType) {
// An exception is SStx commitments. Handle these manually.
if txType == stake.TxTypeSStx && class == txscript.NullDataTy {
addr, err := stake.AddrFromSStxPkScrCommitment(txOut.PkScript,
mp.server.chainParams)
mp.cfg.ChainParams)
if err != nil {
// If we couldn't extract addresses, skip this output.
continue
Expand Down Expand Up @@ -854,7 +929,7 @@ func (mp *txMemPool) isTxTreeValid(newestHash *chainhash.Hash) bool {
// There are not possibly enough votes to tell if the txTree is valid;
// assume it's valid.
if len(mp.votes[*newestHash]) <=
int(mp.server.chainParams.TicketsPerBlock/2) {
int(mp.cfg.ChainParams.TicketsPerBlock/2) {
return true
}

Expand Down Expand Up @@ -897,9 +972,13 @@ func (mp *txMemPool) IsTxTreeValid(best *chainhash.Hash) bool {
// This function MUST be called with the mempool lock held (for reads).
func (mp *txMemPool) fetchInputTransactions(tx *dcrutil.Tx, includeSpent bool) (blockchain.TxStore,
error) {
tv := mp.IsTxTreeValid(mp.server.blockManager.chainState.newestHash)
txStore, err := mp.server.blockManager.blockChain.FetchTransactionStore(tx,
tv, includeSpent)

newestHash, _, err := mp.cfg.NewestSha()
if err != nil {
return nil, err
}
tv := mp.IsTxTreeValid(newestHash)
txStore, err := mp.cfg.FetchTransactionStore(tx, tv, includeSpent)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -986,7 +1065,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// Perform preliminary sanity checks on the transaction. This makes
// use of chain which contains the invariant rules for what
// transactions are allowed into blocks.
err := blockchain.CheckTransactionSanity(tx, mp.server.chainParams)
err := blockchain.CheckTransactionSanity(tx, mp.cfg.ChainParams)
if err != nil {
if cerr, ok := err.(blockchain.RuleError); ok {
return nil, chainRuleError(cerr)
Expand Down Expand Up @@ -1014,7 +1093,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// Get the current height of the main chain. A standalone transaction
// will be mined into the next block at best, so it's height is at least
// one more than the current height.
_, curHeight, err := mp.server.db.NewestSha()
_, curHeight, err := mp.cfg.NewestSha()
if err != nil {
// This is an unexpected error so don't turn it into a rule
// error.
Expand All @@ -1036,7 +1115,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// forbid their relaying.
if !activeNetParams.RelayNonStdTxs {
err := checkTransactionStandard(tx, txType, nextBlockHeight,
mp.server.timeSource, cfg.minRelayTxFee)
mp.cfg.TimeSource, mp.cfg.MinRelayTxFee)
if err != nil {
// Attempt to extract a reject code from the error so
// it can be retained. When not possible, fall back to
Expand All @@ -1054,9 +1133,12 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// If the transaction is a ticket, ensure that it meets the next
// stake difficulty.
if txType == stake.TxTypeSStx {
mp.server.blockManager.chainState.Lock()
sDiff := mp.server.blockManager.chainState.nextStakeDifficulty
mp.server.blockManager.chainState.Unlock()
sDiff, err := mp.cfg.NextStakeDifficulty()
if err != nil {
// This is an unexpected error so don't turn it into a
// rule error.
return nil, err
}

if tx.MsgTx().TxOut[0].Value < sDiff {
str := fmt.Sprintf("transaction %v has not enough funds "+
Expand Down Expand Up @@ -1176,7 +1258,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
nextBlockHeight,
txStore,
false, // Don't check fraud proof; filled in by miner
mp.server.chainParams)
mp.cfg.ChainParams)
if err != nil {
if cerr, ok := err.(blockchain.RuleError); ok {
return nil, chainRuleError(cerr)
Expand Down Expand Up @@ -1240,7 +1322,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// high-priority transactions, don't require a fee for it.
// This applies to non-stake transactions only.
serializedSize := int64(tx.MsgTx().SerializeSize())
minFee := calcMinRequiredTxRelayFee(serializedSize, cfg.minRelayTxFee)
minFee := calcMinRequiredTxRelayFee(serializedSize, mp.cfg.MinRelayTxFee)
if txType == stake.TxTypeRegular { // Non-stake only
if serializedSize >= (defaultBlockPrioritySize-1000) && txFee < minFee {
str := fmt.Sprintf("transaction %v has %v fees which is under "+
Expand All @@ -1254,8 +1336,9 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// in the next block. Transactions which are being added back to the
// memory pool from blocks that have been disconnected during a reorg
// are exempted.
//
// This applies to non-stake transactions only.
if isNew && !cfg.NoRelayPriority && txFee < minFee &&
if isNew && !mp.cfg.DisableRelayPriority && txFee < minFee &&
txType == stake.TxTypeRegular {

currentPriority := calcPriority(tx.MsgTx(), txStore,
Expand All @@ -1280,7 +1363,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
mp.lastPennyUnix = nowUnix

// Are we still over the limit?
if mp.pennyTotal >= cfg.FreeTxRelayLimit*10*1000 {
if mp.pennyTotal >= mp.cfg.FreeTxRelayLimit*10*1000 {
str := fmt.Sprintf("transaction %v has been rejected "+
"by the rate limiter due to low fees", txHash)
return nil, txRuleError(wire.RejectInsufficientFee, str)
Expand All @@ -1290,7 +1373,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
mp.pennyTotal += float64(serializedSize)
txmpLog.Tracef("rate limit: curTotal %v, nextTotal: %v, "+
"limit %v", oldTotal, mp.pennyTotal,
cfg.FreeTxRelayLimit*10*1000)
mp.cfg.FreeTxRelayLimit*10*1000)
}

// Set an absolute threshold for ticket rejection and obey it. Tickets
Expand Down Expand Up @@ -1324,7 +1407,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// Verify crypto signatures for each input and reject the transaction if
// any don't verify.
err = blockchain.ValidateTransactionScripts(tx, txStore,
txscript.StandardVerifyFlags, mp.server.sigCache)
txscript.StandardVerifyFlags, mp.cfg.SigCache)
if err != nil {
if cerr, ok := err.(blockchain.RuleError); ok {
return nil, chainRuleError(cerr)
Expand Down Expand Up @@ -1354,15 +1437,6 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash,
len(mp.pool))

if mp.server.rpcServer != nil {
// Notify websocket clients about mempool transactions.
mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew)

// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
mp.server.rpcServer.gbtWorkState.NotifyMempoolTx(mp.LastUpdated())
}

return nil, nil
}

Expand Down Expand Up @@ -1706,17 +1780,17 @@ func (mp *txMemPool) CheckIfTxsExist(hashes []chainhash.Hash) bool {

// newTxMemPool returns a new memory pool for validating and storing standalone
// transactions until they are mined into a block.
func newTxMemPool(server *server) *txMemPool {
func newTxMemPool(cfg *mempoolConfig) *txMemPool {
memPool := &txMemPool{
server: server,
cfg: *cfg,
pool: make(map[chainhash.Hash]*mempoolTxDesc),
orphans: make(map[chainhash.Hash]*dcrutil.Tx),
orphansByPrev: make(map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx),
outpoints: make(map[wire.OutPoint]*dcrutil.Tx),
votes: make(map[chainhash.Hash][]*VoteTx),
}

if !cfg.NoAddrIndex {
if cfg.EnableAddrIndex {
memPool.addrindex = make(map[string]map[chainhash.Hash]struct{})
}
return memPool
Expand Down
10 changes: 4 additions & 6 deletions mining.go
Expand Up @@ -703,9 +703,7 @@ func medianAdjustedTime(chainState *chainState,
// valid from the perspective of the mainchain (not necessarily
// the mempool or block) before inserting into a tx tree.
// If it fails the check, it returns false; otherwise true.
func maybeInsertStakeTx(mp *txMemPool, stx *dcrutil.Tx, treeValid bool) bool {
bm := mp.server.blockManager

func maybeInsertStakeTx(bm *blockManager, stx *dcrutil.Tx, treeValid bool) bool {
missingInput := false

txStore, err := bm.FetchTransactionStore(stx, treeValid)
Expand Down Expand Up @@ -1714,7 +1712,7 @@ mempoolLoop:

if isSSGen, _ := stake.IsSSGen(tx); isSSGen {
txCopy := dcrutil.NewTxDeepTxIns(tx.MsgTx())
if maybeInsertStakeTx(mempool, txCopy, treeValid) {
if maybeInsertStakeTx(blockManager, txCopy, treeValid) {
vb := stake.GetSSGenVoteBits(txCopy)
voteBitsVoters = append(voteBitsVoters, vb)
blockTxnsStake = append(blockTxnsStake, txCopy)
Expand Down Expand Up @@ -1820,7 +1818,7 @@ mempoolLoop:
// Quick check for difficulty here.
if tx.MsgTx().TxOut[0].Value >= requiredStakeDifficulty {
txCopy := dcrutil.NewTxDeepTxIns(tx.MsgTx())
if maybeInsertStakeTx(mempool, txCopy, treeValid) {
if maybeInsertStakeTx(blockManager, txCopy, treeValid) {
blockTxnsStake = append(blockTxnsStake, txCopy)
freshStake++
}
Expand All @@ -1843,7 +1841,7 @@ mempoolLoop:
isSSRtx, _ := stake.IsSSRtx(tx)
if tx.Tree() == dcrutil.TxTreeStake && isSSRtx {
txCopy := dcrutil.NewTxDeepTxIns(tx.MsgTx())
if maybeInsertStakeTx(mempool, txCopy, treeValid) {
if maybeInsertStakeTx(blockManager, txCopy, treeValid) {
blockTxnsStake = append(blockTxnsStake, txCopy)
revocations++
}
Expand Down

0 comments on commit d73e576

Please sign in to comment.