Skip to content

Commit

Permalink
fix(share/eds): use cached accessors in GetCAR and GetDAH (#2000)
Browse files Browse the repository at this point in the history
Closes #1514 . This PR enables accessor cache usage for GetCAR and
GetDAH. This will allow shrexeds and shrexnd servers to only require
opening the underlying file once (until removed from cache). If many
peers request the EDS at once, the server previously needed to open the
file each time.

Will still require a dependency bump of our dagstore fork for the new
test to not fail - so draft until then. Related:
celestiaorg/dagstore#2

---------

Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
  • Loading branch information
3 people committed Apr 4, 2023
1 parent e14995e commit d97e634
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ require (

replace (
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 h1:BHvn41IHOtvHeX1VZqO/
github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23/go.mod h1:nL+vkAMKy/A8wWemWqMwBy4pOGWYYbboAVTEe3N5gIU=
github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 h1:EADZy33ufskVIy6Rj6jbi3SOVCeYYo26zUi7iYx+QR0=
github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7/go.mod h1:vg3Eza9adJJ5Mdx6boz5MpZsZcTZyrfTVYZHyi2zLm4=
github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659 h1:f3205vw3GYBtMiNoS+qB6IuHSs50Iwqsm9lNIikLTCk=
github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM=
github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136 h1:LBvY3NDA18fcS72pBAEd2pENoUpz1iV4cCXBN2Zrj/I=
github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM=
github.com/celestiaorg/go-header v0.2.3 h1:41r60OtAeexWC3J3eTELgWfzcdKR2taFlfcJ/2IHZD4=
github.com/celestiaorg/go-header v0.2.3/go.mod h1:6XKf0yhoEQqfKQTZnyTZjTjF5jH5Wq9uO9AvDMkdYbs=
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
Expand Down
19 changes: 15 additions & 4 deletions share/eds/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock
lk.Lock()
defer lk.Unlock()

return bc.unsafeGet(shardContainingCid)
}

func (bc *blockstoreCache) unsafeGet(shardContainingCid shard.Key) (*accessorWithBlockstore, error) {
// We've already ensured that the given shard has the cid/multihash we are looking for.
val, ok := bc.cache.Get(shardContainingCid)
if !ok {
Expand All @@ -83,16 +87,23 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock
func (bc *blockstoreCache) Add(
shardContainingCid shard.Key,
accessor *dagstore.ShardAccessor,
) (*accessorWithBlockstore, error) {
lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)]
lk.Lock()
defer lk.Unlock()

return bc.unsafeAdd(shardContainingCid, accessor)
}

func (bc *blockstoreCache) unsafeAdd(
shardContainingCid shard.Key,
accessor *dagstore.ShardAccessor,
) (*accessorWithBlockstore, error) {
blockStore, err := accessor.Blockstore()
if err != nil {
return nil, fmt.Errorf("failed to get blockstore from accessor: %w", err)
}

lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)]
lk.Lock()
defer lk.Unlock()

newAccessor := &accessorWithBlockstore{
bs: blockStore,
sa: accessor,
Expand Down
24 changes: 13 additions & 11 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,18 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
// The Reader strictly reads the CAR header and first quadrant (1/4) of the EDS, omitting all the
// NMT Merkle proofs. Integrity of the store data is not verified.
//
// Caller must Close returned reader after reading.
func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) {
// The shard is cached in the Store, so subsequent calls to GetCAR with the same root will use the same reader.
// The cache is responsible for closing the underlying reader.
func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, error) {
ctx, span := tracer.Start(ctx, "store/get-car", trace.WithAttributes(attribute.String("root", root.String())))
defer span.End()

key := root.String()
accessor, err := s.getAccessor(ctx, shard.KeyFromString(key))
accessor, err := s.getCachedAccessor(ctx, shard.KeyFromString(key))
if err != nil {
return nil, fmt.Errorf("failed to get accessor: %w", err)
}
return accessor, nil
return accessor.sa.Reader(), nil
}

// Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS
Expand Down Expand Up @@ -247,13 +248,12 @@ func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, e
defer span.End()

key := shard.KeyFromString(root.String())
accessor, err := s.getAccessor(ctx, key)
accessor, err := s.getCachedAccessor(ctx, key)
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err)
}
defer accessor.Close()

carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor))
carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor.sa.Reader()))
if err != nil {
return nil, fmt.Errorf("eds/store: failed to read car header: %w", err)
}
Expand Down Expand Up @@ -297,8 +297,11 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.Shard
}

func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) {
// try to fetch from cache
accessor, err := s.cache.Get(key)
lk := &s.cache.stripedLocks[shardKeyToStriped(key)]
lk.Lock()
defer lk.Unlock()

accessor, err := s.cache.unsafeGet(key)
if err != nil && err != errCacheMiss {
log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err)
}
Expand All @@ -311,7 +314,7 @@ func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessor
if err != nil {
return nil, err
}
return s.cache.Add(key, shardAccessor)
return s.cache.unsafeAdd(key, shardAccessor)
}

// Remove removes EDS from Store by the given share.Root hash and cleans up all
Expand Down Expand Up @@ -367,7 +370,6 @@ func (s *Store) Get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten
if err != nil {
return nil, fmt.Errorf("failed to get CAR file: %w", err)
}
defer f.Close()
eds, err = ReadEDS(ctx, f, root)
if err != nil {
return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err)
Expand Down
37 changes: 37 additions & 0 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,43 @@ func Test_BlockstoreCache(t *testing.T) {
assert.NoError(t, err, errCacheMiss)
}

// Test_CachedAccessor verifies that the reader represented by a cached accessor can be read from
// multiple times, without exhausting the underlying reader.
func Test_CachedAccessor(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

edsStore, err := newStore(t)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)

eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

shardKey := shard.KeyFromString(dah.String())
// adds to cache
cachedAccessor, err := edsStore.getCachedAccessor(ctx, shardKey)
assert.NoError(t, err)

// first read
carReader, err := car.NewCarReader(cachedAccessor.sa.Reader())
assert.NoError(t, err)
firstBlock, err := carReader.Next()
assert.NoError(t, err)

// second read
cachedAccessor, err = edsStore.getCachedAccessor(ctx, shardKey)
assert.NoError(t, err)
carReader, err = car.NewCarReader(cachedAccessor.sa.Reader())
assert.NoError(t, err)
secondBlock, err := carReader.Next()
assert.NoError(t, err)

assert.Equal(t, firstBlock, secondBlock)
}

func newStore(t *testing.T) (*Store, error) {
t.Helper()

Expand Down
6 changes: 3 additions & 3 deletions share/p2p/shrexeds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (s *Server) handleStream(stream network.Stream) {
defer cancel()
status := p2p_pb.Status_OK
// determine whether the EDS is available in our store
// we do not close the reader, so that other requests will not need to re-open the file.
// closing is handled by the LRU cache.
edsReader, err := s.store.GetCAR(ctx, hash)
if err != nil {
status = p2p_pb.Status_NOT_FOUND
} else {
defer edsReader.Close()
}

// inform the client of our status
Expand Down Expand Up @@ -144,7 +144,7 @@ func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error
return err
}

func (s *Server) writeODS(edsReader io.ReadCloser, stream network.Stream) error {
func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error {
err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout))
if err != nil {
log.Debug(err)
Expand Down

0 comments on commit d97e634

Please sign in to comment.