Skip to content

Commit

Permalink
feat(storagemarket): create network abstraction
Browse files Browse the repository at this point in the history
build an abstraction layer for communicating with the network over storage protocols
  • Loading branch information
hannahhoward committed Feb 4, 2020
1 parent b40c0e5 commit c5fe28e
Show file tree
Hide file tree
Showing 16 changed files with 1,103 additions and 426 deletions.
83 changes: 83 additions & 0 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

// MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields
Expand Down Expand Up @@ -110,6 +112,87 @@ func MakeTestDealPayment() retrievalmarket.DealPayment {
}
}

// MakeTestStorageDealProposal generates a valid storage deal proposal
func MakeTestStorageDealProposal() *storagemarket.StorageDealProposal {
return &storagemarket.StorageDealProposal{
PieceRef: RandomBytes(32),
PieceSize: rand.Uint64(),

Client: address.TestAddress,
Provider: address.TestAddress2,

ProposalExpiration: rand.Uint64(),
Duration: rand.Uint64(),

StoragePricePerEpoch: MakeTestTokenAmount(),
StorageCollateral: MakeTestTokenAmount(),

ProposerSignature: MakeTestSignature(),
}
}

// MakeTestStorageAsk generates a storage ask
func MakeTestStorageAsk() *types.StorageAsk {
return &types.StorageAsk{
Price: MakeTestTokenAmount(),
MinPieceSize: rand.Uint64(),
Miner: address.TestAddress2,
Timestamp: rand.Uint64(),
Expiry: rand.Uint64(),
SeqNo: rand.Uint64(),
}
}

