Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,9 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {

// PushChannelRestartConfig sets the configuration options for automatically
// restarting push channels
// - interval is the time over which minBytesSent must have been sent
// - checksPerInterval is the number of times to check per interval
// - minBytesSent is the minimum amount of data that must have been sent over
// the interval
// - restartBackoff is the time to wait before checking again for restarts
// - maxConsecutiveRestarts is the maximum number of restarts in a row to
// attempt where no data is transferred. When the limit is reached the
// channel is closed.
func PushChannelRestartConfig(
interval time.Duration,
checksPerInterval uint32,
minBytesSent uint64,
restartBackoff time.Duration,
maxConsecutiveRestarts uint32,
) DataTransferOption {
func PushChannelRestartConfig(cfg pushchannelmonitor.Config) DataTransferOption {
return func(m *manager) {
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
MaxConsecutiveRestarts: maxConsecutiveRestarts,
}
m.pushChannelMonitorCfg = &cfg
}
}

Expand Down Expand Up @@ -318,30 +298,76 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
if err != nil {
return err
}

// Close the channel on the local transport
err = m.transport.CloseChannel(ctx, chid)
if err != nil {
log.Warnf("unable to close channel %s: %s", chid, err)
}

log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
// Send a cancel message to the remote peer
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
err = fmt.Errorf("Unable to send cancel message: %w", err)
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
chid, m.peerID, err)
_ = m.OnRequestDisconnected(ctx, chid)
log.Warn(err)
}

// Fire a cancel event
fsmerr := m.channels.Cancel(chid)
// If it wasn't possible to send a cancel message to the peer, return
// that error
if err != nil {
return err
}
// If it wasn't possible to fire a cancel event, return that error
if fsmerr != nil {
return xerrors.Errorf("unable to send cancel to channel FSM: %w", fsmerr)
}

return nil
}

// close an open channel and fire an error event
func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
log.Infof("close channel %s with error %s", chid, cherr)

chst, err := m.channels.GetByID(ctx, chid)
if err != nil {
return err
}

// Cancel the channel on the local transport
err = m.transport.CloseChannel(ctx, chid)
if err != nil {
log.Warnf("unable to close channel %s: %s", chid, err)
}

// Try to send a cancel message to the remote peer. It's quite likely
// we aren't able to send the message to the peer because the channel
// is already in an error state, which is probably because of connection
// issues, so if we cant send the message just log a warning.
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
// Just log a warning here because it's important that we fire the
// error event with the original error so that it doesn't get masked
// by subsequent errors.
log.Warnf("unable to send cancel message for channel %s to peer %s: %w",
chid, m.peerID, err)
}

// Fire an error event
err = m.channels.Error(chid, cherr)
if err != nil {
return xerrors.Errorf("unable to send error %s to channel FSM: %w", cherr, err)
}

return nil
}

// pause a running data transfer channel
func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("pause channel %s", chid)
Expand Down
Loading