Skip to content

Commit

Permalink
fix: handle restart during unsealing
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Apr 28, 2021
1 parent fb588a0 commit f044f4a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
3 changes: 2 additions & 1 deletion retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ type providerStoreGetter struct {

func (psg *providerStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.DealID) (*multistore.Store, error) {
var deal retrievalmarket.ProviderDealState
err := psg.p.stateMachines.GetSync(context.TODO(), retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID}, &deal)
provDealID := retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID}
err := psg.p.stateMachines.Get(provDealID).Get(&deal)
if err != nil {
return nil, err
}
Expand Down
14 changes: 12 additions & 2 deletions retrievalmarket/impl/requestvalidation/requestvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (rv *ProviderRequestValidator) ValidatePull(isRestart bool, receiver peer.I
return response, err
}

// validatePull is called by the data provider when a new graphsync pull
// request is created. This can be the initial pull request or a new request
// created when the data transfer is restarted (e.g. after a connection failure).
// By default the graphsync request starts sending data immediately, unless
// validatePull returns ErrPause or the data transfer has not yet started
// (because the provider is still unsealing the data).
func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.ID, proposal *retrievalmarket.DealProposal, legacyProtocol bool, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) {
// Check the proposal CID matches
if proposal.PayloadCID != baseCid {
Expand All @@ -106,19 +112,21 @@ func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.I
return nil, errors.New("incorrect selector for this proposal")
}

// If the validation is for a restart request, the state is already being
// tracked so no further action is required
// If the validation is for a restart request, return nil, which means
// the data-transfer should not be explicitly paused or resumed
if isRestart {
return nil, nil
}

// This is a new graphsync request (not a restart)
pds := retrievalmarket.ProviderDealState{
DealProposal: *proposal,
Receiver: receiver,
LegacyProtocol: legacyProtocol,
CurrentInterval: proposal.PaymentInterval,
}

// Decide whether to accept the deal
status, err := rv.acceptDeal(&pds)

response := retrievalmarket.DealResponse{
Expand All @@ -140,6 +148,8 @@ func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.I
return nil, err
}

// Pause the data transfer while unsealing the data.
// The state machine will unpause the transfer when unsealing completes.
return &response, datatransfer.ErrPause
}

Expand Down
2 changes: 0 additions & 2 deletions retrievalmarket/impl/requestvalidation/revalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ func NewProviderRevalidator(env RevalidatorEnvironment) *ProviderRevalidator {
// a given channel ID with a retrieval deal, so that checks run for data sent
// on the channel
func (pr *ProviderRevalidator) TrackChannel(deal rm.ProviderDealState) {
// Sanity check
if deal.ChannelID == nil {
log.Errorf("cannot track deal %s: channel ID is nil", deal.ID)
return
}

Expand Down
40 changes: 35 additions & 5 deletions retrievalmarket/retrieval_restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var noOpDelay = testnodes.DelayFakeCommonNode{}
// complete successfully.
func TestBounceConnectionDealTransferOngoing(t *testing.T) {
bgCtx := context.Background()
//logger.SetLogLevel("restart_test", "debug")
logger.SetLogLevel("restart_test", "debug")
//logger.SetLogLevel("dt-impl", "debug")
logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("dt_graphsync", "debug")
//logger.SetLogLevel("markets-rtvl", "debug")
//logger.SetLogLevel("markets-rtvl-reval", "debug")

Expand Down Expand Up @@ -177,7 +178,14 @@ func TestBounceConnectionDealTransferOngoing(t *testing.T) {
// and the deal will complete successfully.
func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
bgCtx := context.Background()
logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("dt-chanmon", "debug")
//logger.SetLogLevel("retrieval", "debug")
//logger.SetLogLevel("retrievalmarket_impl", "debug")
logger.SetLogLevel("restart_test", "debug")
//logger.SetLogLevel("markets-rtvl-reval", "debug")
//logger.SetLogLevel("graphsync", "debug")
//logger.SetLogLevel("gs-traversal", "debug")
//logger.SetLogLevel("gs-executor", "debug")

beforeRestoringConnection := true
afterRestoringConnection := !beforeRestoringConnection
Expand All @@ -195,6 +203,11 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
for _, tc := range tcs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
restartComplete := make(chan struct{})
onRestartComplete := func(_ datatransfer.ChannelID) {
close(restartComplete)
}

dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1)
restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
AcceptTimeout: 100 * time.Millisecond,
Expand All @@ -203,6 +216,7 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
RestartDebounce: 100 * time.Millisecond,
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
OnRestartComplete: onRestartComplete,
})
td := shared_testutil.NewLibp2pTestData(bgCtx, t)
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)
Expand Down Expand Up @@ -249,18 +263,34 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
rh.TestDataNet.MockNet.UnlinkPeers(clientHost, providerHost)

go func() {
// Simulate unsealing delay
time.Sleep(50 * time.Millisecond)

// If unsealing should finish before restoring the connection
if tc.finishUnseal == beforeRestoringConnection {
log.Debugf("resume unseal")
// Finish unsealing
log.Debugf("finish unseal")
rh.ProviderNode.FinishUnseal()
time.Sleep(20 * time.Millisecond)
}

// Restore the connection
log.Debugf("restoring connection")
rh.TestDataNet.MockNet.LinkPeers(clientHost, providerHost)

// If unsealing should finish after restoring the connection
if tc.finishUnseal == afterRestoringConnection {
log.Debugf("resume unseal")
// Wait for the Restart message to be sent and
// acknowledged
select {
case <-ctxTimeout.Done():
return
case <-restartComplete:
}

// Finish unsealing
time.Sleep(50 * time.Millisecond)
log.Debugf("finish unseal")
rh.ProviderNode.FinishUnseal()
}
}()
Expand Down

0 comments on commit f044f4a

Please sign in to comment.