Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support EndpointSlices with BGP mode by updating MetalLB to v0.10.0 #16524

Merged
merged 4 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 0 additions & 11 deletions Documentation/gettingstarted/bgp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,3 @@ Verify that traffic to the external IP is directed to the backends:

$ # Exec / SSH into BGP router
$ curl 192.0.2.154

Limitations
===========

BGP support relies on MetalLB. Due to the `lack of upstream support
<https://github.com/metallb/metallb/issues/811>`_ for ``EndpointSlices`` in
MetalLB, Cilium will fallback to using the original ``Endpoints`` resource.

The Kubernetes documentation provides a simple `explanation
<https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/>`_ of
the advantages of ``EndpointSlices`` and the issues with ``Endpoints``.
11 changes: 0 additions & 11 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,17 +1442,6 @@ func initEnv(cmd *cobra.Command) {
if option.Config.BGPAnnounceLBIP {
option.Config.EnableNodePort = true
log.Infof("Auto-set BPF NodePort (%q) because LB IP announcements via BGP depend on it.", option.EnableNodePort)

if option.Config.K8sEnableK8sEndpointSlice {
option.Config.K8sEnableK8sEndpointSlice = false
log.WithFields(logrus.Fields{
logfields.URL: "https://github.com/metallb/metallb/issues/811",
}).Warnf(
"Disabling EndpointSlice support (%q) due to incompatibility with BGP mode. "+
"Cilium will fallback to using the original Endpoint resource.",
option.K8sEnableEndpointSlice,
)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.uber.org/goleak v1.1.10
go.universe.tf/metallb v0.9.6
go.universe.tf/metallb v0.10.0
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e
golang.org/x/net v0.0.0-20210504132125-bbd867fde50d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down Expand Up @@ -117,7 +117,7 @@ replace (
github.com/miekg/dns => github.com/cilium/dns v1.1.4-0.20190417235132-8e25ec9a0ff3
github.com/optiopay/kafka => github.com/cilium/kafka v0.0.0-20180809090225-01ce283b732b

go.universe.tf/metallb => github.com/cilium/metallb v0.1.1-0.20210607221240-b4c60b959dd7
go.universe.tf/metallb => github.com/cilium/metallb v0.1.1-0.20210609003938-62cef75b3c9f

// Using private fork of controller-tools. See commit msg for more context
// as to why we are using a private fork.
Expand Down
5 changes: 3 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pkg/bgp/manager/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/logging/logfields"

metallbk8s "go.universe.tf/metallb/pkg/k8s"
"go.universe.tf/metallb/pkg/k8s/types"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -131,7 +132,9 @@ func (m *Manager) process(event interface{}) types.SyncState {
// reconcile calls down to the MetalLB controller to reconcile the service
// object, which will allocate it an LB IP.
func (m *Manager) reconcile(name string, svc *slim_corev1.Service) types.SyncState {
return m.SetBalancer(m.Logger(), name, toV1Service(svc), nil)
return m.SetBalancer(m.Logger(), name, toV1Service(svc), metallbk8s.EpsOrSlices{
Type: metallbk8s.Eps,
})
}

// forceResync re-adds all the services from the indexer to the queue. See
Expand Down
70 changes: 70 additions & 0 deletions pkg/bgp/speaker/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
bgplog "github.com/cilium/cilium/pkg/bgp/log"
"github.com/cilium/cilium/pkg/k8s"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1"
slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1"
"github.com/cilium/cilium/pkg/lock"
nodetypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
Expand Down Expand Up @@ -143,6 +145,38 @@ func (s *Speaker) OnUpdateEndpoints(eps *slim_corev1.Endpoints) {
}
}

// OnUpdateEndpointSliceV1 notifies the Speaker of an update to the backends of
// a service as endpoint slices.
func (s *Speaker) OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointSlice) {
sliceID, _ := k8s.ParseEndpointSliceV1(eps)

s.Lock()
defer s.Unlock()
if svc, ok := s.services[sliceID.ServiceID]; ok {
s.queue.Add(epEvent{
id: sliceID.ServiceID,
svc: convertService(svc),
eps: convertEndpointSliceV1(eps),
})
}
}

// OnUpdateEndpointSliceV1Beta1 is the same as OnUpdateEndpointSliceV1() but for
// the v1beta1 variant.
func (s *Speaker) OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice) {
sliceID, _ := k8s.ParseEndpointSliceV1Beta1(eps)

s.Lock()
defer s.Unlock()
if svc, ok := s.services[sliceID.ServiceID]; ok {
s.queue.Add(epEvent{
id: sliceID.ServiceID,
svc: convertService(svc),
eps: convertEndpointSliceV1Beta1(eps),
})
}
}

// OnUpdateNode notifies the Speaker of an update to a node.
func (s *Speaker) OnUpdateNode(node *v1.Node) {
s.queue.Add(nodeEvent(&node.Labels))
Expand Down Expand Up @@ -214,3 +248,39 @@ func convertEndpoints(in *slim_corev1.Endpoints) *metallbspr.Endpoints {
}
return out
}

func convertEndpointSliceV1(in *slim_discover_v1.EndpointSlice) *metallbspr.Endpoints {
if in == nil {
return nil
}
out := new(metallbspr.Endpoints)
for _, ep := range in.Endpoints {
for _, addr := range ep.Addresses {
out.Ready = append(out.Ready, metallbspr.Endpoint{
IP: addr,
NodeName: ep.NodeName,
})
}
// See above comment in convertEndpoints() for why we only append
// "ready" endpoints.
}
return out
}

func convertEndpointSliceV1Beta1(in *slim_discover_v1beta1.EndpointSlice) *metallbspr.Endpoints {
if in == nil {
return nil
}
out := new(metallbspr.Endpoints)
for _, ep := range in.Endpoints {
for _, addr := range ep.Addresses {
out.Ready = append(out.Ready, metallbspr.Endpoint{
IP: addr,
NodeName: ep.NodeName,
})
}
// See above comment in convertEndpoints() for why we only append
// "ready" endpoints.
}
return out
}
25 changes: 21 additions & 4 deletions pkg/k8s/watchers/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1"
slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/option"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps *
defer func() { k.K8sEventReceived(metricEndpointSlice, metricCreate, valid, equal) }()
if k8sEP := k8s.ObjToV1EndpointSlice(obj); k8sEP != nil {
valid = true
k.K8sSvcCache.UpdateEndpointSlicesV1(k8sEP, swgEps)
k.updateK8sEndpointSliceV1(k8sEP, swgEps)
k.K8sEventProcessed(metricEndpointSlice, metricCreate, true)
}
}
Expand All @@ -72,7 +73,7 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps *
return
}

