Skip to content

Commit

Permalink
bgpv1: ClusterIP advertisement with BGP Control Plane
Browse files Browse the repository at this point in the history
Fixes: #30875

Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
  • Loading branch information
chaunceyjiang committed Mar 7, 2024
1 parent 4082bc3 commit e3ebcb4
Show file tree
Hide file tree
Showing 13 changed files with 1,005 additions and 58 deletions.
4 changes: 1 addition & 3 deletions operator/pkg/bgpv2/bgpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ func (b *BGPResourceManager) reconcileBGPPAdvertisement(ctx context.Context, bgp
advertisements = append(advertisements, cilium_api_v2alpha1.BGPAdvertisement{
AdvertisementType: cilium_api_v2alpha1.BGPServiceAdvert,
Service: &cilium_api_v2alpha1.BGPServiceOptions{
Addresses: []cilium_api_v2alpha1.BGPServiceAddressType{
cilium_api_v2alpha1.BGPLoadBalancerIPAddr,
},
Addresses: vr.ServiceAdvertisements,
},
Selector: vr.ServiceSelector,
Attributes: getAttributes(neigh, cilium_api_v2alpha1.CiliumLoadBalancerIPPoolSelectorName),
Expand Down
4 changes: 2 additions & 2 deletions pkg/bgpv1/manager/reconciler/preflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestReconcileAfterServerReinit(t *testing.T) {
require.NoError(t, err)

diffstore.Upsert(obj)
reconciler := NewLBServiceReconciler(diffstore, epDiffStore)
reconciler := NewServiceReconciler(diffstore, epDiffStore)
err = reconciler.Reconciler.Reconcile(context.Background(), params)
require.NoError(t, err)

Expand All @@ -267,7 +267,7 @@ func TestReconcileAfterServerReinit(t *testing.T) {
require.NoError(t, err)

// Update LB service
reconciler = NewLBServiceReconciler(diffstore, epDiffStore)
reconciler = NewServiceReconciler(diffstore, epDiffStore)
err = reconciler.Reconciler.Reconcile(context.Background(), params)
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion pkg/bgpv1/manager/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var ConfigReconcilers = cell.Provide(
NewNeighborReconciler,
NewExportPodCIDRReconciler,
NewPodIPPoolReconciler,
NewLBServiceReconciler,
NewServiceReconciler,
NewRoutePolicyReconciler,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"slices"

"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/cilium/cilium/pkg/bgpv1/manager/instance"
"github.com/cilium/cilium/pkg/bgpv1/manager/store"
Expand All @@ -30,7 +32,7 @@ type LBServiceReconcilerOut struct {
Reconciler ConfigReconciler `group:"bgp-config-reconciler"`
}

type LBServiceReconciler struct {
type ServiceReconciler struct {
diffStore store.DiffStore[*slim_corev1.Service]
epDiffStore store.DiffStore[*k8s.Endpoints]
}
Expand All @@ -40,30 +42,30 @@ type LBServiceReconcilerMetadata map[resource.Key][]*types.Path

type localServices map[k8s.ServiceID]struct{}

func NewLBServiceReconciler(diffStore store.DiffStore[*slim_corev1.Service], epDiffStore store.DiffStore[*k8s.Endpoints]) LBServiceReconcilerOut {
func NewServiceReconciler(diffStore store.DiffStore[*slim_corev1.Service], epDiffStore store.DiffStore[*k8s.Endpoints]) LBServiceReconcilerOut {
if diffStore == nil {
return LBServiceReconcilerOut{}
}

return LBServiceReconcilerOut{
Reconciler: &LBServiceReconciler{
Reconciler: &ServiceReconciler{
diffStore: diffStore,
epDiffStore: epDiffStore,
},
}
}

func (r *LBServiceReconciler) Name() string {
return "LBService"
func (r *ServiceReconciler) Name() string {
return "Service"
}

func (r *LBServiceReconciler) Priority() int {
func (r *ServiceReconciler) Priority() int {
return 40
}

func (r *LBServiceReconciler) Reconcile(ctx context.Context, p ReconcileParams) error {
func (r *ServiceReconciler) Reconcile(ctx context.Context, p ReconcileParams) error {
if p.CiliumNode == nil {
return fmt.Errorf("attempted load balancer service reconciliation with nil local CiliumNode")
return fmt.Errorf("attempted service reconciliation with nil local CiliumNode")
}

ls, err := r.populateLocalServices(p.CiliumNode.Name)
Expand All @@ -77,14 +79,14 @@ func (r *LBServiceReconciler) Reconcile(ctx context.Context, p ReconcileParams)
return r.svcDiffReconciliation(ctx, p.CurrentServer, p.DesiredConfig, ls)
}

func (r *LBServiceReconciler) getMetadata(sc *instance.ServerWithConfig) LBServiceReconcilerMetadata {
func (r *ServiceReconciler) getMetadata(sc *instance.ServerWithConfig) LBServiceReconcilerMetadata {
if _, found := sc.ReconcilerMetadata[r.Name()]; !found {
sc.ReconcilerMetadata[r.Name()] = make(LBServiceReconcilerMetadata)
}
return sc.ReconcilerMetadata[r.Name()].(LBServiceReconcilerMetadata)
}

func (r *LBServiceReconciler) resolveSvcFromEndpoints(eps *k8s.Endpoints) (*slim_corev1.Service, bool, error) {
func (r *ServiceReconciler) resolveSvcFromEndpoints(eps *k8s.Endpoints) (*slim_corev1.Service, bool, error) {
k := resource.Key{
Name: eps.ServiceID.Name,
Namespace: eps.ServiceID.Namespace,
Expand All @@ -94,7 +96,7 @@ func (r *LBServiceReconciler) resolveSvcFromEndpoints(eps *k8s.Endpoints) (*slim

// requiresFullReconciliation returns true if the desired config requires full reconciliation
// (reconciliation of all services), false if partial (diff) reconciliation is sufficient.
func (r *LBServiceReconciler) requiresFullReconciliation(p ReconcileParams) bool {
func (r *ServiceReconciler) requiresFullReconciliation(p ReconcileParams) bool {
var existingSelector *slim_metav1.LabelSelector
if p.CurrentServer != nil && p.CurrentServer.Config != nil {
existingSelector = p.CurrentServer.Config.ServiceSelector
Expand All @@ -106,7 +108,7 @@ func (r *LBServiceReconciler) requiresFullReconciliation(p ReconcileParams) bool
}

// Populate locally available services used for externalTrafficPolicy=local handling
func (r *LBServiceReconciler) populateLocalServices(localNodeName string) (localServices, error) {
func (r *ServiceReconciler) populateLocalServices(localNodeName string) (localServices, error) {
ls := make(localServices)

epList, err := r.epDiffStore.List()
Expand Down Expand Up @@ -156,7 +158,7 @@ func hasLocalEndpoints(svc *slim_corev1.Service, ls localServices) bool {

// fullReconciliation reconciles all services, this is a heavy operation due to the potential amount of services and
// thus should be avoided if partial reconciliation is an option.
func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, ls localServices) error {
func (r *ServiceReconciler) fullReconciliation(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, ls localServices) error {
toReconcile, toWithdraw, err := r.fullReconciliationServiceList(sc)
if err != nil {
return err
Expand All @@ -176,7 +178,7 @@ func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *instan

// svcDiffReconciliation performs reconciliation, only on services which have been created, updated or deleted since
// the last diff reconciliation.
func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, ls localServices) error {
func (r *ServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, ls localServices) error {
toReconcile, toWithdraw, err := r.diffReconciliationServiceList()
if err != nil {
return err
Expand All @@ -197,7 +199,7 @@ func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *ins

// fullReconciliationServiceList return a list of services to reconcile and to withdraw when performing
// full service reconciliation.
func (r *LBServiceReconciler) fullReconciliationServiceList(sc *instance.ServerWithConfig) (toReconcile []*slim_corev1.Service, toWithdraw []resource.Key, err error) {
func (r *ServiceReconciler) fullReconciliationServiceList(sc *instance.ServerWithConfig) (toReconcile []*slim_corev1.Service, toWithdraw []resource.Key, err error) {
// Loop over all existing announcements, find announcements for services which no longer exist
serviceAnnouncements := r.getMetadata(sc)
for svcKey := range serviceAnnouncements {
Expand All @@ -223,7 +225,7 @@ func (r *LBServiceReconciler) fullReconciliationServiceList(sc *instance.ServerW

// diffReconciliationServiceList returns a list of services to reconcile and to withdraw when
// performing partial (diff) service reconciliation.
func (r *LBServiceReconciler) diffReconciliationServiceList() (toReconcile []*slim_corev1.Service, toWithdraw []resource.Key, err error) {
func (r *ServiceReconciler) diffReconciliationServiceList() (toReconcile []*slim_corev1.Service, toWithdraw []resource.Key, err error) {
upserted, deleted, err := r.diffStore.Diff()
if err != nil {
return nil, nil, fmt.Errorf("svc store diff: %w", err)
Expand Down Expand Up @@ -277,17 +279,12 @@ func (r *LBServiceReconciler) diffReconciliationServiceList() (toReconcile []*sl

// svcDesiredRoutes determines which, if any routes should be announced for the given service. This determines the
// desired state.
func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) ([]netip.Prefix, error) {
func (r *ServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) ([]netip.Prefix, error) {
if newc.ServiceSelector == nil {
// If the vRouter has no service selector, there are no desired routes.
return nil, nil
}

// Ignore non-loadbalancer services.
if svc.Spec.Type != slim_corev1.ServiceTypeLoadBalancer {
return nil, nil
}

// The vRouter has a service selector, so determine the desired routes.
svcSelector, err := slim_metav1.LabelSelectorAsSelector(newc.ServiceSelector)
if err != nil {
Expand All @@ -298,38 +295,63 @@ func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtua
if !svcSelector.Matches(serviceLabelSet(svc)) {
return nil, nil
}

// Ignore service managed by an unsupported LB class.
if svc.Spec.LoadBalancerClass != nil && *svc.Spec.LoadBalancerClass != v2alpha1api.BGPLoadBalancerClass {
// The service is managed by a different LB class.
return nil, nil
}

// Ignore externalTrafficPolicy == Local && no local endpoints.
if svc.Spec.ExternalTrafficPolicy == slim_corev1.ServiceExternalTrafficPolicyLocal &&
!hasLocalEndpoints(svc, ls) {
return nil, nil
}

var desiredRoutes []netip.Prefix
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP == "" {
continue
}

addr, err := netip.ParseAddr(ingress.IP)
if err != nil {
continue
// Loop over the service advertisements and determine the desired routes.
for _, svcAdv := range newc.ServiceAdvertisements {
switch svcAdv {
case v2alpha1api.BGPLoadBalancerIPAddr:
if svc.Spec.Type != slim_corev1.ServiceTypeLoadBalancer {
continue
}
// Ignore service managed by an unsupported LB class.
if svc.Spec.LoadBalancerClass != nil && *svc.Spec.LoadBalancerClass != v2alpha1api.BGPLoadBalancerClass {
// The service is managed by a different LB class.
continue
}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP == "" {
continue
}
addr, err := netip.ParseAddr(ingress.IP)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
case v2alpha1api.BGPClusterIPAddr:
if svc.Spec.ClusterIP == "" || len(svc.Spec.ClusterIPs) == 0 || svc.Spec.ClusterIP == corev1.ClusterIPNone {
continue
}
ips := sets.New[string]()
if svc.Spec.ClusterIP != "" {
ips.Insert(svc.Spec.ClusterIP)
}
for _, clusterIP := range svc.Spec.ClusterIPs {
if clusterIP == "" || clusterIP == corev1.ClusterIPNone {
continue
}
ips.Insert(clusterIP)
}
for _, ip := range sets.List(ips) {
addr, err := netip.ParseAddr(ip)
if err != nil {
continue
}
desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}
}

desiredRoutes = append(desiredRoutes, netip.PrefixFrom(addr, addr.BitLen()))
}

return desiredRoutes, err
}

// reconcileService gets the desired routes of a given service and makes sure that is what is being announced.
func (r *LBServiceReconciler) reconcileService(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) error {
func (r *ServiceReconciler) reconcileService(ctx context.Context, sc *instance.ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) error {

desiredRoutes, err := r.svcDesiredRoutes(newc, svc, ls)
if err != nil {
Expand All @@ -340,7 +362,7 @@ func (r *LBServiceReconciler) reconcileService(ctx context.Context, sc *instance

// reconcileServiceRoutes ensures that desired routes of a given service are announced,
// adding missing announcements or withdrawing unwanted ones.
func (r *LBServiceReconciler) reconcileServiceRoutes(ctx context.Context, sc *instance.ServerWithConfig, svc *slim_corev1.Service, desiredRoutes []netip.Prefix) error {
func (r *ServiceReconciler) reconcileServiceRoutes(ctx context.Context, sc *instance.ServerWithConfig, svc *slim_corev1.Service, desiredRoutes []netip.Prefix) error {
serviceAnnouncements := r.getMetadata(sc)
svcKey := resource.NewKey(svc)

Expand Down Expand Up @@ -383,7 +405,7 @@ func (r *LBServiceReconciler) reconcileServiceRoutes(ctx context.Context, sc *in
}

// withdrawService removes all announcements for the given service
func (r *LBServiceReconciler) withdrawService(ctx context.Context, sc *instance.ServerWithConfig, key resource.Key) error {
func (r *ServiceReconciler) withdrawService(ctx context.Context, sc *instance.ServerWithConfig, key resource.Key) error {
serviceAnnouncements := r.getMetadata(sc)
advertisements := serviceAnnouncements[key]
// Loop in reverse order so we can delete without effect to the iteration.
Expand Down

0 comments on commit e3ebcb4

Please sign in to comment.