Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/getter): add support for ErrNotFound in shrex getter implementation #2074

Merged
merged 9 commits into from
Apr 14, 2023
3 changes: 2 additions & 1 deletion share/eds/inverted_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func (m *mockIterator) ForEach(f func(mh multihash.Multihash) error) error {
return nil
}

// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per duplicate multihash
// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per
// duplicate multihash
func TestMultihashesForShard(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down
3 changes: 0 additions & 3 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package eds

import (
"context"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -71,8 +70,6 @@ func TestEDSStore(t *testing.T) {
r, err := edsStore.GetCAR(ctx, dah.Hash())
assert.NoError(t, err)
carReader, err := car.NewCarReader(r)

fmt.Println(car.HeaderSize(carReader.Header))
assert.NoError(t, err)

for i := 0; i < 4; i++ {
Expand Down
24 changes: 16 additions & 8 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,16 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
eds, getErr := sg.edsClient.RequestEDS(reqCtx, root.Hash(), peer)
cancel()
switch getErr {
case nil:
switch {
case getErr == nil:
setStatus(peers.ResultSynced)
return eds, nil
case context.DeadlineExceeded:
case p2p.ErrInvalidResponse:
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
case errors.Is(getErr, p2p.ErrInvalidResponse):
setStatus(peers.ResultBlacklistPeer)
default:
setStatus(peers.ResultCooldownPeer)
Expand Down Expand Up @@ -132,12 +136,16 @@ func (sg *ShrexGetter) GetSharesByNamespace(
reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
nd, getErr := sg.ndClient.RequestND(reqCtx, root, id, peer)
cancel()
switch getErr {
case nil:
switch {
case getErr == nil:
setStatus(peers.ResultSuccess)
return nd, nil
case context.DeadlineExceeded:
case p2p.ErrInvalidResponse:
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
case errors.Is(getErr, p2p.ErrInvalidResponse):
setStatus(peers.ResultBlacklistPeer)
default:
setStatus(peers.ResultCooldownPeer)
Expand Down
188 changes: 143 additions & 45 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,122 @@ package getters

import (
"context"
mrand "math/rand"
"testing"
"time"

bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
mdutils "github.com/ipfs/go-merkledag/test"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt/namespace"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

func TestGetSharesWithProofByNamespace(t *testing.T) {
func TestShrexGetter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
t.Cleanup(cancel)

// create test net
net := availability_test.NewTestDAGNet(ctx, t)

// generate test data
bServ := mdutils.Bserv()
randomEDS, nID := generateTestEDS(t, bServ)
dah := da.NewDataAvailabilityHeader(randomEDS)
net, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)
clHost, srvHost := net.Hosts()[0], net.Hosts()[1]

// launch eds store and put test data into it
edsStore, err := newStore(t)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)
err = edsStore.Put(ctx, dah.Hash(), randomEDS)
require.NoError(t, err)

// create server and register handler
srvHost := net.NewTestNode().Host
params := shrexnd.DefaultParameters()
srv, err := shrexnd.NewServer(params, srvHost, edsStore, NewIPLDGetter(bServ))
require.NoError(t, err)
err = srv.Start(ctx)
ndClient, _ := newNDClientServer(ctx, t, edsStore, srvHost, clHost)
edsClient, _ := newEDSClientServer(ctx, t, edsStore, srvHost, clHost)

// create shrex Getter
sub := new(headertest.Subscriber)
peerManager, err := testManager(ctx, clHost, sub)
require.NoError(t, err)
getter := NewShrexGetter(edsClient, ndClient, peerManager)
require.NoError(t, getter.Start(ctx))

t.Run("ND_Available", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, nID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

got, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.NoError(t, got.Verify(&dah, nID))
})

t.Cleanup(func() {
_ = srv.Stop(ctx)
t.Run("ND_err_not_found", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
_, dah, nID := generateTestEDS(t)
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

_, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNotFound)
})

// create client and connect it to server
client, err := shrexnd.NewClient(params, net.NewTestNode().Host)
require.NoError(t, err)
net.ConnectAll()
t.Run("EDS_Available", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, _ := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

got, err := getter.GetEDS(ctx, &dah)
require.NoError(t, err)
require.Equal(t, eds.Flattened(), got.Flattened())
})

got, err := client.RequestND(
ctx,
&dah,
nID,
srvHost.ID())
t.Run("EDS_err_not_found", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

require.NoError(t, err)
require.NoError(t, got.Verify(&dah, nID))
// generate test data
_, dah, _ := generateTestEDS(t)
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

_, err := getter.GetEDS(ctx, &dah)
require.ErrorIs(t, err, share.ErrNotFound)
})
}

func newStore(t *testing.T) (*eds.Store, error) {
Expand All @@ -77,24 +128,71 @@ func newStore(t *testing.T) (*eds.Store, error) {
return eds.NewStore(tmpDir, ds)
}

func generateTestEDS(t *testing.T, bServ bsrv.BlockService) (*rsmt2d.ExtendedDataSquare, namespace.ID) {
shares := share.RandShares(t, 16)

from := mrand.Intn(len(shares))
to := mrand.Intn(len(shares))
func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader, namespace.ID) {
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
randNID := dah.RowsRoots[(len(dah.RowsRoots)-1)/2][:8]
return eds, dah, randNID
}

if to < from {
from, to = to, from
func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscriber[*header.ExtendedHeader],
) (*peers.Manager, error) {
shrexSub, err := shrexsub.NewPubSub(ctx, host, "test")
if err != nil {
return nil, err
}

nID := shares[from][:share.NamespaceSize]
// change some shares to have same nID
for i := from; i <= to; i++ {
copy(shares[i][:share.NamespaceSize], nID)
disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second)
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
if err != nil {
return nil, err
}
manager, err := peers.NewManager(
peers.DefaultParameters(),
headerSub,
shrexSub,
disc,
host,
connGater,
)
return manager, err
}

eds, err := share.AddShares(context.Background(), shares, bServ)
func newNDClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
) (*shrexnd.Client, *shrexnd.Server) {
params := shrexnd.DefaultParameters()

// create server and register handler
server, err := shrexnd.NewServer(params, srvHost, edsStore, NewStoreGetter(edsStore))
require.NoError(t, err)
require.NoError(t, server.Start(ctx))

t.Cleanup(func() {
_ = server.Stop(ctx)
})

// create client and connect it to server
client, err := shrexnd.NewClient(params, clHost)
require.NoError(t, err)
return client, server
}

return eds, nID
func newEDSClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
) (*shrexeds.Client, *shrexeds.Server) {
params := shrexeds.DefaultParameters()

// create server and register handler
server, err := shrexeds.NewServer(params, srvHost, edsStore)
require.NoError(t, err)
require.NoError(t, server.Start(ctx))

t.Cleanup(func() {
_ = server.Stop(ctx)
})

// create client and connect it to server
client, err := shrexeds.NewClient(params, clHost)
require.NoError(t, err)
return client, server
}
8 changes: 4 additions & 4 deletions share/p2p/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"errors"
)

// ErrUnavailable is returned when a peer doesn't have the requested data or doesn't have the
// capacity to serve it at the moment. It is used to signal that the peer couldn't serve the data
// successfully, but should be retried later.
var ErrUnavailable = errors.New("server cannot serve the requested data at this time")
// ErrNotFound is returned when a peer is unable to find the requested data or resource.
// It is used to signal that the peer couldn't serve the data successfully, and it's not
// available at the moment. The request may be retried later, but it's unlikely to succeed.
var ErrNotFound = errors.New("the requested data or resource could not be found")

// ErrInvalidResponse is returned when a peer returns an invalid response or caused an internal
// error. It is used to signal that the peer couldn't serve the data successfully, and should not be
Expand Down
8 changes: 4 additions & 4 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Manager struct {
lock sync.Mutex
params Parameters

// header subscription is necessary in order to validate the inbound eds hash
// header subscription is necessary in order to Validate the inbound eds hash
headerSub libhead.Subscriber[*header.ExtendedHeader]
shrexSub *shrexsub.PubSub
host host.Host
Expand Down Expand Up @@ -134,7 +134,7 @@ func (m *Manager) Start(startCtx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

err := m.shrexSub.AddValidator(m.validate)
err := m.shrexSub.AddValidator(m.Validate)
if err != nil {
return fmt.Errorf("registering validator: %w", err)
}
Expand Down Expand Up @@ -256,8 +256,8 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
}
}

// validate will collect peer.ID into corresponding peer pool
func (m *Manager) validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {

// messages broadcast from self should bypass the validation with Accept
if peerID == m.host.ID() {
Expand Down
Loading