From 685b0a2205866d558b7339999ffb779e15fb3972 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 8 May 2020 18:29:46 -0700 Subject: [PATCH] feat(storagemarket): revert protocol changes revert handling of incoming streams on client side -- we need more thought into protocol redesign --- shared_testutil/test_network_types.go | 52 ++- shared_testutil/test_types.go | 21 +- storagemarket/impl/client.go | 42 +- storagemarket/impl/clientstates/client_fsm.go | 16 +- .../impl/clientstates/client_states.go | 23 +- .../impl/clientstates/client_states_test.go | 92 ++-- storagemarket/impl/clientutils/clientutils.go | 3 +- .../impl/clientutils/clientutils_test.go | 7 +- storagemarket/impl/provider.go | 28 +- .../impl/providerstates/provider_states.go | 16 +- .../providerstates/provider_states_test.go | 9 +- .../impl/providerutils/providerutils_test.go | 3 +- storagemarket/impl/requestvalidation/types.go | 2 +- storagemarket/integration_test.go | 5 +- storagemarket/network/ask_stream.go | 17 +- storagemarket/network/deal_stream.go | 19 +- storagemarket/network/libp2p_impl_test.go | 33 +- storagemarket/network/network.go | 17 +- storagemarket/network/types.go | 58 +++ storagemarket/network/types_cbor_gen.go | 390 +++++++++++++++++ storagemarket/types.go | 58 +-- storagemarket/types_cbor_gen.go | 404 +----------------- 22 files changed, 701 insertions(+), 614 deletions(-) create mode 100644 storagemarket/network/types.go create mode 100644 storagemarket/network/types_cbor_gen.go diff --git a/shared_testutil/test_network_types.go b/shared_testutil/test_network_types.go index bb7e8321..fb4ef05c 100644 --- a/shared_testutil/test_network_types.go +++ b/shared_testutil/test_network_types.go @@ -10,7 +10,6 @@ import ( rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" - "github.com/filecoin-project/go-fil-markets/storagemarket" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -506,16 +505,16 @@ func StubbedDealPaymentReader(payment rm.DealPayment) DealPaymentReader { } // StorageDealProposalReader is a function to mock reading deal proposals. -type StorageDealProposalReader func() (storagemarket.ProposalRequest, error) +type StorageDealProposalReader func() (smnet.Proposal, error) // StorageDealResponseReader is a function to mock reading deal responses. -type StorageDealResponseReader func() (storagemarket.SignedResponse, error) +type StorageDealResponseReader func() (smnet.SignedResponse, error) // StorageDealResponseWriter is a function to mock writing deal responses. -type StorageDealResponseWriter func(storagemarket.SignedResponse) error +type StorageDealResponseWriter func(smnet.SignedResponse) error // StorageDealProposalWriter is a function to mock writing deal proposals. -type StorageDealProposalWriter func(storagemarket.ProposalRequest) error +type StorageDealProposalWriter func(smnet.Proposal) error // TestStorageDealStream is a retrieval deal stream with predefined // stubbed behavior. @@ -563,22 +562,22 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) smnet.StorageD } // ReadDealProposal calls the mocked deal proposal reader function. -func (tsds *TestStorageDealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) { +func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, error) { return tsds.proposalReader() } // WriteDealProposal calls the mocked deal proposal writer function. -func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal storagemarket.ProposalRequest) error { +func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal smnet.Proposal) error { return tsds.proposalWriter(dealProposal) } // ReadDealResponse calls the mocked deal response reader function. -func (tsds *TestStorageDealStream) ReadDealResponse() (storagemarket.SignedResponse, error) { +func (tsds *TestStorageDealStream) ReadDealResponse() (smnet.SignedResponse, error) { return tsds.responseReader() } // WriteDealResponse calls the mocked deal response writer function. -func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse storagemarket.SignedResponse) error { +func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse smnet.SignedResponse) error { return tsds.responseWriter(dealResponse) } @@ -589,57 +588,57 @@ func (tsds TestStorageDealStream) RemotePeer() peer.ID { return tsds.p } func (tsds TestStorageDealStream) Close() error { return nil } // TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal. -func TrivialStorageDealProposalReader() (storagemarket.ProposalRequest, error) { - return storagemarket.ProposalRequest{}, nil +func TrivialStorageDealProposalReader() (smnet.Proposal, error) { + return smnet.Proposal{}, nil } // TrivialStorageDealResponseReader succeeds trivially, returning an empty deal response. -func TrivialStorageDealResponseReader() (storagemarket.SignedResponse, error) { - return storagemarket.SignedResponse{}, nil +func TrivialStorageDealResponseReader() (smnet.SignedResponse, error) { + return smnet.SignedResponse{}, nil } // TrivialStorageDealProposalWriter succeeds trivially, returning no error. -func TrivialStorageDealProposalWriter(storagemarket.ProposalRequest) error { +func TrivialStorageDealProposalWriter(smnet.Proposal) error { return nil } // TrivialStorageDealResponseWriter succeeds trivially, returning no error. -func TrivialStorageDealResponseWriter(storagemarket.SignedResponse) error { +func TrivialStorageDealResponseWriter(smnet.SignedResponse) error { return nil } // StubbedStorageProposalReader returns the given proposal when called -func StubbedStorageProposalReader(proposal storagemarket.ProposalRequest) StorageDealProposalReader { - return func() (storagemarket.ProposalRequest, error) { +func StubbedStorageProposalReader(proposal smnet.Proposal) StorageDealProposalReader { + return func() (smnet.Proposal, error) { return proposal, nil } } // StubbedStorageResponseReader returns the given deal response when called -func StubbedStorageResponseReader(response storagemarket.SignedResponse) StorageDealResponseReader { - return func() (storagemarket.SignedResponse, error) { +func StubbedStorageResponseReader(response smnet.SignedResponse) StorageDealResponseReader { + return func() (smnet.SignedResponse, error) { return response, nil } } // FailStorageProposalWriter always fails -func FailStorageProposalWriter(storagemarket.ProposalRequest) error { +func FailStorageProposalWriter(smnet.Proposal) error { return errors.New("write proposal failed") } // FailStorageProposalReader always fails -func FailStorageProposalReader() (storagemarket.ProposalRequest, error) { - return storagemarket.ProposalRequestUndefined, errors.New("read proposal failed") +func FailStorageProposalReader() (smnet.Proposal, error) { + return smnet.ProposalUndefined, errors.New("read proposal failed") } // FailStorageResponseWriter always fails -func FailStorageResponseWriter(storagemarket.SignedResponse) error { +func FailStorageResponseWriter(smnet.SignedResponse) error { return errors.New("write proposal failed") } // FailStorageResponseReader always fails -func FailStorageResponseReader() (storagemarket.SignedResponse, error) { - return storagemarket.SignedResponseUndefined, errors.New("read response failed") +func FailStorageResponseReader() (smnet.SignedResponse, error) { + return smnet.SignedResponseUndefined, errors.New("read response failed") } // TestPeerResolver provides a fake retrievalmarket PeerResolver @@ -651,5 +650,4 @@ type TestPeerResolver struct { func (tpr TestPeerResolver) GetPeers(cid.Cid) ([]rm.RetrievalPeer, error) { return tpr.Peers, tpr.ResolverError } - -var _ rm.PeerResolver = &TestPeerResolver{} +var _ rm.PeerResolver = &TestPeerResolver{} \ No newline at end of file diff --git a/shared_testutil/test_types.go b/shared_testutil/test_types.go index a2fd064b..fb10335e 100644 --- a/shared_testutil/test_types.go +++ b/shared_testutil/test_types.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" + smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) // MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields @@ -212,8 +213,8 @@ func MakeTestSignedStorageAsk() *storagemarket.SignedStorageAsk { // MakeTestStorageNetworkProposal generates a proposal that can be sent over the // network to a provider -func MakeTestStorageNetworkProposal() storagemarket.ProposalRequest { - return storagemarket.ProposalRequest{ +func MakeTestStorageNetworkProposal() smnet.Proposal { + return smnet.Proposal{ DealProposal: MakeTestClientDealProposal(), Piece: &storagemarket.DataRef{Root: GenerateCids(1)[0]}, } @@ -221,8 +222,8 @@ func MakeTestStorageNetworkProposal() storagemarket.ProposalRequest { // MakeTestStorageNetworkResponse generates a response to a proposal sent over // the network -func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse { - return storagemarket.ProposalResponse{ +func MakeTestStorageNetworkResponse() smnet.Response { + return smnet.Response{ State: storagemarket.StorageDealSealing, Proposal: GenerateCids(1)[0], PublishMessage: &(GenerateCids(1)[0]), @@ -231,23 +232,23 @@ func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse { // MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over // the network that is signed -func MakeTestStorageNetworkSignedResponse() storagemarket.SignedResponse { - return storagemarket.SignedResponse{ +func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse { + return smnet.SignedResponse{ Response: MakeTestStorageNetworkResponse(), Signature: MakeTestSignature(), } } // MakeTestStorageAskRequest generates a request to get a provider's ask -func MakeTestStorageAskRequest() storagemarket.AskRequest { - return storagemarket.AskRequest{ +func MakeTestStorageAskRequest() smnet.AskRequest { + return smnet.AskRequest{ Miner: address.TestAddress2, } } // MakeTestStorageAskResponse generates a response to an ask request -func MakeTestStorageAskResponse() storagemarket.AskResponse { - return storagemarket.AskResponse{ +func MakeTestStorageAskResponse() smnet.AskResponse { + return smnet.AskResponse{ Ask: MakeTestSignedStorageAsk(), } } diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index b568f44e..d0c04ddc 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -93,7 +93,6 @@ func NewClient( } func (c *Client) Run(ctx context.Context) { - _ = c.net.SetDelegate(c) } func (c *Client) Stop() { @@ -158,7 +157,7 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } - request := storagemarket.AskRequest{Miner: info.Address} + request := network.AskRequest{Miner: info.Address} if err := s.WriteAskRequest(request); err != nil { return nil, xerrors.Errorf("failed to send ask request: %w", err) } @@ -301,25 +300,6 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh return shared.Unsubscribe(c.pubSub.Subscribe(subscriber)) } -func (c *Client) HandleAskStream(s network.StorageAskStream) { - s.Close() -} - -func (c *Client) HandleDealStream(s network.StorageDealStream) { - defer s.Close() - log.Info("Handling storage deal proposal!") - - response, err := s.ReadDealResponse() - if err != nil { - log.Errorf("%+v", err) - return - } - err = c.statemachines.Send(response.Response.Proposal, storagemarket.ClientEventReceiveResponse, response) - if err != nil { - log.Errorf("%+v", err) - } -} - func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) { evt, ok := eventName.(storagemarket.ClientEvent) if !ok { @@ -367,11 +347,29 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode { return c.c.node } -func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error { +func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal network.Proposal) error { s, err := c.c.net.NewDealStream(p) if err != nil { return err } + err = c.c.conns.AddStream(proposalCid, s) + if err != nil { + return err + } err = s.WriteDealProposal(proposal) return err } + +func (c *clientDealEnvironment) ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error) { + s, err := c.c.conns.DealStream(proposalCid) + if err != nil { + return network.SignedResponseUndefined, err + } + return s.ReadDealResponse() +} + +func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error { + return c.c.conns.Disconnect(proposalCid) +} + +var _ clientstates.ClientDealEnvironment = &clientDealEnvironment{} diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 47ad2738..8a7642fe 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -34,11 +34,17 @@ var ClientEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ClientEventDealProposed). - From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForResponse), - fsm.Event(storagemarket.ClientEventReceiveResponse). - From(storagemarket.StorageDealWaitingForResponse).To(storagemarket.StorageDealValidating). - Action(func(deal *storagemarket.ClientDeal, response storagemarket.SignedResponse) error { - deal.LastResponse = &response + From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating), + fsm.Event(storagemarket.ClientEventDealStreamLookupErrored). + FromAny().To(storagemarket.StorageDealFailing). + Action(func(deal *storagemarket.ClientDeal, err error) error { + deal.Message = xerrors.Errorf("miner connection error: %w", err).Error() + return nil + }), + fsm.Event(storagemarket.ClientEventReadResponseFailed). + From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError). + Action(func(deal *storagemarket.ClientDeal, err error) error { + deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error() return nil }), fsm.Event(storagemarket.ClientEventResponseVerificationFailed). diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index a568b5b7..52111ee5 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) var log = logging.Logger("storagemarket_impl") @@ -18,7 +19,9 @@ var log = logging.Logger("storagemarket_impl") // dependencies from the storage client environment type ClientDealEnvironment interface { Node() storagemarket.StorageClientNode - WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error + WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal network.Proposal) error + ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error) + CloseStream(proposalCid cid.Cid) error } // ClientStateEntryFunc is the type for all state entry functions on a storage client @@ -66,8 +69,8 @@ func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal sto // ProposeDeal sends the deal proposal to the provider func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - proposal := storagemarket.ProposalRequest{DealProposal: &deal.ClientDealProposal, Piece: deal.DataRef} - if err := environment.WriteDealProposal(deal.Miner, proposal); err != nil { + proposal := network.Proposal{DealProposal: &deal.ClientDealProposal, Piece: deal.DataRef} + if err := environment.WriteDealProposal(deal.Miner, deal.ProposalCid, proposal); err != nil { return ctx.Trigger(storagemarket.ClientEventWriteProposalFailed, err) } @@ -77,7 +80,11 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag // VerifyDealResponse reads and verifies the response from the provider to the proposed deal func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - resp := *deal.LastResponse + resp, err := environment.ReadDealResponse(deal.ProposalCid) + if err != nil { + return ctx.Trigger(storagemarket.ClientEventReadResponseFailed, err) + } + tok, _, err := environment.Node().GetChainHead(ctx.Context()) if err != nil { return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed) @@ -95,6 +102,10 @@ func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal return ctx.Trigger(storagemarket.ClientEventDealRejected, resp.Response.State, resp.Response.Message) } + if err := environment.CloseStream(deal.ProposalCid); err != nil { + return ctx.Trigger(storagemarket.ClientEventStreamCloseError, err) + } + return ctx.Trigger(storagemarket.ClientEventDealAccepted, resp.Response.PublishMessage) } @@ -129,6 +140,10 @@ func VerifyDealActivated(ctx fsm.Context, environment ClientDealEnvironment, dea // FailDeal cleans up a failing deal func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { + if err := environment.CloseStream(deal.ProposalCid); err != nil { + return ctx.Trigger(storagemarket.ClientEventStreamCloseError, err) + } + // TODO: store in some sort of audit log log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message) diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 11403ae0..96731627 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -94,7 +94,7 @@ func TestProposeDeal(t *testing.T) { t.Run("succeeds", func(t *testing.T) { runProposeDeal(t, makeNode(nodeParams{}), dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealWaitingForResponse, deal.State) + require.Equal(t, storagemarket.StorageDealValidating, deal.State) }) }) @@ -117,69 +117,98 @@ func TestVerifyResponse(t *testing.T) { publishMessage := &(tut.GenerateCids(1)[0]) + dealStream := func(reader tut.StorageDealResponseReader) smnet.StorageDealStream { + return tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ + ResponseReader: reader, + }) + } + t.Run("succeeds", func(t *testing.T) { - response := storagemarket.SignedResponse{ - Response: storagemarket.ProposalResponse{ + stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ + Response: smnet.Response{ State: storagemarket.StorageDealProposalAccepted, Proposal: proposalNd.Cid(), PublishMessage: publishMessage, }, Signature: tut.MakeTestSignature(), - } - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, &response, func(deal storagemarket.ClientDeal) { + })) + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) require.Equal(t, publishMessage, deal.PublishMessage) }) }) + t.Run("read response fails", func(t *testing.T) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error reading Response message: read response failed", deal.Message) + }) + }) + t.Run("verify response fails", func(t *testing.T) { - response := storagemarket.SignedResponse{ - Response: storagemarket.ProposalResponse{ + stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ + Response: smnet.Response{ State: storagemarket.StorageDealProposalAccepted, Proposal: proposalNd.Cid(), PublishMessage: publishMessage, }, Signature: tut.MakeTestSignature(), - } + })) failToVerifyNode := makeNode(nodeParams{VerifySignatureFails: true}) - runVerifyResponse(t, failToVerifyNode, nil, &response, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, failToVerifyNode, stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "unable to verify signature on deal response", deal.Message) }) }) t.Run("incorrect proposal cid", func(t *testing.T) { - response := storagemarket.SignedResponse{ - Response: storagemarket.ProposalResponse{ + stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ + Response: smnet.Response{ State: storagemarket.StorageDealProposalAccepted, Proposal: tut.GenerateCids(1)[0], PublishMessage: publishMessage, }, Signature: tut.MakeTestSignature(), - } - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, &response, func(deal storagemarket.ClientDeal) { + })) + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) }) }) t.Run("deal rejected", func(t *testing.T) { - response := storagemarket.SignedResponse{ - Response: storagemarket.ProposalResponse{ + stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ + Response: smnet.Response{ State: storagemarket.StorageDealProposalRejected, Proposal: proposalNd.Cid(), PublishMessage: publishMessage, Message: "because reasons", }, Signature: tut.MakeTestSignature(), - } + })) expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, &response, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, deal.Message, expErr) }) }) + t.Run("deal stream close errors", func(t *testing.T) { + stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ + Response: smnet.Response{ + State: storagemarket.StorageDealProposalAccepted, + Proposal: proposalNd.Cid(), + PublishMessage: publishMessage, + }, + Signature: tut.MakeTestSignature(), + })) + closeStreamErr := errors.New("something went wrong") + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), stream, closeStreamErr, func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error attempting to close stream: something went wrong", deal.Message) + }) + }) + } func TestValidateDealPublished(t *testing.T) { @@ -243,18 +272,24 @@ func TestFailDeal(t *testing.T) { clientDealProposal := tut.MakeTestClientDealProposal() runFailDeal := makeExecutor(ctx, eventProcessor, clientstates.FailDeal, storagemarket.StorageDealFailing, clientDealProposal) - t.Run("success", func(t *testing.T) { + t.Run("able to close stream", func(t *testing.T) { runFailDeal(t, nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) }) }) + t.Run("unable to close stream", func(t *testing.T) { + runFailDeal(t, nil, nil, errors.New("unable to close"), func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error attempting to close stream: unable to close", deal.Message) + }) + }) } type executor func(t *testing.T, node storagemarket.StorageClientNode, dealStream smnet.StorageDealStream, - response *storagemarket.SignedResponse, + closeStreamErr error, dealInspector func(deal storagemarket.ClientDeal)) func makeExecutor(ctx context.Context, @@ -265,13 +300,12 @@ func makeExecutor(ctx context.Context, return func(t *testing.T, node storagemarket.StorageClientNode, dealStream smnet.StorageDealStream, - response *storagemarket.SignedResponse, + closeStreamErr error, dealInspector func(deal storagemarket.ClientDeal)) { dealState, err := tut.MakeTestClientDeal(initialState, clientDealProposal) dealState.AddFundsCid = &tut.GenerateCids(1)[0] - dealState.LastResponse = response require.NoError(t, err) - environment := &fakeEnvironment{node, dealStream} + environment := &fakeEnvironment{node, dealStream, closeStreamErr} fsmCtx := fsmtest.NewTestContext(ctx, eventProcessor) err = stateEntryFunc(fsmCtx, environment, *dealState) require.NoError(t, err) @@ -320,14 +354,22 @@ func makeNode(params nodeParams) storagemarket.StorageClientNode { } type fakeEnvironment struct { - node storagemarket.StorageClientNode - dealStream smnet.StorageDealStream + node storagemarket.StorageClientNode + dealStream smnet.StorageDealStream + closeStreamErr error } func (fe *fakeEnvironment) Node() storagemarket.StorageClientNode { return fe.node } -func (fe *fakeEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error { +func (fe *fakeEnvironment) WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal smnet.Proposal) error { return fe.dealStream.WriteDealProposal(proposal) } + +func (fe *fakeEnvironment) ReadDealResponse(proposalCid cid.Cid) (smnet.SignedResponse, error) { + return fe.dealStream.ReadDealResponse() +} +func (fe *fakeEnvironment) CloseStream(proposalCid cid.Cid) error { + return fe.closeStreamErr +} diff --git a/storagemarket/impl/clientutils/clientutils.go b/storagemarket/impl/clientutils/clientutils.go index 20f4f9e5..513a44ce 100644 --- a/storagemarket/impl/clientutils/clientutils.go +++ b/storagemarket/impl/clientutils/clientutils.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) // CommP calculates the commP for a given dataref @@ -38,7 +39,7 @@ type VerifyFunc func(context.Context, crypto.Signature, address.Address, []byte, // VerifyResponse verifies the signature on the given signed response matches // the given miner address, using the given signature verification function -func VerifyResponse(ctx context.Context, resp storagemarket.SignedResponse, minerAddr address.Address, tok shared.TipSetToken, verifier VerifyFunc) error { +func VerifyResponse(ctx context.Context, resp network.SignedResponse, minerAddr address.Address, tok shared.TipSetToken, verifier VerifyFunc) error { b, err := cborutil.Dump(&resp.Response) if err != nil { return err diff --git a/storagemarket/impl/clientutils/clientutils_test.go b/storagemarket/impl/clientutils/clientutils_test.go index a02acf2c..05390160 100644 --- a/storagemarket/impl/clientutils/clientutils_test.go +++ b/storagemarket/impl/clientutils/clientutils_test.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) func TestCommP(t *testing.T) { @@ -68,7 +69,7 @@ func TestCommP(t *testing.T) { func TestVerifyResponse(t *testing.T) { tests := map[string]struct { - sresponse storagemarket.SignedResponse + sresponse network.SignedResponse verifier clientutils.VerifyFunc shouldErr bool }{ @@ -80,8 +81,8 @@ func TestVerifyResponse(t *testing.T) { shouldErr: false, }, "bad response": { - sresponse: storagemarket.SignedResponse{ - Response: storagemarket.ProposalResponse{}, + sresponse: network.SignedResponse{ + Response: network.Response{}, Signature: shared_testutil.MakeTestSignature(), }, verifier: func(context.Context, crypto.Signature, address.Address, []byte, shared.TipSetToken) (bool, error) { diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 1f78cff0..8b49536d 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -139,8 +139,9 @@ func (p *Provider) HandleDealStream(s network.StorageDealStream) { err := p.receiveDeal(s) if err != nil { log.Errorf("%+v", err) + s.Close() + return } - s.Close() } func (p *Provider) receiveDeal(s network.StorageDealStream) error { @@ -167,6 +168,10 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { if err != nil { return err } + err = p.conns.AddStream(proposalNd.Cid(), s) + if err != nil { + return err + } return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen) } @@ -298,7 +303,7 @@ func (p *Provider) HandleAskStream(s network.StorageAskStream) { return } - resp := storagemarket.AskResponse{ + resp := network.AskResponse{ Ask: p.storedAsk.GetAsk(ar.Miner), } @@ -408,7 +413,11 @@ func (p *providerDealEnvironment) PieceStore() piecestore.PieceStore { return p.p.pieceStore } -func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, client peer.ID, resp *storagemarket.ProposalResponse) error { +func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, resp *network.Response) error { + s, err := p.p.conns.DealStream(resp.Proposal) + if err != nil { + return xerrors.Errorf("couldn't send response: %w", err) + } tok, _, err := p.p.spn.GetChainHead(ctx) if err != nil { @@ -420,20 +429,23 @@ func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, client return xerrors.Errorf("failed to sign response message: %w", err) } - signedResponse := storagemarket.SignedResponse{ + signedResponse := network.SignedResponse{ Response: *resp, Signature: sig, } - s, err := p.p.net.NewDealStream(client) + err = s.WriteDealResponse(signedResponse) if err != nil { - return err + // Assume client disconnected + _ = p.p.conns.Disconnect(resp.Proposal) } - - err = s.WriteDealResponse(signedResponse) return err } +func (p *providerDealEnvironment) Disconnect(proposalCid cid.Cid) error { + return p.p.conns.Disconnect(proposalCid) +} + func (p *providerDealEnvironment) DealAcceptanceBuffer() abi.ChainEpoch { return p.p.dealAcceptanceBuffer } diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 0cd213a0..3867d5da 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -24,6 +24,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) var log = logging.Logger("providerstates") @@ -36,7 +37,8 @@ type ProviderDealEnvironment interface { Ask() storagemarket.StorageAsk StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) - SendSignedResponse(ctx context.Context, client peer.ID, response *storagemarket.ProposalResponse) error + SendSignedResponse(ctx context.Context, response *network.Response) error + Disconnect(proposalCid cid.Cid) error FileStore() filestore.FileStore PieceStore() piecestore.PieceStore DealAcceptanceBuffer() abi.ChainEpoch @@ -221,7 +223,7 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) } - err = environment.SendSignedResponse(ctx.Context(), deal.Client, &storagemarket.ProposalResponse{ + err = environment.SendSignedResponse(ctx.Context(), &network.Response{ State: storagemarket.StorageDealProposalAccepted, Proposal: deal.ProposalCid, PublishMessage: deal.PublishCid, @@ -231,6 +233,10 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) } + if err := environment.Disconnect(deal.ProposalCid); err != nil { + log.Warnf("closing client connection: %+v", err) + } + return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) }) @@ -345,7 +351,7 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage log.Warnf("deal %s failed: %s", deal.ProposalCid, deal.Message) if !deal.ConnectionClosed { - err := environment.SendSignedResponse(ctx.Context(), deal.Client, &storagemarket.ProposalResponse{ + err := environment.SendSignedResponse(ctx.Context(), &network.Response{ State: storagemarket.StorageDealFailing, Message: deal.Message, Proposal: deal.ProposalCid, @@ -354,6 +360,10 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage if err != nil { return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) } + + if err := environment.Disconnect(deal.ProposalCid); err != nil { + log.Warnf("closing client connection: %+v", err) + } } if deal.PiecePath != filestore.Path("") { diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 46fd62f6..0aee17ac 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" ) @@ -905,6 +906,7 @@ func makeExecutor(ctx context.Context, metadataPath: params.MetadataPath, generateCommPError: params.GenerateCommPError, sendSignedResponseError: params.SendSignedResponseError, + disconnectError: params.DisconnectError, dealAcceptanceBuffer: abi.ChainEpoch(params.DealAcceptanceBuffer), fs: fs, pieceStore: pieceStore, @@ -945,6 +947,7 @@ type fakeEnvironment struct { metadataPath filestore.Path generateCommPError error sendSignedResponseError error + disconnectError error fs filestore.FileStore pieceStore piecestore.PieceStore dealAcceptanceBuffer abi.ChainEpoch @@ -970,10 +973,14 @@ func (fe *fakeEnvironment) GeneratePieceCommitmentToFile(payloadCid cid.Cid, sel return fe.pieceCid, fe.path, fe.metadataPath, fe.generateCommPError } -func (fe *fakeEnvironment) SendSignedResponse(ctx context.Context, client peer.ID, response *storagemarket.ProposalResponse) error { +func (fe *fakeEnvironment) SendSignedResponse(ctx context.Context, response *network.Response) error { return fe.sendSignedResponseError } +func (fe *fakeEnvironment) Disconnect(proposalCid cid.Cid) error { + return fe.disconnectError +} + func (fe *fakeEnvironment) FileStore() filestore.FileStore { return fe.fs } diff --git a/storagemarket/impl/providerutils/providerutils_test.go b/storagemarket/impl/providerutils/providerutils_test.go index 1f4c0601..44ac3d34 100644 --- a/storagemarket/impl/providerutils/providerutils_test.go +++ b/storagemarket/impl/providerutils/providerutils_test.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) func TestVerifyProposal(t *testing.T) { @@ -90,7 +91,7 @@ func TestSignMinerData(t *testing.T) { shouldErr: false, }, "cbor dump errors": { - data: &storagemarket.ProposalResponse{}, + data: &network.Response{}, workerLookup: successLookup, signBytes: successSign, shouldErr: true, diff --git a/storagemarket/impl/requestvalidation/types.go b/storagemarket/impl/requestvalidation/types.go index cc5d04ac..18885de4 100644 --- a/storagemarket/impl/requestvalidation/types.go +++ b/storagemarket/impl/requestvalidation/types.go @@ -37,7 +37,7 @@ var ( ErrInacceptableDealState = errors.New("deal is not a in a state where deals are accepted") // DataTransferStates are the states in which it would make sense to actually start a data transfer - DataTransferStates = []storagemarket.StorageDealStatus{storagemarket.StorageDealWaitingForResponse, storagemarket.StorageDealValidating, storagemarket.StorageDealUnknown} + DataTransferStates = []storagemarket.StorageDealStatus{storagemarket.StorageDealValidating, storagemarket.StorageDealUnknown} ) // StorageDataTransferVoucher is the voucher type for data transfers diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index f6fa0a4f..5b51c9df 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -112,7 +112,6 @@ func TestMakeDeal(t *testing.T) { storagemarket.StorageDealEnsureClientFunds, //storagemarket.StorageDealClientFunding, // skipped because funds available storagemarket.StorageDealFundsEnsured, - storagemarket.StorageDealWaitingForResponse, storagemarket.StorageDealValidating, storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealSealing, @@ -167,7 +166,7 @@ func TestMakeDealOffline(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealWaitingForResponse, cd.State) + assert.Equal(t, storagemarket.StorageDealValidating, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) @@ -214,7 +213,7 @@ func TestMakeDealNonBlocking(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, result.ProposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealWaitingForResponse, cd.State) + assert.Equal(t, storagemarket.StorageDealValidating, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) diff --git a/storagemarket/network/ask_stream.go b/storagemarket/network/ask_stream.go index e1c51ff1..6622e0ad 100644 --- a/storagemarket/network/ask_stream.go +++ b/storagemarket/network/ask_stream.go @@ -4,7 +4,6 @@ import ( "bufio" cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" ) @@ -17,34 +16,34 @@ type askStream struct { var _ StorageAskStream = (*askStream)(nil) -func (as *askStream) ReadAskRequest() (storagemarket.AskRequest, error) { - var a storagemarket.AskRequest +func (as *askStream) ReadAskRequest() (AskRequest, error) { + var a AskRequest if err := a.UnmarshalCBOR(as.buffered); err != nil { log.Warn(err) - return storagemarket.AskRequestUndefined, err + return AskRequestUndefined, err } return a, nil } -func (as *askStream) WriteAskRequest(q storagemarket.AskRequest) error { +func (as *askStream) WriteAskRequest(q AskRequest) error { return cborutil.WriteCborRPC(as.rw, &q) } -func (as *askStream) ReadAskResponse() (storagemarket.AskResponse, error) { - var resp storagemarket.AskResponse +func (as *askStream) ReadAskResponse() (AskResponse, error) { + var resp AskResponse if err := resp.UnmarshalCBOR(as.buffered); err != nil { log.Warn(err) - return storagemarket.AskResponseUndefined, err + return AskResponseUndefined, err } return resp, nil } -func (as *askStream) WriteAskResponse(qr storagemarket.AskResponse) error { +func (as *askStream) WriteAskResponse(qr AskResponse) error { return cborutil.WriteCborRPC(as.rw, &qr) } diff --git a/storagemarket/network/deal_stream.go b/storagemarket/network/deal_stream.go index 2a653357..dec3f894 100644 --- a/storagemarket/network/deal_stream.go +++ b/storagemarket/network/deal_stream.go @@ -4,7 +4,6 @@ import ( "bufio" cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" ) @@ -17,35 +16,35 @@ type dealStream struct { var _ StorageDealStream = (*dealStream)(nil) -func (d *dealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) { - var ds storagemarket.ProposalRequest +func (d *dealStream) ReadDealProposal() (Proposal, error) { + var ds Proposal if err := ds.UnmarshalCBOR(d.buffered); err != nil { log.Warn(err) - return storagemarket.ProposalRequestUndefined, err + return ProposalUndefined, err } return ds, nil } -func (d *dealStream) WriteDealProposal(dp storagemarket.ProposalRequest) error { +func (d *dealStream) WriteDealProposal(dp Proposal) error { return cborutil.WriteCborRPC(d.rw, &dp) } -func (d *dealStream) ReadDealResponse() (storagemarket.SignedResponse, error) { - var dr storagemarket.SignedResponse +func (d *dealStream) ReadDealResponse() (SignedResponse, error) { + var dr SignedResponse if err := dr.UnmarshalCBOR(d.buffered); err != nil { - return storagemarket.SignedResponseUndefined, err + return SignedResponseUndefined, err } return dr, nil } -func (d *dealStream) WriteDealResponse(dr storagemarket.SignedResponse) error { +func (d *dealStream) WriteDealResponse(dr SignedResponse) error { return cborutil.WriteCborRPC(d.rw, &dr) } func (d *dealStream) Close() error { - return d.rw.Reset() + return d.rw.Close() } func (d *dealStream) RemotePeer() peer.ID { diff --git a/storagemarket/network/libp2p_impl_test.go b/storagemarket/network/libp2p_impl_test.go index 5f59f5f8..f47f7d53 100644 --- a/storagemarket/network/libp2p_impl_test.go +++ b/storagemarket/network/libp2p_impl_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/shared_testutil" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -46,7 +45,7 @@ func TestAskStreamSendReceiveAskRequest(t *testing.T) { require.NoError(t, fromNetwork.SetDelegate(tr)) // host2 gets receiver - achan := make(chan storagemarket.AskRequest) + achan := make(chan network.AskRequest) tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { readq, err := s.ReadAskRequest() require.NoError(t, err) @@ -70,7 +69,7 @@ func TestAskStreamSendReceiveAskResponse(t *testing.T) { require.NoError(t, fromNetwork.SetDelegate(tr)) // host2 gets receiver - achan := make(chan storagemarket.AskResponse) + achan := make(chan network.AskResponse) tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { a, err := s.ReadAskResponse() require.NoError(t, err) @@ -108,7 +107,7 @@ func TestAskStreamSendReceiveMultipleSuccessful(t *testing.T) { qs, err := nw1.NewAskStream(td.Host2.ID()) require.NoError(t, err) - var resp storagemarket.AskResponse + var resp network.AskResponse go require.NoError(t, qs.WriteAskRequest(shared_testutil.MakeTestStorageAskRequest())) resp, err = qs.ReadAskResponse() require.NoError(t, err) @@ -133,7 +132,7 @@ func TestDealStreamSendReceiveDealProposal(t *testing.T) { tr := &testReceiver{t: t} require.NoError(t, fromNetwork.SetDelegate(tr)) - dchan := make(chan storagemarket.ProposalRequest) + dchan := make(chan network.Proposal) tr2 := &testReceiver{ t: t, dealStreamHandler: func(s network.StorageDealStream) { @@ -157,7 +156,7 @@ func TestDealStreamSendReceiveDealResponse(t *testing.T) { tr := &testReceiver{t: t} require.NoError(t, fromNetwork.SetDelegate(tr)) - drChan := make(chan storagemarket.SignedResponse) + drChan := make(chan network.SignedResponse) tr2 := &testReceiver{ t: t, dealStreamHandler: func(s network.StorageDealStream) { @@ -231,7 +230,7 @@ func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) { require.NoError(t, fromNetwork.SetDelegate(tr)) // host2 gets receiver - achan := make(chan storagemarket.AskRequest) + achan := make(chan network.AskRequest) tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { readar, err := s.ReadAskRequest() require.NoError(t, err) @@ -246,7 +245,7 @@ func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) { } // assertDealProposalReceived performs the verification that a deal proposal is received -func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan storagemarket.ProposalRequest) { +func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.Proposal) { ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() @@ -257,7 +256,7 @@ func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork dp := shared_testutil.MakeTestStorageNetworkProposal() require.NoError(t, qs1.WriteDealProposal(dp)) - var dealReceived storagemarket.ProposalRequest + var dealReceived network.Proposal select { case <-ctx.Done(): t.Error("deal proposal not received") @@ -267,7 +266,7 @@ func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork assert.Equal(t, dp, dealReceived) } -func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan storagemarket.SignedResponse) { +func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.SignedResponse) { ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second) defer cancel() @@ -277,7 +276,7 @@ func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNet dr := shared_testutil.MakeTestStorageNetworkSignedResponse() require.NoError(t, ds1.WriteDealResponse(dr)) - var responseReceived storagemarket.SignedResponse + var responseReceived network.SignedResponse select { case <-ctx.Done(): t.Error("response not received") @@ -287,8 +286,8 @@ func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNet assert.Equal(t, dr, responseReceived) } -// assertAskRequestReceived performs the verification that a storagemarket.AskRequest is received -func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, achan chan storagemarket.AskRequest) { +// assertAskRequestReceived performs the verification that a AskRequest is received +func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, achan chan network.AskRequest) { ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() @@ -299,7 +298,7 @@ func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork n a := shared_testutil.MakeTestStorageAskRequest() require.NoError(t, as1.WriteAskRequest(a)) - var ina storagemarket.AskRequest + var ina network.AskRequest select { case <-ctx.Done(): t.Error("msg not received") @@ -309,11 +308,11 @@ func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork n assert.Equal(t, a.Miner, ina.Miner) } -// assertAskResponseReceived performs the verification that a storagemarket.AskResponse is received +// assertAskResponseReceived performs the verification that a AskResponse is received func assertAskResponseReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, - achan chan storagemarket.AskResponse) { + achan chan network.AskResponse) { ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() @@ -326,7 +325,7 @@ func assertAskResponseReceived(inCtx context.Context, t *testing.T, require.NoError(t, as1.WriteAskResponse(ar)) // read queryresponse - var inar storagemarket.AskResponse + var inar network.AskResponse select { case <-ctx.Done(): t.Error("msg not received") diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go index 5affe2e4..03c4b8d7 100644 --- a/storagemarket/network/network.go +++ b/storagemarket/network/network.go @@ -1,27 +1,26 @@ package network import ( - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/peer" ) // StorageAskStream is a stream for reading/writing requests & // responses on the Storage Ask protocol type StorageAskStream interface { - ReadAskRequest() (storagemarket.AskRequest, error) - WriteAskRequest(storagemarket.AskRequest) error - ReadAskResponse() (storagemarket.AskResponse, error) - WriteAskResponse(storagemarket.AskResponse) error + ReadAskRequest() (AskRequest, error) + WriteAskRequest(AskRequest) error + ReadAskResponse() (AskResponse, error) + WriteAskResponse(AskResponse) error Close() error } // StorageDealStream is a stream for reading and writing requests // and responses on the storage deal protocol type StorageDealStream interface { - ReadDealProposal() (storagemarket.ProposalRequest, error) - WriteDealProposal(storagemarket.ProposalRequest) error - ReadDealResponse() (storagemarket.SignedResponse, error) - WriteDealResponse(storagemarket.SignedResponse) error + ReadDealProposal() (Proposal, error) + WriteDealProposal(Proposal) error + ReadDealResponse() (SignedResponse, error) + WriteDealResponse(SignedResponse) error RemotePeer() peer.ID Close() error } diff --git a/storagemarket/network/types.go b/storagemarket/network/types.go new file mode 100644 index 00000000..7d0294e5 --- /dev/null +++ b/storagemarket/network/types.go @@ -0,0 +1,58 @@ +package network + +import ( + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-fil-markets/storagemarket" +) + +//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse + +// Proposal is the data sent over the network from client to provider when proposing +// a deal +type Proposal struct { + DealProposal *market.ClientDealProposal + + Piece *storagemarket.DataRef +} + +var ProposalUndefined = Proposal{} + +// Response is a response to a proposal sent over the network +type Response struct { + State storagemarket.StorageDealStatus + + // DealProposalRejected + Message string + Proposal cid.Cid + + // StorageDealProposalAccepted + PublishMessage *cid.Cid +} + +// SignedResponse is a response that is signed +type SignedResponse struct { + Response Response + + Signature *crypto.Signature +} + +var SignedResponseUndefined = SignedResponse{} + +// AskRequest is a request for current ask parameters for a given miner +type AskRequest struct { + Miner address.Address +} + +var AskRequestUndefined = AskRequest{} + +// AskResponse is the response sent over the network in response +// to an ask request +type AskResponse struct { + Ask *storagemarket.SignedStorageAsk +} + +var AskResponseUndefined = AskResponse{} diff --git a/storagemarket/network/types_cbor_gen.go b/storagemarket/network/types_cbor_gen.go new file mode 100644 index 00000000..73b42f5a --- /dev/null +++ b/storagemarket/network/types_cbor_gen.go @@ -0,0 +1,390 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package network + +import ( + "fmt" + "io" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/crypto" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf + +func (t *AskRequest) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Miner (address.Address) (struct) + if err := t.Miner.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Miner (address.Address) (struct) + + { + + if err := t.Miner.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Miner: %w", err) + } + + } + return nil +} + +func (t *AskResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Ask (storagemarket.SignedStorageAsk) (struct) + if err := t.Ask.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Ask (storagemarket.SignedStorageAsk) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Ask = new(storagemarket.SignedStorageAsk) + if err := t.Ask.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Ask pointer: %w", err) + } + } + + } + return nil +} + +func (t *Proposal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.DealProposal (market.ClientDealProposal) (struct) + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Piece (storagemarket.DataRef) (struct) + if err := t.Piece.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *Proposal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.DealProposal (market.ClientDealProposal) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.DealProposal = new(market.ClientDealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) + } + } + + } + // t.Piece (storagemarket.DataRef) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Piece = new(storagemarket.DataRef) + if err := t.Piece.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Piece pointer: %w", err) + } + } + + } + return nil +} + +func (t *Response) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{132}); err != nil { + return err + } + + // t.State (uint64) (uint64) + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Message (string) (string) + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Message)); err != nil { + return err + } + + // t.Proposal (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Proposal); err != nil { + return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) + } + + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } + } + + return nil +} + +func (t *Response) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.State (uint64) (uint64) + + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + + } + // t.Message (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.Proposal (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) + } + + t.Proposal = c + + } + // t.PublishMessage (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } + + } + return nil +} + +func (t *SignedResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.Response (network.Response) (struct) + if err := t.Response.MarshalCBOR(w); err != nil { + return err + } + + // t.Signature (crypto.Signature) (struct) + if err := t.Signature.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Response (network.Response) (struct) + + { + + if err := t.Response.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Response: %w", err) + } + + } + // t.Signature (crypto.Signature) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Signature = new(crypto.Signature) + if err := t.Signature.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Signature pointer: %w", err) + } + } + + } + return nil +} diff --git a/storagemarket/types.go b/storagemarket/types.go index 458e8752..3d89e1da 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -17,7 +17,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) -//go:generate cbor-gen-for ClientDeal MinerDeal Balance SignedStorageAsk StorageAsk StorageDeal DataRef AskRequest AskResponse ProposalRequest ProposalResponse SignedResponse +//go:generate cbor-gen-for ClientDeal MinerDeal Balance SignedStorageAsk StorageAsk StorageDeal DataRef const DealProtocolID = "/fil/storage/mk/1.0.1" const AskProtocolID = "/fil/storage/ask/1.0.1" @@ -46,7 +46,6 @@ const ( StorageDealValidating // Verifying that deal parameters are good StorageDealTransferring // Moving data StorageDealWaitingForData // Manual transfer - StorageDealWaitingForResponse // Waiting for a response from the provider StorageDealVerifyData // Verify transferred data - generate CAR / piece data StorageDealEnsureProviderFunds // Ensuring that provider collateral is sufficient StorageDealEnsureClientFunds // Ensuring that client funds are sufficient @@ -73,7 +72,6 @@ var DealStates = map[StorageDealStatus]string{ StorageDealValidating: "StorageDealValidating", StorageDealTransferring: "StorageDealTransferring", StorageDealWaitingForData: "StorageDealWaitingForData", - StorageDealWaitingForResponse: "StorageDealWaitingForResponse", StorageDealVerifyData: "StorageDealVerifyData", StorageDealEnsureProviderFunds: "StorageDealEnsureProviderFunds", StorageDealEnsureClientFunds: "StorageDealEnsureClientFunds", @@ -277,7 +275,6 @@ type ClientDeal struct { DataRef *DataRef Message string PublishMessage *cid.Cid - LastResponse *SignedResponse } type ClientEvent uint64 @@ -301,8 +298,8 @@ const ( // ClientEventDealProposed happens when a new proposal is sent to a provider ClientEventDealProposed - // ClientEventReceiveResponse happens when a new deal response is received - ClientEventReceiveResponse + // ClientEventDealStreamLookupErrored the deal stream for a deal could not be found + ClientEventDealStreamLookupErrored // ClientEventReadResponseFailed means a network error occurred reading a deal response ClientEventReadResponseFailed @@ -346,7 +343,8 @@ var ClientEvents = map[ClientEvent]string{ ClientEventFundsEnsured: "ClientEventFundsEnsured", ClientEventWriteProposalFailed: "ClientEventWriteProposalFailed", ClientEventDealProposed: "ClientEventDealProposed", - ClientEventReceiveResponse: "ClientEventReceiveResponse", + ClientEventDealStreamLookupErrored: "ClientEventDealStreamLookupErrored", + ClientEventReadResponseFailed: "ClientEventReadResponseFailed", ClientEventResponseVerificationFailed: "ClientEventResponseVerificationFailed", ClientEventResponseDealDidNotMatch: "ClientEventResponseDealDidNotMatch", ClientEventStreamCloseError: "ClientEventStreamCloseError", @@ -545,49 +543,3 @@ type StorageClient interface { SubscribeToEvents(subscriber ClientSubscriber) shared.Unsubscribe } - -// ProposalRequest is the data sent over the network from client to provider when proposing -// a deal -type ProposalRequest struct { - DealProposal *market.ClientDealProposal - - Piece *DataRef -} - -var ProposalRequestUndefined = ProposalRequest{} - -// ProposalResponse is a response to a proposal sent over the network -type ProposalResponse struct { - State StorageDealStatus - - // DealProposalRejected - Message string - Proposal cid.Cid - - // StorageDealProposalAccepted - PublishMessage *cid.Cid -} - -// SignedResponse is a response that is signed -type SignedResponse struct { - Response ProposalResponse - - Signature *crypto.Signature -} - -var SignedResponseUndefined = SignedResponse{} - -// AskRequest is a request for current ask parameters for a given miner -type AskRequest struct { - Miner address.Address -} - -var AskRequestUndefined = AskRequest{} - -// AskResponse is the response sent over the network in response -// to an ask request -type AskResponse struct { - Ask *SignedStorageAsk -} - -var AskResponseUndefined = AskResponse{} diff --git a/storagemarket/types_cbor_gen.go b/storagemarket/types_cbor_gen.go index 3421559f..fab1b00e 100644 --- a/storagemarket/types_cbor_gen.go +++ b/storagemarket/types_cbor_gen.go @@ -8,7 +8,6 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" @@ -22,7 +21,7 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{139}); err != nil { + if _, err := w.Write([]byte{138}); err != nil { return err } @@ -107,10 +106,6 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { } } - // t.LastResponse (storagemarket.SignedResponse) (struct) - if err := t.LastResponse.MarshalCBOR(w); err != nil { - return err - } return nil } @@ -125,7 +120,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 11 { + if extra != 10 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -275,27 +270,6 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { t.PublishMessage = &c } - } - // t.LastResponse (storagemarket.SignedResponse) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.LastResponse = new(SignedResponse) - if err := t.LastResponse.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.LastResponse pointer: %w", err) - } - } - } return nil } @@ -1142,377 +1116,3 @@ func (t *DataRef) UnmarshalCBOR(r io.Reader) error { } return nil } - -func (t *AskRequest) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Miner (address.Address) (struct) - if err := t.Miner.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Miner (address.Address) (struct) - - { - - if err := t.Miner.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Miner: %w", err) - } - - } - return nil -} - -func (t *AskResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Ask (storagemarket.SignedStorageAsk) (struct) - if err := t.Ask.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Ask (storagemarket.SignedStorageAsk) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Ask = new(SignedStorageAsk) - if err := t.Ask.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Ask pointer: %w", err) - } - } - - } - return nil -} - -func (t *ProposalRequest) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.DealProposal (market.ClientDealProposal) (struct) - if err := t.DealProposal.MarshalCBOR(w); err != nil { - return err - } - - // t.Piece (storagemarket.DataRef) (struct) - if err := t.Piece.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *ProposalRequest) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.DealProposal (market.ClientDealProposal) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.DealProposal = new(market.ClientDealProposal) - if err := t.DealProposal.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) - } - } - - } - // t.Piece (storagemarket.DataRef) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Piece = new(DataRef) - if err := t.Piece.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Piece pointer: %w", err) - } - } - - } - return nil -} - -func (t *ProposalResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{132}); err != nil { - return err - } - - // t.State (uint64) (uint64) - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { - return err - } - - // t.Message (string) (string) - if len(t.Message) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Message was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Message)); err != nil { - return err - } - - // t.Proposal (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Proposal); err != nil { - return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) - } - - // t.PublishMessage (cid.Cid) (struct) - - if t.PublishMessage == nil { - if _, err := w.Write(cbg.CborNull); err != nil { - return err - } - } else { - if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { - return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) - } - } - - return nil -} - -func (t *ProposalResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 4 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.State (uint64) (uint64) - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) - - } - // t.Message (string) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Message = string(sval) - } - // t.Proposal (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) - } - - t.Proposal = c - - } - // t.PublishMessage (cid.Cid) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) - } - - t.PublishMessage = &c - } - - } - return nil -} - -func (t *SignedResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Response (storagemarket.ProposalResponse) (struct) - if err := t.Response.MarshalCBOR(w); err != nil { - return err - } - - // t.Signature (crypto.Signature) (struct) - if err := t.Signature.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Response (storagemarket.ProposalResponse) (struct) - - { - - if err := t.Response.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Response: %w", err) - } - - } - // t.Signature (crypto.Signature) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Signature = new(crypto.Signature) - if err := t.Signature.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Signature pointer: %w", err) - } - } - - } - return nil -}