From a175afc88c43158de8b26982e48d9f50b24474d9 Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Tue, 20 Apr 2021 14:35:40 +0200
Subject: [PATCH 1/2] feat: add isRestart param to validators

---
 impl/events.go               | 15 +++++++++------
 impl/restart.go              |  2 +-
 manager.go                   |  2 ++
 testutil/stubbedvalidator.go |  2 ++
 4 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/impl/events.go b/impl/events.go
index ffcd61d2..abcadf3f 100644
--- a/impl/events.go
+++ b/impl/events.go
@@ -322,7 +322,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
 		return nil, err
 	}
 
-	voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
+	voucher, result, err := m.validateVoucher(true, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
 	if err != nil && err != datatransfer.ErrPause {
 		return result, xerrors.Errorf("failed to validate voucher: %w", err)
 	}
@@ -361,7 +361,7 @@ func (m *manager) acceptRequest(
 		return nil, err
 	}
 
-	voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
+	voucher, result, err := m.validateVoucher(false, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
 	if err != nil && err != datatransfer.ErrPause {
 		return result, err
 	}
@@ -410,16 +410,19 @@ func (m *manager) acceptRequest(
 // * reading voucher fails
 // * deserialization of selector fails
 // * validation fails
-func (m *manager) validateVoucher(sender peer.ID,
+func (m *manager) validateVoucher(
+	isRestart bool,
+	sender peer.ID,
 	incoming datatransfer.Request,
 	isPull bool,
 	baseCid cid.Cid,
-	stor ipld.Node) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
+	stor ipld.Node,
+) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
 	vouch, err := m.decodeVoucher(incoming, m.validatedTypes)
 	if err != nil {
 		return nil, nil, err
 	}
-	var validatorFunc func(peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) (datatransfer.VoucherResult, error)
+	var validatorFunc func(bool, peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) (datatransfer.VoucherResult, error)
 	processor, _ := m.validatedTypes.Processor(vouch.Type())
 	validator := processor.(datatransfer.RequestValidator)
 	if isPull {
@@ -428,7 +431,7 @@ func (m *manager) validateVoucher(sender peer.ID,
 		validatorFunc = validator.ValidatePush
 	}
 
-	result, err := validatorFunc(sender, vouch, baseCid, stor)
+	result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor)
 
 	return vouch, result, err
 }
diff --git a/impl/restart.go b/impl/restart.go
index ac0bd12c..9674e699 100644
--- a/impl/restart.go
+++ b/impl/restart.go
@@ -73,7 +73,7 @@ func (m *manager) validateRestartVoucher(channel datatransfer.ChannelState, isPu
 	}
 
 	// revalidate the voucher by reconstructing the request that would have led to the creation of this channel
-	if _, _, err := m.validateVoucher(channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil {
+	if _, _, err := m.validateVoucher(true, channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil {
 		return err
 	}
 
diff --git a/manager.go b/manager.go
index 7b9c3402..8c51ca8a 100644
--- a/manager.go
+++ b/manager.go
@@ -13,12 +13,14 @@ import (
 type RequestValidator interface {
 	// ValidatePush validates a push request received from the peer that will send data
 	ValidatePush(
+		isRestart bool,
 		sender peer.ID,
 		voucher Voucher,
 		baseCid cid.Cid,
 		selector ipld.Node) (VoucherResult, error)
 	// ValidatePull validates a pull request received from the peer that will receive data
 	ValidatePull(
+		isRestart bool,
 		receiver peer.ID,
 		voucher Voucher,
 		baseCid cid.Cid,
diff --git a/testutil/stubbedvalidator.go b/testutil/stubbedvalidator.go
index 283743d3..e2e5c6e0 100644
--- a/testutil/stubbedvalidator.go
+++ b/testutil/stubbedvalidator.go
@@ -19,6 +19,7 @@ func NewStubbedValidator() *StubbedValidator {
 
 // ValidatePush returns a stubbed result for a push validation
 func (sv *StubbedValidator) ValidatePush(
+	isRestart bool,
 	sender peer.ID,
 	voucher datatransfer.Voucher,
 	baseCid cid.Cid,
@@ -30,6 +31,7 @@ func (sv *StubbedValidator) ValidatePush(
 
 // ValidatePull returns a stubbed result for a pull validation
 func (sv *StubbedValidator) ValidatePull(
+	isRestart bool,
 	receiver peer.ID,
 	voucher datatransfer.Voucher,
 	baseCid cid.Cid,

From 488161166b6f7b06896a1ae0eb5b698562bd55ad Mon Sep 17 00:00:00 2001
From: dirkmc
Date: Thu, 6 May 2021 09:08:27 +0200
Subject: [PATCH 2/2] fix: on restart dont unpause unstarted transfer (#199)

---
 channelmonitor/channelmonitor.go |  5 +++++
 impl/integration_test.go         |  1 +
 transport/graphsync/graphsync.go | 36 ++++++++++++++++++++++++++++----
 3 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go
index 3a2aa9c1..5c53624b 100644
--- a/channelmonitor/channelmonitor.go
+++ b/channelmonitor/channelmonitor.go
@@ -52,6 +52,8 @@ type Config struct {
 	// Max time to wait for the responder to send a Complete message once all
 	// data has been sent
 	CompleteTimeout time.Duration
+	// Called when a restart completes successfully
+	OnRestartComplete func(id datatransfer.ChannelID)
 }
 
 func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
@@ -382,6 +384,9 @@ func (mc *monitoredChannel) restartChannel() {
 
 	if !restartAgain {
 		// No restart queued, we're done
+		if mc.cfg.OnRestartComplete != nil {
+			mc.cfg.OnRestartComplete(mc.chid)
+		}
 		return
 	}
 
diff --git a/impl/integration_test.go b/impl/integration_test.go
index 339ffb9e..fbb15cfb 100644
--- a/impl/integration_test.go
+++ b/impl/integration_test.go
@@ -725,6 +725,7 @@ func TestAutoRestart(t *testing.T) {
 			// Set up
 			restartConf := ChannelRestartConfig(channelmonitor.Config{
 				AcceptTimeout:          100 * time.Millisecond,
+				RestartDebounce:        500 * time.Millisecond,
 				RestartBackoff:         500 * time.Millisecond,
 				MaxConsecutiveRestarts: 5,
 				RestartAckTimeout:      100 * time.Millisecond,
diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go
index 24a214f4..a5bbb6ab 100644
--- a/transport/graphsync/graphsync.go
+++ b/transport/graphsync/graphsync.go
@@ -78,6 +78,7 @@ type Transport struct {
 	contextCancelMap      map[datatransfer.ChannelID]cancelRequest
 	pending               map[datatransfer.ChannelID]chan struct{}
 	requestorCancelledMap map[datatransfer.ChannelID]struct{}
+	channelXferStarted    map[datatransfer.ChannelID]bool
 	pendingExtensions     map[datatransfer.ChannelID][]graphsync.ExtensionData
 	stores                map[datatransfer.ChannelID]struct{}
 	supportedExtensions   []graphsync.ExtensionName
@@ -98,6 +99,7 @@ func NewTransport(peerID peer.ID, gs graphsync.GraphExchange, options ...Option)
 		pendingExtensions:   make(map[datatransfer.ChannelID][]graphsync.ExtensionData),
 		channelIDMap:        make(map[datatransfer.ChannelID]graphsyncKey),
 		pending:             make(map[datatransfer.ChannelID]chan struct{}),
+		channelXferStarted:  make(map[datatransfer.ChannelID]bool),
 		stores:              make(map[datatransfer.ChannelID]struct{}),
 		supportedExtensions: defaultSupportedExtensions,
 	}
@@ -149,15 +151,22 @@ func (t *Transport) OpenChannel(ctx context.Context,
 		// Relock now that request has been cancelled
 		t.dataLock.Lock()
 	}
-	// Set up the request listeners
+
+	// Keep track of "pending" channels.
+	// The channel is in the "pending" state when we've made a call to
+	// Graphsync to open a request, but Graphsync hasn't yet called the
+	// outgoing request hook.
 	t.pending[channelID] = make(chan struct{})
+
+	// Create a cancellable context for the channel so that the graphsync
+	// request can be cancelled
 	internalCtx, internalCancel := context.WithCancel(ctx)
 	cancelRQ := cancelRequest{
 		cancel:    internalCancel,
 		completed: make(chan struct{}),
 	}
 	t.contextCancelMap[channelID] = cancelRQ
+
 	t.dataLock.Unlock()
 
 	// If this is a restart request, the client can send a list of CIDs of
@@ -348,10 +357,10 @@ func (t *Transport) ResumeChannel(ctx context.Context,
 	defer t.dataLock.Unlock()
 
 	if _, ok := t.requestorCancelledMap[chid]; ok {
-		t.pendingExtensions[chid] = append(t.pendingExtensions[chid], extensions...)
 		return nil
 	}
 
+	t.channelXferStarted[chid] = true
 	return t.gs.UnpauseResponse(gsKey.p, gsKey.requestID, extensions...)
 }
 
@@ -375,10 +384,11 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI
 		return nil
 	}
 	t.dataLock.Lock()
-	if _, ok := t.requestorCancelledMap[chid]; ok {
+	_, ok := t.requestorCancelledMap[chid]
+	t.dataLock.Unlock()
+	if ok {
 		return nil
 	}
-	t.dataLock.Unlock()
 	return t.gs.CancelResponse(gsKey.p, gsKey.requestID)
 }
 
@@ -606,11 +616,26 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
 		return
 	}
 
+	// Check if the callback indicated that the channel should be paused
+	// immediately
+	paused := false
 	if err == datatransfer.ErrPause {
+		paused = true
 		hookActions.PauseResponse()
 	}
 
 	t.dataLock.Lock()
+
+	// If this is a restart request, and the data transfer still hasn't got
+	// out of the paused state (eg because we're still unsealing), start this
+	// graphsync response in the paused state.
+	hasXferStarted, isRestart := t.channelXferStarted[chid]
+	if isRestart && !hasXferStarted && !paused {
+		paused = true
+		hookActions.PauseResponse()
+	}
+	t.channelXferStarted[chid] = !paused
+
 	gsKey := graphsyncKey{request.ID(), p}
 	if _, ok := t.requestorCancelledMap[chid]; ok {
 		delete(t.requestorCancelledMap, chid)
@@ -626,7 +651,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
 	if ok {
 		hookActions.UsePersistenceOption("data-transfer-" + chid.String())
 	}
+
 	t.dataLock.Unlock()
+
 	hookActions.ValidateRequest()
 }
 
@@ -695,6 +722,7 @@ func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncK
 	delete(t.graphsyncRequestMap, gsKey)
 	delete(t.pendingExtensions, chid)
 	delete(t.requestorCancelledMap, chid)
+	delete(t.channelXferStarted, chid)
 	_, ok := t.stores[chid]
 	if ok {
 		opt := "data-transfer-" + chid.String()
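
The first patch is a breaking change for downstream implementations of RequestValidator: both ValidatePush and ValidatePull gain a leading isRestart parameter, with restartRequest and validateRestartVoucher passing true and acceptRequest passing false. The sketch below is not part of the patches above; it only illustrates how an implementation might adapt, and how the OnRestartComplete hook from the second patch could be wired up. The checkingValidator type, its acceptedDeals map, the constructor, the duration values, and the logging are invented for illustration; only the method signatures and the channelmonitor.Config field names come from the diffs, and the import paths are assumed to match the module's dependencies at the time.

package example

import (
	"errors"
	"log"
	"time"

	datatransfer "github.com/filecoin-project/go-data-transfer"
	"github.com/filecoin-project/go-data-transfer/channelmonitor"
	"github.com/ipfs/go-cid"
	"github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"
)

// checkingValidator is a hypothetical RequestValidator: it runs full checks
// on a first-time request, but only a lightweight lookup when isRestart is
// true, because the original request was already validated.
type checkingValidator struct {
	acceptedDeals map[cid.Cid]bool // base CIDs of previously accepted requests
}

// Compile-time check that the updated interface is satisfied.
var _ datatransfer.RequestValidator = (*checkingValidator)(nil)

func newCheckingValidator() *checkingValidator {
	return &checkingValidator{acceptedDeals: make(map[cid.Cid]bool)}
}

func (v *checkingValidator) ValidatePush(
	isRestart bool,
	sender peer.ID,
	voucher datatransfer.Voucher,
	baseCid cid.Cid,
	selector ipld.Node) (datatransfer.VoucherResult, error) {
	return v.validate(isRestart, baseCid)
}

func (v *checkingValidator) ValidatePull(
	isRestart bool,
	receiver peer.ID,
	voucher datatransfer.Voucher,
	baseCid cid.Cid,
	selector ipld.Node) (datatransfer.VoucherResult, error) {
	return v.validate(isRestart, baseCid)
}

func (v *checkingValidator) validate(isRestart bool, baseCid cid.Cid) (datatransfer.VoucherResult, error) {
	if isRestart {
		// The request was validated when the channel was first opened, so
		// just confirm it is still known to us.
		if !v.acceptedDeals[baseCid] {
			return nil, errors.New("restart requested for unknown transfer")
		}
		return nil, nil
	}
	// First-time request: run the full acceptance checks (elided here) and
	// remember the base CID so a later restart can be recognised.
	v.acceptedDeals[baseCid] = true
	return nil, nil
}

// monitorConfig returns a channel-monitor configuration that uses the new
// OnRestartComplete hook added in the second patch; the durations here are
// placeholders, not recommendations.
func monitorConfig() channelmonitor.Config {
	return channelmonitor.Config{
		AcceptTimeout:          30 * time.Second,
		RestartDebounce:        10 * time.Second,
		RestartBackoff:         20 * time.Second,
		MaxConsecutiveRestarts: 5,
		RestartAckTimeout:      30 * time.Second,
		CompleteTimeout:        5 * time.Minute,
		OnRestartComplete: func(chid datatransfer.ChannelID) {
			log.Printf("restart of channel %s complete", chid)
		},
	}
}

As in the updated TestAutoRestart, a configuration like monitorConfig() would be passed to the data transfer implementation through its ChannelRestartConfig option; OnRestartComplete fires once a restart has finished and no further restart is queued.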