From d97e634e85f901e4d94659412d3b227a65b29dff Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 4 Apr 2023 17:24:12 +0200 Subject: [PATCH] fix(share/eds): use cached accessors in `GetCAR` and `GetDAH` (#2000) Closes #1514. This PR enables accessor cache usage for GetCAR and GetDAH. This allows shrexeds and shrexnd servers to open the underlying file only once (until the accessor is removed from the cache). Previously, if many peers requested the EDS at once, the server needed to open the file for each request. This still requires a dependency bump of our dagstore fork for the new test to pass, so the PR stays in draft until then. Related: https://github.com/celestiaorg/dagstore/pull/2 --------- Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> Co-authored-by: Hlib Kanunnikov --- go.mod | 2 +- go.sum | 4 ++-- share/eds/accessor_cache.go | 19 ++++++++++++++---- share/eds/store.go | 24 ++++++++++++----------- share/eds/store_test.go | 37 ++++++++++++++++++++++++++++++++++++ share/p2p/shrexeds/server.go | 6 +++--- 6 files changed, 71 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 9f8561bbac..55537aafae 100644 --- a/go.mod +++ b/go.mod @@ -327,7 +327,7 @@ require ( replace ( github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 - github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659 + github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 ) diff --git a/go.sum b/go.sum index bbc5882603..13117ba45b 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,8 @@ github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 h1:BHvn41IHOtvHeX1VZqO/ github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23/go.mod h1:nL+vkAMKy/A8wWemWqMwBy4pOGWYYbboAVTEe3N5gIU= 
github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 h1:EADZy33ufskVIy6Rj6jbi3SOVCeYYo26zUi7iYx+QR0= github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7/go.mod h1:vg3Eza9adJJ5Mdx6boz5MpZsZcTZyrfTVYZHyi2zLm4= -github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659 h1:f3205vw3GYBtMiNoS+qB6IuHSs50Iwqsm9lNIikLTCk= -github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM= +github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136 h1:LBvY3NDA18fcS72pBAEd2pENoUpz1iV4cCXBN2Zrj/I= +github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM= github.com/celestiaorg/go-header v0.2.3 h1:41r60OtAeexWC3J3eTELgWfzcdKR2taFlfcJ/2IHZD4= github.com/celestiaorg/go-header v0.2.3/go.mod h1:6XKf0yhoEQqfKQTZnyTZjTjF5jH5Wq9uO9AvDMkdYbs= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= diff --git a/share/eds/accessor_cache.go b/share/eds/accessor_cache.go index 047ce49104..eac79b946f 100644 --- a/share/eds/accessor_cache.go +++ b/share/eds/accessor_cache.go @@ -63,6 +63,10 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock lk.Lock() defer lk.Unlock() + return bc.unsafeGet(shardContainingCid) +} + +func (bc *blockstoreCache) unsafeGet(shardContainingCid shard.Key) (*accessorWithBlockstore, error) { // We've already ensured that the given shard has the cid/multihash we are looking for. 
val, ok := bc.cache.Get(shardContainingCid) if !ok { @@ -83,16 +87,23 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock func (bc *blockstoreCache) Add( shardContainingCid shard.Key, accessor *dagstore.ShardAccessor, +) (*accessorWithBlockstore, error) { + lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)] + lk.Lock() + defer lk.Unlock() + + return bc.unsafeAdd(shardContainingCid, accessor) +} + +func (bc *blockstoreCache) unsafeAdd( + shardContainingCid shard.Key, + accessor *dagstore.ShardAccessor, ) (*accessorWithBlockstore, error) { blockStore, err := accessor.Blockstore() if err != nil { return nil, fmt.Errorf("failed to get blockstore from accessor: %w", err) } - lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)] - lk.Lock() - defer lk.Unlock() - newAccessor := &accessorWithBlockstore{ bs: blockStore, sa: accessor, diff --git a/share/eds/store.go b/share/eds/store.go index bb03d0ca3c..257cff2b65 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -204,17 +204,18 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext // The Reader strictly reads the CAR header and first quadrant (1/4) of the EDS, omitting all the // NMT Merkle proofs. Integrity of the store data is not verified. // -// Caller must Close returned reader after reading. -func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) { +// The shard is cached in the Store, so subsequent calls to GetCAR with the same root will use the same reader. +// The cache is responsible for closing the underlying reader. 
+func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, error) { ctx, span := tracer.Start(ctx, "store/get-car", trace.WithAttributes(attribute.String("root", root.String()))) defer span.End() key := root.String() - accessor, err := s.getAccessor(ctx, shard.KeyFromString(key)) + accessor, err := s.getCachedAccessor(ctx, shard.KeyFromString(key)) if err != nil { return nil, fmt.Errorf("failed to get accessor: %w", err) } - return accessor, nil + return accessor.sa.Reader(), nil } // Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS @@ -247,13 +248,12 @@ func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, e defer span.End() key := shard.KeyFromString(root.String()) - accessor, err := s.getAccessor(ctx, key) + accessor, err := s.getCachedAccessor(ctx, key) if err != nil { return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err) } - defer accessor.Close() - carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor)) + carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor.sa.Reader())) if err != nil { return nil, fmt.Errorf("eds/store: failed to read car header: %w", err) } @@ -297,8 +297,11 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.Shard } func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) { - // try to fetch from cache - accessor, err := s.cache.Get(key) + lk := &s.cache.stripedLocks[shardKeyToStriped(key)] + lk.Lock() + defer lk.Unlock() + + accessor, err := s.cache.unsafeGet(key) if err != nil && err != errCacheMiss { log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err) } @@ -311,7 +314,7 @@ func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessor if err != nil { return nil, err } - return s.cache.Add(key, shardAccessor) + return s.cache.unsafeAdd(key, shardAccessor) } // Remove removes EDS from Store by the given 
share.Root hash and cleans up all @@ -367,7 +370,6 @@ func (s *Store) Get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten if err != nil { return nil, fmt.Errorf("failed to get CAR file: %w", err) } - defer f.Close() eds, err = ReadEDS(ctx, f, root) if err != nil { return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err) diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 6216a0ca20..e30e66f848 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -206,6 +206,43 @@ func Test_BlockstoreCache(t *testing.T) { assert.NoError(t, err, errCacheMiss) } +// Test_CachedAccessor verifies that the reader represented by a cached accessor can be read from +// multiple times, without exhausting the underlying reader. +func Test_CachedAccessor(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + edsStore, err := newStore(t) + require.NoError(t, err) + err = edsStore.Start(ctx) + require.NoError(t, err) + + eds, dah := randomEDS(t) + err = edsStore.Put(ctx, dah.Hash(), eds) + require.NoError(t, err) + + shardKey := shard.KeyFromString(dah.String()) + // adds to cache + cachedAccessor, err := edsStore.getCachedAccessor(ctx, shardKey) + assert.NoError(t, err) + + // first read + carReader, err := car.NewCarReader(cachedAccessor.sa.Reader()) + assert.NoError(t, err) + firstBlock, err := carReader.Next() + assert.NoError(t, err) + + // second read + cachedAccessor, err = edsStore.getCachedAccessor(ctx, shardKey) + assert.NoError(t, err) + carReader, err = car.NewCarReader(cachedAccessor.sa.Reader()) + assert.NoError(t, err) + secondBlock, err := carReader.Next() + assert.NoError(t, err) + + assert.Equal(t, firstBlock, secondBlock) +} + func newStore(t *testing.T) (*Store, error) { t.Helper() diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index d79f3c31c7..ccc841df64 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -80,11 +80,11 @@ 
func (s *Server) handleStream(stream network.Stream) { defer cancel() status := p2p_pb.Status_OK // determine whether the EDS is available in our store + // we do not close the reader, so that other requests will not need to re-open the file. + // closing is handled by the LRU cache. edsReader, err := s.store.GetCAR(ctx, hash) if err != nil { status = p2p_pb.Status_NOT_FOUND - } else { - defer edsReader.Close() } // inform the client of our status @@ -144,7 +144,7 @@ func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error return err } -func (s *Server) writeODS(edsReader io.ReadCloser, stream network.Stream) error { +func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error { err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout)) if err != nil { log.Debug(err)