diff --git a/go.mod b/go.mod index 9f8561bbac..55537aafae 100644 --- a/go.mod +++ b/go.mod @@ -327,7 +327,7 @@ require ( replace ( github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 - github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659 + github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 ) diff --git a/go.sum b/go.sum index bbc5882603..13117ba45b 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,8 @@ github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 h1:BHvn41IHOtvHeX1VZqO/ github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23/go.mod h1:nL+vkAMKy/A8wWemWqMwBy4pOGWYYbboAVTEe3N5gIU= github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 h1:EADZy33ufskVIy6Rj6jbi3SOVCeYYo26zUi7iYx+QR0= github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7/go.mod h1:vg3Eza9adJJ5Mdx6boz5MpZsZcTZyrfTVYZHyi2zLm4= -github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659 h1:f3205vw3GYBtMiNoS+qB6IuHSs50Iwqsm9lNIikLTCk= -github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM= +github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136 h1:LBvY3NDA18fcS72pBAEd2pENoUpz1iV4cCXBN2Zrj/I= +github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM= github.com/celestiaorg/go-header v0.2.3 h1:41r60OtAeexWC3J3eTELgWfzcdKR2taFlfcJ/2IHZD4= github.com/celestiaorg/go-header v0.2.3/go.mod h1:6XKf0yhoEQqfKQTZnyTZjTjF5jH5Wq9uO9AvDMkdYbs= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= diff --git a/share/eds/accessor_cache.go b/share/eds/accessor_cache.go index 047ce49104..eac79b946f 100644 --- a/share/eds/accessor_cache.go +++ b/share/eds/accessor_cache.go @@ -63,6 +63,10 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock lk.Lock() defer lk.Unlock() + return bc.unsafeGet(shardContainingCid) +} + +func (bc *blockstoreCache) unsafeGet(shardContainingCid shard.Key) (*accessorWithBlockstore, error) { // We've already ensured that the given shard has the cid/multihash we are looking for. val, ok := bc.cache.Get(shardContainingCid) if !ok { @@ -83,16 +87,23 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock func (bc *blockstoreCache) Add( shardContainingCid shard.Key, accessor *dagstore.ShardAccessor, +) (*accessorWithBlockstore, error) { + lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)] + lk.Lock() + defer lk.Unlock() + + return bc.unsafeAdd(shardContainingCid, accessor) +} + +func (bc *blockstoreCache) unsafeAdd( + shardContainingCid shard.Key, + accessor *dagstore.ShardAccessor, ) (*accessorWithBlockstore, error) { blockStore, err := accessor.Blockstore() if err != nil { return nil, fmt.Errorf("failed to get blockstore from accessor: %w", err) } - lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)] - lk.Lock() - defer lk.Unlock() - newAccessor := &accessorWithBlockstore{ bs: blockStore, sa: accessor, diff --git a/share/eds/store.go b/share/eds/store.go index bb03d0ca3c..257cff2b65 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -204,17 +204,18 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext // The Reader strictly reads the CAR header and first quadrant (1/4) of the EDS, omitting all the // NMT Merkle proofs. Integrity of the store data is not verified. // -// Caller must Close returned reader after reading. -func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) { +// The shard is cached in the Store, so subsequent calls to GetCAR with the same root will use the same reader. +// The cache is responsible for closing the underlying reader. +func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, error) { ctx, span := tracer.Start(ctx, "store/get-car", trace.WithAttributes(attribute.String("root", root.String()))) defer span.End() key := root.String() - accessor, err := s.getAccessor(ctx, shard.KeyFromString(key)) + accessor, err := s.getCachedAccessor(ctx, shard.KeyFromString(key)) if err != nil { return nil, fmt.Errorf("failed to get accessor: %w", err) } - return accessor, nil + return accessor.sa.Reader(), nil } // Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS @@ -247,13 +248,12 @@ func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, e defer span.End() key := shard.KeyFromString(root.String()) - accessor, err := s.getAccessor(ctx, key) + accessor, err := s.getCachedAccessor(ctx, key) if err != nil { return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err) } - defer accessor.Close() - carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor)) + carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor.sa.Reader())) if err != nil { return nil, fmt.Errorf("eds/store: failed to read car header: %w", err) } @@ -297,8 +297,11 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.Shard } func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) { - // try to fetch from cache - accessor, err := s.cache.Get(key) + lk := &s.cache.stripedLocks[shardKeyToStriped(key)] + lk.Lock() + defer lk.Unlock() + + accessor, err := s.cache.unsafeGet(key) if err != nil && err != errCacheMiss { log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err) } @@ -311,7 +314,7 @@ func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessor if err != nil { return nil, err } - return s.cache.Add(key, shardAccessor) + return s.cache.unsafeAdd(key, shardAccessor) } // Remove removes EDS from Store by the given share.Root hash and cleans up all @@ -367,7 +370,6 @@ func (s *Store) Get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten if err != nil { return nil, fmt.Errorf("failed to get CAR file: %w", err) } - defer f.Close() eds, err = ReadEDS(ctx, f, root) if err != nil { return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err) diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 6216a0ca20..e30e66f848 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -206,6 +206,43 @@ func Test_BlockstoreCache(t *testing.T) { assert.NoError(t, err, errCacheMiss) } +// Test_CachedAccessor verifies that the reader represented by a cached accessor can be read from +// multiple times, without exhausting the underlying reader. +func Test_CachedAccessor(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + edsStore, err := newStore(t) + require.NoError(t, err) + err = edsStore.Start(ctx) + require.NoError(t, err) + + eds, dah := randomEDS(t) + err = edsStore.Put(ctx, dah.Hash(), eds) + require.NoError(t, err) + + shardKey := shard.KeyFromString(dah.String()) + // adds to cache + cachedAccessor, err := edsStore.getCachedAccessor(ctx, shardKey) + assert.NoError(t, err) + + // first read + carReader, err := car.NewCarReader(cachedAccessor.sa.Reader()) + assert.NoError(t, err) + firstBlock, err := carReader.Next() + assert.NoError(t, err) + + // second read + cachedAccessor, err = edsStore.getCachedAccessor(ctx, shardKey) + assert.NoError(t, err) + carReader, err = car.NewCarReader(cachedAccessor.sa.Reader()) + assert.NoError(t, err) + secondBlock, err := carReader.Next() + assert.NoError(t, err) + + assert.Equal(t, firstBlock, secondBlock) +} + func newStore(t *testing.T) (*Store, error) { t.Helper() diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index d79f3c31c7..ccc841df64 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -80,11 +80,11 @@ func (s *Server) handleStream(stream network.Stream) { defer cancel() status := p2p_pb.Status_OK // determine whether the EDS is available in our store + // we do not close the reader, so that other requests will not need to re-open the file. + // closing is handled by the LRU cache. edsReader, err := s.store.GetCAR(ctx, hash) if err != nil { status = p2p_pb.Status_NOT_FOUND - } else { - defer edsReader.Close() } // inform the client of our status @@ -144,7 +144,7 @@ func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error return err } -func (s *Server) writeODS(edsReader io.ReadCloser, stream network.Stream) error { +func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error { err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout)) if err != nil { log.Debug(err)