diff --git a/shared_testutil/test_types.go b/shared_testutil/test_types.go index 8bd24a09..4f8b6452 100644 --- a/shared_testutil/test_types.go +++ b/shared_testutil/test_types.go @@ -12,6 +12,8 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket" + smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) // MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields @@ -110,6 +112,87 @@ func MakeTestDealPayment() retrievalmarket.DealPayment { } } +// MakeTestStorageDealProposal generates a valid storage deal proposal +func MakeTestStorageDealProposal() *storagemarket.StorageDealProposal { + return &storagemarket.StorageDealProposal{ + PieceRef: RandomBytes(32), + PieceSize: rand.Uint64(), + + Client: address.TestAddress, + Provider: address.TestAddress2, + + ProposalExpiration: rand.Uint64(), + Duration: rand.Uint64(), + + StoragePricePerEpoch: MakeTestTokenAmount(), + StorageCollateral: MakeTestTokenAmount(), + + ProposerSignature: MakeTestSignature(), + } +} + +// MakeTestStorageAsk generates a storage ask +func MakeTestStorageAsk() *types.StorageAsk { + return &types.StorageAsk{ + Price: MakeTestTokenAmount(), + MinPieceSize: rand.Uint64(), + Miner: address.TestAddress2, + Timestamp: rand.Uint64(), + Expiry: rand.Uint64(), + SeqNo: rand.Uint64(), + } +} + +// MakeTestSignedStorageAsk generates a signed storage ask +func MakeTestSignedStorageAsk() *types.SignedStorageAsk { + return &types.SignedStorageAsk{ + Ask: MakeTestStorageAsk(), + Signature: MakeTestSignature(), + } +} + +// MakeTestStorageNetworkProposal generates a proposal that can be sent over the +// network to a provider +func MakeTestStorageNetworkProposal() smnet.Proposal { + return smnet.Proposal{ + DealProposal: MakeTestStorageDealProposal(), + Piece: GenerateCids(1)[0], + } +} + +// MakeTestStorageNetworkResponse generates a response to a proposal sent over +// the network +func MakeTestStorageNetworkResponse() smnet.Response { + return smnet.Response{ + State: storagemarket.StorageDealPublished, + Proposal: GenerateCids(1)[0], + PublishMessage: &(GenerateCids(1)[0]), + } +} + +// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over +// the network that is signed +func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse { + return smnet.SignedResponse{ + Response: MakeTestStorageNetworkResponse(), + Signature: MakeTestSignature(), + } +} + +// MakeTestStorageAskRequest generates a request to get a provider's ask +func MakeTestStorageAskRequest() smnet.AskRequest { + return smnet.AskRequest{ + Miner: address.TestAddress2, + } +} + +// MakeTestStorageAskResponse generates a response to an ask request +func MakeTestStorageAskResponse() smnet.AskResponse { + return smnet.AskResponse{ + Ask: MakeTestSignedStorageAsk(), + } +} + func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket.RetrievalPeer { peers := make([]retrievalmarket.RetrievalPeer, numPeers) for i := range peers { diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 4b99b7b9..2c9258a0 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -3,17 +3,17 @@ package storageimpl import ( "context" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-data-transfer" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/pieceio/cario" @@ -32,11 +32,11 @@ var log = logging.Logger("deals") type ClientDeal struct { storagemarket.ClientDeal - s inet.Stream + s network.StorageDealStream } type Client struct { - h host.Host + net network.StorageMarketNetwork // dataTransfer // TODO: once the data transfer module is complete, the @@ -52,7 +52,7 @@ type Client struct { node storagemarket.StorageClientNode deals *statestore.StateStore - conns map[cid.Cid]inet.Stream + conns map[cid.Cid]network.StorageDealStream incoming chan *ClientDeal updated chan clientDealUpdate @@ -68,12 +68,12 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { +func NewClient(net network.StorageMarketNetwork, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { carIO := cario.NewCarIO() pio := pieceio.NewPieceIO(carIO, bs) c := &Client{ - h: h, + net: net, dataTransfer: dataTransfer, bs: bs, pio: pio, @@ -81,7 +81,7 @@ func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer. node: scn, deals: deals, - conns: map[cid.Cid]inet.Stream{}, + conns: map[cid.Cid]network.StorageDealStream{}, incoming: make(chan *ClientDeal, 16), updated: make(chan clientDealUpdate, 16), @@ -213,18 +213,13 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err) } - s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID) + s, err := c.net.NewDealStream(p.MinerID) if err != nil { return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) } - proposal := &Proposal{ - DealProposal: dealProposal, - Piece: p.Data, - } - - if err := cborutil.WriteCborRPC(s, proposal); err != nil { - _ = s.Reset() + proposal := network.Proposal{DealProposal: dealProposal, Piece: p.Data} + if err := s.WriteDealProposal(proposal); err != nil { return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err) } @@ -250,20 +245,18 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) { - s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID) + s, err := c.net.NewAskStream(p) if err != nil { return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } - req := &AskRequest{ - Miner: a, - } - if err := cborutil.WriteCborRPC(s, req); err != nil { + request := network.AskRequest{Miner: a} + if err := s.WriteAskRequest(request); err != nil { return nil, xerrors.Errorf("failed to send ask request: %w", err) } - var out AskResponse - if err := cborutil.ReadCborRPC(s, &out); err != nil { + out, err := s.ReadAskResponse() + if err != nil { return nil, xerrors.Errorf("failed to read ask response: %w", err) } diff --git a/storagemarket/impl/client_utils.go b/storagemarket/impl/client_utils.go index 14d94318..9b8a6f43 100644 --- a/storagemarket/impl/client_utils.go +++ b/storagemarket/impl/client_utils.go @@ -13,9 +13,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-data-transfer" ) func (c *Client) failDeal(id cid.Cid, cerr error) { @@ -26,7 +26,7 @@ func (c *Client) failDeal(id cid.Cid, cerr error) { s, ok := c.conns[id] if ok { - _ = s.Reset() + _ = s.Close() delete(c.conns, id) } @@ -49,15 +49,15 @@ func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error return commp[:], paddedSize, nil } -func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) { +func (c *Client) readStorageDealResp(deal ClientDeal) (*network.Response, error) { s, ok := c.conns[deal.ProposalCid] if !ok { // TODO: Try to re-establish the connection using query protocol return nil, xerrors.Errorf("no connection to miner") } - var resp SignedResponse - if err := cborutil.ReadCborRPC(s, &resp); err != nil { + resp, err := s.ReadDealResponse() + if err != nil { log.Errorw("failed to read Response message", "error", err) return nil, err } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index a1954a8a..5bad74bf 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -9,13 +9,10 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" blockstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/pieceio/cario" @@ -23,7 +20,9 @@ import ( "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/shared/types" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-data-transfer" ) var ProviderDsPrefix = "/deals/provider" @@ -32,10 +31,12 @@ var ProviderDsPrefix = "/deals/provider" type MinerDeal struct { storagemarket.MinerDeal - s inet.Stream + s network.StorageDealStream } type Provider struct { + net network.StorageMarketNetwork + pricePerByteBlock tokenamount.TokenAmount // how much we want for storing one byte for one block minPieceSize uint64 @@ -54,7 +55,7 @@ type Provider struct { deals *statestore.StateStore ds datastore.Batching - conns map[cid.Cid]inet.Stream + conns map[cid.Cid]network.StorageDealStream actor address.Address @@ -76,7 +77,7 @@ var ( ErrDataTransferFailed = errors.New("deal data transfer failed") ) -func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address) (storagemarket.StorageProvider, error) { +func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address) (storagemarket.StorageProvider, error) { carIO := cario.NewCarIO() pio := pieceio.NewPieceIOWithStore(carIO, fs, bs) @@ -90,7 +91,7 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F pricePerByteBlock: tokenamount.FromInt(3), // TODO: allow setting minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up)) - conns: map[cid.Cid]inet.Stream{}, + conns: map[cid.Cid]network.StorageDealStream{}, incoming: make(chan MinerDeal), updated: make(chan minerDealUpdate), @@ -122,11 +123,13 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F return h, nil } -func (p *Provider) Run(ctx context.Context, host host.Host) { +func (p *Provider) Start(ctx context.Context) error { // TODO: restore state - host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream) - host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream) + err := p.net.SetDelegate(p) + if err != nil { + return err + } go func() { defer log.Warn("quitting deal provider loop") @@ -143,6 +146,7 @@ func (p *Provider) Run(ctx context.Context, host host.Host) { } } }() + return nil } func (p *Provider) onIncoming(deal MinerDeal) { @@ -243,7 +247,7 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da } } -func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) { +func (p *Provider) newDeal(s network.StorageDealStream, proposal network.Proposal) (MinerDeal, error) { proposalNd, err := cborutil.AsIpld(proposal.DealProposal) if err != nil { return MinerDeal{}, err @@ -251,7 +255,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) return MinerDeal{ MinerDeal: storagemarket.MinerDeal{ - Client: s.Conn().RemotePeer(), + Client: s.RemotePeer(), Proposal: *proposal.DealProposal, ProposalCid: proposalNd.Cid(), State: storagemarket.StorageDealUnknown, @@ -262,7 +266,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) }, nil } -func (p *Provider) HandleStream(s inet.Stream) { +func (p *Provider) HandleDealStream(s network.StorageDealStream) { log.Info("Handling storage deal proposal!") proposal, err := p.readProposal(s) @@ -282,7 +286,8 @@ func (p *Provider) HandleStream(s inet.Stream) { p.incoming <- deal } -func (p *Provider) Stop() { +func (p *Provider) Stop() error { close(p.stop) <-p.stopped + return p.net.StopHandlingRequests() } diff --git a/storagemarket/impl/provider_asks.go b/storagemarket/impl/provider_asks.go index 00e2d455..bfc24bdd 100644 --- a/storagemarket/impl/provider_asks.go +++ b/storagemarket/impl/provider_asks.go @@ -6,13 +6,13 @@ import ( "time" "github.com/ipfs/go-datastore" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) func (p *Provider) SetPrice(price tokenamount.TokenAmount, ttlsecs int64) error { @@ -52,24 +52,24 @@ func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk { return p.ask } -func (p *Provider) HandleAskStream(s inet.Stream) { +func (p *Provider) HandleAskStream(s network.StorageAskStream) { defer s.Close() - var ar AskRequest - if err := cborutil.ReadCborRPC(s, &ar); err != nil { + ar, err := s.ReadAskRequest() + if err != nil { log.Errorf("failed to read AskRequest from incoming stream: %s", err) return } resp := p.processAskRequest(&ar) - if err := cborutil.WriteCborRPC(s, resp); err != nil { + if err := s.WriteAskResponse(resp); err != nil { log.Errorf("failed to write ask response: %s", err) return } } -func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse { - return &AskResponse{ +func (p *Provider) processAskRequest(ar *network.AskRequest) network.AskResponse { + return network.AskResponse{ Ask: p.GetAsk(ar.Miner), } } diff --git a/storagemarket/impl/provider_states.go b/storagemarket/impl/provider_states.go index e7d524fc..ec847fac 100644 --- a/storagemarket/impl/provider_states.go +++ b/storagemarket/impl/provider_states.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) @@ -148,7 +149,7 @@ func (p *Provider) publishing(ctx context.Context, deal MinerDeal) (func(*MinerD return nil, err } - err = p.sendSignedResponse(ctx, &Response{ + err = p.sendSignedResponse(ctx, &network.Response{ State: storagemarket.StorageDealProposalAccepted, Proposal: deal.ProposalCid, diff --git a/storagemarket/impl/provider_utils.go b/storagemarket/impl/provider_utils.go index afc1ae74..a7e8c2b0 100644 --- a/storagemarket/impl/provider_utils.go +++ b/storagemarket/impl/provider_utils.go @@ -6,15 +6,14 @@ import ( "github.com/ipld/go-ipld-prime" - "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-statestore" "github.com/ipfs/go-cid" - inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" ) @@ -31,7 +30,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { log.Warnf("deal %s failed: %s", id, cerr) - err := p.sendSignedResponse(ctx, &Response{ + err := p.sendSignedResponse(ctx, &network.Response{ State: storagemarket.StorageDealFailing, Message: cerr.Error(), Proposal: id, @@ -39,7 +38,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { s, ok := p.conns[id] if ok { - _ = s.Reset() + _ = s.Close() delete(p.conns, id) } @@ -48,8 +47,9 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { } } -func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) { - if err := cborutil.ReadCborRPC(s, &proposal); err != nil { +func (p *Provider) readProposal(s network.StorageDealStream) (proposal network.Proposal, err error) { + proposal, err = s.ReadDealProposal() + if err != nil { log.Errorw("failed to read proposal message", "error", err) return proposal, err } @@ -70,7 +70,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) { return } -func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error { +func (p *Provider) sendSignedResponse(ctx context.Context, resp *network.Response) error { s, ok := p.conns[resp.Proposal] if !ok { return xerrors.New("couldn't send response: not connected") @@ -91,12 +91,12 @@ func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error return xerrors.Errorf("failed to sign response message: %w", err) } - signedResponse := &SignedResponse{ + signedResponse := network.SignedResponse{ Response: *resp, Signature: sig, } - err = cborutil.WriteCborRPC(s, signedResponse) + err = s.WriteDealResponse(signedResponse) if err != nil { // Assume client disconnected s.Close() diff --git a/storagemarket/impl/types.go b/storagemarket/impl/types.go index f432ae7b..d43d4adc 100644 --- a/storagemarket/impl/types.go +++ b/storagemarket/impl/types.go @@ -6,13 +6,10 @@ import ( "github.com/ipfs/go-cid" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-fil-markets/shared/types" "github.com/filecoin-project/go-fil-markets/storagemarket" ) -//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse StorageDataTransferVoucher +//go:generate cbor-gen-for StorageDataTransferVoucher var ( // ErrWrongVoucherType means the voucher was not the correct type can validate against @@ -43,47 +40,6 @@ var ( DataTransferStates = []storagemarket.StorageDealStatus{storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealUnknown} ) -type Proposal struct { - DealProposal *storagemarket.StorageDealProposal - - Piece cid.Cid // Used for retrieving from the client -} - -type Response struct { - State storagemarket.StorageDealStatus - - // DealProposalRejected - Message string - Proposal cid.Cid - - // StorageDealProposalAccepted - PublishMessage *cid.Cid -} - -// TODO: Do we actually need this to be signed? -type SignedResponse struct { - Response Response - - Signature *types.Signature -} - -func (r *SignedResponse) Verify(addr address.Address) error { - b, err := cborutil.Dump(&r.Response) - if err != nil { - return err - } - - return r.Signature.Verify(addr, b) -} - -type AskRequest struct { - Miner address.Address -} - -type AskResponse struct { - Ask *types.SignedStorageAsk -} - // StorageDataTransferVoucher is the voucher type for data transfers // used by the storage market type StorageDataTransferVoucher struct { diff --git a/storagemarket/impl/types_cbor_gen.go b/storagemarket/impl/types_cbor_gen.go index 1383df16..31b295cf 100644 --- a/storagemarket/impl/types_cbor_gen.go +++ b/storagemarket/impl/types_cbor_gen.go @@ -6,376 +6,12 @@ import ( "fmt" "io" - "github.com/filecoin-project/go-fil-markets/shared/types" - "github.com/filecoin-project/go-fil-markets/storagemarket" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) var _ = xerrors.Errorf -func (t *AskRequest) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Miner (address.Address) (struct) - if err := t.Miner.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Miner (address.Address) (struct) - - { - - if err := t.Miner.UnmarshalCBOR(br); err != nil { - return err - } - - } - return nil -} - -func (t *AskResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Ask (types.SignedStorageAsk) (struct) - if err := t.Ask.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Ask (types.SignedStorageAsk) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Ask = new(types.SignedStorageAsk) - if err := t.Ask.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - return nil -} - -func (t *Proposal) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.DealProposal (storagemarket.StorageDealProposal) (struct) - if err := t.DealProposal.MarshalCBOR(w); err != nil { - return err - } - - // t.Piece (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Piece); err != nil { - return xerrors.Errorf("failed to write cid field t.Piece: %w", err) - } - - return nil -} - -func (t *Proposal) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.DealProposal (storagemarket.StorageDealProposal) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.DealProposal = new(storagemarket.StorageDealProposal) - if err := t.DealProposal.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - // t.Piece (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Piece: %w", err) - } - - t.Piece = c - - } - return nil -} - -func (t *Response) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{132}); err != nil { - return err - } - - // t.State (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { - return err - } - - // t.Message (string) (string) - if len(t.Message) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Message was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Message)); err != nil { - return err - } - - // t.Proposal (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Proposal); err != nil { - return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) - } - - // t.PublishMessage (cid.Cid) (struct) - - if t.PublishMessage == nil { - if _, err := w.Write(cbg.CborNull); err != nil { - return err - } - } else { - if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { - return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) - } - } - - return nil -} - -func (t *Response) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 4 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.State (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) - // t.Message (string) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Message = string(sval) - } - // t.Proposal (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) - } - - t.Proposal = c - - } - // t.PublishMessage (cid.Cid) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) - } - - t.PublishMessage = &c - } - - } - return nil -} - -func (t *SignedResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Response (storageimpl.Response) (struct) - if err := t.Response.MarshalCBOR(w); err != nil { - return err - } - - // t.Signature (types.Signature) (struct) - if err := t.Signature.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Response (storageimpl.Response) (struct) - - { - - if err := t.Response.UnmarshalCBOR(br); err != nil { - return err - } - - } - // t.Signature (types.Signature) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Signature = new(types.Signature) - if err := t.Signature.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - return nil -} - func (t *StorageDataTransferVoucher) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) diff --git a/storagemarket/network/ask_stream.go b/storagemarket/network/ask_stream.go new file mode 100644 index 00000000..7cb5ea1a --- /dev/null +++ b/storagemarket/network/ask_stream.go @@ -0,0 +1,50 @@ +package network + +import ( + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" + + cborutil "github.com/filecoin-project/go-cbor-util" +) + +type askStream struct { + p peer.ID + rw mux.MuxedStream +} + +var _ StorageAskStream = (*askStream)(nil) + +func (as *askStream) ReadAskRequest() (AskRequest, error) { + var a AskRequest + + if err := a.UnmarshalCBOR(as.rw); err != nil { + log.Warn(err) + return AskRequestUndefined, err + + } + + return a, nil +} + +func (as *askStream) WriteAskRequest(q AskRequest) error { + return cborutil.WriteCborRPC(as.rw, &q) +} + +func (as *askStream) ReadAskResponse() (AskResponse, error) { + var resp AskResponse + + if err := resp.UnmarshalCBOR(as.rw); err != nil { + log.Warn(err) + return AskResponseUndefined, err + } + + return resp, nil +} + +func (as *askStream) WriteAskResponse(qr AskResponse) error { + return cborutil.WriteCborRPC(as.rw, &qr) +} + +func (as *askStream) Close() error { + return as.rw.Close() +} diff --git a/storagemarket/network/deal_stream.go b/storagemarket/network/deal_stream.go new file mode 100644 index 00000000..d7b346f8 --- /dev/null +++ b/storagemarket/network/deal_stream.go @@ -0,0 +1,49 @@ +package network + +import ( + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" +) + +type dealStream struct { + p peer.ID + rw mux.MuxedStream +} + +var _ StorageDealStream = (*dealStream)(nil) + +func (d *dealStream) ReadDealProposal() (Proposal, error) { + var ds Proposal + + if err := ds.UnmarshalCBOR(d.rw); err != nil { + log.Warn(err) + return ProposalUndefined, err + } + return ds, nil +} + +func (d *dealStream) WriteDealProposal(dp Proposal) error { + return cborutil.WriteCborRPC(d.rw, &dp) +} + +func (d *dealStream) ReadDealResponse() (SignedResponse, error) { + var dr SignedResponse + + if err := dr.UnmarshalCBOR(d.rw); err != nil { + return SignedResponseUndefined, err + } + return dr, nil +} + +func (d *dealStream) WriteDealResponse(dr SignedResponse) error { + return cborutil.WriteCborRPC(d.rw, &dr) +} + +func (d *dealStream) Close() error { + return d.rw.Close() +} + +func (d *dealStream) RemotePeer() peer.ID { + return d.p +} diff --git a/storagemarket/network/libp2p_impl.go b/storagemarket/network/libp2p_impl.go new file mode 100644 index 00000000..616075e6 --- /dev/null +++ b/storagemarket/network/libp2p_impl.go @@ -0,0 +1,79 @@ +package network + +import ( + "context" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" +) + +var log = logging.Logger("retrieval_network") + +// NewFromLibp2pHost builds a storage market network on top of libp2p +func NewFromLibp2pHost(h host.Host) StorageMarketNetwork { + return &libp2pStorageMarketNetwork{host: h} +} + +// libp2pStorageMarketNetwork transforms the libp2p host interface, which sends and receives +// NetMessage objects, into the graphsync network interface. +type libp2pStorageMarketNetwork struct { + host host.Host + // inbound messages from the network are forwarded to the receiver + receiver StorageReceiver +} + +func (impl *libp2pStorageMarketNetwork) NewAskStream(id peer.ID) (StorageAskStream, error) { + s, err := impl.host.NewStream(context.Background(), id, storagemarket.AskProtocolID) + if err != nil { + log.Warn(err) + return nil, err + } + return &askStream{p: id, rw: s}, nil +} + +func (impl *libp2pStorageMarketNetwork) NewDealStream(id peer.ID) (StorageDealStream, error) { + s, err := impl.host.NewStream(context.Background(), id, storagemarket.DealProtocolID) + if err != nil { + return nil, err + } + return &dealStream{p: id, rw: s}, nil +} + +func (impl *libp2pStorageMarketNetwork) SetDelegate(r StorageReceiver) error { + impl.receiver = r + impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream) + impl.host.SetStreamHandler(storagemarket.AskProtocolID, impl.handleNewAskStream) + return nil +} + +func (impl *libp2pStorageMarketNetwork) StopHandlingRequests() error { + impl.receiver = nil + impl.host.RemoveStreamHandler(storagemarket.DealProtocolID) + impl.host.RemoveStreamHandler(storagemarket.AskProtocolID) + return nil +} + +func (impl *libp2pStorageMarketNetwork) handleNewAskStream(s network.Stream) { + if impl.receiver == nil { + log.Warn("no receiver set") + s.Reset() // nolint: errcheck,gosec + return + } + remotePID := s.Conn().RemotePeer() + as := &askStream{remotePID, s} + impl.receiver.HandleAskStream(as) +} + +func (impl *libp2pStorageMarketNetwork) handleNewDealStream(s network.Stream) { + if impl.receiver == nil { + log.Warn("no receiver set") + s.Reset() // nolint: errcheck,gosec + return + } + remotePID := s.Conn().RemotePeer() + ds := &dealStream{remotePID, s} + impl.receiver.HandleDealStream(ds) +} diff --git a/storagemarket/network/libp2p_impl_test.go b/storagemarket/network/libp2p_impl_test.go new file mode 100644 index 00000000..f47f7d53 --- /dev/null +++ b/storagemarket/network/libp2p_impl_test.go @@ -0,0 +1,337 @@ +package network_test + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" +) + +type testReceiver struct { + t *testing.T + dealStreamHandler func(network.StorageDealStream) + askStreamHandler func(network.StorageAskStream) +} + +func (tr *testReceiver) HandleDealStream(s network.StorageDealStream) { + defer s.Close() + if tr.dealStreamHandler != nil { + tr.dealStreamHandler(s) + } +} +func (tr *testReceiver) HandleAskStream(s network.StorageAskStream) { + defer s.Close() + if tr.askStreamHandler != nil { + tr.askStreamHandler(s) + } +} + +func TestAskStreamSendReceiveAskRequest(t *testing.T) { + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + // host1 gets no-op receiver + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + // host2 gets receiver + achan := make(chan network.AskRequest) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + readq, err := s.ReadAskRequest() + require.NoError(t, err) + achan <- readq + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + // setup query stream host1 --> host 2 + assertAskRequestReceived(ctx, t, fromNetwork, toHost, achan) +} + +func TestAskStreamSendReceiveAskResponse(t *testing.T) { + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + // host1 gets no-op receiver + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + // host2 gets receiver + achan := make(chan network.AskResponse) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + a, err := s.ReadAskResponse() + require.NoError(t, err) + achan <- a + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + assertAskResponseReceived(ctx, t, fromNetwork, toHost, achan) + +} + +func TestAskStreamSendReceiveMultipleSuccessful(t *testing.T) { + // send query, read in handler, send response back, read response + ctxBg := context.Background() + td := shared_testutil.NewLibp2pTestData(ctxBg, t) + nw1 := network.NewFromLibp2pHost(td.Host1) + nw2 := network.NewFromLibp2pHost(td.Host2) + require.NoError(t, td.Host1.Connect(ctxBg, peer.AddrInfo{ID: td.Host2.ID()})) + + // host2 gets a query and sends a response + ar := shared_testutil.MakeTestStorageAskResponse() + done := make(chan bool) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + _, err := s.ReadAskRequest() + require.NoError(t, err) + + require.NoError(t, s.WriteAskResponse(ar)) + done <- true + }} + require.NoError(t, nw2.SetDelegate(tr2)) + + ctx, cancel := context.WithTimeout(ctxBg, 10*time.Second) + defer cancel() + + qs, err := nw1.NewAskStream(td.Host2.ID()) + require.NoError(t, err) + + var resp network.AskResponse + go require.NoError(t, qs.WriteAskRequest(shared_testutil.MakeTestStorageAskRequest())) + resp, err = qs.ReadAskResponse() + require.NoError(t, err) + + select { + case <-ctx.Done(): + t.Error("response not received") + case <-done: + } + + assert.Equal(t, ar, resp) +} + +func TestDealStreamSendReceiveDealProposal(t *testing.T) { + // send proposal, read in handler + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + dchan := make(chan network.Proposal) + tr2 := &testReceiver{ + t: t, + dealStreamHandler: func(s network.StorageDealStream) { + readD, err := s.ReadDealProposal() + require.NoError(t, err) + dchan <- readD + }, + } + require.NoError(t, toNetwork.SetDelegate(tr2)) + + assertDealProposalReceived(ctx, t, fromNetwork, toHost, dchan) +} + +func TestDealStreamSendReceiveDealResponse(t *testing.T) { + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toPeer := td.Host2.ID() + + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + drChan := make(chan network.SignedResponse) + tr2 := &testReceiver{ + t: t, + dealStreamHandler: func(s network.StorageDealStream) { + readDP, err := s.ReadDealResponse() + require.NoError(t, err) + drChan <- readDP + }, + } + require.NoError(t, toNetwork.SetDelegate(tr2)) + assertDealResponseReceived(ctx, t, fromNetwork, toPeer, drChan) +} + +func TestDealStreamSendReceiveMultipleSuccessful(t *testing.T) { + // send proposal, read in handler, send response back, + // read response, + + bgCtx := context.Background() + td := shared_testutil.NewLibp2pTestData(bgCtx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toPeer := td.Host2.ID() + + // set up stream handler, channels, and response + dr := shared_testutil.MakeTestStorageNetworkSignedResponse() + done := make(chan bool) + + tr2 := &testReceiver{t: t, dealStreamHandler: func(s network.StorageDealStream) { + _, err := s.ReadDealProposal() + require.NoError(t, err) + + require.NoError(t, s.WriteDealResponse(dr)) + done <- true + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + // start sending deal proposal + ds1, err := fromNetwork.NewDealStream(toPeer) + require.NoError(t, err) + + dp := shared_testutil.MakeTestStorageNetworkProposal() + + ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second) + defer cancel() + + // write proposal + require.NoError(t, ds1.WriteDealProposal(dp)) + + // read response and verify it's the one we told toNetwork to send + responseReceived, err := ds1.ReadDealResponse() + require.NoError(t, err) + assert.Equal(t, dr, responseReceived) + + select { + case <-ctx.Done(): + t.Errorf("failed to receive messages") + case <-done: + } + +} + +func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) { + bgCtx := context.Background() + td := shared_testutil.NewLibp2pTestData(bgCtx, t) + + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + // host1 gets no-op receiver + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + // host2 gets receiver + achan := make(chan network.AskRequest) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + readar, err := s.ReadAskRequest() + require.NoError(t, err) + achan <- readar + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + require.NoError(t, toNetwork.StopHandlingRequests()) + + _, err := fromNetwork.NewAskStream(toHost) + require.Error(t, err, "protocol not supported") +} + +// assertDealProposalReceived performs the verification that a deal proposal is received +func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.Proposal) { + ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) + defer cancel() + + qs1, err := fromNetwork.NewDealStream(toPeer) + require.NoError(t, err) + + // send query to host2 + dp := shared_testutil.MakeTestStorageNetworkProposal() + require.NoError(t, qs1.WriteDealProposal(dp)) + + var dealReceived network.Proposal + select { + case <-ctx.Done(): + t.Error("deal proposal not received") + case dealReceived = <-inChan: + } + require.NotNil(t, dealReceived) + assert.Equal(t, dp, dealReceived) +} + +func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.SignedResponse) { + ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second) + defer cancel() + + ds1, err := fromNetwork.NewDealStream(toPeer) + require.NoError(t, err) + + dr := shared_testutil.MakeTestStorageNetworkSignedResponse() + require.NoError(t, ds1.WriteDealResponse(dr)) + + var responseReceived network.SignedResponse + select { + case <-ctx.Done(): + t.Error("response not received") + case responseReceived = <-inChan: + } + require.NotNil(t, responseReceived) + assert.Equal(t, dr, responseReceived) +} + +// assertAskRequestReceived performs the verification that a AskRequest is received +func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, achan chan network.AskRequest) { + ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) + defer cancel() + + as1, err := fromNetwork.NewAskStream(toHost) + require.NoError(t, err) + + // send query to host2 + a := shared_testutil.MakeTestStorageAskRequest() + require.NoError(t, as1.WriteAskRequest(a)) + + var ina network.AskRequest + select { + case <-ctx.Done(): + t.Error("msg not received") + case ina = <-achan: + } + require.NotNil(t, ina) + assert.Equal(t, a.Miner, ina.Miner) +} + +// assertAskResponseReceived performs the verification that a AskResponse is received +func assertAskResponseReceived(inCtx context.Context, t *testing.T, + fromNetwork network.StorageMarketNetwork, + toHost peer.ID, + achan chan network.AskResponse) { + ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) + defer cancel() + + // setup query stream host1 --> host 2 + as1, err := fromNetwork.NewAskStream(toHost) + require.NoError(t, err) + + // send queryresponse to host2 + ar := shared_testutil.MakeTestStorageAskResponse() + require.NoError(t, as1.WriteAskResponse(ar)) + + // read queryresponse + var inar network.AskResponse + select { + case <-ctx.Done(): + t.Error("msg not received") + case inar = <-achan: + } + + require.NotNil(t, inar) + assert.Equal(t, ar, inar) +} diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go new file mode 100644 index 00000000..bc954380 --- /dev/null +++ b/storagemarket/network/network.go @@ -0,0 +1,41 @@ +package network + +import ( + "github.com/libp2p/go-libp2p-core/peer" +) + +// StorageAskStream is a stream for reading/writing requests & +// responses on the Storage Ask protocol +type StorageAskStream interface { + ReadAskRequest() (AskRequest, error) + WriteAskRequest(AskRequest) error + ReadAskResponse() (AskResponse, error) + WriteAskResponse(AskResponse) error + Close() error +} + +// StorageDealStream is a stream for reading and writing requests +// and responses on the storage deal protocol +type StorageDealStream interface { + ReadDealProposal() (Proposal, error) + WriteDealProposal(Proposal) error + ReadDealResponse() (SignedResponse, error) + WriteDealResponse(SignedResponse) error + RemotePeer() peer.ID + Close() error +} + +// StorageReceiver implements functions for receiving +// incoming data on storage protocols +type StorageReceiver interface { + HandleAskStream(StorageAskStream) + HandleDealStream(StorageDealStream) +} + +// StorageMarketNetwork is a network abstraction for the storage market +type StorageMarketNetwork interface { + NewAskStream(peer.ID) (StorageAskStream, error) + NewDealStream(peer.ID) (StorageDealStream, error) + SetDelegate(StorageReceiver) error + StopHandlingRequests() error +} diff --git a/storagemarket/network/types.go b/storagemarket/network/types.go new file mode 100644 index 00000000..f28597e7 --- /dev/null +++ b/storagemarket/network/types.go @@ -0,0 +1,68 @@ +package network + +import ( + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket" +) + +//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse + +// Proposal is the data sent over the network from client to provider when proposing +// a deal +type Proposal struct { + DealProposal *storagemarket.StorageDealProposal + + Piece cid.Cid // Used for retrieving from the client +} + +var ProposalUndefined = Proposal{} + +// Response is a response to a proposal sent over the network +type Response struct { + State storagemarket.StorageDealStatus + + // DealProposalRejected + Message string + Proposal cid.Cid + + // StorageDealProposalAccepted + PublishMessage *cid.Cid +} + +// SignedResponse is a response that is signed +type SignedResponse struct { + Response Response + + Signature *types.Signature +} + +var SignedResponseUndefined = SignedResponse{} + +// Verify verifies that a proposal was signed by the given provider +func (r *SignedResponse) Verify(addr address.Address) error { + b, err := cborutil.Dump(&r.Response) + if err != nil { + return err + } + + return r.Signature.Verify(addr, b) +} + +// AskRequest is a request for current ask parameters for a given miner +type AskRequest struct { + Miner address.Address +} + +var AskRequestUndefined = AskRequest{} + +// AskResponse is the response sent over the network in response +// to an ask request +type AskResponse struct { + Ask *types.SignedStorageAsk +} + +var AskResponseUndefined = AskResponse{} diff --git a/storagemarket/network/types_cbor_gen.go b/storagemarket/network/types_cbor_gen.go new file mode 100644 index 00000000..a807d1ed --- /dev/null +++ b/storagemarket/network/types_cbor_gen.go @@ -0,0 +1,377 @@ +package network + +import ( + "fmt" + "io" + + "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +var _ = xerrors.Errorf + +func (t *AskRequest) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Miner (address.Address) (struct) + if err := t.Miner.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Miner (address.Address) (struct) + + { + + if err := t.Miner.UnmarshalCBOR(br); err != nil { + return err + } + + } + return nil +} + +func (t *AskResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Ask (types.SignedStorageAsk) (struct) + if err := t.Ask.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Ask (types.SignedStorageAsk) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Ask = new(types.SignedStorageAsk) + if err := t.Ask.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + return nil +} + +func (t *Proposal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.DealProposal (storagemarket.StorageDealProposal) (struct) + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Piece (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Piece); err != nil { + return xerrors.Errorf("failed to write cid field t.Piece: %w", err) + } + + return nil +} + +func (t *Proposal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.DealProposal (storagemarket.StorageDealProposal) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.DealProposal = new(storagemarket.StorageDealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + // t.Piece (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Piece: %w", err) + } + + t.Piece = c + + } + return nil +} + +func (t *Response) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{132}); err != nil { + return err + } + + // t.State (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Message (string) (string) + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Message)); err != nil { + return err + } + + // t.Proposal (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Proposal); err != nil { + return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) + } + + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } + } + + return nil +} + +func (t *Response) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.State (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.Message (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.Proposal (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) + } + + t.Proposal = c + + } + // t.PublishMessage (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } + + } + return nil +} + +func (t *SignedResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.Response (network.Response) (struct) + if err := t.Response.MarshalCBOR(w); err != nil { + return err + } + + // t.Signature (types.Signature) (struct) + if err := t.Signature.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Response (network.Response) (struct) + + { + + if err := t.Response.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.Signature (types.Signature) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Signature = new(types.Signature) + if err := t.Signature.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + return nil +} diff --git a/storagemarket/types.go b/storagemarket/types.go index 96f8a0f1..3efe28f8 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -6,7 +6,6 @@ import ( "io" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" xerrors "golang.org/x/xerrors" @@ -188,9 +187,9 @@ type ClientDeal struct { // The interface provided for storage providers type StorageProvider interface { - Run(ctx context.Context, host host.Host) + Start(ctx context.Context) error - Stop() + Stop() error AddAsk(price tokenamount.TokenAmount, ttlsecs int64) error