Skip to content
83 changes: 81 additions & 2 deletions npm/pkg/dataplane/dataplane-test-cases_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func getAllSerialTests() []*SerialTestCase {
},
},
{
Description: "pod created off node (no local endpoint), then relevant policy created",
Description: "pod created off node (no endpoint), then relevant policy created",
Actions: []*Action{
CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
// will apply dirty ipsets from CreatePod
Expand All @@ -287,6 +287,33 @@ func getAllSerialTests() []*SerialTestCase {
ExpectedEnpdointACLs: nil,
},
},
{
Description: "pod created off node (remote endpoint), then relevant policy created",
Actions: []*Action{
CreateRemoteEndpoint(endpoint1, ip1),
CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
// will apply dirty ipsets from CreatePod
UpdatePolicy(policyXBaseOnK1V1()),
},
TestCaseMetadata: &TestCaseMetadata{
Tags: []Tag{
podCrudTag,
netpolCrudTag,
},
DpCfg: defaultWindowsDPCfg,
InitialEndpoints: nil,
ExpectedSetPolicies: []*hcn.SetPolicySetting{
dptestutils.SetPolicy(emptySet),
dptestutils.SetPolicy(allNamespaces, emptySet.GetHashedName(), nsXSet.GetHashedName()),
dptestutils.SetPolicy(nsXSet, ip1),
dptestutils.SetPolicy(podK1Set, ip1),
dptestutils.SetPolicy(podK1V1Set, ip1),
},
ExpectedEnpdointACLs: map[string][]*hnswrapper.FakeEndpointPolicy{
endpoint1: {},
},
},
},
{
Description: "policy created, then pod created which satisfies policy",
Actions: []*Action{
Expand Down Expand Up @@ -337,6 +364,57 @@ func getAllSerialTests() []*SerialTestCase {
},
},
},
{
Description: "policy created, then pod created off node (no endpoint) which satisfies policy",
Actions: []*Action{
UpdatePolicy(policyXBaseOnK1V1()),
CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
ApplyDP(),
},
TestCaseMetadata: &TestCaseMetadata{
Tags: []Tag{
podCrudTag,
netpolCrudTag,
},
DpCfg: defaultWindowsDPCfg,
InitialEndpoints: nil,
ExpectedSetPolicies: []*hcn.SetPolicySetting{
dptestutils.SetPolicy(emptySet),
dptestutils.SetPolicy(allNamespaces, emptySet.GetHashedName(), nsXSet.GetHashedName()),
dptestutils.SetPolicy(nsXSet, ip1),
dptestutils.SetPolicy(podK1Set, ip1),
dptestutils.SetPolicy(podK1V1Set, ip1),
},
ExpectedEnpdointACLs: nil,
},
},
{
Description: "policy created, then pod created off node (remote endpoint) which satisfies policy",
Actions: []*Action{
UpdatePolicy(policyXBaseOnK1V1()),
CreateRemoteEndpoint(endpoint1, ip1),
CreatePod("x", "a", ip1, otherNode, map[string]string{"k1": "v1"}),
ApplyDP(),
},
TestCaseMetadata: &TestCaseMetadata{
Tags: []Tag{
podCrudTag,
netpolCrudTag,
},
DpCfg: defaultWindowsDPCfg,
InitialEndpoints: nil,
ExpectedSetPolicies: []*hcn.SetPolicySetting{
dptestutils.SetPolicy(emptySet),
dptestutils.SetPolicy(allNamespaces, emptySet.GetHashedName(), nsXSet.GetHashedName()),
dptestutils.SetPolicy(nsXSet, ip1),
dptestutils.SetPolicy(podK1Set, ip1),
dptestutils.SetPolicy(podK1V1Set, ip1),
},
ExpectedEnpdointACLs: map[string][]*hnswrapper.FakeEndpointPolicy{
endpoint1: {},
},
},
},
{
Description: "policy created, then pod created which satisfies policy, then pod relabeled and no longer satisfies policy",
Actions: []*Action{
Expand Down Expand Up @@ -434,8 +512,9 @@ func getAllMultiJobTests() []*MultiJobTestCase {
},
DpCfg: defaultWindowsDPCfg,
InitialEndpoints: []*hcn.HostComputeEndpoint{
// ends up being 2 identical endpoints (test2)??
dptestutils.Endpoint(endpoint1, ip1),
dptestutils.Endpoint(endpoint2, ip2),
dptestutils.RemoteEndpoint(endpoint2, ip2),
},
ExpectedSetPolicies: []*hcn.SetPolicySetting{
dptestutils.SetPolicy(emptySet),
Expand Down
4 changes: 2 additions & 2 deletions npm/pkg/dataplane/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"k8s.io/klog"
)

const reconcileTimeInMinutes = 5
const reconcileTimeInMinutes int = 5

type PolicyMode string

Expand Down Expand Up @@ -214,7 +214,7 @@ func (dp *DataPlane) ApplyDataPlane() error {
}

if dp.shouldUpdatePod() {
err := dp.refreshAllPodEndpoints()
err := dp.refreshPodEndpoints()
if err != nil {
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to refresh endpoints while updating pods. err: [%s]", err.Error())
return fmt.Errorf("[DataPlane] failed to refresh endpoints while updating pods. err: [%w]", err)
Expand Down
2 changes: 1 addition & 1 deletion npm/pkg/dataplane/dataplane_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (dp *DataPlane) bootupDataPlane() error {
return nil
}

func (dp *DataPlane) refreshAllPodEndpoints() error {
// refreshPodEndpoints does nothing on Linux and always returns nil.
func (dp *DataPlane) refreshPodEndpoints() error {
// NOOP in Linux
return nil
}
59 changes: 37 additions & 22 deletions npm/pkg/dataplane/dataplane_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
const (
maxNoNetRetryCount int = 240 // max wait time 240*5 == 20 mins
maxNoNetSleepTime int = 5 // in seconds

// values for the includeRemoteEndpoints argument of getPodEndpoints:
// refreshAllEndpoints also returns remote endpoints, refreshLocalEndpoints returns local endpoints only
refreshAllEndpoints bool = true
refreshLocalEndpoints bool = false
)

var (
Expand All @@ -42,10 +45,6 @@ func (dp *DataPlane) initializeDataPlane() error {
// reset endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
// no need to lock endpointCache at boot up
dp.endpointCache.cache = make(map[string]*npmEndpoint)
err = dp.refreshAllPodEndpoints()
if err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -79,10 +78,23 @@ func (dp *DataPlane) getNetworkInfo() error {
func (dp *DataPlane) bootupDataPlane() error {
// initialize the DP so the podendpoints will get updated.
if err := dp.initializeDataPlane(); err != nil {
return err
return npmerrors.SimpleErrorWrapper("failed to initialize dataplane", err)
}

epIDs := dp.getAllEndpointIDs()
// for backwards compatibility, get all endpoints (including remote ones) so that they are cleaned up as well
allEndpoints, err := dp.getPodEndpoints(refreshAllEndpoints)
if err != nil {
return npmerrors.SimpleErrorWrapper("failed to get all pod endpoints", err)
}

// TODO once we make endpoint refreshing smarter, it would be most efficient to use allEndpoints to refreshPodEndpoints here.
// But currently, we call refreshPodEndpoints for every Pod event, so this optimization wouldn't do anything for now.
// There's also no need to refreshPodEndpoints at bootup since we don't know of any Pods at this point, and the endpoint cache is only needed for known Pods.

epIDs := make([]string, len(allEndpoints))
for k, e := range allEndpoints {
epIDs[k] = e.Id
}

// It is important to keep order to clean-up ACLs before ipsets. Otherwise we won't be able to delete ipsets referenced by ACLs
if err := dp.policyMgr.Bootup(epIDs); err != nil {
Expand Down Expand Up @@ -284,16 +296,28 @@ func (dp *DataPlane) getEndpointsToApplyPolicy(policy *policies.NPMNetworkPolicy
return endpointList, nil
}

func (dp *DataPlane) getAllPodEndpoints() ([]hcn.HostComputeEndpoint, error) {
// getPodEndpoints lists the HNS endpoints of the network dp.networkID and returns pointers to them.
// When includeRemoteEndpoints is false, only endpoints whose Flags equal hcn.EndpointFlagsNone
// (local endpoints) are returned; when it is true, every listed endpoint is returned.
// An error from listing the endpoints is returned as-is with a nil slice.
func (dp *DataPlane) getPodEndpoints(includeRemoteEndpoints bool) ([]*hcn.HostComputeEndpoint, error) {
klog.Infof("Getting all endpoints for Network ID %s", dp.networkID)
endpoints, err := dp.ioShim.Hns.ListEndpointsOfNetwork(dp.networkID)
if err != nil {
return nil, err
}
return endpoints, nil

// NOTE: despite its name, localEndpoints also collects remote endpoints when includeRemoteEndpoints is true
localEndpoints := make([]*hcn.HostComputeEndpoint, 0)
for k := range endpoints {
// take the address of the slice element itself rather than of a per-iteration copy
e := &endpoints[k]
if includeRemoteEndpoints || e.Flags == hcn.EndpointFlagsNone {
// having EndpointFlagsNone means it is a local endpoint
localEndpoints = append(localEndpoints, e)
} else {
// TODO remove for GA
klog.Infof("ignoring remote endpoint. ID: %s, IP configs: %+v", e.Id, e.IpConfigurations)
}
}
return localEndpoints, nil
}

// refreshAllPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
// refreshPodEndpoints will refresh all the local pod endpoints (remote endpoints are ignored) and create empty netpol references for new endpoints
/*
Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
From NPM logs, it seems that endpoints are updated far earlier (several seconds) before the pod event comes in.
Expand All @@ -310,8 +334,8 @@ Why can we refresh only once before updating all pods in the updatePodCache (see
- Again, it's ok if we try to apply on a non-existent endpoint.
- We won't miss the endpoint (see the assumption). At the time the pod event came in (when AddToSets/RemoveFromSets were called), HNS already knew about the endpoint.
*/
func (dp *DataPlane) refreshAllPodEndpoints() error {
endpoints, err := dp.getAllPodEndpoints()
func (dp *DataPlane) refreshPodEndpoints() error {
endpoints, err := dp.getPodEndpoints(refreshLocalEndpoints)
if err != nil {
return err
}
Expand All @@ -338,15 +362,15 @@ func (dp *DataPlane) refreshAllPodEndpoints() error {
oldNPMEP, ok := dp.endpointCache.cache[ip]
if !ok {
// add the endpoint to the cache if it's not already there
npmEP := newNPMEndpoint(&endpoint)
npmEP := newNPMEndpoint(endpoint)
dp.endpointCache.cache[ip] = npmEP
// NOTE: TSGs rely on this log line
klog.Infof("updating endpoint cache to include %s: %+v", npmEP.ip, npmEP)
} else if oldNPMEP.id != endpoint.Id {
// multiple endpoints can have the same IP address, but there should be one endpoint ID per pod
// throw away old endpoints that have the same IP as a current endpoint (the old endpoint is getting deleted)
// we don't have to worry about cleaning up network policies on endpoints that are getting deleted
npmEP := newNPMEndpoint(&endpoint)
npmEP := newNPMEndpoint(endpoint)
if oldNPMEP.podKey == unspecifiedPodKey {
klog.Infof("updating endpoint cache since endpoint changed for IP which never had a pod key. new endpoint: %s, old endpoint: %s, ip: %s", npmEP.id, oldNPMEP.id, npmEP.ip)
dp.endpointCache.cache[ip] = npmEP
Expand Down Expand Up @@ -398,15 +422,6 @@ func (dp *DataPlane) setNetworkIDByName(networkName string) error {
return nil
}

// getAllEndpointIDs returns the ID of every endpoint currently held in the
// endpoint cache. The cache is a map, so the IDs come back in no particular order.
func (dp *DataPlane) getAllEndpointIDs() []string {
	// no need to lock endpointCache at boot up
	cached := dp.endpointCache.cache
	ids := make([]string, len(cached))
	next := 0
	for _, ep := range cached {
		ids[next] = ep.id
		next++
	}
	return ids
}

// isNetworkNotFoundErr reports whether err's message contains the
// "Network name ... not found" text for util.AzureNetworkName.
// err must be non-nil, since err.Error() is called unconditionally.
func isNetworkNotFoundErr(err error) bool {
	notFoundMsg := fmt.Sprintf("Network name \"%s\" not found", util.AzureNetworkName)
	return strings.Contains(err.Error(), notFoundMsg)
}
2 changes: 1 addition & 1 deletion npm/pkg/dataplane/dataplane_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const (
defaultHNSLatency = time.Duration(0)
threadedHNSLatency = time.Duration(1 * time.Second)
threadedHNSLatency = time.Duration(50 * time.Millisecond)
)

func TestAllSerialCases(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions npm/pkg/dataplane/testutils/utils_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func Endpoint(epID, ip string) *hcn.HostComputeEndpoint {
}
}

// RemoteEndpoint builds the same endpoint as Endpoint(epID, ip) and then marks
// it as remote by setting its Flags to hcn.EndpointFlagsRemoteEndpoint.
func RemoteEndpoint(epID, ip string) *hcn.HostComputeEndpoint {
	remote := Endpoint(epID, ip)
	remote.Flags = hcn.EndpointFlagsRemoteEndpoint
	return remote
}

func SetPolicy(setMetadata *ipsets.IPSetMetadata, members ...string) *hcn.SetPolicySetting {
pType := hcn.SetPolicyType("")
switch setMetadata.GetSetKind() {
Expand Down
28 changes: 23 additions & 5 deletions npm/pkg/dataplane/types_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,40 @@ type HNSAction interface {
}

type EndpointCreateAction struct {
ID string
IP string
ID string
IP string
IsRemote bool
}

func CreateEndpoint(id, ip string) *Action {
return &Action{
HNSAction: &EndpointCreateAction{
ID: id,
IP: ip,
ID: id,
IP: ip,
IsRemote: false,
},
}
}

// CreateRemoteEndpoint returns an Action whose HNSAction is an
// EndpointCreateAction for the given ID and IP with IsRemote set to true.
func CreateRemoteEndpoint(id, ip string) *Action {
	remoteCreate := &EndpointCreateAction{ID: id, IP: ip, IsRemote: true}
	return &Action{HNSAction: remoteCreate}
}

// Do models endpoint creation in HNS
func (e *EndpointCreateAction) Do(hns *hnswrapper.Hnsv2wrapperFake) error {
ep := dptestutils.Endpoint(e.ID, e.IP)
var ep *hcn.HostComputeEndpoint
if e.IsRemote {
ep = dptestutils.RemoteEndpoint(e.ID, e.IP)
} else {
ep = dptestutils.Endpoint(e.ID, e.IP)
}

_, err := hns.CreateEndpoint(ep)
if err != nil {
return errors.Wrapf(err, "[EndpointCreateAction] failed to create endpoint. ep: [%+v]", ep)
Expand Down