Skip to content

Commit

Permalink
Address more comments
Browse files Browse the repository at this point in the history
Signed-off-by: Dyanngg <dingyang@vmware.com>
  • Loading branch information
Dyanngg committed Mar 9, 2023
1 parent a8e6a7f commit c1a102a
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 38 deletions.
2 changes: 1 addition & 1 deletion multicluster/test/e2e/antreapolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func executeTestsOnAllMemberClusters(t *testing.T, testList []*antreae2e.TestCas
// skip verification for the leader cluster
continue
}
if err := k8sUtils.WaitForACNPCreatiionAndRealization(t, acnpName, policyRealizedTimeout); err != nil {
if err := k8sUtils.WaitForACNPCreationAndRealization(t, acnpName, policyRealizedTimeout); err != nil {
t.Errorf("Failed to get ACNP to be replicated in cluster %s", clusterName)
failOnError(err, t)
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/controller/labelidentity/label_group_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Interface interface {
// DeleteSelector deletes or updates a selectorItem when a selector is deleted from a policy.
DeleteSelector(selectorKey string, policyKey string)
// RemoveStalePolicySelectors cleans up any outdated selector <-> policy mapping based on the policy's latest selectors.
RemoveStalePolicySelectors(selectors []*types.GroupSelector, policyKey string)
RemoveStalePolicySelectors(selectorKeys sets.String, policyKey string)
// DeletePolicySelectors removes any selectors from referring to the policy being deleted.
DeletePolicySelectors(policyKey string)
// AddLabelIdentity adds LabelIdentity-ID mapping to the index.
Expand Down Expand Up @@ -298,17 +298,13 @@ func (i *LabelIdentityIndex) deleteSelector(selectorKey string, policyKey string
}

// RemoveStalePolicySelectors cleans up any outdated selector <-> policy mapping based on the policy's latest selectors.
func (i *LabelIdentityIndex) RemoveStalePolicySelectors(selectors []*types.GroupSelector, policyKey string) {
func (i *LabelIdentityIndex) RemoveStalePolicySelectors(selectorKeys sets.String, policyKey string) {
originalSelectors := i.getPolicySelectors(policyKey)
if originalSelectors == nil {
return
}
if len(selectors) > 0 {
newSelectors := map[string]*types.GroupSelector{}
for _, s := range selectors {
newSelectors[s.NormalizedName] = s
}
for selKey := range newSelectors {
if selectorKeys.Len() > 0 {
for _, selKey := range selectorKeys.UnsortedList() {
if _, exists := originalSelectors[selKey]; exists {
// Remove matched ClusterSet-scoped selectors of the policy before and after the update.
// The selectors remaining in originalSelectors will need to be unbounded from the policy.
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/labelidentity/label_group_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,12 @@ func TestSetPolicySelectors(t *testing.T) {
}
}
var labelIDs []uint32
selectorKeys := sets.NewString()
for _, sel := range tt.selectors {
labelIDs = append(labelIDs, i.AddSelector(sel, tt.policyKey)...)
selectorKeys.Insert(sel.NormalizedName)
}
i.RemoveStalePolicySelectors(tt.selectors, tt.policyKey)
i.RemoveStalePolicySelectors(selectorKeys, tt.policyKey)
assert.ElementsMatch(t, tt.expIDs, dedupLabelIdentites(labelIDs))
assert.Equalf(t, len(tt.expSelectorItems), len(i.selectorItems.List()), "Unexpected number of cached selectorItems")
for selKey, expSelItem := range tt.expSelectorItems {
Expand Down
15 changes: 9 additions & 6 deletions pkg/controller/networkpolicy/antreanetworkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
appliedToGroups := map[string]*antreatypes.AppliedToGroup{}
addressGroups := map[string]*antreatypes.AddressGroup{}
rules := make([]controlplane.NetworkPolicyRule, 0, len(np.Spec.Ingress)+len(np.Spec.Egress))
// clusterSetScopeSelectors keeps track of all the ClusterSet-scoped selectors of the policy.
// clusterSetScopeSelectorKeys keeps track of all the ClusterSet-scoped selector keys of the policy.
// During policy peer processing, any ClusterSet-scoped selector will be registered with the
// labelIdentityInterface and added to this list. By the end of the function, this list will
// labelIdentityInterface and added to this set. By the end of the function, this set will
// be used to remove any stale selector from the policy in the labelIdentityInterface.
var clusterSetScopeSelectors []*antreatypes.GroupSelector
var clusterSetScopeSelectorKeys sets.String
// Create AppliedToGroup for each AppliedTo present in AntreaNetworkPolicy spec.
atgs := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo)
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
Expand All @@ -101,7 +101,8 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
// Create AppliedToGroup for each AppliedTo present in the ingress rule.
atgs := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo)
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
peer, ags := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists, &clusterSetScopeSelectors)
peer, ags, selKeys := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists)
clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys)
addressGroups = mergeAddressGroups(addressGroups, ags...)
rules = append(rules, controlplane.NetworkPolicyRule{
Direction: controlplane.DirectionIn,
Expand All @@ -127,8 +128,10 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
peer = n.svcRefToPeerForCRD(egressRule.ToServices, np.Namespace)
} else {
var ags []*antreatypes.AddressGroup
peer, ags = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists, &clusterSetScopeSelectors)
var selKeys sets.String
peer, ags, selKeys = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists)
addressGroups = mergeAddressGroups(addressGroups, ags...)
clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys)
}
rules = append(rules, controlplane.NetworkPolicyRule{
Direction: controlplane.DirectionOut,
Expand Down Expand Up @@ -160,7 +163,7 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
AppliedToPerRule: appliedToPerRule,
}
if n.stretchNPEnabled {
n.labelIdentityInterface.RemoveStalePolicySelectors(clusterSetScopeSelectors, internalNetworkPolicyKeyFunc(np))
n.labelIdentityInterface.RemoveStalePolicySelectors(clusterSetScopeSelectorKeys, internalNetworkPolicyKeyFunc(np))
}
return internalNetworkPolicy, appliedToGroups, addressGroups
}
Expand Down
24 changes: 14 additions & 10 deletions pkg/controller/networkpolicy/clusternetworkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,11 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C
var clusterAppliedToAffectedNS []string
// atgForNamespace is the appliedToGroups split by Namespaces.
var atgForNamespace []*antreatypes.AppliedToGroup
// clusterSetScopeSelectors keeps track of all the ClusterSet-scoped selectors (including per-Namespace
// selectors) of the policy. During policy peer processing, any ClusterSet-scoped selector will be
// registered with the labelIdentityInterface and added to this list. By the end of the function, this
// list will be used to remove any stale selector from the policy in the labelIdentityInterface.
var clusterSetScopeSelectors []*antreatypes.GroupSelector
// clusterSetScopeSelectorKeys keeps track of all the ClusterSet-scoped selector keys of the policy.
// During policy peer processing, any ClusterSet-scoped selector will be registered with the
// labelIdentityInterface and added to this set. By the end of the function, this set will
// be used to remove any stale selector from the policy in the labelIdentityInterface.
var clusterSetScopeSelectorKeys sets.String
if hasPerNamespaceRule && len(cnp.Spec.AppliedTo) > 0 {
for _, at := range cnp.Spec.AppliedTo {
if at.ServiceAccount != nil {
Expand Down Expand Up @@ -401,7 +401,8 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C
if cnpRule.ToServices != nil {
addRule(n.svcRefToPeerForCRD(cnpRule.ToServices, ""), nil, direction, ruleATGs)
} else {
peer, ags := n.toAntreaPeerForCRD(clusterPeers, cnp, direction, namedPortExists, &clusterSetScopeSelectors)
peer, ags, selKeys := n.toAntreaPeerForCRD(clusterPeers, cnp, direction, namedPortExists)
clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys)
addRule(peer, ags, direction, ruleATGs)
}
}
Expand All @@ -410,7 +411,8 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C
// Create a rule for each affected Namespace of appliedTo at spec level
for i := range clusterAppliedToAffectedNS {
klog.V(4).Infof("Adding a new per-namespace rule with appliedTo %v for rule %d of %s", clusterAppliedToAffectedNS[i], idx, cnp.Name)
peer, ags := n.toNamespacedPeerForCRD(perNSPeers, cnp, clusterAppliedToAffectedNS[i], &clusterSetScopeSelectors)
peer, ags, selKeys := n.toNamespacedPeerForCRD(perNSPeers, cnp, clusterAppliedToAffectedNS[i])
clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys)
addRule(peer, ags, direction, []*antreatypes.AppliedToGroup{atgForNamespace[i]})
}
} else {
Expand All @@ -419,14 +421,16 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C
if at.ServiceAccount != nil {
atg := n.createAppliedToGroup(at.ServiceAccount.Namespace, serviceAccountNameToPodSelector(at.ServiceAccount.Name), nil, nil)
klog.V(4).Infof("Adding a new per-namespace rule with appliedTo %v for rule %d of %s", atg, idx, cnp.Name)
peer, ags := n.toNamespacedPeerForCRD(perNSPeers, cnp, at.ServiceAccount.Namespace, &clusterSetScopeSelectors)
peer, ags, selKeys := n.toNamespacedPeerForCRD(perNSPeers, cnp, at.ServiceAccount.Namespace)
clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys)
addRule(peer, ags, direction, []*antreatypes.AppliedToGroup{atg})
} else {
affectedNS := n.getAffectedNamespacesForAppliedTo(at)
for _, ns := range affectedNS {
atg := n.createAppliedToGroup(ns, at.PodSelector, nil, at.ExternalEntitySelector)
klog.V(4).Infof("Adding a new per-namespace rule with appliedTo %v for rule %d of %s", atg, idx, cnp.Name)
peer, ags := n.toNamespacedPeerForCRD(perNSPeers, cnp, ns, &clusterSetScopeSelectors)
peer, ags, selKeys := n.toNamespacedPeerForCRD(perNSPeers, cnp, ns)
clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys)
addRule(peer, ags, direction, []*antreatypes.AppliedToGroup{atg})
}
}
Expand Down Expand Up @@ -460,7 +464,7 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C
AppliedToPerRule: appliedToPerRule,
}
if n.stretchNPEnabled {
n.labelIdentityInterface.RemoveStalePolicySelectors(clusterSetScopeSelectors, internalNetworkPolicyKeyFunc(cnp))
n.labelIdentityInterface.RemoveStalePolicySelectors(clusterSetScopeSelectorKeys, internalNetworkPolicyKeyFunc(cnp))
}
return internalNetworkPolicy, appliedToGroups, addressGroups
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller/networkpolicy/crd_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package networkpolicy

