Skip to content

Commit

Permalink
feat: on disconnect miner shouldn't dial client
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Dec 15, 2020
1 parent aa8533f commit 5bf526e
Show file tree
Hide file tree
Showing 23 changed files with 335 additions and 208 deletions.
3 changes: 2 additions & 1 deletion docs/storageclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ stateDiagram-v2
16 --> 17 : ClientEventDataTransferInitiated
16 --> 17 : ClientEventDataTransferRestarted
28 --> 17 : ClientEventDataTransferRestarted
17 --> 11 : ClientEventDataTransferStalled
16 --> 11 : ClientEventDataTransferCancelled
17 --> 11 : ClientEventDataTransferCancelled
28 --> 11 : ClientEventDataTransferCancelled
Expand Down Expand Up @@ -78,7 +79,7 @@ stateDiagram-v2
note left of 11 : The following events only record in this state.<br><br>ClientEventFundsReleased


note left of 17 : The following events only record in this state.<br><br>ClientEventDataTransferRestarted<br>ClientEventDataTransferStalled
note left of 17 : The following events only record in this state.<br><br>ClientEventDataTransferRestarted


note left of 21 : The following events only record in this state.<br><br>ClientEventFundsReserved
Expand Down
Binary file modified docs/storageclient.mmd.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/storageclient.mmd.svg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 7 additions & 3 deletions docs/storageprovider.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ stateDiagram-v2
state "StorageDealPublish" as 24
state "StorageDealPublishing" as 25
state "StorageDealError" as 26
state "StorageDealProviderTransferRestart" as 27
state "StorageDealProviderTransferAwaitRestart" as 27
state "StorageDealAwaitingPreCommit" as 29
4 : On entry runs HandoffDeal
5 : On entry runs VerifyDealActivated
Expand All @@ -33,7 +33,6 @@ stateDiagram-v2
22 : On entry runs WaitForFunding
24 : On entry runs PublishDeal
25 : On entry runs WaitForPublish
27 : On entry runs RestartDataTransfer
29 : On entry runs VerifyDealPreCommitted
[*] --> 0
note right of 0
Expand All @@ -50,14 +49,16 @@ stateDiagram-v2
14 --> 15 : ProviderEventDealDeciding
15 --> 18 : ProviderEventDataRequested
17 --> 11 : ProviderEventDataTransferFailed
27 --> 11 : ProviderEventDataTransferFailed
18 --> 17 : ProviderEventDataTransferInitiated
27 --> 11 : ProviderEventDataTransferRestartFailed
27 --> 17 : ProviderEventDataTransferInitiated
18 --> 17 : ProviderEventDataTransferRestarted
27 --> 17 : ProviderEventDataTransferRestarted
17 --> 11 : ProviderEventDataTransferCancelled
18 --> 11 : ProviderEventDataTransferCancelled
27 --> 11 : ProviderEventDataTransferCancelled
17 --> 19 : ProviderEventDataTransferCompleted
27 --> 19 : ProviderEventDataTransferCompleted
19 --> 11 : ProviderEventDataVerificationFailed
18 --> 20 : ProviderEventVerifiedData
19 --> 20 : ProviderEventVerifiedData
Expand Down Expand Up @@ -106,6 +107,9 @@ stateDiagram-v2

note left of 25 : The following events only record in this state.<br><br>ProviderEventFundsReleased


note left of 27 : The following events only record in this state.<br><br>ProviderEventDataTransferStalled

26 --> [*]
9 --> [*]
8 --> [*]
Binary file modified docs/storageprovider.mmd.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/storageprovider.mmd.svg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.2.3
github.com/filecoin-project/go-data-transfer v1.1.1-0.20201215145236-6e17350133e7
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand All @@ -22,7 +22,7 @@ require (
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.5.0
github.com/ipfs/go-graphsync v0.5.2
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1 h1:5sYKDbstyDsdJpVP4UGUW6+BgCNfgnH8hQgf0E3ZAno=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.2.3 h1:rM/HgGOOMsKvmeQjY7CVR3v7Orxf04LJSSczSpGlhg4=
github.com/filecoin-project/go-data-transfer v1.2.3/go.mod h1:ZAH51JZFR8NZC4FPiDPG+swjgui0q6zTMJbztc6pHhY=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20201215145236-6e17350133e7 h1:23clpKmKnQFgQOF4gImTxZa62XYrQvjEr5OUrmK+VA0=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20201215145236-6e17350133e7/go.mod h1:mvjZ+C3NkBX10JP4JMu27DCjUouHFjHwUGh+Xc4yvDA=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
Expand Down Expand Up @@ -304,8 +304,8 @@ github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3z
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6pM=
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
Expand Down
80 changes: 80 additions & 0 deletions shared/ready.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package shared

import (
"context"
"errors"
"sync"

"github.com/hannahhoward/go-pubsub"
)
Expand All @@ -22,3 +24,81 @@ func ReadyDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
cb(migrateErr)
return nil
}

