Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func checkConfig(cfg *Config) {
// This interface just makes it easier to abstract some methods between the
// push and pull monitor implementations
type monitoredChan interface {
Shutdown()
Shutdown() bool
checkDataRate()
}

Expand Down Expand Up @@ -125,11 +125,23 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored
m.lk.Lock()
defer m.lk.Unlock()

// Check if there is already a monitor for this channel
if _, ok := m.channels[chid]; ok {
tp := "push"
if !isPush {
tp = "pull"
}
log.Warnf("ignoring add %s channel %s: %s channel with that id already exists",
tp, chid, tp)
return nil
}

// Create the channel monitor
var mpc monitoredChan
if isPush {
mpc = newMonitoredPushChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
mpc = newMonitoredPushChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
} else {
mpc = newMonitoredPullChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
mpc = newMonitoredPullChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
}
m.channels[chid] = mpc
return mpc
Expand Down Expand Up @@ -207,6 +219,9 @@ func (m *Monitor) checkDataRate() {
// monitoredChannel keeps track of the data-rate for a channel, and
// restarts the channel if the rate falls below the minimum allowed
type monitoredChannel struct {
// The parentCtx is used when sending a close message for a channel, so
// that operation can continue even after the monitoredChannel is shutdown
parentCtx context.Context
ctx context.Context
cancel context.CancelFunc
mgr monitorAPI
Expand All @@ -223,6 +238,7 @@ type monitoredChannel struct {
}

func newMonitoredChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
Expand All @@ -231,6 +247,7 @@ func newMonitoredChannel(
) *monitoredChannel {
ctx, cancel := context.WithCancel(context.Background())
mpc := &monitoredChannel{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
mgr: mgr,
Expand All @@ -247,14 +264,15 @@ func newMonitoredChannel(
func (mc *monitoredChannel) checkDataRate() {
}

// Cancel the context and unsubscribe from events
func (mc *monitoredChannel) Shutdown() {
// Cancel the context and unsubscribe from events.
// Returns true if channel has not already been shutdown.
func (mc *monitoredChannel) Shutdown() bool {
mc.shutdownLk.Lock()
defer mc.shutdownLk.Unlock()

// Check if the channel was already shut down
if mc.cancel == nil {
return
return false
}
mc.cancel() // cancel context so all go-routines exit
mc.cancel = nil
Expand All @@ -264,6 +282,8 @@ func (mc *monitoredChannel) Shutdown() {

// Inform the Manager that this channel has shut down
go mc.onShutdown(mc.chid)

return true
}

func (mc *monitoredChannel) start() {
Expand Down Expand Up @@ -301,9 +321,9 @@ func (mc *monitoredChannel) start() {
log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
go mc.restartChannel()
case datatransfer.FinishTransfer:
// The client has finished sending all data. Watch to make sure
// that the responder sends a message to acknowledge that the
// transfer is complete
// The channel initiator has finished sending / receiving all data.
// Watch to make sure that the responder sends a message to acknowledge
// that the transfer is complete
go mc.watchForResponderComplete()
default:
// Delegate to the push channel monitor or pull channel monitor to
Expand Down Expand Up @@ -425,14 +445,21 @@ func (mc *monitoredChannel) restartChannel() {
mc.restartLk.Unlock()
}

// Shut down the monitor and close the data transfer channel
func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
// Shutdown the monitor
firstShutdown := mc.Shutdown()
if !firstShutdown {
// Channel was already shutdown, ignore this second attempt to shutdown
return
}

// Close the data transfer channel and fire an error
log.Errorf("closing data-transfer channel: %s", cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.ctx, mc.chid, cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr)
if err != nil {
log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
}

mc.Shutdown()
}

// Snapshot of the pending and sent data at a particular point in time.
Expand All @@ -454,6 +481,7 @@ type monitoredPushChannel struct {
}

func newMonitoredPushChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
Expand All @@ -462,7 +490,7 @@ func newMonitoredPushChannel(
mpc := &monitoredPushChannel{
dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
}
mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent)
mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
return mpc
}

Expand Down Expand Up @@ -538,6 +566,7 @@ type monitoredPullChannel struct {
}

func newMonitoredPullChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
Expand All @@ -546,7 +575,7 @@ func newMonitoredPullChannel(
mpc := &monitoredPullChannel{
dataRatePoints: make(chan uint64, cfg.ChecksPerInterval),
}
mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent)
mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
return mpc
}

Expand Down Expand Up @@ -578,8 +607,8 @@ func (mc *monitoredPullChannel) checkDataRate() {
log.Debugf("%s: since last check: received: %d - %d = %d, required %d",
mc.chid, mc.received, atIntervalStart, rcvdInInterval, mc.cfg.MinBytesTransferred)
if rcvdInInterval < mc.cfg.MinBytesTransferred {
log.Warnf("%s: data-rate too low, restarting channel: since last check %s ago: received %d, required %d",
mc.chid, mc.cfg.Interval, rcvdInInterval, mc.cfg.MinBytesTransferred)
log.Warnf("%s: data-rate too low, restarting channel: since last check received %d but required %d",
mc.chid, rcvdInInterval, mc.cfg.MinBytesTransferred)
go mc.restartChannel()
}
}
Expand Down
14 changes: 9 additions & 5 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,17 @@ func TestAutoRestart(t *testing.T) {
// right before the responder sends the complete message (ie responder sent
// all blocks, but the responder doesn't get a chance to tell the initiator
// before the disconnect)
name: "push: before requester sends complete message",
isPush: true,
name: "pull: before responder sends complete message",
isPush: false,
expectInitiatorDTFail: true,
disconnectOnRequestComplete: true,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
expectFailMsg := ""
if tc.expectInitiatorDTFail {
expectFailMsg = " (expect failure)"
}
t.Run(tc.name+expectFailMsg, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -659,8 +663,8 @@ func TestAutoRestart(t *testing.T) {

// Set up
restartConf := ChannelRestartConfig(channelmonitor.Config{
MonitorPushChannels: true,
MonitorPullChannels: true,
MonitorPushChannels: tc.isPush,
MonitorPullChannels: !tc.isPush,
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
Expand Down