import (
"k8s.io/apimachinery/pkg/util/sets"
"strings"
"time"

Expand Down Expand Up @@ -142,8 +143,7 @@ func toAntreaIPBlockForCRD(ipBlock *v1alpha1.IPBlock) (*controlplane.IPBlock, er
// Any ClusterSet scoped selector in this peer will also be registered with the labelIdentityInterface
// for the policy.
func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPolicyPeer,
np metav1.Object, dir controlplane.Direction, namedPortExists bool,
clusterSetScopeSelectors *[]*antreatypes.GroupSelector) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup) {
np metav1.Object, dir controlplane.Direction, namedPortExists bool) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup, sets.String) {
var addressGroups []*antreatypes.AddressGroup
// NetworkPolicyPeer is supposed to match all addresses when it is empty and no clusterGroup is present.
// It's treated as an IPBlock "0.0.0.0/0".
Expand All @@ -155,18 +155,19 @@ func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPol
// For other cases it uses the IPBlock "0.0.0.0/0" to avoid the overhead
// of handling member updates of the AddressGroup.
if dir == controlplane.DirectionIn || !namedPortExists {
return &matchAllPeer, nil
return &matchAllPeer, nil, sets.NewString()
}
allPodsGroup := n.createAddressGroup("", matchAllPodsPeerCrd.PodSelector, matchAllPodsPeerCrd.NamespaceSelector, nil, nil)
addressGroups = append(addressGroups, allPodsGroup)
podsPeer := matchAllPeer
podsPeer.AddressGroups = append(podsPeer.AddressGroups, allPodsGroup.Name)
return &podsPeer, addressGroups
return &podsPeer, addressGroups, sets.NewString()
}
var ipBlocks []controlplane.IPBlock
var fqdns []string
var labelIdentities []uint32
uniqueLabelIDs := map[uint32]struct{}{}
clusterSetScopeSelectorKeys := sets.NewString()
for _, peer := range peers {
// A v1alpha1.NetworkPolicyPeer will have exactly one of the following fields set:
// - podSelector and/or namespaceSelector (in-cluster scope or ClusterSet scope)
Expand Down Expand Up @@ -200,7 +201,7 @@ func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPol
}
if n.stretchNPEnabled && peer.Scope == v1alpha1.ScopeClusterSet {
newClusterSetScopeSelector := antreatypes.NewGroupSelector(np.GetNamespace(), peer.PodSelector, peer.NamespaceSelector, nil, nil)
*clusterSetScopeSelectors = append(*clusterSetScopeSelectors, newClusterSetScopeSelector)
clusterSetScopeSelectorKeys.Insert(newClusterSetScopeSelector.NormalizedName)
// In addition to getting the matched Label Identity IDs, AddSelector also registers the selector
// with the labelIdentityInterface.
matchedLabelIDs := n.labelIdentityInterface.AddSelector(newClusterSetScopeSelector, internalNetworkPolicyKeyFunc(np))
Expand All @@ -217,23 +218,24 @@ func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPol
IPBlocks: ipBlocks,
FQDNs: fqdns,
LabelIdentities: labelIdentities,
}, addressGroups
}, addressGroups, clusterSetScopeSelectorKeys
}

