Skip to content

Commit

Permalink
feat(storagemarket): integrate network abstraction
Browse files Browse the repository at this point in the history
Replace direct references to libp2p host with network abstraction layer in client and provider
  • Loading branch information
hannahhoward committed Feb 4, 2020
1 parent c5fe28e commit 1c2f98d
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 60 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ github.com/filecoin-project/go-sectorbuilder v0.0.1 h1:yiLSEprWA1E43DFTSCXLSuCst
github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/lotus v0.2.7 h1:kMroa4l/F3fcQp1s0T4wIhV0w0RZ6PPeFioXb3Mpwkw=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI=
Expand Down
39 changes: 15 additions & 24 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import (
"context"

"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-data-transfer"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
Expand All @@ -34,11 +32,11 @@ var log = logging.Logger("deals")
type ClientDeal struct {
storagemarket.ClientDeal

s inet.Stream
s network.StorageDealStream
}

type Client struct {
h host.Host
net network.StorageMarketNetwork

// dataTransfer
// TODO: once the data transfer module is complete, the
Expand All @@ -54,7 +52,7 @@ type Client struct {
node storagemarket.StorageClientNode

deals *statestore.StateStore
conns map[cid.Cid]inet.Stream
conns map[cid.Cid]network.StorageDealStream

incoming chan *ClientDeal
updated chan clientDealUpdate
Expand All @@ -70,20 +68,20 @@ type clientDealUpdate struct {
mut func(*ClientDeal)
}

func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client {
func NewClient(net network.StorageMarketNetwork, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client {
carIO := cario.NewCarIO()
pio := pieceio.NewPieceIO(carIO, bs)

c := &Client{
h: h,
net: net,
dataTransfer: dataTransfer,
bs: bs,
pio: pio,
discovery: discovery,
node: scn,

deals: deals,
conns: map[cid.Cid]inet.Stream{},
conns: map[cid.Cid]network.StorageDealStream{},

incoming: make(chan *ClientDeal, 16),
updated: make(chan clientDealUpdate, 16),
Expand Down Expand Up @@ -215,18 +213,13 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err)
}

s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID)
s, err := c.net.NewDealStream(p.MinerID)
if err != nil {
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
}

proposal := &network.Proposal{
DealProposal: dealProposal,
Piece: p.Data,
}

if err := cborutil.WriteCborRPC(s, proposal); err != nil {
_ = s.Reset()
proposal := network.Proposal{DealProposal: dealProposal, Piece: p.Data}
if err := s.WriteDealProposal(proposal); err != nil {
return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err)
}

Expand All @@ -252,20 +245,18 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
}

func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) {
s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID)
s, err := c.net.NewAskStream(p)
if err != nil {
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

req := &network.AskRequest{
Miner: a,
}
if err := cborutil.WriteCborRPC(s, req); err != nil {
request := network.AskRequest{Miner: a}
if err := s.WriteAskRequest(request); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}

var out network.AskResponse
if err := cborutil.ReadCborRPC(s, &out); err != nil {
out, err := s.ReadAskResponse()
if err != nil {
return nil, xerrors.Errorf("failed to read ask response: %w", err)
}

Expand Down
9 changes: 4 additions & 5 deletions storagemarket/impl/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-data-transfer"
)

func (c *Client) failDeal(id cid.Cid, cerr error) {
Expand All @@ -27,7 +26,7 @@ func (c *Client) failDeal(id cid.Cid, cerr error) {

s, ok := c.conns[id]
if ok {
_ = s.Reset()
_ = s.Close()
delete(c.conns, id)
}

Expand Down Expand Up @@ -57,8 +56,8 @@ func (c *Client) readStorageDealResp(deal ClientDeal) (*network.Response, error)
return nil, xerrors.Errorf("no connection to miner")
}

var resp network.SignedResponse
if err := cborutil.ReadCborRPC(s, &resp); err != nil {
resp, err := s.ReadDealResponse()
if err != nil {
log.Errorw("failed to read Response message", "error", err)
return nil, err
}
Expand Down
32 changes: 18 additions & 14 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
Expand All @@ -25,6 +22,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-data-transfer"
)

var ProviderDsPrefix = "/deals/provider"
Expand All @@ -33,10 +31,12 @@ var ProviderDsPrefix = "/deals/provider"

type MinerDeal struct {
storagemarket.MinerDeal
s inet.Stream
s network.StorageDealStream
}

type Provider struct {
net network.StorageMarketNetwork

pricePerByteBlock tokenamount.TokenAmount // how much we want for storing one byte for one block
minPieceSize uint64

Expand All @@ -55,7 +55,7 @@ type Provider struct {
deals *statestore.StateStore
ds datastore.Batching

conns map[cid.Cid]inet.Stream
conns map[cid.Cid]network.StorageDealStream

actor address.Address

Expand All @@ -77,7 +77,7 @@ var (
ErrDataTransferFailed = errors.New("deal data transfer failed")
)

func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
Expand All @@ -99,7 +99,7 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F
pricePerByteBlock: tokenamount.FromInt(3), // TODO: allow setting
minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))

conns: map[cid.Cid]inet.Stream{},
conns: map[cid.Cid]network.StorageDealStream{},

incoming: make(chan MinerDeal),
updated: make(chan minerDealUpdate),
Expand Down Expand Up @@ -131,11 +131,13 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F
return h, nil
}

func (p *Provider) Run(ctx context.Context, host host.Host) {
func (p *Provider) Start(ctx context.Context) error {
// TODO: restore state

host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream)
host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream)
err := p.net.SetDelegate(p)
if err != nil {
return err
}

go func() {
defer log.Warn("quitting deal provider loop")
Expand All @@ -152,6 +154,7 @@ func (p *Provider) Run(ctx context.Context, host host.Host) {
}
}
}()
return nil
}

func (p *Provider) onIncoming(deal MinerDeal) {
Expand Down Expand Up @@ -252,15 +255,15 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da
}
}

