diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 1fa4bd36db4..4b165568130 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3773,6 +3773,11 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enables ServiceInternalTrafficPolicy in AntreaProxy. Don't enable this feature unless that feature gateway + # ServiceInternalTrafficPolicy of Kubernetes APIServer is set as enabled. If AntreaProxy is not enabled, + # this flag will not take effect. + # ServiceInternalTrafficPolicy: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4054,7 +4059,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-dtc759g79k + name: antrea-config-4k8284md8c namespace: kube-system --- apiVersion: v1 @@ -4125,7 +4130,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-dtc759g79k + value: antrea-config-4k8284md8c image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4176,7 +4181,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-dtc759g79k + name: antrea-config-4k8284md8c name: antrea-config - name: antrea-controller-tls secret: @@ -4457,7 +4462,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-dtc759g79k + name: antrea-config-4k8284md8c name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index b9c08c2b8e9..5447d341d17 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3773,6 +3773,11 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enables ServiceInternalTrafficPolicy in AntreaProxy. Don't enable this feature unless that feature gateway + # ServiceInternalTrafficPolicy of Kubernetes APIServer is set as enabled. If AntreaProxy is not enabled, + # this flag will not take effect. + # ServiceInternalTrafficPolicy: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4054,7 +4059,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-dtc759g79k + name: antrea-config-4k8284md8c namespace: kube-system --- apiVersion: v1 @@ -4125,7 +4130,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-dtc759g79k + value: antrea-config-4k8284md8c image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4176,7 +4181,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-dtc759g79k + name: antrea-config-4k8284md8c name: antrea-config - name: antrea-controller-tls secret: @@ -4459,7 +4464,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-dtc759g79k + name: antrea-config-4k8284md8c name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 54acac03fd4..ff8a0768f0e 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3773,6 +3773,11 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enables ServiceInternalTrafficPolicy in AntreaProxy. Don't enable this feature unless that feature gateway + # ServiceInternalTrafficPolicy of Kubernetes APIServer is set as enabled. If AntreaProxy is not enabled, + # this flag will not take effect. + # ServiceInternalTrafficPolicy: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4054,7 +4059,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-65f7gf8456 + name: antrea-config-4g6c87t272 namespace: kube-system --- apiVersion: v1 @@ -4125,7 +4130,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-65f7gf8456 + value: antrea-config-4g6c87t272 image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4176,7 +4181,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-65f7gf8456 + name: antrea-config-4g6c87t272 name: antrea-config - name: antrea-controller-tls secret: @@ -4460,7 +4465,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-65f7gf8456 + name: antrea-config-4g6c87t272 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index ec166343f91..9c03596609f 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3773,6 +3773,11 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enables ServiceInternalTrafficPolicy in AntreaProxy. Don't enable this feature unless that feature gateway + # ServiceInternalTrafficPolicy of Kubernetes APIServer is set as enabled. If AntreaProxy is not enabled, + # this flag will not take effect. + # ServiceInternalTrafficPolicy: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4059,7 +4064,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-fcd8c2h5b5 + name: antrea-config-t6td8mhh77 namespace: kube-system --- apiVersion: v1 @@ -4139,7 +4144,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-fcd8c2h5b5 + value: antrea-config-t6td8mhh77 image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4190,7 +4195,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-fcd8c2h5b5 + name: antrea-config-t6td8mhh77 name: antrea-config - name: antrea-controller-tls secret: @@ -4506,7 +4511,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-fcd8c2h5b5 + name: antrea-config-t6td8mhh77 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 070a2150d60..5b752075de0 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3773,6 +3773,11 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enables ServiceInternalTrafficPolicy in AntreaProxy. Don't enable this feature unless that feature gateway + # ServiceInternalTrafficPolicy of Kubernetes APIServer is set as enabled. If AntreaProxy is not enabled, + # this flag will not take effect. + # ServiceInternalTrafficPolicy: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4059,7 +4064,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-dhb74b822t + name: antrea-config-2tm2m9h7ck namespace: kube-system --- apiVersion: v1 @@ -4130,7 +4135,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-dhb74b822t + value: antrea-config-2tm2m9h7ck image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4181,7 +4186,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-dhb74b822t + name: antrea-config-2tm2m9h7ck name: antrea-config - name: antrea-controller-tls secret: @@ -4462,7 +4467,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-dhb74b822t + name: antrea-config-2tm2m9h7ck name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 76d6d72b613..a8a7e6625ff 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -10,6 +10,11 @@ featureGates: # this flag will not take effect. # EndpointSlice: false +# Enables ServiceInternalTrafficPolicy in AntreaProxy. Don't enable this feature unless that feature gateway +# ServiceInternalTrafficPolicy of Kubernetes APIServer is set as enabled. If AntreaProxy is not enabled, +# this flag will not take effect. +# ServiceInternalTrafficPolicy: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 40ed56f3360..7697a10ebcb 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -202,14 +202,15 @@ func run(o *Options) error { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) proxyAll := o.config.AntreaProxy.ProxyAll + svcInternalTrafficPolicy := features.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) switch { case v4Enabled && v6Enabled: - proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll) + proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, svcInternalTrafficPolicy) case v4Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll) + proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, svcInternalTrafficPolicy) case v6Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll) + proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, svcInternalTrafficPolicy) default: return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") } diff --git a/docs/feature-gates.md b/docs/feature-gates.md index f97c58a6256..bb9cde748ba 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -33,17 +33,18 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the ## List of Available Features -| Feature Name | Component | Default | Stage | Alpha Release | Beta Release | GA Release | Extra Requirements | Notes | -| ----------------------- | ------------------ | ------- | ----- | ------------- | ------------ | ---------- | ------------------ | ----- | -| `AntreaProxy` | Agent | `true` | Beta | v0.8 | v0.11 | N/A | Yes | Must be enabled for Windows. | -| `EndpointSlice` | Agent | `false` | Alpha | v0.13.0 | N/A | N/A | Yes | | -| `AntreaPolicy` | Agent + Controller | `true` | Beta | v0.8 | v1.0 | N/A | No | Agent side config required from v0.9.0+. | -| `Traceflow` | Agent + Controller | `true` | Beta | v0.8 | v0.11 | N/A | Yes | | -| `FlowExporter` | Agent | `false` | Alpha | v0.9 | N/A | N/A | Yes | | -| `NetworkPolicyStats` | Agent + Controller | `true` | Beta | v0.10 | v1.2 | N/A | No | | -| `NodePortLocal` | Agent | `false` | Alpha | v0.13 | N/A | N/A | Yes | Important user-facing change in v1.2.0 | -| `Egress` | Agent + Controller | `false` | Alpha | v1.0 | N/A | N/A | Yes | | -| `NodeIPAM` | Controller | `false` | Alpha | v1.4 | N/A | N/A | Yes | | +| Feature Name | Component | Default | Stage | Alpha Release | Beta Release | GA Release | Extra Requirements | Notes | +| ------------------------------- | ------------------ | ------- | ----- | ------------- | ------------ | ---------- | ------------------ | ----- | +| `AntreaProxy` | Agent | `true` | Beta | v0.8 | v0.11 | N/A | Yes | Must be enabled for Windows. | +| `EndpointSlice` | Agent | `false` | Alpha | v0.13.0 | N/A | N/A | Yes | | +| `ServiceInternalTrafficPolicy` | Agent | `false` | Alpha | v1.4 | N/A | N/A | No | | +| `AntreaPolicy` | Agent + Controller | `true` | Beta | v0.8 | v1.0 | N/A | No | Agent side config required from v0.9.0+. | +| `Traceflow` | Agent + Controller | `true` | Beta | v0.8 | v0.11 | N/A | Yes | | +| `FlowExporter` | Agent | `false` | Alpha | v0.9 | N/A | N/A | Yes | | +| `NetworkPolicyStats` | Agent + Controller | `true` | Beta | v0.10 | v1.2 | N/A | No | | +| `NodePortLocal` | Agent | `false` | Alpha | v0.13 | N/A | N/A | Yes | Important user-facing change in v1.2.0 | +| `Egress` | Agent + Controller | `false` | Alpha | v1.0 | N/A | N/A | Yes | | +| `NodeIPAM` | Controller | `false` | Alpha | v1.4 | N/A | N/A | Yes | | ## Description and Requirements of Features @@ -82,6 +83,13 @@ and will not implement Cluster IP functionality as expected. When using the OVS built-in kernel module (which is the most common case), your kernel version must be >= 4.6 (as opposed to >= 4.4 without this feature). +### ServiceInternalTrafficPolicy + +`ServiceInternalTrafficPolicy` enables internal traffic restrictions to only route internal +traffic to Endpoints within the Node the traffic originated from. The "internal" traffic +here refers to traffic originated from Pods in the current cluster. This can help to reduce +costs and improve performance. + ### AntreaPolicy `AntreaPolicy` enables Antrea ClusterNetworkPolicy and Antrea NetworkPolicy CRDs to be diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index c49bf44ef3f..1c4dcb40a0f 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -101,15 +101,16 @@ type proxier struct { // oversizeServiceSet records the Services that have more than 800 Endpoints. oversizeServiceSet sets.String - runner *k8sproxy.BoundedFrequencyRunner - stopChan <-chan struct{} - ofClient openflow.Client - routeClient route.Interface - nodePortAddresses []net.IP - hostGateWay string - isIPv6 bool - proxyAll bool - endpointSliceEnabled bool + runner *k8sproxy.BoundedFrequencyRunner + stopChan <-chan struct{} + ofClient openflow.Client + routeClient route.Interface + nodePortAddresses []net.IP + hostGateWay string + isIPv6 bool + proxyAll bool + endpointSliceEnabled bool + svcInternalTrafficPolicyEnabled bool } func endpointKey(endpoint k8sproxy.Endpoint, protocol binding.Protocol) string { @@ -155,8 +156,9 @@ func (p *proxier) removeStaleServices() { continue } } - // Remove Service group whose Endpoints are local. - if svcInfo.NodeLocalExternal() { + // If externalTrafficPolicy of the Service is Local and internalTrafficPolicy of the Service is not Local, a Service group + // that only has local Endpoints should have been already created, and it should be removed. + if svcInfo.NodeLocalExternal() && !(p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal()) { groupIDLocal, _ := p.groupCounter.Get(svcPortName, true) if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) @@ -164,8 +166,16 @@ func (p *proxier) removeStaleServices() { } p.groupCounter.Recycle(svcPortName, true) } - // Remove Service group which has all Endpoints. - groupID, _ := p.groupCounter.Get(svcPortName, false) + + var groupID binding.GroupIDType + // If internalTrafficPolicy of the Service is Local, a Service group that only has local Endpoints should have been + // already created. If internalTrafficPolicy of the theService is Cluster, a Service group that has all Endpoints + // should have been created. Remove the Service group. + if p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal() { + groupID, _ = p.groupCounter.Get(svcPortName, true) + } else { + groupID, _ = p.groupCounter.Get(svcPortName, false) + } if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) continue @@ -245,7 +255,8 @@ func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { svcInfo.Port() != pSvcInfo.Port() || svcInfo.OFProtocol != pSvcInfo.OFProtocol || svcInfo.NodePort() != pSvcInfo.NodePort() || - svcInfo.NodeLocalExternal() != pSvcInfo.NodeLocalExternal() + svcInfo.NodeLocalExternal() != pSvcInfo.NodeLocalExternal() || + svcInfo.NodeLocalInternal() != pSvcInfo.NodeLocalInternal() } // smallSliceDifference builds a slice which includes all the strings from s1 @@ -340,7 +351,14 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) - groupID, _ := p.groupCounter.Get(svcPortName, false) + var groupID binding.GroupIDType + // If internalTrafficPolicy of the Service is Local, then a Service that has only local Endpoints should be created, + // otherwise a Service that has all Endpoints should be created. + if p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal() { + groupID, _ = p.groupCounter.Get(svcPortName, true) + } else { + groupID, _ = p.groupCounter.Get(svcPortName, false) + } endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { endpointsInstalled = map[string]k8sproxy.Endpoint{} @@ -376,11 +394,18 @@ func (p *proxier) installServices() { // slice endpointList after sorting can avoid this situation in some degree. var endpointList []k8sproxy.Endpoint for _, endpoint := range endpoints { + // If internalTrafficPolicy of the Service is Local, skip the Endpoints which are not local. + if p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal() && !endpoint.GetIsLocal() { + continue + } endpointList = append(endpointList, endpoint) } sort.Sort(byEndpoint(endpointList)) - endpointList = endpointList[:maxEndpoints] - + // As non-local Endpoints are skipped, the number of Endpoints may be less than maxEndpoints. If so, it's + // unnecessary to drop any Endpoints. + if len(endpointList) > maxEndpoints { + endpointList = endpointList[:maxEndpoints] + } for _, endpoint := range endpointList { // Check if there is any installed Endpoint which is not expected anymore. if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. needUpdateEndpoints = true @@ -392,6 +417,10 @@ func (p *proxier) installServices() { p.oversizeServiceSet.Delete(svcPortName.String()) } for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore. + // If internalTrafficPolicy of the Service is Local, skip the Endpoints which are not local. + if p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal() && !endpoint.GetIsLocal() { + continue + } if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. needUpdateEndpoints = true } @@ -439,8 +468,9 @@ func (p *proxier) installServices() { continue } - // Install another group when Service externalTrafficPolicy is Local. - if p.proxyAll && svcInfo.NodeLocalExternal() { + // If externalTrafficPolicy of the Service is Local and internalTrafficPolicy of the Service is not Local, another + // Service group that has only local Endpoints should be created. + if p.proxyAll && svcInfo.NodeLocalExternal() && !(p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal()) { groupIDLocal, _ := p.groupCounter.Get(svcPortName, true) var localEndpointList []k8sproxy.Endpoint for _, ed := range endpointUpdateList { @@ -491,10 +521,14 @@ func (p *proxier) installServices() { continue } - // If externalTrafficPolicy of the Service is Local, Service NodePort or LoadBalancer should use the Service - // group whose Endpoints are local. + // When creating a Service of NodePort/LoadBalancer, a ClusterIP will be also created. If externalTrafficPolicy + // of the Service is Local and internalTrafficPolicy of the Service is Cluster, Service NodePort/LoadBalancer + // should use the Service group that which has only local Endpoints, rather than the Service group that has all + // Endpoints, which is for the ClusterIP. However, if internalTrafficPolicy of the Service is Local, the Service + // group that has only local Endpoints should have been created for the ClusterIP, Service NodePort/LoadBalancer + // should also use the same Service group. nGroupID := groupID - if svcInfo.NodeLocalExternal() { + if svcInfo.NodeLocalExternal() && !(p.svcInternalTrafficPolicyEnabled && svcInfo.NodeLocalInternal()) { nGroupID, _ = p.groupCounter.Get(svcPortName, true) } @@ -768,7 +802,8 @@ func NewProxier( isIPv6 bool, routeClient route.Interface, nodePortAddresses []net.IP, - proxyAllEnabled bool) *proxier { + proxyAllEnabled bool, + svcInternalTrafficPolicyEnabled bool) *proxier { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, @@ -783,24 +818,25 @@ func NewProxier( } p := &proxier{ - endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), - serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), - endpointsChanges: newEndpointsChangesTracker(hostname, endpointSliceEnabled, isIPv6), - serviceChanges: newServiceChangesTracker(recorder, ipFamily), - serviceMap: k8sproxy.ServiceMap{}, - serviceInstalledMap: k8sproxy.ServiceMap{}, - endpointsInstalledMap: types.EndpointsMap{}, - endpointsMap: types.EndpointsMap{}, - endpointReferenceCounter: map[string]int{}, - serviceStringMap: map[string]k8sproxy.ServicePortName{}, - oversizeServiceSet: sets.NewString(), - groupCounter: types.NewGroupCounter(isIPv6), - ofClient: ofClient, - routeClient: routeClient, - nodePortAddresses: nodePortAddresses, - isIPv6: isIPv6, - proxyAll: proxyAllEnabled, - endpointSliceEnabled: endpointSliceEnabled, + endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), + serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), + endpointsChanges: newEndpointsChangesTracker(hostname, endpointSliceEnabled, isIPv6), + serviceChanges: newServiceChangesTracker(recorder, ipFamily), + serviceMap: k8sproxy.ServiceMap{}, + serviceInstalledMap: k8sproxy.ServiceMap{}, + endpointsInstalledMap: types.EndpointsMap{}, + endpointsMap: types.EndpointsMap{}, + endpointReferenceCounter: map[string]int{}, + serviceStringMap: map[string]k8sproxy.ServicePortName{}, + oversizeServiceSet: sets.NewString(), + groupCounter: types.NewGroupCounter(isIPv6), + ofClient: ofClient, + routeClient: routeClient, + nodePortAddresses: nodePortAddresses, + isIPv6: isIPv6, + proxyAll: proxyAllEnabled, + endpointSliceEnabled: endpointSliceEnabled, + svcInternalTrafficPolicyEnabled: svcInternalTrafficPolicyEnabled, } p.serviceConfig.RegisterEventHandler(p) @@ -852,13 +888,14 @@ func NewDualStackProxier( routeClient route.Interface, nodePortAddressesIPv4 []net.IP, nodePortAddressesIPv6 []net.IP, - proxyAllEnabled bool) *metaProxierWrapper { + proxyAllEnabled bool, + serviceInternalTrafficPolicyEnabled bool) *metaProxierWrapper { // Create an ipv4 instance of the single-stack proxier. - ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAllEnabled) + ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAllEnabled, serviceInternalTrafficPolicyEnabled) // Create an ipv6 instance of the single-stack proxier. - ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAllEnabled) + ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAllEnabled, serviceInternalTrafficPolicyEnabled) // Create a meta-proxier that dispatch calls between the two // single-stack proxier instances. diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 577f13ca867..d1f1422f7b2 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -51,6 +51,7 @@ var ( loadBalancerIPv6 = net.ParseIP("fec0::169:254:169:1") svcNodePortIPv4 = net.ParseIP("192.168.77.100") svcNodePortIPv6 = net.ParseIP("2001::192:168:77:100") + hostname = "localhost" nodePortAddressesIPv4 = []net.IP{svcNodePortIPv4} nodePortAddressesIPv6 = []net.IP{svcNodePortIPv6} @@ -99,8 +100,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) return ept } -func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodePortAddresses []net.IP, isIPv6, proxyAllEnabled bool) *proxier { - hostname := "localhost" +func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodePortAddresses []net.IP, isIPv6, proxyAllEnabled, serviceInternalTrafficPolicyEnabled bool) *proxier { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder( runtime.NewScheme(), @@ -113,31 +113,33 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP } p := &proxier{ - endpointsChanges: newEndpointsChangesTracker(hostname, false, isIPv6), - serviceChanges: newServiceChangesTracker(recorder, ipFamily), - serviceMap: k8sproxy.ServiceMap{}, - serviceInstalledMap: k8sproxy.ServiceMap{}, - endpointsInstalledMap: types.EndpointsMap{}, - endpointReferenceCounter: map[string]int{}, - endpointsMap: types.EndpointsMap{}, - groupCounter: types.NewGroupCounter(isIPv6), - ofClient: ofClient, - routeClient: routeClient, - serviceStringMap: map[string]k8sproxy.ServicePortName{}, - isIPv6: isIPv6, - nodePortAddresses: nodePortAddresses, - proxyAll: proxyAllEnabled, + endpointsChanges: newEndpointsChangesTracker(hostname, false, isIPv6), + serviceChanges: newServiceChangesTracker(recorder, ipFamily), + serviceMap: k8sproxy.ServiceMap{}, + serviceInstalledMap: k8sproxy.ServiceMap{}, + endpointsInstalledMap: types.EndpointsMap{}, + endpointReferenceCounter: map[string]int{}, + endpointsMap: types.EndpointsMap{}, + groupCounter: types.NewGroupCounter(isIPv6), + ofClient: ofClient, + routeClient: routeClient, + serviceStringMap: map[string]k8sproxy.ServicePortName{}, + isIPv6: isIPv6, + nodePortAddresses: nodePortAddresses, + proxyAll: proxyAllEnabled, + svcInternalTrafficPolicyEnabled: serviceInternalTrafficPolicyEnabled, } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) return p } -func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { +func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6 bool, nodeLocalInternal bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, true) + + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, true, nodeLocalInternal) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -145,6 +147,11 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { Port: "80", Protocol: corev1.ProtocolTCP, } + internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster + if nodeLocalInternal { + internalTrafficPolicy = corev1.ServiceInternalTrafficPolicyLocal + } + makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { svc.Spec.ClusterIP = svcIP.String() @@ -153,25 +160,48 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { Port: int32(svcPort), Protocol: corev1.ProtocolTCP, }} + svc.Spec.InternalTrafficPolicy = &internalTrafficPolicy }), ) - makeEndpointsMap(fp, - makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *corev1.Endpoints) { - ept.Subsets = []corev1.EndpointSubset{{ - Addresses: []corev1.EndpointAddress{{ - IP: epIP.String(), - }}, - Ports: []corev1.EndpointPort{{ - Name: svcPortName.Port, - Port: int32(svcPort), - Protocol: corev1.ProtocolTCP, - }}, - }} - }), - ) + var eps []*corev1.Endpoints + epFunc := func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: ep1IP.String(), + NodeName: &hostname, + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} - groupID, _ := fp.groupCounter.Get(svcPortName, false) + if nodeLocalInternal { + ept.Subsets = append(ept.Subsets, + corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{{ + IP: ep2IP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }, + ) + } + } + eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) + makeEndpointsMap(fp, eps...) + + var groupID binding.GroupIDType + if nodeLocalInternal { + groupID, _ = fp.groupCounter.Get(svcPortName, true) + } else { + groupID, _ = fp.groupCounter.Get(svcPortName, false) + } mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP if isIPv6 { @@ -189,7 +219,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, isIPv6, true) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, isIPv6, true, false) svcPort := 80 svcNodePort := 30008 @@ -225,7 +255,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep ept.Subsets = []corev1.EndpointSubset{{ Addresses: []corev1.EndpointAddress{{ IP: ep1IP.String(), - Hostname: "localhost", + NodeName: &hostname, }}, Ports: []corev1.EndpointPort{{ Name: svcPortName.Port, @@ -233,24 +263,21 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep Protocol: corev1.ProtocolTCP, }}, }} - } - eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) - if nodeLocalExternal { - epFunc = func(ept *corev1.Endpoints) { - ept.Subsets = []corev1.EndpointSubset{{ - Addresses: []corev1.EndpointAddress{{ - IP: ep2IP.String(), - Hostname: "remote", - }}, - Ports: []corev1.EndpointPort{{ - Name: svcPortName.Port, - Port: int32(svcPort), - Protocol: corev1.ProtocolTCP, - }}, - }} + if nodeLocalExternal { + ept.Subsets = append(ept.Subsets, + corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{{ + IP: ep2IP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }) } - eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) } + eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, eps...) groupID, _ := fp.groupCounter.Get(svcPortName, false) @@ -280,7 +307,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, isIPv6, true) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, isIPv6, true, false) svcPort := 80 svcNodePort := 31000 @@ -313,7 +340,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP ept.Subsets = []corev1.EndpointSubset{{ Addresses: []corev1.EndpointAddress{{ IP: ep1IP.String(), - Hostname: "localhost", + NodeName: &hostname, }}, Ports: []corev1.EndpointPort{{ Name: svcPortName.Port, @@ -321,24 +348,21 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP Protocol: corev1.ProtocolTCP, }}, }} - } - eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) - if nodeLocalExternal { - epFunc = func(ept *corev1.Endpoints) { - ept.Subsets = []corev1.EndpointSubset{{ - Addresses: []corev1.EndpointAddress{{ - IP: ep2IP.String(), - Hostname: "remote", - }}, - Ports: []corev1.EndpointPort{{ - Name: svcPortName.Port, - Port: int32(svcPort), - Protocol: corev1.ProtocolTCP, - }}, - }} + if nodeLocalExternal { + ept.Subsets = append(ept.Subsets, + corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{{ + IP: ep2IP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }) } - eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) } + eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, eps...) groupID, _ := fp.groupCounter.Get(svcPortName, false) @@ -394,11 +418,19 @@ func TestNodePortIPv6ExternalLocal(t *testing.T) { } func TestClusterIPv4(t *testing.T) { - testClusterIP(t, svcIPv4, ep1IPv4, false) + testClusterIP(t, svcIPv4, ep1IPv4, nil, false, false) } func TestClusterIPv6(t *testing.T) { - testClusterIP(t, svcIPv6, ep1IPv6, true) + testClusterIP(t, svcIPv6, ep1IPv6, nil, true, false) +} + +func TestClusterIPv4InternalLocal(t *testing.T) { + testClusterIP(t, svcIPv4, ep1IPv4, ep2IPv4, false, true) +} + +func TestClusterIPv6InternalLocal(t *testing.T) { + testClusterIP(t, svcIPv6, ep1IPv6, ep2IPv6, true, true) } func TestDualStackService(t *testing.T) { @@ -406,8 +438,8 @@ func TestDualStackService(t *testing.T) { defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fpv4 := NewFakeProxier(mockRouteClient, mockOFClient, nil, false, false) - fpv6 := NewFakeProxier(mockRouteClient, mockOFClient, nil, true, false) + fpv4 := NewFakeProxier(mockRouteClient, mockOFClient, nil, false, false, false) + fpv6 := NewFakeProxier(mockRouteClient, mockOFClient, nil, true, false, false) metaProxier := k8sproxy.NewMetaProxier(fpv4, fpv6) svcPort := 80 @@ -482,7 +514,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, true) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, true, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -548,7 +580,7 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false, false) svcPort := 80 svcNodePort := 3001 @@ -586,7 +618,7 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -678,7 +710,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -738,7 +770,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false, false) svcPort := 80 svcNodePort := 3001 @@ -806,7 +838,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(nil, mockOFClient, nil, isIPv6, false) + fp := NewFakeProxier(nil, mockOFClient, nil, isIPv6, false, false) svcPort := 80 svcNodePort := 3001 @@ -854,7 +886,7 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, isIPv6, false, false) svcPort1 := 80 svcPort2 := 8080 @@ -928,7 +960,7 @@ func TestServicesWithSameEndpoints(t *testing.T) { defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, false, false) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, false, false, false) epIP := net.ParseIP("10.50.60.71") svcIP1 := net.ParseIP("10.180.30.41") svcIP2 := net.ParseIP("10.180.30.42") @@ -1027,7 +1059,7 @@ func TestMetrics(t *testing.T) { endpointsInstallMetric = metrics.EndpointsInstalledTotalV6.GaugeMetric servicesInstallMetric = metrics.ServicesInstalledTotalV6.GaugeMetric } - testClusterIP(t, net.ParseIP(tc.svcIP), net.ParseIP(tc.epIP), tc.isIPv6) + testClusterIP(t, net.ParseIP(tc.svcIP), net.ParseIP(tc.epIP), nil, tc.isIPv6, false) v, err := testutil.GetCounterMetricValue(endpointsUpdateTotalMetric) assert.NoError(t, err) assert.Equal(t, 0, int(v)) diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 4ed927db192..b8d5fa13dc3 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -39,6 +39,11 @@ const ( // flag will not take effect. EndpointSlice featuregate.Feature = "EndpointSlice" + // alpha: v1.4 + // Enable ServiceInternalTrafficPolicy support in AntreaProxy. If AntreaProxy is not + // enabled, this flag will not take effect. + ServiceInternalTrafficPolicy featuregate.Feature = "ServiceInternalTrafficPolicy" + // alpha: v0.8 // beta: v0.11 // Enable antrea proxy which provides ServiceLB for in-cluster services in antrea agent. @@ -85,15 +90,16 @@ var ( // To add a new feature, define a key for it above and add it here. The features will be // available throughout Antrea binaries. DefaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - AntreaPolicy: {Default: true, PreRelease: featuregate.Beta}, - AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, - Egress: {Default: false, PreRelease: featuregate.Alpha}, - EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, - Traceflow: {Default: true, PreRelease: featuregate.Beta}, - FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, - NetworkPolicyStats: {Default: true, PreRelease: featuregate.Beta}, - NodePortLocal: {Default: false, PreRelease: featuregate.Alpha}, - NodeIPAM: {Default: false, PreRelease: featuregate.Alpha}, + AntreaPolicy: {Default: true, PreRelease: featuregate.Beta}, + AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, + ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha}, + Egress: {Default: false, PreRelease: featuregate.Alpha}, + EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, + Traceflow: {Default: true, PreRelease: featuregate.Beta}, + FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, + NetworkPolicyStats: {Default: true, PreRelease: featuregate.Beta}, + NodePortLocal: {Default: false, PreRelease: featuregate.Alpha}, + NodeIPAM: {Default: false, PreRelease: featuregate.Alpha}, } // UnsupportedFeaturesOnWindows records the features not supported on