diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index e14ee683..f79fb2b4 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -96,7 +96,7 @@ func checkConfig(cfg *Config) { // This interface just makes it easier to abstract some methods between the // push and pull monitor implementations type monitoredChan interface { - Shutdown() + Shutdown() bool checkDataRate() } @@ -125,11 +125,23 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored m.lk.Lock() defer m.lk.Unlock() + // Check if there is already a monitor for this channel + if _, ok := m.channels[chid]; ok { + tp := "push" + if !isPush { + tp = "pull" + } + log.Warnf("ignoring add %s channel %s: %s channel with that id already exists", + tp, chid, tp) + return nil + } + + // Create the channel monitor var mpc monitoredChan if isPush { - mpc = newMonitoredPushChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) + mpc = newMonitoredPushChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) } else { - mpc = newMonitoredPullChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) + mpc = newMonitoredPullChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) } m.channels[chid] = mpc return mpc @@ -207,6 +219,9 @@ func (m *Monitor) checkDataRate() { // monitoredChannel keeps track of the data-rate for a channel, and // restarts the channel if the rate falls below the minimum allowed type monitoredChannel struct { + // The parentCtx is used when sending a close message for a channel, so + // that operation can continue even after the monitoredChannel is shut down + parentCtx context.Context ctx context.Context cancel context.CancelFunc mgr monitorAPI @@ -223,6 +238,7 @@ type monitoredChannel struct { } func newMonitoredChannel( + parentCtx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, cfg *Config, @@ -231,6 +247,7 @@ func newMonitoredChannel( ) *monitoredChannel { ctx, cancel := 
context.WithCancel(context.Background()) mpc := &monitoredChannel{ + parentCtx: parentCtx, ctx: ctx, cancel: cancel, mgr: mgr, @@ -247,14 +264,15 @@ func newMonitoredChannel( func (mc *monitoredChannel) checkDataRate() { } -// Cancel the context and unsubscribe from events -func (mc *monitoredChannel) Shutdown() { +// Cancel the context and unsubscribe from events. +// Returns true if the channel has not already been shut down. +func (mc *monitoredChannel) Shutdown() bool { mc.shutdownLk.Lock() defer mc.shutdownLk.Unlock() // Check if the channel was already shut down if mc.cancel == nil { - return + return false } mc.cancel() // cancel context so all go-routines exit mc.cancel = nil @@ -264,6 +282,8 @@ func (mc *monitoredChannel) Shutdown() { // Inform the Manager that this channel has shut down go mc.onShutdown(mc.chid) + + return true } func (mc *monitoredChannel) start() { @@ -301,9 +321,9 @@ func (mc *monitoredChannel) start() { log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid) go mc.restartChannel() case datatransfer.FinishTransfer: - // The client has finished sending all data. Watch to make sure - // that the responder sends a message to acknowledge that the - // transfer is complete + // The channel initiator has finished sending / receiving all data. 
+ // Watch to make sure that the responder sends a message to acknowledge + // that the transfer is complete go mc.watchForResponderComplete() default: // Delegate to the push channel monitor or pull channel monitor to @@ -425,14 +445,21 @@ func (mc *monitoredChannel) restartChannel() { mc.restartLk.Unlock() } +// Shut down the monitor and close the data transfer channel func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { + // Shut down the monitor + firstShutdown := mc.Shutdown() + if !firstShutdown { + // Channel was already shut down; ignore this second attempt to shut it down + return + } + + // Close the data transfer channel and fire an error log.Errorf("closing data-transfer channel: %s", cherr) - err := mc.mgr.CloseDataTransferChannelWithError(mc.ctx, mc.chid, cherr) + err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr) if err != nil { log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err) } - - mc.Shutdown() } // Snapshot of the pending and sent data at a particular point in time. 
@@ -454,6 +481,7 @@ type monitoredPushChannel struct { } func newMonitoredPushChannel( + parentCtx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, cfg *Config, @@ -462,7 +490,7 @@ func newMonitoredPushChannel( mpc := &monitoredPushChannel{ dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval), } - mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent) + mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent) return mpc } @@ -538,6 +566,7 @@ type monitoredPullChannel struct { } func newMonitoredPullChannel( + parentCtx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, cfg *Config, @@ -546,7 +575,7 @@ func newMonitoredPullChannel( mpc := &monitoredPullChannel{ dataRatePoints: make(chan uint64, cfg.ChecksPerInterval), } - mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent) + mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent) return mpc } @@ -578,8 +607,8 @@ func (mc *monitoredPullChannel) checkDataRate() { log.Debugf("%s: since last check: received: %d - %d = %d, required %d", mc.chid, mc.received, atIntervalStart, rcvdInInterval, mc.cfg.MinBytesTransferred) if rcvdInInterval < mc.cfg.MinBytesTransferred { - log.Warnf("%s: data-rate too low, restarting channel: since last check %s ago: received %d, required %d", - mc.chid, mc.cfg.Interval, rcvdInInterval, mc.cfg.MinBytesTransferred) + log.Warnf("%s: data-rate too low, restarting channel: since last check received %d but required %d", + mc.chid, rcvdInInterval, mc.cfg.MinBytesTransferred) go mc.restartChannel() } } diff --git a/impl/integration_test.go b/impl/integration_test.go index c9b00d88..77bd3ade 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -614,13 +614,17 @@ func TestAutoRestart(t *testing.T) { // right before the responder sends the complete message (ie responder sent // all blocks, but the 
responder doesn't get a chance to tell the initiator // before the disconnect) - name: "push: before requester sends complete message", - isPush: true, + name: "pull: before responder sends complete message", + isPush: false, expectInitiatorDTFail: true, disconnectOnRequestComplete: true, }} for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { + expectFailMsg := "" + if tc.expectInitiatorDTFail { + expectFailMsg = " (expect failure)" + } + t.Run(tc.name+expectFailMsg, func(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -659,8 +663,8 @@ func TestAutoRestart(t *testing.T) { // Set up restartConf := ChannelRestartConfig(channelmonitor.Config{ - MonitorPushChannels: true, - MonitorPullChannels: true, + MonitorPushChannels: tc.isPush, + MonitorPullChannels: !tc.isPush, AcceptTimeout: 100 * time.Millisecond, Interval: 100 * time.Millisecond, MinBytesTransferred: 1,