// MakeTestSignedStorageAsk generates a signed storage ask
func MakeTestSignedStorageAsk() *types.SignedStorageAsk {
return &types.SignedStorageAsk{
Ask: MakeTestStorageAsk(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageNetworkProposal generates a proposal that can be sent over the
// network to a provider
func MakeTestStorageNetworkProposal() smnet.Proposal {
return smnet.Proposal{
DealProposal: MakeTestStorageDealProposal(),
Piece: GenerateCids(1)[0],
}
}

// MakeTestStorageNetworkResponse generates a response to a proposal sent over
// the network
func MakeTestStorageNetworkResponse() smnet.Response {
return smnet.Response{
State: storagemarket.StorageDealPublished,
Proposal: GenerateCids(1)[0],
PublishMessage: &(GenerateCids(1)[0]),
}
}

// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over
// the network that is signed
func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse {
return smnet.SignedResponse{
Response: MakeTestStorageNetworkResponse(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageAskRequest generates a request to get a provider's ask
func MakeTestStorageAskRequest() smnet.AskRequest {
return smnet.AskRequest{
Miner: address.TestAddress2,
}
}

// MakeTestStorageAskResponse generates a response to an ask request
func MakeTestStorageAskResponse() smnet.AskResponse {
return smnet.AskResponse{
Ask: MakeTestSignedStorageAsk(),
}
}

func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket.RetrievalPeer {
peers := make([]retrievalmarket.RetrievalPeer, numPeers)
for i := range peers {
Expand Down
8 changes: 5 additions & 3 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package storageimpl
import (
"context"

"github.com/filecoin-project/go-fil-markets/storagemarket/network"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -218,7 +220,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
}

proposal := &Proposal{
proposal := &network.Proposal{
DealProposal: dealProposal,
Piece: p.Data,
}
Expand Down Expand Up @@ -255,14 +257,14 @@ func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*t
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

req := &AskRequest{
req := &network.AskRequest{
Miner: a,
}
if err := cborutil.WriteCborRPC(s, req); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}

var out AskResponse
var out network.AskResponse
if err := cborutil.ReadCborRPC(s, &out); err != nil {
return nil, xerrors.Errorf("failed to read ask response: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions storagemarket/impl/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
)

Expand Down Expand Up @@ -49,14 +50,14 @@ func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error
return commp[:], paddedSize, nil
}

func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) {
func (c *Client) readStorageDealResp(deal ClientDeal) (*network.Response, error) {
s, ok := c.conns[deal.ProposalCid]
if !ok {
// TODO: Try to re-establish the connection using query protocol
return nil, xerrors.Errorf("no connection to miner")
}

var resp SignedResponse
var resp network.SignedResponse
if err := cborutil.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read Response message", "error", err)
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
)

Expand Down Expand Up @@ -251,7 +252,7 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da
}
}

func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
func (p *Provider) newDeal(s inet.Stream, proposal network.Proposal) (MinerDeal, error) {
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
if err != nil {
return MinerDeal{}, err
Expand Down
9 changes: 5 additions & 4 deletions storagemarket/impl/provider_asks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

func (p *Provider) SetPrice(price tokenamount.TokenAmount, ttlsecs int64) error {
Expand Down Expand Up @@ -54,7 +55,7 @@ func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk {

func (p *Provider) HandleAskStream(s inet.Stream) {
defer s.Close()
var ar AskRequest
var ar network.AskRequest
if err := cborutil.ReadCborRPC(s, &ar); err != nil {
log.Errorf("failed to read AskRequest from incoming stream: %s", err)
return
Expand All @@ -68,8 +69,8 @@ func (p *Provider) HandleAskStream(s inet.Stream) {
}
}

func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse {
return &AskResponse{
func (p *Provider) processAskRequest(ar *network.AskRequest) *network.AskResponse {
return &network.AskResponse{
Ask: p.GetAsk(ar.Miner),
}
}
Expand Down
3 changes: 2 additions & 1 deletion storagemarket/impl/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
Expand Down Expand Up @@ -149,7 +150,7 @@ func (p *Provider) publishing(ctx context.Context, deal MinerDeal) (func(*MinerD
return nil, err
}

err = p.sendSignedResponse(ctx, &Response{
err = p.sendSignedResponse(ctx, &network.Response{
State: storagemarket.StorageDealProposalAccepted,

Proposal: deal.ProposalCid,
Expand Down
12 changes: 6 additions & 6 deletions storagemarket/impl/provider_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"github.com/ipld/go-ipld-prime"

"github.com/filecoin-project/go-data-transfer"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"

"github.com/filecoin-project/go-cbor-util"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-statestore"

"github.com/ipfs/go-cid"
Expand All @@ -31,7 +31,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) {

log.Warnf("deal %s failed: %s", id, cerr)

err := p.sendSignedResponse(ctx, &Response{
err := p.sendSignedResponse(ctx, &network.Response{
State: storagemarket.StorageDealFailing,
Message: cerr.Error(),
Proposal: id,
Expand All @@ -48,7 +48,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) {
}
}

func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) {
func (p *Provider) readProposal(s inet.Stream) (proposal network.Proposal, err error) {
if err := cborutil.ReadCborRPC(s, &proposal); err != nil {
log.Errorw("failed to read proposal message", "error", err)
return proposal, err
Expand All @@ -70,7 +70,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) {
return
}

func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error {
func (p *Provider) sendSignedResponse(ctx context.Context, resp *network.Response) error {
s, ok := p.conns[resp.Proposal]
if !ok {
return xerrors.New("couldn't send response: not connected")
Expand All @@ -91,7 +91,7 @@ func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error
return xerrors.Errorf("failed to sign response message: %w", err)
}

signedResponse := &SignedResponse{
signedResponse := &network.SignedResponse{
Response: *resp,
Signature: sig,
}
Expand Down
46 changes: 1 addition & 45 deletions storagemarket/impl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ import (

"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
)

//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse StorageDataTransferVoucher
//go:generate cbor-gen-for StorageDataTransferVoucher

var (
// ErrWrongVoucherType means the voucher was not the correct type can validate against
Expand Down Expand Up @@ -43,47 +40,6 @@ var (
DataTransferStates = []storagemarket.StorageDealStatus{storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealUnknown}
)

type Proposal struct {
DealProposal *storagemarket.StorageDealProposal

Piece cid.Cid // Used for retrieving from the client
}

type Response struct {
State storagemarket.StorageDealStatus

// DealProposalRejected
Message string
Proposal cid.Cid

// StorageDealProposalAccepted
PublishMessage *cid.Cid
}

// TODO: Do we actually need this to be signed?
type SignedResponse struct {
Response Response

Signature *types.Signature
}

func (r *SignedResponse) Verify(addr address.Address) error {
b, err := cborutil.Dump(&r.Response)
if err != nil {
return err
}

return r.Signature.Verify(addr, b)
}

type AskRequest struct {
Miner address.Address
}

type AskResponse struct {
Ask *types.SignedStorageAsk
}

// StorageDataTransferVoucher is the voucher type for data transfers
// used by the storage market
type StorageDataTransferVoucher struct {
Expand Down
Loading

0 comments on commit c5fe28e

Please sign in to comment.