Skip to content

Commit

Permalink
[TxPool] Prune accounts with nonce holes during high memory usage (#689)
Browse files Browse the repository at this point in the history
* implement background pruning during high slots consumption
  • Loading branch information
dbrajovic committed Aug 29, 2022
1 parent f44ce9f commit 210de9e
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 5 deletions.
10 changes: 10 additions & 0 deletions txpool/slot_gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/0xPolygon/polygon-edge/types"
)

const (
highPressureMark = 80 // 80%
)

// Gauge for measuring pool capacity in slots
type slotGauge struct {
height uint64 // amount of slots currently occupying the pool
Expand All @@ -27,6 +31,12 @@ func (g *slotGauge) decrease(slots uint64) {
atomic.AddUint64(&g.height, ^(slots - 1))
}

// highPressure checks if the gauge level
// is higher than the 0.8*max threshold
func (g *slotGauge) highPressure() bool {
return g.read() > (highPressureMark*g.max)/100
}

// slotsRequired calculates the number of slots required for given transaction(s).
func slotsRequired(txs ...*types.Transaction) uint64 {
slots := uint64(0)
Expand Down
71 changes: 66 additions & 5 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/golang/protobuf/ptypes/any"
"github.com/hashicorp/go-hclog"
Expand All @@ -26,6 +27,8 @@ const (
// maximum allowed number of times an account
// was excluded from block building (ibft.writeTransactions)
maxAccountDemotions = uint(10)

pruningCooldown = 5000 * time.Millisecond
)

// errors
Expand Down Expand Up @@ -152,6 +155,7 @@ type TxPool struct {
// does dispatching/handling requests.
enqueueReqCh chan enqueueRequest
promoteReqCh chan promoteRequest
pruneCh chan struct{}

// shutdown channel
shutdownCh chan struct{}
Expand Down Expand Up @@ -191,6 +195,12 @@ func NewTxPool(
gauge: slotGauge{height: 0, max: config.MaxSlots},
priceLimit: config.PriceLimit,
sealing: config.Sealing,

// main loop channels
enqueueReqCh: make(chan enqueueRequest),
promoteReqCh: make(chan promoteRequest),
pruneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
}

// Attach the event manager
Expand All @@ -214,11 +224,6 @@ func NewTxPool(
proto.RegisterTxnPoolOperatorServer(grpcServer, pool)
}

// initialise channels
pool.enqueueReqCh = make(chan enqueueRequest)
pool.promoteReqCh = make(chan promoteRequest)
pool.shutdownCh = make(chan struct{})

return pool, nil
}

Expand All @@ -229,6 +234,23 @@ func (p *TxPool) Start() {
// set default value of txpool pending transactions gauge
p.metrics.PendingTxs.Set(0)

// run the handler for high gauge level pruning
go func() {
for {
select {
case <-p.shutdownCh:
return
case <-p.pruneCh:
p.pruneAccountsWithNonceHoles()
}

// handler is in cooldown to avoid successive calls
// which could be just no-ops
time.Sleep(pruningCooldown)
}
}()

// run the handler for the tx pipeline
go func() {
for {
select {
Expand Down Expand Up @@ -582,6 +604,41 @@ func (p *TxPool) validateTx(tx *types.Transaction) error {
return nil
}

func (p *TxPool) signalPruning() {
select {
case p.pruneCh <- struct{}{}:
default: // pruning handler is active or in cooldown
}
}

func (p *TxPool) pruneAccountsWithNonceHoles() {
p.accounts.Range(
func(_, value interface{}) bool {
account, _ := value.(*account)

account.enqueued.lock(true)
defer account.enqueued.unlock()

firstTx := account.enqueued.peek()

if firstTx == nil {
return true
}

if firstTx.Nonce == account.getNonce() {
return true
}

removed := account.enqueued.clear()

p.index.remove(removed...)
p.gauge.decrease(slotsRequired(removed...))

return true
},
)
}

// addTx is the main entry point to the pool
// for all new transactions. If the call is
// successful, an account is created for this address
Expand All @@ -597,6 +654,10 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error {
return err
}

if p.gauge.highPressure() {
p.signalPruning()
}

// check for overflow
if p.gauge.read()+slotsRequired(tx) > p.gauge.max {
return ErrTxPoolOverflow
Expand Down
124 changes: 124 additions & 0 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,130 @@ func TestAddTxErrors(t *testing.T) {
})
}

func TestPruneAccountsWithNonceHoles(t *testing.T) {
t.Parallel()

t.Run(
"no enqueued to prune",
func(t *testing.T) {
t.Parallel()

pool, err := newTestPool()
assert.NoError(t, err)
pool.SetSigner(&mockSigner{})

pool.createAccountOnce(addr1)

assert.Equal(t, uint64(0), pool.gauge.read())
assert.Equal(t, uint64(0), pool.accounts.get(addr1).getNonce())
assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length())

pool.pruneAccountsWithNonceHoles()

assert.Equal(t, uint64(0), pool.gauge.read())
assert.Equal(t, uint64(0), pool.accounts.get(addr1).getNonce())
assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length())
},
)

t.Run(
"skip valid account",
func(t *testing.T) {
t.Parallel()

pool, err := newTestPool()
assert.NoError(t, err)
pool.SetSigner(&mockSigner{})

// enqueue tx
go func() {
assert.NoError(t,
pool.addTx(local, newTx(addr1, 0, 1)),
)
}()
go pool.handleEnqueueRequest(<-pool.enqueueReqCh)
<-pool.promoteReqCh

assert.Equal(t, uint64(1), pool.gauge.read())
assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())

// assert no nonce hole
assert.Equal(t,
pool.accounts.get(addr1).getNonce(),
pool.accounts.get(addr1).enqueued.peek().Nonce,
)

pool.pruneAccountsWithNonceHoles()

assert.Equal(t, uint64(1), pool.gauge.read())
assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())
},
)

t.Run(
"prune nonce hole account",
func(t *testing.T) {
t.Parallel()

pool, err := newTestPool()
assert.NoError(t, err)
pool.SetSigner(&mockSigner{})

// enqueue tx
go func() {
assert.NoError(t,
pool.addTx(local, newTx(addr1, 5, 1)),
)
}()
pool.handleEnqueueRequest(<-pool.enqueueReqCh)

assert.Equal(t, uint64(1), pool.gauge.read())
assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())

// assert nonce hole
assert.NotEqual(t,
pool.accounts.get(addr1).getNonce(),
pool.accounts.get(addr1).enqueued.peek().Nonce,
)

pool.pruneAccountsWithNonceHoles()

assert.Equal(t, uint64(0), pool.gauge.read())
assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length())
},
)
}

func TestAddTxHighPressure(t *testing.T) {
t.Parallel()

t.Run(
"pruning handler is signaled",
func(t *testing.T) {
t.Parallel()

pool, err := newTestPool()
assert.NoError(t, err)
pool.SetSigner(&mockSigner{})

// mock high pressure
slots := 1 + (highPressureMark*pool.gauge.max)/100
pool.gauge.increase(slots)

// enqueue tx
go func() {
assert.NoError(t,
pool.addTx(local, newTx(addr1, 0, 1)),
)
}()

// pick up signal
_, ok := <-pool.pruneCh
assert.True(t, ok)
},
)
}

func TestAddGossipTx(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 210de9e

Please sign in to comment.