k8s,daemon: Implement service topology aware hints #17929

Merged · 8 commits · Nov 25, 2021

1 change: 1 addition & 0 deletions Documentation/cmdref/cilium-agent.md

(Generated file; diff not rendered.)

8 changes: 8 additions & 0 deletions Documentation/gettingstarted/kubeproxy-free.rst
@@ -1272,6 +1272,14 @@ working, take a look at `this KEP
free mode, make sure that default Kubernetes services like ``kube-dns`` and ``kubernetes``
have the required label value.

Topology Aware Hints
********************

The kube-proxy replacement implements the K8s service
`Topology Aware Hints <https://kubernetes.io/docs/concepts/services-networking/topology-aware-hints>`__.
This allows Cilium nodes to prefer service endpoints residing in the same zone.
To enable the feature, set ``loadBalancer.serviceTopology=true``.

Neighbor Discovery
******************

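For background: topology aware hints compare a backend's hinted zones against the zone of the node consuming the service, which Kubernetes exposes through the well-known `topology.kubernetes.io/zone` label. A minimal sketch of reading that label from a node object (`nodeZone` is a hypothetical helper, not code from this PR):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// nodeZone returns the zone a node belongs to, read from the well-known
// topology label that the hints feature keys on.
func nodeZone(node *v1.Node) string {
	// v1.LabelTopologyZone == "topology.kubernetes.io/zone"
	return node.GetLabels()[v1.LabelTopologyZone]
}

func main() {
	n := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{v1.LabelTopologyZone: "eu-west-1a"},
	}}
	fmt.Println(nodeZone(n)) // eu-west-1a
}
```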
3 changes: 3 additions & 0 deletions daemon/cmd/daemon.go
@@ -464,6 +464,9 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
if option.Config.BGPAnnounceLBIP || option.Config.BGPAnnouncePodCIDR {
d.k8sWatcher.NodeChain.Register(d.bgpSpeaker)
}
if option.Config.EnableServiceTopology {
d.k8sWatcher.NodeChain.Register(&d.k8sWatcher.K8sSvcCache)
}

d.redirectPolicyManager.RegisterSvcCache(&d.k8sWatcher.K8sSvcCache)
d.redirectPolicyManager.RegisterGetStores(d.k8sWatcher)
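The new Register call subscribes the service cache to node events: with topology awareness on, the cache must learn this node's zone label before it can filter backends. A rough sketch of the pattern implied by the handler signatures threaded through the diffs below (the NodeHandler method set follows the subscriber.NodeHandler interface referenced in host.go further down; nodeChain is illustrative, not necessarily Cilium's exact type):

```go
package subscriber

import (
	"sync"

	"github.com/cilium/cilium/pkg/lock"
	v1 "k8s.io/api/core/v1"
)

// NodeHandler mirrors the method set visible in this PR's diffs.
type NodeHandler interface {
	OnAddNode(node *v1.Node, swg *lock.StoppableWaitGroup) error
	OnUpdateNode(oldNode, newNode *v1.Node, swg *lock.StoppableWaitGroup) error
	OnDeleteNode(node *v1.Node, swg *lock.StoppableWaitGroup) error
}

// nodeChain fans each node event out to every registered subscriber.
type nodeChain struct {
	mu       sync.Mutex
	handlers []NodeHandler
}

func (c *nodeChain) Register(h NodeHandler) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers = append(c.handlers, h)
}

func (c *nodeChain) OnUpdateNode(oldNode, newNode *v1.Node, swg *lock.StoppableWaitGroup) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, h := range c.handlers {
		// In practice errors are logged and the chain continues.
		_ = h.OnUpdateNode(oldNode, newNode, swg)
	}
}
```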
3 changes: 3 additions & 0 deletions daemon/cmd/daemon_main.go
@@ -637,6 +637,9 @@ func initializeFlags() {
flags.Bool(option.EnableSessionAffinity, false, "Enable support for service session affinity")
option.BindEnv(option.EnableSessionAffinity)

flags.Bool(option.EnableServiceTopology, false, "Enable support for service topology aware hints")
Member: Off by default sounds good as a starting point. Once it's fully GA in K8s, we could also flip it to default on for the agent.

Member: One small item, could also be a follow-up from my POV: would be nice for `cilium status --verbose` to show that it's enabled.

Member (Author): I will do it in a follow-up!

option.BindEnv(option.EnableServiceTopology)

flags.Bool(option.EnableIdentityMark, true, "Enable setting identity mark for local traffic")
option.BindEnv(option.EnableIdentityMark)

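As elsewhere in the agent, the flag is declared with pflag and then bound so it can also arrive via the environment or the ConfigMap entry added below. A generic approximation of that pattern with plain pflag/viper (the cilium env prefix and dash-to-underscore mapping are assumptions; option.BindEnv encapsulates Cilium's real rules):

```go
package main

import (
	"fmt"
	"strings"

	flag "github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	// Declare the boolean option, off by default (matching the PR).
	flag.Bool("enable-service-topology", false,
		"Enable support for service topology aware hints")
	flag.Parse()

	// Expose the flag through viper and allow an environment override,
	// e.g. CILIUM_ENABLE_SERVICE_TOPOLOGY=true (prefix is an assumption).
	viper.BindPFlags(flag.CommandLine)
	viper.SetEnvPrefix("cilium")
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	viper.AutomaticEnv()

	fmt.Println(viper.GetBool("enable-service-topology"))
}
```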
3 changes: 3 additions & 0 deletions install/kubernetes/cilium/templates/cilium-configmap.yaml
@@ -572,6 +572,9 @@ data:
{{- if hasKey .Values.loadBalancer "dsrDispatch" }}
bpf-lb-dsr-dispatch: {{ .Values.loadBalancer.dsrDispatch | quote }}
{{- end }}
{{- if hasKey .Values.loadBalancer "serviceTopology" }}
enable-service-topology: {{ .Values.loadBalancer.serviceTopology | quote }}
{{- end }}

{{- end }}
{{- if hasKey .Values.maglev "tableSize" }}
4 changes: 4 additions & 0 deletions install/kubernetes/cilium/values.yaml
@@ -1086,6 +1086,10 @@ monitor:
# used to pass a service IP and port to remote backend
# dsrDispatch: opt

# -- serviceTopology enables K8s Topology Aware Hints-based filtering of
# service endpoints
# serviceTopology: false

# -- Configure N-S k8s service loadbalancing
nodePort:
# -- Enable the Cilium NodePort service implementation.
8 changes: 5 additions & 3 deletions pkg/bgp/speaker/speaker.go
@@ -278,7 +278,7 @@ func (s *MetalLBSpeaker) OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1
}

// OnAddNode notifies the Speaker of a new node.
func (s *MetalLBSpeaker) OnAddNode(node *v1.Node) error {
func (s *MetalLBSpeaker) OnAddNode(node *v1.Node, swg *lock.StoppableWaitGroup) error {
if s.shutDown() {
return ErrShutDown
}
@@ -308,7 +308,9 @@ }
}

// OnUpdateNode notifies the Speaker of an update to a node.
func (s *MetalLBSpeaker) OnUpdateNode(oldNode, newNode *v1.Node) error {
func (s *MetalLBSpeaker) OnUpdateNode(oldNode, newNode *v1.Node,
swg *lock.StoppableWaitGroup) error {

if s.shutDown() {
return ErrShutDown
}
@@ -344,7 +346,7 @@ }
// is shutting down it will send a BGP message to its peer
// instructing it to withdraw all previously advertised
// routes.
func (s *MetalLBSpeaker) OnDeleteNode(node *v1.Node) error {
func (s *MetalLBSpeaker) OnDeleteNode(node *v1.Node, swg *lock.StoppableWaitGroup) error {
if s.shutDown() {
return ErrShutDown
}
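The speaker changes are signature-only: the new *lock.StoppableWaitGroup parameter is threaded through and ignored here. The type lets an event source wait for subscribers' in-flight work while refusing new work once stopped. A toy stand-in built on sync.WaitGroup to illustrate the idea (not Cilium's actual pkg/lock implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// stoppableWaitGroup is a toy version of the concept: work can be added
// until Stop is called, and Wait blocks until the added work finishes.
type stoppableWaitGroup struct {
	mu      sync.Mutex
	stopped bool
	wg      sync.WaitGroup
}

// Add registers one unit of work; it becomes a no-op after Stop.
func (s *stoppableWaitGroup) Add() (done func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stopped {
		return func() {}
	}
	s.wg.Add(1)
	return func() { s.wg.Done() }
}

func (s *stoppableWaitGroup) Stop() {
	s.mu.Lock()
	s.stopped = true
	s.mu.Unlock()
}

func (s *stoppableWaitGroup) Wait() { s.wg.Wait() }

func main() {
	var swg stoppableWaitGroup
	done := swg.Add()
	go func() {
		// ... process a node event ...
		done()
	}()
	swg.Stop()
	swg.Wait()
	fmt.Println("all in-flight node events processed")
}
```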
10 changes: 5 additions & 5 deletions pkg/bgp/speaker/speaker_test.go
@@ -384,7 +384,7 @@ func TestSpeakerOnUpdateNode(t *testing.T) {

go spkr.run(ctx)

err := spkr.OnUpdateNode(&node, &node)
err := spkr.OnUpdateNode(&node, &node, nil)
if err != nil {
t.Fatal(err)
}
@@ -478,7 +478,7 @@ func TestSpeakerOnDeleteNode(t *testing.T) {

go spkr.run(ctx)

err := spkr.OnDeleteNode(&node)
err := spkr.OnDeleteNode(&node, nil)
if err != nil {
t.Fatal(err)
}
@@ -501,10 +501,10 @@ func TestSpeakerOnDeleteNode(t *testing.T) {
if spkr.shutDown() != true {
t.Fatalf("wanted speaker to be shutdown")
}
if err := spkr.OnAddNode(nil); err != ErrShutDown {
if err := spkr.OnAddNode(nil, nil); err != ErrShutDown {
t.Fatalf("got: %v, want: %v", err, ErrShutDown)
}
if err := spkr.OnDeleteNode(nil); err != ErrShutDown {
if err := spkr.OnDeleteNode(nil, nil); err != ErrShutDown {
t.Fatalf("got: %v, want: %v", err, ErrShutDown)
}
if err := spkr.OnDeleteService(nil); err != ErrShutDown {
@@ -513,7 +513,7 @@ func TestSpeakerOnDeleteNode(t *testing.T) {
if err := spkr.OnUpdateEndpoints(nil); err != ErrShutDown {
t.Fatalf("got: %v, want: %v", err, ErrShutDown)
}
if err := spkr.OnUpdateNode(nil, nil); err != ErrShutDown {
if err := spkr.OnUpdateNode(nil, nil, nil); err != ErrShutDown {
t.Fatalf("got: %v, want: %v", err, ErrShutDown)
}
if err := spkr.OnUpdateService(nil); err != ErrShutDown {
17 changes: 14 additions & 3 deletions pkg/endpointmanager/host.go
@@ -6,6 +6,7 @@ package endpointmanager
import (
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"

v1 "k8s.io/api/core/v1"
)
@@ -30,12 +31,18 @@ func (mgr *EndpointManager) HostEndpointExists() bool {
// OnAddNode implements the EndpointManager's logic for reacting to new nodes
// from K8s. It is currently not implemented as the EndpointManager has no
// need for it. This adheres to the subscriber.NodeHandler interface.
func (mgr *EndpointManager) OnAddNode(node *v1.Node) error { return nil }
func (mgr *EndpointManager) OnAddNode(node *v1.Node,
swg *lock.StoppableWaitGroup) error {

return nil
}

// OnUpdateNode implements the EndpointManager's logic for reacting to updated
// nodes in K8s. It is currently not implemented as the EndpointManager has no
// need for it. This adheres to the subscriber.NodeHandler interface.
func (mgr *EndpointManager) OnUpdateNode(oldNode, newNode *v1.Node) error {
func (mgr *EndpointManager) OnUpdateNode(oldNode, newNode *v1.Node,
swg *lock.StoppableWaitGroup) error {

oldNodeLabels := oldNode.GetLabels()
newNodeLabels := newNode.GetLabels()

@@ -56,4 +63,8 @@ func (mgr *EndpointManager) OnUpdateNode(oldNode, newNode *v1.Node) error {
// OnDeleteNode implements the EndpointManager's logic for reacting to node
// deletions from K8s. It is currently not implemented as the EndpointManager
// has no need for it. This adheres to the subscriber.NodeHandler interface.
func (mgr *EndpointManager) OnDeleteNode(node *v1.Node) error { return nil }
func (mgr *EndpointManager) OnDeleteNode(node *v1.Node,
swg *lock.StoppableWaitGroup) error {

return nil
}
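OnUpdateNode reacts to node-label changes (the interesting case for topology being the zone label). A purely illustrative helper for the kind of label-delta guard such handlers rely on; labelsEqual is hypothetical and not part of this PR:

```go
// labelsEqual reports whether two label maps contain exactly the same pairs.
func labelsEqual(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

// Usage inside an update handler: skip work when node labels are unchanged.
//   if labelsEqual(oldNode.GetLabels(), newNode.GetLabels()) { return nil }
```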
14 changes: 11 additions & 3 deletions pkg/k8s/endpoints.go
@@ -54,9 +54,10 @@ func (e *Endpoints) DeepEqual(o *Endpoints) bool {
// +k8s:deepcopy-gen=true
// +deepequal-gen=true
type Backend struct {
Ports serviceStore.PortConfiguration
NodeName string
Terminating bool
Ports serviceStore.PortConfiguration
NodeName string
Terminating bool
HintsForZones []string
}

// String returns the string representation of an endpoints resource, with
@@ -289,6 +290,13 @@ func ParseEndpointSliceV1(ep *slim_discovery_v1.EndpointSlice) (EndpointSliceID,
backend.Ports[name] = lbPort
}
}
if sub.Hints != nil && (*sub.Hints).ForZones != nil {
hints := (*sub.Hints).ForZones
backend.HintsForZones = make([]string, len(hints))
for i, hint := range hints {
backend.HintsForZones[i] = hint.Name
}
}
}
}

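The HintsForZones field parsed above is the raw material for filtering: elsewhere in this PR the service cache keeps only backends whose hints name the node's zone, and falls back to the full set when nothing matches so traffic is never black-holed. A simplified sketch of the gist, with a hypothetical function name (details of the real service-cache logic may differ):

```go
// filterBackendsByZone is a hypothetical illustration of hint-based
// filtering against the Backend type defined above.
func filterBackendsByZone(backends map[string]*Backend, nodeZone string) map[string]*Backend {
	if nodeZone == "" {
		return backends // node has no zone label: nothing to filter on
	}
	filtered := make(map[string]*Backend, len(backends))
	for addr, be := range backends {
		for _, zone := range be.HintsForZones {
			if zone == nodeZone {
				filtered[addr] = be
				break
			}
		}
	}
	// If no backend hints at this zone, fall back to all backends rather
	// than black-holing the service.
	if len(filtered) == 0 {
		return backends
	}
	return filtered
}
```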
38 changes: 38 additions & 0 deletions pkg/k8s/endpoints_test.go
@@ -1613,6 +1613,44 @@ func (s *K8sSuite) Test_parseK8sEPSlicev1(c *check.C) {
return svcEP
},
},
{
name: "endpoints have zone hints",
setupArgs: func() args {
return args{
eps: &slim_discovery_v1.EndpointSlice{
ObjectMeta: slim_metav1.ObjectMeta{
Name: "foo",
Namespace: "bar",
},
Endpoints: []slim_discovery_v1.Endpoint{
{
Addresses: []string{"172.0.0.1"},
Hints: &slim_discovery_v1.EndpointHints{
ForZones: []slim_discovery_v1.ForZone{{Name: "testing"}},
},
},
},
Ports: []slim_discovery_v1.EndpointPort{
{
Name: func() *string { a := "http-test-svc"; return &a }(),
Protocol: func() *slim_corev1.Protocol { a := slim_corev1.ProtocolTCP; return &a }(),
Port: func() *int32 { a := int32(8080); return &a }(),
},
},
},
}
},
setupWanted: func() *Endpoints {
svcEP := newEndpoints()
svcEP.Backends["172.0.0.1"] = &Backend{
Ports: serviceStore.PortConfiguration{
"http-test-svc": loadbalancer.NewL4Addr(loadbalancer.TCP, 8080),
},
HintsForZones: []string{"testing"},
}
return svcEP
},
},
}
for _, tt := range tests {
args := tt.setupArgs()
16 changes: 16 additions & 0 deletions pkg/k8s/service.go
@@ -26,6 +26,8 @@ import (
v1 "k8s.io/api/core/v1"
)

const annotationTopologyAwareHints = "service.kubernetes.io/topology-aware-hints"

func getAnnotationIncludeExternal(svc *slim_corev1.Service) bool {
if value, ok := svc.ObjectMeta.Annotations[annotation.GlobalService]; ok {
return strings.ToLower(value) == "true"
Expand All @@ -42,6 +44,14 @@ func getAnnotationShared(svc *slim_corev1.Service) bool {
return getAnnotationIncludeExternal(svc)
}

func getAnnotationTopologyAwareHints(svc *slim_corev1.Service) bool {
if value, ok := svc.ObjectMeta.Annotations[annotationTopologyAwareHints]; ok {
return strings.ToLower(value) == "auto"
}

return false
}

// isValidServiceFrontendIP returns true if the provided service frontend IP address type
// is supported in cilium configuration.
func isValidServiceFrontendIP(netIP net.IP) bool {
@@ -209,6 +219,8 @@ func ParseService(svc *slim_corev1.Service, nodeAddressing datapath.NodeAddressi
}
}

svcInfo.TopologyAware = getAnnotationTopologyAwareHints(svc)

return svcID, svcInfo
}

@@ -323,6 +335,10 @@ type Service struct {
// Type is the internal service type
// +deepequal-gen=false
Type loadbalancer.SVCType

// TopologyAware denotes whether service endpoints might have topology aware
// hints
TopologyAware bool
}

// DeepEqual returns true if both the receiver and 'o' are deeply equal.
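One detail worth calling out: only the annotation value auto, compared case-insensitively, opts a service in; any other value leaves TopologyAware false. A quick in-package usage sketch against the parser above (slim types and imports as used throughout this diff):

```go
// Illustrative only: exercising getAnnotationTopologyAwareHints from above.
svc := &slim_corev1.Service{
	ObjectMeta: slim_metav1.ObjectMeta{
		Annotations: map[string]string{
			annotationTopologyAwareHints: "Auto",
		},
	},
}
fmt.Println(getAnnotationTopologyAwareHints(svc))                    // true: "Auto" lowercases to "auto"
fmt.Println(getAnnotationTopologyAwareHints(&slim_corev1.Service{})) // false: annotation absent
```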