node: Remove node deletion when stopping and wait for background workers
NodeManager's Stop() was previously used only by tests, so its behavior was
to clean up datapath state by calling NodeDelete for each node. This is not
the desired behavior: we want to preserve the datapath state when shutting
down in order to not disrupt connectivity.

This cleanup behavior was never relied upon by any tests, and prior to this
change it was only triggered by control-plane tests via a call to
Daemon.Close(), so it is safe to remove.

In addition, Stop() did not wait for the backgroundSync() goroutine to exit.
Fix this by using workerpool.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
joamaki committed Jan 9, 2023
1 parent 7371fa5 commit 21cd454
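
The shape of the fix, reduced to a standalone sketch. This is illustrative
code, not part of the commit: it assumes only the github.com/cilium/workerpool
API as used in the diff below (New, Submit, Close), and that Close() cancels
the task context and waits for the task to return, which is the guarantee the
commit message relies on.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cilium/workerpool"
)

func main() {
	// One background worker, mirroring numBackgroundWorkers = 1 below.
	wp := workerpool.New(1)

	// Submit runs the task on a worker; the task receives a context
	// that the pool cancels on Close().
	err := wp.Submit("backgroundSync", func(ctx context.Context) error {
		for {
			select {
			case <-ctx.Done():
				// Returning here is what lets Close() complete.
				return nil
			case <-time.After(100 * time.Millisecond):
				// Periodic work would go here.
			}
		}
	})
	if err != nil {
		fmt.Println("submit:", err)
		return
	}

	// Close cancels the context and blocks until the task has returned,
	// unlike the old pattern of closing a channel and returning at once.
	if err := wp.Close(); err != nil {
		fmt.Println("close:", err)
	}
	fmt.Println("background worker has exited")
}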
Showing 2 changed files with 20 additions and 31 deletions.
pkg/node/manager/manager.go (18 additions, 20 deletions)

@@ -12,6 +12,8 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/cilium/workerpool"
+
 	"github.com/cilium/cilium/pkg/backoff"
 	"github.com/cilium/cilium/pkg/controller"
 	"github.com/cilium/cilium/pkg/datapath"
@@ -33,8 +35,12 @@ import (
 )
 
 var (
-	baseBackgroundSyncInterval = time.Minute
 	randGen = rand.NewSafeRand(time.Now().UnixNano())
+	baseBackgroundSyncInterval = time.Minute
 )
 
+const (
+	numBackgroundWorkers = 1
+)
+
 type nodeEntry struct {
@@ -98,8 +104,8 @@ type manager struct {
 	// events.
 	nodeHandlers map[datapath.NodeHandler]struct{}
 
-	// closeChan is closed when the manager is closed
-	closeChan chan struct{}
+	// workerpool manages background workers
+	workerpool *workerpool.WorkerPool
 
 	// name is the name of the manager. It must be unique and feasibility
 	// to be used a prometheus metric name.
@@ -169,7 +175,7 @@ func New(name string, c Configuration) (*manager, error) {
 		conf: c,
 		controllerManager: controller.NewManager(),
 		nodeHandlers: map[datapath.NodeHandler]struct{}{},
-		closeChan: make(chan struct{}),
+		workerpool: workerpool.New(numBackgroundWorkers),
 	}
 
 	m.metricEventsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
@@ -207,30 +213,22 @@ func (m *manager) SetIPCache(ipc IPCache) {
 }
 
 func (m *manager) Start(hive.HookContext) error {
-	go m.backgroundSync()
-	return nil
+	return m.workerpool.Submit("backgroundSync", m.backgroundSync)
 }
 
 // Stop shuts down a node manager
 func (m *manager) Stop(hive.HookContext) error {
+	if err := m.workerpool.Close(); err != nil {
+		return err
+	}
+
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
 
-	close(m.closeChan)
-
 	metrics.Unregister(m.metricNumNodes)
 	metrics.Unregister(m.metricEventsReceived)
 	metrics.Unregister(m.metricDatapathValidations)
 
-	// delete all nodes to clean up the datapath for each node
-	for _, n := range m.nodes {
-		n.mutex.Lock()
-		m.Iter(func(nh datapath.NodeHandler) {
-			nh.NodeDelete(n.node)
-		})
-		n.mutex.Unlock()
-	}
-
 	return nil
 }
 
@@ -272,7 +270,7 @@ func (m *manager) backgroundSyncInterval() time.Duration {
 
 // backgroundSync ensures that local node has a valid datapath in-place for
 // each node in the cluster. See NodeValidateImplementation().
-func (m *manager) backgroundSync() {
+func (m *manager) backgroundSync(ctx context.Context) error {
 	syncTimer, syncTimerDone := inctimer.New()
 	defer syncTimerDone()
 	for {
Expand Down Expand Up @@ -303,8 +301,8 @@ func (m *manager) backgroundSync() {
}

select {
case <-m.closeChan:
return
case <-ctx.Done():
return nil
case <-syncTimer.After(syncInterval):
}
}
Expand Down
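
A side note on ordering in the new Stop(): the worker pool is closed before
m.mutex is taken. If the background task takes that same mutex (the sync loop
validates per-node state, so it plausibly does), waiting for the worker while
holding the lock could deadlock. The sketch below is hypothetical code, not
from the commit; the mgr type, its fields, and the worker body are invented
for illustration.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/cilium/workerpool"
)

type mgr struct {
	mutex sync.Mutex
	nodes map[string]struct{}
	wp    *workerpool.WorkerPool
}

// worker periodically takes the same mutex that Stop() uses.
func (m *mgr) worker(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(10 * time.Millisecond):
		}
		m.mutex.Lock()
		_ = len(m.nodes) // stand-in for per-node validation work
		m.mutex.Unlock()
	}
}

func (m *mgr) Stop() error {
	// Wait for workers first, while holding no locks; locking before
	// Close() could deadlock against a worker blocked on m.mutex.
	if err := m.wp.Close(); err != nil {
		return err
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	// Final cleanup runs with no workers left.
	return nil
}

func main() {
	m := &mgr{nodes: map[string]struct{}{}, wp: workerpool.New(1)}
	if err := m.wp.Submit("worker", m.worker); err != nil {
		panic(err)
	}
	time.Sleep(50 * time.Millisecond) // let the worker run a few rounds
	if err := m.Stop(); err != nil {
		panic(err)
	}
	fmt.Println("stopped cleanly")
}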
pkg/node/manager/manager_test.go (2 additions, 11 deletions)
@@ -234,17 +234,8 @@ func (s *managerTestSuite) TestNodeLifecycle(c *check.C) {
 	_, ok = nodes[n1.Identity()]
 	c.Assert(ok, check.Equals, false)
 
-	mngr.Stop(context.TODO())
-	select {
-	case nodeEvent := <-dp.NodeDeleteEvent:
-		c.Assert(nodeEvent, checker.DeepEquals, n2)
-	case nodeEvent := <-dp.NodeAddEvent:
-		c.Errorf("Unexpected NodeAdd() event %#v", nodeEvent)
-	case nodeEvent := <-dp.NodeUpdateEvent:
-		c.Errorf("Unexpected NodeUpdate() event %#v", nodeEvent)
-	case <-time.After(3 * time.Second):
-		c.Errorf("timeout while waiting for NodeDelete() event for node2")
-	}
+	err = mngr.Stop(context.TODO())
+	c.Assert(err, check.IsNil)
 }
 
 func (s *managerTestSuite) TestMultipleSources(c *check.C) {
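
The updated test only checks that Stop() returns cleanly. The wait-for-workers
guarantee itself could be pinned down with a small standalone test against the
workerpool library. This is a hypothetical test, not part of this commit, and
it uses the standard testing package rather than the gocheck suite above.

package manager_test

import (
	"context"
	"sync/atomic"
	"testing"

	"github.com/cilium/workerpool"
)

// TestCloseWaitsForTask checks that Close() does not return until the
// submitted task has.
func TestCloseWaitsForTask(t *testing.T) {
	wp := workerpool.New(1)

	var exited atomic.Bool
	err := wp.Submit("task", func(ctx context.Context) error {
		<-ctx.Done()       // block until Close() cancels the context
		exited.Store(true) // record that the task ran to completion
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	if err := wp.Close(); err != nil {
		t.Fatal(err)
	}
	if !exited.Load() {
		t.Fatal("Close() returned before the task exited")
	}
}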
