Skip to content

Commit

Permalink
mempool/mining: Introduce TxSource interface.
Browse files Browse the repository at this point in the history
Upstream commit 2b6a9a5.

NOTE: This is only merging in the upstream changes, so while they are
useful for Decred as well, this merge commit does fully separate the
mempool as upstream does due to the new functionality provided by
Decred.
  • Loading branch information
davecgh committed May 26, 2016
2 parents 0e47daf + 2b6a9a5 commit 9031d85
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 102 deletions.
13 changes: 7 additions & 6 deletions cpuminer.go
Expand Up @@ -60,6 +60,7 @@ var (
type CPUMiner struct {
sync.Mutex
policy *miningPolicy
txSource TxSource
server *server
numWorkers uint32
started bool
Expand Down Expand Up @@ -212,7 +213,7 @@ func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, ticker *time.Ticker,

// Initial state.
lastGenerated := time.Now()
lastTxUpdate := m.server.txMemPool.LastUpdated()
lastTxUpdate := m.txSource.LastUpdated()
hashesCompleted := uint64(0)

// Note that the entire extra nonce range is iterated and the offset is
Expand Down Expand Up @@ -244,9 +245,10 @@ func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, ticker *time.Ticker,
// has been updated since the block template was
// generated and it has been at least 3 seconds,
// or if it's been one minute.
if (lastTxUpdate != m.server.txMemPool.LastUpdated() &&
if (lastTxUpdate != m.txSource.LastUpdated() &&
time.Now().After(lastGenerated.Add(3*time.Second))) ||
time.Now().After(lastGenerated.Add(60*time.Second)) {

return false
}

Expand Down Expand Up @@ -327,8 +329,7 @@ out:
// Create a new block template using the available transactions
// in the memory pool as a source of transactions to potentially
// include in the block.
template, err := NewBlockTemplate(m.policy, m.server.txMemPool,
payToAddr)
template, err := NewBlockTemplate(m.policy, m.server, payToAddr)
m.submitBlockLock.Unlock()
if err != nil {
errStr := fmt.Sprintf("Failed to create new block "+
Expand Down Expand Up @@ -607,8 +608,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*chainhash.Hash, error) {
// Create a new block template using the available transactions
// in the memory pool as a source of transactions to potentially
// include in the block.
template, err := NewBlockTemplate(m.policy, m.server.txMemPool,
payToAddr)
template, err := NewBlockTemplate(m.policy, m.server, payToAddr)
m.submitBlockLock.Unlock()
if err != nil {
errStr := fmt.Sprintf("Failed to create new block "+
Expand Down Expand Up @@ -652,6 +652,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*chainhash.Hash, error) {
func newCPUMiner(policy *miningPolicy, s *server) *CPUMiner {
return &CPUMiner{
policy: policy,
txSource: s.txMemPool,
server: s,
numWorkers: defaultNumWorkers,
updateNumWorkers: make(chan struct{}),
Expand Down
78 changes: 48 additions & 30 deletions mempool.go
Expand Up @@ -74,29 +74,23 @@ const (
maxNullDataOutputs = 4
)

// TxDesc is a descriptor containing a transaction in the mempool and the
// metadata we store about it.
type TxDesc struct {
Tx *dcrutil.Tx // Transaction.
Type stake.TxType // Transcation type.
Added time.Time // Time when added to pool.
Height int64 // Blockheight when added to pool.
Fee int64 // Transaction fees.
StartingPriority float64 // Priority when added to the pool.
}

// GetType returns what TxType a given TxDesc is.
func (td *TxDesc) GetType() stake.TxType {
return td.Type
}

// VoteTx is a struct describing a block vote (SSGen).
type VoteTx struct {
SsgenHash chainhash.Hash // Vote
SstxHash chainhash.Hash // Ticket
Vote bool
}

// mempoolTxDesc is a descriptor containing a transaction in the mempool along
// with additional metadata.
type mempoolTxDesc struct {
miningTxDesc

// StartingPriority is the priority of the transaction when it was added
// to the pool.
StartingPriority float64
}

// txMemPool is used as a source of transactions that need to be mined into
// blocks and relayed to other peers. It is safe for concurrent access from
// multiple peers.
Expand All @@ -106,7 +100,7 @@ type txMemPool struct {

sync.RWMutex
server *server
pool map[chainhash.Hash]*TxDesc
pool map[chainhash.Hash]*mempoolTxDesc
orphans map[chainhash.Hash]*dcrutil.Tx
orphansByPrev map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx
addrindex map[string]map[chainhash.Hash]struct{} // maps address to txs
Expand Down Expand Up @@ -358,6 +352,9 @@ func (mp *txMemPool) SortParentsByVotes(currentTopBlock chainhash.Hash,
return sortedBlocks, err
}

// Ensure the txMemPool type implements the mining.TxSource interface.
var _ TxSource = (*txMemPool)(nil)

// removeOrphan is the internal function which implements the public
// RemoveOrphan. See the comment for RemoveOrphan for more details.
//
Expand Down Expand Up @@ -661,12 +658,14 @@ func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *dcrutil.Tx,

// Add the transaction to the pool and mark the referenced outpoints
// as spent by the pool.
mp.pool[*tx.Sha()] = &TxDesc{
Tx: tx,
Type: txType,
Added: time.Now(),
Height: height,
Fee: fee,
mp.pool[*tx.Sha()] = &mempoolTxDesc{
miningTxDesc: miningTxDesc{
Tx: tx,
Type: txType,
Added: time.Now(),
Height: height,
Fee: fee,
},
StartingPriority: calcPriority(tx.MsgTx(), txStore, height),
}
for _, txIn := range tx.MsgTx().TxIn {
Expand Down Expand Up @@ -952,8 +951,8 @@ func (mp *txMemPool) FilterTransactionsByAddress(
if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists {
addressTxs := make([]*dcrutil.Tx, 0, len(txs))
for txHash := range txs {
if tx, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, tx.Tx)
if txD, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, txD.Tx)
}
}
return addressTxs, nil
Expand Down Expand Up @@ -1072,7 +1071,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
if txType == stake.TxTypeSSGen {
ssGenAlreadyFound := 0
for _, mpTx := range mp.pool {
if mpTx.GetType() == stake.TxTypeSSGen {
if mpTx.Type == stake.TxTypeSSGen {
if mpTx.Tx.MsgTx().TxIn[1].PreviousOutPoint ==
tx.MsgTx().TxIn[1].PreviousOutPoint {
ssGenAlreadyFound++
Expand All @@ -1090,7 +1089,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,

if txType == stake.TxTypeSSRtx {
for _, mpTx := range mp.pool {
if mpTx.GetType() == stake.TxTypeSSRtx {
if mpTx.Type == stake.TxTypeSSRtx {
if mpTx.Tx.MsgTx().TxIn[0].PreviousOutPoint ==
tx.MsgTx().TxIn[0].PreviousOutPoint {
str := fmt.Sprintf("transaction %v in the pool "+
Expand Down Expand Up @@ -1645,11 +1644,11 @@ func (mp *txMemPool) TxShas() []*chainhash.Hash {
// The descriptors are to be treated as read only.
//
// This function is safe for concurrent access.
func (mp *txMemPool) TxDescs() []*TxDesc {
func (mp *txMemPool) TxDescs() []*mempoolTxDesc {
mp.RLock()
defer mp.RUnlock()

descs := make([]*TxDesc, len(mp.pool))
descs := make([]*mempoolTxDesc, len(mp.pool))
i := 0
for _, desc := range mp.pool {
descs[i] = desc
Expand All @@ -1659,6 +1658,25 @@ func (mp *txMemPool) TxDescs() []*TxDesc {
return descs
}

// MiningDescs returns a slice of mining descriptors for all the transactions
// in the pool.
//
// This is part of the TxSource interface implementation and is safe for
// concurrent access as required by the interface contract.
func (mp *txMemPool) MiningDescs() []*miningTxDesc {
mp.RLock()
defer mp.RUnlock()

descs := make([]*miningTxDesc, len(mp.pool))
i := 0
for _, desc := range mp.pool {
descs[i] = &desc.miningTxDesc
i++
}

return descs
}

// LastUpdated returns the last time a transaction was added to or removed from
// the main pool. It does not include the orphan pool.
//
Expand Down Expand Up @@ -1691,7 +1709,7 @@ func (mp *txMemPool) CheckIfTxsExist(hashes []chainhash.Hash) bool {
func newTxMemPool(server *server) *txMemPool {
memPool := &txMemPool{
server: server,
pool: make(map[chainhash.Hash]*TxDesc),
pool: make(map[chainhash.Hash]*mempoolTxDesc),
orphans: make(map[chainhash.Hash]*dcrutil.Tx),
orphansByPrev: make(map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx),
outpoints: make(map[wire.OutPoint]*dcrutil.Tx),
Expand Down

0 comments on commit 9031d85

Please sign in to comment.