Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert most protocol changes #236

Merged
merged 1 commit into from
May 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 25 additions & 27 deletions shared_testutil/test_network_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand Down Expand Up @@ -506,16 +505,16 @@ func StubbedDealPaymentReader(payment rm.DealPayment) DealPaymentReader {
}

// StorageDealProposalReader is a function to mock reading deal proposals.
type StorageDealProposalReader func() (storagemarket.ProposalRequest, error)
type StorageDealProposalReader func() (smnet.Proposal, error)

// StorageDealResponseReader is a function to mock reading deal responses.
type StorageDealResponseReader func() (storagemarket.SignedResponse, error)
type StorageDealResponseReader func() (smnet.SignedResponse, error)

// StorageDealResponseWriter is a function to mock writing deal responses.
type StorageDealResponseWriter func(storagemarket.SignedResponse) error
type StorageDealResponseWriter func(smnet.SignedResponse) error

// StorageDealProposalWriter is a function to mock writing deal proposals.
type StorageDealProposalWriter func(storagemarket.ProposalRequest) error
type StorageDealProposalWriter func(smnet.Proposal) error

// TestStorageDealStream is a retrieval deal stream with predefined
// stubbed behavior.
Expand Down Expand Up @@ -563,22 +562,22 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) smnet.StorageD
}

// ReadDealProposal calls the mocked deal proposal reader function.
func (tsds *TestStorageDealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) {
func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, error) {
return tsds.proposalReader()
}

// WriteDealProposal calls the mocked deal proposal writer function.
func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal storagemarket.ProposalRequest) error {
func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal smnet.Proposal) error {
return tsds.proposalWriter(dealProposal)
}

// ReadDealResponse calls the mocked deal response reader function.
func (tsds *TestStorageDealStream) ReadDealResponse() (storagemarket.SignedResponse, error) {
func (tsds *TestStorageDealStream) ReadDealResponse() (smnet.SignedResponse, error) {
return tsds.responseReader()
}

// WriteDealResponse calls the mocked deal response writer function.
func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse storagemarket.SignedResponse) error {
func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse smnet.SignedResponse) error {
return tsds.responseWriter(dealResponse)
}

Expand All @@ -589,57 +588,57 @@ func (tsds TestStorageDealStream) RemotePeer() peer.ID { return tsds.p }
func (tsds TestStorageDealStream) Close() error { return nil }

// TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal.
func TrivialStorageDealProposalReader() (storagemarket.ProposalRequest, error) {
return storagemarket.ProposalRequest{}, nil
func TrivialStorageDealProposalReader() (smnet.Proposal, error) {
return smnet.Proposal{}, nil
}

// TrivialStorageDealResponseReader succeeds trivially, returning an empty deal response.
func TrivialStorageDealResponseReader() (storagemarket.SignedResponse, error) {
return storagemarket.SignedResponse{}, nil
func TrivialStorageDealResponseReader() (smnet.SignedResponse, error) {
return smnet.SignedResponse{}, nil
}

// TrivialStorageDealProposalWriter succeeds trivially, returning no error.
func TrivialStorageDealProposalWriter(storagemarket.ProposalRequest) error {
func TrivialStorageDealProposalWriter(smnet.Proposal) error {
return nil
}

// TrivialStorageDealResponseWriter succeeds trivially, returning no error.
func TrivialStorageDealResponseWriter(storagemarket.SignedResponse) error {
func TrivialStorageDealResponseWriter(smnet.SignedResponse) error {
return nil
}

// StubbedStorageProposalReader returns the given proposal when called
func StubbedStorageProposalReader(proposal storagemarket.ProposalRequest) StorageDealProposalReader {
return func() (storagemarket.ProposalRequest, error) {
func StubbedStorageProposalReader(proposal smnet.Proposal) StorageDealProposalReader {
return func() (smnet.Proposal, error) {
return proposal, nil
}
}

// StubbedStorageResponseReader returns the given deal response when called
func StubbedStorageResponseReader(response storagemarket.SignedResponse) StorageDealResponseReader {
return func() (storagemarket.SignedResponse, error) {
func StubbedStorageResponseReader(response smnet.SignedResponse) StorageDealResponseReader {
return func() (smnet.SignedResponse, error) {
return response, nil
}
}

// FailStorageProposalWriter always fails
func FailStorageProposalWriter(storagemarket.ProposalRequest) error {
func FailStorageProposalWriter(smnet.Proposal) error {
return errors.New("write proposal failed")
}

// FailStorageProposalReader always fails
func FailStorageProposalReader() (storagemarket.ProposalRequest, error) {
return storagemarket.ProposalRequestUndefined, errors.New("read proposal failed")
func FailStorageProposalReader() (smnet.Proposal, error) {
return smnet.ProposalUndefined, errors.New("read proposal failed")
}

// FailStorageResponseWriter always fails
func FailStorageResponseWriter(storagemarket.SignedResponse) error {
func FailStorageResponseWriter(smnet.SignedResponse) error {
return errors.New("write proposal failed")
}

// FailStorageResponseReader always fails
func FailStorageResponseReader() (storagemarket.SignedResponse, error) {
return storagemarket.SignedResponseUndefined, errors.New("read response failed")
func FailStorageResponseReader() (smnet.SignedResponse, error) {
return smnet.SignedResponseUndefined, errors.New("read response failed")
}

// TestPeerResolver provides a fake retrievalmarket PeerResolver
Expand All @@ -651,5 +650,4 @@ type TestPeerResolver struct {
func (tpr TestPeerResolver) GetPeers(cid.Cid) ([]rm.RetrievalPeer, error) {
return tpr.Peers, tpr.ResolverError
}

var _ rm.PeerResolver = &TestPeerResolver{}
var _ rm.PeerResolver = &TestPeerResolver{}
21 changes: 11 additions & 10 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

// MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields
Expand Down Expand Up @@ -212,17 +213,17 @@ func MakeTestSignedStorageAsk() *storagemarket.SignedStorageAsk {

// MakeTestStorageNetworkProposal generates a proposal that can be sent over the
// network to a provider
func MakeTestStorageNetworkProposal() storagemarket.ProposalRequest {
return storagemarket.ProposalRequest{
func MakeTestStorageNetworkProposal() smnet.Proposal {
return smnet.Proposal{
DealProposal: MakeTestClientDealProposal(),
Piece: &storagemarket.DataRef{Root: GenerateCids(1)[0]},
}
}

// MakeTestStorageNetworkResponse generates a response to a proposal sent over
// the network
func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse {
return storagemarket.ProposalResponse{
func MakeTestStorageNetworkResponse() smnet.Response {
return smnet.Response{
State: storagemarket.StorageDealSealing,
Proposal: GenerateCids(1)[0],
PublishMessage: &(GenerateCids(1)[0]),
Expand All @@ -231,23 +232,23 @@ func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse {

// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over
// the network that is signed
func MakeTestStorageNetworkSignedResponse() storagemarket.SignedResponse {
return storagemarket.SignedResponse{
func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse {
return smnet.SignedResponse{
Response: MakeTestStorageNetworkResponse(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageAskRequest generates a request to get a provider's ask
func MakeTestStorageAskRequest() storagemarket.AskRequest {
return storagemarket.AskRequest{
func MakeTestStorageAskRequest() smnet.AskRequest {
return smnet.AskRequest{
Miner: address.TestAddress2,
}
}

// MakeTestStorageAskResponse generates a response to an ask request
func MakeTestStorageAskResponse() storagemarket.AskResponse {
return storagemarket.AskResponse{
func MakeTestStorageAskResponse() smnet.AskResponse {
return smnet.AskResponse{
Ask: MakeTestSignedStorageAsk(),
}
}
Expand Down
42 changes: 20 additions & 22 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func NewClient(
}

func (c *Client) Run(ctx context.Context) {
_ = c.net.SetDelegate(c)
}

func (c *Client) Stop() {
Expand Down Expand Up @@ -158,7 +157,7 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

request := storagemarket.AskRequest{Miner: info.Address}
request := network.AskRequest{Miner: info.Address}
if err := s.WriteAskRequest(request); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}
Expand Down Expand Up @@ -301,25 +300,6 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh
return shared.Unsubscribe(c.pubSub.Subscribe(subscriber))
}

func (c *Client) HandleAskStream(s network.StorageAskStream) {
s.Close()
}

func (c *Client) HandleDealStream(s network.StorageDealStream) {
defer s.Close()
log.Info("Handling storage deal proposal!")

response, err := s.ReadDealResponse()
if err != nil {
log.Errorf("%+v", err)
return
}
err = c.statemachines.Send(response.Response.Proposal, storagemarket.ClientEventReceiveResponse, response)
if err != nil {
log.Errorf("%+v", err)
}
}

func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ClientEvent)
if !ok {
Expand Down Expand Up @@ -367,11 +347,29 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
return c.c.node
}

func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error {
func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal network.Proposal) error {
s, err := c.c.net.NewDealStream(p)
if err != nil {
return err
}
err = c.c.conns.AddStream(proposalCid, s)
if err != nil {
return err
}
err = s.WriteDealProposal(proposal)
return err
}

func (c *clientDealEnvironment) ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error) {
s, err := c.c.conns.DealStream(proposalCid)
if err != nil {
return network.SignedResponseUndefined, err
}
return s.ReadDealResponse()
}

func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error {
return c.c.conns.Disconnect(proposalCid)
}

var _ clientstates.ClientDealEnvironment = &clientDealEnvironment{}
16 changes: 11 additions & 5 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ClientEventDealProposed).
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForResponse),
fsm.Event(storagemarket.ClientEventReceiveResponse).
From(storagemarket.StorageDealWaitingForResponse).To(storagemarket.StorageDealValidating).
Action(func(deal *storagemarket.ClientDeal, response storagemarket.SignedResponse) error {
deal.LastResponse = &response
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating),
fsm.Event(storagemarket.ClientEventDealStreamLookupErrored).
FromAny().To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("miner connection error: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
Expand Down
23 changes: 19 additions & 4 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

var log = logging.Logger("storagemarket_impl")
Expand All @@ -18,7 +19,9 @@ var log = logging.Logger("storagemarket_impl")
// dependencies from the storage client environment
type ClientDealEnvironment interface {
Node() storagemarket.StorageClientNode
WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error
WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal network.Proposal) error
ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error)
CloseStream(proposalCid cid.Cid) error
}

// ClientStateEntryFunc is the type for all state entry functions on a storage client
Expand Down Expand Up @@ -66,8 +69,8 @@ func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal sto
// ProposeDeal sends the deal proposal to the provider
func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

proposal := storagemarket.ProposalRequest{DealProposal: &deal.ClientDealProposal, Piece: deal.DataRef}
if err := environment.WriteDealProposal(deal.Miner, proposal); err != nil {
proposal := network.Proposal{DealProposal: &deal.ClientDealProposal, Piece: deal.DataRef}
if err := environment.WriteDealProposal(deal.Miner, deal.ProposalCid, proposal); err != nil {
return ctx.Trigger(storagemarket.ClientEventWriteProposalFailed, err)
}

Expand All @@ -77,7 +80,11 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag
// VerifyDealResponse reads and verifies the response from the provider to the proposed deal
func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

resp := *deal.LastResponse
resp, err := environment.ReadDealResponse(deal.ProposalCid)
if err != nil {
return ctx.Trigger(storagemarket.ClientEventReadResponseFailed, err)
}

tok, _, err := environment.Node().GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed)
Expand All @@ -95,6 +102,10 @@ func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal
return ctx.Trigger(storagemarket.ClientEventDealRejected, resp.Response.State, resp.Response.Message)
}

if err := environment.CloseStream(deal.ProposalCid); err != nil {
return ctx.Trigger(storagemarket.ClientEventStreamCloseError, err)
}

return ctx.Trigger(storagemarket.ClientEventDealAccepted, resp.Response.PublishMessage)
}

Expand Down Expand Up @@ -129,6 +140,10 @@ func VerifyDealActivated(ctx fsm.Context, environment ClientDealEnvironment, dea
// FailDeal cleans up a failing deal
func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

if err := environment.CloseStream(deal.ProposalCid); err != nil {
return ctx.Trigger(storagemarket.ClientEventStreamCloseError, err)
}

// TODO: store in some sort of audit log
log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message)

Expand Down