Skip to content

Commit

Permalink
refactor: code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jul 6, 2021
1 parent b47f9e6 commit f8eabc1
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 28 deletions.
5 changes: 1 addition & 4 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,6 @@ func (c *Client) checkForActiveDeal(payloadCID cid.Cid, pid peer.ID) error {
return nil
}

func (c *Client) GetBlockstore(dealID retrievalmarket.DealID) (bstore.Blockstore, error) {
return c.readWriteBlockstores.Get(dealID.String())
}

func (c *Client) notifySubscribers(eventName fsm.EventName, state fsm.StateType) {
evt := eventName.(retrievalmarket.ClientEvent)
ds := state.(retrievalmarket.ClientDealState)
Expand Down Expand Up @@ -463,6 +459,7 @@ func (c *clientDealEnvironment) CloseDataTransfer(ctx context.Context, channelID
return err
}

// FinalizeBlockstore is called when all blocks have been received
func (c *clientDealEnvironment) FinalizeBlockstore(ctx context.Context, dealID retrievalmarket.DealID) error {
bs, err := c.c.readWriteBlockstores.Get(dealID.String())
if err != nil {
Expand Down
23 changes: 19 additions & 4 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
zeroPricePerByte bool
paramsV1, addFunds bool
skipStores bool
failsUnseal bool
paymentInterval uint64
paymentIntervalIncrease uint64
channelAvailableFunds retrievalmarket.ChannelAvailableFunds
Expand Down Expand Up @@ -313,6 +314,13 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(19920000)},
skipStores: true,
},
{
name: "failed unseal",
filename: "lorem.txt",
filesize: 19000,
voucherAmts: []abi.TokenAmount{},
failsUnseal: true,
},
{name: "multi-block file retrieval succeeds, final block exceeds payment interval",
filename: "lorem.txt",
filesize: 19000,
Expand Down Expand Up @@ -410,7 +418,12 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
}
providerNode := testnodes.NewTestRetrievalProviderNode()
providerNode.ExpectPricingParams(pieceInfo.PieceCID, []abi.DealID{100})
providerNode.ExpectUnseal(sectorID, offset.Unpadded(), abi.UnpaddedPieceSize(len(carData)), carData)

if testCase.failsUnseal {
providerNode.ExpectFailedUnseal(sectorID, offset.Unpadded(), abi.UnpaddedPieceSize(len(carData)))
} else {
providerNode.ExpectUnseal(sectorID, offset.Unpadded(), abi.UnpaddedPieceSize(len(carData)), carData)
}

decider := rmtesting.TrivialTestDecider
if testCase.decider != nil {
Expand Down Expand Up @@ -524,7 +537,7 @@ CurrentInterval: %d
t.FailNow()
case clientDealState = <-clientDealStateChan:
}
if testCase.cancelled {
if testCase.failsUnseal || testCase.cancelled {
assert.Equal(t, retrievalmarket.DealStatusCancelled, clientDealState.Status)
} else {
if !testCase.zeroPricePerByte {
Expand All @@ -549,7 +562,9 @@ CurrentInterval: %d
case providerDealState = <-providerDealStateChan:
}

if testCase.cancelled {
if testCase.failsUnseal {
tut.AssertRetrievalDealState(t, retrievalmarket.DealStatusErrored, providerDealState.Status)
} else if testCase.cancelled {
tut.AssertRetrievalDealState(t, retrievalmarket.DealStatusCancelled, providerDealState.Status)
} else {
tut.AssertRetrievalDealState(t, retrievalmarket.DealStatusCompleted, providerDealState.Status)
Expand All @@ -563,7 +578,7 @@ CurrentInterval: %d
// verify that the nodes we interacted with as expected
clientNode.VerifyExpectations(t)
providerNode.VerifyExpectations(t)
if !testCase.cancelled {
if !testCase.failsUnseal && !testCase.cancelled {
testData.VerifyFileTransferredIntoStore(t, pieceLink, rresp.CarFilePath, testCase.filesize)
}
})
Expand Down
14 changes: 14 additions & 0 deletions retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func (pde *providerDealEnvironment) Node() retrievalmarket.RetrievalProviderNode
return pde.p.node
}

// PrepareBlockstore is called when the deal data has been unsealed and we need
// to add all blocks to a blockstore that is used to serve retrieval
func (pde *providerDealEnvironment) PrepareBlockstore(ctx context.Context, dealID retrievalmarket.DealID, pieceCid cid.Cid) error {
// Load the blockstore that has the deal data
key := shard.Key(pieceCid.String())
bs, err := pde.p.dagStore.LoadShard(ctx, key, pde.p.mountApi)
if err != nil {
Expand Down Expand Up @@ -302,6 +305,17 @@ func (psg *providerStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.De
if err != nil {
return nil, xerrors.Errorf("failed to get deal state: %w", err)
}

//
// When a request for data is received
// 1. The data transfer layer calls Get to get the blockstore
// 2. The data for the deal is unsealed
// 3. The unsealed data is put into the blockstore
// 4. The data is served from the blockstore (using blockstore.Get)
//
// So we use a "lazy" blockstore that can be returned in step 1
// but is only accessed in step 4 after the data has been unsealed.
//
return newLazyBlockstore(func() (bstore.Blockstore, error) {
return psg.p.readOnlyBlockStores.Get(dealID.String())
}), nil
Expand Down
19 changes: 3 additions & 16 deletions shared_testutil/mocknet.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,7 @@ func (ltd *Libp2pTestData) LoadUnixFSFile(t *testing.T, fixturesPath string, use
return ltd.loadUnixFSFile(t, fixturesPath, dagService)
}

// LoadUnixFSFileToStore injects the fixture `filename` from the
// fixtures directory, creating a new multistore in the process. If useSecondNode is true,
// fixture is injected to the second node. Otherwise the first node gets it
// LoadUnixFSFileToStore creates a CAR file from the fixture at `fixturesPath`
func (ltd *Libp2pTestData) LoadUnixFSFileToStore(t *testing.T, fixturesPath string) (ipld.Link, string) {
dstore := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(dstore)
Expand Down Expand Up @@ -293,20 +291,9 @@ func (ltd *Libp2pTestData) VerifyFileTransferred(t *testing.T, link ipld.Link, u
ltd.verifyFileTransferred(t, link, dagService, readLen)
}

// VerifyFileTransferredIntoStore checks that the fixture file was sent from one node to the other, into the store specified by
// storeID
// VerifyFileTransferredIntoStore checks that the fixture file was sent from
// one node to the other, and stored in the given CAR file
func (ltd *Libp2pTestData) VerifyFileTransferredIntoStore(t *testing.T, link ipld.Link, carFilePath string, readLen uint64) {
//var dagService ipldformat.DAGService
//if useSecondNode {
// store, err := ltd.MultiStore2.Get(bstore)
// require.NoError(t, err)
// dagService = store.DAG
//} else {
// store, err := ltd.MultiStore1.Get(bstore)
// require.NoError(t, err)
// dagService = store.DAG
//}

bstore, err := blockstore.OpenReadOnly(carFilePath)
require.NoError(t, err)
bsvc := blockservice.New(bstore, offline.Exchange(bstore))
Expand Down
3 changes: 1 addition & 2 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ type Provider struct {
unsubDataTransfer datatransfer.Unsubscribe

// TODO Uncomment this when DAGStore compiles -> Lotus will inject these deps here.
dagStore dagstore.DagStore
//mountApi marketdagstore.LotusMountAPI

dagStore dagstore.DagStore
readWriteBlockStores *carstore.CarReadWriteStoreTracker
}

Expand Down
2 changes: 0 additions & 2 deletions storagemarket/testharness/testharness.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
"github.com/filecoin-project/go-data-transfer/testutil"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
Expand All @@ -38,7 +37,6 @@ type StorageHarness struct {
PayloadCid cid.Cid
Client storagemarket.StorageClient
Provider storagemarket.StorageProvider
StoreID *multistore.StoreID
CARv2FilePath string
}

Expand Down

0 comments on commit f8eabc1

Please sign in to comment.