Skip to content

Commit

Permalink
Storage market network abstraction (#109)
Browse files Browse the repository at this point in the history
* feat(storagemarket): create network abstraction

build an abstraction layer for communicating with the network over storage protocols

* feat(storagemarket): integrate network abstraction

Replace direct references to libp2p host with network abstraction layer in client and provider

* fix(deps): go mod tidy
  • Loading branch information
hannahhoward committed Feb 5, 2020
1 parent 8425024 commit aa23f79
Show file tree
Hide file tree
Showing 17 changed files with 1,150 additions and 476 deletions.
83 changes: 83 additions & 0 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

// MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields
Expand Down Expand Up @@ -110,6 +112,87 @@ func MakeTestDealPayment() retrievalmarket.DealPayment {
}
}

// MakeTestStorageDealProposal generates a valid storage deal proposal
func MakeTestStorageDealProposal() *storagemarket.StorageDealProposal {
return &storagemarket.StorageDealProposal{
PieceRef: RandomBytes(32),
PieceSize: rand.Uint64(),

Client: address.TestAddress,
Provider: address.TestAddress2,

ProposalExpiration: rand.Uint64(),
Duration: rand.Uint64(),

StoragePricePerEpoch: MakeTestTokenAmount(),
StorageCollateral: MakeTestTokenAmount(),

ProposerSignature: MakeTestSignature(),
}
}

// MakeTestStorageAsk generates a storage ask
func MakeTestStorageAsk() *types.StorageAsk {
return &types.StorageAsk{
Price: MakeTestTokenAmount(),
MinPieceSize: rand.Uint64(),
Miner: address.TestAddress2,
Timestamp: rand.Uint64(),
Expiry: rand.Uint64(),
SeqNo: rand.Uint64(),
}
}

