From 1c2f98d523f323b49a6f8bf86ee78e448143f4e2 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 4 Feb 2020 07:18:58 -0800 Subject: [PATCH] feat(storagemarket): integrate network abstraction Replace direct references to libp2p host with network abstraction layer in client and provider --- go.sum | 1 + storagemarket/impl/client.go | 39 +++++++++++----------------- storagemarket/impl/client_utils.go | 9 +++---- storagemarket/impl/provider.go | 32 +++++++++++++---------- storagemarket/impl/provider_asks.go | 13 +++++----- storagemarket/impl/provider_utils.go | 14 +++++----- storagemarket/network/deal_stream.go | 4 +++ storagemarket/network/network.go | 1 + storagemarket/types.go | 5 ++-- 9 files changed, 58 insertions(+), 60 deletions(-) diff --git a/go.sum b/go.sum index 360004a85..96bedf37f 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,7 @@ github.com/filecoin-project/go-sectorbuilder v0.0.1 h1:yiLSEprWA1E43DFTSCXLSuCst github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= +github.com/filecoin-project/lotus v0.2.7 h1:kMroa4l/F3fcQp1s0T4wIhV0w0RZ6PPeFioXb3Mpwkw= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 5b533699c..2c9258a07 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -4,18 +4,16 @@ import ( "context" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-data-transfer" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/pieceio/cario" @@ -34,11 +32,11 @@ var log = logging.Logger("deals") type ClientDeal struct { storagemarket.ClientDeal - s inet.Stream + s network.StorageDealStream } type Client struct { - h host.Host + net network.StorageMarketNetwork // dataTransfer // TODO: once the data transfer module is complete, the @@ -54,7 +52,7 @@ type Client struct { node storagemarket.StorageClientNode deals *statestore.StateStore - conns map[cid.Cid]inet.Stream + conns map[cid.Cid]network.StorageDealStream incoming chan *ClientDeal updated chan clientDealUpdate @@ -70,12 +68,12 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { +func NewClient(net network.StorageMarketNetwork, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { carIO := cario.NewCarIO() pio := pieceio.NewPieceIO(carIO, bs) c := &Client{ - h: h, + net: net, dataTransfer: dataTransfer, bs: bs, pio: pio, @@ -83,7 +81,7 @@ func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer. node: scn, deals: deals, - conns: map[cid.Cid]inet.Stream{}, + conns: map[cid.Cid]network.StorageDealStream{}, incoming: make(chan *ClientDeal, 16), updated: make(chan clientDealUpdate, 16), @@ -215,18 +213,13 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err) } - s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID) + s, err := c.net.NewDealStream(p.MinerID) if err != nil { return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) } - proposal := &network.Proposal{ - DealProposal: dealProposal, - Piece: p.Data, - } - - if err := cborutil.WriteCborRPC(s, proposal); err != nil { - _ = s.Reset() + proposal := network.Proposal{DealProposal: dealProposal, Piece: p.Data} + if err := s.WriteDealProposal(proposal); err != nil { return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err) } @@ -252,20 +245,18 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) { - s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID) + s, err := c.net.NewAskStream(p) if err != nil { return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } - req := &network.AskRequest{ - Miner: a, - } - if err := cborutil.WriteCborRPC(s, req); err != nil { + request := network.AskRequest{Miner: a} + if err := s.WriteAskRequest(request); err != nil { return nil, xerrors.Errorf("failed to send ask request: %w", err) } - var out network.AskResponse - if err := cborutil.ReadCborRPC(s, &out); err != nil { + out, err := s.ReadAskResponse() + if err != nil { return nil, xerrors.Errorf("failed to read ask response: %w", err) } diff --git a/storagemarket/impl/client_utils.go b/storagemarket/impl/client_utils.go index f74e449af..9b8a6f439 100644 --- a/storagemarket/impl/client_utils.go +++ b/storagemarket/impl/client_utils.go @@ -13,10 +13,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-data-transfer" ) func (c *Client) failDeal(id cid.Cid, cerr error) { @@ -27,7 +26,7 @@ func (c *Client) failDeal(id cid.Cid, cerr error) { s, ok := c.conns[id] if ok { - _ = s.Reset() + _ = s.Close() delete(c.conns, id) } @@ -57,8 +56,8 @@ func (c *Client) readStorageDealResp(deal ClientDeal) (*network.Response, error) return nil, xerrors.Errorf("no connection to miner") } - var resp network.SignedResponse - if err := cborutil.ReadCborRPC(s, &resp); err != nil { + resp, err := s.ReadDealResponse() + if err != nil { log.Errorw("failed to read Response message", "error", err) return nil, err } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 66d26cb49..fea5c9b1d 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -9,13 +9,10 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" blockstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/pieceio/cario" @@ -25,6 +22,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-data-transfer" ) var ProviderDsPrefix = "/deals/provider" @@ -33,10 +31,12 @@ var ProviderDsPrefix = "/deals/provider" type MinerDeal struct { storagemarket.MinerDeal - s inet.Stream + s network.StorageDealStream } type Provider struct { + net network.StorageMarketNetwork + pricePerByteBlock tokenamount.TokenAmount // how much we want for storing one byte for one block minPieceSize uint64 @@ -55,7 +55,7 @@ type Provider struct { deals *statestore.StateStore ds datastore.Batching - conns map[cid.Cid]inet.Stream + conns map[cid.Cid]network.StorageDealStream actor address.Address @@ -77,7 +77,7 @@ var ( ErrDataTransferFailed = errors.New("deal data transfer failed") ) -func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { +func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err @@ -99,7 +99,7 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F pricePerByteBlock: tokenamount.FromInt(3), // TODO: allow setting minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up)) - conns: map[cid.Cid]inet.Stream{}, + conns: map[cid.Cid]network.StorageDealStream{}, incoming: make(chan MinerDeal), updated: make(chan minerDealUpdate), @@ -131,11 +131,13 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F return h, nil } -func (p *Provider) Run(ctx context.Context, host host.Host) { +func (p *Provider) Start(ctx context.Context) error { // TODO: restore state - host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream) - host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream) + err := p.net.SetDelegate(p) + if err != nil { + return err + } go func() { defer log.Warn("quitting deal provider loop") @@ -152,6 +154,7 @@ func (p *Provider) Run(ctx context.Context, host host.Host) { } } }() + return nil } func (p *Provider) onIncoming(deal MinerDeal) { @@ -252,7 +255,7 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da } } -func (p *Provider) newDeal(s inet.Stream, proposal network.Proposal) (MinerDeal, error) { +func (p *Provider) newDeal(s network.StorageDealStream, proposal network.Proposal) (MinerDeal, error) { proposalNd, err := cborutil.AsIpld(proposal.DealProposal) if err != nil { return MinerDeal{}, err @@ -260,7 +263,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal network.Proposal) (MinerDeal, return MinerDeal{ MinerDeal: storagemarket.MinerDeal{ - Client: s.Conn().RemotePeer(), + Client: s.RemotePeer(), Proposal: *proposal.DealProposal, ProposalCid: proposalNd.Cid(), State: storagemarket.StorageDealUnknown, @@ -271,7 +274,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal network.Proposal) (MinerDeal, }, nil } -func (p *Provider) HandleStream(s inet.Stream) { +func (p *Provider) HandleDealStream(s network.StorageDealStream) { log.Info("Handling storage deal proposal!") proposal, err := p.readProposal(s) @@ -291,7 +294,8 @@ func (p *Provider) HandleStream(s inet.Stream) { p.incoming <- deal } -func (p *Provider) Stop() { +func (p *Provider) Stop() error { close(p.stop) <-p.stopped + return p.net.StopHandlingRequests() } diff --git a/storagemarket/impl/provider_asks.go b/storagemarket/impl/provider_asks.go index f37b5eab3..bfc24bdde 100644 --- a/storagemarket/impl/provider_asks.go +++ b/storagemarket/impl/provider_asks.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ipfs/go-datastore" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -53,24 +52,24 @@ func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk { return p.ask } -func (p *Provider) HandleAskStream(s inet.Stream) { +func (p *Provider) HandleAskStream(s network.StorageAskStream) { defer s.Close() - var ar network.AskRequest - if err := cborutil.ReadCborRPC(s, &ar); err != nil { + ar, err := s.ReadAskRequest() + if err != nil { log.Errorf("failed to read AskRequest from incoming stream: %s", err) return } resp := p.processAskRequest(&ar) - if err := cborutil.WriteCborRPC(s, resp); err != nil { + if err := s.WriteAskResponse(resp); err != nil { log.Errorf("failed to write ask response: %s", err) return } } -func (p *Provider) processAskRequest(ar *network.AskRequest) *network.AskResponse { - return &network.AskResponse{ +func (p *Provider) processAskRequest(ar *network.AskRequest) network.AskResponse { + return network.AskResponse{ Ask: p.GetAsk(ar.Miner), } } diff --git a/storagemarket/impl/provider_utils.go b/storagemarket/impl/provider_utils.go index 0b60782a8..a7e8c2b0c 100644 --- a/storagemarket/impl/provider_utils.go +++ b/storagemarket/impl/provider_utils.go @@ -6,15 +6,14 @@ import ( "github.com/ipld/go-ipld-prime" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-data-transfer" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-statestore" "github.com/ipfs/go-cid" - inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" ) @@ -39,7 +38,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { s, ok := p.conns[id] if ok { - _ = s.Reset() + _ = s.Close() delete(p.conns, id) } @@ -48,8 +47,9 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { } } -func (p *Provider) readProposal(s inet.Stream) (proposal network.Proposal, err error) { - if err := cborutil.ReadCborRPC(s, &proposal); err != nil { +func (p *Provider) readProposal(s network.StorageDealStream) (proposal network.Proposal, err error) { + proposal, err = s.ReadDealProposal() + if err != nil { log.Errorw("failed to read proposal message", "error", err) return proposal, err } @@ -91,12 +91,12 @@ func (p *Provider) sendSignedResponse(ctx context.Context, resp *network.Respons return xerrors.Errorf("failed to sign response message: %w", err) } - signedResponse := &network.SignedResponse{ + signedResponse := network.SignedResponse{ Response: *resp, Signature: sig, } - err = cborutil.WriteCborRPC(s, signedResponse) + err = s.WriteDealResponse(signedResponse) if err != nil { // Assume client disconnected s.Close() diff --git a/storagemarket/network/deal_stream.go b/storagemarket/network/deal_stream.go index ef90f795a..d7b346f87 100644 --- a/storagemarket/network/deal_stream.go +++ b/storagemarket/network/deal_stream.go @@ -43,3 +43,7 @@ func (d *dealStream) WriteDealResponse(dr SignedResponse) error { func (d *dealStream) Close() error { return d.rw.Close() } + +func (d *dealStream) RemotePeer() peer.ID { + return d.p +} diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go index a7ce9b8dd..bc9543808 100644 --- a/storagemarket/network/network.go +++ b/storagemarket/network/network.go @@ -21,6 +21,7 @@ type StorageDealStream interface { WriteDealProposal(Proposal) error ReadDealResponse() (SignedResponse, error) WriteDealResponse(SignedResponse) error + RemotePeer() peer.ID Close() error } diff --git a/storagemarket/types.go b/storagemarket/types.go index 3ab6161bb..4c3342d02 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -6,7 +6,6 @@ import ( "io" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" xerrors "golang.org/x/xerrors" @@ -189,9 +188,9 @@ type ClientDeal struct { // The interface provided for storage providers type StorageProvider interface { - Run(ctx context.Context, host host.Host) + Start(ctx context.Context) error - Stop() + Stop() error AddAsk(price tokenamount.TokenAmount, ttlsecs int64) error