diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 6d5acf650d7..52c3a44cc7e 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -856,7 +856,7 @@ func run(o *Options) error { if err := agentInitializer.FlowRestoreComplete(); err != nil { return err } - klog.InfoS("Flow restoration complete") + klog.InfoS("Flow restoration has completed") // ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete if connectUplinkToBridge { // Restore network config before shutdown. ovsdbConnection must be alive when restore. diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 041321ef212..7541e0198f2 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -373,10 +373,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go func() { // When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait. - defer c.flowRestoreCompleteWait.Done() - wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { return c.hasProcessedInitialList.HasSynced(), nil }) + // An error here means the context has been cancelled, which means that the stopCh + // has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced + // to become true, as workers may not have returned yet, we should not decrement + // flowRestoreCompleteWait or log the message below. 
+ if err != nil { + return + } + c.flowRestoreCompleteWait.Done() + klog.V(2).InfoS("Initial list of Nodes has been processed") }() diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index b48838f87e7..28339ae3f70 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -752,3 +752,39 @@ func TestInitialListHasSynced(t *testing.T) { assert.True(t, c.hasProcessedInitialList.HasSynced()) } + +func TestInitialListHasSyncedStopChClosedEarly(t *testing.T) { + c := newController(t, &config.NetworkConfig{}, node1) + + stopCh := make(chan struct{}) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + + c.routeClient.EXPECT().Reconcile([]string{podCIDR.String()}) + + // We close the stopCh right away, then call Run synchronously and wait for it to + // return. The workers will not even start, and the initial list of Nodes should never be + // reported as "synced". + close(stopCh) + c.Run(stopCh) + + assert.Error(t, c.flowRestoreCompleteWait.WaitWithTimeout(500*time.Millisecond)) + assert.False(t, c.hasProcessedInitialList.HasSynced()) +} + +func TestInitialListHasSyncedPolicyOnlyMode(t *testing.T) { + c := newController(t, &config.NetworkConfig{ + TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly, + }) + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + go c.Run(stopCh) + + // In networkPolicyOnly mode, c.flowRestoreCompleteWait should be decremented immediately + // when calling Run, even though workers are never started and c.hasProcessedInitialList.HasSynced + // will remain false. + assert.NoError(t, c.flowRestoreCompleteWait.WaitWithTimeout(500*time.Millisecond)) + assert.False(t, c.hasProcessedInitialList.HasSynced()) +}