Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support externalTrafficPolicy=Local for BGP CPlane service VIP advertisement #25477

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions pkg/bgpv1/agent/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type ControlPlaneState struct {
IPv4 netip.Addr
// The current IPv6 address of the agent, reachable externally.
IPv6 netip.Addr
// The current node name
CurrentNodeName string
}

// ResolveRouterID resolves router ID, if we have an annotation and it can be
Expand Down Expand Up @@ -395,15 +397,21 @@ func (c *Controller) Reconcile(ctx context.Context) error {
return fmt.Errorf("failed to retrieve Node's pod CIDR ranges: %w", err)
}

currentNodeName, err := c.NodeSpec.CurrentNodeName()
if err != nil {
return fmt.Errorf("failed to retrieve current node's name: %w", err)
}

ipv4, _ := ip.AddrFromIP(nodeaddr.GetIPv4())
ipv6, _ := ip.AddrFromIP(nodeaddr.GetIPv6())

// define our current point-in-time control plane state.
state := &ControlPlaneState{
PodCIDRs: podCIDRs,
Annotations: annoMap,
IPv4: ipv4,
IPv6: ipv6,
PodCIDRs: podCIDRs,
Annotations: annoMap,
IPv4: ipv4,
IPv6: ipv6,
CurrentNodeName: currentNodeName,
}

// call bgp sub-systems required to apply this policy's BGP topology.
Expand Down
4 changes: 4 additions & 0 deletions pkg/bgpv1/agent/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (f *fakeNodeSpecer) Annotations() (map[string]string, error) {
return f.Annotations_()
}

// CurrentNodeName is the test stub for the nodeSpecer method of the same
// name: it always returns nodeName with a nil error.
func (f *fakeNodeSpecer) CurrentNodeName() (string, error) {
return nodeName, nil
}

