189 changes: 164 additions & 25 deletions channelmonitor/channelmonitor.go
@@ -40,6 +40,9 @@ type Config struct {
MonitorPullChannels bool
// Max time to wait for other side to accept open channel request before attempting restart
AcceptTimeout time.Duration
// Indicates whether to monitor the data-rate on the channel, and restart
// if it falls too low
EnableDataRateMonitoring bool
// Interval between checks of transfer rate
Interval time.Duration
// Min bytes that must be sent / received in interval
@@ -53,6 +56,8 @@ type Config struct {
// Max time to wait for the responder to send a Complete message once all
// data has been sent
CompleteTimeout time.Duration
// Max time to wait for a resume after the responder pauses a pull channel
PullPauseTimeout time.Duration
}
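
For context, constructing a monitor with the new options might look like the sketch below (values are illustrative, not recommended defaults; `mgr` is assumed to be a `monitorAPI` implementation, and only fields visible in this diff are shown; others may also be required):

```go
cfg := &channelmonitor.Config{
	MonitorPullChannels:      true,
	AcceptTimeout:            30 * time.Second,
	EnableDataRateMonitoring: true,
	Interval:                 time.Minute,
	MinBytesTransferred:      1024,
	ChecksPerInterval:        10,
	CompleteTimeout:          time.Minute,
	PullPauseTimeout:         2 * time.Minute,
}
monitor := channelmonitor.NewMonitor(mgr, cfg)
```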

func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
@@ -91,12 +96,16 @@ func checkConfig(cfg *Config) {
if cfg.CompleteTimeout <= 0 {
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
}
if cfg.MonitorPullChannels && cfg.PullPauseTimeout <= 0 {
panic(fmt.Sprintf(prefix+"PullPauseTimeout is %s but must be > 0", cfg.PullPauseTimeout))
}
}

// This interface abstracts the methods shared by the push and pull
// monitor implementations
type monitoredChan interface {
Shutdown()
Shutdown() bool
enableDataRateMonitoring()
checkDataRate()
}

@@ -125,16 +134,56 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored
m.lk.Lock()
defer m.lk.Unlock()

// Check if there is already a monitor for this channel
if _, ok := m.channels[chid]; ok {
tp := "push"
if !isPush {
tp = "pull"
}
log.Warnf("ignoring add %s channel %s: %s channel with that id already exists",
tp, chid, tp)
return nil
}

// Create the channel monitor
var mpc monitoredChan
if isPush {
mpc = newMonitoredPushChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
mpc = newMonitoredPushChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
} else {
mpc = newMonitoredPullChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
mpc = newMonitoredPullChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
}
m.channels[chid] = mpc
return mpc
}

// EnableDataRateMonitoring enables data-rate monitoring for a channel.
// It is used when the client wants to create the channel (and start
// monitoring for the Accept response from the responder) but doesn't want
// to start monitoring the data rate until later.
// For example, when creating a retrieval deal it may take some time to
// set up a payment channel: the client opens a data-transfer channel,
// then opens a payment channel, then starts monitoring the data rate.
// Note: The monitor must already be enabled in order to enable monitoring
// for a particular channel.
// This method is idempotent.
func (m *Monitor) EnableDataRateMonitoring(chid datatransfer.ChannelID) error {
if !m.enabled() {
return xerrors.Errorf("could not enable data rate monitoring for channel %s: channel monitoring is disabled", chid)
}

m.lk.Lock()
defer m.lk.Unlock()

ch, ok := m.channels[chid]
if !ok {
return xerrors.Errorf("could not enable data rate monitoring for unknown channel %s", chid)
}

ch.enableDataRateMonitoring()

return nil
}
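
A usage sketch for the retrieval flow described in the comment (`openDataTransferChannel` and `setupPaymentChannel` are hypothetical helpers, not part of this diff; `monitor` is the Monitor from the config sketch above):

```go
// Open the channel with data-rate monitoring initially off
// (cfg.EnableDataRateMonitoring = false); Accept monitoring starts right away.
chid, err := openDataTransferChannel(ctx)
if err != nil {
	return err
}

// Setting up the payment channel may take a while; the low data-rate
// check must not fire during this window.
if err := setupPaymentChannel(ctx, chid); err != nil {
	return err
}

// Payment is ready: start enforcing the minimum data rate.
if err := monitor.EnableDataRateMonitoring(chid); err != nil {
	return err
}
```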

func (m *Monitor) Shutdown() {
// Causes the run loop to exit
m.stop()
@@ -207,6 +256,9 @@ func (m *Monitor) checkDataRate() {
// monitoredChannel keeps track of the data-rate for a channel, and
// restarts the channel if the rate falls below the minimum allowed
type monitoredChannel struct {
// The parentCtx is used when sending a close message for a channel, so
// that the operation can continue even after the monitoredChannel is shut down
parentCtx context.Context
ctx context.Context
cancel context.CancelFunc
mgr monitorAPI
@@ -223,6 +275,7 @@ type monitoredChannel struct {
}

func newMonitoredChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
@@ -231,6 +284,7 @@ ) *monitoredChannel {
) *monitoredChannel {
ctx, cancel := context.WithCancel(context.Background())
mpc := &monitoredChannel{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
mgr: mgr,
@@ -247,14 +301,15 @@ func (mc *monitoredChannel) checkDataRate() {
func (mc *monitoredChannel) checkDataRate() {
}

// Cancel the context and unsubscribe from events
func (mc *monitoredChannel) Shutdown() {
// Cancel the context and unsubscribe from events.
// Returns true if the channel had not already been shut down.
func (mc *monitoredChannel) Shutdown() bool {
mc.shutdownLk.Lock()
defer mc.shutdownLk.Unlock()

// Check if the channel was already shut down
if mc.cancel == nil {
return
return false
}
mc.cancel() // cancel context so all go-routines exit
mc.cancel = nil
@@ -264,6 +319,8 @@ func (mc *monitoredChannel) Shutdown() {

// Inform the Manager that this channel has shut down
go mc.onShutdown(mc.chid)

return true
}
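
Since `Shutdown` now reports whether this call actually performed the shutdown, callers with one-time teardown can guard on the result (usage sketch; `closeChannelAndShutdown` below does exactly this):

```go
if first := mc.Shutdown(); !first {
	// Another caller already shut the channel down; skip one-time teardown
	return
}
// One-time teardown (e.g. sending a close message) goes here
```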

func (mc *monitoredChannel) start() {
@@ -301,9 +358,9 @@ func (mc *monitoredChannel) start() {
log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
go mc.restartChannel()
case datatransfer.FinishTransfer:
// The client has finished sending all data. Watch to make sure
// that the responder sends a message to acknowledge that the
// transfer is complete
// The channel initiator has finished sending / receiving all data.
// Watch to make sure that the responder sends a message to acknowledge
// that the transfer is complete
go mc.watchForResponderComplete()
default:
// Delegate to the push channel monitor or pull channel monitor to
@@ -425,14 +482,21 @@ func (mc *monitoredChannel) restartChannel() {
mc.restartLk.Unlock()
}

// Shut down the monitor and close the data transfer channel
func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
// Shut down the monitor
firstShutdown := mc.Shutdown()
if !firstShutdown {
// Channel was already shut down; ignore this second attempt to shut down
return
}

// Close the data transfer channel and fire an error
log.Errorf("closing data-transfer channel: %s", cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.ctx, mc.chid, cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr)
if err != nil {
log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
}

mc.Shutdown()
}
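
Why two contexts: `Shutdown()` cancels `mc.ctx`, so by the time the close message is sent that context is already dead; the close must ride on the longer-lived `mc.parentCtx`. A self-contained illustration of what would happen on the old context:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// After Shutdown() runs cancel(), mc.ctx behaves like this:
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(ctx.Err()) // context.Canceled
	// A close message sent on this context would be aborted immediately,
	// which is why closeChannelAndShutdown switches to mc.parentCtx.
}
```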

// Snapshot of the pending and sent data at a particular point in time.
@@ -447,31 +511,50 @@ type dataRatePoint struct {
type monitoredPushChannel struct {
*monitoredChannel

statsLk sync.RWMutex
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint
statsLk sync.RWMutex
rateMonitoringEnabled bool
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint
}

func newMonitoredPushChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
onShutdown func(datatransfer.ChannelID),
) *monitoredPushChannel {
mpc := &monitoredPushChannel{
dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
rateMonitoringEnabled: cfg.EnableDataRateMonitoring,
dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
}
mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent)
mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
return mpc
}

func (mc *monitoredPushChannel) enableDataRateMonitoring() {
mc.statsLk.Lock()
defer mc.statsLk.Unlock()

if mc.rateMonitoringEnabled {
return
}

log.Info("%s: enabling push channel data rate monitoring", mc.chid)
mc.rateMonitoringEnabled = true
}

// check if the amount of data sent in the interval was too low, and if so
// restart the channel
func (mc *monitoredPushChannel) checkDataRate() {
mc.statsLk.Lock()
defer mc.statsLk.Unlock()

if !mc.rateMonitoringEnabled {
return
}

// Before returning, add the current data rate stats to the queue
defer func() {
var pending uint64
@@ -532,30 +615,74 @@ func (mc *monitoredPushChannel) onDTEvent(event datatransfer.Event, channelState
type monitoredPullChannel struct {
*monitoredChannel

statsLk sync.RWMutex
received uint64
dataRatePoints chan uint64
statsLk sync.RWMutex
rateMonitoringEnabled bool
received uint64
dataRatePoints chan uint64
pausedAt time.Time
}

func newMonitoredPullChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
onShutdown func(datatransfer.ChannelID),
) *monitoredPullChannel {
mpc := &monitoredPullChannel{
dataRatePoints: make(chan uint64, cfg.ChecksPerInterval),
rateMonitoringEnabled: cfg.EnableDataRateMonitoring,
dataRatePoints: make(chan uint64, cfg.ChecksPerInterval),
}
mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent)
mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
return mpc
}

func (mc *monitoredPullChannel) enableDataRateMonitoring() {
mc.statsLk.Lock()
defer mc.statsLk.Unlock()

if mc.rateMonitoringEnabled {
return
}

log.Info("%s: enabling pull channel data rate monitoring", mc.chid)
mc.rateMonitoringEnabled = true
}

// check if the amount of data received in the interval was too low, and if so
// restart the channel
func (mc *monitoredPullChannel) checkDataRate() {
mc.statsLk.Lock()
defer mc.statsLk.Unlock()

if !mc.rateMonitoringEnabled {
return
}

// If the channel is currently paused
if !mc.pausedAt.IsZero() {
// Check if the channel has been paused for too long
pausedFor := time.Since(mc.pausedAt)
if pausedFor > mc.cfg.PullPauseTimeout {
log.Warnf("%s: paused for too long, restarting channel: paused since %s (%s exceeds pause timeout %s)",
mc.chid, mc.pausedAt, pausedFor, mc.cfg.PullPauseTimeout)

// Reset pause so we can continue checking the data rate
mc.pausedAt = time.Time{}

// Restart the channel
go mc.restartChannel()

return
}

// Channel is paused, so wait for a resume before checking the data rate
log.Debugf("%s: is paused since %s; waiting for resume to continue checking data rate",
mc.chid, mc.pausedAt)
return
}

// Before returning, add the current data rate stats to the queue
defer func() {
mc.dataRatePoints <- mc.received
@@ -578,8 +705,8 @@ func (mc *monitoredPullChannel) checkDataRate() {
log.Debugf("%s: since last check: received: %d - %d = %d, required %d",
mc.chid, mc.received, atIntervalStart, rcvdInInterval, mc.cfg.MinBytesTransferred)
if rcvdInInterval < mc.cfg.MinBytesTransferred {
log.Warnf("%s: data-rate too low, restarting channel: since last check %s ago: received %d, required %d",
mc.chid, mc.cfg.Interval, rcvdInInterval, mc.cfg.MinBytesTransferred)
log.Warnf("%s: data-rate too low, restarting channel: since last check received %d but required %d",
mc.chid, rcvdInInterval, mc.cfg.MinBytesTransferred)
go mc.restartChannel()
}
}
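
The rate check treats the buffered `dataRatePoints` channel as a fixed-size sliding window: each check pushes the running total, and once `ChecksPerInterval` points have accumulated it pops the total from one full `Interval` ago. A reduced, self-contained sketch of the mechanism (illustrative, not the exact code above):

```go
package main

import "fmt"

func main() {
	const checksPerInterval = 2
	const minBytes = uint64(100)

	points := make(chan uint64, checksPerInterval)
	var received uint64

	check := func() bool {
		ok := true
		// Only compare once the window spans a full interval
		if len(points) == cap(points) {
			atIntervalStart := <-points
			ok = received-atIntervalStart >= minBytes
		}
		points <- received
		return ok
	}

	for _, delta := range []uint64{60, 60, 10, 5} {
		received += delta
		fmt.Println(check()) // true, true, false, false
	}
}
```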
Expand All @@ -595,5 +722,17 @@ func (mc *monitoredPullChannel) onDTEvent(event datatransfer.Event, channelState

// Some data was received so reset the consecutive restart counter
mc.resetConsecutiveRestarts()

case datatransfer.PauseResponder:
// The sender has paused sending data
mc.statsLk.Lock()
mc.pausedAt = time.Now()
mc.statsLk.Unlock()

case datatransfer.ResumeResponder:
// The sender has resumed sending data
mc.statsLk.Lock()
mc.pausedAt = time.Time{}
mc.statsLk.Unlock()
}
}
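
The pause handling gives pull channels a small state machine: `PauseResponder` stamps `pausedAt`, `ResumeResponder` clears it, and `checkDataRate` either waits or restarts based on `PullPauseTimeout`. The timeout decision reduces to this (sketch; `pausedTooLong` is a hypothetical helper, not in the diff):

```go
import "time"

// pausedTooLong reports whether a paused pull channel has exceeded the
// allowed pause window; a zero pausedAt means the channel is not paused.
func pausedTooLong(pausedAt time.Time, timeout time.Duration) bool {
	if pausedAt.IsZero() {
		return false
	}
	return time.Since(pausedAt) > timeout
}
```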