Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions npm/pkg/dataplane/dataplane-test-cases_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const (
netpolCrudTag Tag = "netpol-crud"
reconcileTag Tag = "reconcile"
calicoTag Tag = "calico"
skipTestTag Tag = "skip-test"
)

const (
Expand Down Expand Up @@ -1298,7 +1297,6 @@ func updatePodTests() []*SerialTestCase {
},
TestCaseMetadata: &TestCaseMetadata{
Tags: []Tag{
skipTestTag,
podCrudTag,
netpolCrudTag,
},
Expand Down Expand Up @@ -1359,7 +1357,6 @@ func updatePodTests() []*SerialTestCase {
},
TestCaseMetadata: &TestCaseMetadata{
Tags: []Tag{
skipTestTag,
podCrudTag,
netpolCrudTag,
},
Expand Down
47 changes: 16 additions & 31 deletions npm/pkg/dataplane/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ type Config struct {
*policies.PolicyManagerCfg
}

type updatePodCache struct {
sync.Mutex
cache map[string]*updateNPMPod
}

func newUpdatePodCache() *updatePodCache {
return &updatePodCache{cache: make(map[string]*updateNPMPod)}
}

type endpointCache struct {
sync.Mutex
cache map[string]*npmEndpoint
Expand Down Expand Up @@ -70,7 +61,7 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
endpointCache: newEndpointCache(),
nodeName: nodeName,
ioShim: ioShim,
updatePodCache: newUpdatePodCache(),
updatePodCache: newUpdatePodCache(1),
endpointQuery: new(endpointQuery),
stopChannel: stopChannel,
}
Expand Down Expand Up @@ -144,13 +135,7 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po
dp.updatePodCache.Lock()
defer dp.updatePodCache.Unlock()

updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
if !ok {
klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
updatePod = newUpdateNPMPod(podMetadata)
dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
}

updatePod := dp.updatePodCache.enqueue(podMetadata)
updatePod.updateIPSetsToAdd(setNames)
}

Expand All @@ -172,13 +157,7 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat
dp.updatePodCache.Lock()
defer dp.updatePodCache.Unlock()

updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
if !ok {
klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
updatePod = newUpdateNPMPod(podMetadata)
dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
}

updatePod := dp.updatePodCache.enqueue(podMetadata)
updatePod.updateIPSetsToRemove(setNames)
}

Expand Down Expand Up @@ -220,7 +199,7 @@ func (dp *DataPlane) ApplyDataPlane() error {
if dp.shouldUpdatePod() {
// do not refresh endpoints if the updatePodCache is empty
dp.updatePodCache.Lock()
if len(dp.updatePodCache.cache) == 0 {
if dp.updatePodCache.isEmpty() {
dp.updatePodCache.Unlock()
return nil
}
Expand All @@ -238,15 +217,21 @@ func (dp *DataPlane) ApplyDataPlane() error {
dp.updatePodCache.Lock()
defer dp.updatePodCache.Unlock()

for podKey, pod := range dp.updatePodCache.cache {
err := dp.updatePod(pod)
if err != nil {
for !dp.updatePodCache.isEmpty() {
pod := dp.updatePodCache.dequeue()
if pod == nil {
// should never happen because of isEmpty check above and lock on updatePodCache
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane")
// break to avoid infinite loop (something weird happened since isEmpty returned false above)
break
}

if err := dp.updatePod(pod); err != nil {
// move on to the next and later return as success since this can be retried irrespective of other operations
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", podKey, err.Error())
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", pod.PodKey, err.Error())
dp.updatePodCache.requeue(pod)
continue
}

delete(dp.updatePodCache.cache, podKey)
}
}
return nil
Expand Down
142 changes: 142 additions & 0 deletions npm/pkg/dataplane/dataplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,148 @@ func TestUpdatePolicy(t *testing.T) {
require.NoError(t, err)
}

func TestUpdatePodCache(t *testing.T) {
m1 := NewPodMetadata("x/a", "10.0.0.1", nodeName)
m2 := NewPodMetadata("x/b", "10.0.0.2", nodeName)
m3 := NewPodMetadata("x/c", "10.0.0.3", nodeName)

c := newUpdatePodCache(3)
require.True(t, c.isEmpty())

p1 := c.enqueue(m1)
require.False(t, c.isEmpty())
require.Equal(t, *newUpdateNPMPod(m1), *p1)
require.Equal(t, c.queue, []string{m1.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1})

p2 := c.enqueue(m2)
require.False(t, c.isEmpty())
require.Equal(t, *newUpdateNPMPod(m2), *p2)
require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2})

p3 := c.enqueue(m3)
require.False(t, c.isEmpty())
require.Equal(t, *newUpdateNPMPod(m3), *p3)
require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey, m3.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3})

// Test that enqueueing an existing pod does not change the queue or cache.
pairs := []struct {
m *PodMetadata
p *updateNPMPod
}{
{m1, p1},
{m2, p2},
{m3, p3},
}
for _, pair := range pairs {
p := c.enqueue(pair.m)
require.False(t, c.isEmpty())
require.Equal(t, pair.p, p)
require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey, m3.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3})
}

// test dequeue
p := c.dequeue()
require.False(t, c.isEmpty())
require.Equal(t, p1, p)
require.Equal(t, c.queue, []string{m2.PodKey, m3.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2, m3.PodKey: p3})

p = c.dequeue()
require.False(t, c.isEmpty())
require.Equal(t, p2, p)
require.Equal(t, c.queue, []string{m3.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p3})

// test requeuing
c.requeue(p)
require.False(t, c.isEmpty())
require.Equal(t, c.queue, []string{m3.PodKey, m2.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p3, m2.PodKey: p2})

p = c.dequeue()
require.False(t, c.isEmpty())
require.Equal(t, p3, p)
require.Equal(t, c.queue, []string{m2.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2})

// test enqueuing again
p = c.enqueue(m1)
require.Equal(t, *p1, *p)
require.False(t, c.isEmpty())
require.Equal(t, c.queue, []string{m2.PodKey, m1.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2, m1.PodKey: p1})

p = c.dequeue()
require.False(t, c.isEmpty())
require.Equal(t, p2, p)
require.Equal(t, c.queue, []string{m1.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1})

p = c.dequeue()
require.True(t, c.isEmpty())
require.Equal(t, p1, p)
require.Equal(t, c.queue, []string{})
require.Equal(t, c.cache, map[string]*updateNPMPod{})

// test requeue on empty queue
c.requeue(p)
require.False(t, c.isEmpty())
require.Equal(t, c.queue, []string{m1.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1})

p = c.dequeue()
require.True(t, c.isEmpty())
require.Equal(t, p1, p)
require.Equal(t, c.queue, []string{})
require.Equal(t, c.cache, map[string]*updateNPMPod{})

// test nil result on empty queue
p = c.dequeue()
require.True(t, c.isEmpty())
require.Nil(t, p)

// test enqueue on empty queue
p = c.enqueue(m3)
require.False(t, c.isEmpty())
require.Equal(t, *p3, *p)
require.Equal(t, c.queue, []string{m3.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p})

// test enqueue with different node on only item in queue
m3Node2 := *m3
m3Node2.NodeName = "node2"
p3Node2 := *newUpdateNPMPod(&m3Node2)
p = c.enqueue(&m3Node2)
require.False(t, c.isEmpty())
require.Equal(t, p3Node2, *p)
require.Equal(t, c.queue, []string{m3Node2.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m3Node2.PodKey: p})

// test enqueue with different node on first item in queue
p = c.enqueue(m1)
require.False(t, c.isEmpty())
require.Equal(t, *p1, *p)
require.Equal(t, c.queue, []string{m3Node2.PodKey, m1.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m3Node2.PodKey: &p3Node2, m1.PodKey: p1})

p = c.enqueue(m3)
require.False(t, c.isEmpty())
require.Equal(t, *p3, *p)
require.Equal(t, c.queue, []string{m1.PodKey, m3.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m3.PodKey: p})

// test enqueue with different node on last item in queue
p = c.enqueue(&m3Node2)
require.False(t, c.isEmpty())
require.Equal(t, p3Node2, *p)
require.Equal(t, c.queue, []string{m1.PodKey, m3Node2.PodKey})
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m3Node2.PodKey: p})
}

func getBootupTestCalls() []testutils.TestCmd {
return append(policies.GetBootupTestCalls(), ipsets.GetResetTestCalls()...)
}
Expand Down
12 changes: 0 additions & 12 deletions npm/pkg/dataplane/dataplane_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ func testSerialCases(t *testing.T, tests []*SerialTestCase) {
i := i
tt := tt

for _, tag := range tt.Tags {
if tag == skipTestTag {
continue
}
}

t.Run(tt.Description, func(t *testing.T) {
t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags)

Expand Down Expand Up @@ -85,12 +79,6 @@ func testMultiJobCases(t *testing.T, tests []*MultiJobTestCase) {
i := i
tt := tt

for _, tag := range tt.Tags {
if tag == skipTestTag {
continue
}
}

t.Run(tt.Description, func(t *testing.T) {
t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags)

Expand Down
Loading