Skip to content

Commit

Permalink
k8s: Introduce shared resources
Browse files Browse the repository at this point in the history
This introduces a set of centrally managed resource.Resource[T] objects. These aim
to replace the current k8s/watchers and k8s/watchers/subscriber, with a single
interface (Resource[T]) that one can be observed and its store queried from any cell.
With this we'll gradually remove K8sWatcher by moving code from k8s/watchers/ into feature
specific packages that subscribe to Resource[T]. See pkg/k8s/resource/example.go for usage
example of Resource[T].

To start things off, in this patch the Resource[*Service] and Resource[*Node] are introduced
into the object graph and the existing service and node watchers are refactored to be
implemented on top of it. This keeps the existing code working, but allows new cells to
use these objects directly without dependency on K8sWatcher.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki committed Dec 22, 2022
1 parent f3a9b0a commit b4335ee
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 159 deletions.
5 changes: 5 additions & 0 deletions daemon/cmd/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/gops"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/cilium/pkg/option"
Expand Down Expand Up @@ -52,6 +53,10 @@ var (
// observing changes to it.
node.LocalNodeStoreCell,

// Shared resources provide access to k8s resources as event streams or as
// read-only stores.
k8s.SharedResourcesCell,

// daemonCell wraps the legacy daemon initialization and provides Promise[*Daemon].
daemonCell,

Expand Down
2 changes: 2 additions & 0 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup,
epMgr *endpointmanager.EndpointManager, dp datapath.Datapath,
wgAgent *wg.Agent,
clientset k8sClient.Clientset,
sharedResources k8s.SharedResources,
) (*Daemon, *endpointRestoreState, error) {

var (
Expand Down Expand Up @@ -701,6 +702,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup,
option.Config,
d.ipcache,
d.cgroupManager,
sharedResources,
)
nd.RegisterK8sGetters(d.k8sWatcher)

Expand Down
18 changes: 10 additions & 8 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,13 +1638,14 @@ var daemonCell = cell.Module(
type daemonParams struct {
cell.In

Lifecycle hive.Lifecycle
Clientset k8sClient.Clientset
Datapath datapath.Datapath
WGAgent *wg.Agent `optional:"true"`
LocalNodeStore node.LocalNodeStore
BGPController *bgpv1.Controller
Shutdowner hive.Shutdowner
Lifecycle hive.Lifecycle
Clientset k8sClient.Clientset
Datapath datapath.Datapath
WGAgent *wg.Agent `optional:"true"`
LocalNodeStore node.LocalNodeStore
BGPController *bgpv1.Controller
Shutdowner hive.Shutdowner
SharedResources k8s.SharedResources
}

func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] {
Expand All @@ -1671,7 +1672,8 @@ func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] {
WithDefaultEndpointManager(daemonCtx, params.Clientset, endpoint.CheckHealth),
params.Datapath,
params.WGAgent,
params.Clientset)
params.Clientset,
params.SharedResources)
if err != nil {
return fmt.Errorf("daemon creation failed: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion daemon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ var (
dotGraphCmd = &cobra.Command{
Use: "dot-graph",
Short: "Output the internal dependencies of cilium-agent in graphviz dot format",
Run: func(cmd *cobra.Command, args []string) { agentHive.PrintDotGraph() },
Run: func(cmd *cobra.Command, args []string) {
agentHive.PrintDotGraph()
},
}

objectsCmd = &cobra.Command{
Expand Down
28 changes: 0 additions & 28 deletions pkg/bgpv1/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import (
"github.com/cilium/cilium/pkg/hive/cell"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/client"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
slim_core_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/k8s/utils"
k8sutils "github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/option"
)

Expand All @@ -34,8 +32,6 @@ var Cell = cell.Module(
// goBGP is currently the only supported RouterManager, if more are
// implemented, provide the manager via a Cell that pics implementation based on configuration.
gobgp.NewBGPRouterManager,
// Create a slim service resource
newSlimServiceResource,
// Create a slim service DiffStore
gobgp.NewDiffStore[*slim_core_v1.Service],
),
Expand All @@ -58,27 +54,3 @@ func newBGPPeeringPolicyResource(lc hive.Lifecycle, c client.Clientset, dc *opti
c.CiliumV2alpha1().CiliumBGPPeeringPolicies(),
))
}

// Constructs a slim service resource with perpetual retries at a exponential backoff
func newSlimServiceResource(lc hive.Lifecycle, c k8sClient.Clientset, dc *option.DaemonConfig) (resource.Resource[*slim_core_v1.Service], error) {
// Do not create this resource if the BGP Control Plane is disabled
if !dc.BGPControlPlaneEnabled() {
return nil, nil
}

if !c.IsEnabled() {
return nil, nil
}

optsModifier, err := k8sutils.GetServiceListOptionsModifier(option.Config)
if err != nil {
return nil, err
}
return resource.New[*slim_core_v1.Service](
lc,
k8sutils.ListerWatcherWithModifier(
k8sutils.ListerWatcherFromTyped[*slim_core_v1.ServiceList](c.Slim().CoreV1().Services("")),
optsModifier),
), nil
}

63 changes: 63 additions & 0 deletions pkg/k8s/shared_resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package k8s

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/k8s/utils"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
)

