From 29bfef7e037e96cba9a2962c5e8cdd49dc80caee Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 28 Apr 2021 14:35:43 +0200 Subject: [PATCH] fix: on restart dont unpause unstarted transfer --- channelmonitor/channelmonitor.go | 5 +++++ impl/integration_test.go | 1 + transport/graphsync/graphsync.go | 36 ++++++++++++++++++++++++++++---- 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 3a2aa9c1..5c53624b 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -52,6 +52,8 @@ type Config struct { // Max time to wait for the responder to send a Complete message once all // data has been sent CompleteTimeout time.Duration + // Called when a restart completes successfully + OnRestartComplete func(id datatransfer.ChannelID) } func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor { @@ -382,6 +384,9 @@ func (mc *monitoredChannel) restartChannel() { if !restartAgain { // No restart queued, we're done + if mc.cfg.OnRestartComplete != nil { + mc.cfg.OnRestartComplete(mc.chid) + } return } diff --git a/impl/integration_test.go b/impl/integration_test.go index 339ffb9e..fbb15cfb 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -725,6 +725,7 @@ func TestAutoRestart(t *testing.T) { // Set up restartConf := ChannelRestartConfig(channelmonitor.Config{ AcceptTimeout: 100 * time.Millisecond, + RestartDebounce: 500 * time.Millisecond, RestartBackoff: 500 * time.Millisecond, MaxConsecutiveRestarts: 5, RestartAckTimeout: 100 * time.Millisecond, diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 24a214f4..a5bbb6ab 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -78,6 +78,7 @@ type Transport struct { contextCancelMap map[datatransfer.ChannelID]cancelRequest pending map[datatransfer.ChannelID]chan struct{} requestorCancelledMap map[datatransfer.ChannelID]struct{} + channelXferStarted map[datatransfer.ChannelID]bool pendingExtensions map[datatransfer.ChannelID][]graphsync.ExtensionData stores map[datatransfer.ChannelID]struct{} supportedExtensions []graphsync.ExtensionName @@ -98,6 +99,7 @@ func NewTransport(peerID peer.ID, gs graphsync.GraphExchange, options ...Option) pendingExtensions: make(map[datatransfer.ChannelID][]graphsync.ExtensionData), channelIDMap: make(map[datatransfer.ChannelID]graphsyncKey), pending: make(map[datatransfer.ChannelID]chan struct{}), + channelXferStarted: make(map[datatransfer.ChannelID]bool), stores: make(map[datatransfer.ChannelID]struct{}), supportedExtensions: defaultSupportedExtensions, } @@ -149,15 +151,22 @@ func (t *Transport) OpenChannel(ctx context.Context, // Relock now that request has been cancelled t.dataLock.Lock() } - // Set up the request listeners + + // Keep track of "pending" channels. + // The channel is in the "pending" state when we've made a call to + // Graphsync to open a request, but Graphsync hasn't yet called the + // outgoing request hook. t.pending[channelID] = make(chan struct{}) + // Create a cancellable context for the channel so that the graphsync + // request can be cancelled internalCtx, internalCancel := context.WithCancel(ctx) cancelRQ := cancelRequest{ cancel: internalCancel, completed: make(chan struct{}), } t.contextCancelMap[channelID] = cancelRQ + t.dataLock.Unlock() // If this is a restart request, the client can send a list of CIDs of @@ -348,10 +357,10 @@ func (t *Transport) ResumeChannel(ctx context.Context, defer t.dataLock.Unlock() if _, ok := t.requestorCancelledMap[chid]; ok { - t.pendingExtensions[chid] = append(t.pendingExtensions[chid], extensions...) return nil } + t.channelXferStarted[chid] = true return t.gs.UnpauseResponse(gsKey.p, gsKey.requestID, extensions...) } @@ -375,10 +384,11 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI return nil } t.dataLock.Lock() - if _, ok := t.requestorCancelledMap[chid]; ok { + _, ok := t.requestorCancelledMap[chid] + t.dataLock.Unlock() + if ok { return nil } - t.dataLock.Unlock() return t.gs.CancelResponse(gsKey.p, gsKey.requestID) } @@ -606,11 +616,26 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook return } + // Check if the callback indicated that the channel should be paused + // immediately + paused := false if err == datatransfer.ErrPause { + paused = true hookActions.PauseResponse() } t.dataLock.Lock() + + // If this is a restart request, and the data transfer still hasn't got + // out of the paused state (eg because we're still unsealing), start this + // graphsync response in the paused state. + hasXferStarted, isRestart := t.channelXferStarted[chid] + if isRestart && !hasXferStarted && !paused { + paused = true + hookActions.PauseResponse() + } + t.channelXferStarted[chid] = !paused + gsKey := graphsyncKey{request.ID(), p} if _, ok := t.requestorCancelledMap[chid]; ok { delete(t.requestorCancelledMap, chid) @@ -626,7 +651,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook if ok { hookActions.UsePersistenceOption("data-transfer-" + chid.String()) } + t.dataLock.Unlock() + hookActions.ValidateRequest() } @@ -695,6 +722,7 @@ func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncK delete(t.graphsyncRequestMap, gsKey) delete(t.pendingExtensions, chid) delete(t.requestorCancelledMap, chid) + delete(t.channelXferStarted, chid) _, ok := t.stores[chid] if ok { opt := "data-transfer-" + chid.String()