Skip to content

Commit

Permalink
Fix Antrea-native Policy with multiple AppliedTo (antrea-io#2084)
Browse files Browse the repository at this point in the history
A Policy rule may have multiple AppliedToGroups, not all of which select
some workloads on the Nodes that the Policy applies to. It's by design
that an AppliedToGroup won't be sent to a Node if it doesn't select any
workload on it, so agents shouldn't require all AppliedToGroups to be
received before it can realize a rule. What's more, it may happen that
none of its AppliedToGroups is sent to a Node when the rule itself is
being evaluated on the Node if it's sent to the Node because other rules
of its parent Policy apply to it.

This patch fixes the logic by making the controller install a rule when
any of its AppliedToGroups can be populated and all of its AddressGroups
can be populated, and uninstall it when none of its AppliedToGroups can
be populated.
  • Loading branch information
tnqn authored and antoninbas committed Apr 30, 2021
1 parent 0e9dff6 commit 0bcb59b
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 68 deletions.
75 changes: 49 additions & 26 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,32 @@ func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *q
return policies
}

func (c *ruleCache) getRule(ruleID string) (*rule, bool) {
obj, exists, _ := c.rules.GetByKey(ruleID)
if !exists {
return nil, false
}
return obj.(*rule), true
}

func (c *ruleCache) getRulesByNetworkPolicy(uid string) []*rule {
func (c *ruleCache) getEffectiveRulesByNetworkPolicy(uid string) []*rule {
objs, _ := c.rules.ByIndex(policyIndex, uid)
if len(objs) == 0 {
return nil
}
rules := make([]*rule, len(objs))
for i, obj := range objs {
rules[i] = obj.(*rule)
rules := make([]*rule, 0, len(objs))

// A rule is considered effective when any of its AppliedToGroups can be populated.
isEffective := func(r *rule) bool {
for _, g := range r.AppliedToGroups {
_, exists := c.memberSetByGroup[g]
if exists {
return true
}
}
return false
}

c.podSetLock.RLock()
defer c.podSetLock.RUnlock()

for _, obj := range objs {
rule := obj.(*rule)
if isEffective(rule) {
rules = append(rules, rule)
}
}
return rules
}
Expand Down Expand Up @@ -518,13 +528,13 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error
}

// DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup.
// It should only happen when a group is no longer referenced by any rule, so
// no need to mark dirty rules.
// It may be called when a rule becomes ineffective, so it needs to mark dirty rules.
func (c *ruleCache) DeleteAppliedToGroup(group *v1beta.AppliedToGroup) error {
c.podSetLock.Lock()
defer c.podSetLock.Unlock()

delete(c.memberSetByGroup, group.Name)
c.onAppliedToGroupUpdate(group.Name)
return nil
}

Expand Down Expand Up @@ -693,16 +703,33 @@ func (c *ruleCache) deleteNetworkPolicyLocked(uid string) error {
}

// GetCompletedRule constructs a *CompletedRule for the provided ruleID.
// If the rule is not found or not completed due to missing group data,
// the return value will indicate it.
func (c *ruleCache) GetCompletedRule(ruleID string) (completedRule *CompletedRule, exists bool, completed bool) {
// If the rule is not effective or not realizable due to missing group data, the return value will indicate it.
// A rule is considered effective when any of its AppliedToGroups can be populated.
// A rule is considered realizable when it's effective and all of its AddressGroups can be populated.
// When a rule is not effective, it should be removed from the datapath.
// When a rule is effective but not realizable, the caller should wait for it being realizable before doing anything.
// When a rule is effective and realizable, the caller should realize it.
// This is because some AppliedToGroups in a rule might never be sent to this Node if one of the following is true:
// - The original policy has multiple AppliedToGroups and some AppliedToGroups' span does not include this Node.
// - The original policy is appliedTo-per-rule, and some of the rule's AppliedToGroups do not include this Node.
// - The original policy is appliedTo-per-rule, none of the rule's AppliedToGroups includes this Node, but some other rules' (in the same policy) AppliedToGroups include this Node.
// In these cases, it is not guaranteed that all AppliedToGroups in the rule will eventually be present in the cache.
// Only the AppliedToGroups whose span includes this Node will eventually be received.
func (c *ruleCache) GetCompletedRule(ruleID string) (completedRule *CompletedRule, effective bool, realizable bool) {
obj, exists, _ := c.rules.GetByKey(ruleID)
if !exists {
return nil, false, false
}

r := obj.(*rule)

groupMembers, anyExists := c.unionAppliedToGroups(r.AppliedToGroups)
if !anyExists {
return nil, false, false
}

var fromAddresses, toAddresses v1beta.GroupMemberSet
var completed bool
if r.Direction == v1beta.DirectionIn {
fromAddresses, completed = c.unionAddressGroups(r.From.AddressGroups)
} else {
Expand All @@ -712,11 +739,6 @@ func (c *ruleCache) GetCompletedRule(ruleID string) (completedRule *CompletedRul
return nil, true, false
}

groupMembers, completed := c.unionAppliedToGroups(r.AppliedToGroups)
if !completed {
return nil, true, false
}

completedRule = &CompletedRule{
rule: r,
FromAddresses: fromAddresses,
Expand Down Expand Up @@ -764,20 +786,21 @@ func (c *ruleCache) unionAddressGroups(groupNames []string) (v1beta.GroupMemberS
}

// unionAppliedToGroups gets the union of pods of the provided appliedTo groups.
// If any group is not found, nil and false will be returned to indicate the
// set is not complete yet.
// If any group is found, the union and true will be returned. Otherwise an empty set and false will be returned.
func (c *ruleCache) unionAppliedToGroups(groupNames []string) (v1beta.GroupMemberSet, bool) {
c.podSetLock.RLock()
defer c.podSetLock.RUnlock()

anyExists := false
set := v1beta.NewGroupMemberSet()
for _, groupName := range groupNames {
curSet, exists := c.memberSetByGroup[groupName]
if !exists {
klog.V(2).Infof("AppliedToGroup %v was not found", groupName)
return nil, false
continue
}
anyExists = true
set = set.Union(curSet)
}
return set, true
return set, anyExists
}
51 changes: 42 additions & 9 deletions pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,12 +770,24 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1", "addressGroup2", "addressGroup3"}},
AppliedToGroups: []string{"appliedToGroup1", "appliedToGroup2"},
}
rule4 := &rule{
ID: "rule4",
Direction: v1beta2.DirectionIn,
From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1", "addressGroup2"}},
AppliedToGroups: []string{"appliedToGroup1", "appliedToGroup2", "appliedToGroup3"},
}
rule5 := &rule{
ID: "rule5",
Direction: v1beta2.DirectionIn,
From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1", "addressGroup2"}},
AppliedToGroups: []string{"appliedToGroup3", "appliedToGroup4"},
}
tests := []struct {
name string
args string
wantCompletedRule *CompletedRule
wantExists bool
wantCompleted bool
wantEffective bool
wantRealizable bool
}{
{
"one-group-rule",
Expand All @@ -802,15 +814,34 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
true,
},
{
"incompleted-rule",
"missing-one-addressgroup-rule",
rule3.ID,
nil,
true,
false,
},
{
"missing-one-appliedtogroup-rule",
rule4.ID,
&CompletedRule{
rule: rule4,
FromAddresses: addressGroup1.Union(addressGroup2),
ToAddresses: nil,
TargetMembers: appliedToGroup1.Union(appliedToGroup2),
},
true,
true,
},
{
"missing-all-appliedtogroups-rule",
rule5.ID,
nil,
false,
false,
},
{
"non-existing-rule",
"rule4",
"rule6",
nil,
false,
false,
Expand All @@ -826,16 +857,18 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
c.rules.Add(rule1)
c.rules.Add(rule2)
c.rules.Add(rule3)
c.rules.Add(rule4)
c.rules.Add(rule5)

gotCompletedRule, gotExists, gotCompleted := c.GetCompletedRule(tt.args)
gotCompletedRule, gotEffective, gotRealizable := c.GetCompletedRule(tt.args)
if !reflect.DeepEqual(gotCompletedRule, tt.wantCompletedRule) {
t.Errorf("GetCompletedRule() gotCompletedRule = %v, want %v", gotCompletedRule, tt.wantCompletedRule)
}
if gotExists != tt.wantExists {
t.Errorf("GetCompletedRule() gotExists = %v, want %v", gotExists, tt.wantExists)
if gotEffective != tt.wantEffective {
t.Errorf("GetCompletedRule() gotEffective = %v, want %v", gotEffective, tt.wantEffective)
}
if gotCompleted != tt.wantCompleted {
t.Errorf("GetCompletedRule() gotCompleted = %v, want %v", gotCompleted, tt.wantCompleted)
if gotRealizable != tt.wantRealizable {
t.Errorf("GetCompletedRule() gotRealizable = %v, want %v", gotRealizable, tt.wantRealizable)
}
})
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ func (c *Controller) syncRule(key string) error {
klog.V(4).Infof("Finished syncing rule %q. (%v)", key, time.Since(startTime))
}()

rule, exists, completed := c.ruleCache.GetCompletedRule(key)
if !exists {
klog.V(2).Infof("Rule %v had been deleted, removing its flows", key)
rule, effective, realizable := c.ruleCache.GetCompletedRule(key)
if !effective {
klog.V(2).Infof("Rule %v was not effective, removing its flows", key)
if err := c.reconciler.Forget(key); err != nil {
return err
}
Expand All @@ -467,10 +467,10 @@ func (c *Controller) syncRule(key string) error {
}
return nil
}
// If the rule is not complete, we can simply skip it as it will be marked as dirty
// If the rule is not realizable, we can simply skip it as it will be marked as dirty
// and queued again when we receive the missing group it missed.
if !completed {
klog.V(2).Infof("Rule %v was not complete, skipping", key)
if !realizable {
klog.V(2).Infof("Rule %v was not realizable, skipping", key)
return nil
}
if err := c.reconciler.Reconcile(rule); err != nil {
Expand All @@ -493,9 +493,13 @@ func (c *Controller) syncRules(keys []string) error {

var allRules []*CompletedRule
for _, key := range keys {
rule, exists, completed := c.ruleCache.GetCompletedRule(key)
if !exists || !completed {
klog.Errorf("Rule %s is not complete or does not exist in cache", key)
rule, effective, realizable := c.ruleCache.GetCompletedRule(key)
// It's normal that a rule is not effective on this Node but abnormal that it is not realizable after watchers
// complete full sync.
if !effective {
klog.Infof("Rule %s is not effective on this Node", key)
} else if !realizable {
klog.Errorf("Rule %s is effective but not realizable", key)
} else {
allRules = append(allRules, rule)
}
Expand Down
34 changes: 15 additions & 19 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestAddMultipleGroupsRule(t *testing.T) {
desiredRule := &CompletedRule{
rule: &rule{Direction: v1beta2.DirectionIn, Services: services},
FromAddresses: v1beta2.NewGroupMemberSet(newAddressGroupMember("1.1.1.1"), newAddressGroupMember("2.2.2.2"), newAddressGroupMember("3.3.3.3")),
ToAddresses: v1beta2.NewGroupMemberSet(),
ToAddresses: nil,
TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1"), newAppliedToGroupMember("pod2", "ns2")),
}
stopCh := make(chan struct{})
Expand Down Expand Up @@ -306,37 +306,33 @@ func TestAddMultipleGroupsRule(t *testing.T) {
assert.Equal(t, 1, controller.GetAddressGroupNum())
assert.Equal(t, 1, controller.GetAppliedToGroupNum())

// addressGroup2 comes, no rule will be synced due to missing appliedToGroup2 data.
// addressGroup2 comes, policy1 will be synced with the TargetMembers populated from appliedToGroup1.
addressGroupWatcher.Add(newAddressGroup("addressGroup2", []v1beta2.GroupMember{*newAddressGroupMember("1.1.1.1"), *newAddressGroupMember("3.3.3.3")}))
select {
case ruleID := <-reconciler.updated:
t.Fatalf("Expected no update, got %v", ruleID)
actualRule, _ := reconciler.getLastRealized(ruleID)
assert.Equal(t, actualRule.Direction, desiredRule.Direction)
assert.ElementsMatch(t, actualRule.Services, desiredRule.Services)
assert.Equal(t, actualRule.FromAddresses, desiredRule.FromAddresses)
assert.Equal(t, actualRule.ToAddresses, desiredRule.ToAddresses)
assert.Equal(t, actualRule.TargetMembers, v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1")))
case <-time.After(time.Millisecond * 100):
t.Fatal("Expected one update, got none")
}
assert.Equal(t, 1, controller.GetNetworkPolicyNum())
assert.Equal(t, 2, controller.GetAddressGroupNum())
assert.Equal(t, 1, controller.GetAppliedToGroupNum())

// appliedToGroup2 comes, policy1 will be synced.
// appliedToGroup2 comes, policy1 will be synced with the TargetMembers populated from appliedToGroup1 and appliedToGroup2.
appliedToGroupWatcher.Add(newAppliedToGroup("appliedToGroup2", []v1beta2.GroupMember{*newAppliedToGroupMember("pod2", "ns2")}))
select {
case ruleID := <-reconciler.updated:
actualRule, _ := reconciler.getLastRealized(ruleID)
if actualRule.Direction != desiredRule.Direction {
t.Errorf("Expected Direction %v, got %v", actualRule.Direction, desiredRule.Direction)
}
if !assert.ElementsMatch(t, actualRule.Services, desiredRule.Services) {
t.Errorf("Expected Services %v, got %v", actualRule.Services, desiredRule.Services)
}
if !actualRule.FromAddresses.Equal(desiredRule.FromAddresses) {
t.Errorf("Expected FromAddresses %v, got %v", actualRule.FromAddresses, desiredRule.FromAddresses)
}
if !actualRule.ToAddresses.Equal(desiredRule.ToAddresses) {
t.Errorf("Expected ToAddresses %v, got %v", actualRule.ToAddresses, desiredRule.ToAddresses)
}
if !actualRule.TargetMembers.Equal(desiredRule.TargetMembers) {
t.Errorf("Expected Pods %v, got %v", actualRule.TargetMembers, desiredRule.TargetMembers)
}
assert.Equal(t, actualRule.Direction, desiredRule.Direction)
assert.ElementsMatch(t, actualRule.Services, desiredRule.Services)
assert.Equal(t, actualRule.FromAddresses, desiredRule.FromAddresses)
assert.Equal(t, actualRule.ToAddresses, desiredRule.ToAddresses)
assert.Equal(t, actualRule.TargetMembers, desiredRule.TargetMembers)
case <-time.After(time.Millisecond * 100):
t.Fatal("Expected one update, got none")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (c *StatusController) syncHandler(uid types.UID) error {
if policy == nil {
return nil
}
desiredRules := c.ruleCache.getRulesByNetworkPolicy(string(uid))
desiredRules := c.ruleCache.getEffectiveRulesByNetworkPolicy(string(uid))
// The policy must have been deleted, no further processing.
if len(desiredRules) == 0 {
return nil
Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/controller/networkpolicy/status_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func TestSyncStatusForNewPolicy(t *testing.T) {
go statusController.Run(stopCh)

ruleCache.AddNetworkPolicy(tt.policy)
rules := ruleCache.getRulesByNetworkPolicy(string(tt.policy.UID))
ruleCache.AddAppliedToGroup(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMember("pod1", "ns1")}))
rules := ruleCache.getEffectiveRulesByNetworkPolicy(string(tt.policy.UID))
for i, rule := range rules {
// Only make specified number of rules realized.
if i >= tt.realizedRules {
Expand All @@ -126,10 +127,11 @@ func TestSyncStatusUpForUpdatedPolicy(t *testing.T) {
defer close(stopCh)
go statusController.Run(stopCh)

ruleCache.AddAppliedToGroup(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMember("pod1", "ns1")}))
policy := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, []string{}, []string{"appliedToGroup1"}, nil)
policy.Generation = 1
ruleCache.AddNetworkPolicy(policy)
rule1 := ruleCache.getRulesByNetworkPolicy(string(policy.UID))[0]
rule1 := ruleCache.getEffectiveRulesByNetworkPolicy(string(policy.UID))[0]
statusController.SetRuleRealization(rule1.ID, policy.UID)

matchGeneration := func(generation int64) error {
Expand All @@ -149,7 +151,7 @@ func TestSyncStatusUpForUpdatedPolicy(t *testing.T) {
ruleCache.UpdateNetworkPolicy(policy)
assert.Error(t, matchGeneration(policy.Generation), "The generation should not be updated to %v but was updated", policy.Generation)

rules := ruleCache.getRulesByNetworkPolicy(string(policy.UID))
rules := ruleCache.getEffectiveRulesByNetworkPolicy(string(policy.UID))
for _, rule := range rules {
// Only call SetRuleRealization for new rule.
if rule.ID != rule1.ID {
Expand Down Expand Up @@ -184,7 +186,7 @@ func BenchmarkSyncHandler(b *testing.B) {
policy.Rules = append(policy.Rules, newPolicyRule(v1beta2.DirectionOut, nil, []string{fmt.Sprintf("addressGroup%d", i)}, nil))
}
ruleCache.AddNetworkPolicy(policy)
rules := ruleCache.getRulesByNetworkPolicy(string(policy.UID))
rules := ruleCache.getEffectiveRulesByNetworkPolicy(string(policy.UID))
for _, rule := range rules {
statusController.SetRuleRealization(rule.ID, policy.UID)
}
Expand Down
Loading

0 comments on commit 0bcb59b

Please sign in to comment.