Skip to content

Commit

Permalink
bgpv1: Support externalTrafficPolicy=local for service advertisement
Browse files Browse the repository at this point in the history
This implements the semantics of externalTrafficPolicy=local for service
advertisement: when at least one active endpoint is present on the local
node, we advertise the LB VIP; otherwise, we stop the advertisement.

We can track active endpoints by watching the Endpoints or EndpointSlice
objects, which contain the list of endpoint IPs and their nodes. We
leverage a new Resource[*Endpoints] to subscribe to the changes. For
efficient processing, we wrap it with a DiffStore and reconcile only the
services affected by the endpoint changes.

On every reconciliation, we populate a structure called localServices
from the endpoints DiffStore. It records which services have at least
one available endpoint on the local node.

Signed-off-by: Yutaro Hayakawa <yutaro.hayakawa@isovalent.com>
  • Loading branch information
YutaroHayakawa authored and dylandreimerink committed Jun 6, 2023
1 parent 6d4b2f7 commit 24e37ed
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 21 deletions.
16 changes: 12 additions & 4 deletions pkg/bgpv1/agent/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type ControlPlaneState struct {
IPv4 netip.Addr
// The current IPv6 address of the agent, reachable externally.
IPv6 netip.Addr
// The current node name
CurrentNodeName string
}

// ResolveRouterID resolves router ID, if we have an annotation and it can be
Expand Down Expand Up @@ -395,15 +397,21 @@ func (c *Controller) Reconcile(ctx context.Context) error {
return fmt.Errorf("failed to retrieve Node's pod CIDR ranges: %w", err)
}

currentNodeName, err := c.NodeSpec.CurrentNodeName()
if err != nil {
return fmt.Errorf("failed to retrieve current node's name: %w", err)
}

ipv4, _ := ip.AddrFromIP(nodeaddr.GetIPv4())
ipv6, _ := ip.AddrFromIP(nodeaddr.GetIPv6())

// define our current point-in-time control plane state.
state := &ControlPlaneState{
PodCIDRs: podCIDRs,
Annotations: annoMap,
IPv4: ipv4,
IPv6: ipv6,
PodCIDRs: podCIDRs,
Annotations: annoMap,
IPv4: ipv4,
IPv6: ipv6,
CurrentNodeName: currentNodeName,
}

// call bgp sub-systems required to apply this policy's BGP topology.
Expand Down
4 changes: 4 additions & 0 deletions pkg/bgpv1/agent/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (f *fakeNodeSpecer) Annotations() (map[string]string, error) {
return f.Annotations_()
}

// CurrentNodeName gives the fake the CurrentNodeName method that the
// nodeSpecer interface requires. It returns nodeName and never reports
// an error.
func (f *fakeNodeSpecer) CurrentNodeName() (string, error) {
return nodeName, nil
}

