Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Teleport update reconciliation on status updates #34063

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 48 additions & 23 deletions integrations/operator/controllers/resources/role_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -87,6 +86,7 @@ func (r *RoleReconciler) SetupWithManager(mgr ctrl.Manager) error {
obj := GetUnstructuredObjectFromGVK(TeleportRoleGVKV5)
return ctrl.NewControllerManagedBy(mgr).
For(obj).
WithEventFilter(buildPredicate()).
Complete(r)
}

Expand All @@ -111,53 +111,78 @@ func (r *RoleReconciler) Upsert(ctx context.Context, obj kclient.Object) error {
u.Object,
k8sResource, true, /* returnUnknownFields */
)
newStructureCondition := getStructureConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newStructureCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getStructureConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

// Converting the Kubernetes resource into a Teleport one, checking potential ownership issues.
teleportResource := k8sResource.ToTeleport()
teleportClient, err := r.TeleportClientAccessor(ctx)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getTeleportClientConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

existingResource, err := teleportClient.GetRole(ctx, teleportResource.GetName())
if err != nil && !trace.IsNotFound(err) {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})
if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

if err == nil {
// The resource already exists
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(k8sResource.StatusConditions(), newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
// The resource does not yet exist
meta.SetStatusCondition(k8sResource.StatusConditions(), newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

r.AddTeleportResourceOrigin(teleportResource)

// If an error happens we want to put it in status.conditions before returning.
_, err = teleportClient.UpsertRole(ctx, teleportResource)
newReconciliationCondition := getReconciliationConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newReconciliationCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
}

updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})
// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
return trace.NewAggregate(err, updateErr)
}

func (r *RoleReconciler) AddTeleportResourceOrigin(resource types.Role) {
Expand Down
92 changes: 72 additions & 20 deletions integrations/operator/controllers/resources/teleport_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package resources
import (
"context"
"reflect"
"slices"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/gravitational/teleport/api/types"
)
Expand Down Expand Up @@ -70,8 +72,8 @@ type TeleportResourceMutator[T TeleportResource] interface {
// NewTeleportResourceReconciler instanciates a TeleportResourceReconciler from a TeleportResourceClient.
func NewTeleportResourceReconciler[T TeleportResource, K TeleportKubernetesResource[T]](
client kclient.Client,
resourceClient TeleportResourceClient[T]) *TeleportResourceReconciler[T, K] {

resourceClient TeleportResourceClient[T],
) *TeleportResourceReconciler[T, K] {
reconciler := &TeleportResourceReconciler[T, K]{
ResourceBaseReconciler: ResourceBaseReconciler{Client: client},
resourceClient: resourceClient,
Expand All @@ -92,23 +94,41 @@ func (r TeleportResourceReconciler[T, K]) Upsert(ctx context.Context, obj kclien
teleportResource := k8sResource.ToTeleport()

existingResource, err := r.resourceClient.Get(ctx, teleportResource.GetName())
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})

if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}
// If err is nil, we found the resource. If err != nil (and we did return), then the error was `NotFound`
exists := err == nil

if exists {
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(k8sResource.StatusConditions(), newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
return trace.NewAggregate(
trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName()),
r.Status().Update(ctx, k8sResource),
)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
meta.SetStatusCondition(k8sResource.StatusConditions(), newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

teleportResource.SetOrigin(types.OriginKubernetes)
Expand All @@ -124,14 +144,14 @@ func (r TeleportResourceReconciler[T, K]) Upsert(ctx context.Context, obj kclien
err = r.resourceClient.Update(ctx, teleportResource)
}
// If an error happens we want to put it in status.conditions before returning.
newReconciliationCondition := getReconciliationConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newReconciliationCondition)
if err != nil {
return trace.NewAggregate(err, r.Status().Update(ctx, k8sResource))
}

// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})

return trace.NewAggregate(err, updateErr)
}

// Delete is the TeleportResourceReconciler of the ResourceBaseReconciler DeleteExertal
Expand All @@ -148,7 +168,13 @@ func (r TeleportResourceReconciler[T, K]) Reconcile(ctx context.Context, req ctr
// SetupWithManager have a controllerruntime.Manager run the TeleportResourceReconciler
func (r TeleportResourceReconciler[T, K]) SetupWithManager(mgr ctrl.Manager) error {
kubeResource := newKubeResource[T, K]()
return ctrl.NewControllerManagedBy(mgr).For(kubeResource).Complete(r)
return ctrl.
NewControllerManagedBy(mgr).
For(kubeResource).
WithEventFilter(
buildPredicate(),
).
Complete(r)
}

// newKubeResource creates a new TeleportKubernetesResource
Expand All @@ -168,3 +194,29 @@ func newKubeResource[T TeleportResource, K TeleportKubernetesResource[T]]() K {
}
return resource
}

// buildPredicate returns a predicate that triggers the reconciliation when:
// - the resource generation changes
// - the resource finalizers change
// - the resource annotations change
// - the resource labels change
// - the resource is created
// - the resource is deleted
// It does not trigger the reconciliation when:
// - the resource status changes
func buildPredicate() predicate.Predicate {
return predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

return !slices.Equal(e.ObjectNew.GetFinalizers(), e.ObjectOld.GetFinalizers())
},
},
)
}
75 changes: 57 additions & 18 deletions integrations/operator/controllers/resources/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -34,9 +36,11 @@ const (
ConditionReasonNewResource = "NewResource"
ConditionReasonNoError = "NoError"
ConditionReasonTeleportError = "TeleportError"
ConditionReasonTeleportClientError = "TeleportClientError"
ConditionTypeTeleportResourceOwned = "TeleportResourceOwned"
ConditionTypeSuccessfullyReconciled = "SuccessfullyReconciled"
ConditionTypeValidStructure = "ValidStructure"
ConditionTypeTeleportClient = "TeleportClient"
)