var (
// SharedResourceCell provides a set of shared handles to Kubernetes resources used throughout the
// Cilium agent. Each of the resources share a client-go informer and backing store so we only
// have one watch API call for each resource kind and that we maintain only one copy of each object.
//
// See pkg/k8s/resource/resource.go for documentation on the Resource[T] type.
SharedResourcesCell = cell.Module(
"k8s-shared-resources",
"Shared Kubernetes resources",

cell.Provide(
serviceResource,
localNodeResource,
),
)
)

type SharedResources struct {
cell.In
Services resource.Resource[*slim_corev1.Service]
LocalNode resource.Resource[*corev1.Node]
}

func serviceResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resource[*slim_corev1.Service], error) {
if !cs.IsEnabled() {
return nil, nil
}
optsModifier, err := utils.GetServiceListOptionsModifier(option.Config)
if err != nil {
return nil, err
}
lw := utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](cs.Slim().CoreV1().Services(""))
lw = utils.ListerWatcherWithModifier(lw, optsModifier)
return resource.New[*slim_corev1.Service](lc, lw), nil
}

func localNodeResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resource[*corev1.Node], error) {
if !cs.IsEnabled() {
return nil, nil
}
lw := utils.ListerWatcherFromTyped[*corev1.NodeList](cs.CoreV1().Nodes())
lw = utils.ListerWatcherWithFields(lw, fields.ParseSelectorOrDie("metadata.name="+nodeTypes.GetName()))
return resource.New[*corev1.Node](lc, lw), nil
}
111 changes: 54 additions & 57 deletions pkg/k8s/watchers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,11 @@ import (

v1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/pkg/comparator"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
"github.com/cilium/cilium/pkg/k8s/watchers/subscriber"
"github.com/cilium/cilium/pkg/lock"
Expand Down Expand Up @@ -48,63 +42,66 @@ func nodeEventsAreEqual(oldNode, newNode *v1.Node) bool {
}

func (k *K8sWatcher) NodesInit(k8sClient client.Clientset) {
apiGroup := k8sAPIGroupNodeV1Core
k.nodesInitOnce.Do(func() {
synced := false
swg := lock.NewStoppableWaitGroup()

nodeStore, nodeController := informer.NewInformer(
utils.ListerWatcherWithFields(
utils.ListerWatcherFromTyped[*v1.NodeList](k8sClient.CoreV1().Nodes()),
fields.ParseSelectorOrDie("metadata.name="+nodeTypes.GetName())),
&v1.Node{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
var valid bool
if node := k8s.ObjToV1Node(obj); node != nil {
valid = true
errs := k.NodeChain.OnAddNode(node, swg)
k.K8sEventProcessed(metricNode, resources.MetricCreate, errs == nil)
}
k.K8sEventReceived(apiGroup, metricNode, resources.MetricCreate, valid, false)
},
UpdateFunc: func(oldObj, newObj interface{}) {
var valid, equal bool
if oldNode := k8s.ObjToV1Node(oldObj); oldNode != nil {
valid = true
if newNode := k8s.ObjToV1Node(newObj); newNode != nil {
equal = nodeEventsAreEqual(oldNode, newNode)
if !equal {
errs := k.NodeChain.OnUpdateNode(oldNode, newNode, swg)
k.K8sEventProcessed(metricNode, resources.MetricCreate, errs == nil)
}
}
}
k.K8sEventReceived(apiGroup, metricNode, resources.MetricCreate, valid, false)
},
DeleteFunc: func(obj interface{}) {
},
},
nil,
k.blockWaitGroupToSyncResources(
k.stop,
swg,
func() bool { return synced },
k8sAPIGroupNodeV1Core,
)

k.nodeStore = nodeStore

k.blockWaitGroupToSyncResources(wait.NeverStop, swg, nodeController.HasSynced, k8sAPIGroupNodeV1Core)
go nodeController.Run(k.stop)
k.k8sAPIGroups.AddAPI(apiGroup)
go k.nodeEventLoop(&synced, swg)
})
}

func (k *K8sWatcher) nodeEventLoop(synced *bool, swg *lock.StoppableWaitGroup) {
apiGroup := k8sAPIGroupNodeV1Core
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

events := k.sharedResources.LocalNode.Events(ctx)
var oldNode *v1.Node
for {
select {
case <-k.stop:
cancel()
case event, ok := <-events:
if !ok {
return
}
switch event.Kind {
case resource.Sync:
*synced = true
case resource.Upsert:
newNode := event.Object
if oldNode == nil {
k.K8sEventReceived(apiGroup, metricNode, resources.MetricCreate, true, false)
errs := k.NodeChain.OnAddNode(newNode, swg)
k.K8sEventProcessed(metricNode, resources.MetricCreate, errs == nil)
} else {
equal := nodeEventsAreEqual(oldNode, newNode)
k.K8sEventReceived(apiGroup, metricNode, resources.MetricUpdate, true, equal)
if !equal {
errs := k.NodeChain.OnUpdateNode(oldNode, newNode, swg)
k.K8sEventProcessed(metricNode, resources.MetricUpdate, errs == nil)
}
}
oldNode = newNode
}
event.Done(nil)
}
}
}

// GetK8sNode returns the *local Node* from the local store.
func (k *K8sWatcher) GetK8sNode(_ context.Context, nodeName string) (*v1.Node, error) {
k.WaitForCacheSync(k8sAPIGroupNodeV1Core)
pName := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
func (k *K8sWatcher) GetK8sNode(ctx context.Context, nodeName string) (*v1.Node, error) {
// Retrieve the store. Blocks until synced (or ctx cancelled).
store, err := k.sharedResources.LocalNode.Store(ctx)
if err != nil {
return nil, err
}
nodeInterface, exists, err := k.nodeStore.Get(pName)
node, exists, err := store.GetByKey(resource.Key{Name: nodeName})
if err != nil {
return nil, err
}
Expand All @@ -114,7 +111,7 @@ func (k *K8sWatcher) GetK8sNode(_ context.Context, nodeName string) (*v1.Node, e
Resource: "Node",
}, nodeName)
}
return nodeInterface.(*v1.Node).DeepCopy(), nil
return node.DeepCopy(), nil
}

// ciliumNodeUpdater implements the subscriber.Node interface and is used
Expand Down

0 comments on commit b4335ee

Please sign in to comment.