Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix restarts during data transfer for a retrieval deal #540

Merged
merged 5 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210420125303-a175afc88c43
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210428151930-29bfef7e037e
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210420125303-a175afc88c43 h1:2uUG5i+5kRduLZFLjkYf2LqpvefKO4sbQPnFA3weGJU=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210420125303-a175afc88c43/go.mod h1:E3WW4mCEYwU2y65swPEajSZoFWFmfXt7uwGduoACZQc=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210428151930-29bfef7e037e h1:8sGyac9gEAPRUifBYQfEdNqPUS6S0p4Eh+D1ATDgmIw=
github.com/filecoin-project/go-data-transfer v1.1.1-0.20210428151930-29bfef7e037e/go.mod h1:E3WW4mCEYwU2y65swPEajSZoFWFmfXt7uwGduoACZQc=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ type providerStoreGetter struct {

func (psg *providerStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.DealID) (*multistore.Store, error) {
var deal retrievalmarket.ProviderDealState
err := psg.p.stateMachines.GetSync(context.TODO(), retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID}, &deal)
provDealID := retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID}
err := psg.p.stateMachines.Get(provDealID).Get(&deal)
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down
14 changes: 12 additions & 2 deletions retrievalmarket/impl/requestvalidation/requestvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (rv *ProviderRequestValidator) ValidatePull(isRestart bool, receiver peer.I
return response, err
}

// validatePull is called by the data provider when a new graphsync pull
// request is created. This can be the initial pull request or a new request
// created when the data transfer is restarted (eg after a connection failure).
// By default the graphsync request starts immediately sending data, unless
// validatePull returns ErrPause or the data-transfer has not yet started
// (because the provider is still unsealing the data).
func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.ID, proposal *retrievalmarket.DealProposal, legacyProtocol bool, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) {
// Check the proposal CID matches
if proposal.PayloadCID != baseCid {
Expand All @@ -106,19 +112,21 @@ func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.I
return nil, errors.New("incorrect selector for this proposal")
}

// If the validation is for a restart request, the state is already being
// tracked so no further action is required
// If the validation is for a restart request, return nil, which means
// the data-transfer should not be explicitly paused or resumed
if isRestart {
return nil, nil
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}

// This is a new graphsync request (not a restart)
pds := retrievalmarket.ProviderDealState{
DealProposal: *proposal,
Receiver: receiver,
LegacyProtocol: legacyProtocol,
CurrentInterval: proposal.PaymentInterval,
}

// Decide whether to accept the deal
status, err := rv.acceptDeal(&pds)

response := retrievalmarket.DealResponse{
Expand All @@ -140,6 +148,8 @@ func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.I
return nil, err
}

// Pause the data transfer while unsealing the data.
// The state machine will unpause the transfer when unsealing completes.
return &response, datatransfer.ErrPause
}

Expand Down
2 changes: 0 additions & 2 deletions retrievalmarket/impl/requestvalidation/revalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ func NewProviderRevalidator(env RevalidatorEnvironment) *ProviderRevalidator {
// a given channel ID with a retrieval deal, so that checks run for data sent
// on the channel
func (pr *ProviderRevalidator) TrackChannel(deal rm.ProviderDealState) {
// Sanity check
if deal.ChannelID == nil {
log.Errorf("cannot track deal %s: channel ID is nil", deal.ID)
return
}

Expand Down
40 changes: 35 additions & 5 deletions retrievalmarket/retrieval_restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var noOpDelay = testnodes.DelayFakeCommonNode{}
// complete successfully.
func TestBounceConnectionDealTransferOngoing(t *testing.T) {
bgCtx := context.Background()
//logger.SetLogLevel("restart_test", "debug")
logger.SetLogLevel("restart_test", "debug")
//logger.SetLogLevel("dt-impl", "debug")
logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("dt_graphsync", "debug")
//logger.SetLogLevel("markets-rtvl", "debug")
//logger.SetLogLevel("markets-rtvl-reval", "debug")

Expand Down Expand Up @@ -177,7 +178,14 @@ func TestBounceConnectionDealTransferOngoing(t *testing.T) {
// and the deal will complete successfully.
func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
bgCtx := context.Background()
logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("retrieval", "debug")
//logger.SetLogLevel("retrievalmarket_impl", "debug")
logger.SetLogLevel("restart_test", "debug")
//logger.SetLogLevel("markets-rtvl-reval", "debug")
//logger.SetLogLevel("graphsync", "debug")
//logger.SetLogLevel("gs-traversal", "debug")
//logger.SetLogLevel("gs-executor", "debug")

beforeRestoringConnection := true
afterRestoringConnection := !beforeRestoringConnection
Expand All @@ -195,6 +203,11 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
for _, tc := range tcs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
restartComplete := make(chan struct{})
onRestartComplete := func(_ datatransfer.ChannelID) {
close(restartComplete)
}

dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1)
restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
AcceptTimeout: 100 * time.Millisecond,
Expand All @@ -203,6 +216,7 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
RestartDebounce: 100 * time.Millisecond,
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
OnRestartComplete: onRestartComplete,
})
td := shared_testutil.NewLibp2pTestData(bgCtx, t)
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)
Expand Down Expand Up @@ -249,18 +263,34 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
rh.TestDataNet.MockNet.UnlinkPeers(clientHost, providerHost)

go func() {
// Simulate unsealing delay
time.Sleep(50 * time.Millisecond)

// If unsealing should finish before restoring the connection
if tc.finishUnseal == beforeRestoringConnection {
log.Debugf("resume unseal")
// Finish unsealing
log.Debugf("finish unseal")
rh.ProviderNode.FinishUnseal()
time.Sleep(20 * time.Millisecond)
}

// Restore the connection
log.Debugf("restoring connection")
rh.TestDataNet.MockNet.LinkPeers(clientHost, providerHost)

// If unsealing should finish after restoring the connection
if tc.finishUnseal == afterRestoringConnection {
log.Debugf("resume unseal")
// Wait for the Restart message to be sent and
// acknowledged
select {
case <-ctxTimeout.Done():
return
case <-restartComplete:
}

// Finish unsealing
time.Sleep(50 * time.Millisecond)
log.Debugf("finish unseal")
rh.ProviderNode.FinishUnseal()
}
}()
Expand Down