diff --git a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go
index f467b4c1d0..1abe4246e7 100644
--- a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go
+++ b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go
@@ -17,7 +17,6 @@ const (
 	netpolCrudTag Tag = "netpol-crud"
 	reconcileTag  Tag = "reconcile"
 	calicoTag     Tag = "calico"
-	skipTestTag   Tag = "skip-test"
 )
 
 const (
@@ -1298,7 +1297,6 @@ func updatePodTests() []*SerialTestCase {
 			},
 			TestCaseMetadata: &TestCaseMetadata{
 				Tags: []Tag{
-					skipTestTag,
 					podCrudTag,
 					netpolCrudTag,
 				},
@@ -1359,7 +1357,6 @@ func updatePodTests() []*SerialTestCase {
 			},
 			TestCaseMetadata: &TestCaseMetadata{
 				Tags: []Tag{
-					skipTestTag,
 					podCrudTag,
 					netpolCrudTag,
 				},
diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go
index 4520df7391..69feb7881f 100644
--- a/npm/pkg/dataplane/dataplane.go
+++ b/npm/pkg/dataplane/dataplane.go
@@ -24,15 +24,6 @@ type Config struct {
 	*policies.PolicyManagerCfg
 }
 
-type updatePodCache struct {
-	sync.Mutex
-	cache map[string]*updateNPMPod
-}
-
-func newUpdatePodCache() *updatePodCache {
-	return &updatePodCache{cache: make(map[string]*updateNPMPod)}
-}
-
 type endpointCache struct {
 	sync.Mutex
 	cache map[string]*npmEndpoint
@@ -70,7 +61,7 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
 		endpointCache:  newEndpointCache(),
 		nodeName:       nodeName,
 		ioShim:         ioShim,
-		updatePodCache: newUpdatePodCache(),
+		updatePodCache: newUpdatePodCache(1),
 		endpointQuery:  new(endpointQuery),
 		stopChannel:    stopChannel,
 	}
@@ -144,13 +135,7 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po
 		dp.updatePodCache.Lock()
 		defer dp.updatePodCache.Unlock()
 
-		updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
-		if !ok {
-			klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
-			updatePod = newUpdateNPMPod(podMetadata)
-			dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
-		}
-
+		updatePod := dp.updatePodCache.enqueue(podMetadata)
 		updatePod.updateIPSetsToAdd(setNames)
 	}
 
@@ -172,13 +157,7 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat
 		dp.updatePodCache.Lock()
 		defer dp.updatePodCache.Unlock()
 
-		updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
-		if !ok {
-			klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
-			updatePod = newUpdateNPMPod(podMetadata)
-			dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
-		}
-
+		updatePod := dp.updatePodCache.enqueue(podMetadata)
 		updatePod.updateIPSetsToRemove(setNames)
 	}
 
@@ -220,7 +199,7 @@ func (dp *DataPlane) ApplyDataPlane() error {
 	if dp.shouldUpdatePod() {
 		// do not refresh endpoints if the updatePodCache is empty
 		dp.updatePodCache.Lock()
-		if len(dp.updatePodCache.cache) == 0 {
+		if dp.updatePodCache.isEmpty() {
 			dp.updatePodCache.Unlock()
 			return nil
 		}
@@ -238,15 +217,24 @@ func (dp *DataPlane) ApplyDataPlane() error {
 		dp.updatePodCache.Lock()
 		defer dp.updatePodCache.Unlock()
 
-		for podKey, pod := range dp.updatePodCache.cache {
-			err := dp.updatePod(pod)
-			if err != nil {
+		// Only process the pods that are queued right now. A pod whose update fails is requeued and
+		// stays queued for a later ApplyDataPlane call; looping until the queue is empty would spin
+		// forever (while holding the lock) on a pod that keeps failing.
+		for remaining := len(dp.updatePodCache.queue); remaining > 0; remaining-- {
+			pod := dp.updatePodCache.dequeue()
+			if pod == nil {
+				// should never happen because the queue length was read under the lock on updatePodCache
+				metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane")
+				// stop processing since the queue and the cache are out of sync
+				break
+			}
+
+			if err := dp.updatePod(pod); err != nil {
 				// move on to the next and later return as success since this can be retried irrespective of other operations
-				metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", podKey, err.Error())
+				metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", pod.PodKey, err.Error())
+				dp.updatePodCache.requeue(pod)
 				continue
 			}
-
-			delete(dp.updatePodCache.cache, podKey)
 		}
 	}
 	return nil
diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go
index bc2b5ca1ee..58268d2d9f 100644
--- a/npm/pkg/dataplane/dataplane_test.go
+++ b/npm/pkg/dataplane/dataplane_test.go
@@ -249,6 +249,148 @@ func TestUpdatePolicy(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestUpdatePodCache(t *testing.T) {
+	m1 := NewPodMetadata("x/a", "10.0.0.1", nodeName)
+	m2 := NewPodMetadata("x/b", "10.0.0.2", nodeName)
+	m3 := NewPodMetadata("x/c", "10.0.0.3", nodeName)
+
+	c := newUpdatePodCache(3)
+	require.True(t, c.isEmpty())
+
+	p1 := c.enqueue(m1)
+	require.False(t, c.isEmpty())
+	require.Equal(t, *newUpdateNPMPod(m1), *p1)
+	require.Equal(t, []string{m1.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1}, c.cache)
+
+	p2 := c.enqueue(m2)
+	require.False(t, c.isEmpty())
+	require.Equal(t, *newUpdateNPMPod(m2), *p2)
+	require.Equal(t, []string{m1.PodKey, m2.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2}, c.cache)
+
+	p3 := c.enqueue(m3)
+	require.False(t, c.isEmpty())
+	require.Equal(t, *newUpdateNPMPod(m3), *p3)
+	require.Equal(t, []string{m1.PodKey, m2.PodKey, m3.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3}, c.cache)
+
+	// Test that enqueueing an existing pod does not change the queue or cache.
+	pairs := []struct {
+		m *PodMetadata
+		p *updateNPMPod
+	}{
+		{m1, p1},
+		{m2, p2},
+		{m3, p3},
+	}
+	for _, pair := range pairs {
+		p := c.enqueue(pair.m)
+		require.False(t, c.isEmpty())
+		require.Equal(t, pair.p, p)
+		require.Equal(t, []string{m1.PodKey, m2.PodKey, m3.PodKey}, c.queue)
+		require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3}, c.cache)
+	}
+
+	// test dequeue
+	p := c.dequeue()
+	require.False(t, c.isEmpty())
+	require.Equal(t, p1, p)
+	require.Equal(t, []string{m2.PodKey, m3.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m2.PodKey: p2, m3.PodKey: p3}, c.cache)
+
+	p = c.dequeue()
+	require.False(t, c.isEmpty())
+	require.Equal(t, p2, p)
+	require.Equal(t, []string{m3.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m3.PodKey: p3}, c.cache)
+
+	// test requeuing
+	c.requeue(p)
+	require.False(t, c.isEmpty())
+	require.Equal(t, []string{m3.PodKey, m2.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m3.PodKey: p3, m2.PodKey: p2}, c.cache)
+
+	p = c.dequeue()
+	require.False(t, c.isEmpty())
+	require.Equal(t, p3, p)
+	require.Equal(t, []string{m2.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m2.PodKey: p2}, c.cache)
+
+	// test enqueuing again
+	p = c.enqueue(m1)
+	require.Equal(t, *p1, *p)
+	require.False(t, c.isEmpty())
+	require.Equal(t, []string{m2.PodKey, m1.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m2.PodKey: p2, m1.PodKey: p1}, c.cache)
+
+	p = c.dequeue()
+	require.False(t, c.isEmpty())
+	require.Equal(t, p2, p)
+	require.Equal(t, []string{m1.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1}, c.cache)
+
+	p = c.dequeue()
+	require.True(t, c.isEmpty())
+	require.Equal(t, p1, p)
+	require.Equal(t, []string{}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{}, c.cache)
+
+	// test requeue on empty queue
+	c.requeue(p)
+	require.False(t, c.isEmpty())
+	require.Equal(t, []string{m1.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1}, c.cache)
+
+	p = c.dequeue()
+	require.True(t, c.isEmpty())
+	require.Equal(t, p1, p)
+	require.Equal(t, []string{}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{}, c.cache)
+
+	// test nil result on empty queue
+	p = c.dequeue()
+	require.True(t, c.isEmpty())
+	require.Nil(t, p)
+
+	// test enqueue on empty queue
+	p = c.enqueue(m3)
+	require.False(t, c.isEmpty())
+	require.Equal(t, *p3, *p)
+	require.Equal(t, []string{m3.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m3.PodKey: p}, c.cache)
+
+	// test enqueue with different node on only item in queue
+	m3Node2 := *m3
+	m3Node2.NodeName = "node2"
+	p3Node2 := *newUpdateNPMPod(&m3Node2)
+	p = c.enqueue(&m3Node2)
+	require.False(t, c.isEmpty())
+	require.Equal(t, p3Node2, *p)
+	require.Equal(t, []string{m3Node2.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m3Node2.PodKey: p}, c.cache)
+
+	// test enqueue with different node on first item in queue
+	p = c.enqueue(m1)
+	require.False(t, c.isEmpty())
+	require.Equal(t, *p1, *p)
+	require.Equal(t, []string{m3Node2.PodKey, m1.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m3Node2.PodKey: &p3Node2, m1.PodKey: p1}, c.cache)
+
+	p = c.enqueue(m3)
+	require.False(t, c.isEmpty())
+	require.Equal(t, *p3, *p)
+	require.Equal(t, []string{m1.PodKey, m3.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1, m3.PodKey: p}, c.cache)
+
+	// test enqueue with different node on last item in queue
+	p = c.enqueue(&m3Node2)
+	require.False(t, c.isEmpty())
+	require.Equal(t, p3Node2, *p)
+	require.Equal(t, []string{m1.PodKey, m3Node2.PodKey}, c.queue)
+	require.Equal(t, map[string]*updateNPMPod{m1.PodKey: p1, m3Node2.PodKey: p}, c.cache)
+}
+
 func getBootupTestCalls() []testutils.TestCmd {
 	return append(policies.GetBootupTestCalls(), ipsets.GetResetTestCalls()...)
 }
diff --git a/npm/pkg/dataplane/dataplane_windows_test.go b/npm/pkg/dataplane/dataplane_windows_test.go
index 302c6279f9..17008442ca 100644
--- a/npm/pkg/dataplane/dataplane_windows_test.go
+++ b/npm/pkg/dataplane/dataplane_windows_test.go
@@ -43,12 +43,6 @@ func testSerialCases(t *testing.T, tests []*SerialTestCase) {
 		i := i
 		tt := tt
 
-		for _, tag := range tt.Tags {
-			if tag == skipTestTag {
-				continue
-			}
-		}
-
 		t.Run(tt.Description, func(t *testing.T) {
 			t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags)
 
@@ -85,12 +79,6 @@ func testMultiJobCases(t *testing.T, tests []*MultiJobTestCase) {
 		i := i
 		tt := tt
 
-		for _, tag := range tt.Tags {
-			if tag == skipTestTag {
-				continue
-			}
-		}
-
 		t.Run(tt.Description, func(t *testing.T) {
 			t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags)
 
diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go
index d8fd26afc3..dfe0193c85 100644
--- a/npm/pkg/dataplane/types.go
+++ b/npm/pkg/dataplane/types.go
@@ -2,10 +2,12 @@ package dataplane
 
 import (
 	"strings"
+	"sync"
 
 	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
 	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies"
 	"github.com/Azure/azure-container-networking/npm/util"
+	"k8s.io/klog"
 )
 
 type GenericDataplane interface {
@@ -76,3 +78,105 @@ func (npmPod *updateNPMPod) updateIPSetsToRemove(setNames []*ipsets.IPSetMetadat
 		npmPod.IPSetsToRemove = append(npmPod.IPSetsToRemove, set.GetPrefixName())
 	}
 }
+
+// updatePodCache holds the pods with pending IPSet updates, keyed by pod key,
+// together with the order in which they are processed.
+// The embedded mutex is not taken by the methods below; callers must hold it.
+type updatePodCache struct {
+	sync.Mutex
+	cache map[string]*updateNPMPod
+	// queue maintains the FIFO queue of the pods in the cache.
+	// This makes sure we handle issue 1729 with the same queue as the control plane.
+	// It also lets us update Pod ACLs in the same queue as the control plane so that
+	// e.g. the first Pod created is the first Pod to have proper connectivity.
+	queue           []string
+	initialCapacity int
+}
+
+// newUpdatePodCache creates a new updatePodCache with the given initial capacity for the queue
+func newUpdatePodCache(initialCapacity int) *updatePodCache {
+	return &updatePodCache{
+		cache:           make(map[string]*updateNPMPod),
+		queue:           make([]string, 0, initialCapacity),
+		initialCapacity: initialCapacity,
+	}
+}
+
+// enqueue adds a pod to the end of the queue if it is not already queued and returns the pod object
+// that is stored in the cache for the pod key.
+func (c *updatePodCache) enqueue(m *PodMetadata) *updateNPMPod {
+	pod, ok := c.cache[m.PodKey]
+
+	if ok && pod.NodeName != m.NodeName {
+		// Currently, don't expect this path to be taken because dataplane makes sure to only enqueue on-node Pods.
+		// If the pod is already in the cache but the node name has changed, we need to requeue it.
+		// Can discard the old Pod info since the Pod must have been deleted and brought back up on a different node.
+		klog.Infof("[DataPlane] pod already in cache but node name has changed. deleting the old pod object from the queue. podKey: %s", m.PodKey)
+
+		// remove the old pod from the cache and queue
+		delete(c.cache, m.PodKey)
+		c.removeFromQueue(m.PodKey)
+		ok = false
+	}
+
+	if !ok {
+		klog.Infof("[DataPlane] pod key %s not found in updatePodCache. creating a new obj", m.PodKey)
+
+		pod = newUpdateNPMPod(m)
+		c.cache[m.PodKey] = pod
+		c.queue = append(c.queue, m.PodKey)
+	}
+
+	return pod
+}
+
+// removeFromQueue removes the first occurrence of podKey from the queue, if present.
+func (c *updatePodCache) removeFromQueue(podKey string) {
+	for i, key := range c.queue {
+		if key == podKey {
+			c.queue = append(c.queue[:i], c.queue[i+1:]...)
+			return
+		}
+	}
+}
+
+// dequeue returns the first pod in the queue and removes it from the queue and the cache.
+// It returns nil if the queue is empty or if the key at the head of the queue has no cache entry.
+func (c *updatePodCache) dequeue() *updateNPMPod {
+	if c.isEmpty() {
+		klog.Infof("[DataPlane] updatePodCache is empty. returning nil for dequeue()")
+		return nil
+	}
+
+	// pop by key rather than through the pod object so that a key with no cache entry
+	// (should not happen) yields a nil pod instead of a nil pointer dereference
+	podKey := c.queue[0]
+	c.queue = c.queue[1:]
+	pod := c.cache[podKey]
+	delete(c.cache, podKey)
+
+	if c.isEmpty() {
+		// queue[1:] keeps the old backing array (and the already-consumed keys) alive,
+		// so start over with a fresh slice once the queue drains
+		c.queue = make([]string, 0, c.initialCapacity)
+	}
+
+	return pod
+}
+
+// requeue adds the pod to the end of the queue
+func (c *updatePodCache) requeue(pod *updateNPMPod) {
+	if _, ok := c.cache[pod.PodKey]; ok {
+		// should not happen
+		klog.Infof("[DataPlane] pod key %s already exists in updatePodCache. skipping requeue", pod.PodKey)
+		return
+	}
+
+	c.cache[pod.PodKey] = pod
+	c.queue = append(c.queue, pod.PodKey)
+}
+
+// isEmpty returns true if the queue is empty
+func (c *updatePodCache) isEmpty() bool {
+	return len(c.queue) == 0
+}