Skip to content

Commit

Permalink
feat(share/getter): add support for ErrNotFound in shrex getter imple…
Browse files Browse the repository at this point in the history
…mentation (#2074)

## Overview

Resolves #2037
  • Loading branch information
walldiss committed Apr 14, 2023
1 parent 42f6044 commit d375373
Show file tree
Hide file tree
Showing 15 changed files with 312 additions and 96 deletions.
3 changes: 2 additions & 1 deletion share/eds/inverted_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func (m *mockIterator) ForEach(f func(mh multihash.Multihash) error) error {
return nil
}

// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per duplicate multihash
// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per
// duplicate multihash
func TestMultihashesForShard(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down
3 changes: 0 additions & 3 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package eds

import (
"context"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -71,8 +70,6 @@ func TestEDSStore(t *testing.T) {
r, err := edsStore.GetCAR(ctx, dah.Hash())
assert.NoError(t, err)
carReader, err := car.NewCarReader(r)

fmt.Println(car.HeaderSize(carReader.Header))
assert.NoError(t, err)

for i := 0; i < 4; i++ {
Expand Down
24 changes: 16 additions & 8 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,16 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
eds, getErr := sg.edsClient.RequestEDS(reqCtx, root.Hash(), peer)
cancel()
switch getErr {
case nil:
switch {
case getErr == nil:
setStatus(peers.ResultSynced)
return eds, nil
case context.DeadlineExceeded:
case p2p.ErrInvalidResponse:
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
case errors.Is(getErr, p2p.ErrInvalidResponse):
setStatus(peers.ResultBlacklistPeer)
default:
setStatus(peers.ResultCooldownPeer)
Expand Down Expand Up @@ -132,12 +136,16 @@ func (sg *ShrexGetter) GetSharesByNamespace(
reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
nd, getErr := sg.ndClient.RequestND(reqCtx, root, id, peer)
cancel()
switch getErr {
case nil:
switch {
case getErr == nil:
setStatus(peers.ResultSuccess)
return nd, nil
case context.DeadlineExceeded:
case p2p.ErrInvalidResponse:
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
case errors.Is(getErr, p2p.ErrInvalidResponse):
setStatus(peers.ResultBlacklistPeer)
default:
setStatus(peers.ResultCooldownPeer)
Expand Down
188 changes: 143 additions & 45 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,122 @@ package getters

import (
"context"
mrand "math/rand"
"testing"
"time"

bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
mdutils "github.com/ipfs/go-merkledag/test"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt/namespace"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

func TestGetSharesWithProofByNamespace(t *testing.T) {
func TestShrexGetter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
t.Cleanup(cancel)

// create test net
net := availability_test.NewTestDAGNet(ctx, t)

// generate test data
bServ := mdutils.Bserv()
randomEDS, nID := generateTestEDS(t, bServ)
dah := da.NewDataAvailabilityHeader(randomEDS)
net, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)
clHost, srvHost := net.Hosts()[0], net.Hosts()[1]

// launch eds store and put test data into it
edsStore, err := newStore(t)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)
err = edsStore.Put(ctx, dah.Hash(), randomEDS)
require.NoError(t, err)

// create server and register handler
srvHost := net.NewTestNode().Host
params := shrexnd.DefaultParameters()
srv, err := shrexnd.NewServer(params, srvHost, edsStore, NewIPLDGetter(bServ))
require.NoError(t, err)
err = srv.Start(ctx)
ndClient, _ := newNDClientServer(ctx, t, edsStore, srvHost, clHost)
edsClient, _ := newEDSClientServer(ctx, t, edsStore, srvHost, clHost)

// create shrex Getter
sub := new(headertest.Subscriber)
peerManager, err := testManager(ctx, clHost, sub)
require.NoError(t, err)
getter := NewShrexGetter(edsClient, ndClient, peerManager)
require.NoError(t, getter.Start(ctx))

t.Run("ND_Available", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, nID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

got, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.NoError(t, got.Verify(&dah, nID))
})

t.Cleanup(func() {
_ = srv.Stop(ctx)
t.Run("ND_err_not_found", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
_, dah, nID := generateTestEDS(t)
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

_, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNotFound)
})

// create client and connect it to server
client, err := shrexnd.NewClient(params, net.NewTestNode().Host)
require.NoError(t, err)
net.ConnectAll()
t.Run("EDS_Available", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, _ := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

got, err := getter.GetEDS(ctx, &dah)
require.NoError(t, err)
require.Equal(t, eds.Flattened(), got.Flattened())
})

got, err := client.RequestND(
ctx,
&dah,
nID,
srvHost.ID())
t.Run("EDS_err_not_found", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

require.NoError(t, err)
require.NoError(t, got.Verify(&dah, nID))
// generate test data
_, dah, _ := generateTestEDS(t)
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

_, err := getter.GetEDS(ctx, &dah)
require.ErrorIs(t, err, share.ErrNotFound)
})
}

func newStore(t *testing.T) (*eds.Store, error) {
Expand All @@ -77,24 +128,71 @@ func newStore(t *testing.T) (*eds.Store, error) {
return eds.NewStore(tmpDir, ds)
}

func generateTestEDS(t *testing.T, bServ bsrv.BlockService) (*rsmt2d.ExtendedDataSquare, namespace.ID) {
shares := share.RandShares(t, 16)

from := mrand.Intn(len(shares))
to := mrand.Intn(len(shares))
func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader, namespace.ID) {
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
randNID := dah.RowsRoots[(len(dah.RowsRoots)-1)/2][:8]
return eds, dah, randNID
}

if to < from {
from, to = to, from
func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscriber[*header.ExtendedHeader],
) (*peers.Manager, error) {
shrexSub, err := shrexsub.NewPubSub(ctx, host, "test")
if err != nil {
return nil, err
}

nID := shares[from][:share.NamespaceSize]
// change some shares to have same nID
for i := from; i <= to; i++ {
copy(shares[i][:share.NamespaceSize], nID)
disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second)
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
if err != nil {
return nil, err
}
manager, err := peers.NewManager(
peers.DefaultParameters(),
headerSub,
shrexSub,
disc,
host,
connGater,
)
return manager, err
}

eds, err := share.AddShares(context.Background(), shares, bServ)
func newNDClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
) (*shrexnd.Client, *shrexnd.Server) {
params := shrexnd.DefaultParameters()

// create server and register handler
server, err := shrexnd.NewServer(params, srvHost, edsStore, NewStoreGetter(edsStore))
require.NoError(t, err)
require.NoError(t, server.Start(ctx))

t.Cleanup(func() {
_ = server.Stop(ctx)
})

// create client and connect it to server
client, err := shrexnd.NewClient(params, clHost)
require.NoError(t, err)
return client, server
}

return eds, nID
func newEDSClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
) (*shrexeds.Client, *shrexeds.Server) {
params := shrexeds.DefaultParameters()

// create server and register handler
server, err := shrexeds.NewServer(params, srvHost, edsStore)
require.NoError(t, err)
require.NoError(t, server.Start(ctx))

t.Cleanup(func() {
_ = server.Stop(ctx)
})

// create client and connect it to server
client, err := shrexeds.NewClient(params, clHost)
require.NoError(t, err)
return client, server
}
8 changes: 4 additions & 4 deletions share/p2p/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"errors"
)

// ErrUnavailable is returned when a peer doesn't have the requested data or doesn't have the
// capacity to serve it at the moment. It is used to signal that the peer couldn't serve the data
// successfully, but should be retried later.
var ErrUnavailable = errors.New("server cannot serve the requested data at this time")
// ErrNotFound is returned when a peer is unable to find the requested data or resource.
// It is used to signal that the peer couldn't serve the data successfully, and it's not
// available at the moment. The request may be retried later, but it's unlikely to succeed.
var ErrNotFound = errors.New("the requested data or resource could not be found")

// ErrInvalidResponse is returned when a peer returns an invalid response or caused an internal
// error. It is used to signal that the peer couldn't serve the data successfully, and should not be
Expand Down
8 changes: 4 additions & 4 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Manager struct {
lock sync.Mutex
params Parameters

// header subscription is necessary in order to validate the inbound eds hash
// header subscription is necessary in order to Validate the inbound eds hash
headerSub libhead.Subscriber[*header.ExtendedHeader]
shrexSub *shrexsub.PubSub
host host.Host
Expand Down Expand Up @@ -134,7 +134,7 @@ func (m *Manager) Start(startCtx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

err := m.shrexSub.AddValidator(m.validate)
err := m.shrexSub.AddValidator(m.Validate)
if err != nil {
return fmt.Errorf("registering validator: %w", err)
}
Expand Down Expand Up @@ -256,8 +256,8 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
}
}

// validate will collect peer.ID into corresponding peer pool
func (m *Manager) validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {

// messages broadcast from self should bypass the validation with Accept
if peerID == m.host.ID() {
Expand Down
Loading

0 comments on commit d375373

Please sign in to comment.