From af9f828e899d78d4c986c8acb9b722ed55a48b02 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Tue, 19 May 2020 13:57:08 -0700 Subject: [PATCH] Integrating changes from master - Remove unused event - Re-re-refactor client states test --- storagemarket/impl/clientstates/client_fsm.go | 6 - .../impl/clientstates/client_states_test.go | 506 ++++++++++-------- storagemarket/types.go | 4 - 3 files changed, 273 insertions(+), 243 deletions(-) diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 15c24939b..dd6e7a291 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -36,12 +36,6 @@ var ClientEvents = fsm.Events{ }), fsm.Event(storagemarket.ClientEventDealProposed). From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForDataRequest), - fsm.Event(storagemarket.ClientEventDealStreamLookupErrored). - FromAny().To(storagemarket.StorageDealFailing). - Action(func(deal *storagemarket.ClientDeal, err error) error { - deal.Message = xerrors.Errorf("miner connection error: %w", err).Error() - return nil - }), fsm.Event(storagemarket.ClientEventReadResponseFailed). FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealError). Action(func(deal *storagemarket.ClientDeal, err error) error { diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 87a494d0e..f734acf5f 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -17,7 +17,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -26,325 +26,337 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" ) -func TestEnsureFunds(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, storagemarket.StorageDealEnsureClientFunds, clientDealProposal) - addFundsCid := tut.GenerateCids(1)[0] +var clientDealProposal = tut.MakeTestClientDealProposal() +func TestEnsureFunds(t *testing.T) { t.Run("immediately succeeds", func(t *testing.T) { - runEnsureFunds(t, nodeParams{}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) + runAndInspect(t, clientstates.EnsureClientFunds, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealEnsureClientFunds}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) + }, }) }) - t.Run("succeeds by sending an AddFunds message", func(t *testing.T) { - nodeParams := nodeParams{ - AddFundsCid: addFundsCid, - } - runEnsureFunds(t, nodeParams, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealClientFunding, deal.State) + runAndInspect(t, clientstates.EnsureClientFunds, testCase{ + stateParams: clientDealParams{ + state: storagemarket.StorageDealEnsureClientFunds, + }, + nodeParams: nodeParams{AddFundsCid: tut.GenerateCids(1)[0]}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealClientFunding, deal.State) + }, }) }) - t.Run("EnsureClientFunds fails", func(t *testing.T) { - nodeParams := nodeParams{ - EnsureFundsError: errors.New("Something went wrong"), - } - runEnsureFunds(t, nodeParams, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "adding market funds failed: Something went wrong", deal.Message) + runAndInspect(t, clientstates.EnsureClientFunds, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealEnsureClientFunds}, + nodeParams: nodeParams{ + EnsureFundsError: errors.New("Something went wrong"), + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Equal(t, "adding market funds failed: Something went wrong", deal.Message) + }, }) }) } func TestWaitForFunding(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.WaitForFunding, storagemarket.StorageDealClientFunding, clientDealProposal) - t.Run("succeeds", func(t *testing.T) { - runEnsureFunds(t, nodeParams{WaitForMessageExitCode: exitcode.Ok}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) + runAndInspect(t, clientstates.WaitForFunding, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealClientFunding}, + nodeParams: nodeParams{WaitForMessageExitCode: exitcode.Ok}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) + }, }) }) - t.Run("EnsureClientFunds fails", func(t *testing.T) { - runEnsureFunds(t, nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "adding market funds failed: AddFunds exit code: 19", deal.Message) + runAndInspect(t, clientstates.WaitForFunding, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealClientFunding}, + nodeParams: nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Equal(t, "adding market funds failed: AddFunds exit code: 19", deal.Message) + }, }) }) } func TestProposeDeal(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runProposeDeal := makeExecutor(ctx, eventProcessor, clientstates.ProposeDeal, storagemarket.StorageDealFundsEnsured, clientDealProposal) - - dealStream := func(writer tut.StorageDealProposalWriter) *tut.TestStorageDealStream { - return tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ - ProposalWriter: writer, - }) - } - t.Run("succeeds", func(t *testing.T) { - ds := dealStream(tut.TrivialStorageDealProposalWriter) - runProposeDeal(t, nodeParams{}, envParams{dealStream: ds}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealWaitingForDataRequest, deal.State) - ds.AssertConnectionTagged(t, deal.ProposalCid.String()) + ds := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ProposalWriter: tut.TrivialStorageDealProposalWriter, + }) + runAndInspect(t, clientstates.ProposeDeal, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealFundsEnsured}, + envParams: envParams{dealStream: ds}, + nodeParams: nodeParams{WaitForMessageExitCode: exitcode.Ok}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealWaitingForDataRequest, deal.State) + ds.AssertConnectionTagged(t, deal.ProposalCid.String()) + }, }) }) - t.Run("write proposal fails fails", func(t *testing.T) { - runProposeDeal(t, nodeParams{}, envParams{dealStream: dealStream(tut.FailStorageProposalWriter)}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message) + ds := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ProposalWriter: tut.FailStorageProposalWriter, + }) + runAndInspect(t, clientstates.ProposeDeal, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealFundsEnsured}, + envParams: envParams{dealStream: ds}, + nodeParams: nodeParams{WaitForMessageExitCode: exitcode.Ok}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message) + }, }) }) } func TestWaitingForDataRequest(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - require.NoError(t, err) - runWaitingForDataRequest := makeExecutor(ctx, eventProcessor, clientstates.WaitingForDataRequest, storagemarket.StorageDealWaitingForDataRequest, clientDealProposal) - t.Run("succeeds", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealWaitingForData, - }) - - runWaitingForDataRequest(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - require.Len(t, env.startDataTransferCalls, 1) - require.Equal(t, env.startDataTransferCalls[0].to, deal.Miner) - require.Equal(t, env.startDataTransferCalls[0].baseCid, deal.DataRef.Root) - - tut.AssertDealState(t, storagemarket.StorageDealTransferring, deal.State) + runAndInspect(t, clientstates.WaitingForDataRequest, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealWaitingForDataRequest}, + envParams: envParams{ + dealStream: testResponseStream(t, responseParams{ + state: storagemarket.StorageDealWaitingForData, + proposal: clientDealProposal, + }), + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + assert.Len(t, env.startDataTransferCalls, 1) + assert.Equal(t, env.startDataTransferCalls[0].to, deal.Miner) + assert.Equal(t, env.startDataTransferCalls[0].baseCid, deal.DataRef.Root) + + tut.AssertDealState(t, storagemarket.StorageDealTransferring, deal.State) + }, }) }) t.Run("response contains unexpected state", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealProposalNotFound, - }) - - runWaitingForDataRequest(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "unexpected deal status while waiting for data request: 1", deal.Message) + runAndInspect(t, clientstates.WaitingForDataRequest, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealWaitingForDataRequest}, + envParams: envParams{ + dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalNotFound, + }), + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Equal(t, "unexpected deal status while waiting for data request: 1", deal.Message) + }, }) }) - t.Run("read response fails", func(t *testing.T) { - stream := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ - ResponseReader: tut.FailStorageResponseReader, - }) - - runWaitingForDataRequest(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error reading Response message: read response failed", deal.Message) + runAndInspect(t, clientstates.WaitingForDataRequest, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealWaitingForDataRequest}, + envParams: envParams{ + startDataTransferError: errors.New("failed"), + dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealWaitingForData, + }), + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Equal(t, "failed to initiate data transfer: failed to open push data channel: failed", deal.Message) + }, }) }) - - t.Run("fails starting the data transfer request", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealWaitingForData, - }) - envParams := envParams{ - dealStream: stream, - startDataTransferError: errors.New("failed"), - } - - runWaitingForDataRequest(t, nodeParams{}, envParams, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "failed to initiate data transfer: failed to open push data channel: failed", deal.Message) - }) - }) - t.Run("waits for another response with manual transfers", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealWaitingForData, - }) - envParams := envParams{ - dealStream: stream, - manualTransfer: true, - } - - runWaitingForDataRequest(t, nodeParams{}, envParams, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealValidating, deal.State) + runAndInspect(t, clientstates.WaitingForDataRequest, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealWaitingForDataRequest}, + envParams: envParams{ + dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealWaitingForData, + }), + manualTransfer: true, + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealValidating, deal.State) + }, }) }) } -func TestVerifyResponse(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - require.NoError(t, err) - runVerifyResponse := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealResponse, storagemarket.StorageDealValidating, clientDealProposal) - +func TestVerifyDealResponse(t *testing.T) { t.Run("succeeds", func(t *testing.T) { publishMessage := &(tut.GenerateCids(1)[0]) - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealProposalAccepted, - publishMessage: publishMessage, - }) - runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealProposalAccepted, deal.State) - require.Equal(t, publishMessage, deal.PublishMessage) + runAndInspect(t, clientstates.VerifyDealResponse, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealValidating}, + envParams: envParams{ + dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + publishMessage: publishMessage, + }), + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealProposalAccepted, deal.State) + assert.Equal(t, publishMessage, deal.PublishMessage) + }, }) }) - t.Run("read response fails", func(t *testing.T) { - stream := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ - ResponseReader: tut.FailStorageResponseReader, - }) - - runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error reading Response message: read response failed", deal.Message) + runAndInspect(t, clientstates.VerifyDealResponse, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealValidating}, + envParams: envParams{dealStream: tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ResponseReader: tut.FailStorageResponseReader, + })}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "error reading Response message: read response failed", deal.Message) + }, }) }) - t.Run("verify response fails", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealProposalAccepted, - }) - - runVerifyResponse(t, nodeParams{VerifySignatureFails: true}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "unable to verify signature on deal response", deal.Message) + runAndInspect(t, clientstates.VerifyDealResponse, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealValidating}, + nodeParams: nodeParams{VerifySignatureFails: true}, + envParams: envParams{dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + })}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Equal(t, "unable to verify signature on deal response", deal.Message) + }, }) }) - t.Run("incorrect proposal cid", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealProposalAccepted, - proposalCid: tut.GenerateCids(1)[0], - }) - - runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) + runAndInspect(t, clientstates.VerifyDealResponse, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealValidating}, + envParams: envParams{dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + proposalCid: tut.GenerateCids(1)[0], + })}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) + }, }) }) - t.Run("deal rejected", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealProposalRejected, - message: "because reasons", - }) - - expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) - runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, deal.Message, expErr) + runAndInspect(t, clientstates.VerifyDealResponse, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealValidating}, + envParams: envParams{dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalRejected, + message: "because reasons", + })}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) + + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + assert.Equal(t, deal.Message, expErr) + }, }) }) - t.Run("deal stream close errors", func(t *testing.T) { - stream := testResponseStream(t, responseParams{ - proposal: clientDealProposal, - state: storagemarket.StorageDealProposalAccepted, - }) - closeStreamErr := errors.New("something went wrong") - - runVerifyResponse(t, nodeParams{}, envParams{dealStream: stream, closeStreamErr: closeStreamErr}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error attempting to close stream: something went wrong", deal.Message) + runAndInspect(t, clientstates.VerifyDealResponse, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealValidating}, + envParams: envParams{dealStream: testResponseStream(t, responseParams{ + proposal: clientDealProposal, + state: storagemarket.StorageDealProposalAccepted, + }), closeStreamErr: errors.New("something went wrong")}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "error attempting to close stream: something went wrong", deal.Message) + }, }) }) } func TestValidateDealPublished(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runValidateDealPublished := makeExecutor(ctx, eventProcessor, clientstates.ValidateDealPublished, storagemarket.StorageDealProposalAccepted, clientDealProposal) - t.Run("succeeds", func(t *testing.T) { - runValidateDealPublished(t, nodeParams{ValidatePublishedDealID: abi.DealID(5)}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) - require.Equal(t, abi.DealID(5), deal.DealID) + runAndInspect(t, clientstates.ValidateDealPublished, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealProposalAccepted}, + nodeParams: nodeParams{ValidatePublishedDealID: abi.DealID(5)}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) + assert.Equal(t, abi.DealID(5), deal.DealID) + }, }) }) - t.Run("fails", func(t *testing.T) { - nodeParams := nodeParams{ - ValidatePublishedDealID: abi.DealID(5), - ValidatePublishedError: errors.New("Something went wrong"), - } - runValidateDealPublished(t, nodeParams, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error validating deal published: Something went wrong", deal.Message) + runAndInspect(t, clientstates.ValidateDealPublished, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealProposalAccepted}, + nodeParams: nodeParams{ + ValidatePublishedDealID: abi.DealID(5), + ValidatePublishedError: errors.New("Something went wrong"), + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "error validating deal published: Something went wrong", deal.Message) + }, }) }) } func TestVerifyDealActivated(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runVerifyDealActivated := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealActivated, storagemarket.StorageDealSealing, clientDealProposal) - t.Run("succeeds", func(t *testing.T) { - runVerifyDealActivated(t, nodeParams{}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealActive, deal.State) + runAndInspect(t, clientstates.VerifyDealActivated, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealSealing}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealActive, deal.State) + }, }) }) - t.Run("fails synchronously", func(t *testing.T) { - runVerifyDealActivated(t, nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error in deal activation: Something went wrong", deal.Message) + runAndInspect(t, clientstates.VerifyDealActivated, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealSealing}, + + nodeParams: nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "error in deal activation: Something went wrong", deal.Message) + }, }) }) - t.Run("fails asynchronously", func(t *testing.T) { - runVerifyDealActivated(t, nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error in deal activation: Something went wrong later", deal.Message) + runAndInspect(t, clientstates.VerifyDealActivated, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealSealing}, + nodeParams: nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "error in deal activation: Something went wrong later", deal.Message) + }, }) }) } func TestFailDeal(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runFailDeal := makeExecutor(ctx, eventProcessor, clientstates.FailDeal, storagemarket.StorageDealFailing, clientDealProposal) - - t.Run("able to close stream", func(t *testing.T) { - runFailDeal(t, nodeParams{}, envParams{}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + t.Run("closes an open stream", func(t *testing.T) { + runAndInspect(t, clientstates.FailDeal, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealFailing, connectionClosed: false}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + assert.Equal(t, storagemarket.StorageDealError, deal.State) + }, }) }) - - t.Run("unable to close stream", func(t *testing.T) { - runFailDeal(t, nodeParams{}, envParams{closeStreamErr: errors.New("unable to close")}, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error attempting to close stream: unable to close", deal.Message) + t.Run("unable to close an the open stream", func(t *testing.T) { + runAndInspect(t, clientstates.FailDeal, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealFailing, connectionClosed: false}, + envParams: envParams{closeStreamErr: errors.New("unable to close")}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + assert.Equal(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, "error attempting to close stream: unable to close", deal.Message) + }, + }) + }) + t.Run("doesn't try to close a closed stream", func(t *testing.T) { + runAndInspect(t, clientstates.FailDeal, testCase{ + stateParams: clientDealParams{state: storagemarket.StorageDealFailing, connectionClosed: true}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + assert.Len(t, env.closeStreamCalls, 0) + assert.Equal(t, storagemarket.StorageDealError, deal.State) + }, }) }) } @@ -356,6 +368,12 @@ type envParams struct { manualTransfer bool } +type clientDealParams struct { + state storagemarket.StorageDealStatus + connectionClosed bool + addFundsCid *cid.Cid +} + type executor func(t *testing.T, nodeParams nodeParams, envParams envParams, @@ -364,16 +382,21 @@ type executor func(t *testing.T, func makeExecutor(ctx context.Context, eventProcessor fsm.EventProcessor, stateEntryFunc clientstates.ClientStateEntryFunc, - initialState storagemarket.StorageDealStatus, + dealParams clientDealParams, clientDealProposal *market.ClientDealProposal) executor { return func(t *testing.T, nodeParams nodeParams, envParams envParams, dealInspector func(deal storagemarket.ClientDeal, env *fakeEnvironment)) { node := makeNode(nodeParams) - dealState, err := tut.MakeTestClientDeal(initialState, clientDealProposal, envParams.manualTransfer) - require.NoError(t, err) + dealState, err := tut.MakeTestClientDeal(dealParams.state, clientDealProposal, envParams.manualTransfer) + assert.NoError(t, err) dealState.AddFundsCid = &tut.GenerateCids(1)[0] + dealState.ConnectionClosed = dealParams.connectionClosed + + if dealParams.addFundsCid != nil { + dealState.AddFundsCid = dealParams.addFundsCid + } environment := &fakeEnvironment{ node: node, @@ -383,7 +406,7 @@ func makeExecutor(ctx context.Context, } fsmCtx := fsmtest.NewTestContext(ctx, eventProcessor) err = stateEntryFunc(fsmCtx, environment, *dealState) - require.NoError(t, err) + assert.NoError(t, err) fsmCtx.ReplayEvents(t, dealState) dealInspector(*dealState, environment) } @@ -432,6 +455,7 @@ type fakeEnvironment struct { node storagemarket.StorageClientNode dealStream smnet.StorageDealStream closeStreamErr error + closeStreamCalls []cid.Cid startDataTransferError error startDataTransferCalls []dataTransferParams } @@ -471,6 +495,7 @@ func (fe *fakeEnvironment) TagConnection(proposalCid cid.Cid) error { } func (fe *fakeEnvironment) CloseStream(proposalCid cid.Cid) error { + fe.closeStreamCalls = append(fe.closeStreamCalls, proposalCid) return fe.closeStreamErr } @@ -494,7 +519,7 @@ func testResponseStream(t *testing.T, params responseParams) smnet.StorageDealSt if response.Proposal == cid.Undef { proposalNd, err := cborutil.AsIpld(params.proposal) - require.NoError(t, err) + assert.NoError(t, err) response.Proposal = proposalNd.Cid() } @@ -507,3 +532,18 @@ func testResponseStream(t *testing.T, params responseParams) smnet.StorageDealSt ResponseReader: reader, }) } + +type testCase struct { + envParams envParams + nodeParams nodeParams + stateParams clientDealParams + inspector func(deal storagemarket.ClientDeal, env *fakeEnvironment) +} + +func runAndInspect(t *testing.T, stateFunc clientstates.ClientStateEntryFunc, tc testCase) { + ctx := context.Background() + eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) + assert.NoError(t, err) + executor := makeExecutor(ctx, eventProcessor, stateFunc, tc.stateParams, clientDealProposal) + executor(t, tc.nodeParams, tc.envParams, tc.inspector) +} diff --git a/storagemarket/types.go b/storagemarket/types.go index a5874a95e..2c029c306 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -309,9 +309,6 @@ const ( // ClientEventDataTransferFailed happens the client can't initiate a push data transfer to the provider ClientEventDataTransferFailed - // ClientEventDealStreamLookupErrored the deal stream for a deal could not be found - ClientEventDealStreamLookupErrored - // ClientEventReadResponseFailed means a network error occurred reading a deal response ClientEventReadResponseFailed @@ -360,7 +357,6 @@ var ClientEvents = map[ClientEvent]string{ ClientEventDataTransferInitiated: "ClientEventDataTransferInitiated", ClientEventDataTransferComplete: "ClientEventDataTransferComplete", ClientEventDataTransferFailed: "ClientEventDataTransferFailed", - ClientEventDealStreamLookupErrored: "ClientEventDealStreamLookupErrored", ClientEventReadResponseFailed: "ClientEventReadResponseFailed", ClientEventResponseVerificationFailed: "ClientEventResponseVerificationFailed", ClientEventResponseDealDidNotMatch: "ClientEventResponseDealDidNotMatch",