Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mempool/mining: Introduce TxSource interface. #225

Merged
merged 2 commits into from
May 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions cpuminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
type CPUMiner struct {
sync.Mutex
policy *miningPolicy
txSource TxSource
server *server
numWorkers uint32
started bool
Expand Down Expand Up @@ -212,7 +213,7 @@ func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, ticker *time.Ticker,

// Initial state.
lastGenerated := time.Now()
lastTxUpdate := m.server.txMemPool.LastUpdated()
lastTxUpdate := m.txSource.LastUpdated()
hashesCompleted := uint64(0)

// Note that the entire extra nonce range is iterated and the offset is
Expand Down Expand Up @@ -244,9 +245,10 @@ func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, ticker *time.Ticker,
// has been updated since the block template was
// generated and it has been at least 3 seconds,
// or if it's been one minute.
if (lastTxUpdate != m.server.txMemPool.LastUpdated() &&
if (lastTxUpdate != m.txSource.LastUpdated() &&
time.Now().After(lastGenerated.Add(3*time.Second))) ||
time.Now().After(lastGenerated.Add(60*time.Second)) {

return false
}

Expand Down Expand Up @@ -327,8 +329,7 @@ out:
// Create a new block template using the available transactions
// in the memory pool as a source of transactions to potentially
// include in the block.
template, err := NewBlockTemplate(m.policy, m.server.txMemPool,
payToAddr)
template, err := NewBlockTemplate(m.policy, m.server, payToAddr)
m.submitBlockLock.Unlock()
if err != nil {
errStr := fmt.Sprintf("Failed to create new block "+
Expand Down Expand Up @@ -607,8 +608,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*chainhash.Hash, error) {
// Create a new block template using the available transactions
// in the memory pool as a source of transactions to potentially
// include in the block.
template, err := NewBlockTemplate(m.policy, m.server.txMemPool,
payToAddr)
template, err := NewBlockTemplate(m.policy, m.server, payToAddr)
m.submitBlockLock.Unlock()
if err != nil {
errStr := fmt.Sprintf("Failed to create new block "+
Expand Down Expand Up @@ -652,6 +652,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*chainhash.Hash, error) {
func newCPUMiner(policy *miningPolicy, s *server) *CPUMiner {
return &CPUMiner{
policy: policy,
txSource: s.txMemPool,
server: s,
numWorkers: defaultNumWorkers,
updateNumWorkers: make(chan struct{}),
Expand Down
78 changes: 48 additions & 30 deletions mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,23 @@ const (
maxNullDataOutputs = 4
)

// TxDesc is a descriptor containing a transaction in the mempool and the
// metadata we store about it.
type TxDesc struct {
Tx *dcrutil.Tx // Transaction.
Type stake.TxType // Transcation type.
Added time.Time // Time when added to pool.
Height int64 // Blockheight when added to pool.
Fee int64 // Transaction fees.
StartingPriority float64 // Priority when added to the pool.
}

// GetType returns what TxType a given TxDesc is.
func (td *TxDesc) GetType() stake.TxType {
return td.Type
}

// VoteTx is a struct describing a block vote (SSGen).
type VoteTx struct {
SsgenHash chainhash.Hash // Vote
SstxHash chainhash.Hash // Ticket
Vote bool
}

// mempoolTxDesc is a descriptor containing a transaction in the mempool along
// with additional metadata.
type mempoolTxDesc struct {
miningTxDesc

// StartingPriority is the priority of the transaction when it was added
// to the pool.
StartingPriority float64
}

// txMemPool is used as a source of transactions that need to be mined into
// blocks and relayed to other peers. It is safe for concurrent access from
// multiple peers.
Expand All @@ -106,7 +100,7 @@ type txMemPool struct {

sync.RWMutex
server *server
pool map[chainhash.Hash]*TxDesc
pool map[chainhash.Hash]*mempoolTxDesc
orphans map[chainhash.Hash]*dcrutil.Tx
orphansByPrev map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx
addrindex map[string]map[chainhash.Hash]struct{} // maps address to txs
Expand Down Expand Up @@ -358,6 +352,9 @@ func (mp *txMemPool) SortParentsByVotes(currentTopBlock chainhash.Hash,
return sortedBlocks, err
}

// Ensure the txMemPool type implements the mining.TxSource interface.
var _ TxSource = (*txMemPool)(nil)

// removeOrphan is the internal function which implements the public
// RemoveOrphan. See the comment for RemoveOrphan for more details.
//
Expand Down Expand Up @@ -661,12 +658,14 @@ func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *dcrutil.Tx,

// Add the transaction to the pool and mark the referenced outpoints
// as spent by the pool.
mp.pool[*tx.Sha()] = &TxDesc{
Tx: tx,
Type: txType,
Added: time.Now(),
Height: height,
Fee: fee,
mp.pool[*tx.Sha()] = &mempoolTxDesc{
miningTxDesc: miningTxDesc{
Tx: tx,
Type: txType,
Added: time.Now(),
Height: height,
Fee: fee,
},
StartingPriority: calcPriority(tx.MsgTx(), txStore, height),
}
for _, txIn := range tx.MsgTx().TxIn {
Expand Down Expand Up @@ -952,8 +951,8 @@ func (mp *txMemPool) FilterTransactionsByAddress(
if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists {
addressTxs := make([]*dcrutil.Tx, 0, len(txs))
for txHash := range txs {
if tx, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, tx.Tx)
if txD, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, txD.Tx)
}
}
return addressTxs, nil
Expand Down Expand Up @@ -1072,7 +1071,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,
if txType == stake.TxTypeSSGen {
ssGenAlreadyFound := 0
for _, mpTx := range mp.pool {
if mpTx.GetType() == stake.TxTypeSSGen {
if mpTx.Type == stake.TxTypeSSGen {
if mpTx.Tx.MsgTx().TxIn[1].PreviousOutPoint ==
tx.MsgTx().TxIn[1].PreviousOutPoint {
ssGenAlreadyFound++
Expand All @@ -1090,7 +1089,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew,

if txType == stake.TxTypeSSRtx {
for _, mpTx := range mp.pool {
if mpTx.GetType() == stake.TxTypeSSRtx {
if mpTx.Type == stake.TxTypeSSRtx {
if mpTx.Tx.MsgTx().TxIn[0].PreviousOutPoint ==
tx.MsgTx().TxIn[0].PreviousOutPoint {
str := fmt.Sprintf("transaction %v in the pool "+
Expand Down Expand Up @@ -1645,11 +1644,11 @@ func (mp *txMemPool) TxShas() []*chainhash.Hash {
// The descriptors are to be treated as read only.
//
// This function is safe for concurrent access.
func (mp *txMemPool) TxDescs() []*TxDesc {
func (mp *txMemPool) TxDescs() []*mempoolTxDesc {
mp.RLock()
defer mp.RUnlock()

descs := make([]*TxDesc, len(mp.pool))
descs := make([]*mempoolTxDesc, len(mp.pool))
i := 0
for _, desc := range mp.pool {
descs[i] = desc
Expand All @@ -1659,6 +1658,25 @@ func (mp *txMemPool) TxDescs() []*TxDesc {
return descs
}

// MiningDescs returns a slice of mining descriptors for all the transactions
// in the pool.
//
// This is part of the TxSource interface implementation and is safe for
// concurrent access as required by the interface contract.
func (mp *txMemPool) MiningDescs() []*miningTxDesc {
mp.RLock()
defer mp.RUnlock()

descs := make([]*miningTxDesc, len(mp.pool))
i := 0
for _, desc := range mp.pool {
descs[i] = &desc.miningTxDesc
i++
}

return descs
}

// LastUpdated returns the last time a transaction was added to or removed from
// the main pool. It does not include the orphan pool.
//
Expand Down Expand Up @@ -1691,7 +1709,7 @@ func (mp *txMemPool) CheckIfTxsExist(hashes []chainhash.Hash) bool {
func newTxMemPool(server *server) *txMemPool {
memPool := &txMemPool{
server: server,
pool: make(map[chainhash.Hash]*TxDesc),
pool: make(map[chainhash.Hash]*mempoolTxDesc),
orphans: make(map[chainhash.Hash]*dcrutil.Tx),
orphansByPrev: make(map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx),
outpoints: make(map[wire.OutPoint]*dcrutil.Tx),
Expand Down