// ReadyManager managers listeners for a ready event
type ReadyManager struct {
ctx context.Context
Stop context.CancelFunc

lk sync.RWMutex
isReady bool
initErr error
pubsub *pubsub.PubSub
}

func NewReadyManager() *ReadyManager {
ctx, stop := context.WithCancel(context.Background())
return &ReadyManager{
ctx: ctx,
Stop: stop,
pubsub: pubsub.New(ReadyDispatcher),
}
}

// FireReady is called when the ready event occurs
func (m *ReadyManager) FireReady(err error) error {
m.lk.Lock()
defer m.lk.Unlock()

if m.isReady {
return nil
}

m.isReady = true
m.initErr = err
return m.pubsub.Publish(err)
}

// OnReady registers a listener for the ready event.
// If the event has already been fired, the callback is immediately called back
// (in a go-routine).
func (m *ReadyManager) OnReady(ready ReadyFunc) {
m.lk.Lock()
defer m.lk.Unlock()

if m.isReady {
initErr := m.initErr
go ready(initErr)
return
}

m.pubsub.Subscribe(ready)
}

// AwaitReady blocks until the ready event fires.
// Returns immediately if the event already fired.
func (m *ReadyManager) AwaitReady() error {
m.lk.RLock()
isReady := m.isReady
m.lk.RUnlock()

if isReady {
return m.initErr
}

errch := make(chan error)
m.OnReady(func(err error) {
select {
case <-m.ctx.Done():
errch <- m.ctx.Err()
case errch <- err:
}
})

select {
case <-m.ctx.Done():
return m.ctx.Err()
case err := <-errch:
return err
}
}
67 changes: 34 additions & 33 deletions storagemarket/dealstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ const (
// StorageDealError means the deal has failed due to an error, and no further updates will occur
StorageDealError

// StorageDealProviderTransferRestart means a storage deal data transfer from client to provider will be restarted
// by the provider
StorageDealProviderTransferRestart
// StorageDealProviderTransferAwaitRestart means the provider has restarted while data
// was being transferred from client to provider, and will wait for the client to
// resume the transfer
StorageDealProviderTransferAwaitRestart

// StorageDealClientTransferRestart means a storage deal data transfer from client to provider will be restarted
// by the client
Expand All @@ -109,34 +110,34 @@ const (

// DealStates maps StorageDealStatus codes to string names
var DealStates = map[StorageDealStatus]string{
StorageDealUnknown: "StorageDealUnknown",
StorageDealProposalNotFound: "StorageDealProposalNotFound",
StorageDealProposalRejected: "StorageDealProposalRejected",
StorageDealProposalAccepted: "StorageDealProposalAccepted",
StorageDealAcceptWait: "StorageDealAcceptWait",
StorageDealStartDataTransfer: "StorageDealStartDataTransfer",
StorageDealStaged: "StorageDealStaged",
StorageDealAwaitingPreCommit: "StorageDealAwaitingPreCommit",
StorageDealSealing: "StorageDealSealing",
StorageDealActive: "StorageDealActive",
StorageDealExpired: "StorageDealExpired",
StorageDealSlashed: "StorageDealSlashed",
StorageDealRejecting: "StorageDealRejecting",
StorageDealFailing: "StorageDealFailing",
StorageDealFundsReserved: "StorageDealFundsReserved",
StorageDealCheckForAcceptance: "StorageDealCheckForAcceptance",
StorageDealValidating: "StorageDealValidating",
StorageDealTransferring: "StorageDealTransferring",
StorageDealWaitingForData: "StorageDealWaitingForData",
StorageDealVerifyData: "StorageDealVerifyData",
StorageDealReserveProviderFunds: "StorageDealReserveProviderFunds",
StorageDealReserveClientFunds: "StorageDealReserveClientFunds",
StorageDealProviderFunding: "StorageDealProviderFunding",
StorageDealClientFunding: "StorageDealClientFunding",
StorageDealPublish: "StorageDealPublish",
StorageDealPublishing: "StorageDealPublishing",
StorageDealError: "StorageDealError",
StorageDealFinalizing: "StorageDealFinalizing",
StorageDealClientTransferRestart: "StorageDealClientTransferRestart",
StorageDealProviderTransferRestart: "StorageDealProviderTransferRestart",
StorageDealUnknown: "StorageDealUnknown",
StorageDealProposalNotFound: "StorageDealProposalNotFound",
StorageDealProposalRejected: "StorageDealProposalRejected",
StorageDealProposalAccepted: "StorageDealProposalAccepted",
StorageDealAcceptWait: "StorageDealAcceptWait",
StorageDealStartDataTransfer: "StorageDealStartDataTransfer",
StorageDealStaged: "StorageDealStaged",
StorageDealAwaitingPreCommit: "StorageDealAwaitingPreCommit",
StorageDealSealing: "StorageDealSealing",
StorageDealActive: "StorageDealActive",
StorageDealExpired: "StorageDealExpired",
StorageDealSlashed: "StorageDealSlashed",
StorageDealRejecting: "StorageDealRejecting",
StorageDealFailing: "StorageDealFailing",
StorageDealFundsReserved: "StorageDealFundsReserved",
StorageDealCheckForAcceptance: "StorageDealCheckForAcceptance",
StorageDealValidating: "StorageDealValidating",
StorageDealTransferring: "StorageDealTransferring",
StorageDealWaitingForData: "StorageDealWaitingForData",
StorageDealVerifyData: "StorageDealVerifyData",
StorageDealReserveProviderFunds: "StorageDealReserveProviderFunds",
StorageDealReserveClientFunds: "StorageDealReserveClientFunds",
StorageDealProviderFunding: "StorageDealProviderFunding",
StorageDealClientFunding: "StorageDealClientFunding",
StorageDealPublish: "StorageDealPublish",
StorageDealPublishing: "StorageDealPublishing",
StorageDealError: "StorageDealError",
StorageDealFinalizing: "StorageDealFinalizing",
StorageDealClientTransferRestart: "StorageDealClientTransferRestart",
StorageDealProviderTransferAwaitRestart: "StorageDealProviderTransferAwaitRestart",
}
1 change: 1 addition & 0 deletions storagemarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ const (
ProviderEventRestart

// ProviderEventDataTransferRestartFailed means a data transfer that was restarted by the provider failed
// Deprecated: this event is no longer used
ProviderEventDataTransferRestartFailed

// ProviderEventDataTransferStalled happens when the providers data transfer experiences a disconnect
Expand Down
10 changes: 6 additions & 4 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ var ClientEvents = fsm.Events{
}),

fsm.Event(storagemarket.ClientEventDataTransferStalled).
From(storagemarket.StorageDealTransferring).ToJustRecord().Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "data transfer appears to be stalled. attempt restart"
return nil
}),
From(storagemarket.StorageDealTransferring).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("could not complete data transfer, could not connect to provider %s", deal.Miner).Error()
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferCancelled).
FromMany(
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag

// RestartDataTransfer restarts a data transfer to the provider that was initiated earlier
func RestartDataTransfer(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
log.Infof("restarting data transfer for deal deal %s", deal.ProposalCid)
log.Infof("restarting data transfer for deal %s", deal.ProposalCid)

if deal.TransferChannelID == nil {
return ctx.Trigger(storagemarket.ClientEventDataTransferRestartFailed, xerrors.New("channelId on client deal is nil"))
Expand Down
13 changes: 9 additions & 4 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Provider struct {
universalRetrievalEnabled bool
customDealDeciderFunc DealDeciderFunc
pubSub *pubsub.PubSub
readySub *pubsub.PubSub
readyMgr *shared.ReadyManager

deals fsm.Group
migrateDeals func(context.Context) error
Expand Down Expand Up @@ -125,7 +125,7 @@ func NewProvider(net network.StorageMarketNetwork,
actor: minerAddress,
dataTransfer: dataTransfer,
pubSub: pubsub.New(providerDispatcher),
readySub: pubsub.New(shared.ReadyDispatcher),
readyMgr: shared.NewReadyManager(),
}
storageMigrations, err := migrations.ProviderMigrations.Build()
if err != nil {
Expand Down Expand Up @@ -178,7 +178,11 @@ func (p *Provider) Start(ctx context.Context) error {

// OnReady registers a listener for when the provider has finished starting up
func (p *Provider) OnReady(ready shared.ReadyFunc) {
p.readySub.Subscribe(ready)
p.readyMgr.OnReady(ready)
}

func (p *Provider) AwaitReady() error {
return p.readyMgr.AwaitReady()
}

/*
Expand Down Expand Up @@ -268,6 +272,7 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error {

// Stop terminates processing of deals on a StorageProvider
func (p *Provider) Stop() error {
p.readyMgr.Stop()
p.unsubDataTransfer()
err := p.deals.Stop(context.TODO())
if err != nil {
Expand Down Expand Up @@ -555,7 +560,7 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {

func (p *Provider) start(ctx context.Context) error {
err := p.migrateDeals(ctx)
publishErr := p.readySub.Publish(err)
publishErr := p.readyMgr.FireReady(err)
if publishErr != nil {
log.Warnf("Publish storage provider ready event: %s", err.Error())
}
Expand Down

0 comments on commit 5bf526e

Please sign in to comment.