Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Config struct {
// Max time to wait for the responder to send a Complete message once all
// data has been sent
CompleteTimeout time.Duration
// Max time to wait for a resume after the responder paused the channel - for pull channels
PullPauseTimeout time.Duration
}

func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
Expand Down Expand Up @@ -91,6 +93,9 @@ func checkConfig(cfg *Config) {
if cfg.CompleteTimeout <= 0 {
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
}
if cfg.MonitorPullChannels && cfg.PullPauseTimeout <= 0 {
panic(fmt.Sprintf(prefix+"PullPauseTimeout is %s but must be > 0", cfg.PullPauseTimeout))
}
}

// This interface just makes it easier to abstract some methods between the
Expand Down Expand Up @@ -563,6 +568,7 @@ type monitoredPullChannel struct {
statsLk sync.RWMutex
received uint64
dataRatePoints chan uint64
pausedAt time.Time
}

func newMonitoredPullChannel(
Expand All @@ -585,6 +591,30 @@ func (mc *monitoredPullChannel) checkDataRate() {
mc.statsLk.Lock()
defer mc.statsLk.Unlock()

// If the channel is currently paused
if !mc.pausedAt.IsZero() {
// Check if the channel has been paused for too long
pausedFor := time.Since(mc.pausedAt)
if pausedFor > mc.cfg.PullPauseTimeout {
log.Warn(fmt.Sprintf("%s: paused for too long, restarting channel: ", mc.chid) +
fmt.Sprintf("paused since %s: %s is longer than pause timeout %s",
mc.pausedAt, pausedFor, mc.cfg.PullPauseTimeout))

// Reset pause so we can continue checking the data rate
mc.pausedAt = time.Time{}

// Restart the channel
go mc.restartChannel()

return
}

// Channel is paused, so wait for a resume before checking the data rate
log.Debugf("%s: is paused since %s; waiting for resume to continue checking data rate",
mc.chid, mc.pausedAt)
return
}

// Before returning, add the current data rate stats to the queue
defer func() {
mc.dataRatePoints <- mc.received
Expand Down Expand Up @@ -624,5 +654,17 @@ func (mc *monitoredPullChannel) onDTEvent(event datatransfer.Event, channelState

// Some data was received so reset the consecutive restart counter
mc.resetConsecutiveRestarts()

case datatransfer.PauseResponder:
// The sender has paused sending data
mc.statsLk.Lock()
mc.pausedAt = time.Now()
mc.statsLk.Unlock()

case datatransfer.ResumeResponder:
// The sender has resumed sending data
mc.statsLk.Lock()
mc.pausedAt = time.Time{}
mc.statsLk.Unlock()
}
}
147 changes: 147 additions & 0 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestPushChannelMonitorAutoRestart(t *testing.T) {
MinBytesTransferred: 1,
MaxConsecutiveRestarts: 3,
CompleteTimeout: time.Hour,
PullPauseTimeout: time.Hour,
})
m.Start()
mch := m.AddPushChannel(ch1).(*monitoredPushChannel)
Expand Down Expand Up @@ -152,6 +153,7 @@ func TestPullChannelMonitorAutoRestart(t *testing.T) {
MinBytesTransferred: 1,
MaxConsecutiveRestarts: 3,
CompleteTimeout: time.Hour,
PullPauseTimeout: time.Hour,
})
m.Start()
mch := m.AddPullChannel(ch1).(*monitoredPullChannel)
Expand Down Expand Up @@ -315,6 +317,7 @@ func TestPushChannelMonitorDataRate(t *testing.T) {
MinBytesTransferred: tc.minBytesSent,
MaxConsecutiveRestarts: 3,
CompleteTimeout: time.Hour,
PullPauseTimeout: time.Hour,
})

// Note: Don't start monitor, we'll call checkDataRate() manually
Expand Down Expand Up @@ -384,6 +387,7 @@ func TestPullChannelMonitorDataRate(t *testing.T) {
MinBytesTransferred: tc.minBytesTransferred,
MaxConsecutiveRestarts: 3,
CompleteTimeout: time.Hour,
PullPauseTimeout: time.Hour,
})

// Note: Don't start monitor, we'll call checkDataRate() manually
Expand Down Expand Up @@ -414,6 +418,139 @@ func TestPullChannelMonitorDataRate(t *testing.T) {
}
}

