Skip to content

Commit

Permalink
feat: fully support service traffic policies
Browse files Browse the repository at this point in the history
Adds support for spec.internalTrafficPolicy and fixes support for
spec.externalTrafficPolicy so that it only effects external traffic.

Keeps existing support for kube-router.io/service-local annotation which
overrides both to local when set to true. Any other value in this
annotation is ignored.
  • Loading branch information
aauren committed Jan 24, 2024
1 parent 8404260 commit fcd21b4
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 89 deletions.
40 changes: 26 additions & 14 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ type serviceInfo struct {
skipLbIps bool
externalIPs []string
loadBalancerIPs []string
local bool
intTrafficPolicy *v1.ServiceInternalTrafficPolicy
extTrafficPolicy *v1.ServiceExternalTrafficPolicy
flags schedFlags
}

Expand Down Expand Up @@ -914,18 +915,21 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
continue
}

intClusterPolicyDefault := v1.ServiceInternalTrafficPolicyCluster
extClusterPolicyDefault := v1.ServiceExternalTrafficPolicyCluster
for _, port := range svc.Spec.Ports {
svcInfo := serviceInfo{
clusterIP: net.ParseIP(svc.Spec.ClusterIP),
clusterIPs: make([]string, len(svc.Spec.ClusterIPs)),
port: int(port.Port),
targetPort: port.TargetPort.String(),
protocol: strings.ToLower(string(port.Protocol)),
nodePort: int(port.NodePort),
name: svc.ObjectMeta.Name,
namespace: svc.ObjectMeta.Namespace,
externalIPs: make([]string, len(svc.Spec.ExternalIPs)),
local: false,
clusterIP: net.ParseIP(svc.Spec.ClusterIP),
clusterIPs: make([]string, len(svc.Spec.ClusterIPs)),
port: int(port.Port),
targetPort: port.TargetPort.String(),
protocol: strings.ToLower(string(port.Protocol)),
nodePort: int(port.NodePort),
name: svc.ObjectMeta.Name,
namespace: svc.ObjectMeta.Namespace,
externalIPs: make([]string, len(svc.Spec.ExternalIPs)),
intTrafficPolicy: &intClusterPolicyDefault,
extTrafficPolicy: &extClusterPolicyDefault,
}
dsrMethod, ok := svc.ObjectMeta.Annotations[svcDSRAnnotation]
if ok {
Expand Down Expand Up @@ -971,10 +975,18 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
}
_, svcInfo.hairpin = svc.ObjectMeta.Annotations[svcHairpinAnnotation]
_, svcInfo.hairpinExternalIPs = svc.ObjectMeta.Annotations[svcHairpinExternalIPsAnnotation]
_, svcInfo.local = svc.ObjectMeta.Annotations[svcLocalAnnotation]
_, svcInfo.skipLbIps = svc.ObjectMeta.Annotations[svcSkipLbIpsAnnotation]
if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
svcInfo.local = true
svcInfo.intTrafficPolicy = svc.Spec.InternalTrafficPolicy
svcInfo.extTrafficPolicy = &svc.Spec.ExternalTrafficPolicy

// The kube-router.io/service.local annotation has the ability to override the internal and external traffic
// policy that is set in the spec. In this case we set both to local when the annotation is true so that
// previous functionality of the annotation is best preserved.
if svc.ObjectMeta.Annotations[svcLocalAnnotation] == "true" {
intTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal
extTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyLocal
svcInfo.intTrafficPolicy = &intTrafficPolicyLocal
svcInfo.extTrafficPolicy = &extTrafficPolicyLocal
}

svcID := generateServiceID(svc.Namespace, svc.Name, port.Name)
Expand Down
31 changes: 19 additions & 12 deletions pkg/controllers/proxy/service_endpoints_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv
endpoints := endpointsInfoMap[k]
// First we check to see if this is a local service and that it has any active endpoints, if it doesn't there
// isn't any use doing any of the below work, let's save some compute cycles and break fast
if svc.local && !hasActiveEndpoints(endpoints) {
if *svc.intTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal && !hasActiveEndpoints(endpoints) {
klog.V(1).Infof("Skipping setting up ClusterIP service %s/%s as it does not have active endpoints",
svc.namespace, svc.name)
continue
Expand Down Expand Up @@ -159,7 +159,7 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv
}

// add IPVS remote server to the IPVS service
nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, clusterIP)
nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, clusterIP, true)
}
}
}
Expand All @@ -185,7 +185,8 @@ func (nsc *NetworkServicesController) addIPVSService(ipvsSvcs []*ipvs.Service, s
}

