remove bind metric decorator (#685)
cjerad committed Sep 17, 2021
1 parent 18dc533 commit db1d8bc
Showing 5 changed files with 50 additions and 64 deletions.

pkg/cloudprovider/aws/suite_test.go (2 changes: 1 addition & 1 deletion)

@@ -79,7 +79,7 @@ var _ = BeforeSuite(func() {
 	registry.RegisterOrDie(cloudProvider)
 	controller = &allocation.Controller{
 		Filter:        &allocation.Filter{KubeClient: e.Client},
-		Binder:        allocation.NewBinder(e.Client, clientSet.CoreV1()),
+		Binder:        &allocation.Binder{KubeClient: e.Client, CoreV1Client: clientSet.CoreV1()},
 		Batcher:       allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
 		Scheduler:     scheduling.NewScheduler(cloudProvider, e.Client),
 		Packer:        binpacking.NewPacker(),

pkg/controllers/allocation/bind.go (104 changes: 45 additions & 59 deletions)

@@ -33,20 +33,53 @@ import (
 	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 )
 
-type Binder interface {
-	Bind(context.Context, *v1.Node, []*v1.Pod) error
+var bindTimeHistogramVec = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: metrics.KarpenterNamespace,
+		Subsystem: "allocation_controller",
+		Name:      "bind_duration_seconds",
+		Help:      "Duration of bind process in seconds. Broken down by result.",
+		Buckets:   metrics.DurationBuckets(),
+	},
+	[]string{metrics.ResultLabel},
+)
+
+func init() {
+	crmetrics.Registry.MustRegister(bindTimeHistogramVec)
 }
 
-type binder struct {
-	kubeClient   client.Client
-	coreV1Client corev1.CoreV1Interface
+type Binder struct {
+	KubeClient   client.Client
+	CoreV1Client corev1.CoreV1Interface
 }
 
-func NewBinder(kubeClient client.Client, coreV1Client corev1.CoreV1Interface) Binder {
-	return &binder{kubeClient: kubeClient, coreV1Client: coreV1Client}
+func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
+	startTime := time.Now()
+	bindErr := b.bind(ctx, node, pods)
+	durationSeconds := time.Since(startTime).Seconds()
+
+	result := "success"
+	if bindErr != nil {
+		result = "error"
+	}
+
+	labels := prometheus.Labels{metrics.ResultLabel: result}
+	observer, promErr := bindTimeHistogramVec.GetMetricWith(labels)
+	if promErr != nil {
+		logging.FromContext(ctx).Warnf(
+			"Failed to record bind duration metric [labels=%s, duration=%f]: error=%s",
+			labels,
+			durationSeconds,
+			promErr.Error(),
+		)
+	} else {
+		observer.Observe(durationSeconds)
+	}
+
+	return bindErr
 }
 
-func (b *binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
+func (b *Binder) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
 	// 1. Add the Karpenter finalizer to the node to enable the termination workflow
 	node.Finalizers = append(node.Finalizers, v1alpha3.TerminationFinalizer)
 	// 2. Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
@@ -67,7 +100,7 @@ func (b *binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
 	// with the API server. In the common case, we create the node object
 	// ourselves to enforce the binding decision and enable images to be pulled
 	// before the node is fully Ready.
-	if _, err := b.coreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
+	if _, err := b.CoreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
 		if !errors.IsAlreadyExists(err) {
 			return fmt.Errorf("creating node %s, %w", node.Name, err)
 		}
@@ -76,16 +109,16 @@ func (b *binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
 	// 4. Bind pods
 	errs := make([]error, len(pods))
 	workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(index int) {
-		errs[index] = b.bind(ctx, node, pods[index])
+		errs[index] = b.bindPod(ctx, node, pods[index])
 	})
 	err := multierr.Combine(errs...)
 	logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", len(pods)-len(multierr.Errors(err)), node.Name)
 	return err
 }
 
-func (b *binder) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
+func (b *Binder) bindPod(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
 	// TODO, Stop using deprecated v1.Binding
-	if err := b.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{
+	if err := b.CoreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{
 		TypeMeta:   pod.TypeMeta,
 		ObjectMeta: pod.ObjectMeta,
 		Target:     v1.ObjectReference{Name: node.Name},
@@ -94,50 +127,3 @@ func (b *binder) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
 		return err
 	}
 	return nil
 }
-
-type binderMetricsDecorator struct {
-	binder               Binder
-	bindTimeHistogramVec *prometheus.HistogramVec
-}
-
-func DecorateBinderMetrics(binder Binder) Binder {
-	bindTimeHistogramVec := prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace: metrics.KarpenterNamespace,
-			Subsystem: "allocation_controller",
-			Name:      "bind_duration_seconds",
-			Help:      "Duration of bind process in seconds. Broken down by result.",
-			Buckets:   metrics.DurationBuckets(),
-		},
-		[]string{metrics.ResultLabel},
-	)
-	crmetrics.Registry.MustRegister(bindTimeHistogramVec)
-
-	return &binderMetricsDecorator{binder: binder, bindTimeHistogramVec: bindTimeHistogramVec}
-}
-
-func (b *binderMetricsDecorator) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
-	startTime := time.Now()
-	bindErr := b.binder.Bind(ctx, node, pods)
-	durationSeconds := time.Since(startTime).Seconds()
-
-	result := "success"
-	if bindErr != nil {
-		result = "error"
-	}
-
-	observer, promErr := b.bindTimeHistogramVec.GetMetricWith(prometheus.Labels{metrics.ResultLabel: result})
-	if promErr != nil {
-		logging.FromContext(ctx).Warnf(
-			"Failed to record bind duration metric [%s=%s, duration=%f]: error=%w",
-			metrics.ResultLabel,
-			result,
-			durationSeconds,
-			promErr,
-		)
-	} else {
-		observer.Observe(durationSeconds)
-	}
-
-	return bindErr
-}
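
The bind.go change above boils down to a common Go metrics shape: instead of wrapping a Binder interface in a decorator built at construction time, the HistogramVec becomes a package-level var registered once in init(), and the exported Bind times the unexported bind, labeling the observation by result. Below is a minimal, self-contained sketch of that pattern; the Worker, workDurationVec, and "example" namespace names are illustrative only, and it registers with the default Prometheus registry rather than the controller-runtime registry used above.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Package-level histogram vector, created once and registered at init time,
// so it exists as soon as the package is linked into a binary.
var workDurationVec = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "worker",
		Name:      "work_duration_seconds",
		Help:      "Duration of work in seconds. Broken down by result.",
		Buckets:   prometheus.DefBuckets,
	},
	[]string{"result"},
)

func init() {
	prometheus.MustRegister(workDurationVec)
}

type Worker struct{}

// Do wraps the unexported do, timing it and recording the duration
// under a success/error result label, as Bind does for bind above.
func (w *Worker) Do() error {
	start := time.Now()
	err := w.do()
	seconds := time.Since(start).Seconds()

	result := "success"
	if err != nil {
		result = "error"
	}

	// GetMetricWith fails only on label mismatches; logging and continuing
	// keeps a metrics problem from masking the real work error.
	observer, promErr := workDurationVec.GetMetricWith(prometheus.Labels{"result": result})
	if promErr != nil {
		fmt.Printf("failed to record duration metric: %v\n", promErr)
	} else {
		observer.Observe(seconds)
	}
	return err
}

func (w *Worker) do() error {
	time.Sleep(10 * time.Millisecond) // stand-in for real work
	return nil
}

func main() {
	w := &Worker{}
	fmt.Println(w.Do())
}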

pkg/controllers/allocation/controller.go (4 changes: 2 additions & 2 deletions)

@@ -55,7 +55,7 @@ const (
 type Controller struct {
 	Batcher       *Batcher
 	Filter        *Filter
-	Binder        Binder
+	Binder        *Binder
 	Scheduler     *scheduling.Scheduler
 	Packer        binpacking.Packer
 	CloudProvider cloudprovider.CloudProvider
@@ -66,7 +66,7 @@ type Controller struct {
 func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller {
 	return &Controller{
 		Filter:        &Filter{KubeClient: kubeClient},
-		Binder:        DecorateBinderMetrics(NewBinder(kubeClient, coreV1Client)),
+		Binder:        &Binder{KubeClient: kubeClient, CoreV1Client: coreV1Client},
 		Batcher:       NewBatcher(maxBatchWindow, batchIdleTimeout),
 		Scheduler:     scheduling.NewScheduler(cloudProvider, kubeClient),
 		Packer:        binpacking.NewPacker(),

pkg/controllers/allocation/scheduling/suite_test.go (2 changes: 1 addition & 1 deletion)

@@ -55,7 +55,7 @@ var _ = BeforeSuite(func() {
 	registry.RegisterOrDie(cloudProvider)
 	controller = &allocation.Controller{
 		Filter:        &allocation.Filter{KubeClient: e.Client},
-		Binder:        allocation.NewBinder(e.Client, corev1.NewForConfigOrDie(e.Config)),
+		Binder:        &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)},
 		Batcher:       allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
 		Scheduler:     scheduling.NewScheduler(cloudProvider, e.Client),
 		Packer:        binpacking.NewPacker(),

pkg/controllers/allocation/suite_test.go (2 changes: 1 addition & 1 deletion)

@@ -58,7 +58,7 @@ var _ = BeforeSuite(func() {
 	registry.RegisterOrDie(cloudProvider)
 	controller = &allocation.Controller{
 		Filter:        &allocation.Filter{KubeClient: e.Client},
-		Binder:        allocation.NewBinder(e.Client, corev1.NewForConfigOrDie(e.Config)),
+		Binder:        &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)},
 		Batcher:       allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
 		Scheduler:     scheduling.NewScheduler(cloudProvider, e.Client),
 		Packer:        binpacking.NewPacker(),
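
One side effect of moving registration into init() is that the histogram appears in the controller-runtime registry as soon as the allocation package is linked into a binary, before any Bind call. A hedged test sketch of how that could be asserted follows; it is not part of this commit, and the full metric name assumes metrics.KarpenterNamespace resolves to "karpenter".

package allocation_test

import (
	"testing"

	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

// Placed alongside the allocation package's own tests, so the package
// (and therefore its init()) is compiled into the test binary.
func TestBindDurationHistogramRegistered(t *testing.T) {
	families, err := crmetrics.Registry.Gather()
	if err != nil {
		t.Fatalf("gathering metrics: %v", err)
	}
	for _, mf := range families {
		// Assumed full name: namespace_subsystem_name.
		if mf.GetName() == "karpenter_allocation_controller_bind_duration_seconds" {
			return
		}
	}
	t.Error("bind duration histogram not registered")
}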