func (p *Provider) newDeal(s inet.Stream, proposal network.Proposal) (MinerDeal, error) {
func (p *Provider) newDeal(s network.StorageDealStream, proposal network.Proposal) (MinerDeal, error) {
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
if err != nil {
return MinerDeal{}, err
}

return MinerDeal{
MinerDeal: storagemarket.MinerDeal{
Client: s.Conn().RemotePeer(),
Client: s.RemotePeer(),
Proposal: *proposal.DealProposal,
ProposalCid: proposalNd.Cid(),
State: storagemarket.StorageDealUnknown,
Expand All @@ -271,7 +274,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal network.Proposal) (MinerDeal,
}, nil
}

func (p *Provider) HandleStream(s inet.Stream) {
func (p *Provider) HandleDealStream(s network.StorageDealStream) {
log.Info("Handling storage deal proposal!")

proposal, err := p.readProposal(s)
Expand All @@ -291,7 +294,8 @@ func (p *Provider) HandleStream(s inet.Stream) {
p.incoming <- deal
}

func (p *Provider) Stop() {
func (p *Provider) Stop() error {
close(p.stop)
<-p.stopped
return p.net.StopHandlingRequests()
}
13 changes: 6 additions & 7 deletions storagemarket/impl/provider_asks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ipfs/go-datastore"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -53,24 +52,24 @@ func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk {
return p.ask
}

func (p *Provider) HandleAskStream(s inet.Stream) {
func (p *Provider) HandleAskStream(s network.StorageAskStream) {
defer s.Close()
var ar network.AskRequest
if err := cborutil.ReadCborRPC(s, &ar); err != nil {
ar, err := s.ReadAskRequest()
if err != nil {
log.Errorf("failed to read AskRequest from incoming stream: %s", err)
return
}

resp := p.processAskRequest(&ar)

if err := cborutil.WriteCborRPC(s, resp); err != nil {
if err := s.WriteAskResponse(resp); err != nil {
log.Errorf("failed to write ask response: %s", err)
return
}
}

func (p *Provider) processAskRequest(ar *network.AskRequest) *network.AskResponse {
return &network.AskResponse{
func (p *Provider) processAskRequest(ar *network.AskRequest) network.AskResponse {
return network.AskResponse{
Ask: p.GetAsk(ar.Miner),
}
}
Expand Down
14 changes: 7 additions & 7 deletions storagemarket/impl/provider_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (

"github.com/ipld/go-ipld-prime"

"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-data-transfer"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-statestore"

"github.com/ipfs/go-cid"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
)
Expand All @@ -39,7 +38,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) {

s, ok := p.conns[id]
if ok {
_ = s.Reset()
_ = s.Close()
delete(p.conns, id)
}

Expand All @@ -48,8 +47,9 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) {
}
}

func (p *Provider) readProposal(s inet.Stream) (proposal network.Proposal, err error) {
if err := cborutil.ReadCborRPC(s, &proposal); err != nil {
func (p *Provider) readProposal(s network.StorageDealStream) (proposal network.Proposal, err error) {
proposal, err = s.ReadDealProposal()
if err != nil {
log.Errorw("failed to read proposal message", "error", err)
return proposal, err
}
Expand Down Expand Up @@ -91,12 +91,12 @@ func (p *Provider) sendSignedResponse(ctx context.Context, resp *network.Respons
return xerrors.Errorf("failed to sign response message: %w", err)
}

signedResponse := &network.SignedResponse{
signedResponse := network.SignedResponse{
Response: *resp,
Signature: sig,
}

err = cborutil.WriteCborRPC(s, signedResponse)
err = s.WriteDealResponse(signedResponse)
if err != nil {
// Assume client disconnected
s.Close()
Expand Down
4 changes: 4 additions & 0 deletions storagemarket/network/deal_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (d *dealStream) WriteDealResponse(dr SignedResponse) error {
func (d *dealStream) Close() error {
return d.rw.Close()
}

func (d *dealStream) RemotePeer() peer.ID {
return d.p
}
1 change: 1 addition & 0 deletions storagemarket/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type StorageDealStream interface {
WriteDealProposal(Proposal) error
ReadDealResponse() (SignedResponse, error)
WriteDealResponse(SignedResponse) error
RemotePeer() peer.ID
Close() error
}

Expand Down

0 comments on commit 1c2f98d

Please sign in to comment.