func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endpointSliceInfo,
svcEndpointMap map[string][]string, svc *serviceInfo, svcID string, ipvsSvc *ipvs.Service, vip net.IP) {
svcEndpointMap map[string][]string, svc *serviceInfo, svcID string, ipvsSvc *ipvs.Service, vip net.IP,
isClusterIP bool) {
var family v1.IPFamily
if vip.To4() != nil {
family = v1.IPv4Protocol
Expand All @@ -201,9 +202,14 @@ func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endp
// 1) Service is not a local service
// 2) Service is a local service, but has no active endpoints on this node
// 3) Service is a local service, has active endpoints on this node, and this endpoint is one of them
if svc.local && !endpoint.isLocal {
klog.V(2).Info("service is local, but endpoint is not, continuing...")
continue
if !endpoint.isLocal {
if isClusterIP && *svc.intTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal {
klog.V(2).Info("service has an internal traffic policy of local, but endpoint is not, continuing...")
continue
} else if !isClusterIP && *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal {
klog.V(2).Info("service has an external traffic policy of local, but endpoint is not, continuing...")
continue
}
}
var syscallINET uint16
eIP := net.ParseIP(endpoint.ip)
Expand Down Expand Up @@ -260,7 +266,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi
endpoints := endpointsInfoMap[k]
// First we check to see if this is a local service and that it has any active endpoints, if it doesn't there
// isn't any use doing any of the below work, let's save some compute cycles and break fast
if svc.local && !hasActiveEndpoints(endpoints) {
if *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal && !hasActiveEndpoints(endpoints) {
klog.V(1).Infof("Skipping setting up NodePort service %s/%s as it does not have active endpoints",
svc.namespace, svc.name)
continue
Expand Down Expand Up @@ -301,7 +307,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi
if svcID == "" {
continue
}
nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, addr)
nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, addr, false)
}
}
} else {
Expand All @@ -311,7 +317,8 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi
if svcID == "" {
continue
}
nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, nsc.primaryIP)
nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, nsc.primaryIP,
false)
}
}

Expand All @@ -324,7 +331,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser
endpoints := endpointsInfoMap[k]
// First we check to see if this is a local service and that it has any active endpoints, if it doesn't there
// isn't any use doing any of the below work, let's save some compute cycles and break fast
if svc.local && !hasActiveEndpoints(endpoints) {
if *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal && !hasActiveEndpoints(endpoints) {
klog.V(1).Infof("Skipping setting up IPVS service for external IP and LoadBalancer IP "+
"for the service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name)
continue
Expand Down Expand Up @@ -427,7 +434,7 @@ func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo
}

// add pod endpoints to the IPVS service
nsc.addEndpointsToIPVSService(endpoints, svcEndpointMap, svc, svcID, ipvsExternalIPSvc, externalIP)
nsc.addEndpointsToIPVSService(endpoints, svcEndpointMap, svc, svcID, ipvsExternalIPSvc, externalIP, false)

return nil
}
Expand Down Expand Up @@ -508,7 +515,7 @@ func (nsc *NetworkServicesController) setupExternalIPForDSRService(svc *serviceI
// 1) Service is not a local service
// 2) Service is a local service, but has no active endpoints on this node
// 3) Service is a local service, has active endpoints on this node, and this endpoint is one of them
if svc.local && !endpoint.isLocal {
if *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal && !endpoint.isLocal {
continue
}
var syscallINET uint16
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/routing/bgp_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (nrc *NetworkRoutingController) addServiceVIPsDefinedSet() error {
}

advIPPrefixList := make([]*gobgpapi.Prefix, 0)
advIps, _, _ := nrc.getAllVIPs()
advIps, _, _ := nrc.getVIPs()
for _, ipStr := range advIps {
ip := net.ParseIP(ipStr)
if ip == nil {
Expand Down
Loading

0 comments on commit fcd21b4

Please sign in to comment.