Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type monitoredChannel struct {
chid datatransfer.ChannelID
cfg *Config
unsub datatransfer.Unsubscribe
restartChannelDebounced func()
restartChannelDebounced func(error)
onShutdown func(datatransfer.ChannelID)
shutdownLk sync.Mutex

Expand All @@ -187,8 +187,33 @@ func newMonitoredChannel(
cfg: cfg,
onShutdown: onShutdown,
}

// "debounce" calls to restart channel, ie if there are multiple calls in a
// short space of time, only send a message to restart the channel once
var lk sync.Mutex
var lastErr error
debouncer := debounce.New(cfg.RestartDebounce)
mpc.restartChannelDebounced = func() { debouncer(mpc.restartChannel) }
mpc.restartChannelDebounced = func(err error) {
// Log the error at debug level
log.Debug(err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI:

log.Debugw("what does this mean", "error", err)

Otherwise:

  1. We pay for the overhead of calling Error, even if we don't have debug logging enabled.
  2. The user will see log messages like "stream reset" with no context.


// Save the last error passed to restartChannelDebounced
lk.Lock()
lastErr = err
lk.Unlock()

debouncer(func() {
// Log only the last error passed to restartChannelDebounced at warning level
lk.Lock()
log.Warnf("%s", lastErr)
lk.Unlock()

// Restart the channel
mpc.restartChannel()
})
}

// Start monitoring the channel
mpc.start()
return mpc
}
Expand Down Expand Up @@ -247,13 +272,13 @@ func (mc *monitoredChannel) start() {
case datatransfer.SendDataError:
// If the transport layer reports an error sending data over the wire,
// attempt to restart the channel
log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
go mc.restartChannelDebounced()
err := xerrors.Errorf("%s: data transfer transport send error, restarting data transfer", mc.chid)
go mc.restartChannelDebounced(err)
case datatransfer.ReceiveDataError:
// If the transport layer reports an error receiving data over the wire,
// attempt to restart the channel
log.Warnf("%s: data transfer transport receive error, restarting data transfer", mc.chid)
go mc.restartChannelDebounced()
err := xerrors.Errorf("%s: data transfer transport receive error, restarting data transfer", mc.chid)
go mc.restartChannelDebounced(err)
case datatransfer.FinishTransfer:
// The channel initiator has finished sending / receiving all data.
// Watch to make sure that the responder sends a message to acknowledge
Expand Down Expand Up @@ -355,7 +380,7 @@ func (mc *monitoredChannel) restartChannel() {

// Check if channel is already being restarted
if !restartedAt.IsZero() {
log.Infof("%s: restart called but already restarting channel, "+
log.Debugf("%s: restart called but already restarting channel, "+
"waiting to restart again (since %s; restart backoff is %s)",
mc.chid, time.Since(restartedAt), mc.cfg.RestartBackoff)
return
Expand Down Expand Up @@ -397,7 +422,7 @@ func (mc *monitoredChannel) restartChannel() {
}

// There was a restart queued, restart again
log.Infof("%s: restart was queued - restarting again", mc.chid)
log.Debugf("%s: restart was queued - restarting again", mc.chid)
}
}

Expand All @@ -416,7 +441,7 @@ func (mc *monitoredChannel) doRestartChannel() error {
}

// Send the restart message
log.Infof("%s: restarting (%d consecutive restarts)", mc.chid, restartCount)
log.Debugf("%s: restarting (%d consecutive restarts)", mc.chid, restartCount)
err := mc.sendRestartMessage(restartCount)
if err != nil {
log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
Expand All @@ -433,17 +458,17 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
// Note that at the networking layer there is logic to retry if a network
// connection cannot be established, so this may take some time.
p := mc.chid.OtherParty(mc.mgr.PeerID())
log.Infof("%s: re-establishing connection to %s", mc.chid, p)
log.Debugf("%s: re-establishing connection to %s", mc.chid, p)
start := time.Now()
err := mc.mgr.ConnectTo(mc.ctx, p)
if err != nil {
return xerrors.Errorf("%s: failed to reconnect to peer %s after %s: %w",
mc.chid, p, time.Since(start), err)
}
log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))
log.Debugf("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

// Send a restart message for the channel
log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
log.Debugf("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
Expand All @@ -453,11 +478,11 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
// If a restart backoff is configured, backoff after a restart before
// attempting another.
if mc.cfg.RestartBackoff > 0 {
log.Infof("%s: backing off %s before allowing any other restarts",
log.Debugf("%s: backing off %s before allowing any other restarts",
mc.chid, mc.cfg.RestartBackoff)
select {
case <-time.After(mc.cfg.RestartBackoff):
log.Infof("%s: restart back-off of %s complete", mc.chid, mc.cfg.RestartBackoff)
log.Debugf("%s: restart back-off of %s complete", mc.chid, mc.cfg.RestartBackoff)
case <-mc.ctx.Done():
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,12 @@ func (m *manager) OnRequestDisconnected(chid datatransfer.ChannelID, err error)
}

func (m *manager) OnSendDataError(chid datatransfer.ChannelID, err error) error {
log.Warnf("channel %+v had transport send error: %s", chid, err)
log.Debugf("channel %+v had transport send error: %s", chid, err)
return m.channels.SendDataError(chid, err)
}

func (m *manager) OnReceiveDataError(chid datatransfer.ChannelID, err error) error {
log.Warnf("channel %+v had transport receive error: %s", chid, err)
log.Debugf("channel %+v had transport receive error: %s", chid, err)
return m.channels.ReceiveDataError(chid, err)
}

Expand Down