From 5ced0666dcc5d3aa098ab88fdc13bedf8672aba6 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 26 Mar 2021 16:19:58 +0100 Subject: [PATCH 1/4] fix: fail a pull channel when there is a timeout receiving the Complete message --- channelmonitor/channelmonitor.go | 61 +++++++++++++++++++++++--------- impl/integration_test.go | 14 +++++--- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index e14ee683..f79fb2b4 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -96,7 +96,7 @@ func checkConfig(cfg *Config) { // This interface just makes it easier to abstract some methods between the // push and pull monitor implementations type monitoredChan interface { - Shutdown() + Shutdown() bool checkDataRate() } @@ -125,11 +125,23 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored m.lk.Lock() defer m.lk.Unlock() + // Check if there is already a monitor for this channel + if _, ok := m.channels[chid]; ok { + tp := "push" + if !isPush { + tp = "pull" + } + log.Warnf("ignoring add %s channel %s: %s channel with that id already exists", + tp, chid, tp) + return nil + } + + // Create the channel monitor var mpc monitoredChan if isPush { - mpc = newMonitoredPushChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) + mpc = newMonitoredPushChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) } else { - mpc = newMonitoredPullChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) + mpc = newMonitoredPullChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) } m.channels[chid] = mpc return mpc @@ -207,6 +219,9 @@ func (m *Monitor) checkDataRate() { // monitoredChannel keeps track of the data-rate for a channel, and // restarts the channel if the rate falls below the minimum allowed type monitoredChannel struct { + // The parentCtx is used when sending a close message for a channel, so + // that operation can continue even after the monitoredChannel is shutdown + parentCtx context.Context ctx context.Context cancel context.CancelFunc mgr monitorAPI @@ -223,6 +238,7 @@ type monitoredChannel struct { } func newMonitoredChannel( + parentCtx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, cfg *Config, @@ -231,6 +247,7 @@ func newMonitoredChannel( ) *monitoredChannel { ctx, cancel := context.WithCancel(context.Background()) mpc := &monitoredChannel{ + parentCtx: parentCtx, ctx: ctx, cancel: cancel, mgr: mgr, @@ -247,14 +264,15 @@ func newMonitoredChannel( func (mc *monitoredChannel) checkDataRate() { } -// Cancel the context and unsubscribe from events -func (mc *monitoredChannel) Shutdown() { +// Cancel the context and unsubscribe from events. +// Returns true if channel has not already been shutdown. +func (mc *monitoredChannel) Shutdown() bool { mc.shutdownLk.Lock() defer mc.shutdownLk.Unlock() // Check if the channel was already shut down if mc.cancel == nil { - return + return false } mc.cancel() // cancel context so all go-routines exit mc.cancel = nil @@ -264,6 +282,8 @@ func (mc *monitoredChannel) Shutdown() { // Inform the Manager that this channel has shut down go mc.onShutdown(mc.chid) + + return true } func (mc *monitoredChannel) start() { @@ -301,9 +321,9 @@ func (mc *monitoredChannel) start() { log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid) go mc.restartChannel() case datatransfer.FinishTransfer: - // The client has finished sending all data. 
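This FinishTransfer handling is the heart of the first patch: once the initiator has sent or received all data, it starts a watchdog for the responder's Complete message. A standalone sketch of that watchdog follows; the real method is watchForResponderComplete, and the parameter shape here is an illustrative assumption, not the patch's actual signature:

    func watchForComplete(ctx context.Context, timeout time.Duration, fail func(error)) {
    	timer := time.NewTimer(timeout)
    	defer timer.Stop()

    	select {
    	case <-ctx.Done():
    		// The monitor was shut down first, e.g. because the
    		// Complete message arrived in time
    	case <-timer.C:
    		// No Complete message within the allowed window: fail the
    		// channel rather than leaving the transfer hanging forever
    		fail(fmt.Errorf("timed out waiting %s for Complete message", timeout))
    	}
    }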
Watch to make sure - // that the responder sends a message to acknowledge that the - // transfer is complete + // The channel initiator has finished sending / receiving all data. + // Watch to make sure that the responder sends a message to acknowledge + // that the transfer is complete go mc.watchForResponderComplete() default: // Delegate to the push channel monitor or pull channel monitor to @@ -425,14 +445,21 @@ func (mc *monitoredChannel) restartChannel() { mc.restartLk.Unlock() } +// Shut down the monitor and close the data transfer channel func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { + // Shutdown the monitor + firstShutdown := mc.Shutdown() + if !firstShutdown { + // Channel was already shutdown, ignore this second attempt to shutdown + return + } + + // Close the data transfer channel and fire an error log.Errorf("closing data-transfer channel: %s", cherr) - err := mc.mgr.CloseDataTransferChannelWithError(mc.ctx, mc.chid, cherr) + err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr) if err != nil { log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err) } - - mc.Shutdown() } // Snapshot of the pending and sent data at a particular point in time. @@ -454,6 +481,7 @@ type monitoredPushChannel struct { } func newMonitoredPushChannel( + parentCtx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, cfg *Config, @@ -462,7 +490,7 @@ func newMonitoredPushChannel( mpc := &monitoredPushChannel{ dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval), } - mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent) + mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent) return mpc } @@ -538,6 +566,7 @@ type monitoredPullChannel struct { } func newMonitoredPullChannel( + parentCtx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, cfg *Config, @@ -546,7 +575,7 @@ func newMonitoredPullChannel( mpc := &monitoredPullChannel{ dataRatePoints: make(chan uint64, cfg.ChecksPerInterval), } - mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent) + mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent) return mpc } @@ -578,8 +607,8 @@ func (mc *monitoredPullChannel) checkDataRate() { log.Debugf("%s: since last check: received: %d - %d = %d, required %d", mc.chid, mc.received, atIntervalStart, rcvdInInterval, mc.cfg.MinBytesTransferred) if rcvdInInterval < mc.cfg.MinBytesTransferred { - log.Warnf("%s: data-rate too low, restarting channel: since last check %s ago: received %d, required %d", - mc.chid, mc.cfg.Interval, rcvdInInterval, mc.cfg.MinBytesTransferred) + log.Warnf("%s: data-rate too low, restarting channel: since last check received %d but required %d", + mc.chid, rcvdInInterval, mc.cfg.MinBytesTransferred) go mc.restartChannel() } } diff --git a/impl/integration_test.go b/impl/integration_test.go index c9b00d88..77bd3ade 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -614,13 +614,17 @@ func TestAutoRestart(t *testing.T) { // right before the responder sends the complete message (ie responder sent // all blocks, but the responder doesn't get a chance to tell the initiator // before the disconnect) - name: "push: before requester sends complete message", - isPush: true, + name: "pull: before responder sends complete message", + isPush: false, expectInitiatorDTFail: true, disconnectOnRequestComplete: true, }} for _, tc := range testCases { - 
t.Run(tc.name, func(t *testing.T) { + expectFailMsg := "" + if tc.expectInitiatorDTFail { + expectFailMsg = " (expect failure)" + } + t.Run(tc.name+expectFailMsg, func(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -659,8 +663,8 @@ func TestAutoRestart(t *testing.T) { // Set up restartConf := ChannelRestartConfig(channelmonitor.Config{ - MonitorPushChannels: true, - MonitorPullChannels: true, + MonitorPushChannels: tc.isPush, + MonitorPullChannels: !tc.isPush, AcceptTimeout: 100 * time.Millisecond, Interval: 100 * time.Millisecond, MinBytesTransferred: 1, From 6eb5bfcdeb2cb71f00a20b8a19c43fb9b09dcd8b Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 29 Mar 2021 12:08:56 +0200 Subject: [PATCH 2/4] feat: respect pausing in pull channel monitor --- channelmonitor/channelmonitor.go | 42 ++++++++ channelmonitor/channelmonitor_test.go | 147 ++++++++++++++++++++++++++ impl/integration_test.go | 136 ++++++++++++++++++------ 3 files changed, 290 insertions(+), 35 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index f79fb2b4..2c7037b8 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -53,6 +53,8 @@ type Config struct { // Max time to wait for the responder to send a Complete message once all // data has been sent CompleteTimeout time.Duration + // Max time to wait for a resume after the responder paused the channel - for pull channels + PullPauseTimeout time.Duration } func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor { @@ -91,6 +93,9 @@ func checkConfig(cfg *Config) { if cfg.CompleteTimeout <= 0 { panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout)) } + if cfg.MonitorPullChannels && cfg.PullPauseTimeout <= 0 { + panic(fmt.Sprintf(prefix+"PullPauseTimeout is %s but must be > 0", cfg.PullPauseTimeout)) + } } // This interface just makes it easier to abstract some methods between the @@ -563,6 +568,7 @@ type monitoredPullChannel struct { statsLk sync.RWMutex received uint64 dataRatePoints chan uint64 + pausedAt time.Time } func newMonitoredPullChannel( @@ -585,6 +591,30 @@ func (mc *monitoredPullChannel) checkDataRate() { mc.statsLk.Lock() defer mc.statsLk.Unlock() + // If the channel is currently paused + if !mc.pausedAt.IsZero() { + // Check if the channel has been paused for too long + pausedFor := time.Since(mc.pausedAt) + if pausedFor > mc.cfg.PullPauseTimeout { + log.Warn(fmt.Sprintf("%s: paused for too long, restarting channel: ", mc.chid) + + fmt.Sprintf("paused since %s: %s is longer than pause timeout %s", + mc.pausedAt, pausedFor, mc.cfg.PullPauseTimeout)) + + // Reset pause so we can continue checking the data rate + mc.pausedAt = time.Time{} + + // Restart the channel + go mc.restartChannel() + + return + } + + // Channel is paused, so wait for a resume before checking the data rate + log.Debugf("%s: is paused since %s; waiting for resume to continue checking data rate", + mc.chid, mc.pausedAt) + return + } + // Before returning, add the current data rate stats to the queue defer func() { mc.dataRatePoints <- mc.received @@ -624,5 +654,17 @@ func (mc *monitoredPullChannel) onDTEvent(event datatransfer.Event, channelState // Some data was received so reset the consecutive restart counter mc.resetConsecutiveRestarts() + + case datatransfer.PauseResponder: + // The sender has paused sending data + mc.statsLk.Lock() + mc.pausedAt = time.Now() + mc.statsLk.Unlock() + + case 
datatransfer.ResumeResponder:
+		// The sender has resumed sending data
+		mc.statsLk.Lock()
+		mc.pausedAt = time.Time{}
+		mc.statsLk.Unlock()
 	}
 }
diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go
index 410bcf0a..d91dd491 100644
--- a/channelmonitor/channelmonitor_test.go
+++ b/channelmonitor/channelmonitor_test.go
@@ -66,6 +66,7 @@ func TestPushChannelMonitorAutoRestart(t *testing.T) {
 				MinBytesTransferred:    1,
 				MaxConsecutiveRestarts: 3,
 				CompleteTimeout:        time.Hour,
+				PullPauseTimeout:       time.Hour,
 			})
 			m.Start()
 			mch := m.AddPushChannel(ch1).(*monitoredPushChannel)
@@ -152,6 +153,7 @@ func TestPullChannelMonitorAutoRestart(t *testing.T) {
 				MinBytesTransferred:    1,
 				MaxConsecutiveRestarts: 3,
 				CompleteTimeout:        time.Hour,
+				PullPauseTimeout:       time.Hour,
 			})
 			m.Start()
 			mch := m.AddPullChannel(ch1).(*monitoredPullChannel)
@@ -315,6 +317,7 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
 				MinBytesTransferred:    tc.minBytesSent,
 				MaxConsecutiveRestarts: 3,
 				CompleteTimeout:        time.Hour,
+				PullPauseTimeout:       time.Hour,
 			})
 
 			// Note: Don't start monitor, we'll call checkDataRate() manually
@@ -384,6 +387,7 @@ func TestPullChannelMonitorDataRate(t *testing.T) {
 				MinBytesTransferred:    tc.minBytesTransferred,
 				MaxConsecutiveRestarts: 3,
 				CompleteTimeout:        time.Hour,
+				PullPauseTimeout:       time.Hour,
 			})
 
 			// Note: Don't start monitor, we'll call checkDataRate() manually
@@ -414,6 +418,139 @@
 	}
 }
 
+func TestPullChannelMonitorPausing(t *testing.T) {
+	ch := &mockChannelState{chid: ch1}
+	mockAPI := newMockMonitorAPI(ch, false)
+
+	checkIfRestarted := func(expectRestart bool) {
+		select {
+		case <-time.After(5 * time.Millisecond):
+			if expectRestart {
+				require.Fail(t, "failed to restart channel")
+			}
+		case <-mockAPI.restarts:
+			if !expectRestart {
+				require.Fail(t, "expected no channel restart")
+			}
+		}
+	}
+
+	minBytesTransferred := uint64(10)
+	m := NewMonitor(mockAPI, &Config{
+		MonitorPullChannels:    true,
+		AcceptTimeout:          time.Hour,
+		Interval:               time.Hour,
+		ChecksPerInterval:      1,
+		MinBytesTransferred:    minBytesTransferred,
+		MaxConsecutiveRestarts: 3,
+		CompleteTimeout:        time.Hour,
+		PullPauseTimeout:       time.Hour,
+	})
+
+	// Note: Don't start monitor, we'll call checkDataRate() manually
+
+	m.AddPullChannel(ch1)
+
+	lastRcvd := uint64(5)
+	mockAPI.dataReceived(lastRcvd)
+	m.checkDataRate()
+
+	// Some data received, but less than required amount
+	mockAPI.dataReceived(lastRcvd + minBytesTransferred/2)
+
+	// If responder is paused, the monitor should ignore data
+	// rate checking until responder resumes
+	mockAPI.pauseResponder()
+	m.checkDataRate() // Should be ignored because responder is paused
+	m.checkDataRate()
+	m.checkDataRate()
+
+	// Should not restart
+	checkIfRestarted(false)
+
+	// Resume the responder
+	mockAPI.resumeResponder()
+
+	// Receive some data
+	lastRcvd = 100
+	mockAPI.dataReceived(lastRcvd)
+
+	// Should not restart because received data exceeds minimum required
+	m.checkDataRate()
+	checkIfRestarted(false)
+
+	// Pause responder again
+	mockAPI.pauseResponder()
+	m.checkDataRate() // Should be ignored because responder is paused
+	m.checkDataRate()
+	m.checkDataRate()
+
+	// Resume responder
+	mockAPI.resumeResponder()
+
+	// Not enough data received, should restart
+	mockAPI.dataReceived(lastRcvd + minBytesTransferred/2)
+	m.checkDataRate()
+	checkIfRestarted(true)
+}
+
+func TestPullChannelMonitorPauseTimeout(t *testing.T) {
+	ch := &mockChannelState{chid: ch1}
+	mockAPI := newMockMonitorAPI(ch, false)
+
+
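The test below deliberately uses a tiny PullPauseTimeout (50ms) so the timeout path can be exercised quickly. In a real deployment the same knob would be set much larger. An illustrative configuration, using only fields defined in this series (the values are examples, not recommendations from the patch, and mgr stands in for the caller's monitorAPI implementation):

    cfg := &channelmonitor.Config{
    	MonitorPullChannels:    true,
    	AcceptTimeout:          30 * time.Second,
    	Interval:               10 * time.Second,
    	ChecksPerInterval:      10,
    	MinBytesTransferred:    1 << 10, // require at least 1 KiB per interval
    	MaxConsecutiveRestarts: 5,
    	CompleteTimeout:        30 * time.Second,
    	// Tolerate responder pauses (e.g. while a sector is being
    	// unsealed) of up to two minutes before restarting the channel
    	PullPauseTimeout: 2 * time.Minute,
    }
    m := channelmonitor.NewMonitor(mgr, cfg)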
checkIfRestarted := func(expectRestart bool) { + select { + case <-time.After(5 * time.Millisecond): + if expectRestart { + require.Fail(t, "failed to restart channel") + } + case <-mockAPI.restarts: + if !expectRestart { + require.Fail(t, "expected no channel restart") + } + } + } + + minBytesTransferred := uint64(10) + pullPauseTimeout := 50 * time.Millisecond + m := NewMonitor(mockAPI, &Config{ + MonitorPullChannels: true, + AcceptTimeout: time.Hour, + Interval: time.Hour, + ChecksPerInterval: 1, + MinBytesTransferred: minBytesTransferred, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: pullPauseTimeout, + }) + + // Note: Don't start monitor, we'll call checkDataRate() manually + + m.AddPullChannel(ch1) + + lastRcvd := uint64(5) + mockAPI.dataReceived(lastRcvd) + m.checkDataRate() + + // Some data received, but less than required amount + mockAPI.dataReceived(lastRcvd + minBytesTransferred/2) + + // If responder is paused, the monitor should ignore data + // rate checking until responder resumes + mockAPI.pauseResponder() + m.checkDataRate() // Should be ignored because responder is paused + + // Should not restart + checkIfRestarted(false) + + // Pause timeout elapses + time.Sleep(pullPauseTimeout * 2) + + // Should detect timeout has elapsed and restart + m.checkDataRate() + checkIfRestarted(true) +} + func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { runTest := func(name string, isPush bool) { t.Run(name, func(t *testing.T) { @@ -430,6 +567,7 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { MinBytesTransferred: 2, MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts), CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) // Note: Don't start monitor, we'll call checkDataRate() manually @@ -550,6 +688,7 @@ func TestChannelMonitorTimeouts(t *testing.T) { MinBytesTransferred: 1, MaxConsecutiveRestarts: 1, CompleteTimeout: completeTimeout, + PullPauseTimeout: time.Hour, }) m.Start() @@ -715,6 +854,14 @@ func (m *mockMonitorAPI) sendDataErrorEvent() { m.callSubscriber(datatransfer.Event{Code: datatransfer.SendDataError}, m.ch) } +func (m *mockMonitorAPI) pauseResponder() { + m.callSubscriber(datatransfer.Event{Code: datatransfer.PauseResponder}, m.ch) +} + +func (m *mockMonitorAPI) resumeResponder() { + m.callSubscriber(datatransfer.Event{Code: datatransfer.ResumeResponder}, m.ch) +} + type mockChannelState struct { chid datatransfer.ChannelID queued uint64 diff --git a/impl/integration_test.go b/impl/integration_test.go index 77bd3ade..6b0a07f0 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -5,6 +5,7 @@ import ( "context" "math/rand" "os" + "sync/atomic" "testing" "time" @@ -672,6 +673,7 @@ func TestAutoRestart(t *testing.T) { RestartBackoff: 500 * time.Millisecond, MaxConsecutiveRestarts: 5, CompleteTimeout: 100 * time.Millisecond, + PullPauseTimeout: 100 * time.Millisecond, }) initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf) require.NoError(t, err) @@ -935,20 +937,30 @@ func (r *retrievalRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, da func TestSimulatedRetrievalFlow(t *testing.T) { ctx := context.Background() testCases := map[string]struct { - unpauseRequestorDelay time.Duration - unpauseResponderDelay time.Duration - pausePoints []uint64 + unpauseResponderDelay time.Duration + channelMonitorPauseTimeout time.Duration + restartExpected bool }{ - "fast unseal, payment channel ready": { - pausePoints: []uint64{1000, 3000, 
6000, 10000, 15000}, + // Simulate a retrieval where the provider pauses the data transfer + // while the file is being unsealed, for just a moment + "fast unseal": { + unpauseResponderDelay: 0, + channelMonitorPauseTimeout: 500 * time.Millisecond, + restartExpected: false, }, - "fast unseal, payment channel not ready": { - unpauseRequestorDelay: 100 * time.Millisecond, - pausePoints: []uint64{1000, 3000, 6000, 10000, 15000}, + // Simulate a retrieval where the provider pauses the data transfer + // while the file is being unsealed, for a relatively long time + "slow unseal": { + unpauseResponderDelay: 200 * time.Millisecond, + channelMonitorPauseTimeout: 500 * time.Millisecond, + restartExpected: false, }, - "slow unseal, payment channel ready": { - unpauseResponderDelay: 200 * time.Millisecond, - pausePoints: []uint64{1000, 3000, 6000, 10000, 15000}, + // Simulate a retrieval where the provider pauses the data transfer + // while the file is being unsealed, for longer than the unpause timeout + "unseal slower than unpause timeout": { + unpauseResponderDelay: 200 * time.Millisecond, + channelMonitorPauseTimeout: 100 * time.Millisecond, + restartExpected: true, }, } for testCase, config := range testCases { @@ -956,61 +968,102 @@ func TestSimulatedRetrievalFlow(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() + // The provider will pause sending data and wait for a payment + // voucher at each of these points in the transfer + pausePoints := []uint64{1000, 3000, 6000, 10000, 15000} + gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) - host1 := gsData.Host1 // initiator, data sender + provHost := gsData.Host1 + // Load a file into the provider blockstore root := gsData.LoadUnixFSFile(t, false) rootCid := root.(cidlink.Link).Cid - tp1 := gsData.SetupGSTransportHost1() - tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) + tpProv := gsData.SetupGSTransportHost1() + tpClient := gsData.SetupGSTransportHost2() + + // Set up restart config for client + clientRestartConf := ChannelRestartConfig(channelmonitor.Config{ + MonitorPullChannels: true, + AcceptTimeout: 100 * time.Millisecond, + Interval: 100 * time.Millisecond, + MinBytesTransferred: 1, + ChecksPerInterval: 2, + RestartBackoff: 500 * time.Millisecond, + MaxConsecutiveRestarts: 5, + CompleteTimeout: 100 * time.Millisecond, + PullPauseTimeout: config.channelMonitorPauseTimeout, + }) + + // Setup data transfer for client and provider + dtClient, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tpClient, clientRestartConf) require.NoError(t, err) - testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) + testutil.StartAndWaitForReady(ctx, t, dtClient) + + dtProv, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tpProv) require.NoError(t, err) - testutil.StartAndWaitForReady(ctx, t, dt2) + testutil.StartAndWaitForReady(ctx, t, dtProv) + + // Setup requester event listener var chid datatransfer.ChannelID errChan := make(chan struct{}, 2) + restarted := int32(0) clientPausePoint := 0 clientFinished := make(chan struct{}, 1) finalVoucherResult := testutil.NewFakeDTType() encodedFVR, err := encoding.Encode(finalVoucherResult) require.NoError(t, err) var clientSubscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) { - if event.Code == datatransfer.Error { - errChan <- 
struct{}{} - } + // Watch for the final payment request from the provider, and + // respond with a payment voucher if event.Code == datatransfer.NewVoucherResult { lastVoucherResult := channelState.LastVoucherResult() encodedLVR, err := encoding.Encode(lastVoucherResult) require.NoError(t, err) if bytes.Equal(encodedLVR, encodedFVR) { - _ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType()) + _ = dtClient.SendVoucher(ctx, chid, testutil.NewFakeDTType()) } } + // When data received exceeds a pause point, the provider will + // have paused the transfer while waiting for a payment voucher. + // So send a payment voucher from the client. if event.Code == datatransfer.DataReceived && - clientPausePoint < len(config.pausePoints) && - channelState.Received() > config.pausePoints[clientPausePoint] { - _ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType()) + clientPausePoint < len(pausePoints) && + channelState.Received() > pausePoints[clientPausePoint] { + _ = dtClient.SendVoucher(ctx, chid, testutil.NewFakeDTType()) clientPausePoint++ } + + if event.Code == datatransfer.Restart { + atomic.AddInt32(&restarted, 1) + } + if event.Code == datatransfer.Error { + errChan <- struct{}{} + } if channelState.Status() == datatransfer.Completed { clientFinished <- struct{}{} } } - dt2.SubscribeToEvents(clientSubscriber) + dtClient.SubscribeToEvents(clientSubscriber) + + // Setup responder event listener providerFinished := make(chan struct{}, 1) providerAccepted := false var providerSubscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) { + // The provider should immediately pause the channel when it + // receives an open channel request. if event.Code == datatransfer.PauseResponder { if !providerAccepted { providerAccepted = true + + // Simulate pausing while data is unsealed timer := time.NewTimer(config.unpauseResponderDelay) go func() { <-timer.C - _ = dt1.ResumeDataTransferChannel(ctx, chid) + + // Resume after unseal completes + _ = dtProv.ResumeDataTransferChannel(ctx, chid) }() } } @@ -1021,20 +1074,24 @@ func TestSimulatedRetrievalFlow(t *testing.T) { providerFinished <- struct{}{} } } - dt1.SubscribeToEvents(providerSubscriber) + dtProv.SubscribeToEvents(providerSubscriber) voucher := testutil.FakeDTType{Data: "applesauce"} sv := testutil.NewStubbedValidator() sv.ExpectPausePull() - require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv)) + require.NoError(t, dtProv.RegisterVoucherType(&testutil.FakeDTType{}, sv)) + // Set up a revalidator on the provider that will pause at + // configurable pause points srv := &retrievalRevalidator{ - testutil.NewStubbedRevalidator(), 0, 0, config.pausePoints, finalVoucherResult, + testutil.NewStubbedRevalidator(), 0, 0, pausePoints, finalVoucherResult, } srv.ExpectSuccessRevalidation() - require.NoError(t, dt1.RegisterRevalidator(testutil.NewFakeDTType(), srv)) + require.NoError(t, dtProv.RegisterRevalidator(testutil.NewFakeDTType(), srv)) + + require.NoError(t, dtClient.RegisterVoucherType(&testutil.FakeDTType{}, sv)) + require.NoError(t, dtClient.RegisterVoucherResultType(testutil.NewFakeDTType())) - require.NoError(t, dt2.RegisterVoucherResultType(testutil.NewFakeDTType())) - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector) + chid, err = dtClient.OpenPullDataChannel(ctx, provHost.ID(), &voucher, rootCid, gsData.AllSelector) require.NoError(t, err) for providerFinished != nil || clientFinished != nil { @@ -1049,11 +1106,20 @@ func 
TestSimulatedRetrievalFlow(t *testing.T) { t.Fatal("received unexpected error") } } + + // Check if there was a restart, and whether it was expected + if restarted == 0 && config.restartExpected { + require.Fail(t, "expected restart but did not restart") + } + if restarted > 0 && !config.restartExpected { + require.Fail(t, "expected no restart but there was a restart") + } + sv.VerifyExpectations(t) srv.VerifyExpectations(t) gsData.VerifyFileTransferred(t, root, true) - require.Equal(t, srv.providerPausePoint, len(config.pausePoints)) - require.Equal(t, clientPausePoint, len(config.pausePoints)) + require.Equal(t, srv.providerPausePoint, len(pausePoints)) + require.Equal(t, clientPausePoint, len(pausePoints)) }) } } From 11406550d6c3728a1a8db115c0260b9ecbf12434 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 29 Mar 2021 17:23:37 +0200 Subject: [PATCH 3/4] feat: mechanism to enable data-rate monitoring after opening channel --- channelmonitor/channelmonitor.go | 87 ++++++++- channelmonitor/channelmonitor_test.go | 253 ++++++++++++++++---------- impl/impl.go | 4 + manager.go | 9 + 4 files changed, 242 insertions(+), 111 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 2c7037b8..8c210bc8 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -40,6 +40,9 @@ type Config struct { MonitorPullChannels bool // Max time to wait for other side to accept open channel request before attempting restart AcceptTimeout time.Duration + // Indicates whether to monitor the data-rate on the channel, and restart + // if it falls too low + EnableDataRateMonitoring bool // Interval between checks of transfer rate Interval time.Duration // Min bytes that must be sent / received in interval @@ -102,6 +105,7 @@ func checkConfig(cfg *Config) { // push and pull monitor implementations type monitoredChan interface { Shutdown() bool + enableDataRateMonitoring() checkDataRate() } @@ -152,6 +156,33 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored return mpc } +// EnableDataRateMonitoring enables monitoring for a channel. +// It is used in the situation where the client wants to create the channel +// (and start monitoring for the Accept response from the responder), but +// doesn't want to start monitoring the data rate until a later time. +// For example when creating a retrieval deal, it may take some time to +// set up a payment channel. +// Note: The monitor must already be enabled in order to enable monitoring for +// a particular channel. +// This method is idempotent. 
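The intended call sequence is roughly the sketch below. setupPaymentChannel is a hypothetical placeholder for the caller's own payment setup; OpenPullDataChannel and EnableDataRateMonitoring match the Manager interface in this changeset:

    func openMonitoredPull(ctx context.Context, dt datatransfer.Manager, to peer.ID,
    	voucher datatransfer.Voucher, root cid.Cid, sel ipld.Node) error {

    	chid, err := dt.OpenPullDataChannel(ctx, to, voucher, root, sel)
    	if err != nil {
    		return err
    	}
    	// The monitor is now watching for the responder's Accept
    	// message, but data-rate monitoring is still off
    	if err := setupPaymentChannel(ctx); err != nil { // hypothetical
    		return err
    	}
    	// Payment is ready: start data-rate (and pause-timeout) checks
    	return dt.EnableDataRateMonitoring(chid)
    }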
+func (m *Monitor) EnableDataRateMonitoring(chid datatransfer.ChannelID) error {
+	if !m.enabled() {
+		return xerrors.Errorf("could not enable data rate monitoring for channel %s: channel monitoring is disabled", chid)
+	}
+
+	m.lk.Lock()
+	defer m.lk.Unlock()
+
+	ch, ok := m.channels[chid]
+	if !ok {
+		return xerrors.Errorf("could not enable data rate monitoring for unknown channel %s", chid)
+	}
+
+	ch.enableDataRateMonitoring()
+
+	return nil
+}
+
 func (m *Monitor) Shutdown() {
 	// Causes the run loop to exit
 	m.stop()
@@ -479,10 +510,11 @@ type dataRatePoint struct {
 type monitoredPushChannel struct {
 	*monitoredChannel
 
-	statsLk        sync.RWMutex
-	queued         uint64
-	sent           uint64
-	dataRatePoints chan *dataRatePoint
+	statsLk               sync.RWMutex
+	rateMonitoringEnabled bool
+	queued                uint64
+	sent                  uint64
+	dataRatePoints        chan *dataRatePoint
 }
 
 func newMonitoredPushChannel(
@@ -493,18 +525,35 @@ func newMonitoredPushChannel(
 	onShutdown func(datatransfer.ChannelID),
 ) *monitoredPushChannel {
 	mpc := &monitoredPushChannel{
-		dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
+		rateMonitoringEnabled: cfg.EnableDataRateMonitoring,
+		dataRatePoints:        make(chan *dataRatePoint, cfg.ChecksPerInterval),
 	}
 	mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
 	return mpc
 }
 
+func (mc *monitoredPushChannel) enableDataRateMonitoring() {
+	mc.statsLk.Lock()
+	defer mc.statsLk.Unlock()
+
+	if mc.rateMonitoringEnabled {
+		return
+	}
+
+	log.Infof("%s: enabling push channel data rate monitoring", mc.chid)
+	mc.rateMonitoringEnabled = true
+}
+
 // check if the amount of data sent in the interval was too low, and if so
 // restart the channel
 func (mc *monitoredPushChannel) checkDataRate() {
 	mc.statsLk.Lock()
 	defer mc.statsLk.Unlock()
 
+	if !mc.rateMonitoringEnabled {
+		return
+	}
+
 	// Before returning, add the current data rate stats to the queue
 	defer func() {
 		var pending uint64
@@ -565,10 +614,11 @@ func (mc *monitoredPushChannel) onDTEvent(event datatransfer.Event, channelState
 type monitoredPullChannel struct {
 	*monitoredChannel
 
-	statsLk        sync.RWMutex
-	received       uint64
-	dataRatePoints chan uint64
-	pausedAt       time.Time
+	statsLk               sync.RWMutex
+	rateMonitoringEnabled bool
+	received              uint64
+	dataRatePoints        chan uint64
+	pausedAt              time.Time
 }
 
 func newMonitoredPullChannel(
@@ -579,18 +629,35 @@ func newMonitoredPullChannel(
 	onShutdown func(datatransfer.ChannelID),
 ) *monitoredPullChannel {
 	mpc := &monitoredPullChannel{
-		dataRatePoints: make(chan uint64, cfg.ChecksPerInterval),
+		rateMonitoringEnabled: cfg.EnableDataRateMonitoring,
+		dataRatePoints:        make(chan uint64, cfg.ChecksPerInterval),
 	}
 	mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
 	return mpc
 }
 
+func (mc *monitoredPullChannel) enableDataRateMonitoring() {
+	mc.statsLk.Lock()
+	defer mc.statsLk.Unlock()
+
+	if mc.rateMonitoringEnabled {
+		return
+	}
+
+	log.Infof("%s: enabling pull channel data rate monitoring", mc.chid)
+	mc.rateMonitoringEnabled = true
+}
+
 // check if the amount of data received in the interval was too low, and if so
 // restart the channel
 func (mc *monitoredPullChannel) checkDataRate() {
 	mc.statsLk.Lock()
 	defer mc.statsLk.Unlock()
 
+	if !mc.rateMonitoringEnabled {
+		return
+	}
+
 	// If the channel is currently paused
 	if !mc.pausedAt.IsZero() {
 		// Check if the channel has been paused for too long
diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go
index d91dd491..fad4188f 100644
---
a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -59,14 +59,15 @@ func TestPushChannelMonitorAutoRestart(t *testing.T) { mockAPI := newMockMonitorAPI(ch, tc.errOnRestart) m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: true, - AcceptTimeout: time.Hour, - Interval: 10 * time.Millisecond, - ChecksPerInterval: 10, - MinBytesTransferred: 1, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - PullPauseTimeout: time.Hour, + MonitorPushChannels: true, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: 10 * time.Millisecond, + ChecksPerInterval: 10, + MinBytesTransferred: 1, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) m.Start() mch := m.AddPushChannel(ch1).(*monitoredPushChannel) @@ -146,14 +147,15 @@ func TestPullChannelMonitorAutoRestart(t *testing.T) { mockAPI := newMockMonitorAPI(ch, tc.errOnRestart) m := NewMonitor(mockAPI, &Config{ - MonitorPullChannels: true, - AcceptTimeout: time.Hour, - Interval: 10 * time.Millisecond, - ChecksPerInterval: 10, - MinBytesTransferred: 1, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - PullPauseTimeout: time.Hour, + MonitorPullChannels: true, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: 10 * time.Millisecond, + ChecksPerInterval: 10, + MinBytesTransferred: 1, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) m.Start() mch := m.AddPullChannel(ch1).(*monitoredPullChannel) @@ -310,14 +312,15 @@ func TestPushChannelMonitorDataRate(t *testing.T) { checksPerInterval := uint32(1) m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: true, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: checksPerInterval, - MinBytesTransferred: tc.minBytesSent, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - PullPauseTimeout: time.Hour, + MonitorPushChannels: true, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: time.Hour, + ChecksPerInterval: checksPerInterval, + MinBytesTransferred: tc.minBytesSent, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) // Note: Don't start monitor, we'll call checkDataRate() manually @@ -380,14 +383,15 @@ func TestPullChannelMonitorDataRate(t *testing.T) { checksPerInterval := uint32(1) m := NewMonitor(mockAPI, &Config{ - MonitorPullChannels: true, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: checksPerInterval, - MinBytesTransferred: tc.minBytesTransferred, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - PullPauseTimeout: time.Hour, + MonitorPullChannels: true, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: time.Hour, + ChecksPerInterval: checksPerInterval, + MinBytesTransferred: tc.minBytesTransferred, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) // Note: Don't start monitor, we'll call checkDataRate() manually @@ -418,33 +422,79 @@ func TestPullChannelMonitorDataRate(t *testing.T) { } } -func TestPullChannelMonitorPausing(t *testing.T) { - ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, false) +func TestChannelMonitorDataRateEnabled(t *testing.T) { + runTest := func(name string, isPush bool) { + t.Run(name, func(t *testing.T) { + ch := &mockChannelState{chid: ch1} + mockAPI := newMockMonitorAPI(ch, false) - checkIfRestarted := func(expectRestart bool) { - select { - case <-time.After(5 * 
time.Millisecond): - if expectRestart { - require.Fail(t, "failed to restart channel") - } - case <-mockAPI.restarts: - if !expectRestart { - require.Fail(t, "expected no channel restart") + m := NewMonitor(mockAPI, &Config{ + MonitorPushChannels: isPush, + MonitorPullChannels: !isPush, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: false, + Interval: time.Hour, + ChecksPerInterval: 1, + MinBytesTransferred: 1, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, + }) + + // Note: Don't start monitor, we'll call checkDataRate() manually + + if isPush { + m.AddPushChannel(ch1) + + // Queue up some unsent data: queued (10) - sent (5) = 5 + mockAPI.dataQueued(10) + mockAPI.dataSent(5) + } else { + m.AddPullChannel(ch1) + + // Receive some data + mockAPI.dataReceived(5) } - } + + // Data rate checks should be ignored because data rate monitoring is + // disabled + m.checkDataRate() + m.checkDataRate() + + mockAPI.expectNotToRestart(t) + + // Enable data rate monitoring + m.EnableDataRateMonitoring(ch1) + + // Check the data rate once to create an initial data point + m.checkDataRate() + // Check a second time to compare against the initial data point + m.checkDataRate() + + // Expect that the channel will be restarted by the monitor + mockAPI.expectRestart(t) + }) } + runTest("push", true) + runTest("pull", false) +} + +func TestPullChannelMonitorPausing(t *testing.T) { + ch := &mockChannelState{chid: ch1} + mockAPI := newMockMonitorAPI(ch, false) + minBytesTransferred := uint64(10) m := NewMonitor(mockAPI, &Config{ - MonitorPullChannels: true, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: 1, - MinBytesTransferred: minBytesTransferred, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - PullPauseTimeout: time.Hour, + MonitorPullChannels: true, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: time.Hour, + ChecksPerInterval: 1, + MinBytesTransferred: minBytesTransferred, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) // Note: Don't start monitor, we'll call checkDataRate() manually @@ -466,7 +516,7 @@ func TestPullChannelMonitorPausing(t *testing.T) { m.checkDataRate() // Should not restart - checkIfRestarted(false) + mockAPI.expectNotToRestart(t) // Resume the responder mockAPI.resumeResponder() @@ -477,7 +527,7 @@ func TestPullChannelMonitorPausing(t *testing.T) { // Should not restart because received data exceeds minimum required m.checkDataRate() - checkIfRestarted(false) + mockAPI.expectNotToRestart(t) // Pause responder again mockAPI.pauseResponder() @@ -491,37 +541,25 @@ func TestPullChannelMonitorPausing(t *testing.T) { // Not enough data received, should restart mockAPI.dataReceived(lastRcvd + minBytesTransferred/2) m.checkDataRate() - checkIfRestarted(true) + mockAPI.expectRestart(t) } func TestPullChannelMonitorPauseTimeout(t *testing.T) { ch := &mockChannelState{chid: ch1} mockAPI := newMockMonitorAPI(ch, false) - checkIfRestarted := func(expectRestart bool) { - select { - case <-time.After(5 * time.Millisecond): - if expectRestart { - require.Fail(t, "failed to restart channel") - } - case <-mockAPI.restarts: - if !expectRestart { - require.Fail(t, "expected no channel restart") - } - } - } - minBytesTransferred := uint64(10) pullPauseTimeout := 50 * time.Millisecond m := NewMonitor(mockAPI, &Config{ - MonitorPullChannels: true, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: 1, - MinBytesTransferred: 
minBytesTransferred, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - PullPauseTimeout: pullPauseTimeout, + MonitorPullChannels: true, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: time.Hour, + ChecksPerInterval: 1, + MinBytesTransferred: minBytesTransferred, + MaxConsecutiveRestarts: 3, + CompleteTimeout: time.Hour, + PullPauseTimeout: pullPauseTimeout, }) // Note: Don't start monitor, we'll call checkDataRate() manually @@ -541,14 +579,14 @@ func TestPullChannelMonitorPauseTimeout(t *testing.T) { m.checkDataRate() // Should be ignored because responder is paused // Should not restart - checkIfRestarted(false) + mockAPI.expectNotToRestart(t) // Pause timeout elapses time.Sleep(pullPauseTimeout * 2) // Should detect timeout has elapsed and restart m.checkDataRate() - checkIfRestarted(true) + mockAPI.expectRestart(t) } func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { @@ -559,15 +597,16 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { maxConsecutiveRestarts := 3 m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: isPush, - MonitorPullChannels: !isPush, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: 1, - MinBytesTransferred: 2, - MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts), - CompleteTimeout: time.Hour, - PullPauseTimeout: time.Hour, + MonitorPushChannels: isPush, + MonitorPullChannels: !isPush, + AcceptTimeout: time.Hour, + EnableDataRateMonitoring: true, + Interval: time.Hour, + ChecksPerInterval: 1, + MinBytesTransferred: 2, + MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts), + CompleteTimeout: time.Hour, + PullPauseTimeout: time.Hour, }) // Note: Don't start monitor, we'll call checkDataRate() manually @@ -595,8 +634,7 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { for i := 0; i < maxConsecutiveRestarts; i++ { m.checkDataRate() - err := mockAPI.awaitRestart() - require.NoError(t, err) + mockAPI.expectRestart(t) } } triggerMaxRestarts() @@ -615,8 +653,7 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { // attempt. // Instead the channel should be closed and the monitor shut down. 
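The give-up path asserted here can be paraphrased as the sketch below. consecutiveRestarts is an assumed internal counter name; restartChannel, closeChannelAndShutdown and MaxConsecutiveRestarts all appear elsewhere in this series:

    if mc.consecutiveRestarts > int(mc.cfg.MaxConsecutiveRestarts) {
    	// Too many restarts with no progress in between: close the
    	// channel with an error and shut the monitor down
    	err := fmt.Errorf("%s: %d consecutive restarts without progress, failing channel",
    		mc.chid, mc.consecutiveRestarts)
    	mc.closeChannelAndShutdown(err)
    	return
    }
    // Otherwise restart the channel and keep monitoring
    go mc.restartChannel()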
m.checkDataRate() - err := mockAPI.awaitRestart() - require.Error(t, err) // require error because expecting no restart + mockAPI.expectNotToRestart(t) verifyChannelShutdown(t, chanCtx) }) } @@ -680,15 +717,16 @@ func TestChannelMonitorTimeouts(t *testing.T) { acceptTimeout := 10 * time.Millisecond completeTimeout := 10 * time.Millisecond m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: isPush, - MonitorPullChannels: !isPush, - AcceptTimeout: acceptTimeout, - Interval: time.Hour, - ChecksPerInterval: 1, - MinBytesTransferred: 1, - MaxConsecutiveRestarts: 1, - CompleteTimeout: completeTimeout, - PullPauseTimeout: time.Hour, + MonitorPushChannels: isPush, + MonitorPullChannels: !isPush, + AcceptTimeout: acceptTimeout, + EnableDataRateMonitoring: true, + Interval: time.Hour, + ChecksPerInterval: 1, + MinBytesTransferred: 1, + MaxConsecutiveRestarts: 1, + CompleteTimeout: completeTimeout, + PullPauseTimeout: time.Hour, }) m.Start() @@ -817,6 +855,19 @@ func (m *mockMonitorAPI) awaitRestart() error { } } +func (m *mockMonitorAPI) expectRestart(t *testing.T) { + err := m.awaitRestart() + require.NoError(t, err) +} + +func (m *mockMonitorAPI) expectNotToRestart(t *testing.T) { + select { + case <-time.After(10 * time.Millisecond): + case <-m.restarts: + require.Fail(t, "expected no channel restart") + } +} + func (m *mockMonitorAPI) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error { close(m.closed) return nil diff --git a/impl/impl.go b/impl/impl.go index 8490f11e..91b10852 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -259,6 +259,10 @@ func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, vo return chid, nil } +func (m *manager) EnableDataRateMonitoring(chid datatransfer.ChannelID) error { + return m.channelMonitor.EnableDataRateMonitoring(chid) +} + // SendVoucher sends an intermediate voucher as needed when the receiver sends a request for revalidation func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.ChannelID, voucher datatransfer.Voucher) error { chst, err := m.channels.GetByID(ctx, channelID) diff --git a/manager.go b/manager.go index 7b9c3402..393410bd 100644 --- a/manager.go +++ b/manager.go @@ -104,6 +104,15 @@ type Manager interface { // transfer parts of the piece that match the selector OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) + // EnableDataRateMonitoring enables data-rate monitoring for a channel. + // It is used in the situation where the client wants to create the channel + // (and start monitoring for the Accept response from the responder), but + // doesn't want to start monitoring the data rate until a later time. + // For example when creating a retrieval deal, it may take some time to + // set up a payment channel. So the client creates a data-transfer channel, + // then creates a payment channel, then enables data-rate monitoring. 
+ EnableDataRateMonitoring(chid ChannelID) error + // send an intermediate voucher as needed when the receiver sends a request for revalidation SendVoucher(ctx context.Context, chid ChannelID, voucher Voucher) error From 74b8bb2389705013573cb28203712f4e254b9693 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 30 Mar 2021 11:42:19 +0200 Subject: [PATCH 4/4] feat: integration test for enable data rate monitoring --- channelmonitor/channelmonitor.go | 3 +- impl/integration_test.go | 117 +++++++++++++++++++++++-------- 2 files changed, 88 insertions(+), 32 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 8c210bc8..69169a9b 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -161,7 +161,8 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored // (and start monitoring for the Accept response from the responder), but // doesn't want to start monitoring the data rate until a later time. // For example when creating a retrieval deal, it may take some time to -// set up a payment channel. +// set up a payment channel. The client opens a data-transfer channel, +// then opens a payment channel, then starts monitoring the data rate. // Note: The monitor must already be enabled in order to enable monitoring for // a particular channel. // This method is idempotent. diff --git a/impl/integration_test.go b/impl/integration_test.go index 6b0a07f0..abcae508 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -664,16 +664,17 @@ func TestAutoRestart(t *testing.T) { // Set up restartConf := ChannelRestartConfig(channelmonitor.Config{ - MonitorPushChannels: tc.isPush, - MonitorPullChannels: !tc.isPush, - AcceptTimeout: 100 * time.Millisecond, - Interval: 100 * time.Millisecond, - MinBytesTransferred: 1, - ChecksPerInterval: 10, - RestartBackoff: 500 * time.Millisecond, - MaxConsecutiveRestarts: 5, - CompleteTimeout: 100 * time.Millisecond, - PullPauseTimeout: 100 * time.Millisecond, + MonitorPushChannels: tc.isPush, + MonitorPullChannels: !tc.isPush, + AcceptTimeout: 100 * time.Millisecond, + EnableDataRateMonitoring: true, + Interval: 100 * time.Millisecond, + MinBytesTransferred: 1, + ChecksPerInterval: 10, + RestartBackoff: 500 * time.Millisecond, + MaxConsecutiveRestarts: 5, + CompleteTimeout: 100 * time.Millisecond, + PullPauseTimeout: 100 * time.Millisecond, }) initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf) require.NoError(t, err) @@ -937,31 +938,68 @@ func (r *retrievalRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, da func TestSimulatedRetrievalFlow(t *testing.T) { ctx := context.Background() testCases := map[string]struct { - unpauseResponderDelay time.Duration + provUnsealDelay time.Duration + clientPaychCreateDelay time.Duration channelMonitorPauseTimeout time.Duration + monitorDataRateFromStart bool restartExpected bool }{ - // Simulate a retrieval where the provider pauses the data transfer - // while the file is being unsealed, for just a moment + // Simulate a retrieval where + // - the provider unseal delay is minimal + // - the client payment channel already has sufficient funds "fast unseal": { - unpauseResponderDelay: 0, + provUnsealDelay: 0, + clientPaychCreateDelay: 0, channelMonitorPauseTimeout: 500 * time.Millisecond, + monitorDataRateFromStart: true, restartExpected: false, }, - // Simulate a retrieval where the provider pauses the data transfer - // while the file is 
being unsealed, for a relatively long time + // Simulate a retrieval where + // - the provider unseal delay is relatively long but does not exceed + // the pause timeout + // - the client payment channel already has sufficient funds "slow unseal": { - unpauseResponderDelay: 200 * time.Millisecond, + provUnsealDelay: 200 * time.Millisecond, + clientPaychCreateDelay: 0, channelMonitorPauseTimeout: 500 * time.Millisecond, + monitorDataRateFromStart: true, restartExpected: false, }, - // Simulate a retrieval where the provider pauses the data transfer - // while the file is being unsealed, for longer than the unpause timeout + // Simulate a retrieval where + // - the provider unseal delay is longer than the unpause timeout + // - the client payment channel already has sufficient funds + // There should be a restart because of the excessive unseal delay. "unseal slower than unpause timeout": { - unpauseResponderDelay: 200 * time.Millisecond, + provUnsealDelay: 200 * time.Millisecond, + clientPaychCreateDelay: 0, channelMonitorPauseTimeout: 100 * time.Millisecond, + monitorDataRateFromStart: true, restartExpected: true, }, + // Simulate a retrieval where + // - the provider unseal delay is long but does not exceed the pause timeout + // - the client payment channel creation delay is longer than the pause timeout + // There should not be a restart because the client doesn't start monitoring + // the data rate until after the payment channel creation has completed. + "slow unseal, slow payment channel create": { + provUnsealDelay: 50 * time.Millisecond, + clientPaychCreateDelay: 150 * time.Millisecond, + channelMonitorPauseTimeout: 100 * time.Millisecond, + monitorDataRateFromStart: false, + restartExpected: false, + }, + // Simulate a retrieval where + // - the provider unseal delay is longer than the pause timeout + // - the client payment channel creation delay is longer than the pause timeout + // There should not be a restart because the client doesn't start monitoring + // the data rate until after the payment channel creation has completed. 
+ "unseal slower than unpause timeout, slow payment channel create": { + provUnsealDelay: 200 * time.Millisecond, + clientPaychCreateDelay: 150 * time.Millisecond, + channelMonitorPauseTimeout: 100 * time.Millisecond, + monitorDataRateFromStart: false, + restartExpected: false, + }, } for testCase, config := range testCases { t.Run(testCase, func(t *testing.T) { @@ -984,15 +1022,16 @@ func TestSimulatedRetrievalFlow(t *testing.T) { // Set up restart config for client clientRestartConf := ChannelRestartConfig(channelmonitor.Config{ - MonitorPullChannels: true, - AcceptTimeout: 100 * time.Millisecond, - Interval: 100 * time.Millisecond, - MinBytesTransferred: 1, - ChecksPerInterval: 2, - RestartBackoff: 500 * time.Millisecond, - MaxConsecutiveRestarts: 5, - CompleteTimeout: 100 * time.Millisecond, - PullPauseTimeout: config.channelMonitorPauseTimeout, + MonitorPullChannels: true, + AcceptTimeout: 100 * time.Millisecond, + EnableDataRateMonitoring: config.monitorDataRateFromStart, + Interval: 100 * time.Millisecond, + MinBytesTransferred: 1, + ChecksPerInterval: 2, + RestartBackoff: 500 * time.Millisecond, + MaxConsecutiveRestarts: 5, + CompleteTimeout: 100 * time.Millisecond, + PullPauseTimeout: config.channelMonitorPauseTimeout, }) // Setup data transfer for client and provider @@ -1031,7 +1070,23 @@ func TestSimulatedRetrievalFlow(t *testing.T) { if event.Code == datatransfer.DataReceived && clientPausePoint < len(pausePoints) && channelState.Received() > pausePoints[clientPausePoint] { - _ = dtClient.SendVoucher(ctx, chid, testutil.NewFakeDTType()) + + // If this is the first pause point + if clientPausePoint == 0 { + go func() { + // Simulate waiting for a payment channel to be created + time.Sleep(config.clientPaychCreateDelay) + // Enable data rate monitoring now that the payment + // channel has been created + err := dtClient.EnableDataRateMonitoring(chid) + require.NoError(t, err) + // Send the voucher + _ = dtClient.SendVoucher(ctx, chid, testutil.NewFakeDTType()) + }() + } else { + // Otherwise just send the voucher immediately + _ = dtClient.SendVoucher(ctx, chid, testutil.NewFakeDTType()) + } clientPausePoint++ } @@ -1058,7 +1113,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) { providerAccepted = true // Simulate pausing while data is unsealed - timer := time.NewTimer(config.unpauseResponderDelay) + timer := time.NewTimer(config.provUnsealDelay) go func() { <-timer.C