diff --git a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go
index 58cff377f8..6aae19e33f 100644
--- a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go
+++ b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go
@@ -264,7 +264,7 @@ func getAllSerialTests() []*SerialTestCase {
             },
         },
         {
-            Description: "pod created off node (no local endpoint), then relevant policy created",
+            Description: "pod created off node (no endpoint), then relevant policy created",
             Actions: []*Action{
                 CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
                 // will apply dirty ipsets from CreatePod
@@ -287,6 +287,33 @@ func getAllSerialTests() []*SerialTestCase {
                 ExpectedEnpdointACLs: nil,
             },
         },
+        {
+            Description: "pod created off node (remote endpoint), then relevant policy created",
+            Actions: []*Action{
+                CreateRemoteEndpoint(endpoint1, ip1),
+                CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
+                // will apply dirty ipsets from CreatePod
+                UpdatePolicy(policyXBaseOnK1V1()),
+            },
+            TestCaseMetadata: &TestCaseMetadata{
+                Tags: []Tag{
+                    podCrudTag,
+                    netpolCrudTag,
+                },
+                DpCfg: defaultWindowsDPCfg,
+                InitialEndpoints: nil,
+                ExpectedSetPolicies: []*hcn.SetPolicySetting{
+                    dptestutils.SetPolicy(emptySet),
+                    dptestutils.SetPolicy(allNamespaces, emptySet.GetHashedName(), nsXSet.GetHashedName()),
+                    dptestutils.SetPolicy(nsXSet, ip1),
+                    dptestutils.SetPolicy(podK1Set, ip1),
+                    dptestutils.SetPolicy(podK1V1Set, ip1),
+                },
+                ExpectedEnpdointACLs: map[string][]*hnswrapper.FakeEndpointPolicy{
+                    endpoint1: {},
+                },
+            },
+        },
         {
             Description: "policy created, then pod created which satisfies policy",
             Actions: []*Action{
@@ -337,6 +364,57 @@ func getAllSerialTests() []*SerialTestCase {
                 },
             },
         },
+        {
+            Description: "policy created, then pod created off node (no endpoint) which satisfies policy",
+            Actions: []*Action{
+                UpdatePolicy(policyXBaseOnK1V1()),
+                CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
+                ApplyDP(),
+            },
+            TestCaseMetadata: &TestCaseMetadata{
+                Tags: []Tag{
+                    podCrudTag,
+                    netpolCrudTag,
+                },
+                DpCfg: defaultWindowsDPCfg,
+                InitialEndpoints: nil,
+                ExpectedSetPolicies: []*hcn.SetPolicySetting{
+                    dptestutils.SetPolicy(emptySet),
+                    dptestutils.SetPolicy(allNamespaces, emptySet.GetHashedName(), nsXSet.GetHashedName()),
+                    dptestutils.SetPolicy(nsXSet, ip1),
+                    dptestutils.SetPolicy(podK1Set, ip1),
+                    dptestutils.SetPolicy(podK1V1Set, ip1),
+                },
+                ExpectedEnpdointACLs: nil,
+            },
+        },
+        {
+            Description: "policy created, then pod created off node (remote endpoint) which satisfies policy",
+            Actions: []*Action{
+                UpdatePolicy(policyXBaseOnK1V1()),
+                CreateRemoteEndpoint(endpoint1, ip1),
+                CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
+                ApplyDP(),
+            },
+            TestCaseMetadata: &TestCaseMetadata{
+                Tags: []Tag{
+                    podCrudTag,
+                    netpolCrudTag,
+                },
+                DpCfg: defaultWindowsDPCfg,
+                InitialEndpoints: nil,
+                ExpectedSetPolicies: []*hcn.SetPolicySetting{
+                    dptestutils.SetPolicy(emptySet),
+                    dptestutils.SetPolicy(allNamespaces, emptySet.GetHashedName(), nsXSet.GetHashedName()),
+                    dptestutils.SetPolicy(nsXSet, ip1),
+                    dptestutils.SetPolicy(podK1Set, ip1),
+                    dptestutils.SetPolicy(podK1V1Set, ip1),
+                },
+                ExpectedEnpdointACLs: map[string][]*hnswrapper.FakeEndpointPolicy{
+                    endpoint1: {},
+                },
+            },
+        },
         {
             Description: "policy created, then pod created which satisfies policy, then pod relabeled and no longer satisfies policy",
             Actions: []*Action{
@@ -434,8 +512,9 @@ func getAllMultiJobTests() []*MultiJobTestCase {
             },
             DpCfg: defaultWindowsDPCfg,
             InitialEndpoints: []*hcn.HostComputeEndpoint{
+                // ends up being 2 identical endpoints (test2)??
                 dptestutils.Endpoint(endpoint1, ip1),
-                dptestutils.Endpoint(endpoint2, ip2),
+                dptestutils.RemoteEndpoint(endpoint2, ip2),
             },
             ExpectedSetPolicies: []*hcn.SetPolicySetting{
                 dptestutils.SetPolicy(emptySet),
diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go
index b967adeef5..8227da2abf 100644
--- a/npm/pkg/dataplane/dataplane.go
+++ b/npm/pkg/dataplane/dataplane.go
@@ -14,7 +14,7 @@ import (
     "k8s.io/klog"
 )

-const reconcileTimeInMinutes = 5
+const reconcileTimeInMinutes int = 5

 type PolicyMode string

@@ -214,7 +214,7 @@ func (dp *DataPlane) ApplyDataPlane() error {
     }

     if dp.shouldUpdatePod() {
-        err := dp.refreshAllPodEndpoints()
+        err := dp.refreshPodEndpoints()
         if err != nil {
             metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to refresh endpoints while updating pods. err: [%s]", err.Error())
             return fmt.Errorf("[DataPlane] failed to refresh endpoints while updating pods. err: [%w]", err)
diff --git a/npm/pkg/dataplane/dataplane_linux.go b/npm/pkg/dataplane/dataplane_linux.go
index 1ffd0e6ce9..41fa1664fa 100644
--- a/npm/pkg/dataplane/dataplane_linux.go
+++ b/npm/pkg/dataplane/dataplane_linux.go
@@ -30,7 +30,7 @@ func (dp *DataPlane) bootupDataPlane() error {
     return nil
 }

-func (dp *DataPlane) refreshAllPodEndpoints() error {
+func (dp *DataPlane) refreshPodEndpoints() error {
     // NOOP in Linux
     return nil
 }
diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go
index cc9600320d..856f96065e 100644
--- a/npm/pkg/dataplane/dataplane_windows.go
+++ b/npm/pkg/dataplane/dataplane_windows.go
@@ -16,6 +16,9 @@ import (
 const (
     maxNoNetRetryCount int = 240 // max wait time 240*5 == 20 mins
     maxNoNetSleepTime int = 5 // in seconds
+
+    refreshAllEndpoints bool = true
+    refreshLocalEndpoints bool = false
 )

 var (
@@ -42,10 +45,6 @@ func (dp *DataPlane) initializeDataPlane() error {
     // reset endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
     // no need to lock endpointCache at boot up
     dp.endpointCache.cache = make(map[string]*npmEndpoint)
-    err = dp.refreshAllPodEndpoints()
-    if err != nil {
-        return err
-    }

     return nil
 }
@@ -79,10 +78,23 @@ func (dp *DataPlane) getNetworkInfo() error {
 func (dp *DataPlane) bootupDataPlane() error {
     // initialize the DP so the podendpoints will get updated.
     if err := dp.initializeDataPlane(); err != nil {
-        return err
+        return npmerrors.SimpleErrorWrapper("failed to initialize dataplane", err)
     }

-    epIDs := dp.getAllEndpointIDs()
+    // for backwards compatibility, get remote allEndpoints to delete as well
+    allEndpoints, err := dp.getPodEndpoints(refreshAllEndpoints)
+    if err != nil {
+        return npmerrors.SimpleErrorWrapper("failed to get all pod endpoints", err)
+    }
+
+    // TODO once we make endpoint refreshing smarter, it would be most efficient to use allEndpoints to refreshPodEndpoints here.
+    // But currently, we call refreshPodEndpoints for every Pod event, so this optimization wouldn't do anything for now.
+    // There's also no need to refreshPodEndpoints at bootup since we don't know of any Pods at this point, and the endpoint cache is only needed for known Pods.
+
+    epIDs := make([]string, len(allEndpoints))
+    for k, e := range allEndpoints {
+        epIDs[k] = e.Id
+    }

     // It is important to keep order to clean-up ACLs before ipsets. Otherwise we won't be able to delete ipsets referenced by ACLs
     if err := dp.policyMgr.Bootup(epIDs); err != nil {
@@ -284,16 +296,28 @@ func (dp *DataPlane) getEndpointsToApplyPolicy(policy *policies.NPMNetworkPolicy
     return endpointList, nil
 }

-func (dp *DataPlane) getAllPodEndpoints() ([]hcn.HostComputeEndpoint, error) {
+func (dp *DataPlane) getPodEndpoints(includeRemoteEndpoints bool) ([]*hcn.HostComputeEndpoint, error) {
     klog.Infof("Getting all endpoints for Network ID %s", dp.networkID)
     endpoints, err := dp.ioShim.Hns.ListEndpointsOfNetwork(dp.networkID)
     if err != nil {
         return nil, err
     }
-    return endpoints, nil
+
+    localEndpoints := make([]*hcn.HostComputeEndpoint, 0)
+    for k := range endpoints {
+        e := &endpoints[k]
+        if includeRemoteEndpoints || e.Flags == hcn.EndpointFlagsNone {
+            // having EndpointFlagsNone means it is a local endpoint
+            localEndpoints = append(localEndpoints, e)
+        } else {
+            // TODO remove for GA
+            klog.Infof("ignoring remote endpoint. ID: %s, IP configs: %+v", e.Id, e.IpConfigurations)
+        }
+    }
+    return localEndpoints, nil
 }

-// refreshAllPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
+// refreshPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
 /*
 Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
 From NPM logs, it seems that endpoints are updated far earlier (several seconds) before the pod event comes in.
@@ -310,8 +334,8 @@ Why can we refresh only once before updating all pods in the updatePodCache (see
 - Again, it's ok if we try to apply on a non-existent endpoint.
 - We won't miss the endpoint (see the assumption). At the time the pod event came in (when AddToSets/RemoveFromSets were called), HNS already knew about the endpoint.
 */
-func (dp *DataPlane) refreshAllPodEndpoints() error {
-    endpoints, err := dp.getAllPodEndpoints()
+func (dp *DataPlane) refreshPodEndpoints() error {
+    endpoints, err := dp.getPodEndpoints(refreshLocalEndpoints)
     if err != nil {
         return err
     }
@@ -338,7 +362,7 @@ func (dp *DataPlane) refreshAllPodEndpoints() error {
     oldNPMEP, ok := dp.endpointCache.cache[ip]
     if !ok {
         // add the endpoint to the cache if it's not already there
-        npmEP := newNPMEndpoint(&endpoint)
+        npmEP := newNPMEndpoint(endpoint)
         dp.endpointCache.cache[ip] = npmEP
         // NOTE: TSGs rely on this log line
         klog.Infof("updating endpoint cache to include %s: %+v", npmEP.ip, npmEP)
@@ -346,7 +370,7 @@ func (dp *DataPlane) refreshAllPodEndpoints() error {
         // multiple endpoints can have the same IP address, but there should be one endpoint ID per pod
         // throw away old endpoints that have the same IP as a current endpoint (the old endpoint is getting deleted)
         // we don't have to worry about cleaning up network policies on endpoints that are getting deleted
-        npmEP := newNPMEndpoint(&endpoint)
+        npmEP := newNPMEndpoint(endpoint)
         if oldNPMEP.podKey == unspecifiedPodKey {
             klog.Infof("updating endpoint cache since endpoint changed for IP which never had a pod key. new endpoint: %s, old endpoint: %s, ip: %s", npmEP.id, oldNPMEP.id, npmEP.ip)
             dp.endpointCache.cache[ip] = npmEP
@@ -398,15 +422,6 @@ func (dp *DataPlane) setNetworkIDByName(networkName string) error {
     return nil
 }

-func (dp *DataPlane) getAllEndpointIDs() []string {
-    // no need to lock endpointCache at boot up
-    endpointIDs := make([]string, 0, len(dp.endpointCache.cache))
-    for _, endpoint := range dp.endpointCache.cache {
-        endpointIDs = append(endpointIDs, endpoint.id)
-    }
-    return endpointIDs
-}
-
 func isNetworkNotFoundErr(err error) bool {
     return strings.Contains(err.Error(), fmt.Sprintf("Network name \"%s\" not found", util.AzureNetworkName))
 }
diff --git a/npm/pkg/dataplane/dataplane_windows_test.go b/npm/pkg/dataplane/dataplane_windows_test.go
index 1eec813e44..5fd4a18b6f 100644
--- a/npm/pkg/dataplane/dataplane_windows_test.go
+++ b/npm/pkg/dataplane/dataplane_windows_test.go
@@ -15,7 +15,7 @@ import (

 const (
     defaultHNSLatency = time.Duration(0)
-    threadedHNSLatency = time.Duration(1 * time.Second)
+    threadedHNSLatency = time.Duration(50 * time.Millisecond)
 )

 func TestAllSerialCases(t *testing.T) {
diff --git a/npm/pkg/dataplane/testutils/utils_windows.go b/npm/pkg/dataplane/testutils/utils_windows.go
index 815cfd4625..1da0179673 100644
--- a/npm/pkg/dataplane/testutils/utils_windows.go
+++ b/npm/pkg/dataplane/testutils/utils_windows.go
@@ -37,6 +37,12 @@ func Endpoint(epID, ip string) *hcn.HostComputeEndpoint {
     }
 }

+func RemoteEndpoint(epID, ip string) *hcn.HostComputeEndpoint {
+    e := Endpoint(epID, ip)
+    e.Flags = hcn.EndpointFlagsRemoteEndpoint
+    return e
+}
+
 func SetPolicy(setMetadata *ipsets.IPSetMetadata, members ...string) *hcn.SetPolicySetting {
     pType := hcn.SetPolicyType("")
     switch setMetadata.GetSetKind() {
diff --git a/npm/pkg/dataplane/types_windows_test.go b/npm/pkg/dataplane/types_windows_test.go
index 7902341186..996a18319f 100644
--- a/npm/pkg/dataplane/types_windows_test.go
+++ b/npm/pkg/dataplane/types_windows_test.go
@@ -47,22 +47,40 @@ type HNSAction interface {
 }

 type EndpointCreateAction struct {
-    ID string
-    IP string
+    ID       string
+    IP       string
+    IsRemote bool
 }

 func CreateEndpoint(id, ip string) *Action {
     return &Action{
         HNSAction: &EndpointCreateAction{
-            ID: id,
-            IP: ip,
+            ID:       id,
+            IP:       ip,
+            IsRemote: false,
+        },
+    }
+}
+
+func CreateRemoteEndpoint(id, ip string) *Action {
+    return &Action{
+        HNSAction: &EndpointCreateAction{
+            ID:       id,
+            IP:       ip,
+            IsRemote: true,
         },
     }
 }

 // Do models endpoint creation in HNS
 func (e *EndpointCreateAction) Do(hns *hnswrapper.Hnsv2wrapperFake) error {
-    ep := dptestutils.Endpoint(e.ID, e.IP)
+    var ep *hcn.HostComputeEndpoint
+    if e.IsRemote {
+        ep = dptestutils.RemoteEndpoint(e.ID, e.IP)
+    } else {
+        ep = dptestutils.Endpoint(e.ID, e.IP)
+    }
+
     _, err := hns.CreateEndpoint(ep)
     if err != nil {
         return errors.Wrapf(err, "[EndpointCreateAction] failed to create endpoint. ep: [%+v]", ep)