Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 138 additions & 110 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
}
}

// add/update inventory with target objects
// prepare (add/update) new inventory with target objects
// TODO: review this; it would be cleaner to use a DeepCopy method for a []*InventoryItem type (if there would be such a type)
newInventory := slices.Collect(*inventory, func(item *InventoryItem) *InventoryItem { return item.DeepCopy() })
numAdded := 0
Expand Down Expand Up @@ -435,7 +435,7 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
}
}

// mark obsolete inventory items (clear digest)
// mark obsolete items (clear digest) in new inventory
for _, item := range newInventory {
found := false
for _, object := range objects {
Expand All @@ -451,7 +451,7 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
}
}

// validate object set:
// validate new inventory:
// - check that all managed instances have apply-order greater than or equal to the according managed type
// - check that all managed instances have delete-order less than or equal to the according managed type
// - check that no managed types are about to be deleted (empty digest) unless all related managed instances are as well
Expand Down Expand Up @@ -491,7 +491,7 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
}
}

// accept inventory for further processing, put into right order for future deletion
// accept new inventory for further processing, put into right order for future deletion
*inventory = sortObjectsForDelete(newInventory)

// trigger another reconcile if something was added (to be sure that it is persisted)
Expand All @@ -504,136 +504,77 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
// - the persisted inventory at least has the same object keys as the in-memory inventory
// now it is about to synchronize the cluster state with the inventory

