Skip to content

Commit

Permalink
Pool: return lost error, fix race of 2 sentries, set high limits to s…
Browse files Browse the repository at this point in the history
…ee worst case (#83)
  • Loading branch information
AskAlexSharov committed Sep 20, 2021
1 parent f2549ad commit c40a022
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
31 changes: 22 additions & 9 deletions txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,23 @@ func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {

// ConnectSentries initialises connection to the sentry
func (f *Fetch) ConnectSentries() {
for i := range f.sentryClients {
go func(i int) {
f.receiveMessageLoop(f.sentryClients[i])
}(i)
go func(i int) {
f.receivePeerLoop(f.sentryClients[i])
}(i)
}
//TODO: fix race in parse ctx - 2 sentries causing it
go func(i int) {
f.receiveMessageLoop(f.sentryClients[i])
}(0)
go func(i int) {
f.receivePeerLoop(f.sentryClients[i])
}(0)
/*
for i := range f.sentryClients {
go func(i int) {
f.receiveMessageLoop(f.sentryClients[i])
}(i)
go func(i int) {
f.receivePeerLoop(f.sentryClients[i])
}(i)
}
*/
}
func (f *Fetch) ConnectCore() {
go func() {
Expand Down Expand Up @@ -303,7 +312,11 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66:
txs := TxSlots{}
f.pooledTxsParseCtx.Reject(func(hash []byte) error {
if known, _ := f.pool.IdHashKnown(tx, hash); known {
known, err := f.pool.IdHashKnown(tx, hash)
if err != nil {
return err
}
if known {
return ErrRejected
}
return nil
Expand Down
19 changes: 12 additions & 7 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ var DefaultConfig = Config{
LogEvery: 30 * time.Second,
CacheEvictEvery: 1 * time.Minute,

PendingSubPoolLimit: 50_000,
PendingSubPoolLimit: 200_000,
BaseFeeSubPoolLimit: 200_000,
QueuedSubPoolLimit: 90_000,
QueuedSubPoolLimit: 200_000,
}

// Pool is interface for the transaction pool
Expand Down Expand Up @@ -1077,13 +1077,15 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
}
copy(v[20:], metaTx.Tx.rlp)

has, _ := tx.Has(kv.PoolTransaction, []byte(txHash))
if has {
panic("must not happen")
}
if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil {
has, err := tx.Has(kv.PoolTransaction, []byte(txHash))
if err != nil {
return err
}
if !has {
if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil {
return err
}
}
metaTx.Tx.rlp = nil
}

Expand Down Expand Up @@ -1247,6 +1249,9 @@ func (p *TxPool) printDebug(prefix string) {
}
}
func (p *TxPool) logStats() {
if !p.started.Load() {
log.Info("[txpool] Not started yet, waiting for new blocks...")
}
//protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load()

p.lock.RLock()
Expand Down

0 comments on commit c40a022

Please sign in to comment.