Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/p2p: fix issues found during testing #9872

Merged
merged 2 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions polygon/p2p/fetcher_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"reflect"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
Expand Down Expand Up @@ -35,24 +35,21 @@ type Fetcher interface {

func NewFetcher(
config FetcherConfig,
logger log.Logger,
messageListener MessageListener,
messageSender MessageSender,
requestIdGenerator RequestIdGenerator,
) Fetcher {
return newFetcher(config, logger, messageListener, messageSender, requestIdGenerator)
return newFetcher(config, messageListener, messageSender, requestIdGenerator)
}

func newFetcher(
config FetcherConfig,
logger log.Logger,
messageListener MessageListener,
messageSender MessageSender,
requestIdGenerator RequestIdGenerator,
) *fetcher {
return &fetcher{
config: config,
logger: logger,
messageListener: messageListener,
messageSender: messageSender,
requestIdGenerator: requestIdGenerator,
Expand All @@ -61,7 +58,6 @@ func newFetcher(

type fetcher struct {
config FetcherConfig
logger log.Logger
messageListener MessageListener
messageSender MessageSender
requestIdGenerator RequestIdGenerator
Expand Down Expand Up @@ -91,14 +87,23 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ {
chunkStart := start + chunkNum*eth.MaxHeadersServe
chunkEnd := cmp.Min(end, chunkStart+eth.MaxHeadersServe)
headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
if err != nil {
return nil, err
}
for chunkStart < chunkEnd {
// a node may not respond with all MaxHeadersServe in 1 response,
// so we keep on consuming from last received number (akin to consuming a paging api)
// until we have all headers of the chunk or the peer stopped returning headers
headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
if err != nil {
return nil, err
}
if len(headersChunk) == 0 {
break
}

headers = append(headers, headersChunk...)
headers = append(headers, headersChunk...)
chunkStart += uint64(len(headersChunk))
}
}

if err := f.validateHeadersResponse(headers, start, amount); err != nil {
Expand Down Expand Up @@ -281,12 +286,6 @@ func (f *fetcher) validateBodies(bodies []*types.Body, headers []*types.Header)
}
}

for _, body := range bodies {
if len(body.Transactions) == 0 && len(body.Withdrawals) == 0 && len(body.Uncles) == 0 {
return ErrEmptyBody
}
}

return nil
}

Expand Down Expand Up @@ -327,7 +326,7 @@ func awaitResponse[TPacket any](
select {
case <-ctx.Done():
var nilPacket TPacket
return nilPacket, fmt.Errorf("await response interrupted: %w", ctx.Err())
return nilPacket, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
case message := <-messages:
if filter(message) {
continue
Expand Down
71 changes: 28 additions & 43 deletions polygon/p2p/fetcher_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) {
Id: sentry.MessageId_BLOCK_HEADERS_66,
PeerId: peerId.H512(),
// 1024 headers in first response
Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1025]),
Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1024]),
},
}
mockRequestResponse1 := requestResponseMock{
Expand All @@ -88,7 +88,7 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) {
Id: sentry.MessageId_BLOCK_HEADERS_66,
PeerId: peerId.H512(),
// remaining 975 headers in second response
Data: blockHeadersPacket66Bytes(t, requestId2, mockHeaders[1025:]),
Data: blockHeadersPacket66Bytes(t, requestId2, mockHeaders[1024:]),
},
}
mockRequestResponse2 := requestResponseMock{
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) {
Id: sentry.MessageId_BLOCK_HEADERS_66,
PeerId: peerId.H512(),
// 1024 headers in first response
Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1025]),
Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1024]),
},
}
mockRequestResponse1 := requestResponseMock{
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) {
Id: sentry.MessageId_BLOCK_HEADERS_66,
PeerId: peerId.H512(),
// remaining 975 headers in third response
Data: blockHeadersPacket66Bytes(t, requestId3, mockHeaders[1025:]),
Data: blockHeadersPacket66Bytes(t, requestId3, mockHeaders[1024:]),
},
}
mockRequestResponse3 := requestResponseMock{
Expand Down Expand Up @@ -246,24 +246,39 @@ func TestFetcherFetchHeadersErrIncompleteResponse(t *testing.T) {
t.Parallel()

peerId := PeerIdFromUint64(1)
requestId := uint64(1234)
mockInboundMessages := []*sentry.InboundMessage{
requestId1 := uint64(1234)
requestId2 := uint64(1235)
mockInboundMessages1 := []*sentry.InboundMessage{
{
Id: sentry.MessageId_BLOCK_HEADERS_66,
PeerId: peerId.H512(),
Data: newMockBlockHeadersPacket66Bytes(t, requestId, 2),
Data: newMockBlockHeadersPacket66Bytes(t, requestId1, 2),
},
}
mockRequestResponse := requestResponseMock{
requestId: requestId,
mockResponseInboundMessages: mockInboundMessages,
mockInboundMessages2 := []*sentry.InboundMessage{
{
Id: sentry.MessageId_BLOCK_HEADERS_66,
PeerId: peerId.H512(),
Data: newMockBlockHeadersPacket66Bytes(t, requestId2, 0),
},
}
mockRequestResponse1 := requestResponseMock{
requestId: requestId1,
mockResponseInboundMessages: mockInboundMessages1,
wantRequestPeerId: peerId,
wantRequestOriginNumber: 1,
wantRequestAmount: 3,
}
mockRequestResponse2 := requestResponseMock{
requestId: requestId2,
mockResponseInboundMessages: mockInboundMessages2,
wantRequestPeerId: peerId,
wantRequestOriginNumber: 3,
wantRequestAmount: 1,
}

test := newFetcherTest(t, newMockRequestGenerator(requestId))
test.mockSentryStreams(mockRequestResponse)
test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2))
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
var errIncompleteHeaders *ErrIncompleteHeaders
headers, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId)
Expand Down Expand Up @@ -452,36 +467,6 @@ func TestFetcherFetchBodiesResponseTimeoutRetrySuccess(t *testing.T) {
})
}

