Skip to content

Commit

Permalink
Storagemarket/provider allows subscription to events (#202)
Browse files Browse the repository at this point in the history
* use @hannahhoward pubsub for subscribe events
* pass miner deal value, no goroutine, consume subscriber channel
  • Loading branch information
shannonwells committed Apr 24, 2020
1 parent bb08236 commit 7515137
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 5 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/filecoin-project/sector-storage v0.0.0-20200411000242-61616264b16d
github.com/filecoin-project/specs-actors v1.0.0
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c h1:+MSf4
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW42RRM5LoM/1K5dK940VipLqpH8lEVGrMz+mNjU=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
Expand Down
3 changes: 3 additions & 0 deletions shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ package shared

// TipSetToken is the implementation-nonspecific identity for a tipset.
type TipSetToken []byte

// Unsubscribe is a function that gets called to unsubscribe from (storage|retrieval)market events
type Unsubscribe func()
2 changes: 0 additions & 2 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
Expand All @@ -43,7 +42,6 @@ type Client struct {
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.Manager
bs blockstore.Blockstore
fs filestore.FileStore
pio pieceio.PieceIO
discovery *discovery.Local

Expand Down
50 changes: 50 additions & 0 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
Expand Down Expand Up @@ -49,6 +51,7 @@ type Provider struct {
dataTransfer datatransfer.Manager
universalRetrievalEnabled bool
dealAcceptanceBuffer abi.ChainEpoch
pubSub *pubsub.PubSub

deals fsm.Group
}
Expand Down Expand Up @@ -94,6 +97,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo
actor: minerAddress,
dataTransfer: dataTransfer,
dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
pubSub: pubsub.New(dispatcher),
}

deals, err := fsm.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix)), fsm.Parameters{
Expand All @@ -102,6 +106,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo
StateKeyField: "State",
Events: providerstates.ProviderEvents,
StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
Notifier: h.dispatch,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -298,6 +303,51 @@ func (p *Provider) UniversalRetrievalEnabled() bool {
return p.universalRetrievalEnabled
}

func (p *Provider) SubscribeToEvents(subscriber storagemarket.ProviderSubscriber) shared.Unsubscribe {
return shared.Unsubscribe(p.pubSub.Subscribe(subscriber))
}

// dispatch puts the fsm event into a form that pubSub can consume,
// then publishes the event
func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ProviderEvent)
if !ok {
log.Errorf("dropped bad event %s", eventName)
}
realDeal, ok := deal.(storagemarket.MinerDeal)
if !ok {
log.Errorf("not a deal %v", deal)
}
pubSubEvt := internalEvent{evt, realDeal}

if err := p.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalEvent struct {
evt storagemarket.ProviderEvent
deal storagemarket.MinerDeal
}

func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie, ok := evt.(internalEvent)
if !ok {
return xerrors.New("wrong type of event")
}
cb, ok := subscriberFn.(storagemarket.ProviderSubscriber)
if !ok {
return xerrors.New("wrong type of event")
}
log.Infof("dispatcher called with valid evt %d", ie.evt)
cb(ie.evt, ie.deal)
return nil
}

// -------
// providerDealEnvironment
// -------

type providerDealEnvironment struct {
p *Provider
}
Expand Down
45 changes: 42 additions & 3 deletions storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storagemarket_test
import (
"bytes"
"context"
"github.com/filecoin-project/go-fil-markets/pieceio"
"io/ioutil"
"reflect"
"testing"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
Expand Down Expand Up @@ -84,6 +84,13 @@ func TestMakeDeal(t *testing.T) {
)
assert.NoError(t, err)

// set up a subscriber
dealChan := make(chan storagemarket.MinerDeal)
subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
dealChan <- deal
}
_ = provider.SubscribeToEvents(subscriber)

// set ask price where we'll accept any price
err = provider.AddAsk(big.NewInt(0), 50_000)
assert.NoError(t, err)
Expand Down Expand Up @@ -115,16 +122,48 @@ func TestMakeDeal(t *testing.T) {

time.Sleep(time.Millisecond * 100)

ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
var seenDeal storagemarket.MinerDeal
var actualStates []storagemarket.StorageDealStatus
for seenDeal.State != storagemarket.StorageDealCompleted {
select {
case seenDeal = <-dealChan:
actualStates = append(actualStates, seenDeal.State)
case <-ctx.Done():
t.Fatalf("never saw event")
}
}

expectedStates := []storagemarket.StorageDealStatus{
storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealVerifyData,
storagemarket.StorageDealPublishing,
storagemarket.StorageDealStaged,
storagemarket.StorageDealSealing,
storagemarket.StorageDealActive,
storagemarket.StorageDealCompleted,
}
assert.Equal(t, expectedStates, actualStates)

// check a couple of things to make sure we're getting the whole deal
assert.Equal(t, td.Host1.ID(), seenDeal.Client)
assert.Empty(t, seenDeal.Message)
assert.Equal(t, proposalCid, seenDeal.ProposalCid)
assert.Equal(t, providerAddr, seenDeal.ClientDealProposal.Proposal.Provider)

cd, err := client.GetLocalDeal(ctx, proposalCid)
assert.NoError(t, err)
assert.Equal(t, cd.State, storagemarket.StorageDealActive)
assert.Equal(t, int(storagemarket.StorageDealActive), int(cd.State))

providerDeals, err := provider.ListLocalDeals()
assert.NoError(t, err)

pd := providerDeals[0]
assert.True(t, pd.ProposalCid.Equals(proposalCid))
assert.Equal(t, pd.State, storagemarket.StorageDealCompleted)
assert.Equal(t, int(storagemarket.StorageDealCompleted), int(pd.State))
}

func TestMakeDealOffline(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ type StorageDeal struct {
market.DealState
}

// Subscriber is a callback that is called when events are emitted
type ProviderSubscriber func(event ProviderEvent, deal MinerDeal)

// StorageProvider is the interface provided for storage providers
type StorageProvider interface {
Start(ctx context.Context) error
Expand All @@ -301,6 +304,8 @@ type StorageProvider interface {
GetStorageCollateral(ctx context.Context) (Balance, error)

ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error

SubscribeToEvents(subscriber ProviderSubscriber) shared.Unsubscribe
}

// Node dependencies for a StorageProvider
Expand Down

0 comments on commit 7515137

Please sign in to comment.