// MakeTestSignedStorageAsk generates a signed storage ask
func MakeTestSignedStorageAsk() *types.SignedStorageAsk {
return &types.SignedStorageAsk{
Ask: MakeTestStorageAsk(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageNetworkProposal generates a proposal that can be sent over the
// network to a provider
func MakeTestStorageNetworkProposal() smnet.Proposal {
return smnet.Proposal{
DealProposal: MakeTestStorageDealProposal(),
Piece: GenerateCids(1)[0],
}
}

// MakeTestStorageNetworkResponse generates a response to a proposal sent over
// the network
func MakeTestStorageNetworkResponse() smnet.Response {
return smnet.Response{
State: storagemarket.StorageDealPublished,
Proposal: GenerateCids(1)[0],
PublishMessage: &(GenerateCids(1)[0]),
}
}

// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over
// the network that is signed
func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse {
return smnet.SignedResponse{
Response: MakeTestStorageNetworkResponse(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageAskRequest generates a request to get a provider's ask
func MakeTestStorageAskRequest() smnet.AskRequest {
return smnet.AskRequest{
Miner: address.TestAddress2,
}
}

// MakeTestStorageAskResponse generates a response to an ask request
func MakeTestStorageAskResponse() smnet.AskResponse {
return smnet.AskResponse{
Ask: MakeTestSignedStorageAsk(),
}
}

func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket.RetrievalPeer {
peers := make([]retrievalmarket.RetrievalPeer, numPeers)
for i := range peers {
Expand Down
41 changes: 17 additions & 24 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package storageimpl
import (
"context"

"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-data-transfer"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
Expand All @@ -32,11 +32,11 @@ var log = logging.Logger("deals")
type ClientDeal struct {
storagemarket.ClientDeal

s inet.Stream
s network.StorageDealStream
}

type Client struct {
h host.Host
net network.StorageMarketNetwork

// dataTransfer
// TODO: once the data transfer module is complete, the
Expand All @@ -52,7 +52,7 @@ type Client struct {
node storagemarket.StorageClientNode

deals *statestore.StateStore
conns map[cid.Cid]inet.Stream
conns map[cid.Cid]network.StorageDealStream

incoming chan *ClientDeal
updated chan clientDealUpdate
Expand All @@ -68,20 +68,20 @@ type clientDealUpdate struct {
mut func(*ClientDeal)
}

func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client {
func NewClient(net network.StorageMarketNetwork, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client {
carIO := cario.NewCarIO()
pio := pieceio.NewPieceIO(carIO, bs)

c := &Client{
h: h,
net: net,
dataTransfer: dataTransfer,
bs: bs,
pio: pio,
discovery: discovery,
node: scn,

deals: deals,
conns: map[cid.Cid]inet.Stream{},
conns: map[cid.Cid]network.StorageDealStream{},

incoming: make(chan *ClientDeal, 16),
updated: make(chan clientDealUpdate, 16),
Expand Down Expand Up @@ -213,18 +213,13 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err)
}

s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID)
s, err := c.net.NewDealStream(p.MinerID)
if err != nil {
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
}

proposal := &Proposal{
DealProposal: dealProposal,
Piece: p.Data,
}

if err := cborutil.WriteCborRPC(s, proposal); err != nil {
_ = s.Reset()
proposal := network.Proposal{DealProposal: dealProposal, Piece: p.Data}
if err := s.WriteDealProposal(proposal); err != nil {
return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err)
}

Expand All @@ -250,20 +245,18 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
}

func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) {
s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID)
s, err := c.net.NewAskStream(p)
if err != nil {
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

req := &AskRequest{
Miner: a,
}
if err := cborutil.WriteCborRPC(s, req); err != nil {
request := network.AskRequest{Miner: a}
if err := s.WriteAskRequest(request); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}

var out AskResponse
if err := cborutil.ReadCborRPC(s, &out); err != nil {
out, err := s.ReadAskResponse()
if err != nil {
return nil, xerrors.Errorf("failed to read ask response: %w", err)
}

Expand Down
12 changes: 6 additions & 6 deletions storagemarket/impl/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-data-transfer"
)

func (c *Client) failDeal(id cid.Cid, cerr error) {
Expand All @@ -26,7 +26,7 @@ func (c *Client) failDeal(id cid.Cid, cerr error) {

s, ok := c.conns[id]
if ok {
_ = s.Reset()
_ = s.Close()
delete(c.conns, id)
}

Expand All @@ -49,15 +49,15 @@ func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error
return commp[:], paddedSize, nil
}

func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) {
func (c *Client) readStorageDealResp(deal ClientDeal) (*network.Response, error) {
s, ok := c.conns[deal.ProposalCid]
if !ok {
// TODO: Try to re-establish the connection using query protocol
return nil, xerrors.Errorf("no connection to miner")
}

var resp SignedResponse
if err := cborutil.ReadCborRPC(s, &resp); err != nil {
resp, err := s.ReadDealResponse()
if err != nil {
log.Errorw("failed to read Response message", "error", err)
return nil, err
}
Expand Down
33 changes: 19 additions & 14 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,20 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-data-transfer"
)

var ProviderDsPrefix = "/deals/provider"
Expand All @@ -32,10 +31,12 @@ var ProviderDsPrefix = "/deals/provider"

type MinerDeal struct {
storagemarket.MinerDeal
s inet.Stream
s network.StorageDealStream
}

type Provider struct {
net network.StorageMarketNetwork

pricePerByteBlock tokenamount.TokenAmount // how much we want for storing one byte for one block
minPieceSize uint64

Expand All @@ -54,7 +55,7 @@ type Provider struct {
deals *statestore.StateStore
ds datastore.Batching

conns map[cid.Cid]inet.Stream
conns map[cid.Cid]network.StorageDealStream

actor address.Address

Expand All @@ -76,7 +77,7 @@ var (
ErrDataTransferFailed = errors.New("deal data transfer failed")
)

func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address) (storagemarket.StorageProvider, error) {
func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address) (storagemarket.StorageProvider, error) {
carIO := cario.NewCarIO()
pio := pieceio.NewPieceIOWithStore(carIO, fs, bs)

Expand All @@ -90,7 +91,7 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F
pricePerByteBlock: tokenamount.FromInt(3), // TODO: allow setting
minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))

conns: map[cid.Cid]inet.Stream{},
conns: map[cid.Cid]network.StorageDealStream{},

incoming: make(chan MinerDeal),
updated: make(chan minerDealUpdate),
Expand Down Expand Up @@ -122,11 +123,13 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F
return h, nil
}

func (p *Provider) Run(ctx context.Context, host host.Host) {
func (p *Provider) Start(ctx context.Context) error {
// TODO: restore state

host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream)
host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream)
err := p.net.SetDelegate(p)
if err != nil {
return err
}

go func() {
defer log.Warn("quitting deal provider loop")
Expand All @@ -143,6 +146,7 @@ func (p *Provider) Run(ctx context.Context, host host.Host) {
}
}
}()
return nil
}

func (p *Provider) onIncoming(deal MinerDeal) {
Expand Down Expand Up @@ -243,15 +247,15 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da
}
}

func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
func (p *Provider) newDeal(s network.StorageDealStream, proposal network.Proposal) (MinerDeal, error) {
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
if err != nil {
return MinerDeal{}, err
}

return MinerDeal{
MinerDeal: storagemarket.MinerDeal{
Client: s.Conn().RemotePeer(),
Client: s.RemotePeer(),
Proposal: *proposal.DealProposal,
ProposalCid: proposalNd.Cid(),
State: storagemarket.StorageDealUnknown,
Expand All @@ -262,7 +266,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error)
}, nil
}

func (p *Provider) HandleStream(s inet.Stream) {
func (p *Provider) HandleDealStream(s network.StorageDealStream) {
log.Info("Handling storage deal proposal!")

proposal, err := p.readProposal(s)
Expand All @@ -282,7 +286,8 @@ func (p *Provider) HandleStream(s inet.Stream) {
p.incoming <- deal
}

func (p *Provider) Stop() {
func (p *Provider) Stop() error {
close(p.stop)
<-p.stopped
return p.net.StopHandlingRequests()
}

0 comments on commit aa23f79

Please sign in to comment.