// TestControllerSanity ensures that the controller calls the correct methods,
// with the correct arguments, during its Reconcile loop.
func TestControllerSanity(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions pkg/bgpv1/agent/nodespecer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type nodeSpecer interface {
Annotations() (map[string]string, error)
Labels() (map[string]string, error)
PodCIDRs() ([]string, error)
CurrentNodeName() (string, error)
}

type localNodeStoreSpecerParams struct {
Expand Down Expand Up @@ -140,6 +141,13 @@ func (s *kubernetesNodeSpecer) PodCIDRs() ([]string, error) {
return []string{}, nil
}

// CurrentNodeName returns the Name of the node currently held by the specer.
// An error is returned while no node has been recorded yet (currentNode is
// nil).
func (s *kubernetesNodeSpecer) CurrentNodeName() (string, error) {
	if node := s.currentNode; node != nil {
		return node.Name, nil
	}
	return "", errors.New("node name is not yet available")
}

type ciliumNodeSpecer struct {
nodeResource k8s.LocalCiliumNodeResource

Expand Down Expand Up @@ -212,3 +220,10 @@ func (s *ciliumNodeSpecer) PodCIDRs() ([]string, error) {

return []string{}, nil
}

// CurrentNodeName returns the Name of the node currently held by the specer.
// An error is returned while no node has been recorded yet (currentNode is
// nil).
func (s *ciliumNodeSpecer) CurrentNodeName() (string, error) {
	if node := s.currentNode; node != nil {
		return node.Name, nil
	}
	return "", errors.New("node name is not yet available")
}
3 changes: 3 additions & 0 deletions pkg/bgpv1/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/cilium/cilium/pkg/bgpv1/manager"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
Expand All @@ -34,6 +35,8 @@ var Cell = cell.Module(
manager.NewBGPRouterManager,
// Create a slim service DiffStore
manager.NewDiffStore[*slim_core_v1.Service],
// Create an endpoints DiffStore
manager.NewDiffStore[*k8s.Endpoints],
),
// Provides the reconcilers used by the route manager to update the config
manager.ConfigReconcilers,
Expand Down
135 changes: 122 additions & 13 deletions pkg/bgpv1/manager/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/cilium/cilium/pkg/bgpv1/agent"
"github.com/cilium/cilium/pkg/bgpv1/types"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/resource"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/labels"
slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
ciliumslices "github.com/cilium/cilium/pkg/slices"

"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -402,17 +404,21 @@ type LBServiceReconcilerOut struct {
}

type LBServiceReconciler struct {
diffStore DiffStore[*slim_corev1.Service]
diffStore DiffStore[*slim_corev1.Service]
epDiffStore DiffStore[*k8s.Endpoints]
}

func NewLBServiceReconciler(diffStore DiffStore[*slim_corev1.Service]) LBServiceReconcilerOut {
type localServices map[k8s.ServiceID]struct{}

func NewLBServiceReconciler(diffStore DiffStore[*slim_corev1.Service], epDiffStore DiffStore[*k8s.Endpoints]) LBServiceReconcilerOut {
if diffStore == nil {
return LBServiceReconcilerOut{}
}

return LBServiceReconcilerOut{
Reconciler: &LBServiceReconciler{
diffStore: diffStore,
diffStore: diffStore,
epDiffStore: epDiffStore,
},
}
}
Expand All @@ -425,6 +431,53 @@ func (r *LBServiceReconciler) Reconcile(ctx context.Context, params ReconcilePar
return r.lbServiceReconciler(ctx, params.Server, params.NewC, params.CState)
}

// resolveSvcFromEndpoints looks up, in the service diff store, the Service
// whose name and namespace match the ServiceID carried by eps. The results of
// the store lookup (service, existence flag, error) are returned unchanged.
func (r *LBServiceReconciler) resolveSvcFromEndpoints(eps *k8s.Endpoints) (*slim_corev1.Service, bool, error) {
	id := eps.ServiceID
	return r.diffStore.GetByKey(resource.Key{Name: id.Name, Namespace: id.Namespace})
}

// populateLocalServices builds the set of services that have at least one
// backend running on the node named localNodeName. The set is used for
// externalTrafficPolicy=Local handling.
//
// Endpoints are skipped when their service cannot be resolved, is not present
// in the service store, or does not use externalTrafficPolicy=Local.
func (r *LBServiceReconciler) populateLocalServices(localNodeName string) localServices {
	local := make(localServices)

	for _, endpoints := range r.epDiffStore.List() {
		svc, found, err := r.resolveSvcFromEndpoints(endpoints)
		if err != nil || !found {
			// Either the lookup failed or no service is associated with
			// these endpoints; in both cases there is nothing to track.
			continue
		}

		// Endpoints tracking is only needed for externalTrafficPolicy=Local.
		if svc.Spec.ExternalTrafficPolicy != slim_corev1.ServiceExternalTrafficPolicyTypeLocal {
			continue
		}

		for _, backend := range endpoints.Backends {
			if backend.NodeName != localNodeName {
				continue
			}
			// One backend on this node is enough to mark the service as
			// locally available; the remaining backends need no inspection.
			local[endpoints.ServiceID] = struct{}{}
			break
		}
	}

	return local
}

func (r *LBServiceReconciler) lbServiceReconciler(
ctx context.Context,
sc *ServerWithConfig,
Expand All @@ -436,29 +489,36 @@ func (r *LBServiceReconciler) lbServiceReconciler(
existingSelector = sc.Config.ServiceSelector
}

ls := r.populateLocalServices(cstate.CurrentNodeName)

// If the existing selector was updated, went from nil to something or something to nil, we need to perform full
// reconciliation and check if every existing announcement's service still matches the selector.
changed := (existingSelector != nil && newc.ServiceSelector != nil && !newc.ServiceSelector.DeepEqual(existingSelector)) ||
((existingSelector == nil) != (newc.ServiceSelector == nil))

if changed {
if err := r.fullReconciliation(ctx, sc, newc, cstate); err != nil {
if err := r.fullReconciliation(ctx, sc, newc, cstate, ls); err != nil {
return fmt.Errorf("full reconciliation: %w", err)
}

return nil
}

if err := r.svcDiffReconciliation(ctx, sc, newc, cstate); err != nil {
if err := r.svcDiffReconciliation(ctx, sc, newc, cstate, ls); err != nil {
return fmt.Errorf("svc Diff reconciliation: %w", err)
}

return nil
}

// hasLocalEndpoints reports whether svc, identified by its name and
// namespace, is present in the set of locally available services ls.
func hasLocalEndpoints(svc *slim_corev1.Service, ls localServices) bool {
	id := k8s.ServiceID{Name: svc.GetName(), Namespace: svc.GetNamespace()}
	if _, ok := ls[id]; ok {
		return true
	}
	return false
}

// fullReconciliation reconciles all services, this is a heavy operation due to the potential amount of services and
// thus should be avoided if partial reconciliation is an option.
func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState) error {
func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState, ls localServices) error {
// Loop over all existing announcements, delete announcements for services which no longer exist
for svcKey := range sc.ServiceAnnouncements {
_, found, err := r.diffStore.GetByKey(svcKey)
Expand Down Expand Up @@ -490,21 +550,64 @@ func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *Server
continue
}

r.reconcileService(ctx, sc, newc, svc)
r.reconcileService(ctx, sc, newc, svc, ls)
}
return nil
}

// svcDiffReconciliation performs reconciliation, only on services which have been created, updated or deleted since
// the last diff reconciliation.
func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState) error {
func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState, ls localServices) error {
upserted, deleted, err := r.diffStore.Diff()
if err != nil {
return fmt.Errorf("svc store diff: %w", err)
}

for _, svc := range upserted {
if err := r.reconcileService(ctx, sc, newc, svc); err != nil {
// For externalTrafficPolicy=local, we need to take care of
// the endpoint changes in addition to the service changes.
// Take a diff of the endpoints and get affected services.
// We don't handle service deletion here since we only see
// the key, we cannot resolve associated service, so we have
// nothing to do.
epsUpserted, _, err := r.epDiffStore.Diff()
if err != nil {
return fmt.Errorf("endpoints store diff: %w", err)
}

for _, eps := range epsUpserted {
svc, exists, err := r.resolveSvcFromEndpoints(eps)
if err != nil {
// Cannot resolve service from endpoints. We have nothing to do here.
continue
}

if !exists {
// No service associated with this endpoint. We're not interested in this.
continue
}

// We only need Endpoints tracking for externalTrafficPolicy=Local
if svc.Spec.ExternalTrafficPolicy != slim_corev1.ServiceExternalTrafficPolicyTypeLocal {
continue
}

upserted = append(upserted, svc)
}

// A service may appear more than once when both the service and its
// associated endpoints changed, so deduplicate before reconciling.
deduped := ciliumslices.UniqueFunc(
upserted,
func(i int) resource.Key {
return resource.Key{
Name: upserted[i].GetName(),
Namespace: upserted[i].GetNamespace(),
}
},
)

for _, svc := range deduped {
if err := r.reconcileService(ctx, sc, newc, svc, ls); err != nil {
return fmt.Errorf("reconcile service: %w", err)
}
}
Expand All @@ -521,7 +624,7 @@ func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *Ser

// svcDesiredRoutes determines which, if any routes should be announced for the given service. This determines the
// desired state.
func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service) ([]netip.Prefix, error) {
func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) ([]netip.Prefix, error) {
if newc.ServiceSelector == nil {
// If the vRouter has no service selector, there are no desired routes
return nil, nil
Expand All @@ -542,6 +645,12 @@ func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtua
return nil, nil
}

// Ignore externalTrafficPolicy == Local && no local endpoints
if svc.Spec.ExternalTrafficPolicy == slim_corev1.ServiceExternalTrafficPolicyTypeLocal &&
!hasLocalEndpoints(svc, ls) {
return nil, nil
}

var desiredRoutes []netip.Prefix
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP == "" {
Expand All @@ -561,10 +670,10 @@ func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtua

// reconcileService gets the desired routes of a given service and makes sure that is what is being announced.
// Adding missing announcements or withdrawing unwanted ones.
func (r *LBServiceReconciler) reconcileService(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service) error {
func (r *LBServiceReconciler) reconcileService(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) error {
svcKey := resource.NewKey(svc)

desiredCidrs, err := r.svcDesiredRoutes(newc, svc)
desiredCidrs, err := r.svcDesiredRoutes(newc, svc, ls)
if err != nil {
return fmt.Errorf("svcDesiredRoutes(): %w", err)
}
Expand Down