Skip to content

Commit

Permalink
bgpv1: Local internalTrafficPolicy support for ClusterIP advertisements
Browse files Browse the repository at this point in the history
Fixes: 31389

Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
  • Loading branch information
chaunceyjiang committed Mar 20, 2024
1 parent 6ddd70c commit 74c7fe0
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 95 deletions.
7 changes: 6 additions & 1 deletion Documentation/network/bgp-control-plane.rst
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,15 @@ defaults to the ``LoadBalancerIP`` service. You can also specify the ``.serviceA
field to advertise specific service types, with options such as ``LoadBalancerIP``,
``ClusterIP`` and ``ExternalIP``.

It is worth noting that when you configure ``virtualRouters[*].serviceAdvertisements`` as ``ClusterIP``,
the BGP Control Plane will only consider the configuration of the service's ``.spec.internalTrafficPolicy`` and ignore
the configuration of ``.spec.externalTrafficPolicy`` of Service.
For ``ExternalIP`` and ``LoadBalancerIP``,it will only consider the configuration of
the service's ``.spec.externalTrafficPolicy`` and ignore the configuration of ``.spec.internalTrafficPolicy``.

The ``.serviceSelector`` field is a label selector that selects Services matching
the specified ``.matchLabels`` or ``.matchExpressions``.


