Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: Use slim Node in LocalNode Resource and K8s watchers #25282

Merged
merged 2 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 10 additions & 18 deletions pkg/bgp/fence/fence.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"fmt"
"strconv"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// Fencer provides a method set to prevent processing out of order events.
Expand Down Expand Up @@ -66,28 +64,22 @@ type Meta struct {
Rev uint64
}

// FromSlimObjectMeta allocates a meta derived from
// a slim k8s ObjectMeta and stores it at the memory
// pointed to by m.
func (m *Meta) FromSlimObjectMeta(om *slim_metav1.ObjectMeta) error {
rev, err := strconv.ParseUint(om.ResourceVersion, 10, 64)
if err != nil {
return fmt.Errorf("ObjectMeta.ResourceVersion must be parsible to Uint64")
}
(*m).Rev = rev
(*m).UUID = string(om.UID)
return nil
// metaGetter specifies the methods needed by (*Meta).FromObjectMeta.
// This is used extract metadata from a corev1 or a slim_corev1 object.
type metaGetter interface {
GetResourceVersion() string
GetUID() types.UID
}

// FromSlimObjectMeta allocates a meta derived from
// FromObjectMeta allocates a meta derived from
// a k8s ObjectMeta and stores it at the memory
// pointed to by m.
func (m *Meta) FromObjectMeta(om *v1.ObjectMeta) error {
rev, err := strconv.ParseUint(om.ResourceVersion, 10, 64)
func (m *Meta) FromObjectMeta(mg metaGetter) error {
rev, err := strconv.ParseUint(mg.GetResourceVersion(), 10, 64)
if err != nil {
return fmt.Errorf("ObjectMeta.ResourceVersion must be parsible to Uint64")
}
(*m).Rev = rev
(*m).UUID = string(om.UID)
(*m).UUID = string(mg.GetUID())
return nil
}
8 changes: 4 additions & 4 deletions pkg/bgp/mock/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ import (
// to simulate it being the node the Cilium agent is running on.
//
// See definition for details.
func GenTestNodeAndAdvertisements() (v1.Node, []*metallbbgp.Advertisement) {
func GenTestNodeAndAdvertisements() (slim_corev1.Node, []*metallbbgp.Advertisement) {
const (
CIDR = "1.1.0.0/16"
)
meta := metav1.ObjectMeta{
meta := slim_metav1.ObjectMeta{
Name: nodetypes.GetName(),
Namespace: "TestNamespace",
Labels: map[string]string{
"TestLabel": "TestLabel",
},
ResourceVersion: "1",
}
spec := v1.NodeSpec{
spec := slim_corev1.NodeSpec{
PodCIDR: CIDR,
PodCIDRs: []string{CIDR},
}
node := v1.Node{
node := slim_corev1.Node{
ObjectMeta: meta,
Spec: spec,
}
Expand Down
59 changes: 26 additions & 33 deletions pkg/bgp/speaker/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/sirupsen/logrus"
metallbspr "go.universe.tf/metallb/pkg/speaker"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"

"github.com/cilium/cilium/pkg/bgp/fence"
Expand Down Expand Up @@ -124,7 +124,7 @@ func (s *MetalLBSpeaker) OnUpdateService(svc *slim_corev1.Service) error {
s.services[svcID] = svc
s.Unlock()

if err := meta.FromSlimObjectMeta(&svc.ObjectMeta); err != nil {
if err := meta.FromObjectMeta(&svc.ObjectMeta); err != nil {
l.WithError(err).Error("failed to parse event metadata")
}

Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *MetalLBSpeaker) OnDeleteService(svc *slim_corev1.Service) error {
delete(s.services, svcID)
s.Unlock()

if err := meta.FromSlimObjectMeta(&svc.ObjectMeta); err != nil {
if err := meta.FromObjectMeta(&svc.ObjectMeta); err != nil {
l.WithError(err).Error("failed to parse event metadata")
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *MetalLBSpeaker) OnUpdateEndpoints(eps *slim_corev1.Endpoints) error {
s.Lock()
defer s.Unlock()

if err := meta.FromSlimObjectMeta(&eps.ObjectMeta); err != nil {
if err := meta.FromObjectMeta(&eps.ObjectMeta); err != nil {
l.WithError(err).Error("failed to parse event metadata")
}

Expand Down Expand Up @@ -227,7 +227,7 @@ func (s *MetalLBSpeaker) OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointS
s.Lock()
defer s.Unlock()

if err := meta.FromSlimObjectMeta(&eps.ObjectMeta); err != nil {
if err := meta.FromObjectMeta(&eps.ObjectMeta); err != nil {
l.WithError(err).Error("failed to parse event metadata")
}

Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *MetalLBSpeaker) OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1
s.Lock()
defer s.Unlock()

if err := meta.FromSlimObjectMeta(&eps.ObjectMeta); err != nil {
if err := meta.FromObjectMeta(&eps.ObjectMeta); err != nil {
l.WithError(err).Error("failed to parse event metadata")
return err
}
Expand All @@ -280,8 +280,15 @@ func (s *MetalLBSpeaker) OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1
return nil
}

type metaGetter interface {
GetName() string
GetResourceVersion() string
GetUID() types.UID
GetLabels() map[string]string
}

// notifyNodeEvent notifies the speaker of a node (K8s Node or CiliumNode) event
func (s *MetalLBSpeaker) notifyNodeEvent(op Op, nodeMeta *metav1.ObjectMeta, podCIDRs *[]string, withDraw bool) error {
func (s *MetalLBSpeaker) notifyNodeEvent(op Op, nodeMeta metaGetter, podCIDRs *[]string, withDraw bool) error {
if s.shutDown() {
return ErrShutDown
}
Expand All @@ -292,7 +299,7 @@ func (s *MetalLBSpeaker) notifyNodeEvent(op Op, nodeMeta *metav1.ObjectMeta, pod
l = log.WithFields(logrus.Fields{
"component": "MetalLBSpeaker.notifyNodeEvent",
"op": op.String(),
"node": nodeMeta.Name,
"node": nodeMeta.GetName(),
})
meta = fence.Meta{}
)
Expand All @@ -305,20 +312,20 @@ func (s *MetalLBSpeaker) notifyNodeEvent(op Op, nodeMeta *metav1.ObjectMeta, pod
s.queue.Add(nodeEvent{
Meta: meta,
op: op,
labels: nodeLabels(nodeMeta.Labels),
labels: nodeLabels(nodeMeta.GetLabels()),
podCIDRs: podCIDRs,
withDraw: withDraw,
})
return nil
}

// OnAddNode notifies the Speaker of a new node.
func (s *MetalLBSpeaker) OnAddNode(node *v1.Node, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Add, nodeMeta(node), nodePodCIDRs(node), false)
func (s *MetalLBSpeaker) OnAddNode(node *slim_corev1.Node, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Add, node, nodePodCIDRs(node), false)
}

func (s *MetalLBSpeaker) OnUpdateNode(oldNode, newNode *v1.Node, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Update, nodeMeta(newNode), nodePodCIDRs(newNode), false)
func (s *MetalLBSpeaker) OnUpdateNode(oldNode, newNode *slim_corev1.Node, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Update, newNode, nodePodCIDRs(newNode), false)
}

// OnDeleteNode notifies the Speaker of a node deletion.
Expand All @@ -327,18 +334,18 @@ func (s *MetalLBSpeaker) OnUpdateNode(oldNode, newNode *v1.Node, swg *lock.Stopp
// is shuttig down it will send a BGP message to its peer
// instructing it to withdrawal all previously advertised
// routes.
func (s *MetalLBSpeaker) OnDeleteNode(node *v1.Node, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Delete, nodeMeta(node), nodePodCIDRs(node), true)
func (s *MetalLBSpeaker) OnDeleteNode(node *slim_corev1.Node, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Delete, node, nodePodCIDRs(node), true)
}

// OnAddCiliumNode notifies the Speaker of a new CiliumNode.
func (s *MetalLBSpeaker) OnAddCiliumNode(node *ciliumv2.CiliumNode, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Add, ciliumNodeMeta(node), ciliumNodePodCIDRs(node), false)
return s.notifyNodeEvent(Add, node, ciliumNodePodCIDRs(node), false)
}

// OnUpdateCiliumNode notifies the Speaker of an update to a CiliumNode.
func (s *MetalLBSpeaker) OnUpdateCiliumNode(oldNode, newNode *ciliumv2.CiliumNode, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Update, ciliumNodeMeta(newNode), ciliumNodePodCIDRs(newNode), false)
return s.notifyNodeEvent(Update, newNode, ciliumNodePodCIDRs(newNode), false)
}

// OnDeleteCiliumNode notifies the Speaker of a CiliumNode deletion.
Expand All @@ -348,7 +355,7 @@ func (s *MetalLBSpeaker) OnUpdateCiliumNode(oldNode, newNode *ciliumv2.CiliumNod
// instructing it to withdrawal all previously advertised
// routes.
func (s *MetalLBSpeaker) OnDeleteCiliumNode(node *ciliumv2.CiliumNode, swg *lock.StoppableWaitGroup) error {
return s.notifyNodeEvent(Delete, ciliumNodeMeta(node), ciliumNodePodCIDRs(node), true)
return s.notifyNodeEvent(Delete, node, ciliumNodePodCIDRs(node), true)
}

// RegisterSvcCache registers the K8s watcher cache with this Speaker.
Expand Down Expand Up @@ -482,14 +489,7 @@ func nodeLabels(l map[string]string) *map[string]string {
return &n
}

func nodeMeta(node *v1.Node) *metav1.ObjectMeta {
if node == nil {
return nil
}
return &node.ObjectMeta
}

func nodePodCIDRs(node *v1.Node) *[]string {
func nodePodCIDRs(node *slim_corev1.Node) *[]string {
if node == nil {
return nil
}
Expand All @@ -506,13 +506,6 @@ func nodePodCIDRs(node *v1.Node) *[]string {
return &podCIDRs
}

func ciliumNodeMeta(node *ciliumv2.CiliumNode) *metav1.ObjectMeta {
if node == nil {
return nil
}
return &node.ObjectMeta
}

func ciliumNodePodCIDRs(node *ciliumv2.CiliumNode) *[]string {
if node == nil {
return nil
Expand Down
13 changes: 6 additions & 7 deletions pkg/bgpv1/agent/nodespecer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"

"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
ipamOption "github.com/cilium/cilium/pkg/ipam/option"
"github.com/cilium/cilium/pkg/k8s"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/resource"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/option"

"github.com/cilium/workerpool"
Expand All @@ -33,8 +32,8 @@ type localNodeStoreSpecerParams struct {

Lifecycle hive.Lifecycle
Config *option.DaemonConfig
NodeResource *k8s.LocalNodeResource
CiliumNodeResource *k8s.LocalCiliumNodeResource
NodeResource k8s.LocalNodeResource
CiliumNodeResource k8s.LocalCiliumNodeResource
Signaler Signaler
}

Expand Down Expand Up @@ -67,9 +66,9 @@ func NewNodeSpecer(params localNodeStoreSpecerParams) (nodeSpecer, error) {
}

type kubernetesNodeSpecer struct {
nodeResource *k8s.LocalNodeResource
nodeResource k8s.LocalNodeResource

currentNode *corev1.Node
currentNode *slim_corev1.Node
workerpool *workerpool.WorkerPool
signaler Signaler
}
Expand Down Expand Up @@ -142,7 +141,7 @@ func (s *kubernetesNodeSpecer) PodCIDRs() ([]string, error) {
}

type ciliumNodeSpecer struct {
nodeResource *k8s.LocalCiliumNodeResource
nodeResource k8s.LocalCiliumNodeResource

currentNode *ciliumv2.CiliumNode
workerpool *workerpool.WorkerPool
Expand Down
9 changes: 4 additions & 5 deletions pkg/endpointmanager/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
package endpointmanager

import (
v1 "k8s.io/api/core/v1"

"github.com/cilium/cilium/pkg/endpoint"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/node"
Expand All @@ -32,7 +31,7 @@ func (mgr *endpointManager) HostEndpointExists() bool {
// OnAddNode implements the endpointManager's logic for reacting to new nodes
// from K8s. It is currently not implemented as the endpointManager has not
// need for it. This adheres to the subscriber.NodeHandler interface.
func (mgr *endpointManager) OnAddNode(node *v1.Node,
func (mgr *endpointManager) OnAddNode(node *slim_corev1.Node,
swg *lock.StoppableWaitGroup) error {

return nil
Expand All @@ -41,7 +40,7 @@ func (mgr *endpointManager) OnAddNode(node *v1.Node,
// OnUpdateNode implements the endpointManager's logic for reacting to updated
// nodes in K8s. It is currently not implemented as the endpointManager has not
// need for it. This adheres to the subscriber.NodeHandler interface.
func (mgr *endpointManager) OnUpdateNode(oldNode, newNode *v1.Node,
func (mgr *endpointManager) OnUpdateNode(oldNode, newNode *slim_corev1.Node,
swg *lock.StoppableWaitGroup) error {

oldNodeLabels := oldNode.GetLabels()
Expand All @@ -66,7 +65,7 @@ func (mgr *endpointManager) OnUpdateNode(oldNode, newNode *v1.Node,
// OnDeleteNode implements the endpointManager's logic for reacting to node
// deletions from K8s. It is currently not implemented as the endpointManager
// has not need for it. This adheres to the subscriber.NodeHandler interface.
func (mgr *endpointManager) OnDeleteNode(node *v1.Node,
func (mgr *endpointManager) OnDeleteNode(node *slim_corev1.Node,
swg *lock.StoppableWaitGroup) error {

return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/k8s/client/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
)

// Getters is a set of methods for retrieving common objects.
type Getters interface {
GetSecrets(ctx context.Context, namespace, name string) (map[string][]byte, error)
GetK8sNode(ctx context.Context, nodeName string) (*corev1.Node, error)
GetK8sNode(ctx context.Context, nodeName string) (*slim_corev1.Node, error)
GetCiliumNode(ctx context.Context, nodeName string) (*cilium_v2.CiliumNode, error)
}

Expand All @@ -39,12 +39,12 @@ func (cs *clientsetGetters) GetSecrets(ctx context.Context, ns, name string) (ma
}

// GetK8sNode returns the node with the given nodeName.
func (cs *clientsetGetters) GetK8sNode(ctx context.Context, nodeName string) (*corev1.Node, error) {
func (cs *clientsetGetters) GetK8sNode(ctx context.Context, nodeName string) (*slim_corev1.Node, error) {
if !cs.IsEnabled() {
return nil, fmt.Errorf("GetK8sNode: No k8s, cannot access k8s nodes")
}

return cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
return cs.Slim().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
}

// GetCiliumNode returns the CiliumNode with the given nodeName.
Expand Down