Implement support of horizontal scaling rules for Autoscaling (#27119)
vboulineau committed Jul 5, 2024
1 parent 2711267 commit d7fefdb
Showing 21 changed files with 2,269 additions and 335 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -145,7 +145,7 @@ require (
github.com/DataDog/datadog-agent/pkg/util/pointer v0.55.0-rc.3
github.com/DataDog/datadog-agent/pkg/util/scrubber v0.55.0-rc.3
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/DataDog/datadog-operator v0.7.1-0.20240522081847-e83dd785258a
github.com/DataDog/datadog-operator v0.7.1-0.20240627103854-fe86c2214d4c
github.com/DataDog/ebpf-manager v0.6.1
github.com/DataDog/gopsutil v1.2.2
github.com/DataDog/nikos v1.12.4
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default.

21 changes: 21 additions & 0 deletions pkg/clusteragent/autoscaling/processor.go
@@ -27,6 +27,27 @@ func (p ProcessResult) ShouldRequeue() bool {
return p.Requeue || p.RequeueAfter > 0
}

// After returns a copy of current ProcessResult with RequeueAfter set to the given duration
func (p ProcessResult) After(after time.Duration) ProcessResult {
p.RequeueAfter = after
return p
}

// Merge merges the other ProcessResult into a newly returned one
func (p ProcessResult) Merge(other ProcessResult) ProcessResult {
if other.Requeue {
p.Requeue = true

if p.RequeueAfter > 0 {
p.RequeueAfter = min(p.RequeueAfter, other.RequeueAfter)
} else {
p.RequeueAfter = other.RequeueAfter
}
}

return p
}

var (
// Requeue is a shortcut to avoid having ProcessResult{Requeue: true} everywhere in the code
Requeue = ProcessResult{Requeue: true}
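Note on the processor.go additions: After and Merge let callers combine requeue decisions from several sub-controllers (controller.go below merges the horizontal and vertical sync results this way). The following is a minimal, self-contained sketch of the semantics; the struct fields and the standalone main package are assumptions for illustration, while the method bodies mirror the hunk above. It relies on Go 1.21+ for the built-in min.

package main

import (
	"fmt"
	"time"
)

// ProcessResult mirrors the type extended in pkg/clusteragent/autoscaling/processor.go.
// The field names are inferred from ShouldRequeue and Merge; treat this as an
// illustrative copy, not the canonical definition.
type ProcessResult struct {
	Requeue      bool
	RequeueAfter time.Duration
}

// ShouldRequeue returns true when the item must be processed again.
func (p ProcessResult) ShouldRequeue() bool {
	return p.Requeue || p.RequeueAfter > 0
}

// After returns a copy of the current ProcessResult with RequeueAfter set.
func (p ProcessResult) After(after time.Duration) ProcessResult {
	p.RequeueAfter = after
	return p
}

// Merge combines two results, keeping the earliest requeue deadline (Go 1.21+ min).
func (p ProcessResult) Merge(other ProcessResult) ProcessResult {
	if other.Requeue {
		p.Requeue = true
		if p.RequeueAfter > 0 {
			p.RequeueAfter = min(p.RequeueAfter, other.RequeueAfter)
		} else {
			p.RequeueAfter = other.RequeueAfter
		}
	}
	return p
}

func main() {
	horizontal := ProcessResult{Requeue: true}.After(30 * time.Second)
	vertical := ProcessResult{Requeue: true}.After(5 * time.Minute)

	merged := horizontal.Merge(vertical)
	fmt.Println(merged.ShouldRequeue(), merged.RequeueAfter) // true 30s
}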
38 changes: 14 additions & 24 deletions pkg/clusteragent/autoscaling/workload/config_retriever_settings.go
@@ -50,13 +50,11 @@ func (p autoscalingSettingsProcessor) process(receivedTimestamp time.Time, confi
podAutoscaler, podAutoscalerFound := p.store.LockRead(podAutoscalerID, true)
// If the PodAutoscaler is not found, we need to create it
if !podAutoscalerFound {
podAutoscaler = model.PodAutoscalerInternal{
Namespace: settings.Namespace,
Name: settings.Name,
}
podAutoscaler = model.NewPodAutoscalerFromSettings(settings.Namespace, settings.Name, settings.Spec, rawConfig.Metadata.Version, receivedTimestamp)
} else {
podAutoscaler.UpdateFromSettings(settings.Spec, rawConfig.Metadata.Version, receivedTimestamp)
}

podAutoscaler.UpdateFromSettings(settings.Spec, rawConfig.Metadata.Version, receivedTimestamp)
p.store.UnlockSet(podAutoscalerID, podAutoscaler, configRetrieverStoreID)
p.processed[podAutoscalerID] = struct{}{}
}
@@ -72,28 +70,20 @@ func (p autoscalingSettingsProcessor) postProcess(errors []error) {
return
}

// We first get all PodAutoscalers that were not part of the last update
var toDelete []string
_ = p.store.GetFiltered(func(pai model.PodAutoscalerInternal) bool {
if pai.Spec == nil || pai.Spec.Owner != v1alpha1.DatadogPodAutoscalerRemoteOwner {
return false
// Update the store to flag all PodAutoscalers owned by remote that were not processed
p.store.Update(func(pai model.PodAutoscalerInternal) (model.PodAutoscalerInternal, bool) {
if pai.Spec() == nil || pai.Spec().Owner != v1alpha1.DatadogPodAutoscalerRemoteOwner {
return pai, false
}

_, found := p.processed[pai.ID()]
paID := pai.ID()
_, found := p.processed[paID]
if !found {
toDelete = append(toDelete, pai.ID())
pai.SetDeleted()
log.Infof("PodAutoscaler %s was not part of the last update, flagging it as deleted", paID)
return pai, true
}

return false
})

// Then we can properly lock read/write to flag them as deleted
for _, id := range toDelete {
pai, found := p.store.LockRead(id, false)
if found {
pai.Deleted = true
log.Infof("PodAutoscaler %s was not part of the last update, flagging it as deleted", id)
p.store.UnlockSet(id, pai, configRetrieverStoreID)
}
}
return pai, false
}, configRetrieverStoreID)
}
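Note on the postProcess rewrite above: the old flow collected IDs with GetFiltered and then re-acquired the lock per object with LockRead/UnlockSet, while the new flow flags deletions in a single store.Update pass. The store implementation is not part of this commit view, so the sketch below is an assumed, simplified version of such a callback-under-lock Update (toy autoscaler/store types, hypothetical field names), only to show why one traversal is now enough.

package main

import (
	"fmt"
	"sync"
)

// autoscaler is a toy stand-in for model.PodAutoscalerInternal; only the fields
// needed to illustrate the pattern are included.
type autoscaler struct {
	id      string
	deleted bool
}

// store is a toy stand-in for the autoscaling store used above. The real
// implementation is not shown in this commit; only the Update(callback, senderID)
// call site is, so the locking and notification details here are assumptions.
type store struct {
	mu    sync.Mutex
	items map[string]autoscaler
}

// Update visits every item under the lock and persists the copy returned by the
// callback when it reports a change, replacing the previous two-phase
// GetFiltered + LockRead/UnlockSet flow with a single pass.
func (s *store) Update(cb func(autoscaler) (autoscaler, bool), senderID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, item := range s.items {
		if updated, changed := cb(item); changed {
			s.items[id] = updated // the real store would also notify observers with senderID
		}
	}
	_ = senderID
}

func main() {
	s := &store{items: map[string]autoscaler{
		"ns/name1": {id: "ns/name1"},
		"ns/name2": {id: "ns/name2"},
	}}
	processed := map[string]struct{}{"ns/name1": {}}

	// Flag everything that was not part of the last update as deleted, in one pass.
	s.Update(func(a autoscaler) (autoscaler, bool) {
		if _, found := processed[a.id]; !found {
			a.deleted = true
			return a, true
		}
		return a, false
	}, "configRetriever")

	fmt.Printf("%+v\n", s.items)
}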
@@ -29,14 +29,9 @@ func TestConfigRetriverAutoscalingSettingsFollower(t *testing.T) {
cr, mockRCClient := newMockConfigRetriever(t, false, clock.NewFakeClock(testTime))

// Dummy objects in store
dummy2 := model.PodAutoscalerInternal{
Namespace: "ns",
Name: "name2",
}
dummy3 := model.PodAutoscalerInternal{
Namespace: "ns",
Name: "name3",
}
dummy2 := model.NewFakePodAutoscalerInternal("ns", "name2", nil)
dummy3 := model.NewFakePodAutoscalerInternal("ns", "name3", nil)

cr.store.Set("ns/name2", dummy2, "unittest")
cr.store.Set("ns/name3", dummy3, "unittest")

@@ -109,7 +104,7 @@ func TestConfigRetriverAutoscalingSettingsLeader(t *testing.T) {
Name: "deploy3",
},
Policy: &datadoghq.DatadogPodAutoscalerPolicy{
ApplyMode: datadoghq.DatadogPodAutoscalerAllApplyNone,
ApplyMode: datadoghq.DatadogPodAutoscalerNoneApplyMode,
Update: &datadoghq.DatadogPodAutoscalerUpdatePolicy{
Strategy: datadoghq.DatadogPodAutoscalerAutoUpdateStrategy,
},
@@ -166,7 +161,7 @@ func TestConfigRetriverAutoscalingSettingsLeader(t *testing.T) {
object1Spec.RemoteVersion = pointer.Ptr[uint64](1)
object2Spec.RemoteVersion = pointer.Ptr[uint64](1)
object3Spec.RemoteVersion = pointer.Ptr[uint64](10)
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
Expand Down Expand Up @@ -216,7 +211,7 @@ func TestConfigRetriverAutoscalingSettingsLeader(t *testing.T) {

// Set expected versions: only one change for foo2
object3Spec.RemoteVersion = pointer.Ptr[uint64](11)
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
@@ -216,7 +211,7 @@ func TestConfigRetriverAutoscalingSettingsLeader(t *testing.T) {
podAutoscalers = cr.store.GetAll()

// No changes in expected versions
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
@@ -31,16 +31,16 @@ func TestConfigRetriverAutoscalingValuesFollower(t *testing.T) {
cr, mockRCClient := newMockConfigRetriever(t, false, clock.NewFakeClock(testTime))

// Dummy objects in store
dummy2 := model.PodAutoscalerInternal{
dummy2 := model.FakePodAutoscalerInternal{
Namespace: "ns",
Name: "name2",
}
dummy3 := model.PodAutoscalerInternal{
dummy3 := model.FakePodAutoscalerInternal{
Namespace: "ns",
Name: "name3",
}
cr.store.Set("ns/name2", dummy2, "unittest")
cr.store.Set("ns/name3", dummy3, "unittest")
cr.store.Set("ns/name2", dummy2.Build(), "unittest")
cr.store.Set("ns/name3", dummy3.Build(), "unittest")

// Object specs
value1 := &kubeAutoscaling.WorkloadValues{
@@ -71,26 +71,26 @@

assert.Equal(t, 1, stateCallbackCalled)
podAutoscalers := cr.store.GetAll()
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{dummy2, dummy3}, podAutoscalers)
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{dummy2, dummy3}, podAutoscalers)
}

func TestConfigRetriverAutoscalingValuesLeader(t *testing.T) {
testTime := time.Now()
cr, mockRCClient := newMockConfigRetriever(t, true, clock.NewFakeClock(testTime))

// Dummy objects in store
cr.store.Set("ns/name1", model.PodAutoscalerInternal{
cr.store.Set("ns/name1", model.FakePodAutoscalerInternal{
Namespace: "ns",
Name: "name1",
}, "unittest")
cr.store.Set("ns/name2", model.PodAutoscalerInternal{
}.Build(), "unittest")
cr.store.Set("ns/name2", model.FakePodAutoscalerInternal{
Namespace: "ns",
Name: "name2",
}, "unittest")
cr.store.Set("ns/name3", model.PodAutoscalerInternal{
}.Build(), "unittest")
cr.store.Set("ns/name3", model.FakePodAutoscalerInternal{
Namespace: "ns",
Name: "name3",
}, "unittest")
}.Build(), "unittest")

// Object specs
value1 := &kubeAutoscaling.WorkloadValues{
@@ -206,7 +206,7 @@ func TestConfigRetriverAutoscalingValuesLeader(t *testing.T) {
assert.Equal(t, 2, stateCallbackCalled)
podAutoscalers := cr.store.GetAll()

model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
@@ -298,7 +298,7 @@ func TestConfigRetriverAutoscalingValuesLeader(t *testing.T) {
assert.Equal(t, 2, stateCallbackCalled)

podAutoscalers = cr.store.GetAll()
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
@@ -360,7 +360,7 @@ func TestConfigRetriverAutoscalingValuesLeader(t *testing.T) {
assert.Equal(t, 1, stateCallbackCalled)

podAutoscalers = cr.store.GetAll()
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
@@ -422,7 +422,7 @@ func TestConfigRetriverAutoscalingValuesLeader(t *testing.T) {
assert.Equal(t, 1, stateCallbackCalled)

podAutoscalers = cr.store.GetAll()
model.AssertPodAutoscalersEqual(t, []model.PodAutoscalerInternal{
model.AssertPodAutoscalersEqual(t, []model.FakePodAutoscalerInternal{
{
Namespace: "ns",
Name: "name1",
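Note on the test changes above: PodAutoscalerInternal literals are replaced with model.FakePodAutoscalerInternal{...}.Build() and model.NewFakePodAutoscalerInternal, which suggests the internal model now hides its fields behind accessors. The fake type itself is not rendered in this commit view; the sketch below is a guessed reconstruction of that builder shape, just to show why tests now call .Build() before storing objects.

package main

import "fmt"

// podAutoscalerInternal mimics the accessor-based internal model introduced in this
// commit: fields are unexported and read through methods (Namespace(), Name(), ...).
// This is an illustrative reconstruction, not the actual model package.
type podAutoscalerInternal struct {
	namespace string
	name      string
	deleted   bool
}

func (p podAutoscalerInternal) Namespace() string { return p.namespace }
func (p podAutoscalerInternal) Name() string      { return p.name }
func (p podAutoscalerInternal) ID() string        { return p.namespace + "/" + p.name }
func (p *podAutoscalerInternal) SetDeleted()      { p.deleted = true }

// fakePodAutoscalerInternal mirrors the role of model.FakePodAutoscalerInternal in the
// tests above: a literal-friendly struct with exported fields plus a Build() that
// produces the accessor-based value the store expects.
type fakePodAutoscalerInternal struct {
	Namespace string
	Name      string
}

func (f fakePodAutoscalerInternal) Build() podAutoscalerInternal {
	return podAutoscalerInternal{namespace: f.Namespace, name: f.Name}
}

func main() {
	dummy := fakePodAutoscalerInternal{Namespace: "ns", Name: "name2"}.Build()
	fmt.Println(dummy.ID()) // ns/name2
}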
51 changes: 28 additions & 23 deletions pkg/clusteragent/autoscaling/workload/controller.go
@@ -166,7 +166,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// Object is not present in Kubernetes
// If flagged for deletion, we just need to clear up our store (deletion complete)
// Also if object was not owned by remote config, we also need to delete it (deleted by user)
if podAutoscalerInternal.Deleted || podAutoscalerInternal.Spec.Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
if podAutoscalerInternal.Deleted() || podAutoscalerInternal.Spec().Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
log.Infof("Object %s not present in Kuberntes and flagged for deletion (remote) or owner == local, clearing internal store", key)
c.store.UnlockDelete(key, c.ID)
return autoscaling.NoRequeue, nil
@@ -185,7 +185,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if podAutoscaler.Spec.Owner == datadoghq.DatadogPodAutoscalerRemoteOwner {
// First implement deletion logic, as if it's a deletion, we don't need to update the object.
// Deletion can only happen if the object is owned by remote config.
if podAutoscalerInternal.Deleted {
if podAutoscalerInternal.Deleted() {
log.Infof("Remote owned PodAutoscaler with Deleted flag, deleting object: %s", key)
err := c.deletePodAutoscaler(ns, name)
// In case of not found, it means the object is gone but informer cache is not updated yet, we can safely delete it from our store
@@ -202,9 +202,9 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string

// If the object is owned by remote config and newer, we need to update the spec in Kubernetes
// If Kubernetes is newer, we wait for RC to update the object in our internal store.
if podAutoscalerInternal.Spec.RemoteVersion != nil &&
if podAutoscalerInternal.Spec().RemoteVersion != nil &&
podAutoscaler.Spec.RemoteVersion != nil &&
*podAutoscalerInternal.Spec.RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
*podAutoscalerInternal.Spec().RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

            // When doing an external update, we stop and requeue the object to not have multiple changes at once.
@@ -215,7 +215,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// If Generation != podAutoscaler.Generation, we should compute `.Spec` hash
// and compare it with the one in the PodAutoscaler. If they differ, we should update the PodAutoscaler
// otherwise store the Generation
if podAutoscalerInternal.Generation != podAutoscaler.Generation {
if podAutoscalerInternal.Generation() != podAutoscaler.Generation {
localHash, err := autoscaling.ObjectHash(podAutoscalerInternal.Spec)
if err != nil {
c.store.Unlock(key)
@@ -236,19 +236,19 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
return autoscaling.Requeue, err
}

podAutoscalerInternal.Generation = podAutoscaler.Generation
podAutoscalerInternal.SetGeneration(podAutoscaler.Generation)
}
}

// Implement sync logic for local ownership, source of truth is Kubernetes
if podAutoscalerInternal.Spec.Owner == datadoghq.DatadogPodAutoscalerLocalOwner {
if podAutoscalerInternal.Generation != podAutoscaler.Generation {
if podAutoscalerInternal.Spec().Owner == datadoghq.DatadogPodAutoscalerLocalOwner {
if podAutoscalerInternal.Generation() != podAutoscaler.Generation {
podAutoscalerInternal.UpdateFromPodAutoscaler(podAutoscaler)
}
}

    // Reaching this point, we had no error in processing, clearing up the global error
podAutoscalerInternal.Error = nil
podAutoscalerInternal.SetError(nil)

// Now that everything is synced, we can perform the actual processing
result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
Expand Down Expand Up @@ -276,18 +276,23 @@ func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq
return horizontalRes, err
}

return c.verticalController.sync(ctx, podAutoscaler, podAutoscalerInternal)
verticalRes, err := c.verticalController.sync(ctx, podAutoscaler, podAutoscalerInternal)
if err != nil {
return verticalRes, err
}

return horizontalRes.Merge(verticalRes), nil
}

func (c *Controller) createPodAutoscaler(ctx context.Context, podAutoscalerInternal model.PodAutoscalerInternal) error {
log.Infof("Creating PodAutoscaler Spec: %s/%s", podAutoscalerInternal.Namespace, podAutoscalerInternal.Name)
log.Infof("Creating PodAutoscaler Spec: %s/%s", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name())
autoscalerObj := &datadoghq.DatadogPodAutoscaler{
TypeMeta: podAutoscalerMeta,
ObjectMeta: metav1.ObjectMeta{
Namespace: podAutoscalerInternal.Namespace,
Name: podAutoscalerInternal.Name,
Namespace: podAutoscalerInternal.Namespace(),
Name: podAutoscalerInternal.Name(),
},
Spec: *podAutoscalerInternal.Spec.DeepCopy(),
Spec: *podAutoscalerInternal.Spec().DeepCopy(),
Status: podAutoscalerInternal.BuildStatus(metav1.NewTime(c.clock.Now()), nil),
}

@@ -296,30 +301,30 @@ func (c *Controller) createPodAutoscaler(ctx context.Context, podAutoscalerInter
return err
}

_, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace).Create(ctx, obj, metav1.CreateOptions{})
_, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).Create(ctx, obj, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("Unable to create PodAutoscaler: %s/%s, err: %v", podAutoscalerInternal.Namespace, podAutoscalerInternal.Name, err)
return fmt.Errorf("Unable to create PodAutoscaler: %s/%s, err: %v", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name(), err)
}

return nil
}

func (c *Controller) updatePodAutoscalerSpec(ctx context.Context, podAutoscalerInternal model.PodAutoscalerInternal, podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
log.Infof("Updating PodAutoscaler Spec: %s/%s", podAutoscalerInternal.Namespace, podAutoscalerInternal.Name)
log.Infof("Updating PodAutoscaler Spec: %s/%s", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name())
autoscalerObj := &datadoghq.DatadogPodAutoscaler{
TypeMeta: podAutoscalerMeta,
ObjectMeta: podAutoscaler.ObjectMeta,
Spec: *podAutoscalerInternal.Spec.DeepCopy(),
Spec: *podAutoscalerInternal.Spec().DeepCopy(),
}

obj, err := autoscaling.ToUnstructured(autoscalerObj)
if err != nil {
return err
}

_, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace).Update(ctx, obj, metav1.UpdateOptions{})
_, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).Update(ctx, obj, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("Unable to update PodAutoscaler Spec: %s/%s, err: %w", podAutoscalerInternal.Namespace, podAutoscalerInternal.Name, err)
return fmt.Errorf("Unable to update PodAutoscaler Spec: %s/%s, err: %w", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name(), err)
}

return nil
@@ -331,7 +336,7 @@ func (c *Controller) updatePodAutoscalerStatus(ctx context.Context, podAutoscale
return nil
}

log.Debugf("Updating PodAutoscaler Status: %s/%s", podAutoscalerInternal.Namespace, podAutoscalerInternal.Name)
log.Debugf("Updating PodAutoscaler Status: %s/%s", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name())
autoscalerObj := &datadoghq.DatadogPodAutoscaler{
TypeMeta: podAutoscalerMeta,
ObjectMeta: podAutoscaler.ObjectMeta,
Expand All @@ -343,9 +348,9 @@ func (c *Controller) updatePodAutoscalerStatus(ctx context.Context, podAutoscale
return err
}

_, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace).UpdateStatus(ctx, obj, metav1.UpdateOptions{})
_, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).UpdateStatus(ctx, obj, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("Unable to update PodAutoscaler Status: %s/%s, err: %w", podAutoscalerInternal.Namespace, podAutoscalerInternal.Name, err)
return fmt.Errorf("Unable to update PodAutoscaler Status: %s/%s, err: %w", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name(), err)
}

return nil
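Note on syncPodAutoscaler above: for remote-owned objects the spec is only pushed to Kubernetes when the internal (remote-config) RemoteVersion is strictly newer than the one stored in the cluster; otherwise the controller waits for remote config to update the internal store. A minimal sketch of that gate, with an assumed helper name and standalone form that are not part of the commit:

package main

import "fmt"

// shouldPushRemoteSpec sketches the version comparison in syncPodAutoscaler: push the
// spec to Kubernetes only when the remote-config copy is strictly newer. The helper
// name and signature are illustrative assumptions.
func shouldPushRemoteSpec(internalVersion, k8sVersion *uint64) bool {
	return internalVersion != nil && k8sVersion != nil && *internalVersion > *k8sVersion
}

func main() {
	internal, k8s := uint64(11), uint64(10)
	fmt.Println(shouldPushRemoteSpec(&internal, &k8s)) // true: update the PodAutoscaler spec
	fmt.Println(shouldPushRemoteSpec(&k8s, &internal)) // false: wait for remote config to catch up
}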