func TestFetcherFetchBodiesErrEmptyBody(t *testing.T) {
t.Parallel()

peerId := PeerIdFromUint64(1)
requestId := uint64(1234)
mockHeaders := []*types.Header{{Number: big.NewInt(1)}}
mockHashes := []common.Hash{mockHeaders[0].Hash()}
mockInboundMessages := []*sentry.InboundMessage{
{
Id: sentry.MessageId_BLOCK_BODIES_66,
PeerId: peerId.H512(),
Data: newMockBlockBodiesPacketBytes(t, requestId, &types.Body{}),
},
}
mockRequestResponse := requestResponseMock{
requestId: requestId,
mockResponseInboundMessages: mockInboundMessages,
wantRequestPeerId: peerId,
wantRequestHashes: mockHashes,
}

test := newFetcherTest(t, newMockRequestGenerator(requestId))
test.mockSentryStreams(mockRequestResponse)
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.ErrorIs(t, err, ErrEmptyBody)
require.Nil(t, bodies)
})
}

func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -525,7 +510,7 @@ func newFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *fetche
messageListenerTest := newMessageListenerTest(t)
messageListener := messageListenerTest.messageListener
messageSender := NewMessageSender(messageListenerTest.sentryClient)
fetcher := newFetcher(fetcherConfig, messageListenerTest.logger, messageListener, messageSender, requestIdGenerator)
fetcher := newFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator)
return &fetcherTest{
messageListenerTest: messageListenerTest,
fetcher: fetcher,
Expand Down
2 changes: 0 additions & 2 deletions polygon/p2p/fetcher_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/ledgerwatch/erigon/core/types"
)

var ErrEmptyBody = errors.New("empty body")

type ErrInvalidFetchHeadersRange struct {
start uint64
end uint64
Expand Down
8 changes: 6 additions & 2 deletions polygon/p2p/fetcher_penalizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end
func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{}, ErrEmptyBody)
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
}

return bodies, nil
Expand All @@ -56,7 +56,11 @@ func (pf *penalizingFetcher) maybePenalize(ctx context.Context, peerId *PeerId,
}

if shouldPenalize {
pf.logger.Debug("penalizing peer - penalize-able fetcher issue", "peerId", peerId, "err", err)
pf.logger.Debug(
"[p2p.penalizing.fetcher] penalizing peer - penalize-able fetcher issue",
"peerId", peerId,
"err", err,
)

if penalizeErr := pf.peerPenalizer.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
Expand Down
32 changes: 0 additions & 32 deletions polygon/p2p/fetcher_penalizing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,38 +123,6 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenIncorrectOrigin(t *t
})
}