// delete redundant objects and maintain inventory;
// objects are deleted in waves according to their delete order;
// that means, only if all redundant objects of a wave are gone or comppleted, the next
// wave will be processed; within each wave, objects which are instances of managed
// types are deleted before all other objects, and namespaces will only be deleted
// if they are not used by any object in the inventory (note that this may cause deadlocks)
numManagedToBeDeleted := 0
numToBeDeleted := 0
for k, item := range *inventory {
// if this is the first object of an order, then
// count instances of managed types in this wave which are about to be deleted
if k == 0 || (*inventory)[k-1].DeleteOrder < item.DeleteOrder {
log.V(2).Info("begin of deletion wave", "order", item.DeleteOrder)
numManagedToBeDeleted = 0
for j := k; j < len(*inventory) && (*inventory)[j].DeleteOrder == item.DeleteOrder; j++ {
_item := (*inventory)[j]
if (_item.Phase == PhaseScheduledForDeletion || _item.Phase == PhaseScheduledForCompletion || _item.Phase == PhaseDeleting || _item.Phase == PhaseCompleting) && isInstanceOfManagedType(*inventory, _item) {
numManagedToBeDeleted++
// note: after this point, it is also guaranteed that objects is contained in the persisted inventory;
// the inventory therefore consists of two parts:
// - items which are contained in objects
// these items can have one of the following phases:
// - PhaseScheduledForApplication
// - PhaseCreating
// - PhaseUpdating
// - PhaseReady
// - PhaseScheduledForCompletion
// - PhaseCompleting
// - PhaseCompleted
// - items which are not contained in objects
// their phase is one of the following:
// - PhaseScheduledForDeletion
// - PhaseDeleting

// create missing namespaces
if r.createMissingNamespaces {
for _, namespace := range findMissingNamespaces(objects) {
if err := r.client.Get(ctx, apitypes.NamespacedName{Name: namespace}, &corev1.Namespace{}); err != nil {
if !apierrors.IsNotFound(err) {
return false, errors.Wrapf(err, "error reading namespace %s", namespace)
}
if err := r.client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, client.FieldOwner(r.name)); err != nil {
return false, errors.Wrapf(err, "error creating namespace %s", namespace)
}
}
}
}

if item.Phase == PhaseScheduledForDeletion || item.Phase == PhaseScheduledForCompletion || item.Phase == PhaseDeleting || item.Phase == PhaseCompleting {
// fetch object (if existing)
// put objects into right order for applying
objects = sortObjectsForApply(objects, getApplyOrder)

// finish due completions
// note that completions do not honor delete-order or delete-policy
// however, due to the way how PhaseScheduledForCompletion is set, the affected objects will
// always be in one and the same apply order
// in addition deletions are triggered in the canonical deletion order (but not waited for)
numToBeCompleted := 0
for _, item := range *inventory {
if item.Phase == PhaseScheduledForCompletion || item.Phase == PhaseCompleting {
existingObject, err := r.readObject(ctx, item)
if err != nil {
return false, errors.Wrapf(err, "error reading object %s", item)
}

orphan := item.DeletePolicy == DeletePolicyOrphan

switch item.Phase {
case PhaseScheduledForDeletion:
// delete namespaces after all contained inventory items
// delete all instances of managed types before remaining objects; this ensures that no objects are prematurely
// deleted which are needed for the deletion of the managed instances, such as webhook servers, api servers, ...
if (!isNamespace(item) || !isNamespaceUsed(*inventory, item.Name)) && (numManagedToBeDeleted == 0 || isInstanceOfManagedType(*inventory, item)) {
if orphan {
item.Phase = ""
} else {
// note: here is a theoretical risk that we delete an existing foreign object, because informers are not yet synced
// however not sending the delete request is also not an option, because this might lead to orphaned own dependents
// TODO: perform an additional owner id check
if err := r.deleteObject(ctx, item, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting object %s", item)
}
item.Phase = PhaseDeleting
item.Status = status.TerminatingStatus
numToBeDeleted++
}
} else {
numToBeDeleted++
}
case PhaseScheduledForCompletion:
// delete namespaces after all contained inventory items
// delete all instances of managed types before remaining objects; this ensures that no objects are prematurely
// deleted which are needed for the deletion of the managed instances, such as webhook servers, api servers, ...
if (!isNamespace(item) || !isNamespaceUsed(*inventory, item.Name)) && (numManagedToBeDeleted == 0 || isInstanceOfManagedType(*inventory, item)) {
if orphan {
return false, fmt.Errorf("invalid usage of deletion policy: object %s is scheduled for completion and therefore cannot be orphaned", item)
} else {
// note: here is a theoretical risk that we delete an existing foreign object, because informers are not yet synced
// however not sending the delete request is also not an option, because this might lead to orphaned own dependents
// TODO: perform an additional owner id check
if err := r.deleteObject(ctx, item, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting object %s", item)
}
item.Phase = PhaseCompleting
item.Status = status.TerminatingStatus
numToBeDeleted++
}
} else {
numToBeDeleted++
}
case PhaseDeleting:
// if object is gone, we can remove it from inventory
if existingObject == nil {
item.Phase = ""
} else {
numToBeDeleted++
if err := r.deleteObject(ctx, item, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting object %s", item)
}
item.Phase = PhaseCompleting
item.Status = status.TerminatingStatus
numToBeCompleted++
case PhaseCompleting:
// if object is gone, it is set to completed, and kept in inventory
if existingObject == nil {
item.Phase = PhaseCompleted
item.Status = ""
} else {
numToBeDeleted++
numToBeCompleted++
}
default:
// note: any other phase value would indicate a severe code problem, so we want to see the panic in that case
panic("this cannot happen")
}
}

// trigger another reconcile if this is the last object of the wave, and some deletions are not yet completed
if k == len(*inventory)-1 || (*inventory)[k+1].DeleteOrder > item.DeleteOrder {
log.V(2).Info("end of deletion wave", "order", item.DeleteOrder)
if numToBeDeleted > 0 {
break
}
}
}

*inventory = slices.Select(*inventory, func(item *InventoryItem) bool { return item.Phase != "" })

// trigger another reconcile
if numToBeDeleted > 0 {
// trigger another reconcile if any to-be-completed objects are left
if numToBeCompleted > 0 {
return false, nil
}

// note: after this point, PhaseScheduledForDeletion, PhaseScheduledForCompletion, PhaseDeleting, PhaseCompleting cannot occur anymore in inventory
// in other words: the inventory and objects contains the same resources

// create missing namespaces
if r.createMissingNamespaces {
for _, namespace := range findMissingNamespaces(objects) {
if err := r.client.Get(ctx, apitypes.NamespacedName{Name: namespace}, &corev1.Namespace{}); err != nil {
if !apierrors.IsNotFound(err) {
return false, errors.Wrapf(err, "error reading namespace %s", namespace)
}
if err := r.client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, client.FieldOwner(r.name)); err != nil {
return false, errors.Wrapf(err, "error creating namespace %s", namespace)
}
}
}
}

// put objects into right order for applying
objects = sortObjectsForApply(objects, getApplyOrder)
// note: after this point, PhaseScheduledForCompletion, PhaseCompleting cannot occur anymore in inventory

// apply objects and maintain inventory;
// objects are applied (i.e. created/updated) in waves according to their apply order;
Expand Down Expand Up @@ -748,7 +689,94 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
}
}

return numUnready == 0, nil
// trigger another reconcile if any unready objects are left
if numUnready > 0 {
return false, nil
}

// delete redundant objects and maintain inventory;
// objects are deleted in waves according to their delete order;
// that means, only if all redundant objects of a wave are gone , the next
// wave will be processed; within each wave, objects which are instances of managed
// types are deleted before all other objects, and namespaces will only be deleted
// if they are not used by any object in the inventory (note that this may cause deadlocks)
numManagedToBeDeleted := 0
numToBeDeleted := 0
for k, item := range *inventory {
// if this is the first object of an order, then
// count instances of managed types in this wave which are about to be deleted
if k == 0 || (*inventory)[k-1].DeleteOrder < item.DeleteOrder {
log.V(2).Info("begin of deletion wave", "order", item.DeleteOrder)
numManagedToBeDeleted = 0
for j := k; j < len(*inventory) && (*inventory)[j].DeleteOrder == item.DeleteOrder; j++ {
_item := (*inventory)[j]
if (_item.Phase == PhaseScheduledForDeletion || _item.Phase == PhaseDeleting) && isInstanceOfManagedType(*inventory, _item) {
numManagedToBeDeleted++
}
}
}

if item.Phase == PhaseScheduledForDeletion || item.Phase == PhaseDeleting {
// fetch object (if existing)
existingObject, err := r.readObject(ctx, item)
if err != nil {
return false, errors.Wrapf(err, "error reading object %s", item)
}

orphan := item.DeletePolicy == DeletePolicyOrphan

switch item.Phase {
case PhaseScheduledForDeletion:
// delete namespaces after all contained inventory items
// delete all instances of managed types before remaining objects; this ensures that no objects are prematurely
// deleted which are needed for the deletion of the managed instances, such as webhook servers, api servers, ...
if (!isNamespace(item) || !isNamespaceUsed(*inventory, item.Name)) && (numManagedToBeDeleted == 0 || isInstanceOfManagedType(*inventory, item)) {
if orphan {
item.Phase = ""
} else {
// note: here is a theoretical risk that we delete an existing foreign object, because informers are not yet synced
// however not sending the delete request is also not an option, because this might lead to orphaned own dependents
// TODO: perform an additional owner id check
if err := r.deleteObject(ctx, item, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting object %s", item)
}
item.Phase = PhaseDeleting
item.Status = status.TerminatingStatus
numToBeDeleted++
}
} else {
numToBeDeleted++
}
case PhaseDeleting:
// if object is gone, we can remove it from inventory
if existingObject == nil {
item.Phase = ""
} else {
numToBeDeleted++
}
default:
// note: any other phase value would indicate a severe code problem, so we want to see the panic in that case
panic("this cannot happen")
}
}

// trigger another reconcile if this is the last object of the wave, and some deletions are not yet finished
if k == len(*inventory)-1 || (*inventory)[k+1].DeleteOrder > item.DeleteOrder {
log.V(2).Info("end of deletion wave", "order", item.DeleteOrder)
if numToBeDeleted > 0 {
break
}
}
}

*inventory = slices.Select(*inventory, func(item *InventoryItem) bool { return item.Phase != "" })

// trigger another reconcile if any to-be-deleted objects are left
if numToBeDeleted > 0 {
return false, nil
}

return true, nil
}

// Delete objects stored in the inventory from the target cluster and maintain inventory.
Expand Down