var newResourceCondition = metav1.Condition{
Expand Down Expand Up @@ -85,25 +89,21 @@ func checkOwnership(existingResource ownedResource) (metav1.Condition, bool) {
// getReconciliationConditionFromError takes an error returned by a call to Teleport and returns a
// metav1.Condition describing how the Teleport resource reconciliation went. This is used to provide feedback to
// the user about the controller's ability to reconcile the resource.
func getReconciliationConditionFromError(err error) metav1.Condition {
var condition metav1.Condition
if err == nil {
condition = metav1.Condition{
func getReconciliationConditionFromError(err error, ignoreNotFound bool) metav1.Condition {
if err == nil || trace.IsNotFound(err) && ignoreNotFound {
return metav1.Condition{
Type: ConditionTypeSuccessfullyReconciled,
Status: metav1.ConditionTrue,
Reason: ConditionReasonNoError,
Message: "Teleport resource was successfully reconciled, no error was returned by Teleport.",
}
} else {
condition = metav1.Condition{
Type: ConditionTypeSuccessfullyReconciled,
Status: metav1.ConditionFalse,
Reason: ConditionReasonTeleportError,
Message: fmt.Sprintf("Teleport returned the error: %s", err),
}
}

return condition
return metav1.Condition{
Type: ConditionTypeSuccessfullyReconciled,
Status: metav1.ConditionFalse,
Reason: ConditionReasonTeleportError,
Message: fmt.Sprintf("Teleport returned the error: %s", err),
}
}

// getStructureConditionFromError takes a conversion error from k8s apimachinery's runtime.UnstructuredConverter
Expand All @@ -126,12 +126,51 @@ func getStructureConditionFromError(err error) metav1.Condition {
}
}

// silentUpdateStatus updates the resource status but swallows the error if the update fails.
// This should be used when an error already happened, and we're going to re-run the reconciliation loop anyway.
func silentUpdateStatus(ctx context.Context, client kclient.Client, k8sResource kclient.Object) {
log := ctrllog.FromContext(ctx)
statusErr := client.Status().Update(ctx, k8sResource)
// getTeleportClientConditionFromError takes an error returned by a call to Teleport ClientAccessor and returns a
// metav1.Condition describing how the Teleport client creation went. This is used to provide feedback to
// the user about the controller's ability to reconcile the resource.
func getTeleportClientConditionFromError(err error) metav1.Condition {
if err != nil {
return metav1.Condition{
Type: ConditionTypeTeleportClient,
Status: metav1.ConditionFalse,
Reason: ConditionReasonTeleportClientError,
Message: fmt.Sprintf("Failed to create Teleport client: %s", err),
}
}
return metav1.Condition{
Type: ConditionTypeTeleportClient,
Status: metav1.ConditionTrue,
Reason: ConditionReasonNoError,
Message: "Teleport client creation was successful.",
}
}

// updateStatusConfig is a configuration struct for silentUpdateStatus.
type updateStatusConfig struct {
ctx context.Context
client kclient.Client
k8sResource interface {
kclient.Object
StatusConditions() *[]metav1.Condition
}
condition metav1.Condition
}

// updateStatus updates the resource status but swallows the error if the update fails.
func updateStatus(config updateStatusConfig) error {
// If the condition is empty, we don't want to update the status.
if config.condition == (metav1.Condition{}) {
return nil
}
log := ctrllog.FromContext(config.ctx)
meta.SetStatusCondition(
config.k8sResource.StatusConditions(),
config.condition,
)
statusErr := config.client.Status().Update(config.ctx, config.k8sResource)
if statusErr != nil {
log.Error(statusErr, "failed to report error in status conditions")
}
return trace.Wrap(statusErr)
}
2 changes: 1 addition & 1 deletion integrations/operator/crdgen/ignored.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ This should be removed when the following feature is implemented
https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#transition-rules
*/
var ignoredFields = map[string]stringSet{
"UserSpecV2": stringSet{
"UserSpecV2": {
"LocalAuth": struct{}{}, // struct{}{} is used to signify "no value".
"Expires": struct{}{},
"CreatedBy": struct{}{},
Expand Down