Skip to content

Commit

Permalink
Integrate CARv2 blockstore in the retrieval market (#560)
Browse files Browse the repository at this point in the history
* refactor: integrate dag store into retrieval market

* fix all tests and the padding issue

* refactor: move mount from shared testutil to dagstore dir

* refactor: add tests for lazy blockstore

* refactor: code cleanup

* feat: update go-car to latest

* Dagstore lotus mount Implementation with tests (#564)

* dagstore lotus mount impl

* refactor: nicer error messages

* mount api tests

* refactor: integrate dag store (#565)

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* some storage market fixes by aarsh

* fix: better error messages in DAG store wrapper

* refactor: simplify mock dag store wrapper

* fix: TestBounceConnectionDealTransferOngoing

* refactor: remove some commented out code

* refactor: closable blockstore interface to use full blockstore

* fix: TestBounceConnectionDealTransferUnsealing

* refactor: add comment explaining lotus mount template

* test: verify that the lazy blockstore is only initialized once

* fix: comment

* fix: always finalize blockstore before reaching complete state (#567)

Co-authored-by: aarshkshah1992 <aarshkshah1992@gmail.com>
  • Loading branch information
dirkmc and aarshkshah1992 committed Jul 13, 2021
1 parent 940bb3f commit 2cbcf20
Show file tree
Hide file tree
Showing 62 changed files with 1,949 additions and 681 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ orbs:
executors:
golang:
docker:
- image: circleci/golang:1.14-node
- image: circleci/golang:1.16.4
resource_class: large

commands:
Expand Down
19 changes: 13 additions & 6 deletions carstore/read_only_blockstore.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
package carstore

import (
"io"
"sync"

bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
)

type ClosableBlockstore interface {
bstore.Blockstore
io.Closer
}

// CarReadOnlyStoreTracker tracks the lifecycle of a ReadOnly CAR Blockstore and makes it easy to create/get/cleanup the blockstores.
// It's important to close a CAR Blockstore when done using it so that the backing CAR file can be closed.
type CarReadOnlyStoreTracker struct {
mu sync.RWMutex
stores map[string]*blockstore.ReadOnly
stores map[string]ClosableBlockstore
}

func NewReadOnlyStoreTracker() *CarReadOnlyStoreTracker {
return &CarReadOnlyStoreTracker{
stores: make(map[string]*blockstore.ReadOnly),
stores: make(map[string]ClosableBlockstore),
}
}

func (r *CarReadOnlyStoreTracker) Add(key string, bs *blockstore.ReadOnly) (bool, error) {
func (r *CarReadOnlyStoreTracker) Add(key string, bs ClosableBlockstore) (bool, error) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -32,15 +39,15 @@ func (r *CarReadOnlyStoreTracker) Add(key string, bs *blockstore.ReadOnly) (bool
return true, nil
}

func (r *CarReadOnlyStoreTracker) GetOrCreate(key string, carFilePath string) (*blockstore.ReadOnly, error) {
func (r *CarReadOnlyStoreTracker) GetOrCreate(key string, carFilePath string) (ClosableBlockstore, error) {
r.mu.Lock()
defer r.mu.Unlock()

if bs, ok := r.stores[key]; ok {
return bs, nil
}

rdOnly, err := blockstore.OpenReadOnly(carFilePath, true)
rdOnly, err := blockstore.OpenReadOnly(carFilePath)
if err != nil {
return nil, xerrors.Errorf("failed to open read-only blockstore: %w", err)
}
Expand All @@ -49,7 +56,7 @@ func (r *CarReadOnlyStoreTracker) GetOrCreate(key string, carFilePath string) (*
return rdOnly, nil
}

func (r *CarReadOnlyStoreTracker) Get(key string) (*blockstore.ReadOnly, error) {
func (r *CarReadOnlyStoreTracker) Get(key string) (ClosableBlockstore, error) {
r.mu.RLock()
defer r.mu.RUnlock()

Expand Down
77 changes: 73 additions & 4 deletions carstore/read_only_blockstore_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,84 @@
package carstore
package carstore_test

import (
"context"
"path/filepath"
"testing"

"github.com/ipld/go-car/v2/blockstore"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/dagstore"

"github.com/filecoin-project/go-fil-markets/carstore"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
)

func TestReadOnlyStoreTracker(t *testing.T) {
ctx := context.Background()

// Create a CARv2 file from a fixture
testData := tut.NewLibp2pTestData(ctx, t)
fpath1 := filepath.Join("retrievalmarket", "impl", "fixtures", "lorem.txt")
_, carFilePath := testData.LoadUnixFSFileToStore(t, fpath1)
fpath2 := filepath.Join("retrievalmarket", "impl", "fixtures", "lorem_under_1_block.txt")
_, carFilePath2 := testData.LoadUnixFSFileToStore(t, fpath2)
rdOnlyBS1, err := blockstore.OpenReadOnly(carFilePath)
require.NoError(t, err)
len1 := getBstoreLen(ctx, t, rdOnlyBS1)

k1 := "k1"
tracker := NewReadOnlyStoreTracker()
k2 := "k2"
tracker := carstore.NewReadOnlyStoreTracker()

// Get a non-existent key
_, err = tracker.Get(k1)
require.True(t, carstore.IsNotFound(err))

// Add a read-only blockstore
ok, err := tracker.Add(k1, rdOnlyBS1)
require.NoError(t, err)
require.True(t, ok)

// Get the blockstore using its key
got, err := tracker.Get(k1)
require.NoError(t, err)

// Verify the blockstore is the same
lenGot := getBstoreLen(ctx, t, got)
require.Equal(t, len1, lenGot)

// Call GetOrCreate using the same key
got2, err := tracker.GetOrCreate(k1, carFilePath)
require.NoError(t, err)

// Verify the blockstore is the same
lenGot2 := getBstoreLen(ctx, t, got2)
require.Equal(t, len1, lenGot2)

// Call GetOrCreate with a different CAR file
rdOnlyBS2, err := tracker.GetOrCreate(k2, carFilePath2)
require.NoError(t, err)

// Verify the blockstore is different
len2 := getBstoreLen(ctx, t, rdOnlyBS2)
require.NotEqual(t, len1, len2)

// Clean the second blockstore from the tracker
err = tracker.CleanBlockstore(k2)
require.NoError(t, err)

// Verify it's been removed
_, err = tracker.Get(k2)
require.True(t, carstore.IsNotFound(err))
}

_, err := tracker.Get(k1)
require.True(t, IsNotFound(err))
func getBstoreLen(ctx context.Context, t *testing.T, bs dagstore.ReadBlockstore) int {
ch, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
var len int
for range ch {
len++
}
return len
}
8 changes: 5 additions & 3 deletions carstore/read_write_blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (r *CarReadWriteStoreTracker) GetOrCreate(key string, carV2FilePath string,
return bs, nil
}

rwBs, err := blockstore.NewReadWrite(carV2FilePath, []cid.Cid{rootCid})
rwBs, err := blockstore.NewReadWrite(carV2FilePath, []cid.Cid{rootCid}, blockstore.WithCidDeduplication)
if err != nil {
return nil, xerrors.Errorf("failed to create read-write blockstore: %w", err)
}
Expand All @@ -55,9 +55,11 @@ func (r *CarReadWriteStoreTracker) CleanBlockstore(key string) error {
r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.stores[key]; ok {
delete(r.stores, key)
if rw, ok := r.stores[key]; ok {
_ = rw.Finalize()
}

delete(r.stores, key)

return nil
}
65 changes: 65 additions & 0 deletions carstore/read_write_blockstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package carstore_test

import (
"context"
"path/filepath"
"testing"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/carstore"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
)

func TestReadWriteStoreTracker(t *testing.T) {
ctx := context.Background()

// Create a CARv2 file from a fixture
testData := tut.NewLibp2pTestData(ctx, t)
fpath1 := filepath.Join("retrievalmarket", "impl", "fixtures", "lorem.txt")
lnk1, carFilePath1 := testData.LoadUnixFSFileToStore(t, fpath1)
rootCidLnk1, ok := lnk1.(cidlink.Link)
require.True(t, ok)
fpath2 := filepath.Join("retrievalmarket", "impl", "fixtures", "lorem_under_1_block.txt")
lnk2, carFilePath2 := testData.LoadUnixFSFileToStore(t, fpath2)
rootCidLnk2, ok := lnk2.(cidlink.Link)
require.True(t, ok)

k1 := "k1"
k2 := "k2"
tracker := carstore.NewCarReadWriteStoreTracker()

// Get a non-existent key
_, err := tracker.Get(k1)
require.True(t, carstore.IsNotFound(err))

// Create a blockstore by calling GetOrCreate
rdOnlyBS1, err := tracker.GetOrCreate(k1, carFilePath1, rootCidLnk1.Cid)
require.NoError(t, err)

// Get the blockstore using its key
got, err := tracker.Get(k1)
require.NoError(t, err)

// Verify the blockstore is the same
len1 := getBstoreLen(ctx, t, rdOnlyBS1)
lenGot := getBstoreLen(ctx, t, got)
require.Equal(t, len1, lenGot)

// Call GetOrCreate with a different CAR file
rdOnlyBS2, err := tracker.GetOrCreate(k2, carFilePath2, rootCidLnk2.Cid)
require.NoError(t, err)

// Verify the blockstore is different
len2 := getBstoreLen(ctx, t, rdOnlyBS2)
require.NotEqual(t, len1, len2)

// Clean the second blockstore from the tracker
err = tracker.CleanBlockstore(k2)
require.NoError(t, err)

// Verify it's been removed
_, err = tracker.Get(k2)
require.True(t, carstore.IsNotFound(err))
}
105 changes: 105 additions & 0 deletions dagstore/dagstorewrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package dagstore

import (
"context"
"io"

"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"

"github.com/filecoin-project/go-fil-markets/carstore"
)

// DagStoreWrapper hides the details of the DAG store implementation from
// the other parts of go-fil-markets
type DagStoreWrapper interface {
// RegisterShard loads a CAR file into the DAG store and builds an index for it
RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string) error
// LoadShard fetches the data for a shard and provides a blockstore interface to it
LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error)
}

type dagStoreWrapper struct {
dagStore *dagstore.DAGStore
mountApi LotusMountAPI
}

func NewDagStoreWrapper(dsRegistry *mount.Registry, dagStore *dagstore.DAGStore, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
err := dsRegistry.Register(lotusScheme, NewLotusMountTemplate(mountApi))
if err != nil {
return nil, err
}

return &dagStoreWrapper{
dagStore: dagStore,
mountApi: mountApi,
}, nil
}

type closableBlockstore struct {
bstore.Blockstore
io.Closer
}

func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
if err != nil {
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
}

// TODO: Can I rely on AcquireShard to return an error if the context times out?
//select {
//case <-ctx.Done():
// return ctx.Err()
//case res := <-resch:
// return nil, res.Error
//}

res := <-resch
if res.Error != nil {
return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, err)
}

bs, err := res.Accessor.Blockstore()
if err != nil {
return nil, err
}

return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil
}

func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string) error {
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}

opts := dagstore.RegisterOpts{ExistingTransient: carPath}
resch := make(chan dagstore.ShardResult, 1)
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}

// TODO: Can I rely on RegisterShard to return an error if the context times out?
//select {
//case <-ctx.Done():
// return ctx.Err()
//case res := <-resch:
// return res.Error
//}

res := <-resch
if res.Error != nil {
return xerrors.Errorf("failed to register shard for piece CID %s: %w", pieceCid, res.Error)
}
return nil
}

0 comments on commit 2cbcf20

Please sign in to comment.