Skip to content

Commit

Permalink
fix: iterate all deals to index piece (#1549)
Browse files Browse the repository at this point in the history
* fix: iterate all deals to index piece

* add test, use multierror

* add and update comments
  • Loading branch information
LexLuthr committed Jul 3, 2023
1 parent 7409f90 commit 16c6310
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 35 deletions.
91 changes: 56 additions & 35 deletions piecedirectory/piecedirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
return nil
}

// BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal
// corresponding to an unsealed sector for this method to work. It will try to build index
// using all available deals and will exit as soon as it succeeds for one of the deals
func (ps *PieceDirectory) BuildIndexForPiece(ctx context.Context, pieceCid cid.Cid) error {
ctx, span := tracing.Tracer.Start(ctx, "pm.build_index_for_piece")
defer span.End()
Expand All @@ -251,12 +254,18 @@ func (ps *PieceDirectory) BuildIndexForPiece(ctx context.Context, pieceCid cid.C
return fmt.Errorf("getting piece deals: no deals found for piece")
}

err = ps.addIndexForPieceThrottled(ctx, pieceCid, dls[0])
if err != nil {
return fmt.Errorf("adding index for piece deal %d: %w", dls[0].ChainDealID, err)
var merr error

// Iterate over all available deals in case first deal does not have an unsealed sector
for _, dl := range dls {
err = ps.addIndexForPieceThrottled(ctx, pieceCid, dl)
if err == nil {
return nil
}
merr = multierror.Append(merr, fmt.Errorf("adding index for piece deal %d: %w", dl.ChainDealID, err))
}

return nil
return merr
}

func (ps *PieceDirectory) RemoveDealForPiece(ctx context.Context, pieceCid cid.Cid, dealUuid string) error {
Expand Down Expand Up @@ -427,44 +436,56 @@ func (ps *PieceDirectory) BlockstoreGetSize(ctx context.Context, c cid.Cid) (int
return 0, format.ErrNotFound{Cid: c}
}

// Get the size of the block from the first piece (should be the same for
// any piece)
offsetSize, err := ps.GetOffsetSize(ctx, pieces[0], c.Hash())
if err != nil {
return 0, fmt.Errorf("getting size of cid %s in piece %s: %w", c, pieces[0], err)
}
var merr error

if offsetSize.Size > 0 {
return int(offsetSize.Size), nil
}
// Iterate over all pieces in case the sector containing the first piece with the Block
// is not unsealed
for _, p := range pieces {
// Get the size of the block from the piece (should be the same for
// all pieces)
offsetSize, err := ps.GetOffsetSize(ctx, p, c.Hash())
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting size of cid %s in piece %s: %w", c, p, err))
continue
}

// Indexes imported from the DAG store do not have block size information
// (they only have offset information). Check if the block size is zero
// because the index is incomplete.
isComplete, err := ps.store.IsCompleteIndex(ctx, pieces[0])
if err != nil {
return 0, fmt.Errorf("getting index complete status for piece %s: %w", pieces[0], err)
}
if offsetSize.Size > 0 {
return int(offsetSize.Size), nil
}

if isComplete {
// The deal index is complete, so it must be a zero-sized block.
// A zero-sized block is unusual, but possible.
return int(offsetSize.Size), nil
}
// Indexes imported from the DAG store do not have block size information
// (they only have offset information). Check if the block size is zero
// because the index is incomplete.
isComplete, err := ps.store.IsCompleteIndex(ctx, p)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting index complete status for piece %s: %w", p, err))
continue
}

// The index is incomplete, so re-build the index on the fly
err = ps.BuildIndexForPiece(ctx, pieces[0])
if err != nil {
return 0, fmt.Errorf("re-building index for piece %s: %w", pieces[0], err)
}
if isComplete {
// The deal index is complete, so it must be a zero-sized block.
// A zero-sized block is unusual, but possible.
return int(offsetSize.Size), nil
}

// Now get the size again
offsetSize, err = ps.GetOffsetSize(ctx, pieces[0], c.Hash())
if err != nil {
return 0, fmt.Errorf("getting size of cid %s in piece %s: %w", c, pieces[0], err)
// The index is incomplete, so re-build the index on the fly
err = ps.BuildIndexForPiece(ctx, p)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("re-building index for piece %s: %w", p, err))
continue
}

