From f3fbac0e6f806d05040f08aa6d8ec84b5ce120d6 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 17 Mar 2023 10:56:10 -0700 Subject: [PATCH 01/12] process updatePods in fifo order --- npm/pkg/dataplane/dataplane.go | 43 +++++++++++++++++++++++++++-- npm/pkg/dataplane/dataplane_test.go | 30 ++++++++++++++++++++ 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 55a60e340a..ffe0a34c09 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -27,10 +27,35 @@ type Config struct { type updatePodCache struct { sync.Mutex cache map[string]*updateNPMPod + // order maintains the FIFO order of the pods in the cache. + // This makes sure we handle sequence 2 of issue 1729 with the same order as the control plane. + // It also lets us update Pod ACLs in the same order as the control plane so that + // e.g. the first Pod created is the first Pod to have proper connectivity. + order []string } func newUpdatePodCache() *updatePodCache { - return &updatePodCache{cache: make(map[string]*updateNPMPod)} + return &updatePodCache{ + cache: make(map[string]*updateNPMPod), + order: make([]string, 0), + } +} + +// cleanupOrder removes all elements in the order slice which aren't in the cache. +// cleanupOrder should be called while holding the updatePodCache lock. +func (u *updatePodCache) cleanupOrder() { + newOrder := make([]string, 0) + if len(u.cache) == 0 { + u.order = newOrder + return + } + + for _, podKey := range u.order { + if _, ok := u.cache[podKey]; ok { + newOrder = append(newOrder, podKey) + } + } + u.order = newOrder } type endpointCache struct { @@ -147,6 +172,7 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. 
creating a new obj", podMetadata.PodKey) updatePod = newUpdateNPMPod(podMetadata) dp.updatePodCache.cache[podMetadata.PodKey] = updatePod + dp.updatePodCache.order = append(dp.updatePodCache.order, podMetadata.PodKey) } updatePod.updateIPSetsToAdd(setNames) @@ -175,6 +201,7 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey) updatePod = newUpdateNPMPod(podMetadata) dp.updatePodCache.cache[podMetadata.PodKey] = updatePod + dp.updatePodCache.order = append(dp.updatePodCache.order, podMetadata.PodKey) } updatePod.updateIPSetsToRemove(setNames) @@ -219,6 +246,7 @@ func (dp *DataPlane) ApplyDataPlane() error { // do not refresh endpoints if the updatePodCache is empty dp.updatePodCache.Lock() if len(dp.updatePodCache.cache) == 0 { + dp.updatePodCache.cleanupOrder() dp.updatePodCache.Unlock() return nil } @@ -234,9 +262,18 @@ func (dp *DataPlane) ApplyDataPlane() error { // lock updatePodCache while driving goal state to kernel // prevents another ApplyDataplane call from updating the same pods dp.updatePodCache.Lock() - defer dp.updatePodCache.Unlock() + defer func() { + dp.updatePodCache.cleanupOrder() + dp.updatePodCache.Unlock() + }() + + for _, podKey := range dp.updatePodCache.order { + pod, ok := dp.updatePodCache.cache[podKey] + if !ok { + metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] skipping update since pod not found in updatePodCache. 
podKey: %s", podKey) + continue + } - for podKey, pod := range dp.updatePodCache.cache { err := dp.updatePod(pod) if err != nil { // move on to the next and later return as success since this can be retried irrespective of other operations diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index bc2b5ca1ee..c21ad46661 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -249,6 +249,36 @@ func TestUpdatePolicy(t *testing.T) { require.NoError(t, err) } +func TestUpdatePodCacheCleanupOrder(t *testing.T) { + upc := newUpdatePodCache() + + pod1 := newUpdateNPMPod(NewPodMetadata("x/a", "10.0.0.1", nodeName)) + pod2 := newUpdateNPMPod(NewPodMetadata("x/b", "10.0.0.2", nodeName)) + pod3 := newUpdateNPMPod(NewPodMetadata("x/c", "10.0.0.3", nodeName)) + + upc.cache[pod1.PodKey] = pod1 + upc.cache[pod2.PodKey] = pod2 + upc.cache[pod3.PodKey] = pod3 + + upc.order = append(upc.order, pod1.PodKey) + upc.order = append(upc.order, pod2.PodKey) + upc.order = append(upc.order, pod3.PodKey) + + require.Equal(t, 3, len(upc.order)) + + upc.cleanupOrder() + require.Equal(t, 3, len(upc.order)) + + delete(upc.cache, pod2.PodKey) + upc.cleanupOrder() + require.Equal(t, 2, len(upc.order)) + + delete(upc.cache, pod1.PodKey) + delete(upc.cache, pod3.PodKey) + upc.cleanupOrder() + require.Equal(t, 0, len(upc.order)) +} + func getBootupTestCalls() []testutils.TestCmd { return append(policies.GetBootupTestCalls(), ipsets.GetResetTestCalls()...) 
} From 56a406f2e888f3895d5f8eaea72d1a44f570e663 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 17 Mar 2023 11:14:29 -0700 Subject: [PATCH 02/12] fix lint --- npm/pkg/dataplane/dataplane_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index c21ad46661..bfcf98f84d 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -260,9 +260,7 @@ func TestUpdatePodCacheCleanupOrder(t *testing.T) { upc.cache[pod2.PodKey] = pod2 upc.cache[pod3.PodKey] = pod3 - upc.order = append(upc.order, pod1.PodKey) - upc.order = append(upc.order, pod2.PodKey) - upc.order = append(upc.order, pod3.PodKey) + upc.order = append(upc.order, pod1.PodKey, pod2.PodKey, pod3.PodKey) require.Equal(t, 3, len(upc.order)) From 45226a400c2e254d131de6228834955501a1c1e9 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Mon, 10 Apr 2023 00:13:31 -0700 Subject: [PATCH 03/12] better UT --- npm/pkg/dataplane/dataplane_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index bfcf98f84d..77418005cc 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -262,19 +262,26 @@ func TestUpdatePodCacheCleanupOrder(t *testing.T) { upc.order = append(upc.order, pod1.PodKey, pod2.PodKey, pod3.PodKey) - require.Equal(t, 3, len(upc.order)) + upc.cleanupOrder() + require.Equal(t, upc.order, []string{pod1.PodKey, pod2.PodKey, pod3.PodKey}) + delete(upc.cache, pod1.PodKey) upc.cleanupOrder() - require.Equal(t, 3, len(upc.order)) + require.Equal(t, upc.order, []string{pod2.PodKey, pod3.PodKey}) + + upc.order = append(upc.order, pod1.PodKey) + upc.cache[pod1.PodKey] = pod1 + upc.cleanupOrder() + require.Equal(t, upc.order, []string{pod2.PodKey, pod3.PodKey, pod1.PodKey}) delete(upc.cache, pod2.PodKey) + delete(upc.cache, 
pod3.PodKey) upc.cleanupOrder() - require.Equal(t, 2, len(upc.order)) + require.Equal(t, upc.order, []string{pod1.PodKey}) delete(upc.cache, pod1.PodKey) - delete(upc.cache, pod3.PodKey) upc.cleanupOrder() - require.Equal(t, 0, len(upc.order)) + require.Equal(t, upc.order, []string{}) } func getBootupTestCalls() []testutils.TestCmd { From 8e46e389be6cb42f8c38bd06a40614abe111277e Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Mon, 10 Apr 2023 00:17:46 -0700 Subject: [PATCH 04/12] comments and better naming --- npm/pkg/dataplane/dataplane.go | 40 ++++++++++++++++------------- npm/pkg/dataplane/dataplane_test.go | 24 ++++++++--------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 60ed3cb476..0a5e143686 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -27,35 +27,35 @@ type Config struct { type updatePodCache struct { sync.Mutex cache map[string]*updateNPMPod - // order maintains the FIFO order of the pods in the cache. - // This makes sure we handle sequence 2 of issue 1729 with the same order as the control plane. - // It also lets us update Pod ACLs in the same order as the control plane so that + // queue maintains the FIFO queue of the pods in the cache. + // This makes sure we handle sequence 2 of issue 1729 with the same queue as the control plane. + // It also lets us update Pod ACLs in the same queue as the control plane so that // e.g. the first Pod created is the first Pod to have proper connectivity. - order []string + queue []string } func newUpdatePodCache() *updatePodCache { return &updatePodCache{ cache: make(map[string]*updateNPMPod), - order: make([]string, 0), + queue: make([]string, 0), } } -// cleanupOrder removes all elements in the order slice which aren't in the cache. -// cleanupOrder should be called while holding the updatePodCache lock. 
-func (u *updatePodCache) cleanupOrder() { - newOrder := make([]string, 0) +// removeDeletedItemsFromQueue removes all elements in the queue slice which aren't in the cache. +// removeDeletedItemsFromQueue should be called while holding the updatePodCache lock. +func (u *updatePodCache) removeDeletedItemsFromQueue() { if len(u.cache) == 0 { - u.order = newOrder + u.queue = make([]string, 0) return } - for _, podKey := range u.order { + newQueue := make([]string, 0) + for _, podKey := range u.queue { if _, ok := u.cache[podKey]; ok { - newOrder = append(newOrder, podKey) + newQueue = append(newQueue, podKey) } } - u.order = newOrder + u.queue = newQueue } type endpointCache struct { @@ -174,7 +174,9 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey) updatePod = newUpdateNPMPod(podMetadata) dp.updatePodCache.cache[podMetadata.PodKey] = updatePod - dp.updatePodCache.order = append(dp.updatePodCache.order, podMetadata.PodKey) + + // add to queue only if not in the cache/queue already + dp.updatePodCache.queue = append(dp.updatePodCache.queue, podMetadata.PodKey) } updatePod.updateIPSetsToAdd(setNames) @@ -203,7 +205,9 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. 
creating a new obj", podMetadata.PodKey) updatePod = newUpdateNPMPod(podMetadata) dp.updatePodCache.cache[podMetadata.PodKey] = updatePod - dp.updatePodCache.order = append(dp.updatePodCache.order, podMetadata.PodKey) + + // add to queue only if not in the cache/queue already + dp.updatePodCache.queue = append(dp.updatePodCache.queue, podMetadata.PodKey) } updatePod.updateIPSetsToRemove(setNames) @@ -248,7 +252,7 @@ func (dp *DataPlane) ApplyDataPlane() error { // do not refresh endpoints if the updatePodCache is empty dp.updatePodCache.Lock() if len(dp.updatePodCache.cache) == 0 { - dp.updatePodCache.cleanupOrder() + dp.updatePodCache.removeDeletedItemsFromQueue() dp.updatePodCache.Unlock() return nil } @@ -265,11 +269,11 @@ func (dp *DataPlane) ApplyDataPlane() error { // prevents another ApplyDataplane call from updating the same pods dp.updatePodCache.Lock() defer func() { - dp.updatePodCache.cleanupOrder() + dp.updatePodCache.removeDeletedItemsFromQueue() dp.updatePodCache.Unlock() }() - for _, podKey := range dp.updatePodCache.order { + for _, podKey := range dp.updatePodCache.queue { pod, ok := dp.updatePodCache.cache[podKey] if !ok { metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] skipping update since pod not found in updatePodCache. 
podKey: %s", podKey) diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index 77418005cc..d1a946a4c0 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -260,28 +260,28 @@ func TestUpdatePodCacheCleanupOrder(t *testing.T) { upc.cache[pod2.PodKey] = pod2 upc.cache[pod3.PodKey] = pod3 - upc.order = append(upc.order, pod1.PodKey, pod2.PodKey, pod3.PodKey) + upc.queue = append(upc.queue, pod1.PodKey, pod2.PodKey, pod3.PodKey) - upc.cleanupOrder() - require.Equal(t, upc.order, []string{pod1.PodKey, pod2.PodKey, pod3.PodKey}) + upc.removeDeletedItemsFromQueue() + require.Equal(t, upc.queue, []string{pod1.PodKey, pod2.PodKey, pod3.PodKey}) delete(upc.cache, pod1.PodKey) - upc.cleanupOrder() - require.Equal(t, upc.order, []string{pod2.PodKey, pod3.PodKey}) + upc.removeDeletedItemsFromQueue() + require.Equal(t, upc.queue, []string{pod2.PodKey, pod3.PodKey}) - upc.order = append(upc.order, pod1.PodKey) + upc.queue = append(upc.queue, pod1.PodKey) upc.cache[pod1.PodKey] = pod1 - upc.cleanupOrder() - require.Equal(t, upc.order, []string{pod2.PodKey, pod3.PodKey, pod1.PodKey}) + upc.removeDeletedItemsFromQueue() + require.Equal(t, upc.queue, []string{pod2.PodKey, pod3.PodKey, pod1.PodKey}) delete(upc.cache, pod2.PodKey) delete(upc.cache, pod3.PodKey) - upc.cleanupOrder() - require.Equal(t, upc.order, []string{pod1.PodKey}) + upc.removeDeletedItemsFromQueue() + require.Equal(t, upc.queue, []string{pod1.PodKey}) delete(upc.cache, pod1.PodKey) - upc.cleanupOrder() - require.Equal(t, upc.order, []string{}) + upc.removeDeletedItemsFromQueue() + require.Equal(t, upc.queue, []string{}) } func getBootupTestCalls() []testutils.TestCmd { From 7adc273a95746deb55b7bc76f4509a701ae80104 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Mon, 10 Apr 2023 00:26:53 -0700 Subject: [PATCH 05/12] stop skipping UTs --- .../dataplane/dataplane-test-cases_windows_test.go | 5 ++--- 
npm/pkg/dataplane/dataplane_windows_test.go | 12 ------------ 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go index f467b4c1d0..9c8349beb6 100644 --- a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go +++ b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go @@ -17,7 +17,6 @@ const ( netpolCrudTag Tag = "netpol-crud" reconcileTag Tag = "reconcile" calicoTag Tag = "calico" - skipTestTag Tag = "skip-test" ) const ( @@ -1298,7 +1297,7 @@ func updatePodTests() []*SerialTestCase { }, TestCaseMetadata: &TestCaseMetadata{ Tags: []Tag{ - skipTestTag, + podCrudTag, netpolCrudTag, }, @@ -1359,7 +1358,7 @@ func updatePodTests() []*SerialTestCase { }, TestCaseMetadata: &TestCaseMetadata{ Tags: []Tag{ - skipTestTag, + podCrudTag, netpolCrudTag, }, diff --git a/npm/pkg/dataplane/dataplane_windows_test.go b/npm/pkg/dataplane/dataplane_windows_test.go index 302c6279f9..17008442ca 100644 --- a/npm/pkg/dataplane/dataplane_windows_test.go +++ b/npm/pkg/dataplane/dataplane_windows_test.go @@ -43,12 +43,6 @@ func testSerialCases(t *testing.T, tests []*SerialTestCase) { i := i tt := tt - for _, tag := range tt.Tags { - if tag == skipTestTag { - continue - } - } - t.Run(tt.Description, func(t *testing.T) { t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags) @@ -85,12 +79,6 @@ func testMultiJobCases(t *testing.T, tests []*MultiJobTestCase) { i := i tt := tt - for _, tag := range tt.Tags { - if tag == skipTestTag { - continue - } - } - t.Run(tt.Description, func(t *testing.T) { t.Logf("beginning test #%d. Description: [%s]. 
Tags: %+v", i, tt.Description, tt.Tags) From 948169d0878b7698f84d4e67705581a7fb05f840 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Mon, 10 Apr 2023 09:46:26 -0700 Subject: [PATCH 06/12] fix lint --- npm/pkg/dataplane/dataplane-test-cases_windows_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go index 9c8349beb6..1abe4246e7 100644 --- a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go +++ b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go @@ -1297,7 +1297,6 @@ func updatePodTests() []*SerialTestCase { }, TestCaseMetadata: &TestCaseMetadata{ Tags: []Tag{ - podCrudTag, netpolCrudTag, }, @@ -1358,7 +1357,6 @@ func updatePodTests() []*SerialTestCase { }, TestCaseMetadata: &TestCaseMetadata{ Tags: []Tag{ - podCrudTag, netpolCrudTag, }, From 500a25e903e94b53f0f27f83d1871123796250b2 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Mon, 10 Apr 2023 18:50:57 -0700 Subject: [PATCH 07/12] redesign --- npm/pkg/dataplane/dataplane.go | 83 ++--------------- npm/pkg/dataplane/dataplane_test.go | 136 +++++++++++++++++++++------- npm/pkg/dataplane/types.go | 69 ++++++++++++++ 3 files changed, 183 insertions(+), 105 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 0a5e143686..a15ddb1894 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -24,40 +24,6 @@ type Config struct { *policies.PolicyManagerCfg } -type updatePodCache struct { - sync.Mutex - cache map[string]*updateNPMPod - // queue maintains the FIFO queue of the pods in the cache. - // This makes sure we handle sequence 2 of issue 1729 with the same queue as the control plane. - // It also lets us update Pod ACLs in the same queue as the control plane so that - // e.g. the first Pod created is the first Pod to have proper connectivity. 
- queue []string -} - -func newUpdatePodCache() *updatePodCache { - return &updatePodCache{ - cache: make(map[string]*updateNPMPod), - queue: make([]string, 0), - } -} - -// removeDeletedItemsFromQueue removes all elements in the queue slice which aren't in the cache. -// removeDeletedItemsFromQueue should be called while holding the updatePodCache lock. -func (u *updatePodCache) removeDeletedItemsFromQueue() { - if len(u.cache) == 0 { - u.queue = make([]string, 0) - return - } - - newQueue := make([]string, 0) - for _, podKey := range u.queue { - if _, ok := u.cache[podKey]; ok { - newQueue = append(newQueue, podKey) - } - } - u.queue = newQueue -} - type endpointCache struct { sync.Mutex cache map[string]*npmEndpoint @@ -95,7 +61,7 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann endpointCache: newEndpointCache(), nodeName: nodeName, ioShim: ioShim, - updatePodCache: newUpdatePodCache(), + updatePodCache: newUpdatePodCache(1), endpointQuery: new(endpointQuery), stopChannel: stopChannel, } @@ -169,16 +135,7 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po dp.updatePodCache.Lock() defer dp.updatePodCache.Unlock() - updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey] - if !ok { - klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. 
creating a new obj", podMetadata.PodKey) - updatePod = newUpdateNPMPod(podMetadata) - dp.updatePodCache.cache[podMetadata.PodKey] = updatePod - - // add to queue only if not in the cache/queue already - dp.updatePodCache.queue = append(dp.updatePodCache.queue, podMetadata.PodKey) - } - + updatePod := dp.updatePodCache.enqueue(podMetadata) updatePod.updateIPSetsToAdd(setNames) } @@ -200,16 +157,7 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat dp.updatePodCache.Lock() defer dp.updatePodCache.Unlock() - updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey] - if !ok { - klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey) - updatePod = newUpdateNPMPod(podMetadata) - dp.updatePodCache.cache[podMetadata.PodKey] = updatePod - - // add to queue only if not in the cache/queue already - dp.updatePodCache.queue = append(dp.updatePodCache.queue, podMetadata.PodKey) - } - + updatePod := dp.updatePodCache.enqueue(podMetadata) updatePod.updateIPSetsToRemove(setNames) } @@ -251,8 +199,7 @@ func (dp *DataPlane) ApplyDataPlane() error { if dp.shouldUpdatePod() { // do not refresh endpoints if the updatePodCache is empty dp.updatePodCache.Lock() - if len(dp.updatePodCache.cache) == 0 { - dp.updatePodCache.removeDeletedItemsFromQueue() + if dp.updatePodCache.isEmpty() { dp.updatePodCache.Unlock() return nil } @@ -268,26 +215,16 @@ func (dp *DataPlane) ApplyDataPlane() error { // lock updatePodCache while driving goal state to kernel // prevents another ApplyDataplane call from updating the same pods dp.updatePodCache.Lock() - defer func() { - dp.updatePodCache.removeDeletedItemsFromQueue() - dp.updatePodCache.Unlock() - }() - - for _, podKey := range dp.updatePodCache.queue { - pod, ok := dp.updatePodCache.cache[podKey] - if !ok { - metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] skipping update since pod not found in updatePodCache. 
podKey: %s", podKey) - continue - } + defer dp.updatePodCache.Unlock() - err := dp.updatePod(pod) - if err != nil { + for !dp.updatePodCache.isEmpty() { + pod := dp.updatePodCache.dequeue() + if err := dp.updatePod(pod); err != nil { // move on to the next and later return as success since this can be retried irrespective of other operations - metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", podKey, err.Error()) + metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", pod.PodKey, err.Error()) + dp.updatePodCache.requeue(pod) continue } - - delete(dp.updatePodCache.cache, podKey) } } return nil diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index d1a946a4c0..67e620b0f5 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -249,39 +249,111 @@ func TestUpdatePolicy(t *testing.T) { require.NoError(t, err) } -func TestUpdatePodCacheCleanupOrder(t *testing.T) { - upc := newUpdatePodCache() - - pod1 := newUpdateNPMPod(NewPodMetadata("x/a", "10.0.0.1", nodeName)) - pod2 := newUpdateNPMPod(NewPodMetadata("x/b", "10.0.0.2", nodeName)) - pod3 := newUpdateNPMPod(NewPodMetadata("x/c", "10.0.0.3", nodeName)) - - upc.cache[pod1.PodKey] = pod1 - upc.cache[pod2.PodKey] = pod2 - upc.cache[pod3.PodKey] = pod3 - - upc.queue = append(upc.queue, pod1.PodKey, pod2.PodKey, pod3.PodKey) - - upc.removeDeletedItemsFromQueue() - require.Equal(t, upc.queue, []string{pod1.PodKey, pod2.PodKey, pod3.PodKey}) - - delete(upc.cache, pod1.PodKey) - upc.removeDeletedItemsFromQueue() - require.Equal(t, upc.queue, []string{pod2.PodKey, pod3.PodKey}) - - upc.queue = append(upc.queue, pod1.PodKey) - upc.cache[pod1.PodKey] = pod1 - upc.removeDeletedItemsFromQueue() - require.Equal(t, upc.queue, []string{pod2.PodKey, pod3.PodKey, pod1.PodKey}) - - delete(upc.cache, pod2.PodKey) 
- delete(upc.cache, pod3.PodKey) - upc.removeDeletedItemsFromQueue() - require.Equal(t, upc.queue, []string{pod1.PodKey}) +func TestUpdatePodCache(t *testing.T) { + m1 := NewPodMetadata("x/a", "10.0.0.1", nodeName) + m2 := NewPodMetadata("x/b", "10.0.0.2", nodeName) + m3 := NewPodMetadata("x/c", "10.0.0.3", nodeName) + + c := newUpdatePodCache(3) + require.True(t, c.isEmpty()) + + p1 := c.enqueue(m1) + require.False(t, c.isEmpty()) + require.Equal(t, *newUpdateNPMPod(m1), *p1) + require.Equal(t, c.queue, []string{m1.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1}) + + p2 := c.enqueue(m2) + require.False(t, c.isEmpty()) + require.Equal(t, *newUpdateNPMPod(m2), *p2) + require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2}) + + p3 := c.enqueue(m3) + require.False(t, c.isEmpty()) + require.Equal(t, *newUpdateNPMPod(m3), *p3) + require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey, m3.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3}) + + // Test that enqueueing an existing pod does not change the queue or cache. 
+ pairs := []struct { + m *PodMetadata + p *updateNPMPod + }{ + {m1, p1}, + {m2, p2}, + {m3, p3}, + } + for _, pair := range pairs { + p := c.enqueue(pair.m) + require.False(t, c.isEmpty()) + require.Equal(t, pair.p, p) + require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey, m3.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3}) + } - delete(upc.cache, pod1.PodKey) - upc.removeDeletedItemsFromQueue() - require.Equal(t, upc.queue, []string{}) + // test dequeue + p := c.dequeue() + require.False(t, c.isEmpty()) + require.Equal(t, p1, p) + require.Equal(t, c.queue, []string{m2.PodKey, m3.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2, m3.PodKey: p3}) + + p = c.dequeue() + require.False(t, c.isEmpty()) + require.Equal(t, p2, p) + require.Equal(t, c.queue, []string{m3.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p3}) + + // test requeuing + c.requeue(p) + require.False(t, c.isEmpty()) + require.Equal(t, c.queue, []string{m3.PodKey, m2.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p3, m2.PodKey: p2}) + + p = c.dequeue() + require.False(t, c.isEmpty()) + require.Equal(t, p3, p) + require.Equal(t, c.queue, []string{m2.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2}) + + // test enqueuing again + p = c.enqueue(m1) + require.Equal(t, *p1, *p) + require.False(t, c.isEmpty()) + require.Equal(t, c.queue, []string{m2.PodKey, m1.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2, m1.PodKey: p1}) + + p = c.dequeue() + require.False(t, c.isEmpty()) + require.Equal(t, p2, p) + require.Equal(t, c.queue, []string{m1.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1}) + + p = c.dequeue() + require.True(t, c.isEmpty()) + require.Equal(t, p1, p) + require.Equal(t, c.queue, []string{}) + require.Equal(t, c.cache, map[string]*updateNPMPod{}) + + // test requeue on empty 
queue + c.requeue(p) + require.False(t, c.isEmpty()) + require.Equal(t, c.queue, []string{m1.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1}) + + p = c.dequeue() + require.True(t, c.isEmpty()) + require.Equal(t, p1, p) + require.Equal(t, c.queue, []string{}) + require.Equal(t, c.cache, map[string]*updateNPMPod{}) + + // test enqueue on empty queue + p = c.enqueue(m3) + require.False(t, c.isEmpty()) + require.Equal(t, *p3, *p) + require.Equal(t, c.queue, []string{m3.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p}) } func getBootupTestCalls() []testutils.TestCmd { diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index d8fd26afc3..93a12fcd68 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -2,10 +2,12 @@ package dataplane import ( "strings" + "sync" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" "github.com/Azure/azure-container-networking/npm/util" + "k8s.io/klog" ) type GenericDataplane interface { @@ -76,3 +78,70 @@ func (npmPod *updateNPMPod) updateIPSetsToRemove(setNames []*ipsets.IPSetMetadat npmPod.IPSetsToRemove = append(npmPod.IPSetsToRemove, set.GetPrefixName()) } } + +type updatePodCache struct { + sync.Mutex + cache map[string]*updateNPMPod + // queue maintains the FIFO queue of the pods in the cache. + // This makes sure we handle issue 1729 with the same order as the control plane. + // It also lets us update Pod ACLs in the same order as the control plane so that + // e.g. the first Pod created is the first Pod to have proper connectivity.
+ queue []string + initialCapacity int +} + +// newUpdatePodCache creates a new updatePodCache with the given initial capacity for the queue +func newUpdatePodCache(initialCapacity int) *updatePodCache { + return &updatePodCache{ + cache: make(map[string]*updateNPMPod), + queue: make([]string, 0, initialCapacity), + initialCapacity: initialCapacity, + } +} + +// enqueue adds a pod to the queue if necessary and returns the pod object used +func (c *updatePodCache) enqueue(m *PodMetadata) *updateNPMPod { + pod, ok := c.cache[m.PodKey] + if !ok { + klog.Infof("[DataPlane] pod key %s not found in updatePodCache. creating a new obj", m.PodKey) + + pod = newUpdateNPMPod(m) + c.cache[m.PodKey] = pod + c.queue = append(c.queue, m.PodKey) + } + + return pod +} + +// dequeue returns the first pod in the queue and removes it from the queue. +// Caller should ensure the queue is not empty. +// Otherwise, the following will occur: "panic: runtime error: index out of range [0] with length 0" +func (c *updatePodCache) dequeue() *updateNPMPod { + pod := c.cache[c.queue[0]] + c.queue = c.queue[1:] + delete(c.cache, pod.PodKey) + + if c.isEmpty() { + // reset the slice to make sure the underlying array is garbage collected (not sure if this is necessary) + c.queue = make([]string, 0, c.initialCapacity) + } + + return pod +} + +// requeue adds the pod to the end of the queue +func (c *updatePodCache) requeue(pod *updateNPMPod) { + if _, ok := c.cache[pod.PodKey]; ok { + // should not happen + klog.Infof("[DataPlane] pod key %s already exists in updatePodCache. 
skipping requeue", pod.PodKey) + return + } + + c.cache[pod.PodKey] = pod + c.queue = append(c.queue, pod.PodKey) +} + +// isEmpty returns true if the queue is empty +func (c *updatePodCache) isEmpty() bool { + return len(c.queue) == 0 +} From 3e8d1872a81a0119d062ca39435e7d5cdcbd4596 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Apr 2023 11:03:40 -0700 Subject: [PATCH 08/12] dequeue returns nil when cache is empty --- npm/pkg/dataplane/dataplane.go | 7 +++++++ npm/pkg/dataplane/types.go | 7 +++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index a15ddb1894..69feb7881f 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -219,6 +219,13 @@ func (dp *DataPlane) ApplyDataPlane() error { for !dp.updatePodCache.isEmpty() { pod := dp.updatePodCache.dequeue() + if pod == nil { + // should never happen because of isEmpty check above and lock on updatePodCache + metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane") + // break to avoid infinite loop (something weird happened since isEmpty returned false above) + break + } + if err := dp.updatePod(pod); err != nil { // move on to the next and later return as success since this can be retried irrespective of other operations metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", pod.PodKey, err.Error()) diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 93a12fcd68..b15798b78b 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -114,9 +114,12 @@ func (c *updatePodCache) enqueue(m *PodMetadata) *updateNPMPod { } // dequeue returns the first pod in the queue and removes it from the queue. -// Caller should ensure the queue is not empty. 
-// Otherwise, the following will occur: "panic: runtime error: index out of range [0] with length 0" func (c *updatePodCache) dequeue() *updateNPMPod { + if c.isEmpty() { + klog.Infof("[DataPlane] updatePodCache is empty. returning nil for dequeue()") + return nil + } + pod := c.cache[c.queue[0]] c.queue = c.queue[1:] delete(c.cache, pod.PodKey) From 3f5f99da1f2bb2412d4a341319f30ddf71eb85ea Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Apr 2023 12:10:24 -0700 Subject: [PATCH 09/12] Revert "dequeue returns nil when cache is empty" This reverts commit 3e8d1872a81a0119d062ca39435e7d5cdcbd4596. --- npm/pkg/dataplane/dataplane.go | 7 ------- npm/pkg/dataplane/types.go | 7 ++----- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 69feb7881f..a15ddb1894 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -219,13 +219,6 @@ func (dp *DataPlane) ApplyDataPlane() error { for !dp.updatePodCache.isEmpty() { pod := dp.updatePodCache.dequeue() - if pod == nil { - // should never happen because of isEmpty check above and lock on updatePodCache - metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane") - // break to avoid infinite loop (something weird happened since isEmpty returned false above) - break - } - if err := dp.updatePod(pod); err != nil { // move on to the next and later return as success since this can be retried irrespective of other operations metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. 
key: [%s], err: [%s]", pod.PodKey, err.Error()) diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index b15798b78b..93a12fcd68 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -114,12 +114,9 @@ func (c *updatePodCache) enqueue(m *PodMetadata) *updateNPMPod { } // dequeue returns the first pod in the queue and removes it from the queue. +// Caller should ensure the queue is not empty. +// Otherwise, the following will occur: "panic: runtime error: index out of range [0] with length 0" func (c *updatePodCache) dequeue() *updateNPMPod { - if c.isEmpty() { - klog.Infof("[DataPlane] updatePodCache is empty. returning nil for dequeue()") - return nil - } - pod := c.cache[c.queue[0]] c.queue = c.queue[1:] delete(c.cache, pod.PodKey) From 942bb1f840b755decaf97e85166bf98926a4233a Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Apr 2023 12:31:46 -0700 Subject: [PATCH 10/12] requeue if node name has changed --- npm/pkg/dataplane/dataplane_test.go | 30 +++++++++++++++++++++++++++++ npm/pkg/dataplane/types.go | 24 +++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index 67e620b0f5..09c4beb343 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -354,6 +354,36 @@ func TestUpdatePodCache(t *testing.T) { require.Equal(t, *p3, *p) require.Equal(t, c.queue, []string{m3.PodKey}) require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p}) + + // test enqueue with different node on only item in queue + m3Node2 := *m3 + m3Node2.NodeName = "node2" + p3Node2 := *newUpdateNPMPod(&m3Node2) + p = c.enqueue(&m3Node2) + require.False(t, c.isEmpty()) + require.Equal(t, p3Node2, *p) + require.Equal(t, c.queue, []string{m3Node2.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m3Node2.PodKey: p}) + + // test enqueue with different node on first item in queue + p = c.enqueue(m1) + 
require.False(t, c.isEmpty()) + require.Equal(t, *p1, *p) + require.Equal(t, c.queue, []string{m3Node2.PodKey, m1.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m3Node2.PodKey: &p3Node2, m1.PodKey: p1}) + + p = c.enqueue(m3) + require.False(t, c.isEmpty()) + require.Equal(t, *p3, *p) + require.Equal(t, c.queue, []string{m1.PodKey, m3.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m3.PodKey: p}) + + // test enqueue with different node on last item in queue + p = c.enqueue(&m3Node2) + require.False(t, c.isEmpty()) + require.Equal(t, p3Node2, *p) + require.Equal(t, c.queue, []string{m1.PodKey, m3Node2.PodKey}) + require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m3Node2.PodKey: p}) } func getBootupTestCalls() []testutils.TestCmd { diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 93a12fcd68..b0ad6013c2 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -102,6 +102,30 @@ func newUpdatePodCache(initialCapacity int) *updatePodCache { // enqueue adds a pod to the queue if necessary and returns the pod object used func (c *updatePodCache) enqueue(m *PodMetadata) *updateNPMPod { pod, ok := c.cache[m.PodKey] + + if ok && pod.NodeName != m.NodeName { + // Currently, don't expect this path to be taken because dataplane makes sure to only enqueue on-node Pods. + // If the pod is already in the cache but the node name has changed, we need to requeue it. + // Can discard the old Pod info since the Pod must have been deleted and brought back up on a different node. + klog.Infof("[DataPlane] pod already in cache but node name has changed. deleting the old pod object from the queue. 
podKey: %s", m.PodKey) + + // remove the old pod from the cache and queue + delete(c.cache, m.PodKey) + i := 0 + for i = 0; i < len(c.queue); i++ { + if c.queue[i] == m.PodKey { + break + } + } + + if i < len(c.queue) { + // this should always be true since we should always find the item in the queue + c.queue = append(c.queue[:i], c.queue[i+1:]...) + } + + ok = false + } + if !ok { klog.Infof("[DataPlane] pod key %s not found in updatePodCache. creating a new obj", m.PodKey) From 8a2f0005ddb682599704b85a5e21bb857af84a6f Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Apr 2023 14:30:15 -0700 Subject: [PATCH 11/12] Revert "Revert "dequeue returns nil when cache is empty"" This reverts commit 3f5f99da1f2bb2412d4a341319f30ddf71eb85ea. --- npm/pkg/dataplane/dataplane.go | 7 +++++++ npm/pkg/dataplane/types.go | 7 +++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index a15ddb1894..69feb7881f 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -219,6 +219,13 @@ func (dp *DataPlane) ApplyDataPlane() error { for !dp.updatePodCache.isEmpty() { pod := dp.updatePodCache.dequeue() + if pod == nil { + // should never happen because of isEmpty check above and lock on updatePodCache + metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane") + // break to avoid infinite loop (something weird happened since isEmpty returned false above) + break + } + if err := dp.updatePod(pod); err != nil { // move on to the next and later return as success since this can be retried irrespective of other operations metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. 
key: [%s], err: [%s]", pod.PodKey, err.Error()) diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index b0ad6013c2..dfe0193c85 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -138,9 +138,12 @@ func (c *updatePodCache) enqueue(m *PodMetadata) *updateNPMPod { } // dequeue returns the first pod in the queue and removes it from the queue. -// Caller should ensure the queue is not empty. -// Otherwise, the following will occur: "panic: runtime error: index out of range [0] with length 0" func (c *updatePodCache) dequeue() *updateNPMPod { + if c.isEmpty() { + klog.Infof("[DataPlane] updatePodCache is empty. returning nil for dequeue()") + return nil + } + pod := c.cache[c.queue[0]] c.queue = c.queue[1:] delete(c.cache, pod.PodKey) From 35c820e29cb81d5a1d67df8bb2a29e4a61a0f156 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Apr 2023 14:31:44 -0700 Subject: [PATCH 12/12] UT for nil result from dequeue --- npm/pkg/dataplane/dataplane_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index 09c4beb343..58268d2d9f 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -348,6 +348,11 @@ func TestUpdatePodCache(t *testing.T) { require.Equal(t, c.queue, []string{}) require.Equal(t, c.cache, map[string]*updateNPMPod{}) + // test nil result on empty queue + p = c.dequeue() + require.True(t, c.isEmpty()) + require.Nil(t, p) + // test enqueue on empty queue p = c.enqueue(m3) require.False(t, c.isEmpty())