Skip to content

Commit

Permalink
feat(p2p): Extend Head request with RequestOptions and introduce With…
Browse files Browse the repository at this point in the history
…SubjectiveInit opt
  • Loading branch information
renaynay committed Jun 21, 2023
1 parent 7da1572 commit 6f74541
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 16 deletions.
25 changes: 17 additions & 8 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// The Head must be verified thereafter where possible.
// We request in parallel all the trusted peers, compare their response
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
func (ex *Exchange[H]) Head(ctx context.Context, reqOpts ...RequestOption) (H, error) {
log.Debug("requesting head")

reqCtx := ctx
Expand All @@ -124,16 +124,25 @@ func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
defer cancel()
}

reqParams := DefaultRequestParams()
for _, opt := range reqOpts {
opt(&reqParams)
}

peers := ex.peerTracker.GetPeers()
if reqParams.SubjectiveInit || len(peers) < minTrustedHeadResponses {
peers = ex.trustedPeers()
}

var (
zero H
trustedPeers = ex.trustedPeers()
headerRespCh = make(chan H, len(trustedPeers))
headerReq = &p2p_pb.HeaderRequest{
zero H
headerReq = &p2p_pb.HeaderRequest{
Data: &p2p_pb.HeaderRequest_Origin{Origin: uint64(0)},
Amount: 1,
}
headerRespCh = make(chan H, len(peers))
)
for _, from := range trustedPeers {
for _, from := range peers {
go func(from peer.ID) {
headers, err := ex.request(reqCtx, from, headerReq)
if err != nil {
Expand All @@ -146,8 +155,8 @@ func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
}(from)
}

headers := make([]H, 0, len(trustedPeers))
for range trustedPeers {
headers := make([]H, 0, len(peers))
for range peers {
select {
case h := <-headerRespCh:
if !h.IsZero() {
Expand Down
65 changes: 57 additions & 8 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"strconv"
"testing"
"time"

Expand All @@ -18,23 +19,71 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

const networkID = "private"

func TestExchange_RequestHead(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// perform header request
header, err := exchg.Head(context.Background())
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

hosts := createMocknet(t, 3)
exchg, trustedStore := createP2PExAndServer(t, hosts[0], hosts[1])

// create new server-side exchange that will act as the tracked peer
// it will have a higher chain head than the trusted peer so that the
// test can determine which peer was asked
trackedStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](hosts[2], trackedStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
err = serverSideEx.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = serverSideEx.Stop(ctx)
require.NoError(t, err)
})

var tests = []struct {
withSubjInit bool
expectedHeight int64
expectedHash header.Hash
}{
// routes to trusted peer only
{
withSubjInit: true,
expectedHeight: trustedStore.HeadHeight,
expectedHash: trustedStore.Headers[trustedStore.HeadHeight].Hash(),
},
// routes to tracked peers and takes highest chain head
{
withSubjInit: false,
expectedHeight: trackedStore.HeadHeight,
expectedHash: trackedStore.Headers[trackedStore.HeadHeight].Hash(),
},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var opts []RequestOption
if tt.withSubjInit {
opts = append(opts, WithSubjectiveInit)
}

assert.Equal(t, store.Headers[store.HeadHeight].Height(), header.Height())
assert.Equal(t, store.Headers[store.HeadHeight].Hash(), header.Hash())
header, err := exchg.Head(ctx, opts...)
require.NoError(t, err)

assert.Equal(t, tt.expectedHeight, header.Height())
assert.Equal(t, tt.expectedHash, header.Hash())

})
}
}

func TestExchange_RequestHead_UnresponsivePeer(t *testing.T) {
Expand All @@ -50,7 +99,7 @@ func TestExchange_RequestHead_UnresponsivePeer(t *testing.T) {
goodStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
_ = server(ctx, t, hosts[1], goodStore)

badStore := &timedOutStore{timeout: time.Millisecond*500} // simulates peer that does not respond
badStore := &timedOutStore{timeout: time.Millisecond * 500} // simulates peer that does not respond
_ = server(ctx, t, hosts[2], badStore)

ctx, cancel = context.WithTimeout(ctx, time.Millisecond*500)
Expand Down
22 changes: 22 additions & 0 deletions p2p/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package p2p

type RequestOption func(opts *RequestParams)

// RequestParams contains options to be used for header Exchange
// requests.
type RequestParams struct {
// SubjectiveInit determines whether the Exchange should use
// trusted peers for its Head request (true = yes).
SubjectiveInit bool
}

func DefaultRequestParams() RequestParams {
return RequestParams{
SubjectiveInit: false,
}
}

// WithSubjectiveInit TODO
func WithSubjectiveInit(opts *RequestParams) {
opts.SubjectiveInit = true
}
10 changes: 10 additions & 0 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ func (p *peerTracker) track() {
}
}

func (p *peerTracker) GetPeers() []peer.ID {
p.peerLk.RLock()
peers := make([]peer.ID, 0, len(p.trackedPeers))
for peerID := range p.trackedPeers {
peers = append(peers, peerID)
}
p.peerLk.RUnlock()
return peers
}

func (p *peerTracker) connected(pID peer.ID) {
if p.host.ID() == pID {
return
Expand Down

0 comments on commit 6f74541

Please sign in to comment.