Skip to content

Commit

Permalink
feat(share/getter): add support for ErrNotFound in IPLD/Store getters (
Browse files Browse the repository at this point in the history
…#2050)

<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview
 - introduce ErrNotFound to getter interface
 - add support in ipld getter
 - add support in store getter

Resolves #2036
  • Loading branch information
walldiss committed Apr 14, 2023
1 parent 9087e0e commit 71b5078
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 34 deletions.
5 changes: 2 additions & 3 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,8 @@ func TestService_GetSharesByNamespaceNotFound(t *testing.T) {
getter, root := GetterWithRandSquare(t, 1)
root.RowsRoots = nil

shares, err := getter.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1})
assert.Len(t, shares, 0)
assert.NoError(t, err)
_, err := getter.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1})
assert.ErrorIs(t, err, share.ErrNotFound)
}

func BenchmarkService_GetSharesByNamespace(b *testing.B) {
Expand Down
17 changes: 5 additions & 12 deletions share/eds/adapters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package eds

import (
"context"
"crypto/rand"
"errors"
mrand "math/rand"
"sort"
Expand All @@ -18,7 +17,7 @@ import (

func TestBlockGetter_GetBlocks(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
cids := randCIDs(32)
cids := randCIDs(t, 32)
// sort cids in asc order
sort.Slice(cids, func(i, j int) bool {
return cids[i].String() < cids[j].String()
Expand All @@ -45,7 +44,7 @@ func TestBlockGetter_GetBlocks(t *testing.T) {
}
})
t.Run("retrieval error", func(t *testing.T) {
cids := randCIDs(32)
cids := randCIDs(t, 32)

// split cids into failed and succeeded
failedLen := mrand.Intn(len(cids)-1) + 1
Expand Down Expand Up @@ -85,7 +84,7 @@ func TestBlockGetter_GetBlocks(t *testing.T) {
}
})
t.Run("retrieval timeout", func(t *testing.T) {
cids := randCIDs(128)
cids := randCIDs(t, 128)

bg := &BlockGetter{
store: rbsMock{},
Expand Down Expand Up @@ -140,16 +139,10 @@ func (r rbsMock) HashOnRead(bool) {
panic("implement me")
}

func randCID() cid.Cid {
hash := make([]byte, ipld.NmtHashSize)
_, _ = rand.Read(hash)
return ipld.MustCidFromNamespacedSha256(hash)
}

func randCIDs(n int) []cid.Cid {
func randCIDs(t *testing.T, n int) []cid.Cid {
cids := make([]cid.Cid, n)
for i := range cids {
cids[i] = randCID()
cids[i] = ipld.RandNamespacedCID(t)
}
return cids
}
22 changes: 16 additions & 6 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/filecoin-project/dagstore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
blocks "github.com/ipfs/go-libipfs/blocks"
Expand All @@ -16,7 +17,6 @@ var _ bstore.Blockstore = (*blockstore)(nil)

var (
errUnsupportedOperation = errors.New("unsupported operation")
errShardNotFound = errors.New("the provided cid does not map to any shard")
)

// blockstore implements the store.Blockstore interface on an EDSStore.
Expand All @@ -41,6 +41,9 @@ func newBlockstore(store *Store, cache *blockstoreCache) *blockstore {

func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, ErrNotFound) {
return false, nil
}
if err != nil {
return false, fmt.Errorf("failed to find shards containing multihash: %w", err)
}
Expand All @@ -49,16 +52,23 @@ func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {

func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err != nil {
log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
if errors.Is(err, ErrNotFound) {
// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
return nil, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}
return blockstr.Get(ctx, cid)
}

func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if errors.Is(err, ErrNotFound) {
// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
return 0, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -102,12 +112,12 @@ func (bs *blockstore) HashOnRead(bool) {
// getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, datastore.ErrNotFound) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}
if len(keys) == 0 {
return nil, errShardNotFound
}

// a share can exist in multiple EDSes, so just take the first one.
shardKey := keys[0]
Expand Down
8 changes: 8 additions & 0 deletions share/eds/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"testing"

"github.com/filecoin-project/dagstore"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-car"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ipld2 "github.com/celestiaorg/celestia-node/share/ipld"
)

// TestBlockstore_Operations tests Has, Get, and GetSize on the top level eds.Store blockstore.
Expand Down Expand Up @@ -48,6 +51,7 @@ func TestBlockstore_Operations(t *testing.T) {
break
}
blockCid := next.Cid()
randomCid := ipld2.RandNamespacedCID(t)

for _, bs := range blockstores {
// test GetSize
Expand All @@ -61,6 +65,10 @@ func TestBlockstore_Operations(t *testing.T) {
assert.Equal(t, block.Cid(), blockCid)
assert.Equal(t, block.RawData(), next.RawData())

// test Get (cid not found)
_, err = bs.Get(ctx, randomCid)
require.ErrorAs(t, err, &ipld.ErrNotFound{Cid: randomCid})

// test GetSize
size, err := bs.GetSize(ctx, blockCid)
assert.NotZerof(t, size, "blocksize.GetSize reported a root block from blockstore was empty")
Expand Down
4 changes: 4 additions & 0 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package share

import (
"context"
"errors"
"fmt"

"github.com/minio/sha256-simd"
Expand All @@ -11,6 +12,9 @@ import (
"github.com/celestiaorg/rsmt2d"
)

// ErrNotFound is used to indicated that requested data could not be found.
var ErrNotFound = errors.New("data not found")

// Getter interface provides a set of accessors for shares by the Root.
// Automatically verifies integrity of shares(exceptions possible depending on the implementation).
//
Expand Down
98 changes: 98 additions & 0 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package getters
import (
"context"
"testing"
"time"

bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
offline "github.com/ipfs/go-ipfs-exchange-offline"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -100,6 +103,11 @@ func TestStoreGetter(t *testing.T) {
assert.Equal(t, eds.GetCell(uint(i), uint(j)), share)
}
}

// root not found
_, dah = randomEDS(t)
_, err = sg.GetShare(ctx, &dah, 0, 0)
require.ErrorIs(t, err, share.ErrNotFound)
})

t.Run("GetEDS", func(t *testing.T) {
Expand All @@ -110,6 +118,11 @@ func TestStoreGetter(t *testing.T) {
retrievedEDS, err := sg.GetEDS(ctx, &dah)
require.NoError(t, err)
assert.True(t, share.EqualEDS(eds, retrievedEDS))

// root not found
root := share.Root{}
_, err = sg.GetEDS(ctx, &root)
require.ErrorIs(t, err, share.ErrNotFound)
})

t.Run("GetSharesByNamespace", func(t *testing.T) {
Expand All @@ -121,8 +134,93 @@ func TestStoreGetter(t *testing.T) {
require.NoError(t, err)
require.NoError(t, shares.Verify(&dah, nID))
assert.Len(t, shares.Flatten(), 2)

// nid not found
nID = make([]byte, 8)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNotFound)

// root not found
root := share.Root{}
_, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.ErrorIs(t, err, share.ErrNotFound)
})
}

func TestIPLDGetter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

tmpDir := t.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
require.NoError(t, err)

err = edsStore.Start(ctx)
require.NoError(t, err)

bserv := bsrv.New(edsStore.Blockstore(), offline.Exchange(edsStore.Blockstore()))
sg := NewIPLDGetter(bserv)

t.Run("GetShare", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

squareSize := int(eds.Width())
for i := 0; i < squareSize; i++ {
for j := 0; j < squareSize; j++ {
share, err := sg.GetShare(ctx, &dah, i, j)
require.NoError(t, err)
assert.Equal(t, eds.GetCell(uint(i), uint(j)), share)
}
}

// root not found
_, dah = randomEDS(t)
_, err = sg.GetShare(ctx, &dah, 0, 0)
require.ErrorIs(t, err, share.ErrNotFound)
})

t.Run("GetEDS", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

retrievedEDS, err := sg.GetEDS(ctx, &dah)
require.NoError(t, err)
assert.True(t, share.EqualEDS(eds, retrievedEDS))
})

t.Run("GetSharesByNamespace", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

eds, nID, dah := randomEDSWithDoubledNamespace(t, 4)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

shares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.NoError(t, shares.Verify(&dah, nID))
assert.Len(t, shares.Flatten(), 2)

// nid not found
nID = make([]byte, 8)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNotFound)

// root not found
root := share.Root{}
_, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.ErrorIs(t, err, share.ErrNotFound)
})
}

func randomEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, share.Root) {
Expand Down
17 changes: 15 additions & 2 deletions share/getters/ipld.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -53,12 +54,16 @@ func (ig *IPLDGetter) GetShare(ctx context.Context, dah *share.Root, row, col in

// wrap the blockservice in a session if it has been signaled in the context.
blockGetter := getGetter(ctx, ig.bServ)
nd, err := share.GetShare(ctx, blockGetter, root, leaf, len(dah.RowsRoots))
s, err := share.GetShare(ctx, blockGetter, root, leaf, len(dah.RowsRoots))
if errors.Is(err, ipld.ErrNodeNotFound) {
// convert error to satisfy getter interface contract
err = share.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("getter/ipld: failed to retrieve share: %w", err)
}

return nd, nil
return s, nil
}

func (ig *IPLDGetter) GetEDS(ctx context.Context, root *share.Root) (eds *rsmt2d.ExtendedDataSquare, err error) {
Expand All @@ -71,6 +76,10 @@ func (ig *IPLDGetter) GetEDS(ctx context.Context, root *share.Root) (eds *rsmt2d

// rtrv.Retrieve calls shares.GetShares until enough shares are retrieved to reconstruct the EDS
eds, err = ig.rtrv.Retrieve(ctx, root)
if errors.Is(err, ipld.ErrNodeNotFound) {
// convert error to satisfy getter interface contract
err = share.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("getter/ipld: failed to retrieve eds: %w", err)
}
Expand Down Expand Up @@ -98,6 +107,10 @@ func (ig *IPLDGetter) GetSharesByNamespace(
// wrap the blockservice in a session if it has been signaled in the context.
blockGetter := getGetter(ctx, ig.bServ)
shares, err = collectSharesByNamespace(ctx, blockGetter, root, nID)
if errors.Is(err, ipld.ErrNodeNotFound) {
// convert error to satisfy getter interface contract
err = share.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("getter/ipld: failed to retrieve shares by namespace: %w", err)
}
Expand Down
Loading

0 comments on commit 71b5078

Please sign in to comment.