Skip to content

Commit

Permalink
fix(ecmp_vip): update VIPs based on svc change
Browse files Browse the repository at this point in the history
Previously we did an idempotent sync of all active VIPs any time we
got a service or endpoint update. However, this only worked when we
assumed a single-stack deployment model where IPs were never deleted
unless the whole service was deleted.

In a dual-stack model, we can add / remove LoadBalancer IPs and Cluster
IPs on updates. Given this, we need to take into account the finite
change that happens, and not just revert to sync-all, because otherwise
we would never stop advertising IPs that should be removed.

As a fall-back, we still have the outer Run loop that syncs all active
routes every X amount of seconds (configured by user CLI parameter). So
on that timer we'll still have something that syncs all active VIPs and
works as an outer control loop to ensure that desired state eventually
becomes active state if we accidentally remove a VIP that should have
been there.
  • Loading branch information
aauren committed Oct 7, 2023
1 parent f5ac980 commit 1d5c9ce
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 98 deletions.
212 changes: 114 additions & 98 deletions pkg/controllers/routing/ecmp_vip.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"

"strings"

gobgpapi "github.com/osrg/gobgp/v3/api"
v1core "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -118,7 +116,7 @@ func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEven
nrc.OnServiceCreate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
nrc.OnServiceUpdate(newObj, oldObj)
nrc.OnServiceUpdate(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
nrc.OnServiceDelete(obj)
Expand All @@ -127,20 +125,23 @@ func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEven
}

// getServiceObject converts an informer object into a *v1core.Service.
// A nil obj yields a nil service without logging; anything that does not
// resolve to a non-nil *v1core.Service is logged as an error and also yields
// nil.
func getServiceObject(obj interface{}) (svc *v1core.Service) {
	if obj == nil {
		return nil
	}

	service, _ := obj.(*v1core.Service)
	if service != nil {
		return service
	}

	klog.Errorf("cache indexer returned obj that is not type *v1.Service")
	return nil
}

func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) {
func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core.Service) {
if !nrc.bgpServerStarted {
klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
svc.Namespace, svc.Name)
svcNew.Namespace, svcNew.Name)
return
}

toAdvertise, toWithdraw, err := nrc.getActiveVIPs()
toAdvertise, toWithdraw, err := nrc.getChangedVIPs(svcOld, svcNew, true)
if err != nil {
klog.Errorf("error getting routes for services: %s", err)
return
Expand All @@ -156,11 +157,11 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) {
nrc.withdrawVIPs(toWithdraw)
}

func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) {
func (nrc *NetworkRoutingController) handleServiceDelete(oldSvc *v1core.Service) {

if !nrc.bgpServerStarted {
klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
svc.Namespace, svc.Name)
oldSvc.Namespace, oldSvc.Name)
return
}

Expand All @@ -174,124 +175,80 @@ func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) {
klog.Errorf("Failed to get active VIP's on service delete event due to: %s", err.Error())
return
}
activeVIPsMap := make(map[string]bool)
for _, activeVIP := range activeVIPs {
activeVIPsMap[activeVIP] = true
}
advertiseIPList, unadvertiseIPList := nrc.getAllVIPsForService(svc)
advertiseIPList, unadvertiseIPList := nrc.getAllVIPsForService(oldSvc)
//nolint:gocritic // we understand that we're assigning to a new slice
allIPList := append(advertiseIPList, unadvertiseIPList...)
withdrawVIPs := make([]string, 0)
for _, serviceVIP := range allIPList {
// withdraw VIP only if deleted service is the last service using the VIP
if !activeVIPsMap[serviceVIP] {
if !utils.SliceContainsString(serviceVIP, activeVIPs) {
withdrawVIPs = append(withdrawVIPs, serviceVIP)
}
}
nrc.withdrawVIPs(withdrawVIPs)

}

