Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas committed May 31, 2024
1 parent 01b3092 commit 313eb8f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func run(o *Options) error {
if err := agentInitializer.FlowRestoreComplete(); err != nil {
return err
}
klog.InfoS("Flow restoration complete")
klog.InfoS("Flow restoration has completed")
// ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete
if connectUplinkToBridge {
// Restore network config before shutdown. ovsdbConnection must be alive when restore.
Expand Down
11 changes: 9 additions & 2 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) {

go func() {
// When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait.
defer c.flowRestoreCompleteWait.Done()
wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
return c.hasProcessedInitialList.HasSynced(), nil
})
// An error here means the context has been cancelled, which means that the stopCh
// has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced
// to become true, as workers may not have returned yet, we should not decrement
// flowRestoreCompleteWait or log the message below.
if err != nil {
return
}
c.flowRestoreCompleteWait.Done()
klog.V(2).InfoS("Initial list of Nodes has been processed")
}()

Expand Down
36 changes: 36 additions & 0 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,3 +752,39 @@ func TestInitialListHasSynced(t *testing.T) {

assert.True(t, c.hasProcessedInitialList.HasSynced())
}

func TestInitialListHasSyncedStopChClosedEarly(t *testing.T) {
c := newController(t, &config.NetworkConfig{}, node1)

stopCh := make(chan struct{})
c.informerFactory.Start(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)

c.routeClient.EXPECT().Reconcile([]string{podCIDR.String()})

// We close the stopCh right away, then call Run synchronously and wait for it to
// return. The workers will not even start, and the initial list of Nodes should never be
// reported as "synced".
close(stopCh)
c.Run(stopCh)

assert.Error(t, c.flowRestoreCompleteWait.WaitWithTimeout(500*time.Millisecond))
assert.False(t, c.hasProcessedInitialList.HasSynced())
}

func TestInitialListHasSyncedPolicyOnlyMode(t *testing.T) {
c := newController(t, &config.NetworkConfig{
TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly,
})
defer c.queue.ShutDown()

stopCh := make(chan struct{})
defer close(stopCh)
go c.Run(stopCh)

// In networkPolicyOnly mode, c.flowRestoreCompleteWait should be decremented immediately
// when calling Run, even though workers are never started and c.hasProcessedInitialList.HasSynced
// will remain false.
assert.NoError(t, c.flowRestoreCompleteWait.WaitWithTimeout(500*time.Millisecond))
assert.False(t, c.hasProcessedInitialList.HasSynced())
}

0 comments on commit 313eb8f

Please sign in to comment.