diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index 6e4833afa..a803da70b 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -80,7 +80,12 @@ func TestStorageRetrieval(t *testing.T) { case storageProviderSeenDeal = <-providerDealChan: case storageClientSeenDeal = <-clientDealChan: case <-ctxTimeout.Done(): - t.Fatalf("never saw completed: %d, %d", storageClientSeenDeal.State, storageProviderSeenDeal.State) + t.Fatalf("never saw completed deal, client deal state: %s (%d), provider deal state: %s (%d)", + storagemarket.DealStates[storageClientSeenDeal.State], + storageClientSeenDeal.State, + storagemarket.DealStates[storageProviderSeenDeal.State], + storageProviderSeenDeal.State, + ) } } @@ -219,7 +224,9 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness { &clientNode, ) require.NoError(t, err) + dt2 := graphsyncimpl.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2, td.DTStoredCounter2) + require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &fakeDTValidator{})) provider, err := stormkt.NewProvider( stornet.NewFromLibp2pHost(td.Host2), td.Ds2, diff --git a/shared_testutil/test_types.go b/shared_testutil/test_types.go index fb10335ee..d1297c7bd 100644 --- a/shared_testutil/test_types.go +++ b/shared_testutil/test_types.go @@ -141,14 +141,20 @@ func MakeTestClientDealProposal() *market.ClientDealProposal { } // MakeTestDataRef returns a storage market data ref -func MakeTestDataRef() *storagemarket.DataRef { - return &storagemarket.DataRef{ +func MakeTestDataRef(manualXfer bool) *storagemarket.DataRef { + out := &storagemarket.DataRef{ Root: GenerateCids(1)[0], } + + if manualXfer { + out.TransferType = storagemarket.TTManual + } + + return out } // MakeTestClientDeal returns a storage market client deal -func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal) (*storagemarket.ClientDeal, error) { +func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal, manualXfer bool) (*storagemarket.ClientDeal, error) { proposalNd, err := cborutil.AsIpld(clientDealProposal) if err != nil { @@ -165,7 +171,7 @@ func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposa State: state, Miner: p, MinerWorker: address.TestAddress2, - DataRef: MakeTestDataRef(), + DataRef: MakeTestDataRef(manualXfer), }, nil } diff --git a/shared_testutil/testutil.go b/shared_testutil/testutil.go index bb0fd2a4a..4dd8cfa49 100644 --- a/shared_testutil/testutil.go +++ b/shared_testutil/testutil.go @@ -11,7 +11,10 @@ import ( blocksutil "github.com/ipfs/go-ipfs-blocksutil" "github.com/jbenet/go-random" "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/storagemarket" ) var blockGenerator = blocksutil.NewBlockGenerator() @@ -94,3 +97,12 @@ func TestVoucherEquality(t *testing.T, a, b *paych.SignedVoucher) { require.NoError(t, err) require.True(t, bytes.Equal(aB, bB)) } + +// AssertDealState asserts equality of StorageDealStatus but with better error messaging +func AssertDealState(t *testing.T, expected storagemarket.StorageDealStatus, actual storagemarket.StorageDealStatus) { + assert.Equal(t, expected, actual, + "Unexpected deal status\nexpected: %s (%d)\nactual : %s (%d)", + storagemarket.DealStates[expected], expected, + storagemarket.DealStates[actual], actual, + ) +} diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 179f94dbb..9af62ba4f 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" @@ -29,6 +30,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -39,11 +41,6 @@ var _ storagemarket.StorageClient = &Client{} type Client struct { net network.StorageMarketNetwork - // dataTransfer - // TODO: once the data transfer module is complete, the - // client will listen to events on the data transfer module - // Because we are using only a fake DAGService - // implementation, there's no validation or events on the client side dataTransfer datatransfer.Manager bs blockstore.Blockstore pio pieceio.PieceIO @@ -89,6 +86,10 @@ func NewClient( return nil, err } c.statemachines = statemachines + + // register a data transfer event handler -- this will send events to the state machines based on DT events + dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(statemachines)) + return c, nil } @@ -385,4 +386,7 @@ func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error { return c.c.conns.Disconnect(proposalCid) } -var _ clientstates.ClientDealEnvironment = &clientDealEnvironment{} +func (c *clientDealEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error { + _, err := c.c.dataTransfer.OpenPushDataChannel(ctx, to, voucher, baseCid, selector) + return err +} diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 432064728..15c24939b 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -35,7 +35,7 @@ var ClientEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ClientEventDealProposed). - From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating), + From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForDataRequest), fsm.Event(storagemarket.ClientEventDealStreamLookupErrored). FromAny().To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.ClientDeal, err error) error { @@ -43,17 +43,33 @@ var ClientEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ClientEventReadResponseFailed). - From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError). + FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealError). Action(func(deal *storagemarket.ClientDeal, err error) error { deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error() return nil }), fsm.Event(storagemarket.ClientEventResponseVerificationFailed). - From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing). + FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.ClientDeal) error { deal.Message = "unable to verify signature on deal response" return nil }), + fsm.Event(storagemarket.ClientEventUnexpectedDealState). + From(storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealFailing). + Action(func(deal *storagemarket.ClientDeal, status storagemarket.StorageDealStatus) error { + deal.Message = xerrors.Errorf("unexpected deal status while waiting for data request: %d", status).Error() + return nil + }), + fsm.Event(storagemarket.ClientEventDataTransferFailed). + FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealTransferring).To(storagemarket.StorageDealFailing). + Action(func(deal *storagemarket.ClientDeal, err error) error { + deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error() + return nil + }), + fsm.Event(storagemarket.ClientEventDataTransferInitiated). + From(storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealTransferring), + fsm.Event(storagemarket.ClientEventDataTransferComplete). + FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealValidating), fsm.Event(storagemarket.ClientEventResponseDealDidNotMatch). From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.ClientDeal, responseCid cid.Cid, proposalCid cid.Cid) error { @@ -104,11 +120,12 @@ var ClientEvents = fsm.Events{ // ClientStateEntryFuncs are the handlers for different states in a storage client var ClientStateEntryFuncs = fsm.StateEntryFuncs{ - storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds, - storagemarket.StorageDealClientFunding: WaitForFunding, - storagemarket.StorageDealFundsEnsured: ProposeDeal, - storagemarket.StorageDealValidating: VerifyDealResponse, - storagemarket.StorageDealProposalAccepted: ValidateDealPublished, - storagemarket.StorageDealSealing: VerifyDealActivated, - storagemarket.StorageDealFailing: FailDeal, + storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds, + storagemarket.StorageDealClientFunding: WaitForFunding, + storagemarket.StorageDealFundsEnsured: ProposeDeal, + storagemarket.StorageDealWaitingForDataRequest: WaitingForDataRequest, + storagemarket.StorageDealValidating: VerifyDealResponse, + storagemarket.StorageDealProposalAccepted: ValidateDealPublished, + storagemarket.StorageDealSealing: VerifyDealActivated, + storagemarket.StorageDealFailing: FailDeal, } diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index 0158b6f51..27cb742ad 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -1,15 +1,21 @@ package clientstates import ( + "context" + + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" + "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -23,6 +29,7 @@ type ClientDealEnvironment interface { TagConnection(proposalCid cid.Cid) error ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error) CloseStream(proposalCid cid.Cid) error + StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error } // ClientStateEntryFunc is the type for all state entry functions on a storage client @@ -83,6 +90,50 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag return ctx.Trigger(storagemarket.ClientEventDealProposed) } +func WaitingForDataRequest(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { + resp, err := environment.ReadDealResponse(deal.ProposalCid) + if err != nil { + return ctx.Trigger(storagemarket.ClientEventReadResponseFailed, err) + } + + tok, _, err := environment.Node().GetChainHead(ctx.Context()) + if err != nil { + return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed) + } + + if err := clientutils.VerifyResponse(ctx.Context(), resp, deal.MinerWorker, tok, environment.Node().VerifySignature); err != nil { + return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed) + } + + if resp.Response.State != storagemarket.StorageDealWaitingForData { + return ctx.Trigger(storagemarket.ClientEventUnexpectedDealState, resp.Response.State) + } + + if deal.DataRef.TransferType == storagemarket.TTManual { + log.Infof("manual data transfer for deal %s", deal.ProposalCid) + + // Temporary, we will move to a query/response protocol to check on deal status + return ctx.Trigger(storagemarket.ClientEventDataTransferComplete) + } + + log.Infof("sending data for a deal %s", deal.ProposalCid) + + // initiate a push data transfer. This will complete asynchronously and the + // completion of the data transfer will trigger a change in deal state + err = environment.StartDataTransfer(ctx.Context(), + deal.Miner, + &requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid}, + deal.DataRef.Root, + shared.AllSelector(), + ) + + if err != nil { + return ctx.Trigger(storagemarket.ClientEventDataTransferFailed, xerrors.Errorf("failed to open push data channel: %w", err)) + } + + return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated) +} + // VerifyDealResponse reads and verifies the response from the provider to the proposed deal func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 10332bf26..87a494d0e 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -8,13 +8,15 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statemachine/fsm" fsmtest "github.com/filecoin-project/go-statemachine/fsm/testutil" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" @@ -29,30 +31,30 @@ func TestEnsureFunds(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, clientDealParams{state: storagemarket.StorageDealEnsureClientFunds}, clientDealProposal) + runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, storagemarket.StorageDealEnsureClientFunds, clientDealProposal) addFundsCid := tut.GenerateCids(1)[0] t.Run("immediately succeeds", func(t *testing.T) { - runEnsureFunds(t, makeNode(nodeParams{}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) + runEnsureFunds(t, nodeParams{}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) }) }) t.Run("succeeds by sending an AddFunds message", func(t *testing.T) { - params := nodeParams{ + nodeParams := nodeParams{ AddFundsCid: addFundsCid, } - runEnsureFunds(t, makeNode(params), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealClientFunding, deal.State) + runEnsureFunds(t, nodeParams, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealClientFunding, deal.State) }) }) t.Run("EnsureClientFunds fails", func(t *testing.T) { - n := makeNode(nodeParams{ + nodeParams := nodeParams{ EnsureFundsError: errors.New("Something went wrong"), - }) - runEnsureFunds(t, n, nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + } + runEnsureFunds(t, nodeParams, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "adding market funds failed: Something went wrong", deal.Message) }) }) @@ -63,17 +65,17 @@ func TestWaitForFunding(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.WaitForFunding, clientDealParams{state: storagemarket.StorageDealClientFunding}, clientDealProposal) + runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.WaitForFunding, storagemarket.StorageDealClientFunding, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.Ok}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) + runEnsureFunds(t, nodeParams{WaitForMessageExitCode: exitcode.Ok}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) }) }) t.Run("EnsureClientFunds fails", func(t *testing.T) { - runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + runEnsureFunds(t, nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "adding market funds failed: AddFunds exit code: 19", deal.Message) }) }) @@ -84,7 +86,7 @@ func TestProposeDeal(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runProposeDeal := makeExecutor(ctx, eventProcessor, clientstates.ProposeDeal, clientDealParams{state: storagemarket.StorageDealFundsEnsured}, clientDealProposal) + runProposeDeal := makeExecutor(ctx, eventProcessor, clientstates.ProposeDeal, storagemarket.StorageDealFundsEnsured, clientDealProposal) dealStream := func(writer tut.StorageDealProposalWriter) *tut.TestStorageDealStream { return tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ @@ -94,123 +96,182 @@ func TestProposeDeal(t *testing.T) { t.Run("succeeds", func(t *testing.T) { ds := dealStream(tut.TrivialStorageDealProposalWriter) - runProposeDeal(t, makeNode(nodeParams{}), ds, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealValidating, deal.State) + runProposeDeal(t, nodeParams{}, envParams{dealStream: ds}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealWaitingForDataRequest, deal.State) ds.AssertConnectionTagged(t, deal.ProposalCid.String()) }) }) t.Run("write proposal fails fails", func(t *testing.T) { - runProposeDeal(t, makeNode(nodeParams{}), dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + runProposeDeal(t, nodeParams{}, envParams{dealStream: dealStream(tut.FailStorageProposalWriter)}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message) }) }) } -func TestVerifyResponse(t *testing.T) { +func TestWaitingForDataRequest(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - proposalNd, err := cborutil.AsIpld(clientDealProposal) require.NoError(t, err) - runVerifyResponse := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealResponse, clientDealParams{state: storagemarket.StorageDealValidating}, clientDealProposal) + runWaitingForDataRequest := makeExecutor(ctx, eventProcessor, clientstates.WaitingForDataRequest, storagemarket.StorageDealWaitingForDataRequest, clientDealProposal) - publishMessage := &(tut.GenerateCids(1)[0]) + t.Run("succeeds", func(t *testing.T) { + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealWaitingForData, + }) - dealStream := func(reader tut.StorageDealResponseReader) smnet.StorageDealStream { - return tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ - ResponseReader: reader, + runWaitingForDataRequest(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + require.Len(t, env.startDataTransferCalls, 1) + require.Equal(t, env.startDataTransferCalls[0].to, deal.Miner) + require.Equal(t, env.startDataTransferCalls[0].baseCid, deal.DataRef.Root) + + tut.AssertDealState(t, storagemarket.StorageDealTransferring, deal.State) }) - } + }) + + t.Run("response contains unexpected state", func(t *testing.T) { + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalNotFound, + }) + + runWaitingForDataRequest(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "unexpected deal status while waiting for data request: 1", deal.Message) + }) + }) + + t.Run("read response fails", func(t *testing.T) { + stream := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ResponseReader: tut.FailStorageResponseReader, + }) + + runWaitingForDataRequest(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error reading Response message: read response failed", deal.Message) + }) + }) + + t.Run("fails starting the data transfer request", func(t *testing.T) { + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealWaitingForData, + }) + envParams := envParams{ + dealStream: stream, + startDataTransferError: errors.New("failed"), + } + + runWaitingForDataRequest(t, nodeParams{}, envParams, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "failed to initiate data transfer: failed to open push data channel: failed", deal.Message) + }) + }) + + t.Run("waits for another response with manual transfers", func(t *testing.T) { + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealWaitingForData, + }) + envParams := envParams{ + dealStream: stream, + manualTransfer: true, + } + + runWaitingForDataRequest(t, nodeParams{}, envParams, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealValidating, deal.State) + }) + }) +} + +func TestVerifyResponse(t *testing.T) { + ctx := context.Background() + eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) + require.NoError(t, err) + clientDealProposal := tut.MakeTestClientDealProposal() + require.NoError(t, err) + runVerifyResponse := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealResponse, storagemarket.StorageDealValidating, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ - State: storagemarket.StorageDealProposalAccepted, - Proposal: proposalNd.Cid(), - PublishMessage: publishMessage, - }, - Signature: tut.MakeTestSignature(), - })) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) + publishMessage := &(tut.GenerateCids(1)[0]) + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + publishMessage: publishMessage, + }) + + runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealProposalAccepted, deal.State) require.Equal(t, publishMessage, deal.PublishMessage) }) }) t.Run("read response fails", func(t *testing.T) { - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + stream := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ResponseReader: tut.FailStorageResponseReader, + }) + + runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error reading Response message: read response failed", deal.Message) }) }) t.Run("verify response fails", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ - State: storagemarket.StorageDealProposalAccepted, - Proposal: proposalNd.Cid(), - PublishMessage: publishMessage, - }, - Signature: tut.MakeTestSignature(), - })) - failToVerifyNode := makeNode(nodeParams{VerifySignatureFails: true}) - runVerifyResponse(t, failToVerifyNode, stream, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + }) + + runVerifyResponse(t, nodeParams{VerifySignatureFails: true}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "unable to verify signature on deal response", deal.Message) }) }) t.Run("incorrect proposal cid", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ - State: storagemarket.StorageDealProposalAccepted, - Proposal: tut.GenerateCids(1)[0], - PublishMessage: publishMessage, - }, - Signature: tut.MakeTestSignature(), - })) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + proposalCid: tut.GenerateCids(1)[0], + }) + + runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) }) }) t.Run("deal rejected", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ - State: storagemarket.StorageDealProposalRejected, - Proposal: proposalNd.Cid(), - PublishMessage: publishMessage, - Message: "because reasons", - }, - Signature: tut.MakeTestSignature(), - })) + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalRejected, + message: "because reasons", + }) + expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, deal.Message, expErr) }) }) t.Run("deal stream close errors", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ - State: storagemarket.StorageDealProposalAccepted, - Proposal: proposalNd.Cid(), - PublishMessage: publishMessage, - }, - Signature: tut.MakeTestSignature(), - })) + stream := testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + }) closeStreamErr := errors.New("something went wrong") - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, closeStreamErr, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + + runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream, closeStreamErr: closeStreamErr}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error attempting to close stream: something went wrong", deal.Message) }) }) - } func TestValidateDealPublished(t *testing.T) { @@ -218,22 +279,22 @@ func TestValidateDealPublished(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runValidateDealPublished := makeExecutor(ctx, eventProcessor, clientstates.ValidateDealPublished, clientDealParams{state: storagemarket.StorageDealProposalAccepted}, clientDealProposal) + runValidateDealPublished := makeExecutor(ctx, eventProcessor, clientstates.ValidateDealPublished, storagemarket.StorageDealProposalAccepted, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - runValidateDealPublished(t, makeNode(nodeParams{ValidatePublishedDealID: abi.DealID(5)}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealSealing, deal.State) + runValidateDealPublished(t, nodeParams{ValidatePublishedDealID: abi.DealID(5)}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) require.Equal(t, abi.DealID(5), deal.DealID) }) }) t.Run("fails", func(t *testing.T) { - n := makeNode(nodeParams{ + nodeParams := nodeParams{ ValidatePublishedDealID: abi.DealID(5), ValidatePublishedError: errors.New("Something went wrong"), - }) - runValidateDealPublished(t, n, nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + } + runValidateDealPublished(t, nodeParams, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error validating deal published: Something went wrong", deal.Message) }) }) @@ -244,24 +305,24 @@ func TestVerifyDealActivated(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runVerifyDealActivated := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealActivated, clientDealParams{state: storagemarket.StorageDealSealing}, clientDealProposal) + runVerifyDealActivated := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealActivated, storagemarket.StorageDealSealing, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - runVerifyDealActivated(t, makeNode(nodeParams{}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealActive, deal.State) + runVerifyDealActivated(t, nodeParams{}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealActive, deal.State) }) }) t.Run("fails synchronously", func(t *testing.T) { - runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + runVerifyDealActivated(t, nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error in deal activation: Something went wrong", deal.Message) }) }) t.Run("fails asynchronously", func(t *testing.T) { - runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + runVerifyDealActivated(t, nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error in deal activation: Something went wrong later", deal.Message) }) }) @@ -272,79 +333,54 @@ func TestFailDeal(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() + runFailDeal := makeExecutor(ctx, eventProcessor, clientstates.FailDeal, storagemarket.StorageDealFailing, clientDealProposal) - t.Run("with an open connection", func(t *testing.T) { - stateParams := clientDealParams{ - state: storagemarket.StorageDealFailing, - connectionClosed: false, - } - runFailDeal := makeExecutor(ctx, eventProcessor, clientstates.FailDeal, stateParams, clientDealProposal) - - t.Run("able to close stream", func(t *testing.T) { - runFailDeal(t, nil, nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) - }) - }) - - t.Run("unable to close stream", func(t *testing.T) { - runFailDeal(t, nil, nil, errors.New("unable to close"), func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Equal(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error attempting to close stream: unable to close", deal.Message) - }) + t.Run("able to close stream", func(t *testing.T) { + runFailDeal(t, nodeParams{}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) }) }) - t.Run("with a closed connection", func(t *testing.T) { - stateParams := clientDealParams{ - state: storagemarket.StorageDealFailing, - connectionClosed: true, - } - runFailDeal := makeExecutor(ctx, eventProcessor, clientstates.FailDeal, stateParams, clientDealProposal) - - t.Run("doesn't attempt to close stream if not open", func(t *testing.T) { - runFailDeal(t, nil, nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Len(t, env.closeStreamCalls, 0) - require.Equal(t, storagemarket.StorageDealError, deal.State) - }) + t.Run("unable to close stream", func(t *testing.T) { + runFailDeal(t, nodeParams{}, envParams{closeStreamErr: errors.New("unable to close")}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error attempting to close stream: unable to close", deal.Message) }) }) } +type envParams struct { + dealStream smnet.StorageDealStream + closeStreamErr error + startDataTransferError error + manualTransfer bool +} + type executor func(t *testing.T, - node storagemarket.StorageClientNode, - dealStream smnet.StorageDealStream, - closeStreamErr error, + nodeParams nodeParams, + envParams envParams, dealInspector func(deal storagemarket.ClientDeal, env *fakeEnvironment)) -type clientDealParams struct { - state storagemarket.StorageDealStatus - connectionClosed bool - addFundsCid *cid.Cid -} - func makeExecutor(ctx context.Context, eventProcessor fsm.EventProcessor, stateEntryFunc clientstates.ClientStateEntryFunc, - dealParams clientDealParams, + initialState storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal) executor { return func(t *testing.T, - node storagemarket.StorageClientNode, - dealStream smnet.StorageDealStream, - closeStreamErr error, + nodeParams nodeParams, + envParams envParams, dealInspector func(deal storagemarket.ClientDeal, env *fakeEnvironment)) { - - dealState, err := tut.MakeTestClientDeal(dealParams.state, clientDealProposal) + node := makeNode(nodeParams) + dealState, err := tut.MakeTestClientDeal(initialState, clientDealProposal, envParams.manualTransfer) require.NoError(t, err) - - dealState.ConnectionClosed = dealParams.connectionClosed dealState.AddFundsCid = &tut.GenerateCids(1)[0] - if dealParams.addFundsCid != nil { - dealState.AddFundsCid = dealParams.addFundsCid + environment := &fakeEnvironment{ + node: node, + dealStream: envParams.dealStream, + closeStreamErr: envParams.closeStreamErr, + startDataTransferError: envParams.startDataTransferError, } - - require.NoError(t, err) - environment := &fakeEnvironment{node, dealStream, closeStreamErr, nil} fsmCtx := fsmtest.NewTestContext(ctx, eventProcessor) err = stateEntryFunc(fsmCtx, environment, *dealState) require.NoError(t, err) @@ -393,10 +429,28 @@ func makeNode(params nodeParams) storagemarket.StorageClientNode { } type fakeEnvironment struct { - node storagemarket.StorageClientNode - dealStream smnet.StorageDealStream - closeStreamErr error - closeStreamCalls []cid.Cid + node storagemarket.StorageClientNode + dealStream smnet.StorageDealStream + closeStreamErr error + startDataTransferError error + startDataTransferCalls []dataTransferParams +} + +type dataTransferParams struct { + to peer.ID + voucher datatransfer.Voucher + baseCid cid.Cid + selector ipld.Node +} + +func (fe *fakeEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error { + fe.startDataTransferCalls = append(fe.startDataTransferCalls, dataTransferParams{ + to: to, + voucher: voucher, + baseCid: baseCid, + selector: selector, + }) + return fe.startDataTransferError } func (fe *fakeEnvironment) Node() storagemarket.StorageClientNode { @@ -417,6 +471,39 @@ func (fe *fakeEnvironment) TagConnection(proposalCid cid.Cid) error { } func (fe *fakeEnvironment) CloseStream(proposalCid cid.Cid) error { - fe.closeStreamCalls = append(fe.closeStreamCalls, proposalCid) return fe.closeStreamErr } + +var _ clientstates.ClientDealEnvironment = &fakeEnvironment{} + +type responseParams struct { + proposal *market.ClientDealProposal + state storagemarket.StorageDealStatus + message string + publishMessage *cid.Cid + proposalCid cid.Cid +} + +func testResponseStream(t *testing.T, params responseParams) smnet.StorageDealStream { + response := smnet.Response{ + State: params.state, + Proposal: params.proposalCid, + Message: params.message, + PublishMessage: params.publishMessage, + } + + if response.Proposal == cid.Undef { + proposalNd, err := cborutil.AsIpld(params.proposal) + require.NoError(t, err) + response.Proposal = proposalNd.Cid() + } + + reader := tut.StubbedStorageResponseReader(smnet.SignedResponse{ + Response: response, + Signature: tut.MakeTestSignature(), + }) + + return tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ResponseReader: reader, + }) +} diff --git a/storagemarket/impl/dtutils/dtutils.go b/storagemarket/impl/dtutils/dtutils.go new file mode 100644 index 000000000..dc79edb06 --- /dev/null +++ b/storagemarket/impl/dtutils/dtutils.go @@ -0,0 +1,90 @@ +// Package dtutils provides go-data-transfer related types and functionality for +// client and provider FSMs +package dtutils + +import ( + "errors" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-statemachine/fsm" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" +) + +var log = logging.Logger("storagemarket_impl") + +var ( + // ErrDataTransferFailed means a data transfer for a deal failed + ErrDataTransferFailed = errors.New("deal data transfer failed") +) + +// EventReceiver is any thing that can receive FSM events +type EventReceiver interface { + Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) +} + +// DataTransferSubscriber is the function called when an event occurs in a data +// transfer -- it reads the voucher to verify this even occurred in a storage +// market deal, then, based on the data transfer event that occurred, it generates +// and update message for the deal -- either moving to staged for a completion +// event or moving to error if a data transfer error occurs +func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { + return func(event datatransfer.Event, channelState datatransfer.ChannelState) { + voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher) + // if this event is for a transfer not related to storage, ignore + if !ok { + return + } + + // data transfer events for progress do not affect deal state + switch event.Code { + case datatransfer.Open: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferInitiated) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + case datatransfer.Complete: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + case datatransfer.Error: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + default: + } + } +} + +// DataTransferSubscriber is the function called when an event occurs in a data +// transfer -- it reads the voucher to verify this even occurred in a storage +// market deal, then, based on the data transfer event that occurred, it dispatches +// an event to the appropriate state machine +func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { + return func(event datatransfer.Event, channelState datatransfer.ChannelState) { + voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher) + // if this event is for a transfer not related to storage, ignore + if !ok { + return + } + + // data transfer events for progress do not affect deal state + switch event.Code { + case datatransfer.Complete: + err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferComplete) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + case datatransfer.Error: + err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferFailed, ErrDataTransferFailed) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + default: + } + } +} diff --git a/storagemarket/impl/dtutils/dtutils_test.go b/storagemarket/impl/dtutils/dtutils_test.go new file mode 100644 index 000000000..40059d38a --- /dev/null +++ b/storagemarket/impl/dtutils/dtutils_test.go @@ -0,0 +1,161 @@ +package dtutils_test + +import ( + "testing" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-statemachine/fsm" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" +) + +func TestProviderDataTransferSubscriber(t *testing.T) { + expectedProposalCID := shared_testutil.GenerateCids(1)[0] + tests := map[string]struct { + code datatransfer.EventCode + called bool + voucher datatransfer.Voucher + expectedID interface{} + expectedEvent fsm.EventName + expectedArgs []interface{} + }{ + "not a storage voucher": { + called: false, + voucher: nil, + }, + "open event": { + code: datatransfer.Open, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ProviderEventDataTransferInitiated, + }, + "completion event": { + code: datatransfer.Complete, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ProviderEventDataTransferCompleted, + }, + "error event": { + code: datatransfer.Error, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ProviderEventDataTransferFailed, + expectedArgs: []interface{}{dtutils.ErrDataTransferFailed}, + }, + "other event": { + code: datatransfer.Progress, + called: false, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + }, + } + for test, data := range tests { + t.Run(test, func(t *testing.T) { + fdg := &fakeDealGroup{} + subscriber := dtutils.ProviderDataTransferSubscriber(fdg) + subscriber(datatransfer.Event{Code: data.code}, datatransfer.ChannelState{ + Channel: datatransfer.NewChannel(datatransfer.TransferID(0), cid.Undef, nil, data.voucher, peer.ID(""), peer.ID(""), 0), + }) + if data.called { + require.True(t, fdg.called) + require.Equal(t, fdg.lastID, data.expectedID) + require.Equal(t, fdg.lastEvent, data.expectedEvent) + require.Equal(t, fdg.lastArgs, data.expectedArgs) + } else { + require.False(t, fdg.called) + } + }) + } +} + +func TestClientDataTransferSubscriber(t *testing.T) { + expectedProposalCID := shared_testutil.GenerateCids(1)[0] + tests := map[string]struct { + code datatransfer.EventCode + called bool + voucher datatransfer.Voucher + expectedID interface{} + expectedEvent fsm.EventName + expectedArgs []interface{} + }{ + "not a storage voucher": { + called: false, + voucher: nil, + }, + "completion event": { + code: datatransfer.Complete, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ClientEventDataTransferComplete, + }, + "error event": { + code: datatransfer.Error, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ClientEventDataTransferFailed, + expectedArgs: []interface{}{dtutils.ErrDataTransferFailed}, + }, + "other event": { + code: datatransfer.Progress, + called: false, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + }, + } + for test, data := range tests { + t.Run(test, func(t *testing.T) { + fdg := &fakeDealGroup{} + subscriber := dtutils.ClientDataTransferSubscriber(fdg) + subscriber(datatransfer.Event{Code: data.code}, datatransfer.ChannelState{ + Channel: datatransfer.NewChannel(datatransfer.TransferID(0), cid.Undef, nil, data.voucher, peer.ID(""), peer.ID(""), 0), + }) + if data.called { + require.True(t, fdg.called) + require.Equal(t, fdg.lastID, data.expectedID) + require.Equal(t, fdg.lastEvent, data.expectedEvent) + require.Equal(t, fdg.lastArgs, data.expectedArgs) + } else { + require.False(t, fdg.called) + } + }) + } +} + +type fakeDealGroup struct { + returnedErr error + called bool + lastID interface{} + lastEvent fsm.EventName + lastArgs []interface{} +} + +func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) { + fdg.lastID = id + fdg.lastEvent = name + fdg.lastArgs = args + fdg.called = true + return fdg.returnedErr +} diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 966d3dbb3..e6fb2f12e 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -26,6 +26,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" @@ -117,9 +118,8 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo h.Configure(options...) - // register a data transfer event handler -- this will move deals from - // accepted to staged - dataTransfer.SubscribeToEvents(providerutils.DataTransferSubscriber(deals)) + // register a data transfer event handler -- this will send events to the state machines based on DT events + dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals)) return h, nil } diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index 84605f001..8ba0cd60a 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -24,18 +24,16 @@ var ProviderEvents = fsm.Events{ deal.Message = xerrors.Errorf("deal rejected: %w", err).Error() return nil }), - fsm.Event(storagemarket.ProviderEventDealAccepted). - From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealProposalAccepted), - fsm.Event(storagemarket.ProviderEventWaitingForManualData). - From(storagemarket.StorageDealProposalAccepted).To(storagemarket.StorageDealWaitingForData), + fsm.Event(storagemarket.ProviderEventDataRequested). + From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealWaitingForData), fsm.Event(storagemarket.ProviderEventDataTransferFailed). - FromMany(storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealTransferring).To(storagemarket.StorageDealFailing). + From(storagemarket.StorageDealTransferring).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.MinerDeal, err error) error { deal.Message = xerrors.Errorf("error transferring data: %w", err).Error() return nil }), fsm.Event(storagemarket.ProviderEventDataTransferInitiated). - From(storagemarket.StorageDealProposalAccepted).To(storagemarket.StorageDealTransferring), + From(storagemarket.StorageDealWaitingForData).To(storagemarket.StorageDealTransferring), fsm.Event(storagemarket.ProviderEventDataTransferCompleted). From(storagemarket.StorageDealTransferring).To(storagemarket.StorageDealVerifyData), fsm.Event(storagemarket.ProviderEventGeneratePieceCIDFailed). @@ -125,7 +123,6 @@ var ProviderEvents = fsm.Events{ // ProviderStateEntryFuncs are the handlers for different states in a storage client var ProviderStateEntryFuncs = fsm.StateEntryFuncs{ storagemarket.StorageDealValidating: ValidateDealProposal, - storagemarket.StorageDealProposalAccepted: TransferData, storagemarket.StorageDealVerifyData: VerifyData, storagemarket.StorageDealEnsureProviderFunds: EnsureProviderFunds, storagemarket.StorageDealProviderFunding: WaitForFunding, diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index ddcae2a0a..4a4ebb66d 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -23,7 +23,6 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -107,35 +106,17 @@ func ValidateDealProposal(ctx fsm.Context, environment ProviderDealEnvironment, log.Warnf("Error tagging deal connection: %w", err) } - // TODO: Send intent to accept - return ctx.Trigger(storagemarket.ProviderEventDealAccepted) -} - -// TransferData initiates a data transfer or places the deal in a waiting state if it is a -// manual deal -func TransferData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - if deal.Ref.TransferType == storagemarket.TTManual { - log.Info("deal entering manual transfer state") - return ctx.Trigger(storagemarket.ProviderEventWaitingForManualData) - } - - log.Infof("fetching data for a deal %s", deal.ProposalCid) - - // initiate a pull data transfer. This will complete asynchronously and the - // completion of the data transfer will trigger a change in deal state - // (see onDataTransferEvent) - err := environment.StartDataTransfer(ctx.Context(), - deal.Client, - &requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid}, - deal.Ref.Root, - shared.AllSelector(), - ) + // Send intent to accept + err = environment.SendSignedResponse(ctx.Context(), &network.Response{ + State: storagemarket.StorageDealWaitingForData, + Proposal: deal.ProposalCid, + }) if err != nil { - return ctx.Trigger(storagemarket.ProviderEventDataTransferFailed, xerrors.Errorf("failed to open pull data channel: %w", err)) + return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) } - return ctx.Trigger(storagemarket.ProviderEventDataTransferInitiated) + return ctx.Trigger(storagemarket.ProviderEventDataRequested) } // VerifyData verifies that data received for a deal matches the pieceCID diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index f5c61431d..5ccfe06d6 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -16,7 +16,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" - "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -45,22 +44,22 @@ func TestValidateDealProposal(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { environmentParams: environmentParams{ TagsProposal: true, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealWaitingForData, deal.State) }, }, "verify signature fails": { nodeParams: nodeParams{ VerifySignatureFails: true, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: verifying StorageDealProposal: could not verify signature", deal.Message) }, }, @@ -68,8 +67,8 @@ func TestValidateDealProposal(t *testing.T) { environmentParams: environmentParams{ Address: otherAddr, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: incorrect provider for deal", deal.Message) }, }, @@ -77,8 +76,8 @@ func TestValidateDealProposal(t *testing.T) { nodeParams: nodeParams{ MostRecentStateIDError: errors.New("couldn't get id"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: getting most recent state id: couldn't get id", deal.Message) }, }, @@ -86,16 +85,16 @@ func TestValidateDealProposal(t *testing.T) { environmentParams: environmentParams{DealAcceptanceBuffer: 10, TagsProposal: true}, dealParams: dealParams{StartEpoch: 200}, nodeParams: nodeParams{Height: 190}, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealWaitingForData, deal.State) }, }, "CurrentHeight > StartEpoch - DealAcceptanceBuffer() fails": { environmentParams: environmentParams{DealAcceptanceBuffer: 10}, dealParams: dealParams{StartEpoch: 200}, nodeParams: nodeParams{Height: 191}, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: deal start epoch is too soon or deal already expired", deal.Message) }, }, @@ -103,8 +102,8 @@ func TestValidateDealProposal(t *testing.T) { dealParams: dealParams{ StoragePricePerEpoch: abi.NewTokenAmount(5000), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: storage price per epoch less than asking price: 5000 < 9765", deal.Message) }, }, @@ -112,8 +111,8 @@ func TestValidateDealProposal(t *testing.T) { dealParams: dealParams{ PieceSize: abi.PaddedPieceSize(128), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: piece size less than minimum required size: 128 < 256", deal.Message) }, }, @@ -121,8 +120,8 @@ func TestValidateDealProposal(t *testing.T) { nodeParams: nodeParams{ ClientMarketBalanceError: errors.New("could not get balance"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: getting client market balance failed: could not get balance", deal.Message) }, }, @@ -130,8 +129,8 @@ func TestValidateDealProposal(t *testing.T) { nodeParams: nodeParams{ ClientMarketBalance: abi.NewTokenAmount(150 * 10000), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: clientMarketBalance.Available too small", deal.Message) }, }, @@ -143,51 +142,6 @@ func TestValidateDealProposal(t *testing.T) { } } -func TestTransferData(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) - require.NoError(t, err) - runTransferData := makeExecutor(ctx, eventProcessor, providerstates.TransferData, storagemarket.StorageDealProposalAccepted) - tests := map[string]struct { - nodeParams nodeParams - dealParams dealParams - environmentParams environmentParams - fileStoreParams tut.TestFileStoreParams - pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) - }{ - "succeeds": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealTransferring, deal.State) - }, - }, - "manual transfer": { - dealParams: dealParams{ - DataRef: &storagemarket.DataRef{ - TransferType: storagemarket.TTManual, - }, - }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealWaitingForData, deal.State) - }, - }, - "data transfer failure": { - environmentParams: environmentParams{ - DataTransferError: errors.New("could not initiate"), - }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "error transferring data: failed to open pull data channel: could not initiate", deal.Message) - }, - }, - } - for test, data := range tests { - t.Run(test, func(t *testing.T) { - runTransferData(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) - }) - } -} - func TestVerifyData(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) @@ -201,15 +155,15 @@ func TestVerifyData(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { environmentParams: environmentParams{ Path: expPath, MetadataPath: expMetaPath, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealEnsureProviderFunds, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealEnsureProviderFunds, deal.State) require.Equal(t, expPath, deal.PiecePath) require.Equal(t, expMetaPath, deal.MetadataPath) }, @@ -218,8 +172,8 @@ func TestVerifyData(t *testing.T) { environmentParams: environmentParams{ GenerateCommPError: errors.New("could not generate CommP"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "generating piece committment: could not generate CommP", deal.Message) }, }, @@ -227,8 +181,8 @@ func TestVerifyData(t *testing.T) { environmentParams: environmentParams{ PieceCid: tut.GenerateCids(1)[0], }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "deal rejected: proposal CommP doesn't match calculated CommP", deal.Message) }, }, @@ -251,15 +205,15 @@ func TestWaitForFunding(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { nodeParams: nodeParams{ WaitForMessageExitCode: exitcode.Ok, WaitForMessageRetBytes: []byte{}, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealPublish, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealPublish, deal.State) }, }, "AddFunds returns non-ok exit code": { @@ -267,8 +221,8 @@ func TestWaitForFunding(t *testing.T) { WaitForMessageExitCode: exitcode.ErrInsufficientFunds, WaitForMessageRetBytes: []byte{}, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, fmt.Sprintf("error calling node: AddFunds exit code: %s", exitcode.ErrInsufficientFunds), deal.Message) }, }, @@ -292,11 +246,11 @@ func TestEnsureProviderFunds(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds immediately": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealPublish, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealPublish, deal.State) }, }, "succeeds by sending an AddBalance message": { @@ -306,8 +260,8 @@ func TestEnsureProviderFunds(t *testing.T) { nodeParams: nodeParams{ AddFundsCid: cids[0], }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealProviderFunding, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealProviderFunding, deal.State) require.Equal(t, &cids[0], deal.AddFundsCid) }, }, @@ -315,8 +269,8 @@ func TestEnsureProviderFunds(t *testing.T) { nodeParams: nodeParams{ MinerWorkerError: errors.New("could not get worker"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: looking up miner worker: could not get worker", deal.Message) }, }, @@ -324,8 +278,8 @@ func TestEnsureProviderFunds(t *testing.T) { nodeParams: nodeParams{ EnsureFundsError: errors.New("not enough funds"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: ensuring funds: not enough funds", deal.Message) }, }, @@ -348,19 +302,19 @@ func TestPublishDeal(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealPublishing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealPublishing, deal.State) }, }, "PublishDealsErrors errors": { nodeParams: nodeParams{ PublishDealsError: errors.New("could not post to chain"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: publishing deal: could not post to chain", deal.Message) }, }, @@ -373,8 +327,6 @@ func TestPublishDeal(t *testing.T) { } func TestWaitForPublish(t *testing.T) { - log.SetDebugLogging() - ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) require.NoError(t, err) @@ -387,14 +339,14 @@ func TestWaitForPublish(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { nodeParams: nodeParams{ WaitForMessageRetBytes: psdReturnBytes, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealStaged, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealStaged, deal.State) require.Equal(t, expDealID, deal.DealID) require.Equal(t, true, deal.ConnectionClosed) }, @@ -403,8 +355,8 @@ func TestWaitForPublish(t *testing.T) { nodeParams: nodeParams{ WaitForMessageExitCode: exitcode.SysErrForbidden, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "PublishStorageDeal error: PublishStorageDeals exit code: SysErrForbidden(8)", deal.Message) }, }, @@ -415,8 +367,8 @@ func TestWaitForPublish(t *testing.T) { environmentParams: environmentParams{ SendSignedResponseError: errors.New("could not send"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "sending response to deal: could not send", deal.Message) }, }, @@ -439,7 +391,7 @@ func TestHandoffDeal(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { dealParams: dealParams{ @@ -449,16 +401,16 @@ func TestHandoffDeal(t *testing.T) { Files: []filestore.File{defaultDataFile}, ExpectedOpens: []filestore.Path{defaultPath}, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealSealing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) }, }, "opening file errors": { dealParams: dealParams{ PiecePath: filestore.Path("missing.txt"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, fmt.Sprintf("accessing file store: reading piece at path missing.txt: %s", tut.TestErrNotFound.Error()), deal.Message) }, }, @@ -473,8 +425,8 @@ func TestHandoffDeal(t *testing.T) { nodeParams: nodeParams{ OnDealCompleteError: errors.New("failed building sector"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "handing off deal to node: failed building sector", deal.Message) }, }, @@ -497,19 +449,19 @@ func TestVerifyDealActivated(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealActive, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealActive, deal.State) }, }, "sync error": { nodeParams: nodeParams{ DealCommittedSyncError: errors.New("couldn't check deal commitment"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error activating deal: couldn't check deal commitment", deal.Message) }, }, @@ -517,8 +469,8 @@ func TestVerifyDealActivated(t *testing.T) { nodeParams: nodeParams{ DealCommittedAsyncError: errors.New("deal did not appear on chain"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error activating deal: deal did not appear on chain", deal.Message) }, }, @@ -541,7 +493,7 @@ func TestRecordPieceInfo(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { dealParams: dealParams{ @@ -551,8 +503,8 @@ func TestRecordPieceInfo(t *testing.T) { Files: []filestore.File{defaultDataFile}, ExpectedDeletions: []filestore.Path{defaultPath}, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealCompleted, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealCompleted, deal.State) }, }, "succeeds w metadata": { @@ -565,8 +517,8 @@ func TestRecordPieceInfo(t *testing.T) { ExpectedOpens: []filestore.Path{defaultMetadataPath}, ExpectedDeletions: []filestore.Path{defaultMetadataPath, defaultPath}, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealCompleted, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealCompleted, deal.State) }, }, "locate piece fails": { @@ -576,8 +528,8 @@ func TestRecordPieceInfo(t *testing.T) { nodeParams: nodeParams{ LocatePieceForDealWithinSectorError: errors.New("could not find piece"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "locating piece for deal ID 1234 in sector: could not find piece", deal.Message) }, }, @@ -585,8 +537,8 @@ func TestRecordPieceInfo(t *testing.T) { dealParams: dealParams{ MetadataPath: filestore.Path("Missing.txt"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, fmt.Sprintf("error reading piece metadata: %s", tut.TestErrNotFound.Error()), deal.Message) }, }, @@ -594,8 +546,8 @@ func TestRecordPieceInfo(t *testing.T) { pieceStoreParams: tut.TestPieceStoreParams{ AddPieceBlockLocationsError: errors.New("could not add block locations"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "accessing piece store: adding piece block locations: could not add block locations", deal.Message) }, }, @@ -603,8 +555,8 @@ func TestRecordPieceInfo(t *testing.T) { pieceStoreParams: tut.TestPieceStoreParams{ AddDealForPieceError: errors.New("could not add deal info"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "accessing piece store: adding deal info for piece: could not add deal info", deal.Message) }, }, @@ -627,11 +579,11 @@ func TestFailDeal(t *testing.T) { environmentParams environmentParams fileStoreParams tut.TestFileStoreParams pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) }, }, "succeeds, skips response": { @@ -643,8 +595,8 @@ func TestFailDeal(t *testing.T) { dealParams: dealParams{ ConnectionClosed: true, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) // should not have additional error message require.Equal(t, "", deal.Message) }, @@ -658,16 +610,16 @@ func TestFailDeal(t *testing.T) { Files: []filestore.File{defaultDataFile, defaultMetadataFile}, ExpectedDeletions: []filestore.Path{defaultPath, defaultMetadataPath}, }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) }, }, "SendSignedResponse errors": { environmentParams: environmentParams{ SendSignedResponseError: errors.New("could not send"), }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "sending response to deal: could not send", deal.Message) }, }, @@ -789,7 +741,7 @@ type executor func(t *testing.T, dealParams dealParams, fileStoreParams tut.TestFileStoreParams, pieceStoreParams tut.TestPieceStoreParams, - dealInspector func(t *testing.T, deal storagemarket.MinerDeal)) + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment)) func makeExecutor(ctx context.Context, eventProcessor fsm.EventProcessor, @@ -801,7 +753,7 @@ func makeExecutor(ctx context.Context, dealParams dealParams, fileStoreParams tut.TestFileStoreParams, pieceStoreParams tut.TestPieceStoreParams, - dealInspector func(t *testing.T, deal storagemarket.MinerDeal)) { + dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment)) { smstate := testnodes.NewStorageMarketState() if nodeParams.Height != abi.ChainEpoch(0) { @@ -941,7 +893,8 @@ func makeExecutor(ctx context.Context, err = stateEntryFunc(fsmCtx, environment, *dealState) require.NoError(t, err) fsmCtx.ReplayEvents(t, dealState) - dealInspector(t, *dealState) + dealInspector(t, *dealState, environment) + fs.VerifyExpectations(t) pieceStore.VerifyExpectations(t) environment.VerifyExpectations(t) @@ -1014,3 +967,5 @@ func (fe *fakeEnvironment) PieceStore() piecestore.PieceStore { func (fe *fakeEnvironment) DealAcceptanceBuffer() abi.ChainEpoch { return fe.dealAcceptanceBuffer } + +var _ providerstates.ProviderDealEnvironment = &fakeEnvironment{} diff --git a/storagemarket/impl/providerutils/providerutils.go b/storagemarket/impl/providerutils/providerutils.go index 56df3ab54..8153fbeb4 100644 --- a/storagemarket/impl/providerutils/providerutils.go +++ b/storagemarket/impl/providerutils/providerutils.go @@ -2,17 +2,13 @@ package providerutils import ( "context" - "errors" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-car" "github.com/ipld/go-ipld-prime" "golang.org/x/xerrors" @@ -20,15 +16,7 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" -) - -var log = logging.Logger("storagemarket_impl") -var ( - // ErrDataTransferFailed means a data transfer for a deal failed - ErrDataTransferFailed = errors.New("deal data transfer failed") ) // VerifyFunc is a function that can validate a signature for a given address and bytes @@ -79,41 +67,6 @@ func SignMinerData(ctx context.Context, data interface{}, address address.Addres return sig, nil } -// EventReceiver is any thing that can receive FSM events -type EventReceiver interface { - Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) -} - -// DataTransferSubscriber is the function called when an event occurs in a data -// transfer -- it reads the voucher to verify this even occurred in a storage -// market deal, then, based on the data transfer event that occurred, it generates -// and update message for the deal -- either moving to staged for a completion -// event or moving to error if a data transfer error occurs -func DataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { - return func(event datatransfer.Event, channelState datatransfer.ChannelState) { - voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher) - // if this event is for a transfer not related to storage, ignore - if !ok { - return - } - - // data transfer events for opening and progress do not affect deal state - switch event.Code { - case datatransfer.Complete: - err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted) - if err != nil { - log.Errorf("processing dt event: %w", err) - } - case datatransfer.Error: - err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed) - if err != nil { - log.Errorf("processing dt event: %w", err) - } - default: - } - } -} - // CommPGenerator is a commP generating function that writes to a file type CommPGenerator func(abi.RegisteredProof, cid.Cid, ipld.Node, ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error) diff --git a/storagemarket/impl/providerutils/providerutils_test.go b/storagemarket/impl/providerutils/providerutils_test.go index 44ac3d349..ded46f86f 100644 --- a/storagemarket/impl/providerutils/providerutils_test.go +++ b/storagemarket/impl/providerutils/providerutils_test.go @@ -8,8 +8,6 @@ import ( "testing" "github.com/filecoin-project/go-address" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -18,16 +16,13 @@ import ( "github.com/ipld/go-ipld-prime" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector/builder" - "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared_testutil" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -121,82 +116,6 @@ func TestSignMinerData(t *testing.T) { } } -func TestDataTransferSubscriber(t *testing.T) { - expectedProposalCID := shared_testutil.GenerateCids(1)[0] - tests := map[string]struct { - code datatransfer.EventCode - called bool - voucher datatransfer.Voucher - expectedID interface{} - expectedEvent fsm.EventName - expectedArgs []interface{} - }{ - "not a storage voucher": { - called: false, - voucher: nil, - }, - "completion event": { - code: datatransfer.Complete, - called: true, - voucher: &requestvalidation.StorageDataTransferVoucher{ - Proposal: expectedProposalCID, - }, - expectedID: expectedProposalCID, - expectedEvent: storagemarket.ProviderEventDataTransferCompleted, - }, - "error event": { - code: datatransfer.Error, - called: true, - voucher: &requestvalidation.StorageDataTransferVoucher{ - Proposal: expectedProposalCID, - }, - expectedID: expectedProposalCID, - expectedEvent: storagemarket.ProviderEventDataTransferFailed, - expectedArgs: []interface{}{providerutils.ErrDataTransferFailed}, - }, - "other event": { - code: datatransfer.Progress, - called: false, - voucher: &requestvalidation.StorageDataTransferVoucher{ - Proposal: expectedProposalCID, - }, - }, - } - for test, data := range tests { - t.Run(test, func(t *testing.T) { - fdg := &fakeDealGroup{} - subscriber := providerutils.DataTransferSubscriber(fdg) - subscriber(datatransfer.Event{Code: data.code}, datatransfer.ChannelState{ - Channel: datatransfer.NewChannel(datatransfer.TransferID(0), cid.Undef, nil, data.voucher, peer.ID(""), peer.ID(""), 0), - }) - if data.called { - require.True(t, fdg.called) - require.Equal(t, fdg.lastID, data.expectedID) - require.Equal(t, fdg.lastEvent, data.expectedEvent) - require.Equal(t, fdg.lastArgs, data.expectedArgs) - } else { - require.False(t, fdg.called) - } - }) - } -} - -type fakeDealGroup struct { - returnedErr error - called bool - lastID interface{} - lastEvent fsm.EventName - lastArgs []interface{} -} - -func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) { - fdg.lastID = id - fdg.lastEvent = name - fdg.lastArgs = args - fdg.called = true - return fdg.returnedErr -} - func TestCommPGenerationWithMetadata(t *testing.T) { tempFilePath := filestore.Path("applesauce.jpg") tempFile := shared_testutil.NewTestFile(shared_testutil.TestFileParams{Path: tempFilePath}) diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 5b51c9dff..76ba80b30 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -90,13 +90,18 @@ func TestMakeDeal(t *testing.T) { case providerSeenDeal = <-providerDealChan: providerstates = append(providerstates, providerSeenDeal.State) case <-ctx.Done(): - t.Fatalf("never saw all events: %d, %d", clientSeenDeal.State, providerSeenDeal.State) + t.Fatalf("deal incomplete, client deal state: %s (%d), provider deal state: %s (%d)", + storagemarket.DealStates[clientSeenDeal.State], + clientSeenDeal.State, + storagemarket.DealStates[providerSeenDeal.State], + providerSeenDeal.State, + ) } } expProviderStates := []storagemarket.StorageDealStatus{ storagemarket.StorageDealValidating, - storagemarket.StorageDealProposalAccepted, + storagemarket.StorageDealWaitingForData, storagemarket.StorageDealTransferring, storagemarket.StorageDealVerifyData, storagemarket.StorageDealEnsureProviderFunds, @@ -112,6 +117,8 @@ func TestMakeDeal(t *testing.T) { storagemarket.StorageDealEnsureClientFunds, //storagemarket.StorageDealClientFunding, // skipped because funds available storagemarket.StorageDealFundsEnsured, + storagemarket.StorageDealWaitingForDataRequest, + storagemarket.StorageDealTransferring, storagemarket.StorageDealValidating, storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealSealing, @@ -129,14 +136,14 @@ func TestMakeDeal(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealActive, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealActive, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) pd := providerDeals[0] assert.Equal(t, pd.ProposalCid, proposalCid) - assert.Equal(t, storagemarket.StorageDealCompleted, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealCompleted, pd.State) } func TestMakeDealOffline(t *testing.T) { @@ -166,14 +173,14 @@ func TestMakeDealOffline(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealValidating, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) pd := providerDeals[0] assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, storagemarket.StorageDealWaitingForData, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealWaitingForData, pd.State) err = cario.NewCarIO().WriteCar(ctx, h.TestData.Bs1, h.PayloadCid, shared.AllSelector(), carBuf) require.NoError(t, err) @@ -184,14 +191,14 @@ func TestMakeDealOffline(t *testing.T) { cd, err = h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealActive, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealActive, cd.State) providerDeals, err = h.Provider.ListLocalDeals() assert.NoError(t, err) pd = providerDeals[0] assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, storagemarket.StorageDealCompleted, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealCompleted, pd.State) } func TestMakeDealNonBlocking(t *testing.T) { @@ -213,7 +220,7 @@ func TestMakeDealNonBlocking(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, result.ProposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealValidating, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) @@ -221,7 +228,7 @@ func TestMakeDealNonBlocking(t *testing.T) { // Provider should be blocking on waiting for funds to appear on chain pd := providerDeals[0] assert.Equal(t, result.ProposalCid, pd.ProposalCid) - assert.Equal(t, storagemarket.StorageDealProviderFunding, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealProviderFunding, pd.State) } type harness struct { @@ -283,7 +290,10 @@ func newHarness(t *testing.T, ctx context.Context) *harness { &clientNode, ) require.NoError(t, err) + dt2 := graphsync.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2, td.DTStoredCounter2) + require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &fakeDTValidator{})) + provider, err := storageimpl.NewProvider( network.NewFromLibp2pHost(td.Host2), td.Ds2, diff --git a/storagemarket/types.go b/storagemarket/types.go index 7ce7f23a4..a5874a95e 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -42,45 +42,47 @@ const ( // Internal - StorageDealFundsEnsured // Deposited funds as neccesary to create a deal, ready to move forward - StorageDealValidating // Verifying that deal parameters are good - StorageDealTransferring // Moving data - StorageDealWaitingForData // Manual transfer - StorageDealVerifyData // Verify transferred data - generate CAR / piece data - StorageDealEnsureProviderFunds // Ensuring that provider collateral is sufficient - StorageDealEnsureClientFunds // Ensuring that client funds are sufficient - StorageDealProviderFunding // Waiting for funds to appear in Provider balance - StorageDealClientFunding // Waiting for funds to appear in Client balance - StorageDealPublish // Publishing deal to chain - StorageDealPublishing // Waiting for deal to appear on chain - StorageDealError // deal failed with an unexpected error - StorageDealCompleted // on provider side, indicates deal is active and info for retrieval is recorded + StorageDealFundsEnsured // Deposited funds as neccesary to create a deal, ready to move forward + StorageDealWaitingForDataRequest // Client is waiting for a request for the deal data + StorageDealValidating // Verifying that deal parameters are good + StorageDealTransferring // Moving data + StorageDealWaitingForData // Manual transfer + StorageDealVerifyData // Verify transferred data - generate CAR / piece data + StorageDealEnsureProviderFunds // Ensuring that provider collateral is sufficient + StorageDealEnsureClientFunds // Ensuring that client funds are sufficient + StorageDealProviderFunding // Waiting for funds to appear in Provider balance + StorageDealClientFunding // Waiting for funds to appear in Client balance + StorageDealPublish // Publishing deal to chain + StorageDealPublishing // Waiting for deal to appear on chain + StorageDealError // deal failed with an unexpected error + StorageDealCompleted // on provider side, indicates deal is active and info for retrieval is recorded ) // DealStates maps StorageDealStatus codes to string names var DealStates = map[StorageDealStatus]string{ - StorageDealUnknown: "StorageDealUnknown", - StorageDealProposalNotFound: "StorageDealProposalNotFound", - StorageDealProposalRejected: "StorageDealProposalRejected", - StorageDealProposalAccepted: "StorageDealProposalAccepted", - StorageDealStaged: "StorageDealStaged", - StorageDealSealing: "StorageDealSealing", - StorageDealActive: "StorageDealActive", - StorageDealFailing: "StorageDealFailing", - StorageDealNotFound: "StorageDealNotFound", - StorageDealFundsEnsured: "StorageDealFundsEnsured", - StorageDealValidating: "StorageDealValidating", - StorageDealTransferring: "StorageDealTransferring", - StorageDealWaitingForData: "StorageDealWaitingForData", - StorageDealVerifyData: "StorageDealVerifyData", - StorageDealEnsureProviderFunds: "StorageDealEnsureProviderFunds", - StorageDealEnsureClientFunds: "StorageDealEnsureClientFunds", - StorageDealProviderFunding: "StorageDealProviderFunding", - StorageDealClientFunding: "StorageDealClientFunding", - StorageDealPublish: "StorageDealPublish", - StorageDealPublishing: "StorageDealPublishing", - StorageDealError: "StorageDealError", - StorageDealCompleted: "StorageDealCompleted", + StorageDealUnknown: "StorageDealUnknown", + StorageDealProposalNotFound: "StorageDealProposalNotFound", + StorageDealProposalRejected: "StorageDealProposalRejected", + StorageDealProposalAccepted: "StorageDealProposalAccepted", + StorageDealStaged: "StorageDealStaged", + StorageDealSealing: "StorageDealSealing", + StorageDealActive: "StorageDealActive", + StorageDealFailing: "StorageDealFailing", + StorageDealNotFound: "StorageDealNotFound", + StorageDealFundsEnsured: "StorageDealFundsEnsured", + StorageDealWaitingForDataRequest: "StorageDealWaitingForDataRequest", + StorageDealValidating: "StorageDealValidating", + StorageDealTransferring: "StorageDealTransferring", + StorageDealWaitingForData: "StorageDealWaitingForData", + StorageDealVerifyData: "StorageDealVerifyData", + StorageDealEnsureProviderFunds: "StorageDealEnsureProviderFunds", + StorageDealEnsureClientFunds: "StorageDealEnsureClientFunds", + StorageDealProviderFunding: "StorageDealProviderFunding", + StorageDealClientFunding: "StorageDealClientFunding", + StorageDealPublish: "StorageDealPublish", + StorageDealPublishing: "StorageDealPublishing", + StorageDealError: "StorageDealError", + StorageDealCompleted: "StorageDealCompleted", } func init() { @@ -157,10 +159,6 @@ const ( // ProviderEventDealAccepted happens when a deal is accepted based on provider criteria ProviderEventDealAccepted - // ProviderEventWaitingForManualData happens when an offline deal proposal is accepted, - // meaning the provider must wait until it receives data manually - ProviderEventWaitingForManualData - // ProviderEventInsufficientFunds indicates not enough funds available for a deal ProviderEventInsufficientFunds @@ -173,6 +171,9 @@ const ( // ProviderEventDataTransferFailed happens when an error occurs transferring data ProviderEventDataTransferFailed + // ProviderEventDataRequested happens when a provider requests data from a client + ProviderEventDataRequested + // ProviderEventDataTransferInitiated happens when a data transfer starts ProviderEventDataTransferInitiated @@ -238,11 +239,11 @@ var ProviderEvents = map[ProviderEvent]string{ ProviderEventNodeErrored: "ProviderEventNodeErrored", ProviderEventDealRejected: "ProviderEventDealRejected", ProviderEventDealAccepted: "ProviderEventDealAccepted", - ProviderEventWaitingForManualData: "ProviderEventWaitingForManualData", ProviderEventInsufficientFunds: "ProviderEventInsufficientFunds", ProviderEventFundingInitiated: "ProviderEventFundingInitiated", ProviderEventFunded: "ProviderEventFunded", ProviderEventDataTransferFailed: "ProviderEventDataTransferFailed", + ProviderEventDataRequested: "ProviderEventDataRequested", ProviderEventDataTransferInitiated: "ProviderEventDataTransferInitiated", ProviderEventDataTransferCompleted: "ProviderEventDataTransferCompleted", ProviderEventManualDataReceived: "ProviderEventManualDataReceived", @@ -299,6 +300,15 @@ const ( // ClientEventDealProposed happens when a new proposal is sent to a provider ClientEventDealProposed + // ClientEventDataTransferInitiated happens when piece data transfer has started + ClientEventDataTransferInitiated + + // ClientEventDataTransferComplete happens when piece data transfer has been completed + ClientEventDataTransferComplete + + // ClientEventDataTransferFailed happens the client can't initiate a push data transfer to the provider + ClientEventDataTransferFailed + // ClientEventDealStreamLookupErrored the deal stream for a deal could not be found ClientEventDealStreamLookupErrored @@ -311,6 +321,9 @@ const ( // ClientEventResponseDealDidNotMatch means a response was sent for the wrong deal ClientEventResponseDealDidNotMatch + // ClientEventUnexpectedDealState means a response was sent but the state wasn't what we expected + ClientEventUnexpectedDealState + // ClientEventStreamCloseError happens when an attempt to close a deals stream fails ClientEventStreamCloseError @@ -344,10 +357,14 @@ var ClientEvents = map[ClientEvent]string{ ClientEventFundsEnsured: "ClientEventFundsEnsured", ClientEventWriteProposalFailed: "ClientEventWriteProposalFailed", ClientEventDealProposed: "ClientEventDealProposed", + ClientEventDataTransferInitiated: "ClientEventDataTransferInitiated", + ClientEventDataTransferComplete: "ClientEventDataTransferComplete", + ClientEventDataTransferFailed: "ClientEventDataTransferFailed", ClientEventDealStreamLookupErrored: "ClientEventDealStreamLookupErrored", ClientEventReadResponseFailed: "ClientEventReadResponseFailed", ClientEventResponseVerificationFailed: "ClientEventResponseVerificationFailed", ClientEventResponseDealDidNotMatch: "ClientEventResponseDealDidNotMatch", + ClientEventUnexpectedDealState: "ClientEventUnexpectedDealState", ClientEventStreamCloseError: "ClientEventStreamCloseError", ClientEventDealRejected: "ClientEventDealRejected", ClientEventDealAccepted: "ClientEventDealAccepted",