Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s.Resource metrics #26269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/bgpv1/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func newBGPPeeringPolicyResource(lc hive.Lifecycle, c client.Clientset, dc *opti
return resource.New[*v2alpha1api.CiliumBGPPeeringPolicy](
lc, utils.ListerWatcherFromTyped[*v2alpha1api.CiliumBGPPeeringPolicyList](
c.CiliumV2alpha1().CiliumBGPPeeringPolicies(),
))
), resource.WithMetric("CiliumBGPPeeringPolicy"))
}
4 changes: 2 additions & 2 deletions pkg/k8s/resource/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ var resourcesCell = cell.Module(
return nil
}
lw := utils.ListerWatcherFromTyped[*corev1.PodList](c.CoreV1().Pods(""))
return resource.New[*corev1.Pod](lc, lw)
return resource.New[*corev1.Pod](lc, lw, resource.WithMetric("Pod"))
},
func(lc hive.Lifecycle, c client.Clientset) resource.Resource[*corev1.Service] {
if !c.IsEnabled() {
return nil
}
lw := utils.ListerWatcherFromTyped[*corev1.ServiceList](c.CoreV1().Services(""))
return resource.New[*corev1.Service](lc, lw)
return resource.New[*corev1.Service](lc, lw, resource.WithMetric("Service"))
},
),
)
Expand Down
53 changes: 51 additions & 2 deletions pkg/k8s/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"runtime"
"strconv"
"sync"

corev1 "k8s.io/api/core/v1"
Expand All @@ -15,7 +16,9 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/cilium/cilium/pkg/hive"
k8smetrics "github.com/cilium/cilium/pkg/k8s/metrics"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/metrics"
"github.com/cilium/cilium/pkg/promise"
"github.com/cilium/cilium/pkg/stream"
)
Expand Down Expand Up @@ -118,8 +121,9 @@ func New[T k8sRuntime.Object](lc hive.Lifecycle, lw cache.ListerWatcher, opts ..
}

type options struct {
transform cache.TransformFunc // if non-nil, the object is transformed with this function before storing
sourceObj func() k8sRuntime.Object // prototype for the object before it is transformed
transform cache.TransformFunc // if non-nil, the object is transformed with this function before storing
sourceObj func() k8sRuntime.Object // prototype for the object before it is transformed
metricScope string // the scope label used when recording metrics for the resource
}

type ResourceOption func(o *options)
Expand Down Expand Up @@ -152,6 +156,13 @@ func WithLazyTransform(sourceObj func() k8sRuntime.Object, transform cache.Trans
}
}

// WithMetric enables metrics collection for the resource using the provided scope.
func WithMetric(scope string) ResourceOption {
return func(o *options) {
o.metricScope = scope
}
}

type resource[T k8sRuntime.Object] struct {
mu lock.RWMutex
ctx context.Context
Expand Down Expand Up @@ -190,7 +201,44 @@ func (r *resource[T]) Store(ctx context.Context) (Store[T], error) {
return r.storePromise.Await(ctx)
}

func (r *resource[T]) metricEventProcessed(eventKind EventKind, status bool) {
if r.opts.metricScope == "" {
return
}

result := "success"
if status == false {
result = "failed"
}

var action string
switch eventKind {
case Sync:
return
case Upsert:
action = "update"
case Delete:
action = "delete"
}

metrics.KubernetesEventProcessed.WithLabelValues(r.opts.metricScope, action, result).Inc()
}

func (r *resource[T]) metricEventReceived(action string, valid, equal bool) {
if r.opts.metricScope == "" {
return
}

k8smetrics.LastInteraction.Reset()

metrics.EventTS.WithLabelValues(metrics.LabelEventSourceK8s, r.opts.metricScope, action).SetToCurrentTime()
validStr := strconv.FormatBool(valid)
equalStr := strconv.FormatBool(equal)
metrics.KubernetesEventReceived.WithLabelValues(r.opts.metricScope, action, validStr, equalStr).Inc()
}

func (r *resource[T]) pushUpdate(key Key) {
r.metricEventReceived("update", true, false)
r.mu.RLock()
for _, queue := range r.queues {
queue.AddUpsert(key)
Expand Down Expand Up @@ -415,6 +463,7 @@ func (r *resource[T]) Events(ctx context.Context, opts ...EventsOpt) <-chan Even
event.Done = func(err error) {
runtime.SetFinalizer(eventDoneSentinel, nil)
queue.eventDone(entry, err)
r.metricEventProcessed(event.Kind, err == nil)
}

// Add a finalizer to catch forgotten calls to Done().
Expand Down
23 changes: 12 additions & 11 deletions pkg/k8s/resource_ctors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func ServiceResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav
utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](cs.Slim().CoreV1().Services("")),
append(opts, optsModifier)...,
)
return resource.New[*slim_corev1.Service](lc, lw), nil
return resource.New[*slim_corev1.Service](lc, lw, resource.WithMetric("Service")), nil
}

func NodeResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*slim_corev1.Node], error) {
Expand All @@ -49,7 +49,7 @@ func NodeResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.L
utils.ListerWatcherFromTyped[*slim_corev1.NodeList](cs.Slim().CoreV1().Nodes()),
opts...,
)
return resource.New[*slim_corev1.Node](lc, lw), nil
return resource.New[*slim_corev1.Node](lc, lw, resource.WithMetric("Node")), nil
}

func CiliumNodeResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2.CiliumNode], error) {
Expand All @@ -60,7 +60,7 @@ func CiliumNodeResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*me
utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumNodeList](cs.CiliumV2().CiliumNodes()),
opts...,
)
return resource.New[*cilium_api_v2.CiliumNode](lc, lw), nil
return resource.New[*cilium_api_v2.CiliumNode](lc, lw, resource.WithMetric("CiliumNode")), nil
}

func PodResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*slim_corev1.Pod], error) {
Expand All @@ -71,7 +71,7 @@ func PodResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.Li
utils.ListerWatcherFromTyped[*slim_corev1.PodList](cs.Slim().CoreV1().Pods("")),
opts...,
)
return resource.New[*slim_corev1.Pod](lc, lw), nil
return resource.New[*slim_corev1.Pod](lc, lw, resource.WithMetric("Pod")), nil
}

func NamespaceResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*slim_corev1.Namespace], error) {
Expand All @@ -82,7 +82,7 @@ func NamespaceResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*met
utils.ListerWatcherFromTyped[*slim_corev1.NamespaceList](cs.Slim().CoreV1().Namespaces()),
opts...,
)
return resource.New[*slim_corev1.Namespace](lc, lw), nil
return resource.New[*slim_corev1.Namespace](lc, lw, resource.WithMetric("Namespace")), nil
}

func LBIPPoolsResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool], error) {
Expand All @@ -93,7 +93,7 @@ func LBIPPoolsResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*met
utils.ListerWatcherFromTyped[*cilium_api_v2alpha1.CiliumLoadBalancerIPPoolList](cs.CiliumV2alpha1().CiliumLoadBalancerIPPools()),
opts...,
)
return resource.New[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool](lc, lw), nil
return resource.New[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool](lc, lw, resource.WithMetric("CiliumLoadBalancerIPPool")), nil
}

func CiliumIdentityResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2.CiliumIdentity], error) {
Expand All @@ -104,7 +104,7 @@ func CiliumIdentityResource(lc hive.Lifecycle, cs client.Clientset, opts ...func
utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumIdentityList](cs.CiliumV2().CiliumIdentities()),
opts...,
)
return resource.New[*cilium_api_v2.CiliumIdentity](lc, lw), nil
return resource.New[*cilium_api_v2.CiliumIdentity](lc, lw, resource.WithMetric("CiliumIdentityList")), nil
}

func CiliumNetworkPolicyResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2.CiliumNetworkPolicy], error) {
Expand All @@ -115,7 +115,7 @@ func CiliumNetworkPolicyResource(lc hive.Lifecycle, cs client.Clientset, opts ..
utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumNetworkPolicyList](cs.CiliumV2().CiliumNetworkPolicies("")),
opts...,
)
return resource.New[*cilium_api_v2.CiliumNetworkPolicy](lc, lw), nil
return resource.New[*cilium_api_v2.CiliumNetworkPolicy](lc, lw, resource.WithMetric("CiliumNetworkPolicy")), nil
}

func CiliumClusterwideNetworkPolicyResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2.CiliumClusterwideNetworkPolicy], error) {
Expand All @@ -126,7 +126,7 @@ func CiliumClusterwideNetworkPolicyResource(lc hive.Lifecycle, cs client.Clients
utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumClusterwideNetworkPolicyList](cs.CiliumV2().CiliumClusterwideNetworkPolicies()),
opts...,
)
return resource.New[*cilium_api_v2.CiliumClusterwideNetworkPolicy](lc, lw), nil
return resource.New[*cilium_api_v2.CiliumClusterwideNetworkPolicy](lc, lw, resource.WithMetric("CiliumClusterwideNetworkPolicy")), nil
}

func CiliumCIDRGroupResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2alpha1.CiliumCIDRGroup], error) {
Expand All @@ -137,7 +137,7 @@ func CiliumCIDRGroupResource(lc hive.Lifecycle, cs client.Clientset, opts ...fun
utils.ListerWatcherFromTyped[*cilium_api_v2alpha1.CiliumCIDRGroupList](cs.CiliumV2alpha1().CiliumCIDRGroups()),
opts...,
)
return resource.New[*cilium_api_v2alpha1.CiliumCIDRGroup](lc, lw), nil
return resource.New[*cilium_api_v2alpha1.CiliumCIDRGroup](lc, lw, resource.WithMetric("CiliumCIDRGroup")), nil
}

func CiliumPodIPPoolResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*cilium_api_v2alpha1.CiliumPodIPPool], error) {
Expand All @@ -148,7 +148,7 @@ func CiliumPodIPPoolResource(lc hive.Lifecycle, cs client.Clientset, opts ...fun
utils.ListerWatcherFromTyped[*cilium_api_v2alpha1.CiliumPodIPPoolList](cs.CiliumV2alpha1().CiliumPodIPPools()),
opts...,
)
return resource.New[*cilium_api_v2alpha1.CiliumPodIPPool](lc, lw), nil
return resource.New[*cilium_api_v2alpha1.CiliumPodIPPool](lc, lw, resource.WithMetric("CiliumPodIPPool")), nil
}

func EndpointsResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resource[*Endpoints], error) {
Expand All @@ -165,6 +165,7 @@ func EndpointsResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resourc
lc,
lw,
resource.WithLazyTransform(lw.getSourceObj, transformEndpoint),
resource.WithMetric("Endpoint"),
), nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/l2announcer/l2announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func l2AnnouncementPolicyResource(lc hive.Lifecycle, cs client.Clientset) (resou
lw := utils.ListerWatcherFromTyped[*cilium_api_v2alpha1.CiliumL2AnnouncementPolicyList](
cs.CiliumV2alpha1().CiliumL2AnnouncementPolicies(),
)
return resource.New[*cilium_api_v2alpha1.CiliumL2AnnouncementPolicy](lc, lw), nil
return resource.New[*cilium_api_v2alpha1.CiliumL2AnnouncementPolicy](lc, lw, resource.WithMetric("CiliumL2AnnouncementPolicy")), nil
}

type l2AnnouncerParams struct {
Expand Down