From d375373b162552276aed5b0d93ac4bcb3f1c6d0c Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 14 Apr 2023 18:23:54 +0800 Subject: [PATCH] feat(share/getter): add support for ErrNotFound in shrex getter implementation (#2074) ## Overview Resolves https://github.com/celestiaorg/celestia-node/issues/2037 --- share/eds/inverted_index_test.go | 3 +- share/eds/store_test.go | 3 - share/getters/shrex.go | 24 ++- share/getters/shrex_test.go | 188 +++++++++++++----- share/p2p/errors.go | 8 +- share/p2p/peers/manager.go | 8 +- share/p2p/peers/manager_test.go | 26 +-- share/p2p/shrexeds/client.go | 8 +- share/p2p/shrexeds/exchange_test.go | 11 +- .../shrexeds/pb/extended_data_square.pb.go | 16 +- .../shrexeds/pb/extended_data_square.proto | 1 + share/p2p/shrexeds/server.go | 10 +- share/p2p/shrexnd/client.go | 6 +- share/p2p/shrexnd/exchange_test.go | 92 ++++++++- share/p2p/shrexnd/server.go | 4 + 15 files changed, 312 insertions(+), 96 deletions(-) diff --git a/share/eds/inverted_index_test.go b/share/eds/inverted_index_test.go index fafbca6c28..f228aa0d92 100644 --- a/share/eds/inverted_index_test.go +++ b/share/eds/inverted_index_test.go @@ -24,7 +24,8 @@ func (m *mockIterator) ForEach(f func(mh multihash.Multihash) error) error { return nil } -// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per duplicate multihash +// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per +// duplicate multihash func TestMultihashesForShard(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) diff --git a/share/eds/store_test.go b/share/eds/store_test.go index ab0d74b2d2..6c6d7d10b2 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -2,7 +2,6 @@ package eds import ( "context" - "fmt" "os" "testing" "time" @@ -71,8 +70,6 @@ func TestEDSStore(t *testing.T) { r, err := edsStore.GetCAR(ctx, dah.Hash()) assert.NoError(t, err) carReader, err := car.NewCarReader(r) - - fmt.Println(car.HeaderSize(carReader.Header)) assert.NoError(t, err) for i := 0; i < 4; i++ { diff --git a/share/getters/shrex.go b/share/getters/shrex.go index f4e2d11f33..95f06d821e 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -83,12 +83,16 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout) eds, getErr := sg.edsClient.RequestEDS(reqCtx, root.Hash(), peer) cancel() - switch getErr { - case nil: + switch { + case getErr == nil: setStatus(peers.ResultSynced) return eds, nil - case context.DeadlineExceeded: - case p2p.ErrInvalidResponse: + case errors.Is(getErr, context.DeadlineExceeded), + errors.Is(getErr, context.Canceled): + case errors.Is(getErr, p2p.ErrNotFound): + getErr = share.ErrNotFound + setStatus(peers.ResultCooldownPeer) + case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: setStatus(peers.ResultCooldownPeer) @@ -132,12 +136,16 @@ func (sg *ShrexGetter) GetSharesByNamespace( reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout) nd, getErr := sg.ndClient.RequestND(reqCtx, root, id, peer) cancel() - switch getErr { - case nil: + switch { + case getErr == nil: setStatus(peers.ResultSuccess) return nd, nil - case context.DeadlineExceeded: - case p2p.ErrInvalidResponse: + case errors.Is(getErr, context.DeadlineExceeded), + 
errors.Is(getErr, context.Canceled): + case errors.Is(getErr, p2p.ErrNotFound): + getErr = share.ErrNotFound + setStatus(peers.ResultCooldownPeer) + case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: setStatus(peers.ResultCooldownPeer) diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index c47dbf6f85..b9960c1fac 100644 --- a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -2,71 +2,122 @@ package getters import ( "context" - mrand "math/rand" "testing" "time" - bsrv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" - mdutils "github.com/ipfs/go-merkledag/test" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "github.com/libp2p/go-libp2p/core/host" + routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" + "github.com/libp2p/go-libp2p/p2p/net/conngater" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-app/pkg/da" + libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/nmt/namespace" "github.com/celestiaorg/rsmt2d" + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" + "github.com/celestiaorg/celestia-node/share/availability/discovery" "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/share/p2p/peers" + "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" + "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) -func TestGetSharesWithProofByNamespace(t *testing.T) { +func TestShrexGetter(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(cancel) // create test net - net := availability_test.NewTestDAGNet(ctx, t) - - // generate test data - bServ := mdutils.Bserv() - randomEDS, nID := generateTestEDS(t, bServ) - dah := da.NewDataAvailabilityHeader(randomEDS) + net, err := mocknet.FullMeshConnected(2) + require.NoError(t, err) + clHost, srvHost := net.Hosts()[0], net.Hosts()[1] // launch eds store and put test data into it edsStore, err := newStore(t) require.NoError(t, err) err = edsStore.Start(ctx) require.NoError(t, err) - err = edsStore.Put(ctx, dah.Hash(), randomEDS) - require.NoError(t, err) - // create server and register handler - srvHost := net.NewTestNode().Host - params := shrexnd.DefaultParameters() - srv, err := shrexnd.NewServer(params, srvHost, edsStore, NewIPLDGetter(bServ)) - require.NoError(t, err) - err = srv.Start(ctx) + ndClient, _ := newNDClientServer(ctx, t, edsStore, srvHost, clHost) + edsClient, _ := newEDSClientServer(ctx, t, edsStore, srvHost, clHost) + + // create shrex Getter + sub := new(headertest.Subscriber) + peerManager, err := testManager(ctx, clHost, sub) require.NoError(t, err) + getter := NewShrexGetter(edsClient, ndClient, peerManager) + require.NoError(t, getter.Start(ctx)) + + t.Run("ND_Available", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + // generate test data + eds, dah, nID := generateTestEDS(t) + require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds)) + peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + DataHash: dah.Hash(), + Height: 1, + }) + + got, err := getter.GetSharesByNamespace(ctx, &dah, nID) 
+ require.NoError(t, err) + require.NoError(t, got.Verify(&dah, nID)) + }) - t.Cleanup(func() { - _ = srv.Stop(ctx) + t.Run("ND_err_not_found", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + // generate test data + _, dah, nID := generateTestEDS(t) + peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + DataHash: dah.Hash(), + Height: 1, + }) + + _, err := getter.GetSharesByNamespace(ctx, &dah, nID) + require.ErrorIs(t, err, share.ErrNotFound) }) - // create client and connect it to server - client, err := shrexnd.NewClient(params, net.NewTestNode().Host) - require.NoError(t, err) - net.ConnectAll() + t.Run("EDS_Available", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + // generate test data + eds, dah, _ := generateTestEDS(t) + require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds)) + peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + DataHash: dah.Hash(), + Height: 1, + }) + + got, err := getter.GetEDS(ctx, &dah) + require.NoError(t, err) + require.Equal(t, eds.Flattened(), got.Flattened()) + }) - got, err := client.RequestND( - ctx, - &dah, - nID, - srvHost.ID()) + t.Run("EDS_err_not_found", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) - require.NoError(t, err) - require.NoError(t, got.Verify(&dah, nID)) + // generate test data + _, dah, _ := generateTestEDS(t) + peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + DataHash: dah.Hash(), + Height: 1, + }) + + _, err := getter.GetEDS(ctx, &dah) + require.ErrorIs(t, err, share.ErrNotFound) + }) } func newStore(t *testing.T) (*eds.Store, error) { @@ -77,24 +128,71 @@ func newStore(t *testing.T) (*eds.Store, error) { return eds.NewStore(tmpDir, ds) } -func generateTestEDS(t *testing.T, bServ bsrv.BlockService) (*rsmt2d.ExtendedDataSquare, namespace.ID) { - shares := share.RandShares(t, 16) - - from := mrand.Intn(len(shares)) - to := mrand.Intn(len(shares)) +func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader, namespace.ID) { + eds := share.RandEDS(t, 4) + dah := da.NewDataAvailabilityHeader(eds) + randNID := dah.RowsRoots[(len(dah.RowsRoots)-1)/2][:8] + return eds, dah, randNID +} - if to < from { - from, to = to, from +func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscriber[*header.ExtendedHeader], +) (*peers.Manager, error) { + shrexSub, err := shrexsub.NewPubSub(ctx, host, "test") + if err != nil { + return nil, err } - nID := shares[from][:share.NamespaceSize] - // change some shares to have same nID - for i := from; i <= to; i++ { - copy(shares[i][:share.NamespaceSize], nID) + disc := discovery.NewDiscovery(nil, + routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) + connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore())) + if err != nil { + return nil, err } + manager, err := peers.NewManager( + peers.DefaultParameters(), + headerSub, + shrexSub, + disc, + host, + connGater, + ) + return manager, err +} - eds, err := share.AddShares(context.Background(), shares, bServ) +func newNDClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host, +) (*shrexnd.Client, *shrexnd.Server) { + params := shrexnd.DefaultParameters() + + // create server and register handler + server, err := shrexnd.NewServer(params, srvHost, edsStore, NewStoreGetter(edsStore)) + require.NoError(t, err) + 
require.NoError(t, server.Start(ctx))
+
+	t.Cleanup(func() {
+		_ = server.Stop(ctx)
+	})
+
+	// create client and connect it to server
+	client, err := shrexnd.NewClient(params, clHost)
 	require.NoError(t, err)
+	return client, server
+}
 
-	return eds, nID
+func newEDSClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
+) (*shrexeds.Client, *shrexeds.Server) {
+	params := shrexeds.DefaultParameters()
+
+	// create server and register handler
+	server, err := shrexeds.NewServer(params, srvHost, edsStore)
+	require.NoError(t, err)
+	require.NoError(t, server.Start(ctx))
+
+	t.Cleanup(func() {
+		_ = server.Stop(ctx)
+	})
+
+	// create client and connect it to server
+	client, err := shrexeds.NewClient(params, clHost)
+	require.NoError(t, err)
+	return client, server
 }
diff --git a/share/p2p/errors.go b/share/p2p/errors.go
index b3c1b6e503..77f23c554e 100644
--- a/share/p2p/errors.go
+++ b/share/p2p/errors.go
@@ -4,10 +4,10 @@ import (
 	"errors"
 )
 
-// ErrUnavailable is returned when a peer doesn't have the requested data or doesn't have the
-// capacity to serve it at the moment. It is used to signal that the peer couldn't serve the data
-// successfully, but should be retried later.
-var ErrUnavailable = errors.New("server cannot serve the requested data at this time")
+// ErrNotFound is returned when a peer is unable to find the requested data or resource.
+// It is used to signal that the peer couldn't serve the data successfully, and it's not
+// available at the moment. The request may be retried later, but it's unlikely to succeed.
+var ErrNotFound = errors.New("the requested data or resource could not be found")
 
 // ErrInvalidResponse is returned when a peer returns an invalid response or caused an internal
 // error. It is used to signal that the peer couldn't serve the data successfully, and should not be
diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index 6ccdd7365c..c18e91bdb1 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -43,7 +43,7 @@ type Manager struct {
 	lock   sync.Mutex
 	params Parameters
 
-	// header subscription is necessary in order to validate the inbound eds hash
+	// header subscription is necessary in order to Validate the inbound eds hash
 	headerSub libhead.Subscriber[*header.ExtendedHeader]
 	shrexSub  *shrexsub.PubSub
 	host      host.Host
@@ -134,7 +134,7 @@ func (m *Manager) Start(startCtx context.Context) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	m.cancel = cancel
 
-	err := m.shrexSub.AddValidator(m.validate)
+	err := m.shrexSub.AddValidator(m.Validate)
 	if err != nil {
 		return fmt.Errorf("registering validator: %w", err)
 	}
@@ -256,8 +256,8 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
 	}
 }
 
-// validate will collect peer.ID into corresponding peer pool
-func (m *Manager) validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
+// Validate will collect peer.ID into corresponding peer pool
+func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
 
 	// messages broadcast from self should bypass the validation with Accept
 	if peerID == m.host.ID() {
diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go
index 7d5bbaa96c..05c1f5b33e 100644
--- a/share/p2p/peers/manager_test.go
+++ b/share/p2p/peers/manager_test.go
@@ -28,7 +28,7 @@ import (
 
 // TODO: add broadcast to tests
 func TestManager(t *testing.T) {
-	t.Run("validate pool by headerSub", func(t *testing.T) {
+	t.Run("Validate pool by headerSub", func(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
 		t.Cleanup(cancel)
 
@@ -49,7 +49,7 @@ func TestManager(t *testing.T) {
 		stopManager(t, manager)
 	})
 
-	t.Run("validate pool by shrex.Getter", func(t *testing.T) {
+	t.Run("Validate pool by shrex.Getter", func(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 		t.Cleanup(cancel)
 
@@ -61,7 +61,7 @@ func TestManager(t *testing.T) {
 		require.NoError(t, err)
 
 		peerID, msg := peer.ID("peer1"), newShrexSubMsg(h)
-		result := manager.validate(ctx, peerID, msg)
+		result := manager.Validate(ctx, peerID, msg)
 		require.Equal(t, pubsub.ValidationIgnore, result)
 
 		pID, done, err := manager.Peer(ctx, h.DataHash.Bytes())
@@ -91,12 +91,12 @@ func TestManager(t *testing.T) {
 
 		// own messages should be accepted
 		msg := newShrexSubMsg(h)
-		result := manager.validate(ctx, manager.host.ID(), msg)
+		result := manager.Validate(ctx, manager.host.ID(), msg)
 		require.Equal(t, pubsub.ValidationAccept, result)
 
 		// normal messages should be ignored
 		peerID := peer.ID("peer1")
-		result = manager.validate(ctx, peerID, msg)
+		result = manager.Validate(ctx, peerID, msg)
 		require.Equal(t, pubsub.ValidationIgnore, result)
 
 		// mark peer as misbehaved to blacklist it
@@ -107,7 +107,7 @@ func TestManager(t *testing.T) {
 		done(ResultBlacklistPeer)
 
 		// new messages from misbehaved peer should be Rejected
-		result = manager.validate(ctx, pID, msg)
+		result = manager.Validate(ctx, pID, msg)
 		require.Equal(t, pubsub.ValidationReject, result)
 
 		stopManager(t, manager)
@@ -135,7 +135,7 @@ func TestManager(t *testing.T) {
 			DataHash: share.DataHash("datahash1"),
 			Height:   2,
 		}
-		manager.validate(ctx, peerID, msg)
+
manager.Validate(ctx, peerID, msg) // create validated pool validDataHash := share.DataHash("datahash2") @@ -148,7 +148,7 @@ func TestManager(t *testing.T) { // messages with blacklisted hash should be rejected right away peerID2 := peer.ID("peer2") - result := manager.validate(ctx, peerID2, msg) + result := manager.Validate(ctx, peerID2, msg) require.Equal(t, pubsub.ValidationReject, result) // check blacklisted pools @@ -235,7 +235,7 @@ func TestManager(t *testing.T) { require.NoError(t, err) peerID, msg := peer.ID("peer1"), newShrexSubMsg(h) - result := manager.validate(ctx, peerID, msg) + result := manager.Validate(ctx, peerID, msg) require.Equal(t, pubsub.ValidationIgnore, result) pID, done, err := manager.Peer(ctx, h.DataHash.Bytes()) @@ -249,7 +249,7 @@ func TestManager(t *testing.T) { require.True(t, pool.isSynced.Load()) // add peer on synced pool should be noop - result = manager.validate(ctx, "peer2", msg) + result = manager.Validate(ctx, "peer2", msg) require.Equal(t, pubsub.ValidationIgnore, result) require.Len(t, pool.peersList, 0) }) @@ -276,7 +276,7 @@ func TestManager(t *testing.T) { DataHash: share.DataHash("datahash"), Height: uint64(h.Height() - 1), } - result := manager.validate(ctx, "peer", msg) + result := manager.Validate(ctx, "peer", msg) require.Equal(t, pubsub.ValidationIgnore, result) // amount of pools should not change @@ -300,7 +300,7 @@ func TestManager(t *testing.T) { DataHash: share.DataHash("datahash"), Height: uint64(h.Height() - 1), } - result := manager.validate(ctx, "peer", msg) + result := manager.Validate(ctx, "peer", msg) require.Equal(t, pubsub.ValidationIgnore, result) // unlock header sub after message validator @@ -339,7 +339,7 @@ func TestIntegration(t *testing.T) { require.NoError(t, err) fnPeerManager.host = nw.Hosts()[1] - require.NoError(t, fnPubSub.AddValidator(fnPeerManager.validate)) + require.NoError(t, fnPubSub.AddValidator(fnPeerManager.Validate)) _, err = fnPubSub.Subscribe() require.NoError(t, err) diff --git a/share/p2p/shrexeds/client.go b/share/p2p/shrexeds/client.go index 4c96265c63..b7c3e28402 100644 --- a/share/p2p/shrexeds/client.go +++ b/share/p2p/shrexeds/client.go @@ -60,7 +60,7 @@ func (c *Client) RequestEDS( return nil, context.DeadlineExceeded } } - if err != p2p.ErrUnavailable { + if err != p2p.ErrNotFound { log.Debugw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String()) } @@ -104,7 +104,7 @@ func (c *Client) doRequest( if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { - return nil, p2p.ErrUnavailable + return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck return nil, fmt.Errorf("failed to read status from stream: %w", err) @@ -120,8 +120,8 @@ func (c *Client) doRequest( return eds, nil case pb.Status_NOT_FOUND: log.Debugf("client: peer %s couldn't serve eds %s with status %s", to.String(), dataHash.String(), resp.GetStatus()) - return nil, p2p.ErrUnavailable - case pb.Status_INVALID: + return nil, p2p.ErrNotFound + case pb.Status_INVALID, pb.Status_INTERNAL: fallthrough default: log.Errorf("request status %s returned for root %s", resp.Status.String(), dataHash.String()) diff --git a/share/p2p/shrexeds/exchange_test.go b/share/p2p/shrexeds/exchange_test.go index 3085e1e32e..31d017fa13 100644 --- a/share/p2p/shrexeds/exchange_test.go +++ b/share/p2p/shrexeds/exchange_test.go @@ -86,6 +86,15 @@ func TestExchange_RequestEDS(t *testing.T) { assert.Nil(t, requestedEDS) }) + t.Run("EDS_err_not_found", func(t *testing.T) { + timeoutCtx, cancel := 
context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + eds := share.RandEDS(t, 4) + dah := da.NewDataAvailabilityHeader(eds) + _, err := client.RequestEDS(timeoutCtx, dah.Hash(), server.host.ID()) + require.ErrorIs(t, err, p2p.ErrNotFound) + }) + // Testcase: Concurrency limit reached t.Run("EDS_concurrency_limit", func(t *testing.T) { store, client, server := makeExchange(t) @@ -124,7 +133,7 @@ func TestExchange_RequestEDS(t *testing.T) { // wait until all server slots are taken wg.Wait() _, err = client.RequestEDS(ctx, nil, server.host.ID()) - require.ErrorIs(t, err, p2p.ErrUnavailable) + require.ErrorIs(t, err, p2p.ErrNotFound) }) } diff --git a/share/p2p/shrexeds/pb/extended_data_square.pb.go b/share/p2p/shrexeds/pb/extended_data_square.pb.go index c6f7e07123..ed1a96ae3b 100644 --- a/share/p2p/shrexeds/pb/extended_data_square.pb.go +++ b/share/p2p/shrexeds/pb/extended_data_square.pb.go @@ -28,18 +28,21 @@ const ( Status_INVALID Status = 0 Status_OK Status = 1 Status_NOT_FOUND Status = 2 + Status_INTERNAL Status = 3 ) var Status_name = map[int32]string{ 0: "INVALID", 1: "OK", 2: "NOT_FOUND", + 3: "INTERNAL", } var Status_value = map[string]int32{ "INVALID": 0, "OK": 1, "NOT_FOUND": 2, + "INTERNAL": 3, } func (x Status) String() string { @@ -149,7 +152,7 @@ func init() { } var fileDescriptor_49d42aa96098056e = []byte{ - // 213 bytes of a gzipped FileDescriptorProto + // 227 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x28, 0xce, 0x48, 0x2c, 0x4a, 0xd5, 0x2f, 0x30, 0x2a, 0xd0, 0x2f, 0xce, 0x28, 0x4a, 0xad, 0x48, 0x4d, 0x29, 0xd6, 0x2f, 0x48, 0xd2, 0x4f, 0xad, 0x28, 0x49, 0xcd, 0x4b, 0x49, 0x4d, 0x89, 0x4f, 0x49, 0x2c, 0x49, 0x8c, @@ -157,13 +160,14 @@ var fileDescriptor_49d42aa96098056e = []byte{ 0x72, 0x75, 0x09, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe2, 0x62, 0xc9, 0x48, 0x2c, 0xce, 0x90, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0xb3, 0x95, 0xf4, 0xb8, 0xb8, 0xc1, 0x2a, 0x8a, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0xe4, 0xb9, 0xd8, 0x8a, 0x4b, 0x12, 0x4b, 0x4a, - 0x8b, 0xc1, 0x8a, 0xf8, 0x8c, 0xd8, 0xf5, 0x82, 0xc1, 0xdc, 0x20, 0xa8, 0xb0, 0x96, 0x0e, 0x17, + 0x8b, 0xc1, 0x8a, 0xf8, 0x8c, 0xd8, 0xf5, 0x82, 0xc1, 0xdc, 0x20, 0xa8, 0xb0, 0x96, 0x15, 0x17, 0x1b, 0x44, 0x44, 0x88, 0x9b, 0x8b, 0xdd, 0xd3, 0x2f, 0xcc, 0xd1, 0xc7, 0xd3, 0x45, 0x80, 0x41, 0x88, 0x8d, 0x8b, 0xc9, 0xdf, 0x5b, 0x80, 0x51, 0x88, 0x97, 0x8b, 0xd3, 0xcf, 0x3f, 0x24, 0xde, - 0xcd, 0x3f, 0xd4, 0xcf, 0x45, 0x80, 0xc9, 0x49, 0xe2, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, - 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, - 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x0e, 0x34, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x34, 0x8f, 0xa7, - 0x57, 0xd4, 0x00, 0x00, 0x00, + 0xcd, 0x3f, 0xd4, 0xcf, 0x45, 0x80, 0x49, 0x88, 0x87, 0x8b, 0xc3, 0xd3, 0x2f, 0xc4, 0x35, 0xc8, + 0xcf, 0xd1, 0x47, 0x80, 0xd9, 0x49, 0xe2, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, + 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, + 0x92, 0xd8, 0xc0, 0xce, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x7b, 0x1d, 0xd4, 0xa7, 0xe2, + 0x00, 0x00, 0x00, } func (m *EDSRequest) Marshal() (dAtA []byte, err error) { diff --git a/share/p2p/shrexeds/pb/extended_data_square.proto b/share/p2p/shrexeds/pb/extended_data_square.proto index 1830d85f8e..63750962e9 100644 --- a/share/p2p/shrexeds/pb/extended_data_square.proto +++ b/share/p2p/shrexeds/pb/extended_data_square.proto @@ -8,6 +8,7 @@ enum 
Status { INVALID = 0; OK = 1; // data found NOT_FOUND = 2; // data not found + INTERNAL = 3; // internal server error } message EDSResponse { diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index ccc841df64..bae472871e 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -2,6 +2,7 @@ package shrexeds import ( "context" + "errors" "fmt" "io" "time" @@ -78,13 +79,18 @@ func (s *Server) handleStream(stream network.Stream) { ctx, cancel := context.WithTimeout(s.ctx, s.params.HandleRequestTimeout) defer cancel() - status := p2p_pb.Status_OK + // determine whether the EDS is available in our store // we do not close the reader, so that other requests will not need to re-open the file. // closing is handled by the LRU cache. edsReader, err := s.store.GetCAR(ctx, hash) - if err != nil { + status := p2p_pb.Status_OK + switch { + case errors.Is(err, eds.ErrNotFound): status = p2p_pb.Status_NOT_FOUND + case err != nil: + log.Debugw("server: get car", "err", err) + status = p2p_pb.Status_INTERNAL } // inform the client of our status diff --git a/share/p2p/shrexnd/client.go b/share/p2p/shrexnd/client.go index 1264412868..467376605f 100644 --- a/share/p2p/shrexnd/client.go +++ b/share/p2p/shrexnd/client.go @@ -68,7 +68,7 @@ func (c *Client) RequestND( return nil, context.DeadlineExceeded } } - if err != p2p.ErrUnavailable { + if err != p2p.ErrNotFound { log.Debugw("client-nd: peer returned err", "peer", peer, "err", err) } return nil, err @@ -109,7 +109,7 @@ func (c *Client) doRequest( if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { - return nil, p2p.ErrUnavailable + return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck return nil, fmt.Errorf("client-nd: reading response: %w", err) @@ -188,7 +188,7 @@ func statusToErr(code pb.StatusCode) error { case pb.StatusCode_OK: return nil case pb.StatusCode_NOT_FOUND: - return p2p.ErrUnavailable + return p2p.ErrNotFound case pb.StatusCode_INTERNAL, pb.StatusCode_INVALID: fallthrough default: diff --git a/share/p2p/shrexnd/exchange_test.go b/share/p2p/shrexnd/exchange_test.go index b1d09bc721..8542992e0e 100644 --- a/share/p2p/shrexnd/exchange_test.go +++ b/share/p2p/shrexnd/exchange_test.go @@ -6,15 +6,54 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + libhost "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" + "github.com/celestiaorg/celestia-app/pkg/da" + "github.com/celestiaorg/nmt/namespace" + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/p2p" ) +func TestExchange_RequestND_NotFound(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + edsStore, client, server := makeExchange(t, notFoundGetter{}) + require.NoError(t, edsStore.Start(ctx)) + require.NoError(t, server.Start(ctx)) + + t.Run("CAR_not_exist", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + root := share.Root{} + nID := make([]byte, 8) + _, err := client.RequestND(ctx, &root, nID, server.host.ID()) + require.ErrorIs(t, err, p2p.ErrNotFound) + }) + + t.Run("Getter_err_not_found", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + eds 
:= share.RandEDS(t, 4) + dah := da.NewDataAvailabilityHeader(eds) + require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds)) + + randNID := dah.RowsRoots[(len(dah.RowsRoots)-1)/2][:8] + _, err := client.RequestND(ctx, &dah, randNID, server.host.ID()) + require.ErrorIs(t, err, p2p.ErrNotFound) + }) +} + func TestExchange_RequestND(t *testing.T) { - // Testcase: Concurrency limit reached t.Run("ND_concurrency_limit", func(t *testing.T) { net, err := mocknet.FullMeshConnected(2) require.NoError(t, err) @@ -57,6 +96,55 @@ func TestExchange_RequestND(t *testing.T) { // wait until all server slots are taken wg.Wait() _, err = client.RequestND(ctx, nil, nil, server.host.ID()) - require.ErrorIs(t, err, p2p.ErrUnavailable) + require.ErrorIs(t, err, p2p.ErrNotFound) }) } + +type notFoundGetter struct{} + +func (m notFoundGetter) GetShare(_ context.Context, _ *share.Root, _, _ int, +) (share.Share, error) { + return nil, share.ErrNotFound +} + +func (m notFoundGetter) GetEDS(_ context.Context, _ *share.Root, +) (*rsmt2d.ExtendedDataSquare, error) { + return nil, share.ErrNotFound +} + +func (m notFoundGetter) GetSharesByNamespace(_ context.Context, _ *share.Root, _ namespace.ID, +) (share.NamespacedShares, error) { + return nil, share.ErrNotFound +} + +func newStore(t *testing.T) *eds.Store { + t.Helper() + + tmpDir := t.TempDir() + ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) + store, err := eds.NewStore(tmpDir, ds) + require.NoError(t, err) + return store +} + +func createMocknet(t *testing.T, amount int) []libhost.Host { + t.Helper() + + net, err := mocknet.FullMeshConnected(amount) + require.NoError(t, err) + // get host and peer + return net.Hosts() +} + +func makeExchange(t *testing.T, getter share.Getter) (*eds.Store, *Client, *Server) { + t.Helper() + store := newStore(t) + hosts := createMocknet(t, 2) + + client, err := NewClient(DefaultParameters(), hosts[0]) + require.NoError(t, err) + server, err := NewServer(DefaultParameters(), hosts[1], store, getter) + require.NoError(t, err) + + return store, client, server +} diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go index eaa0d1d9d2..8668dcff14 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/p2p/shrexnd/server.go @@ -111,6 +111,10 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre } shares, err := srv.getter.GetSharesByNamespace(ctx, dah, req.NamespaceId) + if errors.Is(err, share.ErrNotFound) { + srv.respondNotFoundError(stream) + return + } if err != nil { log.Errorw("server: retrieving shares", "err", err) srv.respondInternalError(stream)
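
The core behavioral change sits in the retry loops of `share/getters/shrex.go`: each per-peer request error is now sorted with `errors.Is` into one of four outcomes — success, a silent retry on timeout/cancellation, a cooldown plus a caller-facing `share.ErrNotFound` when the peer answers NOT_FOUND, and a blacklist on an invalid response. Below is a self-contained sketch of that classification; the error values and result constants are illustrative stand-ins for the celestia-node identifiers (`p2p.ErrNotFound`, `p2p.ErrInvalidResponse`, `share.ErrNotFound`, `peers.Result*`), not the real ones:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for p2p.ErrNotFound, p2p.ErrInvalidResponse and share.ErrNotFound;
// the real values live in celestia-node.
var (
	errP2PNotFound        = errors.New("p2p: not found")
	errP2PInvalidResponse = errors.New("p2p: invalid response")
	errShareNotFound      = errors.New("share: not found")
)

// result stands in for the peers.Result* constants passed to setStatus.
type result string

const (
	resultSynced    result = "synced"    // peer served the data; mark it synced
	resultCooldown  result = "cooldown"  // back off from this peer for a while
	resultBlacklist result = "blacklist" // peer misbehaved; drop it
)

// classify mirrors the switch added to ShrexGetter.GetEDS: timeouts and
// cancellations yield no verdict (the loop retries with another peer), a
// NOT_FOUND reply cools the peer down and surfaces share.ErrNotFound to the
// caller, and an invalid response blacklists the peer.
func classify(getErr error) (result, error) {
	switch {
	case getErr == nil:
		return resultSynced, nil
	case errors.Is(getErr, context.DeadlineExceeded),
		errors.Is(getErr, context.Canceled):
		return "", getErr
	case errors.Is(getErr, errP2PNotFound):
		return resultCooldown, errShareNotFound
	case errors.Is(getErr, errP2PInvalidResponse):
		return resultBlacklist, getErr
	default:
		return resultCooldown, getErr
	}
}

func main() {
	res, err := classify(errP2PNotFound)
	fmt.Println(res, err) // cooldown share: not found
}
```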
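Both getters budget each attempt with `ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)`, a helper the patch calls but does not define. The sketch below is a hypothetical reconstruction inferred only from the name and call site — the assumption is that it divides the time remaining on the parent context evenly across the remaining attempts, with a per-attempt floor:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ctxWithSplitTimeout is not shown in this patch; this is a guess at its
// contract: give each of the remaining attempts an equal share of whatever
// time is left on the parent context, but never less than minTimeout.
func ctxWithSplitTimeout(
	ctx context.Context,
	attemptsLeft int,
	minTimeout time.Duration,
) (context.Context, context.CancelFunc) {
	deadline, ok := ctx.Deadline()
	if !ok {
		// no parent deadline: fall back to the per-attempt minimum
		return context.WithTimeout(ctx, minTimeout)
	}
	split := time.Until(deadline) / time.Duration(attemptsLeft)
	if split < minTimeout {
		split = minTimeout
	}
	return context.WithTimeout(ctx, split)
}

func main() {
	parent, cancel := context.WithTimeout(context.Background(), 9*time.Second)
	defer cancel()

	// with 3 attempts left, each attempt gets roughly a third of the budget
	reqCtx, reqCancel := ctxWithSplitTimeout(parent, 3, time.Second)
	defer reqCancel()

	d, _ := reqCtx.Deadline()
	fmt.Println(time.Until(d).Round(time.Second)) // ~3s
}
```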
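The protocol-level counterpart is the new `INTERNAL = 3` status in `extended_data_square.proto`: the shrexeds server now distinguishes a missing CAR file (`NOT_FOUND`) from any other store failure (`INTERNAL`), and the clients map `NOT_FOUND` back to `p2p.ErrNotFound` while lumping `INVALID`/`INTERNAL` in with invalid responses (compare `statusToErr` in the shrexnd client). A sketch of both directions, with stand-in types replacing the generated pb enum and the celestia-node errors:

```go
package main

import (
	"errors"
	"fmt"
)

// Status mirrors the proto enum after this patch:
// INVALID = 0, OK = 1, NOT_FOUND = 2 and the new INTERNAL = 3.
type Status int32

const (
	StatusInvalid Status = iota
	StatusOK
	StatusNotFound
	StatusInternal
)

// Stand-ins for eds.ErrNotFound and p2p.ErrNotFound.
var (
	errEDSNotFound = errors.New("eds: not found")
	errP2PNotFound = errors.New("p2p: not found")
)

// statusFor sketches the server-side decision in shrexeds' handleStream:
// a missing CAR maps to NOT_FOUND, any other store failure to the new
// INTERNAL code, success to OK.
func statusFor(storeErr error) Status {
	switch {
	case storeErr == nil:
		return StatusOK
	case errors.Is(storeErr, errEDSNotFound):
		return StatusNotFound
	default:
		return StatusInternal
	}
}

// errFor sketches the client-side inverse: NOT_FOUND becomes p2p.ErrNotFound,
// while INVALID and INTERNAL both fall through to a generic invalid-response
// error, as in the patched clients.
func errFor(s Status) error {
	switch s {
	case StatusOK:
		return nil
	case StatusNotFound:
		return errP2PNotFound
	default:
		return errors.New("invalid response")
	}
}

func main() {
	s := statusFor(errEDSNotFound)
	fmt.Println(s, errFor(s)) // 2 p2p: not found
}
```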
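Finally, the new shrexnd test drives the server down its NOT_FOUND path with `notFoundGetter`, a stub getter that always fails. The pattern generalizes: hand the server a `share.Getter` that returns `share.ErrNotFound`, then assert the client observes `p2p.ErrNotFound`. A minimal illustration of the same technique with simplified stand-in types (the real interface carries `GetShare`/`GetEDS` as well):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-in for share.ErrNotFound.
var errShareNotFound = errors.New("share: not found")

// getter is a trimmed-down stand-in for the share.Getter interface; only the
// method exercised by the shrexnd server path is kept.
type getter interface {
	GetSharesByNamespace(ctx context.Context, nID []byte) ([][]byte, error)
}

// notFoundGetter always reports missing data, forcing the server under test
// into its not-found response branch — the trick the new exchange test uses.
type notFoundGetter struct{}

func (notFoundGetter) GetSharesByNamespace(context.Context, []byte) ([][]byte, error) {
	return nil, errShareNotFound
}

// handle sketches the new branch in the shrexnd server: share.ErrNotFound is
// answered with a dedicated NOT_FOUND response instead of a generic internal
// error.
func handle(ctx context.Context, g getter, nID []byte) string {
	_, err := g.GetSharesByNamespace(ctx, nID)
	switch {
	case errors.Is(err, errShareNotFound):
		return "NOT_FOUND"
	case err != nil:
		return "INTERNAL"
	default:
		return "OK"
	}
}

func main() {
	fmt.Println(handle(context.Background(), notFoundGetter{}, make([]byte, 8))) // NOT_FOUND
}
```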