k.K8sSvcCache.UpdateEndpointSlicesV1(newk8sEP, swgEps)
k.updateK8sEndpointSliceV1(newk8sEP, swgEps)
k.K8sEventProcessed(metricEndpointSlice, metricUpdate, true)
}
}
Expand Down Expand Up @@ -102,7 +103,7 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps *
defer func() { k.K8sEventReceived(metricEndpointSlice, metricCreate, valid, equal) }()
if k8sEP := k8s.ObjToV1Beta1EndpointSlice(obj); k8sEP != nil {
valid = true
k.K8sSvcCache.UpdateEndpointSlicesV1Beta1(k8sEP, swgEps)
k.updateK8sEndpointSliceV1Beta1(k8sEP, swgEps)
k.K8sEventProcessed(metricEndpointSlice, metricCreate, true)
}
}
Expand All @@ -117,7 +118,7 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps *
return
}

k.K8sSvcCache.UpdateEndpointSlicesV1Beta1(newk8sEP, swgEps)
k.updateK8sEndpointSliceV1Beta1(newk8sEP, swgEps)
k.K8sEventProcessed(metricEndpointSlice, metricUpdate, true)
}
}
Expand Down Expand Up @@ -162,3 +163,19 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps *
close(ecr)
return false
}

func (k *K8sWatcher) updateK8sEndpointSliceV1(eps *slim_discover_v1.EndpointSlice, swgEps *lock.StoppableWaitGroup) {
k.K8sSvcCache.UpdateEndpointSlicesV1(eps, swgEps)

if option.Config.BGPAnnounceLBIP {
k.bgpSpeakerManager.OnUpdateEndpointSliceV1(eps)
}
}

func (k *K8sWatcher) updateK8sEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice, swgEps *lock.StoppableWaitGroup) {
k.K8sSvcCache.UpdateEndpointSlicesV1Beta1(eps, swgEps)

if option.Config.BGPAnnounceLBIP {
k.bgpSpeakerManager.OnUpdateEndpointSliceV1Beta1(eps)
}
}
4 changes: 4 additions & 0 deletions pkg/k8s/watchers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/cilium/cilium/pkg/k8s"
k8smetrics "github.com/cilium/cilium/pkg/k8s/metrics"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1"
slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1"
"github.com/cilium/cilium/pkg/k8s/synced"
k8sTypes "github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/k8s/utils"
Expand Down Expand Up @@ -159,6 +161,8 @@ type bgpSpeakerManager interface {
OnDeleteService(svc *slim_corev1.Service)

OnUpdateEndpoints(eps *slim_corev1.Endpoints)
OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointSlice)
OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice)

OnUpdateNode(node *corev1.Node)
}
Expand Down