// Now get the size again
offsetSize, err = ps.GetOffsetSize(ctx, p, c.Hash())
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting size of cid %s in piece %s: %w", c, p, err))
continue
}

return int(offsetSize.Size), nil
}

return int(offsetSize.Size), nil
return 0, merr
}

func (ps *PieceDirectory) BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error) {
Expand Down
83 changes: 83 additions & 0 deletions piecedirectory/piecedirectory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"testing"
"time"

pdTypes "github.com/filecoin-project/boost/piecedirectory/types"
mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks"
"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-state-types/abi"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -66,6 +68,10 @@ func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) {
t.Run("flagging pieces", func(t *testing.T) {
testFlaggingPieces(ctx, t, cl)
})

t.Run("reIndexing pieces from multiple sectors", func(t *testing.T) {
testReIndexMultiSector(ctx, t, cl)
})
}

func testPieceDirectoryNotFound(ctx context.Context, t *testing.T, cl *client.Store) {
Expand Down Expand Up @@ -329,3 +335,80 @@ func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)
require.Equal(t, 0, len(pcids))
}

// Verify that BuildIndexForPiece iterates over all deals return error if none of the deals (sectors)
// can be used to read the piece. We are testing 2 conditions here:
// 1. No eligible piece is found for both deals - error is expected
// 2. 1 eligible piece is found - no error is expected
func testReIndexMultiSector(ctx context.Context, t *testing.T, cl *client.Store) {
ctrl := gomock.NewController(t)
pr := mock_piecedirectory.NewMockPieceReader(ctrl)
pm := NewPieceDirectory(cl, pr, 1)
pm.Start(ctx)

// Create a random CAR file
carFilePath := CreateCarFile(t)
carFile, err := os.Open(carFilePath)
require.NoError(t, err)
defer carFile.Close()

carReader, err := car.OpenReader(carFilePath)
require.NoError(t, err)
defer carReader.Close()
carv1Reader, err := carReader.DataReader()
require.NoError(t, err)

// Return error first 3 time as during the first attempt we want to surface errors from
// failed BuildIndexForPiece operation for both deals. 3rd time to return error for first deal
// in the second run where we want the method to succeed eventually.
pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("piece error")).Times(3)
pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(_ context.Context, _ abi.SectorNumber, _ abi.PaddedPieceSize, _ abi.PaddedPieceSize) (pdTypes.SectionReader, error) {
_, err := carv1Reader.Seek(0, io.SeekStart)
return MockSectionReader{carv1Reader}, err
})

pieceCid := CalculateCommp(t, carv1Reader).PieceCID

// Add deal info for the piece - it doesn't matter what it is, the piece
// just needs to have 2 deals. One with no available pieceReader (simulating no unsealed sector)
// and other one with correct pieceReader
d1 := model.DealInfo{
DealUuid: uuid.New().String(),
ChainDealID: 1,
SectorID: 2,
PieceOffset: 0,
PieceLength: 0,
}

d2 := model.DealInfo{
DealUuid: uuid.New().String(),
ChainDealID: 2,
SectorID: 3,
PieceOffset: 0,
PieceLength: 0,
}

err = cl.AddDealForPiece(ctx, pieceCid, d1)
require.NoError(t, err)

err = cl.AddDealForPiece(ctx, pieceCid, d2)
require.NoError(t, err)

b, err := cl.IsIndexed(ctx, pieceCid)
require.NoError(t, err)
require.False(t, b)

// Expect error as GetReader() mock will return error for both deals
err = pm.BuildIndexForPiece(ctx, pieceCid)
require.ErrorContains(t, err, "piece error")

// No error is expected as GetReader() mock will return error for first deal
// but correct reader for the second deal
err = pm.BuildIndexForPiece(ctx, pieceCid)
require.NoError(t, err)

b, err = cl.IsIndexed(ctx, pieceCid)
require.NoError(t, err)
require.True(t, b)
}

0 comments on commit 16c6310

Please sign in to comment.