// TestControllerSanity ensures that the controller calls the correct methods,
// with the correct arguments, during its Reconcile loop.
func TestControllerSanity(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions pkg/bgpv1/agent/nodespecer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type nodeSpecer interface {
Annotations() (map[string]string, error)
Labels() (map[string]string, error)
PodCIDRs() ([]string, error)
CurrentNodeName() (string, error)
}

type localNodeStoreSpecerParams struct {
Expand Down Expand Up @@ -140,6 +141,13 @@ func (s *kubernetesNodeSpecer) PodCIDRs() ([]string, error) {
return []string{}, nil
}

// CurrentNodeName returns the name of the node object currently held by the
// specer. It returns an error as long as no node object has been stored yet.
func (s *kubernetesNodeSpecer) CurrentNodeName() (string, error) {
	if node := s.currentNode; node != nil {
		return node.Name, nil
	}
	return "", errors.New("node name is not yet available")
}

type ciliumNodeSpecer struct {
nodeResource k8s.LocalCiliumNodeResource

Expand Down Expand Up @@ -212,3 +220,10 @@ func (s *ciliumNodeSpecer) PodCIDRs() ([]string, error) {

return []string{}, nil
}

// CurrentNodeName returns the name of the node object currently held by the
// specer. It returns an error as long as no node object has been stored yet.
func (s *ciliumNodeSpecer) CurrentNodeName() (string, error) {
	if node := s.currentNode; node != nil {
		return node.Name, nil
	}
	return "", errors.New("node name is not yet available")
}
3 changes: 3 additions & 0 deletions pkg/bgpv1/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/cilium/cilium/pkg/bgpv1/manager"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
Expand All @@ -34,6 +35,8 @@ var Cell = cell.Module(
manager.NewBGPRouterManager,
// Create a slim service DiffStore
manager.NewDiffStore[*slim_core_v1.Service],
// Create an endpoints DiffStore
manager.NewDiffStore[*k8s.Endpoints],
),
// Provides the reconcilers used by the route manager to update the config
manager.ConfigReconcilers,
Expand Down
135 changes: 122 additions & 13 deletions pkg/bgpv1/manager/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/cilium/cilium/pkg/bgpv1/agent"
"github.com/cilium/cilium/pkg/bgpv1/types"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/resource"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/labels"
slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
ciliumslices "github.com/cilium/cilium/pkg/slices"

"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -402,17 +404,21 @@ type LBServiceReconcilerOut struct {
}

type LBServiceReconciler struct {
diffStore DiffStore[*slim_corev1.Service]
diffStore DiffStore[*slim_corev1.Service]
epDiffStore DiffStore[*k8s.Endpoints]
}

func NewLBServiceReconciler(diffStore DiffStore[*slim_corev1.Service]) LBServiceReconcilerOut {
// localServices is a set of service IDs. populateLocalServices fills it with
// the externalTrafficPolicy=Local services that have at least one backend on
// the local node.
type localServices map[k8s.ServiceID]struct{}

func NewLBServiceReconciler(diffStore DiffStore[*slim_corev1.Service], epDiffStore DiffStore[*k8s.Endpoints]) LBServiceReconcilerOut {
if diffStore == nil {
return LBServiceReconcilerOut{}
}

return LBServiceReconcilerOut{
Reconciler: &LBServiceReconciler{
diffStore: diffStore,
diffStore: diffStore,
epDiffStore: epDiffStore,
},
}
}
Expand All @@ -425,6 +431,53 @@ func (r *LBServiceReconciler) Reconcile(ctx context.Context, params ReconcilePar
return r.lbServiceReconciler(ctx, params.Server, params.NewC, params.CState)
}

// resolveSvcFromEndpoints looks up, in the service DiffStore, the Service
// whose name and namespace match the ServiceID of the given Endpoints. The
// result of the store's GetByKey call is returned unchanged: the service,
// whether it was found, and any lookup error.
func (r *LBServiceReconciler) resolveSvcFromEndpoints(eps *k8s.Endpoints) (*slim_corev1.Service, bool, error) {
	svcID := eps.ServiceID
	return r.diffStore.GetByKey(resource.Key{
		Name:      svcID.Name,
		Namespace: svcID.Namespace,
	})
}

// populateLocalServices builds the set of services that have at least one
// backend on localNodeName. It walks every Endpoints object in the endpoints
// DiffStore and only considers those whose service can be resolved and uses
// externalTrafficPolicy=Local. The result drives the externalTrafficPolicy=Local
// handling during service reconciliation.
func (r *LBServiceReconciler) populateLocalServices(localNodeName string) localServices {
	local := make(localServices)

	for _, eps := range r.epDiffStore.List() {
		svc, exists, err := r.resolveSvcFromEndpoints(eps)
		if err != nil || !exists {
			// Either the lookup failed or no service is associated with
			// these endpoints. There is nothing to track in both cases.
			continue
		}

		// Endpoints only need to be tracked for externalTrafficPolicy=Local.
		if svc.Spec.ExternalTrafficPolicy != slim_corev1.ServiceExternalTrafficPolicyTypeLocal {
			continue
		}

		for _, be := range eps.Backends {
			if be.NodeName != localNodeName {
				continue
			}
			// A single backend on this node is enough to mark the service
			// as locally available, so the remaining backends of these
			// endpoints do not need to be inspected.
			local[eps.ServiceID] = struct{}{}
			break
		}
	}

	return local
}

func (r *LBServiceReconciler) lbServiceReconciler(
ctx context.Context,
sc *ServerWithConfig,
Expand All @@ -436,29 +489,36 @@ func (r *LBServiceReconciler) lbServiceReconciler(
existingSelector = sc.Config.ServiceSelector
}

ls := r.populateLocalServices(cstate.CurrentNodeName)

// If the existing selector was updated, went from nil to something or something to nil, we need to perform full
// reconciliation and check if every existing announcement's service still matches the selector.
changed := (existingSelector != nil && newc.ServiceSelector != nil && !newc.ServiceSelector.DeepEqual(existingSelector)) ||
((existingSelector == nil) != (newc.ServiceSelector == nil))

if changed {
if err := r.fullReconciliation(ctx, sc, newc, cstate); err != nil {
if err := r.fullReconciliation(ctx, sc, newc, cstate, ls); err != nil {
return fmt.Errorf("full reconciliation: %w", err)
}

return nil
}

if err := r.svcDiffReconciliation(ctx, sc, newc, cstate); err != nil {
if err := r.svcDiffReconciliation(ctx, sc, newc, cstate, ls); err != nil {
return fmt.Errorf("svc Diff reconciliation: %w", err)
}

return nil
}

// hasLocalEndpoints reports whether svc, identified by its name and
// namespace, is a member of the ls set.
func hasLocalEndpoints(svc *slim_corev1.Service, ls localServices) bool {
	id := k8s.ServiceID{
		Name:      svc.GetName(),
		Namespace: svc.GetNamespace(),
	}
	_, ok := ls[id]
	return ok
}

// fullReconciliation reconciles all services, this is a heavy operation due to the potential amount of services and
// thus should be avoided if partial reconciliation is an option.
func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState) error {
func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState, ls localServices) error {
// Loop over all existing announcements, delete announcements for services which no longer exist
for svcKey := range sc.ServiceAnnouncements {
_, found, err := r.diffStore.GetByKey(svcKey)
Expand Down Expand Up @@ -490,21 +550,64 @@ func (r *LBServiceReconciler) fullReconciliation(ctx context.Context, sc *Server
continue
}

r.reconcileService(ctx, sc, newc, svc)
r.reconcileService(ctx, sc, newc, svc, ls)
}
return nil
}

// svcDiffReconciliation performs reconciliation, only on services which have been created, updated or deleted since
// the last diff reconciliation.
func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState) error {
func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, cstate *agent.ControlPlaneState, ls localServices) error {
upserted, deleted, err := r.diffStore.Diff()
if err != nil {
return fmt.Errorf("svc store diff: %w", err)
}

for _, svc := range upserted {
if err := r.reconcileService(ctx, sc, newc, svc); err != nil {
// For externalTrafficPolicy=Local, we need to take care of
// endpoint changes in addition to service changes. Take a
// diff of the endpoints and collect the affected services.
// Deleted endpoints are ignored here: for a deletion we only
// see the key, so we cannot resolve the associated service
// and have nothing to do.
epsUpserted, _, err := r.epDiffStore.Diff()
if err != nil {
return fmt.Errorf("endpoints store diff: %w", err)
}

for _, eps := range epsUpserted {
svc, exists, err := r.resolveSvcFromEndpoints(eps)
if err != nil {
// Cannot resolve service from endpoints. We have nothing to do here.
continue
}

if !exists {
// No service associated with this endpoint. We're not interested in this.
continue
}

// We only need Endpoints tracking for externalTrafficPolicy=Local
if svc.Spec.ExternalTrafficPolicy != slim_corev1.ServiceExternalTrafficPolicyTypeLocal {
continue
}

upserted = append(upserted, svc)
}

// The list may contain duplicates when both a service and its
// associated endpoints changed, so deduplicate it by service key.
deduped := ciliumslices.UniqueFunc(
upserted,
func(i int) resource.Key {
return resource.Key{
Name: upserted[i].GetName(),
Namespace: upserted[i].GetNamespace(),
}
},
)

for _, svc := range deduped {
if err := r.reconcileService(ctx, sc, newc, svc, ls); err != nil {
return fmt.Errorf("reconcile service: %w", err)
}
}
Expand All @@ -521,7 +624,7 @@ func (r *LBServiceReconciler) svcDiffReconciliation(ctx context.Context, sc *Ser

// svcDesiredRoutes determines which, if any routes should be announced for the given service. This determines the
// desired state.
func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service) ([]netip.Prefix, error) {
func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) ([]netip.Prefix, error) {
if newc.ServiceSelector == nil {
// If the vRouter has no service selector, there are no desired routes
return nil, nil
Expand All @@ -542,6 +645,12 @@ func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtua
return nil, nil
}

// Ignore externalTrafficPolicy == Local && no local endpoints
if svc.Spec.ExternalTrafficPolicy == slim_corev1.ServiceExternalTrafficPolicyTypeLocal &&
!hasLocalEndpoints(svc, ls) {
return nil, nil
}

var desiredRoutes []netip.Prefix
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP == "" {
Expand All @@ -561,10 +670,10 @@ func (r *LBServiceReconciler) svcDesiredRoutes(newc *v2alpha1api.CiliumBGPVirtua

// reconcileService gets the desired routes of a given service and makes sure that is what is being announced.
// Adding missing announcements or withdrawing unwanted ones.
func (r *LBServiceReconciler) reconcileService(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service) error {
func (r *LBServiceReconciler) reconcileService(ctx context.Context, sc *ServerWithConfig, newc *v2alpha1api.CiliumBGPVirtualRouter, svc *slim_corev1.Service, ls localServices) error {
svcKey := resource.NewKey(svc)

desiredCidrs, err := r.svcDesiredRoutes(newc, svc)
desiredCidrs, err := r.svcDesiredRoutes(newc, svc, ls)
if err != nil {
return fmt.Errorf("svcDesiredRoutes(): %w", err)
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/bgpv1/manager/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cilium/cilium/pkg/bgpv1/agent"
"github.com/cilium/cilium/pkg/bgpv1/types"
"github.com/cilium/cilium/pkg/k8s"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/resource"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
Expand Down Expand Up @@ -525,6 +526,8 @@ func TestLBServiceReconciler(t *testing.T) {
upsertedServices []*slim_corev1.Service
// the services which will be "deleted" in the diffstore
deletedServices []resource.Key
// the endpoints which will be "upserted" in the diffstore
upsertedEndpoints []*k8s.Endpoints
// the updated PodCIDR blocks to reconcile, these are string encoded
// for the convenience of attaching directly to the NodeSpec.PodCIDRs
// field.
Expand Down Expand Up @@ -955,7 +958,8 @@ func TestLBServiceReconciler(t *testing.T) {
ServiceSelector: tt.newServiceSelector,
}
newcstate := agent.ControlPlaneState{
IPv4: netip.MustParseAddr("127.0.0.1"),
IPv4: netip.MustParseAddr("127.0.0.1"),
CurrentNodeName: "node1",
}

diffstore := newFakeDiffStore[*slim_corev1.Service]()
Expand All @@ -966,7 +970,12 @@ func TestLBServiceReconciler(t *testing.T) {
diffstore.Delete(key)
}

reconciler := NewLBServiceReconciler(diffstore)
epDiffStore := newFakeDiffStore[*k8s.Endpoints]()
for _, obj := range tt.upsertedEndpoints {
epDiffStore.Upsert(obj)
}

reconciler := NewLBServiceReconciler(diffstore, epDiffStore)
err = reconciler.Reconciler.Reconcile(context.Background(), ReconcileParams{
Server: testSC,
NewC: newc,
Expand Down Expand Up @@ -1031,6 +1040,7 @@ func TestReconcileAfterServerReinit(t *testing.T) {
localASN = 64125
newRouterID = "192.168.0.2"
diffstore = newFakeDiffStore[*slim_corev1.Service]()
epDiffStore = newFakeDiffStore[*k8s.Endpoints]()
serviceSelector = &slim_metav1.LabelSelector{MatchLabels: map[string]string{"color": "blue"}}
obj = &slim_corev1.Service{
ObjectMeta: slim_metav1.ObjectMeta{
Expand Down Expand Up @@ -1094,7 +1104,7 @@ func TestReconcileAfterServerReinit(t *testing.T) {
require.NoError(t, err)

diffstore.Upsert(obj)
reconciler := NewLBServiceReconciler(diffstore)
reconciler := NewLBServiceReconciler(diffstore, epDiffStore)
err = reconciler.Reconciler.Reconcile(context.Background(), ReconcileParams{
Server: testSC,
NewC: newc,
Expand Down Expand Up @@ -1124,7 +1134,7 @@ func TestReconcileAfterServerReinit(t *testing.T) {
require.NoError(t, err)

// Update LB service
reconciler = NewLBServiceReconciler(diffstore)
reconciler = NewLBServiceReconciler(diffstore, epDiffStore)
err = reconciler.Reconciler.Reconcile(context.Background(), ReconcileParams{
Server: testSC,
NewC: newc,
Expand Down

0 comments on commit 24e37ed

Please sign in to comment.