diff --git a/carstore/read_only_blockstore.go b/carstore/read_only_blockstore.go deleted file mode 100644 index f7a5a37a..00000000 --- a/carstore/read_only_blockstore.go +++ /dev/null @@ -1,83 +0,0 @@ -package carstore - -import ( - "io" - "sync" - - bstore "github.com/ipfs/go-ipfs-blockstore" - carv2 "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/blockstore" - "golang.org/x/xerrors" -) - -type ClosableBlockstore interface { - bstore.Blockstore - io.Closer -} - -// CarReadOnlyStoreTracker tracks the lifecycle of a ReadOnly CAR Blockstore and makes it easy to create/get/cleanup the blockstores. -// It's important to close a CAR Blockstore when done using it so that the backing CAR file can be closed. -type CarReadOnlyStoreTracker struct { - mu sync.RWMutex - stores map[string]ClosableBlockstore -} - -func NewReadOnlyStoreTracker() *CarReadOnlyStoreTracker { - return &CarReadOnlyStoreTracker{ - stores: make(map[string]ClosableBlockstore), - } -} - -func (r *CarReadOnlyStoreTracker) Add(key string, bs ClosableBlockstore) (bool, error) { - r.mu.Lock() - defer r.mu.Unlock() - - if _, ok := r.stores[key]; ok { - return false, nil - } - - r.stores[key] = bs - return true, nil -} - -func (r *CarReadOnlyStoreTracker) GetOrCreate(key string, carFilePath string) (ClosableBlockstore, error) { - r.mu.Lock() - defer r.mu.Unlock() - - if bs, ok := r.stores[key]; ok { - return bs, nil - } - - rdOnly, err := blockstore.OpenReadOnly(carFilePath, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) - if err != nil { - return nil, xerrors.Errorf("failed to open read-only blockstore: %w", err) - } - r.stores[key] = rdOnly - - return rdOnly, nil -} - -func (r *CarReadOnlyStoreTracker) Get(key string) (ClosableBlockstore, error) { - r.mu.RLock() - defer r.mu.RUnlock() - - if bs, ok := r.stores[key]; ok { - return bs, nil - } - - return nil, xerrors.Errorf("could not get blockstore for key %s: %w", key, ErrNotFound) -} - -func (r *CarReadOnlyStoreTracker) CleanBlockstore(key string) error { - r.mu.Lock() - defer r.mu.Unlock() - - if bs, ok := r.stores[key]; ok { - delete(r.stores, key) - if err := bs.Close(); err != nil { - return xerrors.Errorf("failed to close read-only blockstore: %w", err) - } - } - - return nil -} diff --git a/carstore/read_write_blockstore.go b/carstore/read_write_blockstore.go deleted file mode 100644 index 1ef9a390..00000000 --- a/carstore/read_write_blockstore.go +++ /dev/null @@ -1,66 +0,0 @@ -package carstore - -import ( - "sync" - - "github.com/ipfs/go-cid" - "github.com/ipld/go-car/v2/blockstore" - "golang.org/x/xerrors" -) - -// CarReadWriteStoreTracker tracks the lifecycle of a ReadWrite CAR Blockstore and makes it easy to create/get/cleanup the blockstores. -// It's important to close a CAR Blockstore when done using it so that the backing CAR file can be closed. -type CarReadWriteStoreTracker struct { - mu sync.RWMutex - stores map[string]*blockstore.ReadWrite -} - -func NewCarReadWriteStoreTracker() *CarReadWriteStoreTracker { - return &CarReadWriteStoreTracker{ - stores: make(map[string]*blockstore.ReadWrite), - } -} - -func (r *CarReadWriteStoreTracker) GetOrCreate(key string, carV2FilePath string, rootCid cid.Cid) (*blockstore.ReadWrite, error) { - r.mu.Lock() - defer r.mu.Unlock() - - if bs, ok := r.stores[key]; ok { - return bs, nil - } - - rwBs, err := blockstore.OpenReadWrite(carV2FilePath, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true)) - if err != nil { - return nil, xerrors.Errorf("failed to create read-write blockstore: %w", err) - } - r.stores[key] = rwBs - - return rwBs, nil -} - -func (r *CarReadWriteStoreTracker) Get(key string) (*blockstore.ReadWrite, error) { - r.mu.RLock() - defer r.mu.RUnlock() - - if bs, ok := r.stores[key]; ok { - return bs, nil - } - - return nil, xerrors.Errorf("could not get blockstore for key %s: %w", key, ErrNotFound) -} - -func (r *CarReadWriteStoreTracker) CleanBlockstore(key string) error { - r.mu.Lock() - defer r.mu.Unlock() - - if rw, ok := r.stores[key]; ok { - // If the blockstore has already been finalized, calling Finalize again - // will return an error. For our purposes it's simplest if Finalize is - // idempotent so we just ignore any error. - _ = rw.Finalize() - } - - delete(r.stores, key) - - return nil -} diff --git a/docs/storageprovider.mmd.png b/docs/storageprovider.mmd.png index 3ac6d81a..5f723cbe 100644 Binary files a/docs/storageprovider.mmd.png and b/docs/storageprovider.mmd.png differ diff --git a/docs/storageprovider.mmd.svg b/docs/storageprovider.mmd.svg index 966b3aa2..02f01467 100644 --- a/docs/storageprovider.mmd.svg +++ b/docs/storageprovider.mmd.svg @@ -1,6 +1,6 @@ -ProviderEventOpenProviderEventDealRejectedProviderEventDealRejectedProviderEventDealRejectedProviderEventRejectionSentProviderEventDealDecidingProviderEventDataRequestedProviderEventDataTransferFailedProviderEventDataTransferFailedProviderEventDataTransferInitiatedProviderEventDataTransferInitiatedProviderEventDataTransferRestartedProviderEventDataTransferRestartedProviderEventDataTransferCancelledProviderEventDataTransferCancelledProviderEventDataTransferCancelledProviderEventDataTransferCompletedProviderEventDataTransferCompletedProviderEventDataVerificationFailedProviderEventVerifiedDataProviderEventVerifiedDataProviderEventFundingInitiatedProviderEventFundedProviderEventFundedProviderEventDealPublishInitiatedProviderEventDealPublishErrorProviderEventSendResponseFailedProviderEventSendResponseFailedProviderEventDealPublishedProviderEventFileStoreErroredProviderEventFileStoreErroredProviderEventFileStoreErroredProviderEventFileStoreErroredProviderEventMultistoreErroredProviderEventDealHandoffFailedProviderEventDealHandedOffProviderEventDealPrecommitFailedProviderEventDealPrecommittedProviderEventDealActivationFailedProviderEventDealActivatedProviderEventDealActivatedProviderEventCleanupFinishedProviderEventDealSlashedProviderEventDealExpiredProviderEventDealCompletionFailedProviderEventFailedProviderEventRestartProviderEventRestartProviderEventRestartProviderEventRestartProviderEventTrackFundsFailedStorageDealUnknownStorageDealStagedOn entry runs HandoffDealStorageDealSealingOn entry runs VerifyDealActivatedStorageDealFinalizingOn entry runs CleanupDealStorageDealActiveOn entry runs WaitForDealCompletionStorageDealExpiredStorageDealSlashedStorageDealRejectingOn entry runs RejectDealStorageDealFailingOn entry runs FailDealStorageDealValidatingOn entry runs ValidateDealProposalStorageDealAcceptWaitOn entry runs DecideOnProposalStorageDealTransferringStorageDealWaitingForDataStorageDealVerifyDataOn entry runs VerifyDataStorageDealReserveProviderFundsOn entry runs ReserveProviderFundsStorageDealProviderFundingOn entry runs WaitForFundingStorageDealPublishOn entry runs PublishDealStorageDealPublishingOn entry runs WaitForPublishStorageDealErrorStorageDealProviderTransferAwaitRestartStorageDealAwaitingPreCommitOn entry runs VerifyDealPreCommittedThe following events are not shown cause they can trigger from any state.ProviderEventNodeErrored - transitions state to StorageDealFailingProviderEventRestart - does not transition stateThe following events only record in this state.ProviderEventPieceStoreErroredThe following events only record in this state.ProviderEventFundsReleasedThe following events only record in this state.ProviderEventDataTransferRestartedProviderEventDataTransferStalledThe following events only record in this state.ProviderEventFundsReservedThe following events only record in this state.ProviderEventFundsReleasedThe following events only record in this state.ProviderEventDataTransferStalled \ No newline at end of file diff --git a/filestorecaradapter/adapter.go b/filestorecaradapter/adapter.go deleted file mode 100644 index fc2b934f..00000000 --- a/filestorecaradapter/adapter.go +++ /dev/null @@ -1,88 +0,0 @@ -package filestorecaradapter - -import ( - "io" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-filestore" - bstore "github.com/ipfs/go-ipfs-blockstore" - carv2 "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/blockstore" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-fil-markets/filestorecaradapter/internal" -) - -type FileStore struct { - bstore.Blockstore - io.Closer -} - -// NewReadOnlyFileStore returns a Filestore backed by the given CAR file that clients can read Unixfs DAG blocks from. -// The intermediate blocks of the Unixfs DAG are returned as is from the given CAR file. -// For the leaf nodes of the Unixfs DAG, if `PosInfo`(filepath, offset, size) nodes have been written to the CAR file, -// the filestore will read the `PosInfo` node from the CAR file and then resolve the actual raw leaf data from the file -// referenced by the `PosInfo` node. -// -// Note that if the given CAR file does NOT contain any `PosInfo` nodes and contains all Unixfs DAG blocks -// as is, the filestore will return all blocks as is from the given CAR file i.e. in such a case, -// the Filestore will simply act as a pass-through read only CAR blockstore. -func NewReadOnlyFileStore(carFilePath string) (*FileStore, error) { - // Open a readOnly blockstore that wraps the given CAR file. - rdOnly, err := blockstore.OpenReadOnly(carFilePath, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) - if err != nil { - return nil, xerrors.Errorf("failed to open read-only blockstore: %w", err) - } - - // adapt the CAR blockstore to a `key-value datastore` to persist the (cid -> `PosInfo`) infromation - // for the leaf nodes of a Unixfs DAG i.e. the nodes that correspond to fix sized chunks of the user's raw file. - adapter := internal.BlockstoreToDSBatchingAdapter(rdOnly) - - fm := filestore.NewFileManager(adapter, "/") - fm.AllowFiles = true - fstore := filestore.NewFilestore(rdOnly, fm) - bs := bstore.NewIdStore(fstore) - - return &FileStore{ - bs, - rdOnly, - }, nil -} - -// NewReadWriteFileStore returns a Filestore that clients can write Unixfs DAG blocks to and read Unixfs DAG blocks from. -// The Filestore will persist "intermediate" Unixfs DAG blocks as is to the given CARv2 file. -// For the leaf nodes of the UnixFS DAG which correspond to fixed size chunks of the user file , -// the Filestore will store the `PosInfo` Node specifying the (user filePath, offset, size) information for the chunk -// in the CARv2 file. -// -// Note that if the client does NOT write any `PosInfo` nodes to the Filestore, the backing CARv2 file will contain -// all blocks as is i.e. in such a case, the Filestore will simply act as a pass-through read-write CAR Blockstore. -func NewReadWriteFileStore(carV2FilePath string, roots []cid.Cid) (*FileStore, error) { - rw, err := blockstore.OpenReadWrite(carV2FilePath, roots, blockstore.UseWholeCIDs(true)) - if err != nil { - return nil, xerrors.Errorf("failed to open read-write blockstore: %w", err) - } - - adapter := internal.BlockstoreToDSBatchingAdapter(rw) - fm := filestore.NewFileManager(adapter, "/") - fm.AllowFiles = true - fstore := filestore.NewFilestore(rw, fm) - bs := bstore.NewIdStore(fstore) - - return &FileStore{ - bs, - &carV2BSCloser{rw}, - }, nil -} - -type carV2BSCloser struct { - rw *blockstore.ReadWrite -} - -func (c *carV2BSCloser) Close() error { - if err := c.rw.Finalize(); err != nil { - return xerrors.Errorf("failed to finalize read-write blockstore: %w", err) - } - - return nil -} diff --git a/filestorecaradapter/internal/adapter.go b/filestorecaradapter/internal/adapter.go deleted file mode 100644 index 68e99a7a..00000000 --- a/filestorecaradapter/internal/adapter.go +++ /dev/null @@ -1,89 +0,0 @@ -package internal - -import ( - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" - bstore "github.com/ipfs/go-ipfs-blockstore" - mh "github.com/multiformats/go-multihash" - "golang.org/x/xerrors" -) - -var cidBuilder = cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} - -var _ datastore.Batching = (*bsToDSBatchingAdapter)(nil) - -// transforms a blockstore interface to a datastore.Batching interface. -type bsToDSBatchingAdapter struct { - bstore.Blockstore -} - -func BlockstoreToDSBatchingAdapter(bs bstore.Blockstore) *bsToDSBatchingAdapter { - return &bsToDSBatchingAdapter{bs} -} - -func (a *bsToDSBatchingAdapter) Get(key datastore.Key) (value []byte, err error) { - c, err := cidBuilder.Sum(key.Bytes()) - if err != nil { - return nil, xerrors.Errorf("failed to create cid: %w", err) - } - - blk, err := a.Blockstore.Get(c) - if err != nil { - return nil, xerrors.Errorf("failed to get cid %s: %w", c, err) - } - return blk.RawData(), nil -} - -func (a *bsToDSBatchingAdapter) Put(key datastore.Key, value []byte) error { - c, err := cidBuilder.Sum(key.Bytes()) - if err != nil { - return xerrors.Errorf("failed to create cid: %w", err) - } - - blk, err := blocks.NewBlockWithCid(value, c) - if err != nil { - return xerrors.Errorf("failed to create block: %w", err) - } - - if err := a.Blockstore.Put(blk); err != nil { - return xerrors.Errorf("failed to put block: %w", err) - } - - return nil -} - -func (a *bsToDSBatchingAdapter) Has(key datastore.Key) (exists bool, err error) { - c, err := cidBuilder.Sum(key.Bytes()) - if err != nil { - return false, xerrors.Errorf("failed to create cid: %w", err) - - } - - return a.Blockstore.Has(c) -} - -func (a *bsToDSBatchingAdapter) Batch() (datastore.Batch, error) { - return datastore.NewBasicBatch(a), nil -} - -func (a *bsToDSBatchingAdapter) GetSize(_ datastore.Key) (size int, err error) { - return 0, xerrors.New("operation NOT supported: GetSize") -} - -func (a *bsToDSBatchingAdapter) Query(_ query.Query) (query.Results, error) { - return nil, xerrors.New("operation NOT supported: Query") -} - -func (a *bsToDSBatchingAdapter) Delete(_ datastore.Key) error { - return xerrors.New("operation NOT supported: Delete") -} - -func (a *bsToDSBatchingAdapter) Sync(_ datastore.Key) error { - return xerrors.New("operation NOT supported: Sync") -} - -func (a *bsToDSBatchingAdapter) Close() error { - return nil -} diff --git a/package-lock.json b/package-lock.json index fac21769..3c3468bd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,8 +1,465 @@ { "name": "go-fil-markets", "version": "1.0.0", - "lockfileVersion": 1, + "lockfileVersion": 2, "requires": true, + "packages": { + "": { + "version": "1.0.0", + "license": "(Apache-2.0 OR MIT)", + "dependencies": { + "@mermaid-js/mermaid-cli": "^8.5.1-2" + } + }, + "node_modules/@mermaid-js/mermaid-cli": { + "version": "8.5.1-2", + "resolved": "https://registry.npmjs.org/@mermaid-js/mermaid-cli/-/mermaid-cli-8.5.1-2.tgz", + "integrity": "sha512-IGYWJZLlV7kx0NOnREvu2Ikioyg0AX59dbXoY13t1zdjUlrr5YQRgDVaIySkOo347yLB6z6thTmuxaYAw/KTHg==", + "dependencies": { + "chalk": "^3.0.0", + "commander": "^4.0.1", + "puppeteer": "^2.0.0" + }, + "bin": { + "mmdc": "index.bundle.js" + } + }, + "node_modules/@types/color-name": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@types/color-name/-/color-name-1.1.1.tgz", + "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==" + }, + "node_modules/@types/mime-types": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/@types/mime-types/-/mime-types-2.1.0.tgz", + "integrity": "sha1-nKUs2jY/aZxpRmwqbM2q2RPqenM=" + }, + "node_modules/agent-base": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-5.1.1.tgz", + "integrity": "sha512-TMeqbNl2fMW0nMjTEPOwe3J/PRFP4vqeoNuQMG0HlMrtm5QxKqdvAkZ1pRBQ/ulIyDD5Yq0nJ7YbdD8ey0TO3g==", + "engines": { + "node": ">= 6.0.0" + } + }, + "node_modules/ansi-styles": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.2.1.tgz", + "integrity": "sha512-9VGjrMsG1vePxcSweQsN20KY/c4zN0h9fLjqAbwbPfahM3t+NL+M9HC8xeXG2I8pX5NoamTGNuomEUFI7fcUjA==", + "dependencies": { + "@types/color-name": "^1.1.1", + "color-convert": "^2.0.1" + }, + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/chalk/ansi-styles?sponsor=1" + } + }, + "node_modules/async-limiter": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", + "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==" + }, + "node_modules/balanced-match": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", + "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=" + }, + "node_modules/brace-expansion": { + "version": "1.1.11", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", + "integrity": "sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==", + "dependencies": { + "balanced-match": "^1.0.0", + "concat-map": "0.0.1" + } + }, + "node_modules/buffer-crc32": { + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/buffer-crc32/-/buffer-crc32-0.2.13.tgz", + "integrity": "sha1-DTM+PwDqxQqhRUq9MO+MKl2ackI=", + "engines": { + "node": "*" + } + }, + "node_modules/buffer-from": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz", + "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==" + }, + "node_modules/chalk": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-3.0.0.tgz", + "integrity": "sha512-4D3B6Wf41KOYRFdszmDqMCGq5VV/uMAB273JILmO+3jAlh8X4qDtdtgCR3fxtbLEMzSx22QdhnDcJvu2u1fVwg==", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=8" + } + }, + "node_modules/color-convert": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", + "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", + "dependencies": { + "color-name": "~1.1.4" + }, + "engines": { + "node": ">=7.0.0" + } + }, + "node_modules/color-name": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" + }, + "node_modules/commander": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/commander/-/commander-4.1.1.tgz", + "integrity": "sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==", + "engines": { + "node": ">= 6" + } + }, + "node_modules/concat-map": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", + "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=" + }, + "node_modules/concat-stream": { + "version": "1.6.2", + "resolved": "https://registry.npmjs.org/concat-stream/-/concat-stream-1.6.2.tgz", + "integrity": "sha512-27HBghJxjiZtIk3Ycvn/4kbJk/1uZuJFfuPEns6LaEvpvG1f0hTea8lilrouyo9mVc2GWdcEZ8OLoGmSADlrCw==", + "engines": [ + "node >= 0.8" + ], + "dependencies": { + "buffer-from": "^1.0.0", + "inherits": "^2.0.3", + "readable-stream": "^2.2.2", + "typedarray": "^0.0.6" + } + }, + "node_modules/core-util-is": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", + "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" + }, + "node_modules/debug": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", + "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", + "deprecated": "Debug versions >=3.2.0 <3.2.7 || >=4 <4.3.1 have a low-severity ReDos regression when used in a Node.js environment. It is recommended you upgrade to 3.2.7 or 4.3.1. (https://github.com/visionmedia/debug/issues/797)", + "dependencies": { + "ms": "^2.1.1" + } + }, + "node_modules/extract-zip": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/extract-zip/-/extract-zip-1.7.0.tgz", + "integrity": "sha512-xoh5G1W/PB0/27lXgMQyIhP5DSY/LhoCsOyZgb+6iMmRtCwVBo55uKaMoEYrDCKQhWvqEip5ZPKAc6eFNyf/MA==", + "dependencies": { + "concat-stream": "^1.6.2", + "debug": "^2.6.9", + "mkdirp": "^0.5.4", + "yauzl": "^2.10.0" + }, + "bin": { + "extract-zip": "cli.js" + } + }, + "node_modules/extract-zip/node_modules/debug": { + "version": "2.6.9", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", + "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", + "dependencies": { + "ms": "2.0.0" + } + }, + "node_modules/extract-zip/node_modules/ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" + }, + "node_modules/fd-slicer": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/fd-slicer/-/fd-slicer-1.1.0.tgz", + "integrity": "sha1-JcfInLH5B3+IkbvmHY85Dq4lbx4=", + "dependencies": { + "pend": "~1.2.0" + } + }, + "node_modules/fs.realpath": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", + "integrity": "sha1-FQStJSMVjKpA20onh8sBQRmU6k8=" + }, + "node_modules/glob": { + "version": "7.1.6", + "resolved": "https://registry.npmjs.org/glob/-/glob-7.1.6.tgz", + "integrity": "sha512-LwaxwyZ72Lk7vZINtNNrywX0ZuLyStrdDtabefZKAY5ZGJhVtgdznluResxNmPitE0SAO+O26sWTHeKSI2wMBA==", + "dependencies": { + "fs.realpath": "^1.0.0", + "inflight": "^1.0.4", + "inherits": "2", + "minimatch": "^3.0.4", + "once": "^1.3.0", + "path-is-absolute": "^1.0.0" + }, + "engines": { + "node": "*" + }, + "funding": { + "url": "https://github.com/sponsors/isaacs" + } + }, + "node_modules/has-flag": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", + "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", + "engines": { + "node": ">=8" + } + }, + "node_modules/https-proxy-agent": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-4.0.0.tgz", + "integrity": "sha512-zoDhWrkR3of1l9QAL8/scJZyLu8j/gBkcwcaQOZh7Gyh/+uJQzGVETdgT30akuwkpL8HTRfssqI3BZuV18teDg==", + "dependencies": { + "agent-base": "5", + "debug": "4" + }, + "engines": { + "node": ">= 6.0.0" + } + }, + "node_modules/inflight": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", + "integrity": "sha1-Sb1jMdfQLQwJvJEKEHW6gWW1bfk=", + "dependencies": { + "once": "^1.3.0", + "wrappy": "1" + } + }, + "node_modules/inherits": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==" + }, + "node_modules/isarray": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", + "integrity": "sha1-u5NdSFgsuhaMBoNJV6VKPgcSTxE=" + }, + "node_modules/mime": { + "version": "2.4.6", + "resolved": "https://registry.npmjs.org/mime/-/mime-2.4.6.tgz", + "integrity": "sha512-RZKhC3EmpBchfTGBVb8fb+RL2cWyw/32lshnsETttkBAyAUXSGHxbEJWWRXc751DrIxG1q04b8QwMbAwkRPpUA==", + "bin": { + "mime": "cli.js" + }, + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/mime-db": { + "version": "1.44.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.44.0.tgz", + "integrity": "sha512-/NOTfLrsPBVeH7YtFPgsVWveuL+4SjjYxaQ1xtM1KMFj7HdxlBlxeyNLzhyJVx7r4rZGJAZ/6lkKCitSc/Nmpg==", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.27", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.27.tgz", + "integrity": "sha512-JIhqnCasI9yD+SsmkquHBxTSEuZdQX5BuQnS2Vc7puQQQ+8yiP5AY5uWhpdv4YL4VM5c6iliiYWPgJ/nJQLp7w==", + "dependencies": { + "mime-db": "1.44.0" + }, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/minimatch": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz", + "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", + "dependencies": { + "brace-expansion": "^1.1.7" + }, + "engines": { + "node": "*" + } + }, + "node_modules/minimist": { + "version": "1.2.5", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", + "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==" + }, + "node_modules/mkdirp": { + "version": "0.5.5", + "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.5.tgz", + "integrity": "sha512-NKmAlESf6jMGym1++R0Ra7wvhV+wFW63FaSOFPwRahvea0gMUcGUhVeAg/0BC0wiv9ih5NYPB1Wn1UEI1/L+xQ==", + "dependencies": { + "minimist": "^1.2.5" + }, + "bin": { + "mkdirp": "bin/cmd.js" + } + }, + "node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, + "node_modules/once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", + "dependencies": { + "wrappy": "1" + } + }, + "node_modules/path-is-absolute": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/path-is-absolute/-/path-is-absolute-1.0.1.tgz", + "integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/pend": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/pend/-/pend-1.2.0.tgz", + "integrity": "sha1-elfrVQpng/kRUzH89GY9XI4AelA=" + }, + "node_modules/process-nextick-args": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz", + "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==" + }, + "node_modules/progress": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/progress/-/progress-2.0.3.tgz", + "integrity": "sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==", + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, + "node_modules/puppeteer": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/puppeteer/-/puppeteer-2.1.1.tgz", + "integrity": "sha512-LWzaDVQkk1EPiuYeTOj+CZRIjda4k2s5w4MK4xoH2+kgWV/SDlkYHmxatDdtYrciHUKSXTsGgPgPP8ILVdBsxg==", + "hasInstallScript": true, + "dependencies": { + "@types/mime-types": "^2.1.0", + "debug": "^4.1.0", + "extract-zip": "^1.6.6", + "https-proxy-agent": "^4.0.0", + "mime": "^2.0.3", + "mime-types": "^2.1.25", + "progress": "^2.0.1", + "proxy-from-env": "^1.0.0", + "rimraf": "^2.6.1", + "ws": "^6.1.0" + }, + "engines": { + "node": ">=8.16.0" + } + }, + "node_modules/readable-stream": { + "version": "2.3.7", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.7.tgz", + "integrity": "sha512-Ebho8K4jIbHAxnuxi7o42OrZgF/ZTNcsZj6nRKyUmkhLFq8CHItp/fy6hQZuZmP/n3yZ9VBUbp4zz/mX8hmYPw==", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.3", + "isarray": "~1.0.0", + "process-nextick-args": "~2.0.0", + "safe-buffer": "~5.1.1", + "string_decoder": "~1.1.1", + "util-deprecate": "~1.0.1" + } + }, + "node_modules/rimraf": { + "version": "2.7.1", + "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-2.7.1.tgz", + "integrity": "sha512-uWjbaKIK3T1OSVptzX7Nl6PvQ3qAGtKEtVRjRuazjfL3Bx5eI409VZSqgND+4UNnmzLVdPj9FqFJNPqBZFve4w==", + "dependencies": { + "glob": "^7.1.3" + }, + "bin": { + "rimraf": "bin.js" + } + }, + "node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, + "node_modules/string_decoder": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", + "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", + "dependencies": { + "safe-buffer": "~5.1.0" + } + }, + "node_modules/supports-color": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.1.0.tgz", + "integrity": "sha512-oRSIpR8pxT1Wr2FquTNnGet79b3BWljqOuoW/h4oBhxJ/HUbX5nX6JSruTkvXDCFMwDPvsaTTbvMLKZWSy0R5g==", + "dependencies": { + "has-flag": "^4.0.0" + }, + "engines": { + "node": ">=8" + } + }, + "node_modules/typedarray": { + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz", + "integrity": "sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=" + }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=" + }, + "node_modules/wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" + }, + "node_modules/ws": { + "version": "6.2.2", + "resolved": "https://registry.npmjs.org/ws/-/ws-6.2.2.tgz", + "integrity": "sha512-zmhltoSR8u1cnDsD43TX59mzoMZsLKqUweyYBAIvTngR3shc0W6aOZylZmq/7hqyVxPdi+5Ud2QInblgyE72fw==", + "dependencies": { + "async-limiter": "~1.0.0" + } + }, + "node_modules/yauzl": { + "version": "2.10.0", + "resolved": "https://registry.npmjs.org/yauzl/-/yauzl-2.10.0.tgz", + "integrity": "sha1-x+sXyT4RLLEIb6bY5R+wZnt5pfk=", + "dependencies": { + "buffer-crc32": "~0.2.3", + "fd-slicer": "~1.1.0" + } + } + }, "dependencies": { "@mermaid-js/mermaid-cli": { "version": "8.5.1-2", diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index 934bfdb8..7b7c5781 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -22,7 +22,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-statemachine/fsm" - "github.com/filecoin-project/go-fil-markets/carstore" "github.com/filecoin-project/go-fil-markets/discovery" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates" @@ -30,6 +29,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-fil-markets/stores" ) var log = logging.Logger("retrieval") @@ -47,7 +47,7 @@ type Client struct { stateMachines fsm.Group migrateStateMachines func(context.Context) error carPath string - readWriteBlockstores *carstore.CarReadWriteStoreTracker + stores *stores.ReadWriteBlockstores // Guards concurrent access to Retrieve method retrieveLk sync.Mutex @@ -77,15 +77,15 @@ var _ retrievalmarket.RetrievalClient = &Client{} // NewClient creates a new retrieval client func NewClient(network rmnet.RetrievalMarketNetwork, carPath string, dataTransfer datatransfer.Manager, node retrievalmarket.RetrievalClientNode, resolver discovery.PeerResolver, ds datastore.Batching) (retrievalmarket.RetrievalClient, error) { c := &Client{ - network: network, - dataTransfer: dataTransfer, - node: node, - resolver: resolver, - dealIDGen: shared.NewTimeCounter(), - subscribers: pubsub.New(dispatcher), - readySub: pubsub.New(shared.ReadyDispatcher), - carPath: carPath, - readWriteBlockstores: carstore.NewCarReadWriteStoreTracker(), + network: network, + dataTransfer: dataTransfer, + node: node, + resolver: resolver, + dealIDGen: shared.NewTimeCounter(), + subscribers: pubsub.New(dispatcher), + readySub: pubsub.New(shared.ReadyDispatcher), + carPath: carPath, + stores: stores.NewReadWriteBlockstores(), } retrievalMigrations, err := migrations.ClientMigrations.Build() if err != nil { @@ -253,7 +253,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie next := c.dealIDGen.Next() dealID := retrievalmarket.DealID(next) carFilePath := filepath.Join(c.carPath, dealID.String()) - _, err = c.readWriteBlockstores.GetOrCreate(dealID.String(), carFilePath, payloadCID) + _, err = c.stores.GetOrOpen(dealID.String(), carFilePath, payloadCID) if err != nil { return nil, xerrors.Errorf("failed to create retrieval client blockstore: %w", err) } @@ -462,7 +462,7 @@ func (c *clientDealEnvironment) CloseDataTransfer(ctx context.Context, channelID // FinalizeBlockstore is called when all blocks have been received func (c *clientDealEnvironment) FinalizeBlockstore(ctx context.Context, dealID retrievalmarket.DealID) error { - bs, err := c.c.readWriteBlockstores.Get(dealID.String()) + bs, err := c.c.stores.Get(dealID.String()) if err != nil { return xerrors.Errorf("getting deal with ID %s to finalize it: %w", dealID, err) } @@ -472,7 +472,7 @@ func (c *clientDealEnvironment) FinalizeBlockstore(ctx context.Context, dealID r return xerrors.Errorf("failed to finalize blockstore for deal with ID %s: %w", dealID, err) } - err = c.c.readWriteBlockstores.CleanBlockstore(dealID.String()) + err = c.c.stores.Untrack(dealID.String()) if err != nil { return xerrors.Errorf("failed to clean blockstore for deal with ID %s: %w", dealID, err) } @@ -490,7 +490,8 @@ func (csg *clientStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.Deal if err != nil { return nil, err } - return csg.c.readWriteBlockstores.Get(deal.ID.String()) + bs, err := csg.c.stores.Get(deal.ID.String()) + return bs, err } // ClientFSMParameterSpec is a valid set of parameters for a client deal FSM - used in doc generation diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 3e275582..58f06042 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -30,7 +30,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/specs-actors/actors/builtin/paych" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" @@ -39,6 +38,7 @@ import ( rmtesting "github.com/filecoin-project/go-fil-markets/retrievalmarket/testing" "github.com/filecoin-project/go-fil-markets/shared" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/stores" ) func TestClientCanMakeQueryToProvider(t *testing.T) { @@ -358,7 +358,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { // Create a CARv2 file from a fixture fpath := filepath.Join("retrievalmarket", "impl", "fixtures", testCase.filename) - pieceLink, fileStoreCARv2FilePath := testData.LoadUnixFSFileToStore(t, fpath) + pieceLink, path := testData.LoadUnixFSFileToStore(t, fpath) c, ok := pieceLink.(cidlink.Link) require.True(t, ok) payloadCID := c.Cid @@ -366,8 +366,10 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { // Get the CARv1 payload of the UnixFS DAG that the (Filestore backed by the CARv2) contains. carFile, err := os.CreateTemp(t.TempDir(), "rand") require.NoError(t, err) - fs, err := filestorecaradapter.NewReadOnlyFileStore(fileStoreCARv2FilePath) + + fs, err := stores.ReadOnlyFilestore(path) require.NoError(t, err) + sc := car.NewSelectiveCar(bgCtx, fs, []car.Dag{{Root: payloadCID, Selector: shared.AllSelector()}}) prepared, err := sc.Prepare() require.NoError(t, err) @@ -704,7 +706,7 @@ func setupProvider( dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, providerNode) // Register the piece with the DAG store wrapper - err = shared.RegisterShardSync(ctx, dagstoreWrapper, pieceInfo.PieceCID, carFilePath, true) + err = stores.RegisterShardSync(ctx, dagstoreWrapper, pieceInfo.PieceCID, carFilePath, true) require.NoError(t, err) // Remove the CAR file so that the provider is forced to unseal the data diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index a4f5fc8a..2a25d5f0 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -20,7 +20,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-statemachine/fsm" - "github.com/filecoin-project/go-fil-markets/carstore" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore" @@ -30,6 +29,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-fil-markets/stores" ) // RetrievalProviderOption is a function that configures a retrieval provider @@ -59,8 +59,8 @@ type Provider struct { askStore retrievalmarket.AskStore disableNewDeals bool retrievalPricingFunc RetrievalPricingFunc - dagStore shared.DagStoreWrapper - readOnlyBlockStores *carstore.CarReadOnlyStoreTracker + dagStore stores.DAGStoreWrapper + stores *stores.ReadOnlyBlockstores } type internalProviderEvent struct { @@ -103,7 +103,7 @@ func NewProvider(minerAddress address.Address, node retrievalmarket.RetrievalProviderNode, network rmnet.RetrievalMarketNetwork, pieceStore piecestore.PieceStore, - dagStore shared.DagStoreWrapper, + dagStore stores.DAGStoreWrapper, dataTransfer datatransfer.Manager, ds datastore.Batching, retrievalPricingFunc RetrievalPricingFunc, @@ -124,7 +124,7 @@ func NewProvider(minerAddress address.Address, readySub: pubsub.New(shared.ReadyDispatcher), retrievalPricingFunc: retrievalPricingFunc, dagStore: dagStore, - readOnlyBlockStores: carstore.NewReadOnlyStoreTracker(), + stores: stores.NewReadOnlyBlockstores(), } err := shared.MoveKey(ds, "retrieval-ask", "retrieval-ask/latest") diff --git a/retrievalmarket/impl/provider_environments.go b/retrievalmarket/impl/provider_environments.go index d013ead9..f77a5a11 100644 --- a/retrievalmarket/impl/provider_environments.go +++ b/retrievalmarket/impl/provider_environments.go @@ -33,7 +33,7 @@ func (pve *providerValidationEnvironment) GetAsk(ctx context.Context, payloadCid storageDeals, err := storageDealsForPiece(pieceCid != nil, payloadCid, piece, pve.p.pieceStore) if err != nil { - return retrievalmarket.Ask{}, xerrors.Errorf("failed to fetch deals for payload, err=%s", err) + return retrievalmarket.Ask{}, xerrors.Errorf("failed to fetch deals for payload: %w", err) } input := retrievalmarket.PricingInput{ @@ -135,7 +135,7 @@ func (pde *providerDealEnvironment) PrepareBlockstore(ctx context.Context, dealI } log.Debugf("adding blockstore for deal %d to tracker", dealID) - _, err = pde.p.readOnlyBlockStores.Add(dealID.String(), bs) + _, err = pde.p.stores.Track(dealID.String(), bs) log.Debugf("added blockstore for deal %d to tracker", dealID) return err } @@ -171,7 +171,7 @@ func (pde *providerDealEnvironment) CloseDataTransfer(ctx context.Context, chid func (pde *providerDealEnvironment) DeleteStore(dealID retrievalmarket.DealID) error { // close the read-only blockstore and stop tracking it for the deal - if err := pde.p.readOnlyBlockStores.CleanBlockstore(dealID.String()); err != nil { + if err := pde.p.stores.Untrack(dealID.String()); err != nil { return xerrors.Errorf("failed to clean read-only blockstore for deal %d: %w", dealID, err) } @@ -318,6 +318,6 @@ func (psg *providerStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.De // but is only accessed in step 4 after the data has been unsealed. // return newLazyBlockstore(func() (dagstore.ReadBlockstore, error) { - return psg.p.readOnlyBlockStores.Get(dealID.String()) + return psg.p.stores.Get(dealID.String()) }), nil } diff --git a/retrievalmarket/retrieval_restart_integration_test.go b/retrievalmarket/retrieval_restart_integration_test.go index 0c9f6a2f..fbe97cf4 100644 --- a/retrievalmarket/retrieval_restart_integration_test.go +++ b/retrievalmarket/retrieval_restart_integration_test.go @@ -107,7 +107,7 @@ func TestBounceConnectionDealTransferOngoing(t *testing.T) { deps.DagStore = tut.NewMockDagStoreWrapper(pieceStore, providerNode) sh := testharness.NewHarnessWithTestData(t, deps.TestData, deps, true, false) - defer os.Remove(sh.FileStoreCARv2FilePath) + defer os.Remove(sh.IndexedCAR) // do a storage deal storageClientSeenDeal := doStorage(t, bgCtx, sh) @@ -236,7 +236,7 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) { deps.DagStore = tut.NewMockDagStoreWrapper(pieceStore, providerNode) sh := testharness.NewHarnessWithTestData(t, td, deps, true, false) - defer os.Remove(sh.FileStoreCARv2FilePath) + defer os.Remove(sh.IndexedCAR) // do a storage deal storageClientSeenDeal := doStorage(t, bgCtx, sh) diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index 974bd125..7fa5693b 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -25,7 +25,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/specs-actors/actors/builtin/paych" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" @@ -39,6 +38,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/testharness" "github.com/filecoin-project/go-fil-markets/storagemarket/testharness/dependencies" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" + "github.com/filecoin-project/go-fil-markets/stores" ) func TestStorageRetrieval(t *testing.T) { @@ -90,7 +90,7 @@ func TestStorageRetrieval(t *testing.T) { deps := setupDepsWithDagStore(bgCtx, t, providerNode, pieceStore) sh := testharness.NewHarnessWithTestData(t, deps.TestData, deps, true, false) - defer os.Remove(sh.FileStoreCARv2FilePath) + defer os.Remove(sh.IndexedCAR) storageProviderSeenDeal := doStorage(t, bgCtx, sh) ctxTimeout, canc := context.WithTimeout(bgCtx, 25*time.Second) @@ -158,25 +158,27 @@ func TestOfflineStorageRetrieval(t *testing.T) { pieceStore := tut.NewTestPieceStore() deps := setupDepsWithDagStore(bgCtx, t, providerNode, pieceStore) sh := testharness.NewHarnessWithTestData(t, deps.TestData, deps, true, false) + defer os.Remove(sh.IndexedCAR) - defer os.Remove(sh.FileStoreCARv2FilePath) // start and wait for client/provider ctx, cancel := context.WithTimeout(bgCtx, 5*time.Second) defer cancel() shared_testutil.StartAndWaitForReady(ctx, t, sh.Provider) shared_testutil.StartAndWaitForReady(ctx, t, sh.Client) - // Do a Selective CARv1 traversal on the CARv2 file to get a deterministic CARv1 that we can import on the miner side. - rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(sh.FileStoreCARv2FilePath) + // Do a Selective CARv1 traversal on the CARv2 file to get a deterministic CARv1 that we can import on the miner side. + fs, err := stores.ReadOnlyFilestore(sh.IndexedCAR) require.NoError(t, err) - sc := car.NewSelectiveCar(ctx, rdOnly, []car.Dag{{Root: sh.PayloadCid, Selector: shared.AllSelector()}}) + + require.NoError(t, err) + sc := car.NewSelectiveCar(ctx, fs, []car.Dag{{Root: sh.PayloadCid, Selector: shared.AllSelector()}}) prepared, err := sc.Prepare() require.NoError(t, err) carBuf := new(bytes.Buffer) require.NoError(t, prepared.Write(carBuf)) - require.NoError(t, rdOnly.Close()) + require.NoError(t, fs.Close()) - commP, size, err := clientutils.CommP(ctx, sh.FileStoreCARv2FilePath, &storagemarket.DataRef{ + commP, size, err := clientutils.CommP(ctx, sh.IndexedCAR, &storagemarket.DataRef{ // hacky but need it for now because if it's manual, we wont get a CommP. TransferType: storagemarket.TTGraphsync, Root: sh.PayloadCid, diff --git a/shared_testutil/carV2.go b/shared_testutil/files.go similarity index 54% rename from shared_testutil/carV2.go rename to shared_testutil/files.go index 7360943c..230220b1 100644 --- a/shared_testutil/carV2.go +++ b/shared_testutil/files.go @@ -8,7 +8,7 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - cidutil "github.com/ipfs/go-cidutil" + "github.com/ipfs/go-cidutil" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" bstore "github.com/ipfs/go-ipfs-blockstore" @@ -23,66 +23,60 @@ import ( mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" + "github.com/filecoin-project/go-fil-markets/stores" ) var defaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) -// GenFullCARv2FromNormalFile generates a CARv2 file from a "normal" i.e. non-CAR file and returns the file path. -// All the Unixfs blocks are written as is in the CARv2 file. -func GenFullCARv2FromNormalFile(t *testing.T, normalFilePath string) (root cid.Cid, carV2FilePath string) { - ctx := context.Background() - - bs := bstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - dagSvc := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - - root = genUnixfsDAG(t, ctx, normalFilePath, dagSvc) - // Create a UnixFS DAG again AND generate a CARv2 file using a CARv2 read-write blockstore now that we have the root. - carV2Path := genFullCARv2File(t, ctx, normalFilePath, root) - - return root, carV2Path -} - -// GenFileStoreCARv2FromNormalFile generates a CARv2 file that can be used to back a Filestore from a "normal" i.e. non-CAR file and returns the file path. -func GenFileStoreCARv2FromNormalFile(t *testing.T, normalFilePath string) (root cid.Cid, carV2FilePath string) { - ctx := context.Background() +// CreateDenseCARv2 generates a "dense" UnixFS CARv2 from the supplied ordinary file. +// A dense UnixFS CARv2 is one storing leaf data. Contrast to CreateRefCARv2. +func CreateDenseCARv2(t *testing.T, src string) (root cid.Cid, path string) { bs := bstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) dagSvc := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - root = genUnixfsDAG(t, ctx, normalFilePath, dagSvc) - // Create a UnixFS DAG again AND generate a CARv2 file using a filestore backed by a CARv2 read-write blockstore. - carV2Path := genFileStoreCARv2File(t, ctx, normalFilePath, root) - return root, carV2Path -} + root = buildUnixFS(t, src, dagSvc) -func genFullCARv2File(t *testing.T, ctx context.Context, fPath string, root cid.Cid) string { - tmp, err := os.CreateTemp("", "rand") + // Create a UnixFS DAG again AND generate a CARv2 file using a CARv2 + // read-write blockstore now that we have the root. + out, err := os.CreateTemp("", "rand") require.NoError(t, err) - require.NoError(t, tmp.Close()) + require.NoError(t, out.Close()) - rw, err := blockstore.OpenReadWrite(tmp.Name(), []cid.Cid{root}, blockstore.UseWholeCIDs(true)) + rw, err := blockstore.OpenReadWrite(out.Name(), []cid.Cid{root}, blockstore.UseWholeCIDs(true)) require.NoError(t, err) - dagSvc := merkledag.NewDAGService(blockservice.New(rw, offline.Exchange(rw))) - root2 := genUnixfsDAG(t, ctx, fPath, dagSvc) + dagSvc = merkledag.NewDAGService(blockservice.New(rw, offline.Exchange(rw))) + + root2 := buildUnixFS(t, src, dagSvc) require.NoError(t, rw.Finalize()) require.Equal(t, root, root2) - // return the path of the CARv2 file. - return tmp.Name() + return root, out.Name() +} + +// CreateRefCARv2 generates a "ref" CARv2 from the supplied ordinary file. +// A "ref" CARv2 is one that stores leaf data as positional references to the original file. +func CreateRefCARv2(t *testing.T, src string) (cid.Cid, string) { + bs := bstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + dagSvc := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + + root := buildUnixFS(t, src, dagSvc) + path := genRefCARv2(t, src, root) + + return root, path } -func genFileStoreCARv2File(t *testing.T, ctx context.Context, fPath string, root cid.Cid) string { +func genRefCARv2(t *testing.T, path string, root cid.Cid) string { tmp, err := os.CreateTemp("", "rand") require.NoError(t, err) require.NoError(t, tmp.Close()) - fs, err := filestorecaradapter.NewReadWriteFileStore(tmp.Name(), []cid.Cid{root}) + fs, err := stores.ReadWriteFilestore(tmp.Name(), root) require.NoError(t, err) dagSvc := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) - root2 := genUnixfsDAG(t, ctx, fPath, dagSvc) + root2 := buildUnixFS(t, path, dagSvc) require.NoError(t, fs.Close()) require.Equal(t, root, root2) @@ -90,17 +84,19 @@ func genFileStoreCARv2File(t *testing.T, ctx context.Context, fPath string, root return tmp.Name() } -func genUnixfsDAG(t *testing.T, ctx context.Context, normalFilePath string, dag ipldformat.DAGService) cid.Cid { +func buildUnixFS(t *testing.T, from string, into ipldformat.DAGService) cid.Cid { // read in a fixture file - fpath, err := filepath.Abs(filepath.Join(thisDir(t), "..", normalFilePath)) + fpath, err := filepath.Abs(filepath.Join(thisDir(t), "..", from)) require.NoError(t, err) + // open the fixture file f, err := os.Open(fpath) require.NoError(t, err) stat, err := f.Stat() require.NoError(t, err) - // get a IPLD Reader Path File that can be used to read information required to write the Unixfs DAG blocks to a filestore + // get a IPLD Reader Path File that can be used to read information + // required to write the Unixfs DAG blocks to a filestore rpf, err := files.NewReaderPathFile(fpath, f, stat) require.NoError(t, err) @@ -110,7 +106,7 @@ func genUnixfsDAG(t *testing.T, ctx context.Context, normalFilePath string, dag require.NoError(t, err) prefix.MhType = defaultHashFunction - bufferedDS := ipldformat.NewBufferedDAG(ctx, dag) + bufferedDS := ipldformat.NewBufferedDAG(context.Background(), into) params := ihelper.DagBuilderParams{ Maxlinks: unixfsLinksPerLevel, RawLeaves: true, diff --git a/shared_testutil/mockdagstorewrapper.go b/shared_testutil/mockdagstorewrapper.go index a09e3bb0..afb7a91d 100644 --- a/shared_testutil/mockdagstorewrapper.go +++ b/shared_testutil/mockdagstorewrapper.go @@ -13,10 +13,9 @@ import ( "github.com/filecoin-project/dagstore" - "github.com/filecoin-project/go-fil-markets/carstore" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-fil-markets/stores" ) type registration struct { @@ -36,7 +35,7 @@ type MockDagStoreWrapper struct { registrations map[cid.Cid]registration } -var _ shared.DagStoreWrapper = (*MockDagStoreWrapper)(nil) +var _ stores.DAGStoreWrapper = (*MockDagStoreWrapper)(nil) func NewMockDagStoreWrapper(pieceStore piecestore.PieceStore, rpn retrievalmarket.RetrievalProviderNode) *MockDagStoreWrapper { return &MockDagStoreWrapper{ @@ -81,7 +80,7 @@ func (m *MockDagStoreWrapper) ClearRegistrations() { m.registrations = make(map[cid.Cid]registration) } -func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { +func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) { m.lk.Lock() defer m.lk.Unlock() @@ -106,7 +105,7 @@ func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) ( return getBlockstoreFromReader(r, pieceCid) } -func getBlockstoreFromReader(r io.ReadCloser, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { +func getBlockstoreFromReader(r io.ReadCloser, pieceCid cid.Cid) (stores.ClosableBlockstore, error) { // Write the piece to a file tmpFile, err := os.CreateTemp("", "dagstoretmp") if err != nil { diff --git a/shared_testutil/mocknet.go b/shared_testutil/mocknet.go index bd2fc8f8..e6a0345d 100644 --- a/shared_testutil/mocknet.go +++ b/shared_testutil/mocknet.go @@ -6,7 +6,6 @@ import ( "io" "io/ioutil" "os" - "path" "path/filepath" "runtime" "testing" @@ -152,52 +151,52 @@ func NewLibp2pTestData(ctx context.Context, t *testing.T) *Libp2pTestData { const unixfsChunkSize uint64 = 1 << 10 const unixfsLinksPerLevel = 1024 -// LoadUnixFSFile injects the fixture `filename` into the given blockstore from the +// LoadUnixFSFile injects the fixture `src` into the given blockstore from the // fixtures directory. If useSecondNode is true, fixture is injected to the second node; // otherwise the first node gets it -func (ltd *Libp2pTestData) LoadUnixFSFile(t *testing.T, fixturesPath string, useSecondNode bool) (ipld.Link, string) { +func (ltd *Libp2pTestData) LoadUnixFSFile(t *testing.T, src string, useSecondNode bool) (ipld.Link, string) { var dagService ipldformat.DAGService if useSecondNode { dagService = ltd.DagService2 } else { dagService = ltd.DagService1 } - return ltd.loadUnixFSFile(t, fixturesPath, dagService) + return ltd.loadUnixFSFile(t, src, dagService) } -// LoadUnixFSFileToStore creates a CAR file from the fixture at `fixturesPath` -func (ltd *Libp2pTestData) LoadUnixFSFileToStore(t *testing.T, fixturesPath string) (ipld.Link, string) { +// LoadUnixFSFileToStore creates a CAR file from the fixture at `src` +func (ltd *Libp2pTestData) LoadUnixFSFileToStore(t *testing.T, src string) (ipld.Link, string) { dstore := dss.MutexWrap(datastore.NewMapDatastore()) bs := bstore.NewBlockstore(dstore) dagService := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - return ltd.loadUnixFSFile(t, fixturesPath, dagService) + return ltd.loadUnixFSFile(t, src, dagService) } -func (ltd *Libp2pTestData) loadUnixFSFile(t *testing.T, fixturesPath string, dagService ipldformat.DAGService) (ipld.Link, string) { - ctx := context.Background() - +func (ltd *Libp2pTestData) loadUnixFSFile(t *testing.T, src string, dagService ipldformat.DAGService) (ipld.Link, string) { // save the original files bytes - fpath, err := filepath.Abs(filepath.Join(thisDir(t), "..", fixturesPath)) + fpath, err := filepath.Abs(filepath.Join(thisDir(t), "..", src)) require.NoError(t, err) + f, err := os.Open(fpath) require.NoError(t, err) + ltd.OrigBytes, err = ioutil.ReadAll(f) require.NoError(t, err) require.NotEmpty(t, ltd.OrigBytes) // generate a unixfs dag using the given dagService to get the root. - root := genUnixfsDAG(t, ctx, fixturesPath, dagService) + root := buildUnixFS(t, src, dagService) // Create a UnixFS DAG again AND generate a CARv2 file that can be used to back a filestore. - carV2Path := genFileStoreCARv2File(t, ctx, fixturesPath, root) - return cidlink.Link{Cid: root}, carV2Path + path := genRefCARv2(t, src, root) + return cidlink.Link{Cid: root}, path } func thisDir(t *testing.T) string { _, fname, _, ok := runtime.Caller(1) require.True(t, ok) - return path.Dir(fname) + return filepath.Dir(fname) } // VerifyFileTransferred checks that the fixture file was sent from one node to the other. @@ -222,7 +221,6 @@ func (ltd *Libp2pTestData) VerifyFileTransferredIntoStore(t *testing.T, link ipl } func (ltd *Libp2pTestData) verifyFileTransferred(t *testing.T, link ipld.Link, dagService ipldformat.DAGService, readLen uint64) { - c := link.(cidlink.Link).Cid // load the root of the UnixFS DAG from the new blockstore diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 02fcbe82..f1463b7a 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -23,7 +23,6 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/filecoin-project/go-fil-markets/carstore" discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" @@ -34,6 +33,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/migrations" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" ) var log = logging.Logger("storagemarket_impl") @@ -58,7 +58,7 @@ type Client struct { unsubDataTransfer datatransfer.Unsubscribe - readOnlyCARStoreTracker *carstore.CarReadOnlyStoreTracker + stores *stores.ReadOnlyBlockstores } // StorageClientOption allows custom configuration of a storage client @@ -82,14 +82,14 @@ func NewClient( options ...StorageClientOption, ) (*Client, error) { c := &Client{ - net: net, - dataTransfer: dataTransfer, - discovery: discovery, - node: scn, - pubSub: pubsub.New(clientDispatcher), - readySub: pubsub.New(shared.ReadyDispatcher), - pollingInterval: DefaultPollingInterval, - readOnlyCARStoreTracker: carstore.NewReadOnlyStoreTracker(), + net: net, + dataTransfer: dataTransfer, + discovery: discovery, + node: scn, + pubSub: pubsub.New(clientDispatcher), + readySub: pubsub.New(shared.ReadyDispatcher), + pollingInterval: DefaultPollingInterval, + stores: stores.NewReadOnlyBlockstores(), } storageMigrations, err := migrations.ClientMigrations.Build() if err != nil { @@ -327,7 +327,7 @@ func (c *Client) ProposeStorageDeal(ctx context.Context, params storagemarket.Pr return nil, xerrors.Errorf("looking up addresses: %w", err) } - commP, pieceSize, err := clientutils.CommP(ctx, params.FilestoreCARv2FilePath, params.Data) + commP, pieceSize, err := clientutils.CommP(ctx, params.IndexedCAR, params.Data) if err != nil { return nil, xerrors.Errorf("computing commP failed: %w", err) } @@ -374,16 +374,16 @@ func (c *Client) ProposeStorageDeal(ctx context.Context, params storagemarket.Pr } deal := &storagemarket.ClientDeal{ - ProposalCid: proposalNd.Cid(), - ClientDealProposal: *clientDealProposal, - State: storagemarket.StorageDealUnknown, - Miner: params.Info.PeerID, - MinerWorker: params.Info.Worker, - DataRef: params.Data, - FastRetrieval: params.FastRetrieval, - DealStages: storagemarket.NewDealStages(), - CreationTime: curTime(), - FilestoreCARv2FilePath: params.FilestoreCARv2FilePath, + ProposalCid: proposalNd.Cid(), + ClientDealProposal: *clientDealProposal, + State: storagemarket.StorageDealUnknown, + Miner: params.Info.PeerID, + MinerWorker: params.Info.Worker, + DataRef: params.Data, + FastRetrieval: params.FastRetrieval, + DealStages: storagemarket.NewDealStages(), + CreationTime: curTime(), + IndexedCAR: params.IndexedCAR, } err = c.statemachines.Begin(proposalNd.Cid(), deal) diff --git a/storagemarket/impl/client_environments.go b/storagemarket/impl/client_environments.go index eb9fa6e0..c2531fed 100644 --- a/storagemarket/impl/client_environments.go +++ b/storagemarket/impl/client_environments.go @@ -12,9 +12,9 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" ) // ------- @@ -34,7 +34,7 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode { } func (c *clientDealEnvironment) CleanBlockstore(proposalCid cid.Cid) error { - return c.c.readOnlyCARStoreTracker.CleanBlockstore(proposalCid.String()) + return c.c.stores.Untrack(proposalCid.String()) } func (c *clientDealEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, @@ -66,20 +66,18 @@ func (csg *clientStoreGetter) Get(proposalCid cid.Cid) (bstore.Blockstore, error return nil, xerrors.Errorf("failed to get client deal state: %w", err) } - // get a read Only CARv2 blockstore that provides random access on top of - // the client's CARv2 file containing the CARv1 payload that needs to be - // transferred as part of the deal. - - fs, err := filestorecaradapter.NewReadOnlyFileStore(deal.FilestoreCARv2FilePath) + // Open a read-only blockstore off the CAR file, wrapped in a filestore so + // it can read file positional references. + bs, err := stores.ReadOnlyFilestore(deal.IndexedCAR) if err != nil { - return nil, xerrors.Errorf("failed to get blockstore from tracker: %w", err) + return nil, xerrors.Errorf("failed to open car filestore: %w", err) } - _, err = csg.c.readOnlyCARStoreTracker.Add(proposalCid.String(), fs) + _, err = csg.c.stores.Track(proposalCid.String(), bs) if err != nil { return nil, xerrors.Errorf("failed to get blockstore from tracker: %w", err) } - return fs, nil + return bs, nil } func (c *clientDealEnvironment) TagPeer(peer peer.ID, tag string) { diff --git a/storagemarket/impl/clientutils/clientutils.go b/storagemarket/impl/clientutils/clientutils.go index 518e03fd..ae204843 100644 --- a/storagemarket/impl/clientutils/clientutils.go +++ b/storagemarket/impl/clientutils/clientutils.go @@ -15,10 +15,10 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" ) // CommP calculates the commP for a given dataref @@ -26,7 +26,7 @@ import ( // We can't rely on the CARv1 payload in the given CARv2 file being deterministic as the client could have // written a "non-deterministic/unordered" CARv2 file. // So, we need to do a CARv1 traversal here by giving the traverser a random access CARv2 blockstore that wraps the given CARv2 file. -func CommP(ctx context.Context, FileStoreCARv2FilePath string, data *storagemarket.DataRef) (cid.Cid, abi.UnpaddedPieceSize, error) { +func CommP(ctx context.Context, carPath string, data *storagemarket.DataRef) (cid.Cid, abi.UnpaddedPieceSize, error) { // if we already have the PieceCid, there's no need to do anything here. if data.PieceCid != nil { return *data.PieceCid, data.PieceSize, nil @@ -37,13 +37,15 @@ func CommP(ctx context.Context, FileStoreCARv2FilePath string, data *storagemark return cid.Undef, 0, xerrors.New("Piece CID and size must be set for manual transfer") } - if FileStoreCARv2FilePath == "" { + if carPath == "" { return cid.Undef, 0, xerrors.New("need Carv2 file path to get a read-only blockstore") } - fs, err := filestorecaradapter.NewReadOnlyFileStore(FileStoreCARv2FilePath) + // Open a read-only blockstore off the CAR file, wrapped in a filestore so + // it can read file positional references. + fs, err := stores.ReadOnlyFilestore(carPath) if err != nil { - return cid.Undef, 0, xerrors.Errorf("failed to create read-only filestore: %w", err) + return cid.Undef, 0, xerrors.Errorf("failed to open carv2 blockstore: %w", err) } defer fs.Close() @@ -92,7 +94,6 @@ func VerifyResponse(ctx context.Context, resp network.SignedResponse, minerAddr // LabelField makes a label field for a deal proposal as a multibase encoding // of the payload CID (B58BTC for V0, B64 for V1) -// func LabelField(payloadCID cid.Cid) (string, error) { if payloadCID.Version() == 0 { return payloadCID.StringOfBase(multibase.Base58BTC) diff --git a/storagemarket/impl/clientutils/clientutils_test.go b/storagemarket/impl/clientutils/clientutils_test.go index 511a2d61..90a310de 100644 --- a/storagemarket/impl/clientutils/clientutils_test.go +++ b/storagemarket/impl/clientutils/clientutils_test.go @@ -62,10 +62,10 @@ func TestCommPSuccess(t *testing.T) { // ---------------- // commP for file 1. - root1, f1FullCAR := shared_testutil.GenFullCARv2FromNormalFile(t, file1) + root1, f1FullCAR := shared_testutil.CreateDenseCARv2(t, file1) defer os.Remove(f1FullCAR) - root2, f1FileStoreCAR := shared_testutil.GenFileStoreCARv2FromNormalFile(t, file1) + root2, f1FileStoreCAR := shared_testutil.CreateRefCARv2(t, file1) defer os.Remove(f1FileStoreCAR) // assert the two files have different contents @@ -81,10 +81,10 @@ func TestCommPSuccess(t *testing.T) { // ------------ // commP for file2. - root1, f2FullCAR := shared_testutil.GenFullCARv2FromNormalFile(t, file2) + root1, f2FullCAR := shared_testutil.CreateDenseCARv2(t, file2) defer os.Remove(f2FullCAR) - root2, f2FileStoreCAR := shared_testutil.GenFileStoreCARv2FromNormalFile(t, file2) + root2, f2FileStoreCAR := shared_testutil.CreateRefCARv2(t, file2) defer os.Remove(f2FileStoreCAR) // assert the two files have different contents @@ -147,7 +147,7 @@ func TestLabelField(t *testing.T) { func TestNoDuplicatesInCARv2(t *testing.T) { // The CARv2 file for a UnixFS DAG that has duplicates should NOT have duplicates. file1 := filepath.Join("storagemarket", "fixtures", "duplicate_blocks.txt") - _, CARv2Path := shared_testutil.GenFullCARv2FromNormalFile(t, file1) + _, CARv2Path := shared_testutil.CreateDenseCARv2(t, file1) require.NotEmpty(t, CARv2Path) defer os.Remove(CARv2Path) diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 67c1fdfc..76b18013 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -25,7 +25,6 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-statemachine/fsm" - "github.com/filecoin-project/go-fil-markets/carstore" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared" @@ -37,6 +36,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/migrations" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" ) var _ storagemarket.StorageProvider = &Provider{} @@ -69,9 +69,9 @@ type Provider struct { unsubDataTransfer datatransfer.Unsubscribe - shardReg *ShardMigrator - dagStore shared.DagStoreWrapper - readWriteBlockStores *carstore.CarReadWriteStoreTracker + shardReg *ShardMigrator + dagStore stores.DAGStoreWrapper + stores *stores.ReadWriteBlockstores } // StorageProviderOption allows custom configuration of a storage provider @@ -105,7 +105,7 @@ func CustomDealDecisionLogic(decider DealDeciderFunc) StorageProviderOption { func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, fs filestore.FileStore, - dagStore shared.DagStoreWrapper, + dagStore stores.DAGStoreWrapper, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, @@ -115,19 +115,19 @@ func NewProvider(net network.StorageMarketNetwork, options ...StorageProviderOption, ) (storagemarket.StorageProvider, error) { h := &Provider{ - net: net, - spn: spn, - fs: fs, - pieceStore: pieceStore, - conns: connmanager.NewConnManager(), - storedAsk: storedAsk, - actor: minerAddress, - dataTransfer: dataTransfer, - pubSub: pubsub.New(providerDispatcher), - readyMgr: shared.NewReadyManager(), - shardReg: shardReg, - dagStore: dagStore, - readWriteBlockStores: carstore.NewCarReadWriteStoreTracker(), + net: net, + spn: spn, + fs: fs, + pieceStore: pieceStore, + conns: connmanager.NewConnManager(), + storedAsk: storedAsk, + actor: minerAddress, + dataTransfer: dataTransfer, + pubSub: pubsub.New(providerDispatcher), + readyMgr: shared.NewReadyManager(), + shardReg: shardReg, + dagStore: dagStore, + stores: stores.NewReadWriteBlockstores(), } storageMigrations, err := migrations.ProviderMigrations.Build() if err != nil { @@ -220,7 +220,6 @@ func (p *Provider) HandleDealStream(s network.StorageDealStream) { } } -// TODO Write a one time script that registers shards for all Pieces that a miner has. func (p *Provider) receiveDeal(s network.StorageDealStream) error { proposal, err := s.ReadDealProposal() if err != nil { @@ -240,7 +239,7 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { return p.resendProposalResponse(s, &md) } - var carV2FilePath string + var path string // create an empty CARv2 file at a temp location that Graphysnc will write the incoming blocks to via a CARv2 ReadWrite blockstore wrapper. if proposal.Piece.TransferType != storagemarket.TTManual { tmp, err := p.fs.CreateTemp() @@ -251,7 +250,7 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { _ = os.Remove(string(tmp.OsPath())) return xerrors.Errorf("failed to close temp file: %w", err) } - carV2FilePath = string(tmp.OsPath()) + path = string(tmp.OsPath()) } deal := &storagemarket.MinerDeal{ @@ -263,7 +262,7 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { Ref: proposal.Piece, FastRetrieval: proposal.FastRetrieval, CreationTime: curTime(), - CARv2FilePath: carV2FilePath, + InboundCAR: path, } err = p.deals.Begin(proposalNd.Cid(), deal) @@ -595,10 +594,10 @@ func (p *Provider) start(ctx context.Context) error { err := p.migrateDeals(ctx) publishErr := p.readyMgr.FireReady(err) if publishErr != nil { - log.Warnf("Publish storage provider ready event: %s", err.Error()) + log.Warnf("publish storage provider ready event: %s", err.Error()) } if err != nil { - return fmt.Errorf("Migrating storage provider state machines: %w", err) + return fmt.Errorf("migrating storage provider state machines: %w", err) } var deals []storagemarket.MinerDeal @@ -606,11 +605,19 @@ func (p *Provider) start(ctx context.Context) error { if err != nil { return xerrors.Errorf("failed to fetch deals during startup: %w", err) } + + // re-track all deals for whom we still have a local blockstore. + for _, d := range deals { + if _, err := os.Stat(d.InboundCAR); err == nil && d.Ref != nil { + _, _ = p.stores.GetOrOpen(d.ProposalCid.String(), d.InboundCAR, d.Ref.Root) + } + } + if err := p.shardReg.registerShards(ctx, deals); err != nil { - return fmt.Errorf("Failed to restart deals: %w", err) + return fmt.Errorf("failed to restart deals: %w", err) } if err := p.restartDeals(deals); err != nil { - return fmt.Errorf("Failed to restart deals: %w", err) + return fmt.Errorf("failed to restart deals: %w", err) } return nil } diff --git a/storagemarket/impl/provider_environments.go b/storagemarket/impl/provider_environments.go index b69c14bf..82085df9 100644 --- a/storagemarket/impl/provider_environments.go +++ b/storagemarket/impl/provider_environments.go @@ -19,10 +19,10 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" - "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" ) // ------- @@ -34,21 +34,36 @@ type providerDealEnvironment struct { } func (p *providerDealEnvironment) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool) error { - return shared.RegisterShardSync(ctx, p.p.dagStore, pieceCid, carPath, eagerInit) + return stores.RegisterShardSync(ctx, p.p.dagStore, pieceCid, carPath, eagerInit) } -func (p *providerDealEnvironment) CARv2Reader(carV2FilePath string) (*carv2.Reader, error) { - return carv2.OpenReader(carV2FilePath) +func (p *providerDealEnvironment) ReadCAR(path string) (*carv2.Reader, error) { + return carv2.OpenReader(path) } -func (p *providerDealEnvironment) FinalizeReadWriteBlockstore(proposalCid cid.Cid, carPath string, rootCid cid.Cid) error { - bs, err := p.p.readWriteBlockStores.GetOrCreate(proposalCid.String(), carPath, rootCid) +func (p *providerDealEnvironment) FinalizeBlockstore(proposalCid cid.Cid) error { + bs, err := p.p.stores.Get(proposalCid.String()) if err != nil { - return xerrors.Errorf("failed to get read-write blockstore: %w", err) + return xerrors.Errorf("failed to get read/write blockstore: %w", err) } if err := bs.Finalize(); err != nil { - return xerrors.Errorf("failed to finalize read-write blockstore: %w", err) + return xerrors.Errorf("failed to finalize read/write blockstore: %w", err) + } + + return nil +} + +func (p *providerDealEnvironment) TerminateBlockstore(proposalCid cid.Cid, path string) error { + // stop tracking it. + if err := p.p.stores.Untrack(proposalCid.String()); err != nil { + log.Warnf("failed to untrack read write blockstore, proposalCid=%s, car_path=%s: %s", proposalCid, path, err) + } + + // delete the backing CARv2 file as it was a temporary file we created for + // this storage deal; the piece has now been handed off, or the deal has failed. + if err := os.Remove(path); err != nil { + log.Warnf("failed to delete carv2 file on termination, car_path=%s: %s", path, err) } return nil @@ -70,33 +85,23 @@ func (p *providerDealEnvironment) Ask() storagemarket.StorageAsk { return *sask.Ask } -func (p *providerDealEnvironment) CleanReadWriteBlockstore(proposalCid cid.Cid, carV2FilePath string) error { - // close the backing CARv2 file and stop tracking the read-write blockstore for the deal with the given proposalCid. - if err := p.p.readWriteBlockStores.CleanBlockstore(proposalCid.String()); err != nil { - log.Warnf("failed to clean read write blockstore, proposalCid=%s, carV2FilePath=%s: %s", proposalCid, carV2FilePath, err) - } - - // clean up the backing CARv2 file as it was a temporary file we created for this Storage deal and the deal dag has - // now either been sealed into a Sector or the storage deal has failed. - return os.Remove(carV2FilePath) -} - -// GeneratePieceCommitment generates the pieceCid for the CARv1 deal payload in the CARv2 file that already exists at the given path. -func (p *providerDealEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, carV2FilePath string, dealSize abi.PaddedPieceSize) (c cid.Cid, path filestore.Path, finalErr error) { - rd, err := carv2.OpenReader(carV2FilePath) +// GeneratePieceCommitment generates the pieceCid for the CARv1 deal payload in +// the CARv2 file that already exists at the given path. +func (p *providerDealEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, carPath string, dealSize abi.PaddedPieceSize) (c cid.Cid, path filestore.Path, finalErr error) { + rd, err := carv2.OpenReader(carPath) if err != nil { - return cid.Undef, "", xerrors.Errorf("failed to get CARv2 reader, proposalCid=%s, carV2FilePath=%s: %w", proposalCid, carV2FilePath, err) + return cid.Undef, "", xerrors.Errorf("failed to get CARv2 reader, proposalCid=%s, carPath=%s: %w", proposalCid, carPath, err) } defer func() { if err := rd.Close(); err != nil { - log.Errorf("failed to close CARv2 reader, carV2FilePath=%s, err=%s", carV2FilePath, err) + log.Errorf("failed to close CARv2 reader, carPath=%s, err=%s", carPath, err) if finalErr == nil { c = cid.Undef path = "" - finalErr = xerrors.Errorf("failed to close CARv2 reader, proposalCid=%s, carV2FilePath=%s, err=%s", - proposalCid, carV2FilePath, err) + finalErr = xerrors.Errorf("failed to close CARv2 reader, proposalCid=%s, carPath=%s: %w", + proposalCid, carPath, err) return } } @@ -209,7 +214,7 @@ func (psg *providerStoreGetter) Get(proposalCid cid.Cid) (bstore.Blockstore, err return nil, xerrors.Errorf("failed to get deal state: %w", err) } - return psg.p.readWriteBlockStores.GetOrCreate(proposalCid.String(), deal.CARv2FilePath, deal.Ref.Root) + return psg.p.stores.GetOrOpen(proposalCid.String(), deal.InboundCAR, deal.Ref.Root) } type providerPushDeals struct { diff --git a/storagemarket/impl/provider_environments_test.go b/storagemarket/impl/provider_environments_test.go index 19f3ad60..aac9ed90 100644 --- a/storagemarket/impl/provider_environments_test.go +++ b/storagemarket/impl/provider_environments_test.go @@ -16,9 +16,9 @@ import ( func TestGeneratePieceCommitment(t *testing.T) { // both payload.txt and payload2.txt are about 18kb long pieceSize := abi.PaddedPieceSize(32768) - _, carV2File1 := shared_testutil.GenFullCARv2FromNormalFile(t, filepath.Join("storagemarket", "fixtures", "payload.txt")) + _, carV2File1 := shared_testutil.CreateDenseCARv2(t, filepath.Join("storagemarket", "fixtures", "payload.txt")) defer os.Remove(carV2File1) - _, carV2File2 := shared_testutil.GenFullCARv2FromNormalFile(t, filepath.Join("storagemarket", "fixtures", "payload2.txt")) + _, carV2File2 := shared_testutil.CreateDenseCARv2(t, filepath.Join("storagemarket", "fixtures", "payload2.txt")) defer os.Remove(carV2File2) commP1 := genProviderCommP(t, carV2File1, pieceSize) diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 647e8626..81d1fef2 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -34,15 +34,14 @@ const DealMaxLabelSize = 256 // ProviderDealEnvironment are the dependencies needed for processing deals // with a ProviderStateEntryFunc type ProviderDealEnvironment interface { - CARv2Reader(carV2FilePath string) (*carv2.Reader, error) + ReadCAR(path string) (*carv2.Reader, error) RegisterShard(ctx context.Context, pieceCid cid.Cid, path string, eagerInit bool) error - FinalizeReadWriteBlockstore(proposalCid cid.Cid, carPath string, root cid.Cid) error + FinalizeBlockstore(proposalCid cid.Cid) error + TerminateBlockstore(proposalCid cid.Cid, path string) error - CleanReadWriteBlockstore(proposalCid cid.Cid, carFilePath string) error - - GeneratePieceCommitment(proposalCid cid.Cid, carV2FilePath string, dealSize abi.PaddedPieceSize) (cid.Cid, filestore.Path, error) + GeneratePieceCommitment(proposalCid cid.Cid, path string, dealSize abi.PaddedPieceSize) (cid.Cid, filestore.Path, error) Address() address.Address Node() storagemarket.StorageProviderNode @@ -203,11 +202,11 @@ func DecideOnProposal(ctx fsm.Context, environment ProviderDealEnvironment, deal // in the proposal func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { // finalize the blockstore as we're done writing deal data to it. - if err := environment.FinalizeReadWriteBlockstore(deal.ProposalCid, deal.CARv2FilePath, deal.Ref.Root); err != nil { - return ctx.Trigger(storagemarket.ProviderEventDataVerificationFailed, xerrors.Errorf("failed to finalize read-write blockstore: %w", err), filestore.Path(""), filestore.Path("")) + if err := environment.FinalizeBlockstore(deal.ProposalCid); err != nil { + return ctx.Trigger(storagemarket.ProviderEventDataVerificationFailed, xerrors.Errorf("failed to finalize read/write blockstore: %w", err), filestore.Path(""), filestore.Path("")) } - pieceCid, metadataPath, err := environment.GeneratePieceCommitment(deal.ProposalCid, deal.CARv2FilePath, deal.Proposal.PieceSize) + pieceCid, metadataPath, err := environment.GeneratePieceCommitment(deal.ProposalCid, deal.InboundCAR, deal.Proposal.PieceSize) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDataVerificationFailed, xerrors.Errorf("error generating CommP: %w", err), filestore.Path(""), filestore.Path("")) } @@ -318,9 +317,9 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) } } else { - carFilePath = deal.CARv2FilePath + carFilePath = deal.InboundCAR - v2r, err := environment.CARv2Reader(deal.CARv2FilePath) + v2r, err := environment.ReadCAR(deal.InboundCAR) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, xerrors.Errorf("failed to open CARv2 file, proposalCid=%s: %w", deal.ProposalCid, err)) @@ -429,9 +428,9 @@ func CleanupDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor } } - if deal.CARv2FilePath != "" { - if err := environment.CleanReadWriteBlockstore(deal.ProposalCid, deal.CARv2FilePath); err != nil { - log.Warnf("failed to cleanup blockstore, CARv2FilePath=%s: %s", deal.CARv2FilePath, err) + if deal.InboundCAR != "" { + if err := environment.TerminateBlockstore(deal.ProposalCid, deal.InboundCAR); err != nil { + log.Warnf("failed to cleanup blockstore, car_path=%s: %s", deal.InboundCAR, err) } } @@ -553,13 +552,13 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage } } - if deal.CARv2FilePath != "" { - if err := environment.FinalizeReadWriteBlockstore(deal.ProposalCid, deal.CARv2FilePath, deal.Ref.Root); err != nil { - log.Warnf("error finalizing read-write store, CARv2FilePath=%s: %s", deal.CARv2FilePath, err) + if deal.InboundCAR != "" { + if err := environment.FinalizeBlockstore(deal.ProposalCid); err != nil { + log.Warnf("error finalizing read-write store, car_path=%s: %s", deal.InboundCAR, err) } - if err := environment.CleanReadWriteBlockstore(deal.ProposalCid, deal.CARv2FilePath); err != nil { - log.Warnf("error deleting store, CARv2FilePath=%s: %s", deal.CARv2FilePath, err) + if err := environment.TerminateBlockstore(deal.ProposalCid, deal.InboundCAR); err != nil { + log.Warnf("error deleting store, car_path=%s: %s", deal.InboundCAR, err) } } diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index a4a53c10..a18cf95e 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -1547,19 +1547,19 @@ func (fe *fakeEnvironment) RegisterShard(ctx context.Context, pieceCid cid.Cid, return fe.shardActivationError } -func (fe *fakeEnvironment) CleanReadWriteBlockstore(proposalCid cid.Cid, carFilePath string) error { +func (fe *fakeEnvironment) TerminateBlockstore(proposalCid cid.Cid, carFilePath string) error { return nil } -func (fe *fakeEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, carV2FilePath string, dealSize abi.PaddedPieceSize) (cid.Cid, filestore.Path, error) { +func (fe *fakeEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, _ string, dealSize abi.PaddedPieceSize) (cid.Cid, filestore.Path, error) { return fe.pieceCid, fe.metadataPath, fe.generateCommPError } -func (fe *fakeEnvironment) FinalizeReadWriteBlockstore(proposalCid cid.Cid, carPath string, root cid.Cid) error { +func (fe *fakeEnvironment) FinalizeBlockstore(proposalCid cid.Cid) error { return fe.finalizeBlockstoreErr } -func (fe *fakeEnvironment) CARv2Reader(carV2FilePath string) (*carv2.Reader, error) { +func (fe *fakeEnvironment) ReadCAR(_ string) (*carv2.Reader, error) { return fe.carV2Reader, fe.carV2Error } diff --git a/storagemarket/impl/shardregistration.go b/storagemarket/impl/shard_migrator.go similarity index 97% rename from storagemarket/impl/shardregistration.go rename to storagemarket/impl/shard_migrator.go index c27986ae..b59c4377 100644 --- a/storagemarket/impl/shardregistration.go +++ b/storagemarket/impl/shard_migrator.go @@ -13,9 +13,9 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/go-fil-markets/piecestore" - "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" + "github.com/filecoin-project/go-fil-markets/stores" ) var shardRegMarker = ".shard-registration-complete" @@ -27,7 +27,7 @@ var shardRegMarker = ".shard-registration-complete" type ShardMigrator struct { providerAddr address.Address markerFilePath string - dagStore shared.DagStoreWrapper + dagStore stores.DAGStoreWrapper pieceStore piecestore.PieceStore spn storagemarket.StorageProviderNode @@ -36,7 +36,7 @@ type ShardMigrator struct { func NewShardMigrator( maddr address.Address, dagStorePath string, - dagStore shared.DagStoreWrapper, + dagStore stores.DAGStoreWrapper, pieceStore piecestore.PieceStore, spn storagemarket.StorageProviderNode, ) *ShardMigrator { @@ -101,7 +101,7 @@ func (r *ShardMigrator) registerShards(ctx context.Context, deals []storagemarke log.Infow("all migrated shards initialized") }() - // Filter for deals that are currently sealing. + // Filter for deals that are handed off. // If the deal has not yet been handed off to the sealing subsystem, we // don't need to call RegisterShard in this migration; RegisterShard will // be called in the new code once the deal reaches the state where it's diff --git a/storagemarket/impl/shardregistration_test.go b/storagemarket/impl/shard_migrator_test.go similarity index 97% rename from storagemarket/impl/shardregistration_test.go rename to storagemarket/impl/shard_migrator_test.go index 958f8ef7..fe5d897c 100644 --- a/storagemarket/impl/shardregistration_test.go +++ b/storagemarket/impl/shard_migrator_test.go @@ -71,7 +71,7 @@ func TestShardRegistration(t *testing.T) { Ref: &storagemarket.DataRef{ PieceCid: &pieceCidUnsealed, }, - CARv2FilePath: "", + InboundCAR: "", }, { // Should be registered with lazy registration (because sector is sealed) State: storagemarket.StorageDealSealing, @@ -79,7 +79,7 @@ func TestShardRegistration(t *testing.T) { Ref: &storagemarket.DataRef{ PieceCid: &pieceCidSealed, }, - CARv2FilePath: "", + InboundCAR: "", }, { // Should be ignored because deal is no longer active State: storagemarket.StorageDealError, @@ -87,7 +87,7 @@ func TestShardRegistration(t *testing.T) { Ref: &storagemarket.DataRef{ PieceCid: &pieceCidUnsealed2, }, - CARv2FilePath: "", + InboundCAR: "", }, { // Should be ignored because deal is not yet sealing State: storagemarket.StorageDealFundsReserved, @@ -95,7 +95,7 @@ func TestShardRegistration(t *testing.T) { Ref: &storagemarket.DataRef{ PieceCid: &pieceCidUnsealed3, }, - CARv2FilePath: "", + InboundCAR: "", }} err = shardReg.registerShards(ctx, deals) require.NoError(t, err) diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index e19f1f45..7f879f30 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -20,7 +20,6 @@ import ( dtnet "github.com/filecoin-project/go-data-transfer/network" "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -28,6 +27,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/testharness" "github.com/filecoin-project/go-fil-markets/storagemarket/testharness/dependencies" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" + "github.com/filecoin-project/go-fil-markets/stores" ) var noOpDelay = testnodes.DelayFakeCommonNode{} @@ -58,7 +58,7 @@ func TestMakeDeal(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() h := testharness.NewHarness(t, ctx, data.useStore, noOpDelay, noOpDelay, data.disableNewDeals, fileName) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) shared_testutil.StartAndWaitForReady(ctx, t, h.Provider) shared_testutil.StartAndWaitForReady(ctx, t, h.Client) @@ -191,12 +191,12 @@ func TestMakeDealOffline(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() h := testharness.NewHarness(t, ctx, true, noOpDelay, noOpDelay, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) shared_testutil.StartAndWaitForReady(ctx, t, h.Provider) shared_testutil.StartAndWaitForReady(ctx, t, h.Client) - commP, size, err := clientutils.CommP(ctx, h.FileStoreCARv2FilePath, &storagemarket.DataRef{ + commP, size, err := clientutils.CommP(ctx, h.IndexedCAR, &storagemarket.DataRef{ // hacky but need it for now because if it's manual, we wont get a CommP. TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid, @@ -233,8 +233,10 @@ func TestMakeDealOffline(t *testing.T) { assert.True(t, pd.ProposalCid.Equals(proposalCid)) shared_testutil.AssertDealState(t, storagemarket.StorageDealWaitingForData, pd.State) - // Do a Selective CARv1 traversal on the CARv2 file to get a deterministic CARv1 that we can import on the miner side. - fs, err := filestorecaradapter.NewReadOnlyFileStore(h.FileStoreCARv2FilePath) + // Do a selective CARv1 traversal on the CARv2 file to get a + // deterministic CARv1 that we can import on the miner side. + + fs, err := stores.ReadOnlyFilestore(h.IndexedCAR) require.NoError(t, err) sc := car.NewSelectiveCar(ctx, fs, []car.Dag{{Root: h.PayloadCid, Selector: shared.AllSelector()}}) prepared, err := sc.Prepare() @@ -294,7 +296,7 @@ func TestMakeDealNonBlocking(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() h := testharness.NewHarness(t, ctx, true, noOpDelay, noOpDelay, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) testCids := shared_testutil.GenerateCids(2) @@ -358,7 +360,7 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) { } deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay) h := testharness.NewHarnessWithTestData(t, td, deps, true, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) client := h.Client host1 := h.TestData.Host1 @@ -513,8 +515,8 @@ func TestRestartClient(t *testing.T) { }, "ClientEventFundingComplete": { - //Edge case : Provider begins the state machine on recieving a deal stream request - //client crashes -> restarts -> sends deal stream again -> state machine fails + // Edge case: Provider begins the state machine on receiving a deal stream request + // client crashes -> restarts -> sends deal stream again -> state machine fails // See https://github.com/filecoin-project/lotus/issues/3966 stopAtClientEvent: storagemarket.ClientEventFundingComplete, expectedClientState: storagemarket.StorageDealFundsReserved, @@ -558,7 +560,7 @@ func TestRestartClient(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 50*time.Second) defer cancel() h := testharness.NewHarness(t, ctx, true, tc.clientDelay, tc.providerDelay, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) host1 := h.TestData.Host1 host2 := h.TestData.Host2 @@ -620,7 +622,7 @@ func TestRestartClient(t *testing.T) { deps := dependencies.NewDependenciesWithTestData(t, ctx, h.TestData, h.SMState, "", noOpDelay, noOpDelay) h = testharness.NewHarnessWithTestData(t, h.TestData, deps, true, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) if len(providerState) == 0 { t.Log("no deal created on provider after stopping") @@ -704,7 +706,7 @@ func TestBounceConnectionDataTransfer(t *testing.T) { } deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay) h := testharness.NewHarnessWithTestData(t, td, deps, true, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) client := h.Client clientHost := h.TestData.Host1.ID() @@ -804,7 +806,7 @@ func TestCancelDataTransfer(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() h := testharness.NewHarness(t, ctx, true, noOpDelay, noOpDelay, false) - defer os.Remove(h.FileStoreCARv2FilePath) + defer os.Remove(h.IndexedCAR) client := h.Client provider := h.Provider host1 := h.TestData.Host1 diff --git a/storagemarket/testharness/dependencies/dependencies.go b/storagemarket/testharness/dependencies/dependencies.go index c4bc770a..f10a2833 100644 --- a/storagemarket/testharness/dependencies/dependencies.go +++ b/storagemarket/testharness/dependencies/dependencies.go @@ -28,12 +28,12 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" - "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" + "github.com/filecoin-project/go-fil-markets/stores" ) // StorageDependencies are the dependencies required to initialize a storage client/provider @@ -49,7 +49,7 @@ type StorageDependencies struct { ProviderInfo storagemarket.StorageProviderInfo TestData *shared_testutil.Libp2pTestData PieceStore piecestore.PieceStore - DagStore shared.DagStoreWrapper + DagStore stores.DAGStoreWrapper ShardReg *storageimpl.ShardMigrator DTClient datatransfer.Manager DTProvider datatransfer.Manager diff --git a/storagemarket/testharness/testharness.go b/storagemarket/testharness/testharness.go index c638b502..1c19fb5f 100644 --- a/storagemarket/testharness/testharness.go +++ b/storagemarket/testharness/testharness.go @@ -34,10 +34,10 @@ import ( type StorageHarness struct { *dependencies.StorageDependencies - PayloadCid cid.Cid - Client storagemarket.StorageClient - Provider storagemarket.StorageProvider - FileStoreCARv2FilePath string + PayloadCid cid.Cid + Client storagemarket.StorageClient + Provider storagemarket.StorageProvider + IndexedCAR string // path } func NewHarness(t *testing.T, ctx context.Context, useStore bool, cd testnodes.DelayFakeCommonNode, pd testnodes.DelayFakeCommonNode, @@ -61,12 +61,12 @@ func NewHarnessWithTestData(t *testing.T, td *shared_testutil.Libp2pTestData, de fpath := filepath.Join("storagemarket", "fixtures", file) var rootLink ipld.Link - var carV2FilePath string + var path string // TODO Both functions here should return the root cid of the UnixFSDag and the carv2 file path. if useStore { - rootLink, carV2FilePath = td.LoadUnixFSFileToStore(t, fpath) + rootLink, path = td.LoadUnixFSFileToStore(t, fpath) } else { - rootLink, carV2FilePath = td.LoadUnixFSFile(t, fpath, false) + rootLink, path = td.LoadUnixFSFile(t, fpath, false) } payloadCid := rootLink.(cidlink.Link).Cid @@ -112,11 +112,11 @@ func NewHarnessWithTestData(t *testing.T, td *shared_testutil.Libp2pTestData, de assert.NoError(t, err) return &StorageHarness{ - StorageDependencies: deps, - PayloadCid: payloadCid, - Client: client, - Provider: provider, - FileStoreCARv2FilePath: carV2FilePath, + StorageDependencies: deps, + PayloadCid: payloadCid, + Client: client, + Provider: provider, + IndexedCAR: path, } } @@ -148,19 +148,19 @@ func (h *StorageHarness) ProposeStorageDeal(t *testing.T, dataRef *storagemarket var dealDuration = abi.ChainEpoch(180 * builtin.EpochsInDay) result, err := h.Client.ProposeStorageDeal(h.Ctx, storagemarket.ProposeStorageDealParams{ - Addr: h.ClientAddr, - Info: &h.ProviderInfo, - Data: dataRef, - StartEpoch: h.Epoch + 100, - EndEpoch: h.Epoch + 100 + dealDuration, - Price: big.NewInt(1), - Collateral: big.NewInt(0), - Rt: abi.RegisteredSealProof_StackedDrg2KiBV1, - FastRetrieval: fastRetrieval, - VerifiedDeal: verifiedDeal, - FilestoreCARv2FilePath: h.FileStoreCARv2FilePath, + Addr: h.ClientAddr, + Info: &h.ProviderInfo, + Data: dataRef, + StartEpoch: h.Epoch + 100, + EndEpoch: h.Epoch + 100 + dealDuration, + Price: big.NewInt(1), + Collateral: big.NewInt(0), + Rt: abi.RegisteredSealProof_StackedDrg2KiBV1, + FastRetrieval: fastRetrieval, + VerifiedDeal: verifiedDeal, + IndexedCAR: h.IndexedCAR, }) - assert.NoError(t, err) + require.NoError(t, err) return result } diff --git a/storagemarket/types.go b/storagemarket/types.go index 16a37218..6a9776fd 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -111,7 +111,7 @@ type MinerDeal struct { TransferChannelId *datatransfer.ChannelID SectorNumber abi.SectorNumber - CARv2FilePath string + InboundCAR string } // NewDealStages creates a new DealStages object ready to be used. @@ -248,8 +248,9 @@ type ClientDeal struct { TransferChannelID *datatransfer.ChannelID SectorNumber abi.SectorNumber - // path of the CARv2 file containing the entire UnixFSDAG OR a CARv2 file that can be used as backing store for a FileStore. - FilestoreCARv2FilePath string + // IndexedCAR is the path to an indexed CARv2 path. This needs to be opened + // as a filestore, as it most likely contains file-positional references. + IndexedCAR string } // StorageProviderInfo describes on chain information about a StorageProvider @@ -282,7 +283,7 @@ type ProposeStorageDealParams struct { VerifiedDeal bool // path of the CARv2 file containing the entire UnixFSDAG OR a CARv2 file that can be used as backing store for a FileStore. - FilestoreCARv2FilePath string + IndexedCAR string } const ( diff --git a/storagemarket/types_cbor_gen.go b/storagemarket/types_cbor_gen.go index e6dbc176..54f5913b 100644 --- a/storagemarket/types_cbor_gen.go +++ b/storagemarket/types_cbor_gen.go @@ -371,26 +371,26 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { return err } - // t.FilestoreCARv2FilePath (string) (string) - if len("FilestoreCARv2FilePath") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"FilestoreCARv2FilePath\" was too long") + // t.IndexedCAR (string) (string) + if len("IndexedCAR") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"IndexedCAR\" was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("FilestoreCARv2FilePath"))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("IndexedCAR"))); err != nil { return err } - if _, err := io.WriteString(w, string("FilestoreCARv2FilePath")); err != nil { + if _, err := io.WriteString(w, string("IndexedCAR")); err != nil { return err } - if len(t.FilestoreCARv2FilePath) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.FilestoreCARv2FilePath was too long") + if len(t.IndexedCAR) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.IndexedCAR was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.FilestoreCARv2FilePath))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.IndexedCAR))); err != nil { return err } - if _, err := io.WriteString(w, string(t.FilestoreCARv2FilePath)); err != nil { + if _, err := io.WriteString(w, string(t.IndexedCAR)); err != nil { return err } return nil @@ -729,8 +729,8 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { t.SectorNumber = abi.SectorNumber(extra) } - // t.FilestoreCARv2FilePath (string) (string) - case "FilestoreCARv2FilePath": + // t.IndexedCAR (string) (string) + case "IndexedCAR": { sval, err := cbg.ReadStringBuf(br, scratch) @@ -738,7 +738,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { return err } - t.FilestoreCARv2FilePath = string(sval) + t.IndexedCAR = string(sval) } default: @@ -1117,26 +1117,26 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { return err } - // t.CARv2FilePath (string) (string) - if len("CARv2FilePath") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"CARv2FilePath\" was too long") + // t.InboundCAR (string) (string) + if len("InboundCAR") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"InboundCAR\" was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("CARv2FilePath"))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("InboundCAR"))); err != nil { return err } - if _, err := io.WriteString(w, string("CARv2FilePath")); err != nil { + if _, err := io.WriteString(w, string("InboundCAR")); err != nil { return err } - if len(t.CARv2FilePath) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.CARv2FilePath was too long") + if len(t.InboundCAR) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.InboundCAR was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.CARv2FilePath))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.InboundCAR))); err != nil { return err } - if _, err := io.WriteString(w, string(t.CARv2FilePath)); err != nil { + if _, err := io.WriteString(w, string(t.InboundCAR)); err != nil { return err } return nil @@ -1466,8 +1466,8 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { t.SectorNumber = abi.SectorNumber(extra) } - // t.CARv2FilePath (string) (string) - case "CARv2FilePath": + // t.InboundCAR (string) (string) + case "InboundCAR": { sval, err := cbg.ReadStringBuf(br, scratch) @@ -1475,7 +1475,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { return err } - t.CARv2FilePath = string(sval) + t.InboundCAR = string(sval) } default: diff --git a/shared/dagstorewrapper.go b/stores/dagstore.go similarity index 67% rename from shared/dagstorewrapper.go rename to stores/dagstore.go index 2cb94e87..ec30bf9d 100644 --- a/shared/dagstorewrapper.go +++ b/stores/dagstore.go @@ -1,23 +1,33 @@ -package shared +package stores import ( "context" + "io" "github.com/ipfs/go-cid" + bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/filecoin-project/dagstore" - - "github.com/filecoin-project/go-fil-markets/carstore" ) -// DagStoreWrapper hides the details of the DAG store implementation from -// the other parts of go-fil-markets -type DagStoreWrapper interface { +type ClosableBlockstore interface { + bstore.Blockstore + io.Closer +} + +// DAGStoreWrapper hides the details of the DAG store implementation from +// the other parts of go-fil-markets. +type DAGStoreWrapper interface { // RegisterShard loads a CAR file into the DAG store and builds an // index for it, sending the result on the supplied channel on completion RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error - // LoadShard fetches the data for a shard and provides a blockstore interface to it - LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) + + // LoadShard fetches the data for a shard and provides a blockstore + // interface to it. + // + // The blockstore must be closed to release the shard. + LoadShard(ctx context.Context, pieceCid cid.Cid) (ClosableBlockstore, error) + // Close closes the dag store wrapper. Close() error } @@ -25,7 +35,7 @@ type DagStoreWrapper interface { // RegisterShardSync calls the DAGStore RegisterShard method and waits // synchronously in a dedicated channel until the registration has completed // fully. -func RegisterShardSync(ctx context.Context, ds DagStoreWrapper, pieceCid cid.Cid, carPath string, eagerInit bool) error { +func RegisterShardSync(ctx context.Context, ds DAGStoreWrapper, pieceCid cid.Cid, carPath string, eagerInit bool) error { resch := make(chan dagstore.ShardResult, 1) if err := ds.RegisterShard(ctx, pieceCid, carPath, eagerInit, resch); err != nil { return err diff --git a/carstore/error.go b/stores/error.go similarity index 89% rename from carstore/error.go rename to stores/error.go index 5eeaec33..cc9a4767 100644 --- a/carstore/error.go +++ b/stores/error.go @@ -1,4 +1,4 @@ -package carstore +package stores import "golang.org/x/xerrors" diff --git a/stores/filestore.go b/stores/filestore.go new file mode 100644 index 00000000..213bb7cf --- /dev/null +++ b/stores/filestore.go @@ -0,0 +1,161 @@ +package stores + +import ( + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + "github.com/ipfs/go-filestore" + bstore "github.com/ipfs/go-ipfs-blockstore" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" + mh "github.com/multiformats/go-multihash" + "golang.org/x/xerrors" +) + +// ReadOnlyFilestore opens the CAR in the specified path as as a read-only +// blockstore, and fronts it with a Filestore whose positional mappings are +// stored inside the CAR itself. It must be closed after done. +func ReadOnlyFilestore(path string) (ClosableBlockstore, error) { + ro, err := blockstore.OpenReadOnly(path, + carv2.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true), + ) + + if err != nil { + return nil, err + } + + bs, err := FilestoreOf(ro) + if err != nil { + return nil, err + } + + return &closableBlockstore{Blockstore: bs, closeFn: ro.Close}, nil +} + +// ReadWriteFilestore opens the CAR in the specified path as as a read-write +// blockstore, and fronts it with a Filestore whose positional mappings are +// stored inside the CAR itself. It must be closed after done. Closing will +// finalize the CAR blockstore. +func ReadWriteFilestore(path string, roots ...cid.Cid) (ClosableBlockstore, error) { + rw, err := blockstore.OpenReadWrite(path, roots, + carv2.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true), + ) + if err != nil { + return nil, err + } + + bs, err := FilestoreOf(rw) + if err != nil { + return nil, err + } + + return &closableBlockstore{Blockstore: bs, closeFn: rw.Finalize}, nil +} + +// FilestoreOf returns a FileManager/Filestore backed entirely by a +// blockstore without requiring a datastore. It achieves this by coercing the +// blockstore into a datastore. The resulting blockstore is suitable for usage +// with DagBuilderHelper with DagBuilderParams#NoCopy=true. +func FilestoreOf(bs bstore.Blockstore) (bstore.Blockstore, error) { + coercer := &dsCoercer{bs} + + // the FileManager stores positional infos (positional mappings) in a + // datastore, which in our case is the blockstore coerced into a datastore. + // + // Passing the root dir as a base path makes me uneasy, but these filestores + // are only used locally. + fm := filestore.NewFileManager(coercer, "/") + fm.AllowFiles = true + + // the Filestore sifts leaves (PosInfos) from intermediate nodes. It writes + // PosInfo leaves to the datastore (which in our case is the coerced + // blockstore), and the intermediate nodes to the blockstore proper (since + // they cannot be mapped to the file. + fstore := filestore.NewFilestore(bs, fm) + bs = bstore.NewIdStore(fstore) + + return bs, nil +} + +var cidBuilder = cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + +// dsCoercer coerces a Blockstore to present a datastore interface, apt for +// usage with the Filestore/FileManager. Only PosInfos will be written through +// this path. +type dsCoercer struct { + bstore.Blockstore +} + +var _ datastore.Batching = (*dsCoercer)(nil) + +func (crcr *dsCoercer) Get(key datastore.Key) (value []byte, err error) { + c, err := cidBuilder.Sum(key.Bytes()) + if err != nil { + return nil, xerrors.Errorf("failed to create cid: %w", err) + } + + blk, err := crcr.Blockstore.Get(c) + if err != nil { + return nil, xerrors.Errorf("failed to get cid %s: %w", c, err) + } + return blk.RawData(), nil +} + +func (crcr *dsCoercer) Put(key datastore.Key, value []byte) error { + c, err := cidBuilder.Sum(key.Bytes()) + if err != nil { + return xerrors.Errorf("failed to create cid: %w", err) + } + blk, err := blocks.NewBlockWithCid(value, c) + if err != nil { + return xerrors.Errorf("failed to create block: %w", err) + } + if err := crcr.Blockstore.Put(blk); err != nil { + return xerrors.Errorf("failed to put block: %w", err) + } + return nil +} + +func (crcr *dsCoercer) Has(key datastore.Key) (exists bool, err error) { + c, err := cidBuilder.Sum(key.Bytes()) + if err != nil { + return false, xerrors.Errorf("failed to create cid: %w", err) + } + return crcr.Blockstore.Has(c) +} + +func (crcr *dsCoercer) Batch() (datastore.Batch, error) { + return datastore.NewBasicBatch(crcr), nil +} + +func (crcr *dsCoercer) GetSize(_ datastore.Key) (size int, err error) { + return 0, xerrors.New("operation NOT supported: GetSize") +} + +func (crcr *dsCoercer) Query(_ query.Query) (query.Results, error) { + return nil, xerrors.New("operation NOT supported: Query") +} + +func (crcr *dsCoercer) Delete(_ datastore.Key) error { + return xerrors.New("operation NOT supported: Delete") +} + +func (crcr *dsCoercer) Sync(_ datastore.Key) error { + return xerrors.New("operation NOT supported: Sync") +} + +func (crcr *dsCoercer) Close() error { + return nil +} + +type closableBlockstore struct { + bstore.Blockstore + closeFn func() error +} + +func (c *closableBlockstore) Close() error { + return c.closeFn() +} diff --git a/filestorecaradapter/adapter_test.go b/stores/filestore_test.go similarity index 88% rename from filestorecaradapter/adapter_test.go rename to stores/filestore_test.go index 64290e65..76dcebf2 100644 --- a/filestorecaradapter/adapter_test.go +++ b/stores/filestore_test.go @@ -1,4 +1,4 @@ -package filestorecaradapter +package stores import ( "context" @@ -10,7 +10,7 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - cidutil "github.com/ipfs/go-cidutil" + "github.com/ipfs/go-cidutil" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" bstore "github.com/ipfs/go-ipfs-blockstore" @@ -22,13 +22,14 @@ import ( unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" - "github.com/ipld/go-car/v2/blockstore" mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) -const unixfsChunkSize uint64 = 1 << 10 -const unixfsLinksPerLevel = 1024 +const ( + unixfsChunkSize uint64 = 1 << 10 + unixfsLinksPerLevel = 1024 +) var defaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) @@ -44,25 +45,28 @@ func TestReadOnlyFilstoreWithPosInfoCARFile(t *testing.T) { tmpCARv2, err := os.CreateTemp(t.TempDir(), "rand") require.NoError(t, err) require.NoError(t, tmpCARv2.Close()) - fs, err := NewReadWriteFileStore(tmpCARv2.Name(), []cid.Cid{root}) + + fs, err := ReadWriteFilestore(tmpCARv2.Name(), root) require.NoError(t, err) + dagSvc := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) root2 := writeUnixfsDAGTo(t, ctx, normalFilePath, dagSvc) require.NoError(t, fs.Close()) require.Equal(t, root, root2) // it works if we use a Filestore backed by the given CAR file - rofs, err := NewReadOnlyFileStore(tmpCARv2.Name()) + fs, err = ReadOnlyFilestore(tmpCARv2.Name()) require.NoError(t, err) - fbz, err := dagToNormalFile(t, ctx, root, rofs) + + fbz, err := dagToNormalFile(t, ctx, root, fs) require.NoError(t, err) - require.NoError(t, rofs.Close()) + require.NoError(t, fs.Close()) // assert contents are equal require.EqualValues(t, origBytes, fbz) } -func TestReadOnlyFilestoreWithFullCARFile(t *testing.T) { +func TestReadOnlyFilestoreWithDenseCARFile(t *testing.T) { ctx := context.Background() normalFilePath, origContent := createFile(t, 10, 10485760) @@ -74,15 +78,17 @@ func TestReadOnlyFilestoreWithFullCARFile(t *testing.T) { tmpCARv2, err := os.CreateTemp(t.TempDir(), "rand") require.NoError(t, err) require.NoError(t, tmpCARv2.Close()) - rw, err := blockstore.OpenReadWrite(tmpCARv2.Name(), []cid.Cid{root}, blockstore.UseWholeCIDs(true)) + + fs, err := ReadWriteFilestore(tmpCARv2.Name(), root) require.NoError(t, err) - dagSvc := merkledag.NewDAGService(blockservice.New(rw, offline.Exchange(rw))) + + dagSvc := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) root2 := writeUnixfsDAGTo(t, ctx, normalFilePath, dagSvc) - require.NoError(t, rw.Finalize()) + require.NoError(t, fs.Close()) require.Equal(t, root, root2) // Open a read only filestore with the full CARv2 file - fs, err := NewReadOnlyFileStore(tmpCARv2.Name()) + fs, err = ReadOnlyFilestore(tmpCARv2.Name()) require.NoError(t, err) // write out the normal file using the Filestore and assert the contents match. diff --git a/stores/ro_bstores.go b/stores/ro_bstores.go new file mode 100644 index 00000000..9cf2413f --- /dev/null +++ b/stores/ro_bstores.go @@ -0,0 +1,60 @@ +package stores + +import ( + "io" + "sync" + + bstore "github.com/ipfs/go-ipfs-blockstore" + "golang.org/x/xerrors" +) + +// ReadOnlyBlockstores tracks open read blockstores. +type ReadOnlyBlockstores struct { + mu sync.RWMutex + stores map[string]bstore.Blockstore +} + +func NewReadOnlyBlockstores() *ReadOnlyBlockstores { + return &ReadOnlyBlockstores{ + stores: make(map[string]bstore.Blockstore), + } +} + +func (r *ReadOnlyBlockstores) Track(key string, bs bstore.Blockstore) (bool, error) { + r.mu.Lock() + defer r.mu.Unlock() + + if _, ok := r.stores[key]; ok { + return false, nil + } + + r.stores[key] = bs + return true, nil +} + +func (r *ReadOnlyBlockstores) Get(key string) (bstore.Blockstore, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + if bs, ok := r.stores[key]; ok { + return bs, nil + } + + return nil, xerrors.Errorf("could not get blockstore for key %s: %w", key, ErrNotFound) +} + +func (r *ReadOnlyBlockstores) Untrack(key string) error { + r.mu.Lock() + defer r.mu.Unlock() + + if bs, ok := r.stores[key]; ok { + delete(r.stores, key) + if closer, ok := bs.(io.Closer); ok { + if err := closer.Close(); err != nil { + return xerrors.Errorf("failed to close read-only blockstore: %w", err) + } + } + } + + return nil +} diff --git a/carstore/read_only_blockstore_test.go b/stores/ro_bstores_test.go similarity index 72% rename from carstore/read_only_blockstore_test.go rename to stores/ro_bstores_test.go index b15badc0..687a1775 100644 --- a/carstore/read_only_blockstore_test.go +++ b/stores/ro_bstores_test.go @@ -1,4 +1,4 @@ -package carstore_test +package stores_test import ( "context" @@ -11,8 +11,8 @@ import ( "github.com/filecoin-project/dagstore" - "github.com/filecoin-project/go-fil-markets/carstore" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/stores" ) func TestReadOnlyStoreTracker(t *testing.T) { @@ -20,24 +20,31 @@ func TestReadOnlyStoreTracker(t *testing.T) { // Create a CARv2 file from a fixture testData := tut.NewLibp2pTestData(ctx, t) + fpath1 := filepath.Join("retrievalmarket", "impl", "fixtures", "lorem.txt") _, carFilePath := testData.LoadUnixFSFileToStore(t, fpath1) + fpath2 := filepath.Join("retrievalmarket", "impl", "fixtures", "lorem_under_1_block.txt") _, carFilePath2 := testData.LoadUnixFSFileToStore(t, fpath2) + rdOnlyBS1, err := blockstore.OpenReadOnly(carFilePath, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) require.NoError(t, err) + + rdOnlyBS2, err := blockstore.OpenReadOnly(carFilePath2, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) + require.NoError(t, err) + len1 := getBstoreLen(ctx, t, rdOnlyBS1) k1 := "k1" k2 := "k2" - tracker := carstore.NewReadOnlyStoreTracker() + tracker := stores.NewReadOnlyBlockstores() // Get a non-existent key _, err = tracker.Get(k1) - require.True(t, carstore.IsNotFound(err)) + require.True(t, stores.IsNotFound(err)) // Add a read-only blockstore - ok, err := tracker.Add(k1, rdOnlyBS1) + ok, err := tracker.Track(k1, rdOnlyBS1) require.NoError(t, err) require.True(t, ok) @@ -49,29 +56,22 @@ func TestReadOnlyStoreTracker(t *testing.T) { lenGot := getBstoreLen(ctx, t, got) require.Equal(t, len1, lenGot) - // Call GetOrCreate using the same key - got2, err := tracker.GetOrCreate(k1, carFilePath) - require.NoError(t, err) - - // Verify the blockstore is the same - lenGot2 := getBstoreLen(ctx, t, got2) - require.Equal(t, len1, lenGot2) - - // Call GetOrCreate with a different CAR file - rdOnlyBS2, err := tracker.GetOrCreate(k2, carFilePath2) + // Call GetOrOpen with a different CAR file + ok, err = tracker.Track(k2, rdOnlyBS2) require.NoError(t, err) + require.True(t, ok) // Verify the blockstore is different len2 := getBstoreLen(ctx, t, rdOnlyBS2) require.NotEqual(t, len1, len2) - // Clean the second blockstore from the tracker - err = tracker.CleanBlockstore(k2) + // Untrack the second blockstore from the tracker + err = tracker.Untrack(k2) require.NoError(t, err) // Verify it's been removed _, err = tracker.Get(k2) - require.True(t, carstore.IsNotFound(err)) + require.True(t, stores.IsNotFound(err)) } func getBstoreLen(ctx context.Context, t *testing.T, bs dagstore.ReadBlockstore) int { diff --git a/stores/rw_bstores.go b/stores/rw_bstores.go new file mode 100644 index 00000000..ca4130ff --- /dev/null +++ b/stores/rw_bstores.go @@ -0,0 +1,62 @@ +package stores + +import ( + "sync" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2/blockstore" + "golang.org/x/xerrors" +) + +// ReadWriteBlockstores tracks open ReadWrite CAR blockstores. +type ReadWriteBlockstores struct { + mu sync.RWMutex + stores map[string]*blockstore.ReadWrite +} + +func NewReadWriteBlockstores() *ReadWriteBlockstores { + return &ReadWriteBlockstores{ + stores: make(map[string]*blockstore.ReadWrite), + } +} + +func (r *ReadWriteBlockstores) Get(key string) (*blockstore.ReadWrite, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + if bs, ok := r.stores[key]; ok { + return bs, nil + } + return nil, xerrors.Errorf("could not get blockstore for key %s: %w", key, ErrNotFound) +} + +func (r *ReadWriteBlockstores) GetOrOpen(key string, path string, rootCid cid.Cid) (*blockstore.ReadWrite, error) { + r.mu.Lock() + defer r.mu.Unlock() + + if bs, ok := r.stores[key]; ok { + return bs, nil + } + + bs, err := blockstore.OpenReadWrite(path, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true)) + if err != nil { + return nil, xerrors.Errorf("failed to create read-write blockstore: %w", err) + } + r.stores[key] = bs + return bs, nil +} + +func (r *ReadWriteBlockstores) Untrack(key string) error { + r.mu.Lock() + defer r.mu.Unlock() + + if bs, ok := r.stores[key]; ok { + // If the blockstore has already been finalized, calling Finalize again + // will return an error. For our purposes it's simplest if Finalize is + // idempotent so we just ignore any error. + _ = bs.Finalize() + } + + delete(r.stores, key) + return nil +} diff --git a/carstore/read_write_blockstore_test.go b/stores/rw_bstores_test.go similarity index 71% rename from carstore/read_write_blockstore_test.go rename to stores/rw_bstores_test.go index 0387bb20..b5bfe37d 100644 --- a/carstore/read_write_blockstore_test.go +++ b/stores/rw_bstores_test.go @@ -1,4 +1,4 @@ -package carstore_test +package stores_test import ( "context" @@ -8,8 +8,8 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/stretchr/testify/require" - "github.com/filecoin-project/go-fil-markets/carstore" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/stores" ) func TestReadWriteStoreTracker(t *testing.T) { @@ -28,14 +28,14 @@ func TestReadWriteStoreTracker(t *testing.T) { k1 := "k1" k2 := "k2" - tracker := carstore.NewCarReadWriteStoreTracker() + tracker := stores.NewReadWriteBlockstores() // Get a non-existent key _, err := tracker.Get(k1) - require.True(t, carstore.IsNotFound(err)) + require.True(t, stores.IsNotFound(err)) - // Create a blockstore by calling GetOrCreate - rdOnlyBS1, err := tracker.GetOrCreate(k1, carFilePath1, rootCidLnk1.Cid) + // Create a blockstore by calling GetOrOpen + rdOnlyBS1, err := tracker.GetOrOpen(k1, carFilePath1, rootCidLnk1.Cid) require.NoError(t, err) // Get the blockstore using its key @@ -47,19 +47,19 @@ func TestReadWriteStoreTracker(t *testing.T) { lenGot := getBstoreLen(ctx, t, got) require.Equal(t, len1, lenGot) - // Call GetOrCreate with a different CAR file - rdOnlyBS2, err := tracker.GetOrCreate(k2, carFilePath2, rootCidLnk2.Cid) + // Call GetOrOpen with a different CAR file + rdOnlyBS2, err := tracker.GetOrOpen(k2, carFilePath2, rootCidLnk2.Cid) require.NoError(t, err) // Verify the blockstore is different len2 := getBstoreLen(ctx, t, rdOnlyBS2) require.NotEqual(t, len1, len2) - // Clean the second blockstore from the tracker - err = tracker.CleanBlockstore(k2) + // Untrack the second blockstore from the tracker + err = tracker.Untrack(k2) require.NoError(t, err) // Verify it's been removed _, err = tracker.Get(k2) - require.True(t, carstore.IsNotFound(err)) + require.True(t, stores.IsNotFound(err)) }