Skip to content

Commit

Permalink
P2p sdk pull gossip (#861)
Browse files Browse the repository at this point in the history
* Add SDK Router message handling (#316)

Co-authored-by: Stephen Buttolph <stephen@avalabs.org>

* Fix hanging requests after Shutdown (#326)

* fix requests hanging after shutdown

* fix build

---------

Signed-off-by: Stephen Buttolph <stephen@avalabs.org>
Co-authored-by: Stephen Buttolph <stephen@avalabs.org>

* Update to 1.10.10-rc.2 (#328)

* update to avalanchego 1.10.10-rc.2

* nits

* nit

* Add P2P SDK Pull Gossip (#318)

* add batchsize

* sync changes

* Drop outbound gossip for non vdrs (#862)

* Drop outbound gossip requests for non-validators (#334)

* drop outbound gossip requests for non validators

* nit

* nit

* sync changes

---------

Co-authored-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>

---------

Co-authored-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
  • Loading branch information
ceyonur and joshua-kim committed Sep 19, 2023
1 parent 3317f7b commit d91407f
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 9 deletions.
15 changes: 15 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,21 @@ func (pool *TxPool) PendingFrom(addrs []common.Address, enforceTips bool) map[co
return pending
}

// IteratePending iterates over [pool.pending] until [f] returns false.
// The caller must not modify [tx].
func (pool *TxPool) IteratePending(f func(tx *types.Transaction) bool) {
pool.mu.RLock()
defer pool.mu.RUnlock()

for _, list := range pool.pending {
for _, tx := range list.txs.items {
if !f(tx) {
return
}
}
}
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanche-network-runner v1.7.2-0.20230825150237-723bc7b31724
github.com/ava-labs/avalanchego v1.10.10-rc.2
github.com/ava-labs/avalanchego v1.10.10-rc.4
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -48,6 +48,7 @@ require (
golang.org/x/sys v0.8.0
golang.org/x/text v0.8.0
golang.org/x/time v0.1.0
google.golang.org/protobuf v1.30.0
)

require (
Expand Down Expand Up @@ -148,7 +149,6 @@ require (
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.56.0-dev // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanche-network-runner v1.7.2-0.20230825150237-723bc7b31724 h1:ptqFgQtJ5DyLb2lvuvawLJNlvo1A1qv+JXYTneNeg14=
github.com/ava-labs/avalanche-network-runner v1.7.2-0.20230825150237-723bc7b31724/go.mod h1:euKHwZ77sGvGfhVj4v9WPM4jD2b5N80ldE2XHqO7lwA=
github.com/ava-labs/avalanchego v1.10.10-rc.2 h1:nlHc1JwKb5TEc9oqPU2exvOpazhxr11N2ym/LzYxv4k=
github.com/ava-labs/avalanchego v1.10.10-rc.2/go.mod h1:BN97sZppDSvIMIfEjrLTjdPTFkGLkb0ISJHEcoxMMNk=
github.com/ava-labs/avalanchego v1.10.10-rc.4 h1:1oxQf1boQDliJspfGBqsYsqg91d4F3qiFTnwnp+EruY=
github.com/ava-labs/avalanchego v1.10.10-rc.4/go.mod h1:BN97sZppDSvIMIfEjrLTjdPTFkGLkb0ISJHEcoxMMNk=
github.com/ava-labs/coreth v0.12.5-rc.3 h1:cpmC+fSZMsO4gaFWqXHzAHrJACf05u5HPAYmwh7nmkU=
github.com/ava-labs/coreth v0.12.5-rc.3/go.mod h1:HI+jTIflnDFBd0bledgkgid1Uurwr8q1h7zb3LsFsSo=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
117 changes: 117 additions & 0 deletions plugin/evm/gossip_mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"context"
"fmt"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/network/p2p/gossip"

"github.com/ava-labs/subnet-evm/core"
"github.com/ava-labs/subnet-evm/core/txpool"
"github.com/ava-labs/subnet-evm/core/types"
)

var (
_ gossip.Gossipable = (*GossipTx)(nil)
_ gossip.Set[*GossipTx] = (*GossipTxPool)(nil)
)

func NewGossipTxPool(mempool *txpool.TxPool) (*GossipTxPool, error) {
bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
if err != nil {
return nil, fmt.Errorf("failed to initialize bloom filter: %w", err)
}

return &GossipTxPool{
mempool: mempool,
pendingTxs: make(chan core.NewTxsEvent),
bloom: bloom,
}, nil
}

type GossipTxPool struct {
mempool *txpool.TxPool
pendingTxs chan core.NewTxsEvent

bloom *gossip.BloomFilter
lock sync.RWMutex
}

func (g *GossipTxPool) Subscribe(ctx context.Context) {
g.mempool.SubscribeNewTxsEvent(g.pendingTxs)

for {
select {
case <-ctx.Done():
log.Debug("shutting down subscription")
return
case pendingTxs := <-g.pendingTxs:
g.lock.Lock()
for _, pendingTx := range pendingTxs.Txs {
tx := &GossipTx{Tx: pendingTx}
g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipMaxFalsePositiveRate)
if err != nil {
log.Error("failed to reset bloom filter", "err", err)
continue
}

if reset {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

g.mempool.IteratePending(func(tx *types.Transaction) bool {
g.bloom.Add(&GossipTx{Tx: pendingTx})
return true
})
}
}
g.lock.Unlock()
}
}
}

// Add enqueues the transaction to the mempool. Subscribe should be called
// to receive an event if tx is actually added to the mempool or not.
func (g *GossipTxPool) Add(tx *GossipTx) error {
return g.mempool.AddRemotes([]*types.Transaction{tx.Tx})[0]
}

func (g *GossipTxPool) Iterate(f func(tx *GossipTx) bool) {
g.mempool.IteratePending(func(tx *types.Transaction) bool {
return f(&GossipTx{Tx: tx})
})
}

func (g *GossipTxPool) GetFilter() ([]byte, []byte, error) {
g.lock.RLock()
defer g.lock.RUnlock()

bloom, err := g.bloom.Bloom.MarshalBinary()
salt := g.bloom.Salt

return bloom, salt[:], err
}

type GossipTx struct {
Tx *types.Transaction
}

func (tx *GossipTx) GetID() ids.ID {
return ids.ID(tx.Tx.Hash())
}

func (tx *GossipTx) Marshal() ([]byte, error) {
return tx.Tx.MarshalBinary()
}

func (tx *GossipTx) Unmarshal(bytes []byte) error {
tx.Tx = &types.Transaction{}
return tx.Tx.UnmarshalBinary(bytes)
}
File renamed without changes.
132 changes: 132 additions & 0 deletions plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"context"
"math/big"
"sync"
"testing"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/subnet-evm/core/types"
)

func TestTxGossip(t *testing.T) {
require := require.New(t)

// set up prefunded address
_, vm, _, sender := GenesisVM(t, true, genesisJSONLatest, "", "")
defer func() {
require.NoError(vm.Shutdown(context.Background()))
}()

// sender for the peer requesting gossip from [vm]
ctrl := gomock.NewController(t)
peerSender := common.NewMockSender(ctrl)
router := p2p.NewRouter(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "")

// we're only making client requests, so we don't need a server handler
client, err := router.RegisterAppProtocol(txGossipProtocol, nil, nil)
require.NoError(err)

emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
require.NoError(err)
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
require.NoError(err)
request := &sdk.PullGossipRequest{
Filter: emptyBloomFilterBytes,
Salt: utils.RandomBytes(32),
}

requestBytes, err := proto.Marshal(request)
require.NoError(err)

wg := &sync.WaitGroup{}

requestingNodeID := ids.GenerateTestNodeID()
peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) {
go func() {
require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes))
}()
}).AnyTimes()

sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
go func() {
require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes))
}()
return nil
}

// we only accept gossip requests from validators
mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState)
require.True(ok)
mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
wg.Add(1)
onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Empty(response.Gossip)
wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
wg.Wait()

// Issue a tx to the VM
address := testEthAddrs[0]
key := testKeys[0]
tx := types.NewTransaction(0, address, big.NewInt(10), 21000, big.NewInt(testMinGasPrice), nil)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainConfig.ChainID), key)
require.NoError(err)

errs := vm.txPool.AddLocals([]*types.Transaction{signedTx})
require.Len(errs, 1)
require.Nil(errs[0])

// wait so we aren't throttled by the vm
time.Sleep(5 * time.Second)

// Ask the VM for new transactions. We should get the newly issued tx.
wg.Add(1)
onResponse = func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Len(response.Gossip, 1)

gotTx := &GossipTx{}
require.NoError(gotTx.Unmarshal(response.Gossip[0]))
require.Equal(signedTx.Hash(), gotTx.Tx.Hash())

wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
wg.Wait()
}

0 comments on commit d91407f

Please sign in to comment.