When your upstream router supports Equal Cost Multi Path(ECMP), you can use
this feature to load balance traffic to the Service across multiple nodes by
advertising the same ingress IPs from multiple nodes.
Expand Down
142 changes: 83 additions & 59 deletions pkg/bgpv1/manager/reconciler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *ServiceReconciler) populateLocalServices(localNodeName string) (localSe

endpointsLoop:
for _, eps := range epList {
svc, exists, err := r.resolveSvcFromEndpoints(eps)
_, exists, err := r.resolveSvcFromEndpoints(eps)
if err != nil {
// Cannot resolve service from endpoints. We have nothing to do here.
continue
Expand All @@ -129,11 +129,6 @@ endpointsLoop:
continue
}

// We only need Endpoints tracking for externalTrafficPolicy=Local
if svc.Spec.ExternalTrafficPolicy != slim_corev1.ServiceExternalTrafficPolicyLocal {
continue
}

svcID := eps.ServiceID

for _, be := range eps.Backends {
Expand Down Expand Up @@ -295,72 +290,101 @@ func (r *ServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualR
if !svcSelector.Matches(serviceLabelSet(svc)) {
return nil, nil
}
// Ignore externalTrafficPolicy == Local && no local endpoints.
if svc.Spec.ExternalTrafficPolicy == slim_corev1.ServiceExternalTrafficPolicyLocal &&
!hasLocalEndpoints(svc, ls) {
return nil, nil
}

var desiredRoutes []netip.Prefix
// Loop over the service advertisements and determine the desired routes.
for _, svcAdv := range newc.ServiceAdvertisements {
switch svcAdv {
case v2alpha1api.BGPLoadBalancerIPAddr:
if svc.Spec.Type != slim_corev1.ServiceTypeLoadBalancer {
continue
}
// Ignore service managed by an unsupported LB class.
if svc.Spec.LoadBalancerClass != nil && *svc.Spec.LoadBalancerClass != v2alpha1api.BGPLoadBalancerClass {
// The service is managed by a different LB class.
continue
}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP == "" {
continue
}
addr, err := netip.ParseAddr(ingress.IP)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
desiredRoutes = append(desiredRoutes, r.lbSvcDesiredRoutes(svc, ls)...)
case v2alpha1api.BGPClusterIPAddr:
if svc.Spec.ClusterIP == "" || len(svc.Spec.ClusterIPs) == 0 || svc.Spec.ClusterIP == corev1.ClusterIPNone {
continue
}
ips := sets.New[string]()
if svc.Spec.ClusterIP != "" {
ips.Insert(svc.Spec.ClusterIP)
}
for _, clusterIP := range svc.Spec.ClusterIPs {
if clusterIP == "" || clusterIP == corev1.ClusterIPNone {
continue
}
ips.Insert(clusterIP)
}
for _, ip := range sets.List(ips) {
addr, err := netip.ParseAddr(ip)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
desiredRoutes = append(desiredRoutes, r.clusterIPDesiredRoutes(svc, ls)...)
case v2alpha1api.BGPExternalIPAddr:
for _, extIP := range svc.Spec.ExternalIPs {
if extIP == "" {
continue
}
addr, err := netip.ParseAddr(extIP)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
desiredRoutes = append(desiredRoutes, r.externalIPDesiredRoutes(svc, ls)...)
}
}

return desiredRoutes, err
}

func (r *ServiceReconciler) externalIPDesiredRoutes(svc *slim_corev1.Service, ls localServices) []netip.Prefix {
var desiredRoutes []netip.Prefix
// Ignore externalTrafficPolicy == Local && no local endpoints.
if svc.Spec.ExternalTrafficPolicy == slim_corev1.ServiceExternalTrafficPolicyLocal &&
!hasLocalEndpoints(svc, ls) {
return desiredRoutes
}
for _, extIP := range svc.Spec.ExternalIPs {
if extIP == "" {
continue
}
addr, err := netip.ParseAddr(extIP)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
return desiredRoutes
}

func (r *ServiceReconciler) clusterIPDesiredRoutes(svc *slim_corev1.Service, ls localServices) []netip.Prefix {
var desiredRoutes []netip.Prefix
// Ignore internalTrafficPolicy == Local && no local endpoints.
if svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == slim_corev1.ServiceInternalTrafficPolicyLocal &&
!hasLocalEndpoints(svc, ls) {
return desiredRoutes
}
if svc.Spec.ClusterIP == "" || len(svc.Spec.ClusterIPs) == 0 || svc.Spec.ClusterIP == corev1.ClusterIPNone {
return desiredRoutes
}
ips := sets.New[string]()
if svc.Spec.ClusterIP != "" {
ips.Insert(svc.Spec.ClusterIP)
}
for _, clusterIP := range svc.Spec.ClusterIPs {
if clusterIP == "" || clusterIP == corev1.ClusterIPNone {
continue
}
ips.Insert(clusterIP)
}
for _, ip := range sets.List(ips) {
addr, err := netip.ParseAddr(ip)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
return desiredRoutes
}

func (r *ServiceReconciler) lbSvcDesiredRoutes(svc *slim_corev1.Service, ls localServices) []netip.Prefix {
var desiredRoutes []netip.Prefix
if svc.Spec.Type != slim_corev1.ServiceTypeLoadBalancer {
return desiredRoutes
}
// Ignore externalTrafficPolicy == Local && no local endpoints.
if svc.Spec.ExternalTrafficPolicy == slim_corev1.ServiceExternalTrafficPolicyLocal &&
!hasLocalEndpoints(svc, ls) {
return desiredRoutes
}
// Ignore service managed by an unsupported LB class.
if svc.Spec.LoadBalancerClass != nil && *svc.Spec.LoadBalancerClass != v2alpha1api.BGPLoadBalancerClass {
// The service is managed by a different LB class.
return desiredRoutes
}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP == "" {
continue
}
addr, err := netip.ParseAddr(ingress.IP)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
return desiredRoutes
}

// reconcileService gets the desired routes of a given service and makes sure that is what is being announced.
func (r *ServiceReconciler) reconcileService(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) error {

Expand Down
71 changes: 36 additions & 35 deletions pkg/bgpv1/manager/reconciler/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,15 +739,16 @@ func TestServiceReconcilerWithClusterIP(t *testing.T) {
svc1NonClusterIP := svc1.DeepCopy()
svc1NonClusterIP.Spec.ClusterIP = "None"
svc1NonClusterIP.Spec.ClusterIPs = append(svc1NonClusterIP.Spec.ClusterIPs, "None")
svc1ETPLocal := svc1.DeepCopy()
svc1ETPLocal.Spec.ExternalTrafficPolicy = slim_corev1.ServiceExternalTrafficPolicyLocal
svc1ITPLocal := svc1.DeepCopy()
internalTrafficPolicyLocal := slim_corev1.ServiceInternalTrafficPolicyLocal
svc1ITPLocal.Spec.InternalTrafficPolicy = &internalTrafficPolicyLocal

svc1ETPLocalTwoIngress := svc1TwoIngress.DeepCopy()
svc1ETPLocalTwoIngress.Spec.ExternalTrafficPolicy = slim_corev1.ServiceExternalTrafficPolicyLocal
svc1ITPLocalTwoIngress := svc1TwoIngress.DeepCopy()
svc1ITPLocalTwoIngress.Spec.InternalTrafficPolicy = &internalTrafficPolicyLocal

svc1IPv6ETPLocal := svc1.DeepCopy()
svc1IPv6ETPLocal.Spec.ClusterIPs = append(svc1IPv6ETPLocal.Spec.ClusterIPs, clusterIPV6)
svc1IPv6ETPLocal.Spec.ExternalTrafficPolicy = slim_corev1.ServiceExternalTrafficPolicyLocal
svc1IPv6ITPLocal := svc1.DeepCopy()
svc1IPv6ITPLocal.Spec.ClusterIPs = append(svc1IPv6ITPLocal.Spec.ClusterIPs, clusterIPV6)
svc1IPv6ITPLocal.Spec.InternalTrafficPolicy = &internalTrafficPolicyLocal

svc1LbClass := svc1.DeepCopy()
svc1LbClass.Spec.LoadBalancerClass = pointer.String(v2alpha1api.BGPLoadBalancerClass)
Expand Down Expand Up @@ -1096,55 +1097,55 @@ func TestServiceReconcilerWithClusterIP(t *testing.T) {
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{},
updated: map[resource.Key][]string{},
},
// externalTrafficPolicy=Local && IPv4 && single slice && local endpoint
// internalTrafficPolicyLocal=Local && IPv4 && single slice && local endpoint
{
name: "etp-local-ipv4-single-slice-local",
name: "itp-local-ipv4-single-slice-local",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{eps1IPv4Local},
updated: map[resource.Key][]string{
svc1Name: {
clusterIPV4Prefix,
},
},
},
// externalTrafficPolicy=Local && IPv4 && single slice && remote endpoint
// internalTrafficPolicyLocal=Local && IPv4 && single slice && remote endpoint
{
name: "etp-local-ipv4-single-slice-remote",
name: "itp-local-ipv4-single-slice-remote",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{eps1IPv4Remote},
updated: map[resource.Key][]string{},
},
// externalTrafficPolicy=Local && IPv4 && single slice && mixed endpoint
// internalTrafficPolicyLocal=Local && IPv4 && single slice && mixed endpoint
{
name: "etp-local-ipv4-single-slice-mixed",
name: "itp-local-ipv4-single-slice-mixed",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{eps1IPv4Mixed},
updated: map[resource.Key][]string{
svc1Name: {
clusterIPV4Prefix,
},
},
},
// externalTrafficPolicy=Local && IPv6 && single slice && local endpoint
// internalTrafficPolicyLocal=Local && IPv6 && single slice && local endpoint
{
name: "etp-local-ipv6-single-slice-local",
name: "itp-local-ipv6-single-slice-local",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1IPv6ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1IPv6ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{eps1IPv6Local},
updated: map[resource.Key][]string{
svc1Name: {
Expand All @@ -1153,23 +1154,23 @@ func TestServiceReconcilerWithClusterIP(t *testing.T) {
},
},
},
// externalTrafficPolicy=Local && IPv6 && single slice && remote endpoint
// internalTrafficPolicyLocal=Local && IPv6 && single slice && remote endpoint
{
name: "etp-local-ipv6-single-slice-remote",
name: "itp-local-ipv6-single-slice-remote",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1IPv6ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1IPv6ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{eps1IPv6Remote},
updated: map[resource.Key][]string{},
},
// externalTrafficPolicy=Local && IPv6 && single slice && mixed endpoint
// internalTrafficPolicyLocal=Local && IPv6 && single slice && mixed endpoint
{
name: "etp-local-ipv6-single-slice-mixed",
name: "itp-local-ipv6-single-slice-mixed",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1IPv6ETPLocal},
upsertedServices: []*slim_corev1.Service{svc1IPv6ITPLocal},
upsertedEndpoints: []*k8s.Endpoints{eps1IPv6Mixed},
updated: map[resource.Key][]string{
svc1Name: {
Expand All @@ -1178,13 +1179,13 @@ func TestServiceReconcilerWithClusterIP(t *testing.T) {
},
},
},
// externalTrafficPolicy=Local && Dual && two slices && local endpoint
// internalTrafficPolicyLocal=Local && Dual && two slices && local endpoint
{
name: "etp-local-dual-two-slices-local",
name: "itp-local-dual-two-slices-local",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocalTwoIngress},
upsertedServices: []*slim_corev1.Service{svc1ITPLocalTwoIngress},
upsertedEndpoints: []*k8s.Endpoints{
eps1IPv4Local,
eps1IPv6Local,
Expand All @@ -1196,13 +1197,13 @@ func TestServiceReconcilerWithClusterIP(t *testing.T) {
},
},
},
// externalTrafficPolicy=Local && Dual && two slices && remote endpoint
// internalTrafficPolicyLocal=Local && Dual && two slices && remote endpoint
{
name: "etp-local-dual-two-slices-remote",
name: "itp-local-dual-two-slices-remote",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocalTwoIngress},
upsertedServices: []*slim_corev1.Service{svc1ITPLocalTwoIngress},
upsertedEndpoints: []*k8s.Endpoints{
eps1IPv4Remote,
eps1IPv6Remote,
Expand All @@ -1211,13 +1212,13 @@ func TestServiceReconcilerWithClusterIP(t *testing.T) {
svc1Name: {},
},
},
// externalTrafficPolicy=Local && Dual && two slices && mixed endpoint
// internalTrafficPolicyLocal=Local && Dual && two slices && mixed endpoint
{
name: "etp-local-dual-two-slices-mixed",
name: "itp-local-dual-two-slices-mixed",
oldServiceSelector: &blueSelector,
newServiceSelector: &blueSelector,
advertised: map[resource.Key][]string{},
upsertedServices: []*slim_corev1.Service{svc1ETPLocalTwoIngress},
upsertedServices: []*slim_corev1.Service{svc1ITPLocalTwoIngress},
upsertedEndpoints: []*k8s.Endpoints{
eps1IPv4Mixed,
eps1IPv6Mixed,
Expand Down

0 comments on commit 74c7fe0

Please sign in to comment.