// TestPullChannelMonitorPausing verifies that data-rate checks on a pull
// channel are skipped while the responder is paused, and that they take
// effect again (triggering a restart when too little data arrived) once the
// responder resumes.
func TestPullChannelMonitorPausing(t *testing.T) {
	ch := &mockChannelState{chid: ch1}
	mockAPI := newMockMonitorAPI(ch, false)

	// checkIfRestarted asserts whether or not a restart is signalled on
	// mockAPI.restarts. When a restart is expected we wait generously,
	// because the select returns as soon as the restart arrives and a short
	// deadline would only make the positive case flaky on a loaded machine.
	// When no restart is expected a short wait is enough.
	checkIfRestarted := func(expectRestart bool) {
		t.Helper()

		wait := 5 * time.Millisecond
		if expectRestart {
			wait = time.Second
		}

		select {
		case <-time.After(wait):
			if expectRestart {
				require.Fail(t, "failed to restart channel")
			}
		case <-mockAPI.restarts:
			if !expectRestart {
				require.Fail(t, "expected no channel restart")
			}
		}
	}

	minBytesTransferred := uint64(10)
	m := NewMonitor(mockAPI, &Config{
		MonitorPullChannels:    true,
		AcceptTimeout:          time.Hour,
		Interval:               time.Hour,
		ChecksPerInterval:      1,
		MinBytesTransferred:    minBytesTransferred,
		MaxConsecutiveRestarts: 3,
		CompleteTimeout:        time.Hour,
		PullPauseTimeout:       time.Hour,
	})

	// Note: Don't start monitor, we'll call checkDataRate() manually

	m.AddPullChannel(ch1)

	lastRcvd := uint64(5)
	mockAPI.dataReceived(lastRcvd)
	m.checkDataRate()

	// Some data received, but less than required amount
	mockAPI.dataReceived(lastRcvd + minBytesTransferred/2)

	// If responder is paused, the monitor should ignore data
	// rate checking until responder resumes
	mockAPI.pauseResponder()
	m.checkDataRate() // Should be ignored because responder is paused
	m.checkDataRate()
	m.checkDataRate()

	// Should not restart
	checkIfRestarted(false)

	// Resume the responder
	mockAPI.resumeResponder()

	// Receive some data
	lastRcvd = 100
	mockAPI.dataReceived(lastRcvd)

	// Should not restart because received data exceeds minimum required
	m.checkDataRate()
	checkIfRestarted(false)

	// Pause responder again
	mockAPI.pauseResponder()
	m.checkDataRate() // Should be ignored because responder is paused
	m.checkDataRate()
	m.checkDataRate()

	// Resume responder
	mockAPI.resumeResponder()

	// Not enough data received, should restart
	mockAPI.dataReceived(lastRcvd + minBytesTransferred/2)
	m.checkDataRate()
	checkIfRestarted(true)
}

// TestPullChannelMonitorPauseTimeout verifies that a pull channel whose
// responder stays paused for longer than PullPauseTimeout is restarted on the
// next data-rate check, and that no restart happens before the timeout.
func TestPullChannelMonitorPauseTimeout(t *testing.T) {
	ch := &mockChannelState{chid: ch1}
	mockAPI := newMockMonitorAPI(ch, false)

	// checkIfRestarted asserts whether or not a restart is signalled on
	// mockAPI.restarts. When a restart is expected we wait generously,
	// because the select returns as soon as the restart arrives and a short
	// deadline would only make the positive case flaky on a loaded machine.
	// When no restart is expected a short wait is enough.
	checkIfRestarted := func(expectRestart bool) {
		t.Helper()

		wait := 5 * time.Millisecond
		if expectRestart {
			wait = time.Second
		}

		select {
		case <-time.After(wait):
			if expectRestart {
				require.Fail(t, "failed to restart channel")
			}
		case <-mockAPI.restarts:
			if !expectRestart {
				require.Fail(t, "expected no channel restart")
			}
		}
	}

	minBytesTransferred := uint64(10)
	pullPauseTimeout := 50 * time.Millisecond
	m := NewMonitor(mockAPI, &Config{
		MonitorPullChannels:    true,
		AcceptTimeout:          time.Hour,
		Interval:               time.Hour,
		ChecksPerInterval:      1,
		MinBytesTransferred:    minBytesTransferred,
		MaxConsecutiveRestarts: 3,
		CompleteTimeout:        time.Hour,
		PullPauseTimeout:       pullPauseTimeout,
	})

	// Note: Don't start monitor, we'll call checkDataRate() manually

	m.AddPullChannel(ch1)

	lastRcvd := uint64(5)
	mockAPI.dataReceived(lastRcvd)
	m.checkDataRate()

	// Some data received, but less than required amount
	mockAPI.dataReceived(lastRcvd + minBytesTransferred/2)

	// If responder is paused, the monitor should ignore data
	// rate checking until responder resumes
	mockAPI.pauseResponder()
	m.checkDataRate() // Should be ignored because responder is paused

	// Should not restart
	checkIfRestarted(false)

	// Pause timeout elapses
	time.Sleep(pullPauseTimeout * 2)

	// Should detect timeout has elapsed and restart
	m.checkDataRate()
	checkIfRestarted(true)
}

func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
runTest := func(name string, isPush bool) {
t.Run(name, func(t *testing.T) {
Expand All @@ -430,6 +567,7 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
MinBytesTransferred: 2,
MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts),
CompleteTimeout: time.Hour,
PullPauseTimeout: time.Hour,
})

// Note: Don't start monitor, we'll call checkDataRate() manually
Expand Down Expand Up @@ -550,6 +688,7 @@ func TestChannelMonitorTimeouts(t *testing.T) {
MinBytesTransferred: 1,
MaxConsecutiveRestarts: 1,
CompleteTimeout: completeTimeout,
PullPauseTimeout: time.Hour,
})
m.Start()

Expand Down Expand Up @@ -715,6 +854,14 @@ func (m *mockMonitorAPI) sendDataErrorEvent() {
m.callSubscriber(datatransfer.Event{Code: datatransfer.SendDataError}, m.ch)
}

// pauseResponder delivers a PauseResponder event for the mock's channel to
// the subscriber via callSubscriber.
func (m *mockMonitorAPI) pauseResponder() {
	evt := datatransfer.Event{Code: datatransfer.PauseResponder}
	m.callSubscriber(evt, m.ch)
}

// resumeResponder delivers a ResumeResponder event for the mock's channel to
// the subscriber via callSubscriber.
func (m *mockMonitorAPI) resumeResponder() {
	evt := datatransfer.Event{Code: datatransfer.ResumeResponder}
	m.callSubscriber(evt, m.ch)
}

type mockChannelState struct {
chid datatransfer.ChannelID
queued uint64
Expand Down
Loading