diff --git a/ci/jenkins/test-mc.sh b/ci/jenkins/test-mc.sh index 99d1c3ea5cc..aeb5b143692 100755 --- a/ci/jenkins/test-mc.sh +++ b/ci/jenkins/test-mc.sh @@ -190,6 +190,9 @@ function wait_for_antrea_multicluster_pods_ready { function wait_for_multicluster_controller_ready { echo "====== Deploying Antrea Multicluster Leader Cluster with ${LEADER_CLUSTER_CONFIG} ======" + leader_cluster_pod_cidr="10.244.0.0/20" + export leader_cluster_pod_cidr + perl -0777 -pi -e 's| podCIDRs\:\n - \"\"| podCIDRs\:\n - $ENV{leader_cluster_pod_cidr}|g' ./multicluster/test/yamls/leader-manifest.yml kubectl create ns antrea-multicluster "${LEADER_CLUSTER_CONFIG}" || true kubectl apply -f ./multicluster/build/yamls/antrea-multicluster-leader-global.yml "${LEADER_CLUSTER_CONFIG}" kubectl apply -f ./multicluster/test/yamls/leader-manifest.yml "${LEADER_CLUSTER_CONFIG}" @@ -206,10 +209,17 @@ function wait_for_multicluster_controller_ready { sed -i 's/antrea-multicluster/kube-system/g' ./multicluster/test/yamls/leader-access-token.yml echo "type: Opaque" >> ./multicluster/test/yamls/leader-access-token.yml - for config in "${membercluster_kubeconfigs[@]}"; + member_cluster_pod_cidrs=("10.244.16.0/20" "10.244.32.0/20") + for i in "${!membercluster_kubeconfigs[@]}"; do + pod_cidr=${member_cluster_pod_cidrs[$i]} + export pod_cidr + cp ./multicluster/test/yamls/member-manifest.yml ./multicluster/test/yamls/member-manifest-$i.yml + perl -0777 -pi -e 's| podCIDRs\:\n - \"\"| podCIDRs\:\n - $ENV{pod_cidr}|g' ./multicluster/test/yamls/member-manifest-$i.yml + + config=${membercluster_kubeconfigs[$i]} echo "====== Deploying Antrea Multicluster Member Cluster with ${config} ======" - kubectl apply -f ./multicluster/test/yamls/member-manifest.yml ${config} + kubectl apply -f ./multicluster/test/yamls/member-manifest-$i.yml ${config} kubectl rollout status deployment/antrea-mc-controller -n kube-system ${config} kubectl apply -f ./multicluster/test/yamls/leader-access-token.yml ${config} done @@ -254,6 +264,7 @@ function modify_config { multicluster: enableGateway: true enableStretchedNetworkPolicy: true + enablePodToPodConnectivity: true featureGates: { Multicluster: true } @@ -349,8 +360,10 @@ function deliver_multicluster_controller { sed -i "s||${leader_ip}|" ./multicluster/test/yamls/west-member-cluster.yml if [[ ${KIND} == "true" ]]; then docker cp ./multicluster/test/yamls/test-acnp-copy-span-ns-isolation.yml leader-control-plane:/root/test-acnp-copy-span-ns-isolation.yml + docker cp ./multicluster/test/yamls/test-acnp-cross-cluster-ns-isolation.yml leader-control-plane:/root/test-acnp-cross-cluster-ns-isolation.yml else rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" ./multicluster/test/yamls/test-acnp-copy-span-ns-isolation.yml jenkins@["${leader_ip}"]:"${WORKDIR}"/test-acnp-copy-span-ns-isolation.yml + rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" ./multicluster/test/yamls/test-acnp-cross-cluster-ns-isolation.yml jenkins@["${leader_ip}"]:"${WORKDIR}"/test-acnp-cross-cluster-ns-isolation.yml fi for kubeconfig in "${membercluster_kubeconfigs[@]}" @@ -430,6 +443,8 @@ function run_multicluster_e2e { fi set +x set -e + + tar -zcf antrea-test-logs.tar.gz antrea-multicluster-test-logs } function collect_coverage { @@ -455,8 +470,9 @@ clean_images if [[ ${KIND} == "true" ]]; then # Preparing a ClusterSet contains three Kind clusters. 
SERVICE_CIDRS=("10.96.10.0/24" "10.96.20.0/24" "10.96.30.0/24") + POD_CIDRS=("10.244.0.0/20" "10.244.16.0/20" "10.244.32.0/20") for i in {0..2}; do - ./ci/kind/kind-setup.sh create ${CLUSTER_NAMES[$i]} --service-cidr ${SERVICE_CIDRS[$i]} --num-workers 1 + ./ci/kind/kind-setup.sh create ${CLUSTER_NAMES[$i]} --service-cidr ${SERVICE_CIDRS[$i]} --pod-cidr ${POD_CIDRS[$i]} --num-workers 1 done for name in ${CLUSTER_NAMES[*]}; do diff --git a/multicluster/apis/multicluster/v1alpha2/zz_generated.deepcopy.go b/multicluster/apis/multicluster/v1alpha2/zz_generated.deepcopy.go index c5d780a6218..15b502d59f8 100644 --- a/multicluster/apis/multicluster/v1alpha2/zz_generated.deepcopy.go +++ b/multicluster/apis/multicluster/v1alpha2/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated // +build !ignore_autogenerated -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/multicluster/test/e2e/antreapolicy_test.go b/multicluster/test/e2e/antreapolicy_test.go index 79e83366592..be911409807 100644 --- a/multicluster/test/e2e/antreapolicy_test.go +++ b/multicluster/test/e2e/antreapolicy_test.go @@ -24,10 +24,12 @@ import ( ) const ( - // Provide enough time for policies to be enforced & deleted by the CNI plugin. - networkPolicyDelay = 2 * time.Second - acnpIsolationResourceExport = "test-acnp-copy-span-ns-isolation.yml" - acnpName = "antrea-mc-strict-namespace-isolation" + // Provide enough time for policies to be imported and enforced by the CNI plugin. + policyRealizedTimeout = 6 * time.Second + acnpIsolationResourceExport = "test-acnp-copy-span-ns-isolation.yml" + acnpIsolationName = "antrea-mc-strict-namespace-isolation" + acnpCrossClusterIsolationResExport = "test-acnp-cross-cluster-ns-isolation.yml" + acnpCrossClusterIsolationName = "antrea-mc-strict-namespace-isolation-cross-cluster" ) var ( @@ -70,8 +72,10 @@ func initializeForPolicyTest(t *testing.T, data *MCTestData) { d := data.clusterTestDataMap[clusterName] k8sUtils, err := antreae2e.NewKubernetesUtils(&d) failOnError(err, t) - _, err = k8sUtils.Bootstrap(perClusterNamespaces, perNamespacePods, true) - failOnError(err, t) + if clusterName != leaderCluster { + _, err = k8sUtils.Bootstrap(perClusterNamespaces, perNamespacePods, true) + failOnError(err, t) + } clusterK8sUtilsMap[clusterName] = k8sUtils } } @@ -83,18 +87,12 @@ func tearDownForPolicyTest() { } } -func testMCAntreaPolicy(t *testing.T, data *MCTestData) { - data.testAntreaPolicyCopySpanNSIsolation(t) -} - // testAntreaPolicyCopySpanNSIsolation tests that after applying a ResourceExport of an ACNP // for Namespace isolation, strict Namespace isolation is enforced in each of the member clusters. -func (data *MCTestData) testAntreaPolicyCopySpanNSIsolation(t *testing.T) { +func testAntreaPolicyCopySpanNSIsolation(t *testing.T, data *MCTestData) { setup := func() { err := data.deployACNPResourceExport(t, acnpIsolationResourceExport) failOnError(err, t) - // Sleep 5s to wait resource export/import process to finish resource exchange. 
- time.Sleep(5 * time.Second) } teardown := func() { err := data.deleteACNPResourceExport(acnpIsolationResourceExport) @@ -114,33 +112,71 @@ func (data *MCTestData) testAntreaPolicyCopySpanNSIsolation(t *testing.T) { Steps: []*antreae2e.TestStep{testStep}, }, } - executeTestsOnAllMemberClusters(t, testCaseList, setup, teardown) + executeTestsOnAllMemberClusters(t, testCaseList, acnpIsolationName, setup, teardown, false) +} + +func testAntreaPolicyCrossClusterNSIsolation(t *testing.T, data *MCTestData) { + setup := func() { + err := data.deployACNPResourceExport(t, acnpCrossClusterIsolationResExport) + failOnError(err, t) + } + teardown := func() { + err := data.deleteACNPResourceExport(acnpCrossClusterIsolationResExport) + failOnError(err, t) + } + reachability := antreae2e.NewReachability(allPodsPerCluster, antreae2e.Dropped) + reachability.ExpectAllSelfNamespace(antreae2e.Connected) + testStep := &antreae2e.TestStep{ + Name: "Port 80", + Reachability: reachability, + Ports: []int32{80}, + Protocol: utils.ProtocolTCP, + } + testCaseList := []*antreae2e.TestCase{ + { + Name: "ACNP strict cross-cluster Namespace isolation", + Steps: []*antreae2e.TestStep{testStep}, + }, + } + executeTestsOnAllMemberClusters(t, testCaseList, acnpCrossClusterIsolationName, setup, teardown, true) } -func executeTestsOnAllMemberClusters(t *testing.T, testList []*antreae2e.TestCase, setup, teardown func()) { +func executeTestsOnAllMemberClusters(t *testing.T, testList []*antreae2e.TestCase, acnpName string, setup, teardown func(), testCrossCluster bool) { setup() - time.Sleep(networkPolicyDelay) for _, testCase := range testList { t.Logf("Running test case %s", testCase.Name) for _, step := range testCase.Steps { t.Logf("Running step %s of test case %s", step.Name, testCase.Name) reachability := step.Reachability - if reachability != nil { - for clusterName, k8sUtils := range clusterK8sUtilsMap { - if clusterName == leaderCluster { - // skip traffic test for the leader cluster - continue - } - if _, err := k8sUtils.GetACNP(acnpName); err != nil { - t.Errorf("Failed to get ACNP to be replicated in cluster %s", clusterName) - } - start := time.Now() - k8sUtils.Validate(allPodsPerCluster, reachability, step.Ports, step.Protocol) - step.Duration = time.Since(start) - _, wrong, _ := step.Reachability.Summary() - if wrong != 0 { - t.Errorf("Failure in cluster %s -- %d wrong results", clusterName, wrong) - reachability.PrintSummary(true, true, true) + for clusterName, k8sUtils := range clusterK8sUtilsMap { + if clusterName == leaderCluster { + // skip verification for the leader cluster + continue + } + if err := k8sUtils.WaitForACNPCreationAndRealization(t, acnpName, policyRealizedTimeout); err != nil { + t.Errorf("Failed to get ACNP to be replicated in cluster %s", clusterName) + failOnError(err, t) + } + start := time.Now() + k8sUtils.Validate(allPodsPerCluster, reachability, step.Ports, step.Protocol) + step.Duration = time.Since(start) + _, wrong, _ := step.Reachability.Summary() + if wrong != 0 { + t.Errorf("Failure in cluster %s -- %d wrong results", clusterName, wrong) + reachability.PrintSummary(true, true, true) + } + if testCrossCluster { + for remoteClusterName, remoteClusterK8s := range clusterK8sUtilsMap { + if remoteClusterName == leaderCluster || remoteClusterName == clusterName { + continue + } + newReachability := reachability.NewReachabilityWithSameExpectations() + k8sUtils.ValidateRemoteCluster(remoteClusterK8s, allPodsPerCluster, newReachability, step.Ports[0], step.Protocol) + _, wrong, _ = 
newReachability.Summary() + if wrong != 0 { + t.Errorf("Failure from cluster %s to cluster %s -- %d wrong results", clusterName, remoteClusterName, wrong) + newReachability.PrintSummary(true, true, true) + } } } } diff --git a/multicluster/test/e2e/framework.go b/multicluster/test/e2e/framework.go index 8a23289b823..95d29635862 100644 --- a/multicluster/test/e2e/framework.go +++ b/multicluster/test/e2e/framework.go @@ -89,7 +89,7 @@ func (data *MCTestData) createClients() error { } data.clusterTestDataMap = map[string]antreae2e.TestData{} for i, cluster := range data.clusters { - testData := antreae2e.TestData{} + testData := antreae2e.TestData{ClusterName: cluster} if err := testData.CreateClient(kubeConfigPaths[i]); err != nil { return fmt.Errorf("error initializing clients for cluster %s: %v", cluster, err) } diff --git a/multicluster/test/e2e/main_test.go b/multicluster/test/e2e/main_test.go index 42a50db5d5d..b4a812fc048 100644 --- a/multicluster/test/e2e/main_test.go +++ b/multicluster/test/e2e/main_test.go @@ -122,9 +122,10 @@ func TestConnectivity(t *testing.T) { t.Run("TestAntreaPolicy", func(t *testing.T) { defer tearDownForPolicyTest() initializeForPolicyTest(t, data) - testMCAntreaPolicy(t, data) + t.Run("Case=CopySpanNSIsolation", func(t *testing.T) { testAntreaPolicyCopySpanNSIsolation(t, data) }) + t.Run("Case=CrossClusterNSIsolation", func(t *testing.T) { testAntreaPolicyCrossClusterNSIsolation(t, data) }) }) // Wait 5 seconds to let both member and leader controllers clean up all resources, - // otherwise, Namespace deletion may stuck into termininating status. + // otherwise, Namespace deletion may be stuck in terminating status. time.Sleep(5 * time.Second) } diff --git a/multicluster/test/e2e/service_test.go b/multicluster/test/e2e/service_test.go index be2dafd6e73..9edd5310f99 100644 --- a/multicluster/test/e2e/service_test.go +++ b/multicluster/test/e2e/service_test.go @@ -191,6 +191,11 @@ func (data *MCTestData) testANPToServices(t *testing.T) { if _, err := data.createOrUpdateANP(eastCluster, anpBuilder1.Get()); err != nil { t.Fatalf("Error creating ANP %s: %v", anpBuilder1.Name, err) } + eastClusterData := data.clusterTestDataMap[eastCluster] + if err := eastClusterData.WaitForANPCreationAndRealization(t, anpBuilder1.Namespace, anpBuilder1.Name, policyRealizedTimeout); err != nil { + t.Errorf("Failed to wait for ANP %s/%s to be realized in cluster %s", anpBuilder1.Namespace, anpBuilder1.Name, eastCluster) + failOnError(err, t) + } connectivity := data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastGwClientName, "client", eastIP, mcWestClusterTestService, 80, corev1.ProtocolTCP) assert.Equal(t, antreae2e.Dropped, connectivity, "Failure -- wrong result from probing exported Service from gateway clientPod after applying toServices AntreaNetworkPolicy") @@ -213,6 +218,10 @@ func (data *MCTestData) testANPToServices(t *testing.T) { if _, err := data.createOrUpdateANP(eastCluster, anpBuilder2.Get()); err != nil { t.Fatalf("Error creating ANP %s: %v", anpBuilder2.Name, err) } + if err := eastClusterData.WaitForANPCreationAndRealization(t, anpBuilder2.Namespace, anpBuilder2.Name, policyRealizedTimeout); err != nil { + t.Errorf("Failed to wait for ANP %s/%s to be realized in cluster %s", anpBuilder2.Namespace, anpBuilder2.Name, eastCluster) + failOnError(err, t) + } defer data.deleteANP(eastCluster, multiClusterTestNamespace, anpBuilder2.Name) connectivity = data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastGwClientName, "client", 
eastIP, mcWestClusterTestService, 80, corev1.ProtocolTCP) @@ -242,6 +251,11 @@ func (data *MCTestData) testStretchedNetworkPolicy(t *testing.T) { if _, err := data.createOrUpdateACNP(westCluster, acnpBuilder1.Get()); err != nil { t.Fatalf("Error creating ACNP %s: %v", acnpBuilder1.Name, err) } + westClusterData := data.clusterTestDataMap[westCluster] + if err := westClusterData.WaitForACNPCreationAndRealization(t, acnpBuilder1.Name, policyRealizedTimeout); err != nil { + t.Errorf("Failed to wait for ACNP %s to be realized in cluster %s", acnpBuilder1.Name, westCluster) + failOnError(err, t) + } connectivity := data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastGwClientName, "client", westExpSvcIP, mcWestClusterTestService, 80, corev1.ProtocolTCP) assert.Equal(t, antreae2e.Dropped, connectivity, getStretchedNetworkPolicyErrorMessage(eastGwClientName)) @@ -261,6 +275,10 @@ func (data *MCTestData) testStretchedNetworkPolicy(t *testing.T) { t.Fatalf("Error creating ACNP %s: %v", acnpBuilder2.Name, err) } defer data.deleteACNP(westCluster, acnpBuilder2.Name) + if err := westClusterData.WaitForACNPCreationAndRealization(t, acnpBuilder2.Name, policyRealizedTimeout); err != nil { + t.Errorf("Failed to wait for ACNP %s to be realized in cluster %s", acnpBuilder2.Name, westCluster) + failOnError(err, t) + } connectivity = data.probeFromPodInCluster(eastCluster, multiClusterTestNamespace, eastGwClientName, "client", westExpSvcIP, mcWestClusterTestService, 80, corev1.ProtocolTCP) assert.Equal(t, antreae2e.Dropped, connectivity, getStretchedNetworkPolicyErrorMessage(eastGwClientName)) @@ -287,6 +305,11 @@ func (data *MCTestData) testStretchedNetworkPolicyReject(t *testing.T) { if _, err := data.createOrUpdateACNP(westCluster, acnpBuilder.Get()); err != nil { t.Fatalf("Error creating ACNP %s: %v", acnpBuilder.Name, err) } + westClusterData := data.clusterTestDataMap[westCluster] + if err := westClusterData.WaitForACNPCreationAndRealization(t, acnpBuilder.Name, policyRealizedTimeout); err != nil { + t.Errorf("Failed to wait for ACNP %s to be realized in cluster %s", acnpBuilder.Name, westCluster) + failOnError(err, t) + } defer data.deleteACNP(westCluster, acnpBuilder.Name) testConnectivity := func() { diff --git a/multicluster/test/yamls/test-acnp-cross-cluster-ns-isolation.yml b/multicluster/test/yamls/test-acnp-cross-cluster-ns-isolation.yml new file mode 100644 index 00000000000..b41dcbcd0b9 --- /dev/null +++ b/multicluster/test/yamls/test-acnp-cross-cluster-ns-isolation.yml @@ -0,0 +1,25 @@ +apiVersion: multicluster.crd.antrea.io/v1alpha1 +kind: ResourceExport +metadata: + name: strict-namespace-isolation-cross-cluster + namespace: antrea-multicluster +spec: + kind: AntreaClusterNetworkPolicy + name: strict-namespace-isolation-cross-cluster + clusterNetworkPolicy: + priority: 1 + tier: securityops + appliedTo: + - namespaceSelector: # Selects all non-system Namespaces in the cluster + matchExpressions: + - {key: kubernetes.io/metadata.name, operator: NotIn, values: [kube-system]} + ingress: + - action: Pass + from: + - namespaces: + match: Self + scope: ClusterSet + - action: Drop + from: + - namespaceSelector: {} + scope: ClusterSet diff --git a/pkg/controller/labelidentity/label_group_index.go b/pkg/controller/labelidentity/label_group_index.go index 425d77e5bdd..80216461225 100644 --- a/pkg/controller/labelidentity/label_group_index.go +++ b/pkg/controller/labelidentity/label_group_index.go @@ -53,10 +53,8 @@ type Interface interface { AddSelector(selector 
*types.GroupSelector, policyKey string) []uint32 // DeleteSelector deletes or updates a selectorItem when a selector is deleted from a policy. DeleteSelector(selectorKey string, policyKey string) - // GetLabelIdentityIDs retrieves the label identity IDs selected by the provided selectorItem keys. - GetLabelIdentityIDs(selectorKey string) []uint32 - // SetPolicySelectors registers a policy's selectors with the index. - SetPolicySelectors(selectors []*types.GroupSelector, policyKey string) []uint32 + // RemoveStalePolicySelectors cleans up any outdated selector <-> policy mapping based on the policy's latest selectors. + RemoveStalePolicySelectors(selectorKeys sets.String, policyKey string) // DeletePolicySelectors removes any selectors from referring to the policy being deleted. DeletePolicySelectors(policyKey string) // AddLabelIdentity adds LabelIdentity-ID mapping to the index. @@ -299,58 +297,42 @@ func (i *LabelIdentityIndex) deleteSelector(selectorKey string, policyKey string } } -// SetPolicySelectors registers ClusterSet-scope policy selectors with the labelIdentityIndex, -// and then retrieves all the LabelIdentity IDs that currently match these selectors. -func (i *LabelIdentityIndex) SetPolicySelectors(selectors []*types.GroupSelector, policyKey string) []uint32 { - var labelIdentityIDs []uint32 - newSelectors := map[string]*types.GroupSelector{} - for _, s := range selectors { - klog.V(4).InfoS("Getting matched LabelIdentity for policy selector", "selector", s.NormalizedName, "policy", policyKey) - newSelectors[s.NormalizedName] = s - } +// RemoveStalePolicySelectors cleans up any outdated selector <-> policy mapping based on the policy's latest selectors. +func (i *LabelIdentityIndex) RemoveStalePolicySelectors(selectorKeys sets.String, policyKey string) { originalSelectors := i.getPolicySelectors(policyKey) - for selKey, sel := range newSelectors { - if _, exists := originalSelectors[selKey]; exists { - // These clusterset-scoped selectors are already bound to the policy in labelIdentityIndex. - // We can simply read matched label identity IDs from the index. - selectedLabelIDs := i.GetLabelIdentityIDs(selKey) - labelIdentityIDs = append(labelIdentityIDs, selectedLabelIDs...) - // Remove matched clusterset-scoped selectors of the policy before and after the update. - // The selectors remaining in originalSelectors will need to be removed from the policy. - delete(originalSelectors, selKey) - } else { - selectedLabelIDs := i.AddSelector(sel, policyKey) - labelIdentityIDs = append(labelIdentityIDs, selectedLabelIDs...) + if originalSelectors == nil { + return + } + if selectorKeys.Len() > 0 { + for selKey := range selectorKeys { + if _, exists := originalSelectors[selKey]; exists { + // Remove matched ClusterSet-scoped selectors of the policy before and after the update. + // The selectors remaining in originalSelectors will need to be unbound from the policy. + delete(originalSelectors, selKey) + } + } } // The policy no longer has these selectors. for selectorKey := range originalSelectors { i.DeleteSelector(selectorKey, policyKey) } - // Dedup label identity IDs in-place. - seen := map[uint32]struct{}{} - idx := 0 - for _, id := range labelIdentityIDs { - if _, exists := seen[id]; !exists { - seen[id] = struct{}{} - labelIdentityIDs[idx] = id - idx++ - } - } - return labelIdentityIDs[:idx] } +// getPolicySelectors retrieves the selectors associated with the policy. 
func (i *LabelIdentityIndex) getPolicySelectors(policyKey string) map[string]*types.GroupSelector { i.lock.RLock() defer i.lock.RUnlock() - res := map[string]*types.GroupSelector{} selectors, _ := i.selectorItems.ByIndex(policyIndex, policyKey) - for _, s := range selectors { - sel := s.(*selectorItem) - res[sel.getKey()] = sel.selector + if len(selectors) > 0 { + res := map[string]*types.GroupSelector{} + for _, s := range selectors { + sel := s.(*selectorItem) + res[sel.getKey()] = sel.selector + } + return res } - return res + return nil } func (i *LabelIdentityIndex) DeletePolicySelectors(policyKey string) { @@ -364,17 +346,6 @@ func (i *LabelIdentityIndex) DeletePolicySelectors(policyKey string) { } } -func (i *LabelIdentityIndex) GetLabelIdentityIDs(selectorKey string) []uint32 { - i.lock.RLock() - defer i.lock.RUnlock() - - if s, exists, _ := i.selectorItems.GetByKey(selectorKey); exists { - sel := s.(*selectorItem) - return i.getMatchedLabelIdentityIDs(sel) - } - return []uint32{} -} - func (i *LabelIdentityIndex) getMatchedLabelIdentityIDs(sItem *selectorItem) []uint32 { var ids []uint32 for lKey := range sItem.labelIdentityKeys { diff --git a/pkg/controller/labelidentity/label_group_index_test.go b/pkg/controller/labelidentity/label_group_index_test.go index f2f4c14e788..921221be13f 100644 --- a/pkg/controller/labelidentity/label_group_index_test.go +++ b/pkg/controller/labelidentity/label_group_index_test.go @@ -331,10 +331,18 @@ func TestSetPolicySelectors(t *testing.T) { i.AddLabelIdentity(labelB, 2) i.AddLabelIdentity(labelC, 3) if tt.prevPolicyAdded != "" { - i.SetPolicySelectors(tt.prevSelAdded, tt.prevPolicyAdded) + for _, sel := range tt.prevSelAdded { + i.AddSelector(sel, tt.prevPolicyAdded) + } + } + var labelIDs []uint32 + selectorKeys := sets.NewString() + for _, sel := range tt.selectors { + labelIDs = append(labelIDs, i.AddSelector(sel, tt.policyKey)...) + selectorKeys.Insert(sel.NormalizedName) } - matchedIDs := i.SetPolicySelectors(tt.selectors, tt.policyKey) - assert.ElementsMatch(t, tt.expIDs, matchedIDs) + i.RemoveStalePolicySelectors(selectorKeys, tt.policyKey) + assert.ElementsMatch(t, tt.expIDs, dedupLabelIdentites(labelIDs)) assert.Equalf(t, len(tt.expSelectorItems), len(i.selectorItems.List()), "Unexpected number of cached selectorItems") for selKey, expSelItem := range tt.expSelectorItems { s, exists, _ := i.selectorItems.GetByKey(selKey) @@ -349,6 +357,20 @@ func TestSetPolicySelectors(t *testing.T) { } } +// Dedup LabelIdentity IDs in-place. 
+func dedupLabelIdentites(labelIdentityIDs []uint32) []uint32 { + seen := map[uint32]struct{}{} + idx := 0 + for _, id := range labelIdentityIDs { + if _, exists := seen[id]; !exists { + seen[id] = struct{}{} + labelIdentityIDs[idx] = id + idx++ + } + } + return labelIdentityIDs[:idx] +} + func TestAddLabelIdentity(t *testing.T) { labelIdentityAOriginalID := uint32(1) tests := []struct { @@ -412,10 +434,12 @@ func TestAddLabelIdentity(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { i := NewLabelIdentityIndex() - i.SetPolicySelectors([]*types.GroupSelector{selectorA, selectorC}, "policyA") - i.SetPolicySelectors([]*types.GroupSelector{selectorB, selectorC}, "policyB") - i.SetPolicySelectors([]*types.GroupSelector{selectorD}, "policyD") - i.SetPolicySelectors([]*types.GroupSelector{selectorE}, "policyE") + i.AddSelector(selectorA, "policyA") + i.AddSelector(selectorC, "policyA") + i.AddSelector(selectorB, "policyB") + i.AddSelector(selectorC, "policyB") + i.AddSelector(selectorD, "policyD") + i.AddSelector(selectorE, "policyE") stopCh := make(chan struct{}) defer close(stopCh) @@ -504,10 +528,12 @@ func TestDeleteLabelIdentity(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { i := NewLabelIdentityIndex() - i.SetPolicySelectors([]*types.GroupSelector{selectorA, selectorC}, "policyA") - i.SetPolicySelectors([]*types.GroupSelector{selectorB, selectorC}, "policyB") - i.SetPolicySelectors([]*types.GroupSelector{selectorD}, "policyD") - i.SetPolicySelectors([]*types.GroupSelector{selectorE}, "policyE") + i.AddSelector(selectorA, "policyA") + i.AddSelector(selectorC, "policyA") + i.AddSelector(selectorB, "policyB") + i.AddSelector(selectorC, "policyB") + i.AddSelector(selectorD, "policyD") + i.AddSelector(selectorE, "policyE") stopCh := make(chan struct{}) defer close(stopCh) diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy.go b/pkg/controller/networkpolicy/antreanetworkpolicy.go index 8dfeab4a545..8a026633b5f 100644 --- a/pkg/controller/networkpolicy/antreanetworkpolicy.go +++ b/pkg/controller/networkpolicy/antreanetworkpolicy.go @@ -86,6 +86,11 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net appliedToGroups := map[string]*antreatypes.AppliedToGroup{} addressGroups := map[string]*antreatypes.AddressGroup{} rules := make([]controlplane.NetworkPolicyRule, 0, len(np.Spec.Ingress)+len(np.Spec.Egress)) + // clusterSetScopeSelectorKeys keeps track of all the ClusterSet-scoped selector keys of the policy. + // During policy peer processing, any ClusterSet-scoped selector will be registered with the + // labelIdentityInterface and added to this set. By the end of the function, this set will + // be used to remove any stale selector from the policy in the labelIdentityInterface. + var clusterSetScopeSelectorKeys sets.String // Create AppliedToGroup for each AppliedTo present in AntreaNetworkPolicy spec. atgs := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) @@ -96,7 +101,10 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net // Create AppliedToGroup for each AppliedTo present in the ingress rule. atgs := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) 
- peer, ags := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists) + peer, ags, selKeys := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists) + if selKeys != nil { + clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys) + } addressGroups = mergeAddressGroups(addressGroups, ags...) rules = append(rules, controlplane.NetworkPolicyRule{ Direction: controlplane.DirectionIn, @@ -122,8 +130,12 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net peer = n.svcRefToPeerForCRD(egressRule.ToServices, np.Namespace) } else { var ags []*antreatypes.AddressGroup - peer, ags = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists) + var selKeys sets.String + peer, ags, selKeys = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists) addressGroups = mergeAddressGroups(addressGroups, ags...) + if selKeys != nil { + clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys) + } } rules = append(rules, controlplane.NetworkPolicyRule{ Direction: controlplane.DirectionOut, @@ -154,6 +166,9 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net TierPriority: &tierPriority, AppliedToPerRule: appliedToPerRule, } + if n.stretchNPEnabled { + n.labelIdentityInterface.RemoveStalePolicySelectors(clusterSetScopeSelectorKeys, internalNetworkPolicyKeyFunc(np)) + } return internalNetworkPolicy, appliedToGroups, addressGroups } diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy.go b/pkg/controller/networkpolicy/clusternetworkpolicy.go index 98984e91db5..e249d3feed5 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy.go @@ -339,6 +339,11 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C var clusterAppliedToAffectedNS []string // atgForNamespace is the appliedToGroups split by Namespaces. var atgForNamespace []*antreatypes.AppliedToGroup + // clusterSetScopeSelectorKeys keeps track of all the ClusterSet-scoped selector keys of the policy. + // During policy peer processing, any ClusterSet-scoped selector will be registered with the + // labelIdentityInterface and added to this set. By the end of the function, this set will + // be used to remove any stale selector from the policy in the labelIdentityInterface. 
+ var clusterSetScopeSelectorKeys sets.String if hasPerNamespaceRule && len(cnp.Spec.AppliedTo) > 0 { for _, at := range cnp.Spec.AppliedTo { if at.ServiceAccount != nil { @@ -396,7 +401,10 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C if cnpRule.ToServices != nil { addRule(n.svcRefToPeerForCRD(cnpRule.ToServices, ""), nil, direction, ruleATGs) } else { - peer, ags := n.toAntreaPeerForCRD(clusterPeers, cnp, direction, namedPortExists) + peer, ags, selKeys := n.toAntreaPeerForCRD(clusterPeers, cnp, direction, namedPortExists) + if selKeys != nil { + clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys) + } addRule(peer, ags, direction, ruleATGs) } } @@ -405,7 +413,8 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C // Create a rule for each affected Namespace of appliedTo at spec level for i := range clusterAppliedToAffectedNS { klog.V(4).Infof("Adding a new per-namespace rule with appliedTo %v for rule %d of %s", clusterAppliedToAffectedNS[i], idx, cnp.Name) - peer, ags := n.toNamespacedPeerForCRD(perNSPeers, clusterAppliedToAffectedNS[i]) + peer, ags, selKeys := n.toNamespacedPeerForCRD(perNSPeers, cnp, clusterAppliedToAffectedNS[i]) + clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys) addRule(peer, ags, direction, []*antreatypes.AppliedToGroup{atgForNamespace[i]}) } } else { @@ -414,14 +423,16 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C if at.ServiceAccount != nil { atg := n.createAppliedToGroup(at.ServiceAccount.Namespace, serviceAccountNameToPodSelector(at.ServiceAccount.Name), nil, nil) klog.V(4).Infof("Adding a new per-namespace rule with appliedTo %v for rule %d of %s", atg, idx, cnp.Name) - peer, ags := n.toNamespacedPeerForCRD(perNSPeers, at.ServiceAccount.Namespace) + peer, ags, selKeys := n.toNamespacedPeerForCRD(perNSPeers, cnp, at.ServiceAccount.Namespace) + clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys) addRule(peer, ags, direction, []*antreatypes.AppliedToGroup{atg}) } else { affectedNS := n.getAffectedNamespacesForAppliedTo(at) for _, ns := range affectedNS { atg := n.createAppliedToGroup(ns, at.PodSelector, nil, at.ExternalEntitySelector) klog.V(4).Infof("Adding a new per-namespace rule with appliedTo %v for rule %d of %s", atg, idx, cnp.Name) - peer, ags := n.toNamespacedPeerForCRD(perNSPeers, ns) + peer, ags, selKeys := n.toNamespacedPeerForCRD(perNSPeers, cnp, ns) + clusterSetScopeSelectorKeys = clusterSetScopeSelectorKeys.Union(selKeys) addRule(peer, ags, direction, []*antreatypes.AppliedToGroup{atg}) } } @@ -454,6 +465,9 @@ func (n *NetworkPolicyController) processClusterNetworkPolicy(cnp *crdv1alpha1.C TierPriority: &tierPriority, AppliedToPerRule: appliedToPerRule, } + if n.stretchNPEnabled { + n.labelIdentityInterface.RemoveStalePolicySelectors(clusterSetScopeSelectorKeys, internalNetworkPolicyKeyFunc(cnp)) + } return internalNetworkPolicy, appliedToGroups, addressGroups } diff --git a/pkg/controller/networkpolicy/crd_utils.go b/pkg/controller/networkpolicy/crd_utils.go index 2a359e6210b..51eacd5db6f 100644 --- a/pkg/controller/networkpolicy/crd_utils.go +++ b/pkg/controller/networkpolicy/crd_utils.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "antrea.io/antrea/multicluster/controllers/multicluster/common" @@ -136,11 +137,13 @@ func 
toAntreaIPBlockForCRD(ipBlock *v1alpha1.IPBlock) (*controlplane.IPBlock, er return antreaIPBlock, nil } -// toAntreaPeerForCRD creates a Antrea controlplane NetworkPolicyPeer for crdv1alpha1 NetworkPolicyPeer. +// toAntreaPeerForCRD creates an Antrea controlplane NetworkPolicyPeer for crdv1alpha1 NetworkPolicyPeer. // It is used when peer's Namespaces are not matched by NamespaceMatchTypes, for which the controlplane -// NetworkPolicyPeers will need to be created on a per Namespace basis. +// NetworkPolicyPeers will need to be created on a per-Namespace basis. +// Any ClusterSet scoped selector in this peer will also be registered with the labelIdentityInterface +// for the policy. func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPolicyPeer, - np metav1.Object, dir controlplane.Direction, namedPortExists bool) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup) { + np metav1.Object, dir controlplane.Direction, namedPortExists bool) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup, sets.String) { var addressGroups []*antreatypes.AddressGroup // NetworkPolicyPeer is supposed to match all addresses when it is empty and no clusterGroup is present. // It's treated as an IPBlock "0.0.0.0/0". @@ -152,17 +155,19 @@ func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPol // For other cases it uses the IPBlock "0.0.0.0/0" to avoid the overhead // of handling member updates of the AddressGroup. if dir == controlplane.DirectionIn || !namedPortExists { - return &matchAllPeer, nil + return &matchAllPeer, nil, nil } allPodsGroup := n.createAddressGroup("", matchAllPodsPeerCrd.PodSelector, matchAllPodsPeerCrd.NamespaceSelector, nil, nil) addressGroups = append(addressGroups, allPodsGroup) podsPeer := matchAllPeer podsPeer.AddressGroups = append(podsPeer.AddressGroups, allPodsGroup.Name) - return &podsPeer, addressGroups + return &podsPeer, addressGroups, nil } var ipBlocks []controlplane.IPBlock var fqdns []string - var clusterSetScopeSelectors []*antreatypes.GroupSelector + var labelIdentities []uint32 + uniqueLabelIDs := map[uint32]struct{}{} + clusterSetScopeSelectorKeys := sets.NewString() for _, peer := range peers { // A v1alpha1.NetworkPolicyPeer will have exactly one of the following fields set: // - podSelector and/or namespaceSelector (in-cluster scope or ClusterSet scope) @@ -194,27 +199,57 @@ func (n *NetworkPolicyController) toAntreaPeerForCRD(peers []v1alpha1.NetworkPol addressGroup := n.createAddressGroup(np.GetNamespace(), peer.PodSelector, peer.NamespaceSelector, peer.ExternalEntitySelector, nil) addressGroups = append(addressGroups, addressGroup) } - if peer.Scope == v1alpha1.ScopeClusterSet { - clusterSetScopeSelectors = append(clusterSetScopeSelectors, antreatypes.NewGroupSelector(np.GetNamespace(), peer.PodSelector, peer.NamespaceSelector, nil, nil)) + if n.stretchNPEnabled && peer.Scope == v1alpha1.ScopeClusterSet { + newClusterSetScopeSelector := antreatypes.NewGroupSelector(np.GetNamespace(), peer.PodSelector, peer.NamespaceSelector, nil, nil) + clusterSetScopeSelectorKeys.Insert(newClusterSetScopeSelector.NormalizedName) + // In addition to getting the matched Label Identity IDs, AddSelector also registers the selector + // with the labelIdentityInterface. 
+ matchedLabelIDs := n.labelIdentityInterface.AddSelector(newClusterSetScopeSelector, internalNetworkPolicyKeyFunc(np)) + for _, id := range matchedLabelIDs { + uniqueLabelIDs[id] = struct{}{} + } } } - var labelIdentities []uint32 - if n.stretchNPEnabled { - labelIdentities = n.labelIdentityInterface.SetPolicySelectors(clusterSetScopeSelectors, internalNetworkPolicyKeyFunc(np)) + for id := range uniqueLabelIDs { + labelIdentities = append(labelIdentities, id) } - return &controlplane.NetworkPolicyPeer{AddressGroups: getAddressGroupNames(addressGroups), IPBlocks: ipBlocks, FQDNs: fqdns, LabelIdentities: labelIdentities}, addressGroups + return &controlplane.NetworkPolicyPeer{ + AddressGroups: getAddressGroupNames(addressGroups), + IPBlocks: ipBlocks, + FQDNs: fqdns, + LabelIdentities: labelIdentities, + }, addressGroups, clusterSetScopeSelectorKeys } // toNamespacedPeerForCRD creates an Antrea controlplane NetworkPolicyPeer for crdv1alpha1 NetworkPolicyPeer // for a particular Namespace. It is used when a single crdv1alpha1 NetworkPolicyPeer maps to multiple // controlplane NetworkPolicyPeers because the appliedTo workloads reside in different Namespaces. -func (n *NetworkPolicyController) toNamespacedPeerForCRD(peers []v1alpha1.NetworkPolicyPeer, namespace string) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup) { +func (n *NetworkPolicyController) toNamespacedPeerForCRD(peers []v1alpha1.NetworkPolicyPeer, + np metav1.Object, namespace string) (*controlplane.NetworkPolicyPeer, []*antreatypes.AddressGroup, sets.String) { var addressGroups []*antreatypes.AddressGroup + var labelIdentities []uint32 + uniqueLabelIDs := map[uint32]struct{}{} + clusterSetScopeSelectorKeys := sets.NewString() for _, peer := range peers { addressGroup := n.createAddressGroup(namespace, peer.PodSelector, nil, peer.ExternalEntitySelector, nil) addressGroups = append(addressGroups, addressGroup) + if n.stretchNPEnabled && peer.Scope == v1alpha1.ScopeClusterSet { + newClusterSetScopeSelector := antreatypes.NewGroupSelector(namespace, peer.PodSelector, nil, peer.ExternalEntitySelector, nil) + clusterSetScopeSelectorKeys.Insert(newClusterSetScopeSelector.NormalizedName) + // In addition to getting the matched Label Identity IDs, AddSelector also registers the selector + // with the labelIdentityInterface. 
+ matchedLabelIDs := n.labelIdentityInterface.AddSelector(newClusterSetScopeSelector, internalNetworkPolicyKeyFunc(np)) + for _, id := range matchedLabelIDs { + uniqueLabelIDs[id] = struct{}{} + } + } + } + for id := range uniqueLabelIDs { + labelIdentities = append(labelIdentities, id) } - return &controlplane.NetworkPolicyPeer{AddressGroups: getAddressGroupNames(addressGroups)}, addressGroups + return &controlplane.NetworkPolicyPeer{ + AddressGroups: getAddressGroupNames(addressGroups), LabelIdentities: labelIdentities, + }, addressGroups, clusterSetScopeSelectorKeys } // svcRefToPeerForCRD creates an Antrea controlplane NetworkPolicyPeer from ServiceReferences in ToServices diff --git a/pkg/controller/networkpolicy/crd_utils_test.go b/pkg/controller/networkpolicy/crd_utils_test.go index efd9c42616a..9391e899c69 100644 --- a/pkg/controller/networkpolicy/crd_utils_test.go +++ b/pkg/controller/networkpolicy/crd_utils_test.go @@ -475,7 +475,7 @@ func TestToAntreaPeerForCRD(t *testing.T) { npc.labelIdentityInterface.AddLabelIdentity(labelIdentityA, 1) npc.labelIdentityInterface.AddLabelIdentity(labelIdentityB, 2) } - actualPeer, _ := npc.toAntreaPeerForCRD(tt.inPeers, testCNPObj, tt.direction, tt.namedPortExists) + actualPeer, _, _ := npc.toAntreaPeerForCRD(tt.inPeers, testCNPObj, tt.direction, tt.namedPortExists) if !reflect.DeepEqual(tt.outPeer.AddressGroups, actualPeer.AddressGroups) { t.Errorf("Unexpected AddressGroups in Antrea Peer conversion. Expected %v, got %v", tt.outPeer.AddressGroups, actualPeer.AddressGroups) } diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index ad070237839..9df53764c78 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -2577,7 +2577,7 @@ func testAuditLoggingBasic(t *testing.T, data *TestData) { wg.Add(1) go func() { defer wg.Done() - k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP) + k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil) }() } oneProbe(namespaces["x"], "a", namespaces["z"], "a") @@ -2669,7 +2669,7 @@ func testAuditLoggingEnableNP(t *testing.T, data *TestData) { wg.Add(1) go func() { defer wg.Done() - k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP) + k8sUtils.Probe(ns1, pod1, ns2, pod2, p80, ProtocolTCP, nil) }() } oneProbe(namespaces["x"], "b", namespaces["x"], "a") @@ -4014,7 +4014,7 @@ func doProbe(t *testing.T, data *TestData, p *CustomProbe, protocol AntreaPolicy _, _, dstPodCleanupFunc := createAndWaitForPodWithLabels(t, data, data.createServerPodWithLabels, p.DestPod.Pod.PodName(), p.DestPod.Pod.Namespace(), p.Port, p.DestPod.Labels) defer dstPodCleanupFunc() log.Tracef("Probing: %s -> %s", p.SourcePod.Pod.PodName(), p.DestPod.Pod.PodName()) - connectivity, err := k8sUtils.Probe(p.SourcePod.Pod.Namespace(), p.SourcePod.Pod.PodName(), p.DestPod.Pod.Namespace(), p.DestPod.Pod.PodName(), p.Port, protocol) + connectivity, err := k8sUtils.Probe(p.SourcePod.Pod.Namespace(), p.SourcePod.Pod.PodName(), p.DestPod.Pod.Namespace(), p.DestPod.Pod.PodName(), p.Port, protocol, nil) if err != nil { t.Errorf("failure -- could not complete probe: %v", err) } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 7f12fcc2d37..c73be67cdcb 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -207,6 +207,7 @@ type podInfo struct { // TestData stores the state required for each test case. 
type TestData struct { + ClusterName string provider providers.ProviderInterface kubeConfig *restclient.Config clientset kubernetes.Interface diff --git a/test/e2e/k8s_util.go b/test/e2e/k8s_util.go index 707f66a35d5..2a616a45761 100644 --- a/test/e2e/k8s_util.go +++ b/test/e2e/k8s_util.go @@ -19,6 +19,7 @@ import ( "fmt" "strings" "sync" + "testing" "time" "github.com/pkg/errors" @@ -28,6 +29,7 @@ import ( v1net "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" @@ -79,6 +81,13 @@ type CustomProbe struct { ExpectConnectivity PodConnectivityMark } +type probeResult struct { + podFrom Pod + podTo Pod + connectivity PodConnectivityMark + err error +} + // GetPodByLabel returns a Pod with the matching Namespace and "pod" label. func (k *KubernetesUtils) GetPodByLabel(ns string, name string) (*v1.Pod, error) { pods, err := k.getPodsUncached(ns, "pod", name) @@ -209,11 +218,11 @@ func (k *KubernetesUtils) pingProbe( log.Tracef("Running: kubectl exec %s -c %s -n %s -- %s", pod.Name, containerName, pod.Namespace, strings.Join(cmd, " ")) stdout, stderr, err := k.RunCommandFromPod(pod.Namespace, pod.Name, containerName, cmd) log.Tracef("%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", podName, dstName, err, stdout, stderr) - return DecidePingProbeResult(stdout, 3) + return decidePingProbeResult(stdout, 3) } -// DecidePingProbeResult uses the pingProbe stdout to decide the connectivity. -func DecidePingProbeResult(stdout string, probeNum int) PodConnectivityMark { +// decidePingProbeResult uses the pingProbe stdout to decide the connectivity. +func decidePingProbeResult(stdout string, probeNum int) PodConnectivityMark { // Provide stdout example for different connectivity: // ================== Connected stdout ================== // PING 10.10.1.2 (10.10.1.2) 56(84) bytes of data. @@ -265,11 +274,12 @@ func DecidePingProbeResult(stdout string, probeNum int) PodConnectivityMark { return Error } -// Probe execs into a Pod and checks its connectivity to another Pod. Of course it -// assumes that the target Pod is serving on the input port, and also that agnhost -// is installed. The connectivity from source Pod to all IPs of the target Pod +// Probe execs into a Pod and checks its connectivity to another Pod. It assumes +// that the target Pod is serving on the input port, and also that agnhost is +// installed. The connectivity from source Pod to all IPs of the target Pod // should be consistent. Otherwise, Error PodConnectivityMark will be returned. 
-func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protocol utils.AntreaPolicyProtocol) (PodConnectivityMark, error) { +func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protocol utils.AntreaPolicyProtocol, + remoteCluster *KubernetesUtils) (PodConnectivityMark, error) { fromPods, err := k.GetPodsByLabel(ns1, "pod", pod1) if err != nil { return Error, fmt.Errorf("unable to get Pods from Namespace %s: %v", ns1, err) } @@ -279,15 +289,28 @@ func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protoco } fromPod := fromPods[0] - toPods, err := k.GetPodsByLabel(ns2, "pod", pod2) + var toPods []v1.Pod + var clusterType string + if remoteCluster != nil { + toPods, err = remoteCluster.GetPodsByLabel(ns2, "pod", pod2) + clusterType = "remote" + } else { + toPods, err = k.GetPodsByLabel(ns2, "pod", pod2) + clusterType = "local" + } if err != nil { - return Error, fmt.Errorf("unable to get Pods from Namespace %s: %v", ns2, err) + return Error, fmt.Errorf("unable to get Pods from Namespace %s in %s cluster: %v", ns2, clusterType, err) } if len(toPods) == 0 { - return Error, fmt.Errorf("no Pod of label pod=%s in Namespace %s found", pod2, ns2) + return Error, fmt.Errorf("no Pod of label pod=%s in Namespace %s found in %s cluster", pod2, ns2, clusterType) } toPod := toPods[0] + fromPodName, toPodName := fmt.Sprintf("%s/%s", ns1, pod1), fmt.Sprintf("%s/%s", ns2, pod2) + return k.probeAndDecideConnectivity(fromPod, toPod, fromPodName, toPodName, port, protocol) +} +func (k *KubernetesUtils) probeAndDecideConnectivity(fromPod, toPod v1.Pod, + fromPodName, toPodName string, port int32, protocol utils.AntreaPolicyProtocol) (PodConnectivityMark, error) { // Both IPv4 and IPv6 address should be tested. connectivity := Unknown for _, eachIP := range toPod.Status.PodIPs { @@ -296,9 +319,9 @@ func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protoco if strings.Contains(toIP, ":") { toIP = fmt.Sprintf("[%s]", toIP) } - // HACK: inferring container name as c80, c81, etc, for simplicity. + // HACK: inferring container name as c80, c81, etc., for simplicity. 
containerName := fmt.Sprintf("c%v", port) - curConnectivity := k.probe(&fromPod, fmt.Sprintf("%s/%s", ns1, pod1), containerName, toIP, fmt.Sprintf("%s/%s", ns2, pod2), port, protocol) + curConnectivity := k.probe(&fromPod, fromPodName, containerName, toIP, toPodName, port, protocol) if connectivity == Unknown { connectivity = curConnectivity } else if connectivity != curConnectivity { @@ -959,6 +982,34 @@ func (data *TestData) CleanANPs(namespaces []string) error { return nil } +func (data *TestData) WaitForANPCreationAndRealization(t *testing.T, namespace string, name string, timeout time.Duration) error { + t.Logf("Waiting for ANP '%s/%s' to be realized", namespace, name) + if err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) { + anp, err := data.crdClient.CrdV1alpha1().NetworkPolicies(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return anp.Status.ObservedGeneration == anp.Generation && anp.Status.Phase == crdv1alpha1.NetworkPolicyRealized, nil + }); err != nil { + return fmt.Errorf("error when waiting for ANP '%s/%s' to be realized: %v", namespace, name, err) + } + return nil +} + +func (data *TestData) WaitForACNPCreationAndRealization(t *testing.T, name string, timeout time.Duration) error { + t.Logf("Waiting for ACNP '%s' to be created and realized", name) + if err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) { + acnp, err := data.crdClient.CrdV1alpha1().ClusterNetworkPolicies().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return acnp.Status.ObservedGeneration == acnp.Generation && acnp.Status.Phase == crdv1alpha1.NetworkPolicyRealized, nil + }); err != nil { + return fmt.Errorf("error when waiting for ACNP '%s' to be realized: %v", name, err) + } + return nil +} + func (k *KubernetesUtils) waitForPodInNamespace(ns string, pod string) ([]string, error) { log.Infof("Waiting for Pod '%s/%s'", ns, pod) for { @@ -1018,18 +1069,12 @@ func (k *KubernetesUtils) waitForHTTPServers(allPods []Pod) error { } func (k *KubernetesUtils) validateOnePort(allPods []Pod, reachability *Reachability, port int32, protocol utils.AntreaPolicyProtocol) { - type probeResult struct { - podFrom Pod - podTo Pod - connectivity PodConnectivityMark - err error - } numProbes := len(allPods) * len(allPods) resultsCh := make(chan *probeResult, numProbes) // TODO: find better metrics, this is only for POC. 
oneProbe := func(podFrom, podTo Pod, port int32) { log.Tracef("Probing: %s -> %s", podFrom, podTo) - connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol) + connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, nil) resultsCh <- &probeResult{podFrom, podTo, connectivity, err} } for _, pod1 := range allPods { @@ -1079,6 +1124,34 @@ func (k *KubernetesUtils) Validate(allPods []Pod, reachability *Reachability, po } } +func (k *KubernetesUtils) ValidateRemoteCluster(remoteCluster *KubernetesUtils, allPods []Pod, reachability *Reachability, port int32, protocol utils.AntreaPolicyProtocol) { + numProbes := len(allPods) * len(allPods) + resultsCh := make(chan *probeResult, numProbes) + oneProbe := func(podFrom, podTo Pod, port int32) { + log.Tracef("Probing: %s -> %s", podFrom, podTo) + connectivity, err := k.Probe(podFrom.Namespace(), podFrom.PodName(), podTo.Namespace(), podTo.PodName(), port, protocol, remoteCluster) + resultsCh <- &probeResult{podFrom, podTo, connectivity, err} + } + for _, pod1 := range allPods { + for _, pod2 := range allPods { + go oneProbe(pod1, pod2, port) + } + } + for i := 0; i < numProbes; i++ { + r := <-resultsCh + if r.err != nil { + log.Errorf("unable to perform probe %s -> %s in %s: %v", r.podFrom, r.podTo, k.ClusterName, r.err) + } + prevConn := reachability.Observed.Get(r.podFrom.String(), r.podTo.String()) + if prevConn == Unknown { + reachability.Observe(r.podFrom, r.podTo, r.connectivity) + } + if r.connectivity != Connected && reachability.Expected.Get(r.podFrom.String(), r.podTo.String()) == Connected { + log.Warnf("FAILED CONNECTION FOR ALLOWED PODS %s -> %s:%d:%s in %s !!!! ", r.podFrom, r.podTo, port, protocol, k.ClusterName) + } + } +} + func (k *KubernetesUtils) Bootstrap(namespaces map[string]string, pods []string, createNamespaces bool) (*map[string][]string, error) { for _, ns := range namespaces { if createNamespaces { diff --git a/test/e2e/reachability.go b/test/e2e/reachability.go index 4a781307030..0bf04418db5 100644 --- a/test/e2e/reachability.go +++ b/test/e2e/reachability.go @@ -230,6 +230,19 @@ func NewReachability(pods []Pod, defaultExpectation PodConnectivityMark) *Reacha return r } +func (r *Reachability) NewReachabilityWithSameExpectations() *Reachability { + var items []string + for _, pod := range r.Pods { + items = append(items, string(pod)) + } + return &Reachability{ + Expected: r.Expected, + Observed: NewConnectivityTable(items, nil), + Pods: r.Pods, + PodsByNamespace: r.PodsByNamespace, + } +} + // ExpectConn is an experimental way to describe connectivity with named fields func (r *Reachability) ExpectConn(spec *Connectivity) { if spec.From == "" && spec.To == "" {
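
For reference, the new WaitForACNPCreationAndRealization/WaitForANPCreationAndRealization helpers added to test/e2e/k8s_util.go replace the fixed sleeps (networkPolicyDelay, the 5s resource-exchange sleep) with a bounded poll on the policy Status. The sketch below restates that pattern in standalone form; the package name, function name, and the generated CRD clientset import path (antrea.io/antrea/pkg/client/clientset/versioned) are illustrative assumptions, not taken from this patch.

package policywait

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"

	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
	crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"
)

// waitForACNPRealized polls until the named ACNP reports that its latest
// generation has been observed and has reached the Realized phase, or until
// the timeout expires. A Get error (for example, the ACNP has not been
// imported into the member cluster yet) is treated as "keep polling".
func waitForACNPRealized(crdClient crdclientset.Interface, name string, timeout time.Duration) error {
	err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
		acnp, err := crdClient.CrdV1alpha1().ClusterNetworkPolicies().Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return false, nil
		}
		return acnp.Status.ObservedGeneration == acnp.Generation &&
			acnp.Status.Phase == crdv1alpha1.NetworkPolicyRealized, nil
	})
	if err != nil {
		return fmt.Errorf("ACNP %s not realized within %v: %v", name, timeout, err)
	}
	return nil
}

Bounding the wait this way keeps the multicluster tests deterministic while still failing fast when a policy is never replicated, and the ObservedGeneration check guards against reading the status of a stale generation after an update.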