// toNamespacedPeerForCRD creates an Antrea controlplane NetworkPolicyPeer for crdv1alpha1 NetworkPolicyPeer
// for a particular Namespace. It is used when a single crdv1alpha1 NetworkPolicyPeer maps to multiple
// controlplane NetworkPolicyPeers because the appliedTo workloads reside in different Namespaces.
func (n *NetworkPolicyController) toNamespacedPeerForCRD(peers []v1alpha1.NetworkPolicyPeer,
np metav1.Object, namespace string, clusterSetScopeSelectors *[]*antreatypes.GroupSelector) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup) {
np metav1.Object, namespace string) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup, sets.String) {
var addressGroups []*antreatypes.AddressGroup
var labelIdentities []uint32
uniqueLabelIDs := map[uint32]struct{}{}
clusterSetScopeSelectorKeys := sets.NewString()
for _, peer := range peers {
addressGroup := n.createAddressGroup(namespace, peer.PodSelector, nil, peer.ExternalEntitySelector, nil)
addressGroups = append(addressGroups, addressGroup)
if n.stretchNPEnabled && peer.Scope == v1alpha1.ScopeClusterSet {
newClusterSetScopeSelector := antreatypes.NewGroupSelector(namespace, peer.PodSelector, nil, peer.ExternalEntitySelector, nil)
*clusterSetScopeSelectors = append(*clusterSetScopeSelectors, newClusterSetScopeSelector)
clusterSetScopeSelectorKeys.Insert(newClusterSetScopeSelector.NormalizedName)
// In addition to getting the matched Label Identity IDs, AddSelector also registers the selector
// with the labelIdentityInterface.
matchedLabelIDs := n.labelIdentityInterface.AddSelector(newClusterSetScopeSelector, internalNetworkPolicyKeyFunc(np))
Expand All @@ -247,7 +249,7 @@ func (n *NetworkPolicyController) toNamespacedPeerForCRD(peers []v1alpha1.Networ
}
return &controlplane.NetworkPolicyPeer{
AddressGroups: getAddressGroupNames(addressGroups), LabelIdentities: labelIdentities,
}, addressGroups
}, addressGroups, clusterSetScopeSelectorKeys
}

// svcRefToPeerForCRD creates an Antrea controlplane NetworkPolicyPeer from ServiceReferences in ToServices
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/networkpolicy/crd_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,7 @@ func TestToAntreaPeerForCRD(t *testing.T) {
npc.labelIdentityInterface.AddLabelIdentity(labelIdentityA, 1)
npc.labelIdentityInterface.AddLabelIdentity(labelIdentityB, 2)
}
var clusterSetScopeSelectors []*antreatypes.GroupSelector
actualPeer, _ := npc.toAntreaPeerForCRD(tt.inPeers, testCNPObj, tt.direction, tt.namedPortExists, &clusterSetScopeSelectors)
actualPeer, _, _ := npc.toAntreaPeerForCRD(tt.inPeers, testCNPObj, tt.direction, tt.namedPortExists)
if !reflect.DeepEqual(tt.outPeer.AddressGroups, actualPeer.AddressGroups) {
t.Errorf("Unexpected AddressGroups in Antrea Peer conversion. Expected %v, got %v", tt.outPeer.AddressGroups, actualPeer.AddressGroups)
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/k8s_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ func (data *TestData) CleanANPs(namespaces []string) error {
return nil
}

func (data *TestData) WaitForACNPCreatiionAndRealization(t *testing.T, name string, timeout time.Duration) error {
func (data *TestData) WaitForACNPCreationAndRealization(t *testing.T, name string, timeout time.Duration) error {
t.Logf("Waiting for ACNP '%s' to be created and realized", name)
if err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
acnp, err := data.crdClient.CrdV1alpha1().ClusterNetworkPolicies().Get(context.TODO(), name, metav1.GetOptions{})
Expand Down

0 comments on commit c1a102a

Please sign in to comment.