func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrEmptyBody(t *testing.T) {
t.Parallel()

peerId := PeerIdFromUint64(1)
requestId := uint64(1234)
headers := []*types.Header{{Number: big.NewInt(1)}}
hashes := []common.Hash{headers[0].Hash()}
mockInboundMessages := []*sentry.InboundMessage{
{
Id: sentry.MessageId_BLOCK_BODIES_66,
PeerId: peerId.H512(),
Data: newMockBlockBodiesPacketBytes(t, requestId, &types.Body{}),
},
}
mockRequestResponse := requestResponseMock{
requestId: requestId,
mockResponseInboundMessages: mockInboundMessages,
wantRequestPeerId: peerId,
wantRequestHashes: hashes,
}

test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId))
test.mockSentryStreams(mockRequestResponse)
// setup expectation that peer should be penalized
mockExpectPenalizePeer(t, test.sentryClient, peerId)
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.penalizingFetcher.FetchBodies(ctx, headers, peerId)
require.ErrorIs(t, err, ErrEmptyBody)
require.Nil(t, bodies)
})
}

func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 7 additions & 1 deletion polygon/p2p/message_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type messageListener struct {
}

func (ml *messageListener) Run(ctx context.Context) {
ml.logger.Info(messageListenerLogPrefix("running p2p message listener component"))

backgroundLoops := []func(ctx context.Context){
ml.listenInboundMessages,
ml.listenPeerEvents,
Expand Down Expand Up @@ -238,7 +240,7 @@ func notifyInboundMessageObservers[TPacket any](
var decodedData TPacket
if err := rlp.DecodeBytes(message.Data, &decodedData); err != nil {
if rlp.IsInvalidRLPError(err) {
logger.Debug("penalizing peer - invalid rlp", "peerId", peerId, "err", err)
logger.Debug(messageListenerLogPrefix("penalizing peer - invalid rlp"), "peerId", peerId, "err", err)

if penalizeErr := peerPenalizer.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
Expand All @@ -262,3 +264,7 @@ func notifyObservers[TMessage any](observers map[uint64]MessageObserver[TMessage
go observer(message)
}
}

func messageListenerLogPrefix(message string) string {
return fmt.Sprintf("[p2p.message.listener] %s", message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be solved by a new logger?

}
29 changes: 23 additions & 6 deletions polygon/p2p/peer_penalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,39 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
)

type PeerPenalizer interface {
Penalize(ctx context.Context, peerId *PeerId) error
}

func NewTrackingPeerPenalizer(peerPenalizer PeerPenalizer, peerTracker PeerTracker) PeerPenalizer {
return &trackingPeerPenalizer{
PeerPenalizer: peerPenalizer,
peerTracker: peerTracker,
}
}

type trackingPeerPenalizer struct {
PeerPenalizer
peerTracker PeerTracker
}

func (p *trackingPeerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error {
p.peerTracker.PeerDisconnected(peerId)
return p.PeerPenalizer.Penalize(ctx, peerId)
}

func NewPeerPenalizer(sentryClient direct.SentryClient) PeerPenalizer {
return &peerPenalizer{
sentryClient: sentryClient,
}
}

type PeerPenalizer interface {
Penalize(ctx context.Context, peerId *PeerId) error
}

type peerPenalizer struct {
sentryClient direct.SentryClient
}

func (pp *peerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error {
_, err := pp.sentryClient.PenalizePeer(ctx, &sentry.PenalizePeerRequest{
func (p *peerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error {
_, err := p.sentryClient.PenalizePeer(ctx, &sentry.PenalizePeerRequest{
PeerId: peerId.H512(),
Penalty: sentry.PenaltyKind_Kick,
})
Expand Down
4 changes: 2 additions & 2 deletions polygon/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func newService(
requestIdGenerator RequestIdGenerator,
) *service {
peerTracker := NewPeerTracker()
peerPenalizer := NewPeerPenalizer(sentryClient)
peerPenalizer := NewTrackingPeerPenalizer(NewPeerPenalizer(sentryClient), peerTracker)
messageListener := NewMessageListener(logger, sentryClient, statusDataFactory, peerPenalizer)
messageListener.RegisterPeerEventObserver(NewPeerEventObserver(peerTracker))
messageSender := NewMessageSender(sentryClient)
fetcher := NewFetcher(fetcherConfig, logger, messageListener, messageSender, requestIdGenerator)
fetcher := NewFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator)
fetcher = NewPenalizingFetcher(logger, fetcher, peerPenalizer)
fetcher = NewTrackingFetcher(fetcher, peerTracker)
return &service{
Expand Down
Loading
Loading