Skip to content

Commit

Permalink
enforce condition update on silentUpdateStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato committed Nov 2, 2023
1 parent 378a4fc commit c433dc9
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 56 deletions.
78 changes: 47 additions & 31 deletions integrations/operator/controllers/resources/role_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -113,61 +112,78 @@ func (r *RoleReconciler) Upsert(ctx context.Context, obj kclient.Object) error {
u.Object,
k8sResource, true, /* returnUnknownFields */
)
newStructureCondition := getStructureConditionFromError(err)
meta.SetStatusCondition(k8sResource.StatusConditions(), newStructureCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getStructureConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

// Converting the Kubernetes resource into a Teleport one, checking potential ownership issues.
teleportResource := k8sResource.ToTeleport()
teleportClient, err := r.TeleportClientAccessor(ctx)
meta.SetStatusCondition(
k8sResource.StatusConditions(),
getTeleportClientConditionFromError(err),
)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getTeleportClientConditionFromError(err),
})
if err != nil || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

existingResource, err := teleportClient.GetRole(ctx, teleportResource.GetName())
meta.SetStatusCondition(
k8sResource.StatusConditions(),
getReconciliationConditionFromError(err, true /* ignoreNotFound */),
)
if err != nil && !trace.IsNotFound(err) {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})
if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}

if err == nil {
// The resource already exists
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(k8sResource.StatusConditions(), newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
// The resource does not yet exist
meta.SetStatusCondition(k8sResource.StatusConditions(), newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

r.AddTeleportResourceOrigin(teleportResource)

// If an error happens we want to put it in status.conditions before returning.
_, err = teleportClient.UpsertRole(ctx, teleportResource)
newReconciliationCondition := getReconciliationConditionFromError(err, false /* ignoreNotFound */)
meta.SetStatusCondition(k8sResource.StatusConditions(), newReconciliationCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
}

updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})
// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
return trace.NewAggregate(err, updateErr)
}

func (r *RoleReconciler) AddTeleportResourceOrigin(resource types.Role) {
Expand Down
53 changes: 33 additions & 20 deletions integrations/operator/controllers/resources/teleport_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"reflect"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -93,26 +92,41 @@ func (r TeleportResourceReconciler[T, K]) Upsert(ctx context.Context, obj kclien
teleportResource := k8sResource.ToTeleport()

existingResource, err := r.resourceClient.Get(ctx, teleportResource.GetName())
meta.SetStatusCondition(
k8sResource.StatusConditions(),
getReconciliationConditionFromError(err, true /* ignoreNotFound */),
)
if err != nil && !trace.IsNotFound(err) {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
updateErr := updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, true /* ignoreNotFound */),
})

if err != nil && !trace.IsNotFound(err) || updateErr != nil {
return trace.NewAggregate(err, updateErr)
}
// If err is nil, we found the resource. If err != nil (and we did return), then the error was `NotFound`
exists := err == nil

if exists {
newOwnershipCondition, isOwned := checkOwnership(existingResource)
meta.SetStatusCondition(k8sResource.StatusConditions(), newOwnershipCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newOwnershipCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
if !isOwned {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.AlreadyExists("unowned resource '%s' already exists", existingResource.GetName())
}
} else {
meta.SetStatusCondition(k8sResource.StatusConditions(), newResourceCondition)
if updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: newResourceCondition,
}); updateErr != nil {
return trace.Wrap(updateErr)
}
}

teleportResource.SetOrigin(types.OriginKubernetes)
Expand All @@ -128,15 +142,14 @@ func (r TeleportResourceReconciler[T, K]) Upsert(ctx context.Context, obj kclien
err = r.resourceClient.Update(ctx, teleportResource)
}
// If an error happens we want to put it in status.conditions before returning.
newReconciliationCondition := getReconciliationConditionFromError(err, false /* ignoreNotFound */)
meta.SetStatusCondition(k8sResource.StatusConditions(), newReconciliationCondition)
if err != nil {
silentUpdateStatus(ctx, r.Client, k8sResource)
return trace.Wrap(err)
}

// We update the status conditions on exit
return trace.Wrap(r.Status().Update(ctx, k8sResource))
updateErr = updateStatus(updateStatusConfig{
ctx: ctx,
client: r.Client,
k8sResource: k8sResource,
condition: getReconciliationConditionFromError(err, false /* ignoreNotFound */),
})

return trace.NewAggregate(err, updateErr)
}

// Delete is the TeleportResourceReconciler of the ResourceBaseReconciler DeleteExertal
Expand Down
30 changes: 25 additions & 5 deletions integrations/operator/controllers/resources/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -145,12 +146,31 @@ func getTeleportClientConditionFromError(err error) metav1.Condition {
}
}

// silentUpdateStatus updates the resource status but swallows the error if the update fails.
// This should be used when an error already happened, and we're going to re-run the reconciliation loop anyway.
func silentUpdateStatus(ctx context.Context, client kclient.Client, k8sResource kclient.Object) {
log := ctrllog.FromContext(ctx)
statusErr := client.Status().Update(ctx, k8sResource)
// updateStatusConfig is a configuration struct for silentUpdateStatus.
type updateStatusConfig struct {
ctx context.Context
client kclient.Client
k8sResource interface {
kclient.Object
StatusConditions() *[]metav1.Condition
}
condition metav1.Condition
}

// updateStatus updates the resource status but swallows the error if the update fails.
func updateStatus(config updateStatusConfig) error {
// If the condition is empty, we don't want to update the status.
if config.condition == (metav1.Condition{}) {
return nil
}
log := ctrllog.FromContext(config.ctx)
meta.SetStatusCondition(
config.k8sResource.StatusConditions(),
config.condition,
)
statusErr := config.client.Status().Update(config.ctx, config.k8sResource)
if statusErr != nil {
log.Error(statusErr, "failed to report error in status conditions")
}
return trace.Wrap(statusErr)
}

0 comments on commit c433dc9

Please sign in to comment.