func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) {
if svc := getServiceObject(obj); svc != nil {
klog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
func (nrc *NetworkRoutingController) tryHandleServiceUpdate(objOld, objNew interface{}) {
svcOld := getServiceObject(objOld)
svcNew := getServiceObject(objNew)

// If the service is headless and the previous version of the service is either non-existent or also headless,
// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we
// don't need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now,
// it was a ClusterIP before.
if utils.ServiceIsHeadless(obj) {
klog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name)
return
}
// We expect at least svcNew to be non-nil in order to process this service update, if not get out quick
if svcNew == nil {
klog.Warningf("received a nil service objects, aborting as we can't continue")
return
}

klog.V(1).Infof("attempting to update service %s:%s", svcNew.Namespace, svcNew.Name)

nrc.handleServiceUpdate(svc)
// If the service is headless and the previous version of the service is either non-existent or also headless,
// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we
// don't need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now,
// it was a ClusterIP before.
if utils.ServiceIsHeadless(objNew) {
klog.V(1).Infof("%s/%s is headless, skipping...", svcNew.Namespace, svcNew.Name)
return
}

nrc.handleServiceUpdate(svcOld, svcNew)
}

func (nrc *NetworkRoutingController) tryHandleServiceDelete(obj interface{}, logMsgFormat string) {
svc, ok := obj.(*v1core.Service)
func (nrc *NetworkRoutingController) tryHandleServiceDelete(oldObj interface{}, logMsgFormat string) {
oldSvc, ok := oldObj.(*v1core.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
tombstone, ok := oldObj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("unexpected object type: %v", obj)
klog.Errorf("unexpected object type: %v", oldObj)
return
}
if svc, ok = tombstone.Obj.(*v1core.Service); !ok {
klog.Errorf("unexpected object type: %v", obj)
if oldSvc, ok = tombstone.Obj.(*v1core.Service); !ok {
klog.Errorf("unexpected object type: %v", oldObj)
return
}
}
klog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
klog.V(1).Infof(logMsgFormat, oldSvc.Namespace, oldSvc.Name)

// If the service is headless skip processing as we only work with VIPs in the next section.
if utils.ServiceIsHeadless(obj) {
klog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name)
if utils.ServiceIsHeadless(oldObj) {
klog.V(1).Infof("%s/%s is headless, skipping...", oldSvc.Namespace, oldSvc.Name)
return
}

nrc.handleServiceDelete(svc)
nrc.handleServiceDelete(oldSvc)
}

// OnServiceCreate handles new service create event from the kubernetes API server
func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) {
nrc.tryHandleServiceUpdate(obj, "Received new service: %s/%s from watch API")
nrc.tryHandleServiceUpdate(nil, obj)
}

// OnServiceUpdate handles the service relates updates from the kubernetes API server
func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld interface{}) {
nrc.tryHandleServiceUpdate(objNew, "Received update on service: %s/%s from watch API")

// This extra call needs to be here, because during the update the list of externalIPs may have changed and
// externalIPs is the only service VIP field that is:
// a) mutable after first creation
// b) an array
//
// This means that while we only need to withdraw ClusterIP VIPs and LoadBalancer VIPs on delete, we may need
// to withdraw ExternalIPs on update.
//
// As such, it needs to be handled differently as nrc.handleServiceUpdate only withdraws VIPs if the service
// endpoint is no longer scheduled on this node and its a local type service.
nrc.withdrawVIPs(nrc.getExternalIPsToWithdraw(getServiceObject(objOld), getServiceObject(objNew)))
}

func (nrc *NetworkRoutingController) getExternalIPsToWithdraw(svcOld, svcNew *v1core.Service) (out []string) {
withdrawnServiceVips := make([]string, 0)
if svcOld != nil && svcNew != nil {
withdrawnServiceVips = getMissingPrevGen(nrc.getExternalIPs(svcOld), nrc.getExternalIPs(svcNew))
}
// ensure external IP to be withdrawn is not used by any other service
allActiveVIPs, _, err := nrc.getActiveVIPs()
if err != nil {
klog.Errorf("failed to get all active VIP's due to: %s", err.Error())
return
}
activeVIPsMap := make(map[string]bool)
for _, activeVIP := range allActiveVIPs {
activeVIPsMap[activeVIP] = true
}
for _, serviceVIP := range withdrawnServiceVips {
// withdraw VIP only if updated service is the last service using the VIP
if !activeVIPsMap[serviceVIP] {
out = append(out, serviceVIP)
}
}
return
}

