Skip to content

Commit

Permalink
make mempool threadsafe
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
  • Loading branch information
joshua-kim committed Dec 19, 2023
1 parent c8866e6 commit 5331e87
Showing 1 changed file with 42 additions and 15 deletions.
57 changes: 42 additions & 15 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mempool
import (
"errors"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -72,19 +73,18 @@ type Mempool interface {
// Transactions from clients that have not yet been put into blocks and added to
// consensus
type mempool struct {
toEngine chan<- common.Message

bytesAvailableMetric prometheus.Gauge
bytesAvailable int
numTxs prometheus.Gauge

lock sync.Mutex
unissuedTxs linkedhashmap.LinkedHashmap[ids.ID, *txs.Tx]
numTxs prometheus.Gauge

// Key: Tx ID
// Value: Verification error
droppedTxIDs *cache.LRU[ids.ID, error]

consumedUTXOs set.Set[ids.ID]

toEngine chan<- common.Message
consumedUTXOs set.Set[ids.ID]
droppedTxIDs *cache.LRU[ids.ID, error]
bytesAvailable int
}

func New(
Expand Down Expand Up @@ -112,19 +112,20 @@ func New(

bytesAvailableMetric.Set(maxMempoolSize)
return &mempool{
toEngine: toEngine,
bytesAvailableMetric: bytesAvailableMetric,
numTxs: numTxs,
unissuedTxs: linkedhashmap.New[ids.ID, *txs.Tx](),
consumedUTXOs: set.NewSet[ids.ID](initialConsumedUTXOsSize),
droppedTxIDs: &cache.LRU[ids.ID, error]{Size: droppedTxIDsCacheSize},
bytesAvailable: maxMempoolSize,

unissuedTxs: linkedhashmap.New[ids.ID, *txs.Tx](),
numTxs: numTxs,

droppedTxIDs: &cache.LRU[ids.ID, error]{Size: droppedTxIDsCacheSize},
consumedUTXOs: set.NewSet[ids.ID](initialConsumedUTXOsSize),
toEngine: toEngine,
}, nil
}

func (m *mempool) Iterate(f func(tx *txs.Tx) bool) {
m.lock.Lock()
defer m.lock.Unlock()

itr := m.unissuedTxs.NewIterator()
for itr.Next() {
if !f(itr.Value()) {
Expand All @@ -134,6 +135,9 @@ func (m *mempool) Iterate(f func(tx *txs.Tx) bool) {
}

func (m *mempool) Add(tx *txs.Tx) error {
m.lock.Lock()
defer m.lock.Unlock()

switch tx.Unsigned.(type) {
case *txs.AdvanceTimeTx:
return errCantIssueAdvanceTimeTx
Expand Down Expand Up @@ -186,15 +190,24 @@ func (m *mempool) Add(tx *txs.Tx) error {
}

func (m *mempool) Has(txID ids.ID) bool {
m.lock.Lock()
defer m.lock.Unlock()

return m.Get(txID) != nil
}

func (m *mempool) Get(txID ids.ID) *txs.Tx {
m.lock.Lock()
defer m.lock.Unlock()

tx, _ := m.unissuedTxs.Get(txID)
return tx
}

func (m *mempool) Remove(txsToRemove []*txs.Tx) {
m.lock.Lock()
defer m.lock.Unlock()

for _, tx := range txsToRemove {
txID := tx.ID()
if !m.unissuedTxs.Delete(txID) {
Expand All @@ -211,24 +224,38 @@ func (m *mempool) Remove(txsToRemove []*txs.Tx) {
}

func (m *mempool) Peek() (*txs.Tx, bool) {
m.lock.Lock()
defer m.lock.Unlock()

_, tx, exists := m.unissuedTxs.Oldest()
return tx, exists
}

func (m *mempool) MarkDropped(txID ids.ID, reason error) {
m.lock.Lock()
defer m.lock.Unlock()

m.droppedTxIDs.Put(txID, reason)
}

func (m *mempool) GetDropReason(txID ids.ID) error {
m.lock.Lock()
defer m.lock.Unlock()

err, _ := m.droppedTxIDs.Get(txID)
return err
}

func (m *mempool) RequestBuildBlock(emptyBlockPermitted bool) {
m.lock.Lock()

if !emptyBlockPermitted && m.unissuedTxs.Len() == 0 {
m.lock.Unlock()
return
}

m.lock.Unlock()

select {
case m.toEngine <- common.PendingTxs:
default:
Expand Down

0 comments on commit 5331e87

Please sign in to comment.