Skip to content

Commit

Permalink
Fix Teleport update reconciliation on status updates (#34063)
Browse files Browse the repository at this point in the history
* Fix Teleport update reconciliation on `status` updates

This pull request addresses the issue where the Teleport operator reconciliation runs every time the operator updates the `status` subresource.
This continuous reconciliation has led to an infinite loop, causing millions of reconciliations per minute.
When an error occurs, such as having invalid role properties, the Operator updates the status and returns an error, which should trigger a rescheduled reconciliation with exponential backoff. The problem arises because the operator failed to enforce a resource generation change, resulting in an immediate trigger of a new reconciliation when the `status` field is updated.

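This pull request modifies the operator so that updates to the `status` subresource no longer trigger a new reconciliation; a reconciliation is now triggered only when the resource generation (or its labels, annotations, or finalizers) changes, or when the resource is created or deleted.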

Special thanks to @strideynet for confirming my hypothesis and giving
the solution!

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* return proper status conditions on failures

* enforce condition update on silentUpdateStatus

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato committed Nov 3, 2023
1 parent 1737c9d commit 8e7718c
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 79 deletions.
5 changes: 5 additions & 0 deletions operator/apis/resources/v2/user_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (u TeleportUser) ToTeleport() types.User {
}
}

// StatusConditions exposes the address of the Status.Conditions slice, which
// lets callers read or modify the user's conditions in place.
func (u *TeleportUser) StatusConditions() *[]metav1.Condition {
	conditions := &u.Status.Conditions
	return conditions
}

// Marshal serializes a spec into binary data.
func (spec *TeleportUserSpec) Marshal() ([]byte, error) {
return (*types.UserSpecV2)(spec).Marshal()
Expand Down
71 changes: 48 additions & 23 deletions operator/controllers/resources/role_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -87,6 +86,7 @@ func (r *RoleReconciler) SetupWithManager(mgr ctrl.Manager) error {
obj := GetUnstructuredObjectFromGVK(TeleportRoleGVKV5)
return ctrl.NewControllerManagedBy(mgr).
For(obj).
WithEventFilter(buildPredicate()).
Complete(r)
}

Expand All @@ -111,53 +111,78 @@ func (r *RoleReconciler) Upsert(ctx context.Context, obj kclient.Object) error {
u.Object,
k8sResource, true, /* returnUnknownFields */
)
newStructureCondition := getStructureConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newStructureCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getStructureConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

// Converting the Kubernetes resource into a Teleport one, checking potential ownership issues.
teleportResource := k8sResource.ToTeleport()
teleportClient, err := r.TeleportClientAccessor(ctx)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getTeleportClientConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

existingResource, err := teleportClient.GetRole(ctx, teleportResource.GetName())
if err != nil && !trace.IsNotFound(err) {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})
if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

if err == nil {
// The resource already exists
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(k8sResource.StatusConditions(), newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
// The resource does not yet exist
meta.SetStatusCondition(k8sResource.StatusConditions(), newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

r.AddTeleportResourceOrigin(teleportResource)

// If an error happens we want to put it in status.conditions before returning.
err = teleportClient.UpsertRole(ctx, teleportResource)
newReconciliationCondition := getReconciliationConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newReconciliationCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
}

updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})
// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
return trace.NewAggregate(err, updateErr)
}

func (r *RoleReconciler) AddTeleportResourceOrigin(resource types.Role) {
Expand Down
91 changes: 71 additions & 20 deletions operator/controllers/resources/teleport_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"reflect"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/gravitational/teleport/api/types"
)
Expand Down Expand Up @@ -62,8 +63,8 @@ type TeleportResourceClient[T TeleportResource] interface {
// NewTeleportResourceReconciler instantiates a TeleportResourceReconciler from a TeleportResourceClient.
func NewTeleportResourceReconciler[T TeleportResource, K TeleportKubernetesResource[T]](
client kclient.Client,
resourceClient TeleportResourceClient[T]) *TeleportResourceReconciler[T, K] {

resourceClient TeleportResourceClient[T],
) *TeleportResourceReconciler[T, K] {
reconciler := &TeleportResourceReconciler[T, K]{
ResourceBaseReconciler: ResourceBaseReconciler{Client: client},
resourceClient: resourceClient,
Expand All @@ -84,23 +85,41 @@ func (r TeleportResourceReconciler[T, K]) Upsert(ctx context.Context, obj kclien
teleportResource := k8sResource.ToTeleport()

existingResource, err := r.resourceClient.Get(ctx, teleportResource.GetName())
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})

if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}
// If err is nil, we found the resource. If err != nil (and we did not return), then the error was `NotFound`
exists := err == nil

if exists {
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(k8sResource.StatusConditions(), newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
return trace.NewAggregate(
trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName()),
r.Status().Update(ctx, k8sResource),
)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
meta.SetStatusCondition(k8sResource.StatusConditions(), newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

teleportResource.SetOrigin(types.OriginKubernetes)
Expand All @@ -113,14 +132,14 @@ func (r TeleportResourceReconciler[T, K]) Upsert(ctx context.Context, obj kclien
err = r.resourceClient.Update(ctx, teleportResource)
}
// If an error happens we want to put it in status.conditions before returning.
newReconciliationCondition := getReconciliationConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newReconciliationCondition)
if err != nil {
return trace.NewAggregate(err, r.Status().Update(ctx, k8sResource))
}

// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})

return trace.NewAggregate(err, updateErr)
}

// Delete is the TeleportResourceReconciler implementation of the ResourceBaseReconciler DeleteExternal
Expand All @@ -137,7 +156,13 @@ func (r TeleportResourceReconciler[T, K]) Reconcile(ctx context.Context, req ctr
// SetupWithManager has a controllerruntime.Manager run the
// TeleportResourceReconciler as the controller for the Kubernetes resource
// kind K.
//
// Events are passed through buildPredicate before they reach the reconciler.
// The builder chain below must be the only return statement: an earlier,
// unfiltered `Complete(r)` return would make the WithEventFilter call
// unreachable and leave the controller reacting to every event.
func (r TeleportResourceReconciler[T, K]) SetupWithManager(mgr ctrl.Manager) error {
	kubeResource := newKubeResource[T, K]()
	return ctrl.
		NewControllerManagedBy(mgr).
		For(kubeResource).
		WithEventFilter(
			buildPredicate(),
		).
		Complete(r)
}

// newKubeResource creates a new TeleportKubernetesResource
Expand All @@ -157,3 +182,29 @@ func newKubeResource[T TeleportResource, K TeleportKubernetesResource[T]]() K {
}
return resource
}

// buildPredicate builds the event filter used when registering the resource
// controllers. The individual predicates are combined with predicate.Or, so an
// event is accepted as soon as one of them accepts it.
//
// Reconciliation is triggered when:
//   - the resource generation changes
//   - the resource finalizers change
//   - the resource annotations change
//   - the resource labels change
//   - the resource is created
//   - the resource is deleted
//
// Reconciliation is not triggered when:
//   - only the resource status changes
func buildPredicate() predicate.Predicate {
	// Accepts update events whose old and new objects carry different
	// finalizer lists; updates missing either object are rejected.
	finalizersChanged := predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			if e.ObjectOld != nil && e.ObjectNew != nil {
				return !reflect.DeepEqual(e.ObjectNew.GetFinalizers(), e.ObjectOld.GetFinalizers())
			}
			return false
		},
	}

	return predicate.Or(
		predicate.GenerationChangedPredicate{},
		predicate.AnnotationChangedPredicate{},
		predicate.LabelChangedPredicate{},
		finalizersChanged,
	)
}
58 changes: 41 additions & 17 deletions operator/controllers/resources/user_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -61,6 +60,7 @@ func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
func (r *UserReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&resourcesv2.TeleportUser{}).
WithEventFilter(buildPredicate()).
Complete(r)
}

Expand All @@ -78,29 +78,56 @@ func (r *UserReconciler) Upsert(ctx context.Context, obj kclient.Object) error {
return fmt.Errorf("failed to convert Object into resource object: %T", obj)
}
teleportResource := k8sResource.ToTeleport()

teleportClient, err := r.TeleportClientAccessor(ctx)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getTeleportClientConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}
if err != nil {
return trace.Wrap(err)
}

existingResource, err := teleportClient.GetUser(teleportResource.GetName(), false)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})
if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

exists := !trace.IsNotFound(err)

if exists {
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(&k8sResource.Status.Conditions, newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
// The resource does not yet exist
meta.SetStatusCondition(&k8sResource.Status.Conditions, newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

r.AddTeleportResourceOrigin(teleportResource)
Expand All @@ -112,16 +139,13 @@ func (r *UserReconciler) Upsert(ctx context.Context, obj kclient.Object) error {
teleportResource.SetCreatedBy(existingResource.GetCreatedBy())
err = teleportClient.UpdateUser(ctx, teleportResource)
}
// If an error happens we want to put it in status.conditions before returning.
newReconciliationCondition := getReconciliationConditionFromError(err)
meta.SetStatusCondition(&k8sResource.Status.Conditions, newReconciliationCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
}

// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})
return trace.NewAggregate(err, updateErr)
}

func (r *UserReconciler) AddTeleportResourceOrigin(resource types.User) {
Expand Down

0 comments on commit 8e7718c

Please sign in to comment.