func getMissingPrevGen(old, new []string) (withdrawIPs []string) {
lookIn := " " + strings.Join(new, " ") + " "
for _, s := range old {
if !strings.Contains(lookIn, " "+s+" ") {
withdrawIPs = append(withdrawIPs, s)
}
}
return
func (nrc *NetworkRoutingController) OnServiceUpdate(objOld interface{}, objNew interface{}) {
nrc.tryHandleServiceUpdate(objOld, objNew)
}

// OnServiceDelete handles the service delete updates from the kubernetes API server
func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
nrc.tryHandleServiceDelete(obj, "Received event to delete service: %s/%s from watch API")
func (nrc *NetworkRoutingController) OnServiceDelete(oldObj interface{}) {
nrc.tryHandleServiceDelete(oldObj, "Received event to delete service: %s/%s from watch API")
}

func (nrc *NetworkRoutingController) newEndpointsEventHandler() cache.ResourceEventHandler {
Expand Down Expand Up @@ -353,7 +310,7 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
return
}

nrc.tryHandleServiceUpdate(svc, "Updating service %s/%s triggered by endpoint update event")
nrc.tryHandleServiceUpdate(nil, svc)
}

func (nrc *NetworkRoutingController) getClusterIP(svc *v1core.Service) []string {
Expand Down Expand Up @@ -407,6 +364,71 @@ func (nrc *NetworkRoutingController) getLoadBalancerIPs(svc *v1core.Service) []s
return loadBalancerIPList
}

// getChangedVIPs works out which VIPs should be advertised and which should be
// withdrawn as the result of a single service changing from oldSvc to newSvc.
//
// oldSvc may be nil (e.g. on create), in which case there is nothing to diff
// against and every VIP of newSvc is classified purely from newSvc's
// configuration. newSvc must not be nil.
//
// When onlyActiveEndpoints is true and the service is "local" (it carries the
// svcLocalAnnotation annotation or has ExternalTrafficPolicy=Local), the
// service is only advertised if nrc.nodeHasEndpointsForService reports true
// for it. In every other case the service is advertised.
//
// It returns (toAdvertise, toWithdraw, err). Both lists may be empty.
func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Service,
	onlyActiveEndpoints bool) ([]string, []string, error) {
	// Services are advertised by default; only a local service without endpoints on this node is held back.
	// Starting from false here would cause every non-local service to have its VIPs withdrawn on each update.
	advertiseService := true

	_, hasLocalAnnotation := newSvc.Annotations[svcLocalAnnotation]
	hasLocalTrafficPolicy := newSvc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal
	isLocal := hasLocalAnnotation || hasLocalTrafficPolicy

	if onlyActiveEndpoints && isLocal {
		var err error
		advertiseService, err = nrc.nodeHasEndpointsForService(newSvc)
		if err != nil {
			return nil, nil, err
		}
	}

	newAdvertiseServiceVIPs, newUnadvertiseServiceVIPs := nrc.getAllVIPsForService(newSvc)
	// This function allows oldSvc to be nil. If that is the case, we don't have any old VIPs to compare against
	// and possibly withdraw; instead we treat all VIPs as new and return them as either toAdvertise or toWithdraw
	// depending on service configuration.
	if oldSvc == nil {
		if advertiseService {
			return newAdvertiseServiceVIPs, newUnadvertiseServiceVIPs, nil
		}
		// Build a fresh slice so that we never write into the backing array of the slices handed to us.
		allVIPs := make([]string, 0, len(newAdvertiseServiceVIPs)+len(newUnadvertiseServiceVIPs))
		allVIPs = append(allVIPs, newAdvertiseServiceVIPs...)
		allVIPs = append(allVIPs, newUnadvertiseServiceVIPs...)
		return nil, allVIPs, nil
	}

	oldAdvertiseServiceVIPs, oldUnadvertiseServiceVIPs := nrc.getAllVIPsForService(oldSvc)
	oldAllServiceVIPs := make([]string, 0, len(oldAdvertiseServiceVIPs)+len(oldUnadvertiseServiceVIPs))
	oldAllServiceVIPs = append(oldAllServiceVIPs, oldAdvertiseServiceVIPs...)
	oldAllServiceVIPs = append(oldAllServiceVIPs, oldUnadvertiseServiceVIPs...)

	// If we are instructed to only advertise local services and this service doesn't have endpoints on the node we
	// are currently running on, then attempt to withdraw all the VIPs that the old service had.
	if !advertiseService {
		return nil, oldAllServiceVIPs, nil
	}

	// At this point we're sure that we should be advertising some VIPs, but we need to figure out which VIPs to
	// advertise and which, if any, to withdraw. Start from the VIPs the new service explicitly does not advertise
	// and add every old VIP that is no longer in the advertise list, skipping ones already queued for withdrawal.
	toAdvertiseListFinal := newAdvertiseServiceVIPs
	toWithdrawList := make([]string, 0, len(newUnadvertiseServiceVIPs)+len(oldAllServiceVIPs))
	toWithdrawList = append(toWithdrawList, newUnadvertiseServiceVIPs...)
	for _, oldServiceVIP := range oldAllServiceVIPs {
		if utils.SliceContainsString(oldServiceVIP, toAdvertiseListFinal) {
			continue
		}
		if utils.SliceContainsString(oldServiceVIP, toWithdrawList) {
			continue
		}
		toWithdrawList = append(toWithdrawList, oldServiceVIP)
	}

	// It is possible that this host may have the same IP advertised from multiple services, and we don't want to
	// withdraw it if there is an active service for this VIP on a different service than the one that is changing.
	allVIPsOnServer, _, err := nrc.getVIPs(onlyActiveEndpoints)
	if err != nil {
		return nil, nil, err
	}
	toWithdrawListFinal := make([]string, 0, len(toWithdrawList))
	for _, withdrawVIP := range toWithdrawList {
		if !utils.SliceContainsString(withdrawVIP, allVIPsOnServer) {
			toWithdrawListFinal = append(toWithdrawListFinal, withdrawVIP)
		}
	}

	return toAdvertiseListFinal, toWithdrawListFinal, nil
}

func (nrc *NetworkRoutingController) getAllVIPs() ([]string, []string, error) {
return nrc.getVIPs(false)
}
Expand Down Expand Up @@ -438,18 +460,12 @@ func (nrc *NetworkRoutingController) getVIPs(onlyActiveEndpoints bool) ([]string

// We need to account for the niche case where multiple services may have the same VIP, in this case, one service
// might be ready while the other service is not. We still want to advertise the VIP as long as there is at least
// one active endpoint on the node or we might introduce a service disruption.
// one active endpoint on the node, or we might introduce a service disruption.
finalToWithdrawList := make([]string, 0)
OUTER:
for _, withdrawVIP := range toWithdrawList {
for _, advertiseVIP := range toAdvertiseList {
if withdrawVIP == advertiseVIP {
// if there is a VIP that is set to both be advertised and withdrawn, don't add it to the final
// withdraw list
continue OUTER
}
if !utils.SliceContainsString(withdrawVIP, toAdvertiseList) {
finalToWithdrawList = append(finalToWithdrawList, withdrawVIP)
}
finalToWithdrawList = append(finalToWithdrawList, withdrawVIP)
}

return toAdvertiseList, finalToWithdrawList, nil
Expand Down
11 changes: 11 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,14 @@ func ContainsIPv6Address(addrs []string) bool {
}
return false
}

// SliceContainsString reports whether needle appears as an element of haystack.
// A nil or empty haystack never contains anything, so the result is false.
func SliceContainsString(needle string, haystack []string) bool {
	for i := 0; i < len(haystack); i++ {
		if haystack[i] == needle {
			return true
		}
	}
	return false
}

0 comments on commit 1d5c9ce

Please sign in to comment.