From ac48b424a0ed9ecb3642eb5ea9bc42fa502b374d Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Fri, 26 Apr 2024 16:15:15 -0700 Subject: [PATCH] Add event handler to Applier/Destroyer Use an injected event handler function to avoid needing to cache errors in the supervisor. Instead, cache errors in the Updater, which already has a lock for this. This event handler simplifies the Applier/Destroyer API and makes it easy to add more events in the future, like for inventory updates. --- pkg/applier/applier.go | 305 ++++++++---------- pkg/applier/applier_test.go | 232 +++++++++---- pkg/applier/destroyer_test.go | 148 ++++++--- pkg/applier/fake/applier.go | 27 +- pkg/applier/stats/stats.go | 75 +++++ pkg/parse/root_test.go | 12 +- pkg/parse/updater.go | 57 +++- pkg/reconciler/finalizer/base_finalizer.go | 61 ++++ pkg/reconciler/finalizer/finalizer.go | 8 +- .../finalizer/reposync_finalizer.go | 9 +- .../finalizer/reposync_finalizer_test.go | 12 +- .../finalizer/rootsync_finalizer.go | 9 +- .../finalizer/rootsync_finalizer_test.go | 40 ++- 13 files changed, 667 insertions(+), 328 deletions(-) create mode 100644 pkg/reconciler/finalizer/base_finalizer.go diff --git a/pkg/applier/applier.go b/pkg/applier/applier.go index c25f402edb..a6fa9483e3 100644 --- a/pkg/applier/applier.go +++ b/pkg/applier/applier.go @@ -68,28 +68,47 @@ var ( type Applier interface { // Apply creates, updates, or prunes all managed resources, depending on // the new desired resource objects. - // Returns the set of GVKs which were successfully applied and any errors. + // Error events are sent to the eventHandler. + // Returns the status of the applied objects and statistics of the sync. // This is called by the reconciler when changes are detected in the // source of truth (git, OCI, helm) and periodically. - Apply(ctx context.Context, desiredResources []client.Object) status.MultiError - // Errors returns the errors encountered during apply. - // This method may be called while Destroy is running, to get the set of - // errors encountered so far. - Errors() status.MultiError + Apply(ctx context.Context, eventHandler func(Event), desiredResources []client.Object) (ObjectStatusMap, *stats.SyncStats) } // Destroyer is a bulk client for deleting all the managed resource objects // tracked in a single ResourceGroup inventory. type Destroyer interface { // Destroy deletes all managed resources. - // Returns any errors encountered while destroying. + // Error events are sent to the eventHandler. + // Returns the status of the destroyed objects and statistics of the sync. // This is called by the reconciler finalizer when deletion propagation is // enabled. - Destroy(ctx context.Context) status.MultiError - // Errors returns the errors encountered during destroy. - // This method may be called while Destroy is running, to get the set of - // errors encountered so far. - Errors() status.MultiError + Destroy(ctx context.Context, eventHandler func(Event)) (ObjectStatusMap, *stats.SyncStats) +} + +// EventType is the type used by SuperEvent.Type +type EventType string + +const ( + // ErrorEventType is the type of the ErrorEvent + ErrorEventType EventType = "ErrorEvent" +) + +// Event is sent to the eventHandler by the supervisor. +type Event interface { + // Type returns the type of the event. + Type() EventType +} + +// ErrorEvent is sent after the supervisor has errored. +// Generally, the supervisor will still continue until success or timeout. 
+type ErrorEvent struct { + Error status.Error +} + +// Type returns the type of the event. +func (e ErrorEvent) Type() EventType { + return ErrorEventType } // Supervisor is a bulk client for applying and deleting a mutable set of @@ -125,11 +144,6 @@ type supervisor struct { // execMux prevents concurrent Apply/Destroy calls execMux sync.Mutex - // errorMux prevents concurrent modifications to the cached set of errors - errorMux sync.RWMutex - // errs received from the current (if running) or previous Apply/Destroy. - // These errors is cleared at the start of the Apply/Destroy methods. - errs status.MultiError } var _ Applier = &supervisor{} @@ -211,16 +225,9 @@ func wrapInventoryObj(obj *unstructured.Unstructured) (*live.InventoryResourceGr return inv, nil } -type eventHandler struct { - // isDestroy indicates whether the events being processed are for a Destroy - isDestroy bool - // clientSet is used for accessing various k8s clients - clientSet *ClientSet -} - -func (h *eventHandler) processApplyEvent(ctx context.Context, e event.ApplyEvent, s *stats.ApplyEventStats, objectStatusMap ObjectStatusMap, unknownTypeResources map[core.ID]struct{}, resourceMap map[core.ID]client.Object) status.Error { +func (s *supervisor) processApplyEvent(ctx context.Context, e event.ApplyEvent, syncStats *stats.ApplyEventStats, objectStatusMap ObjectStatusMap, unknownTypeResources map[core.ID]struct{}, resourceMap map[core.ID]client.Object) status.Error { id := idFrom(e.Identifier) - s.Add(e.Status) + syncStats.Add(e.Status) objectStatus, ok := objectStatusMap[id] if !ok || objectStatus == nil { @@ -256,16 +263,16 @@ func (h *eventHandler) processApplyEvent(ctx context.Context, e event.ApplyEvent case event.ApplySkipped: objectStatus.Actuation = actuation.ActuationSkipped // Skip event always includes an error with the reason - return h.handleApplySkippedEvent(e.Resource, id, e.Error) + return s.handleApplySkippedEvent(e.Resource, id, e.Error) default: return ErrorForResource(fmt.Errorf("unexpected prune event status: %v", e.Status), id) } } -func (h *eventHandler) processWaitEvent(e event.WaitEvent, s *stats.WaitEventStats, objectStatusMap ObjectStatusMap) error { +func (s *supervisor) processWaitEvent(e event.WaitEvent, syncStats *stats.WaitEventStats, objectStatusMap ObjectStatusMap, isDestroy bool) error { id := idFrom(e.Identifier) - s.Add(e.Status) + syncStats.Add(e.Status) objectStatus, ok := objectStatusMap[id] if !ok || objectStatus == nil { @@ -283,13 +290,13 @@ func (h *eventHandler) processWaitEvent(e event.WaitEvent, s *stats.WaitEventSta case event.ReconcileFailed: objectStatus.Reconcile = actuation.ReconcileFailed // ReconcileFailed is treated as an error for destroy - if h.isDestroy { + if isDestroy { return WaitErrorForResource(fmt.Errorf("reconcile failed"), id) } case event.ReconcileTimeout: objectStatus.Reconcile = actuation.ReconcileTimeout // ReconcileTimeout is treated as an error for destroy - if h.isDestroy { + if isDestroy { return WaitErrorForResource(fmt.Errorf("reconcile timeout"), id) } default: @@ -299,7 +306,7 @@ func (h *eventHandler) processWaitEvent(e event.WaitEvent, s *stats.WaitEventSta } // handleApplySkippedEvent translates from apply skipped event into resource error. 
-func (h *eventHandler) handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err error) status.Error { +func (s *supervisor) handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err error) status.Error { var depErr *filter.DependencyPreventedActuationError if errors.As(err, &depErr) { return SkipErrorForResource(err, id, depErr.Strategy) @@ -323,9 +330,9 @@ func (h *eventHandler) handleApplySkippedEvent(obj *unstructured.Unstructured, i } // processPruneEvent handles PruneEvents from the Applier -func (h *eventHandler) processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap) status.Error { +func (s *supervisor) processPruneEvent(ctx context.Context, e event.PruneEvent, syncStats *stats.PruneEventStats, objectStatusMap ObjectStatusMap) status.Error { id := idFrom(e.Identifier) - s.Add(e.Status) + syncStats.Add(e.Status) objectStatus, ok := objectStatusMap[id] if !ok || objectStatus == nil { @@ -352,7 +359,7 @@ func (h *eventHandler) processPruneEvent(ctx context.Context, e event.PruneEvent case event.PruneSkipped: objectStatus.Actuation = actuation.ActuationSkipped // Skip event always includes an error with the reason - return h.handleDeleteSkippedEvent(ctx, event.PruneType, e.Object, id, e.Error) + return s.handleDeleteSkippedEvent(ctx, event.PruneType, e.Object, id, e.Error) default: return PruneErrorForResource(fmt.Errorf("unexpected prune event status: %v", e.Status), id) @@ -360,9 +367,9 @@ func (h *eventHandler) processPruneEvent(ctx context.Context, e event.PruneEvent } // processDeleteEvent handles DeleteEvents from the Destroyer -func (h *eventHandler) processDeleteEvent(ctx context.Context, e event.DeleteEvent, s *stats.DeleteEventStats, objectStatusMap ObjectStatusMap) status.Error { +func (s *supervisor) processDeleteEvent(ctx context.Context, e event.DeleteEvent, syncStats *stats.DeleteEventStats, objectStatusMap ObjectStatusMap) status.Error { id := idFrom(e.Identifier) - s.Add(e.Status) + syncStats.Add(e.Status) objectStatus, ok := objectStatusMap[id] if !ok || objectStatus == nil { @@ -389,7 +396,7 @@ func (h *eventHandler) processDeleteEvent(ctx context.Context, e event.DeleteEve case event.DeleteSkipped: objectStatus.Actuation = actuation.ActuationSkipped // Skip event always includes an error with the reason - return h.handleDeleteSkippedEvent(ctx, event.DeleteType, e.Object, id, e.Error) + return s.handleDeleteSkippedEvent(ctx, event.DeleteType, e.Object, id, e.Error) default: return DeleteErrorForResource(fmt.Errorf("unexpected delete event status: %v", e.Status), id) @@ -398,11 +405,11 @@ func (h *eventHandler) processDeleteEvent(ctx context.Context, e event.DeleteEve // handleDeleteSkippedEvent translates from prune skip or delete skip event into // a resource error. -func (h *eventHandler) handleDeleteSkippedEvent(ctx context.Context, eventType event.Type, obj *unstructured.Unstructured, id core.ID, err error) status.Error { +func (s *supervisor) handleDeleteSkippedEvent(ctx context.Context, eventType event.Type, obj *unstructured.Unstructured, id core.ID, err error) status.Error { // Disable protected namespaces that were removed from the desired object set. if isNamespace(obj) && differ.SpecialNamespaces[obj.GetName()] { // the `client.lifecycle.config.k8s.io/deletion: detach` annotation is not a part of the Config Sync metadata, and will not be removed here. 
- err := h.abandonObject(ctx, obj) + err := s.abandonObject(ctx, obj) handleMetrics(ctx, "unmanage", err) if err != nil { err = fmt.Errorf("failed to remove the Config Sync metadata from %v (protected namespace): %v", @@ -450,7 +457,7 @@ func (h *eventHandler) handleDeleteSkippedEvent(ctx context.Context, eventType e // For prunes/deletes, this is desired behavior, not a fatal error. klog.Infof("Resource object removed from inventory, but not deleted: %v: %v", id, err) // The `client.lifecycle.config.k8s.io/deletion: detach` annotation is not a part of the Config Sync metadata, and will not be removed here. - err := h.abandonObject(ctx, obj) + err := s.abandonObject(ctx, obj) handleMetrics(ctx, "unmanage", err) if err != nil { err = fmt.Errorf("failed to remove the Config Sync metadata from %v (%s: %s): %v", @@ -480,43 +487,40 @@ func handleMetrics(ctx context.Context, operation string, err error) { // checkInventoryObjectSize checks the inventory object size limit. // If it is close to the size limit 1M, log a warning. -func (a *supervisor) checkInventoryObjectSize(ctx context.Context, c client.Client) { - u := newInventoryUnstructured(a.syncKind, a.syncName, a.syncNamespace, a.clientSet.StatusMode) - err := c.Get(ctx, client.ObjectKey{Namespace: a.syncNamespace, Name: a.syncName}, u) +func (s *supervisor) checkInventoryObjectSize(ctx context.Context, c client.Client) { + u := newInventoryUnstructured(s.syncKind, s.syncName, s.syncNamespace, s.clientSet.StatusMode) + err := c.Get(ctx, client.ObjectKey{Namespace: s.syncNamespace, Name: s.syncName}, u) if err == nil { size, err := getObjectSize(u) if err != nil { - klog.Warningf("Failed to marshal ResourceGroup %s/%s to get its size: %s", a.syncNamespace, a.syncName, err) + klog.Warningf("Failed to marshal ResourceGroup %s/%s to get its size: %s", s.syncNamespace, s.syncName, err) } if int64(size) > maxRequestBytes/2 { klog.Warningf("ResourceGroup %s/%s is close to the maximum object size limit (size: %d, max: %s). "+ "There are too many resources being synced than Config Sync can handle! Please split your repo into smaller repos "+ - "to avoid future failure.", a.syncNamespace, a.syncName, size, maxRequestBytesStr) + "to avoid future failure.", s.syncNamespace, s.syncName, size, maxRequestBytesStr) } } } // applyInner triggers a kpt live apply library call to apply a set of resources. -func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) status.MultiError { - a.checkInventoryObjectSize(ctx, a.clientSet.Client) - eh := eventHandler{ - isDestroy: false, - clientSet: a.clientSet, - } +func (s *supervisor) applyInner(ctx context.Context, eventHandler func(Event), objs []client.Object) (ObjectStatusMap, *stats.SyncStats) { + s.checkInventoryObjectSize(ctx, s.clientSet.Client) + isDestroy := false - s := stats.NewSyncStats() + syncStats := stats.NewSyncStats() objStatusMap := make(ObjectStatusMap) // disabledObjs are objects for which the management are disabled // through annotation. 
enabledObjs, disabledObjs := partitionObjs(objs) if len(disabledObjs) > 0 { klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs)) - disabledCount, err := eh.handleDisabledObjects(ctx, a.inventory, disabledObjs) + disabledCount, err := s.handleDisabledObjects(ctx, s.inventory, disabledObjs) if err != nil { - a.addError(err) - return a.Errors() + sendErrorEvent(err, eventHandler) + return objStatusMap, syncStats } - s.DisableObjs = &stats.DisabledObjStats{ + syncStats.DisableObjs = &stats.DisabledObjStats{ Total: uint64(len(disabledObjs)), Succeeded: disabledCount, } @@ -524,8 +528,8 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) statu klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs)) resources, err := toUnstructured(enabledObjs) if err != nil { - a.addError(err) - return a.Errors() + sendErrorEvent(err, eventHandler) + return objStatusMap, syncStats } unknownTypeResources := make(map[core.ID]struct{}) @@ -535,14 +539,14 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) statu ForceConflicts: true, FieldManager: configsync.FieldManager, }, - InventoryPolicy: a.policy, + InventoryPolicy: s.policy, // Leaving ReconcileTimeout and PruneTimeout unset may cause a WaitTask to wait forever. // ReconcileTimeout defines the timeout for a wait task after an apply task. // ReconcileTimeout is a task-level setting instead of an object-level setting. - ReconcileTimeout: a.reconcileTimeout, + ReconcileTimeout: s.reconcileTimeout, // PruneTimeout defines the timeout for a wait task after a prune task. // PruneTimeout is a task-level setting instead of an object-level setting. - PruneTimeout: a.reconcileTimeout, + PruneTimeout: s.reconcileTimeout, // PrunePropagationPolicy defines what policy to use when pruning // managed objects. // Use "Background" for now, otherwise managed RootSyncs cannot be @@ -554,15 +558,15 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) statu // Reset shared mapper before each apply to invalidate the discovery cache. // This allows for picking up CRD changes. - meta.MaybeResetRESTMapper(a.clientSet.Mapper) + meta.MaybeResetRESTMapper(s.clientSet.Mapper) // Builds a map of id -> resource resourceMap := make(map[core.ID]client.Object) for _, obj := range resources { - resourceMap[idFrom(object.UnstructuredToObjMetadata(obj))] = obj + resourceMap[idFrom(ObjMetaFromObject(obj))] = obj } - events := a.clientSet.KptApplier.Run(ctx, a.inventory, object.UnstructuredSet(resources), options) + events := s.clientSet.KptApplier.Run(ctx, s.inventory, object.UnstructuredSet(resources), options) for e := range events { switch e.Type { case event.InitType: @@ -573,12 +577,12 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) statu klog.Info(e.ActionGroupEvent) case event.ErrorType: klog.Info(e.ErrorEvent) - if util.IsRequestTooLargeError(e.ErrorEvent.Err) { - a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) - } else { - a.addError(e.ErrorEvent.Err) + err := e.ErrorEvent.Err + if util.IsRequestTooLargeError(err) { + err = largeResourceGroupError(err, idFromInventory(s.inventory)) } - s.ErrorTypeEvents++ + sendErrorEvent(err, eventHandler) + syncStats.ErrorTypeEvents++ case event.WaitType: // Pending events are sent for any objects that haven't reconciled // when the WaitEvent starts. They're not very useful to the user. 
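With error caching removed from the supervisor, the apply loop above forwards each error to the injected event handler, and callers collect errors themselves. A minimal Go sketch of the expected caller-side pattern, mirroring the closure used in the updated applier_test.go and destroyer_test.go and, per the commit message, the Updater; the package, function, and variable names below are illustrative assumptions, not part of this patch:

// Illustrative sketch only: collect supervisor errors via the injected
// event handler. Names are hypothetical; only the Applier/Event API shown
// elsewhere in this patch is assumed.
package example

import (
	"context"

	"kpt.dev/configsync/pkg/applier"
	"kpt.dev/configsync/pkg/applier/stats"
	"kpt.dev/configsync/pkg/status"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func applyAndCollectErrors(ctx context.Context, a applier.Applier, objs []client.Object) (applier.ObjectStatusMap, *stats.SyncStats, status.MultiError) {
	var errs status.MultiError
	eventHandler := func(e applier.Event) {
		if errEvent, ok := e.(applier.ErrorEvent); ok {
			// status.Append tolerates a nil MultiError, as in the removed
			// supervisor.addError.
			errs = status.Append(errs, errEvent.Error)
		}
	}
	objStatusMap, syncStats := a.Apply(ctx, eventHandler, objs)
	return objStatusMap, syncStats, errs
}
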
@@ -588,89 +592,58 @@ func (a *supervisor) applyInner(ctx context.Context, objs []client.Object) statu } else { klog.V(1).Info(e.WaitEvent) } - a.addError(eh.processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)) + if err := s.processWaitEvent(e.WaitEvent, syncStats.WaitEvent, objStatusMap, isDestroy); err != nil { + sendErrorEvent(err, eventHandler) + } case event.ApplyType: if e.ApplyEvent.Error != nil { klog.Info(e.ApplyEvent) } else { klog.V(1).Info(e.ApplyEvent) } - a.addError(eh.processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources, resourceMap)) + if err := s.processApplyEvent(ctx, e.ApplyEvent, syncStats.ApplyEvent, objStatusMap, unknownTypeResources, resourceMap); err != nil { + sendErrorEvent(err, eventHandler) + } case event.PruneType: if e.PruneEvent.Error != nil { klog.Info(e.PruneEvent) } else { klog.V(1).Info(e.PruneEvent) } - a.addError(eh.processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap)) + if err := s.processPruneEvent(ctx, e.PruneEvent, syncStats.PruneEvent, objStatusMap); err != nil { + sendErrorEvent(err, eventHandler) + } default: klog.Infof("Unhandled event (%s): %v", e.Type, e) } } - errs := a.Errors() - if errs == nil { - klog.V(4).Infof("Apply completed without error: all resources are up to date.") - } - if s.Empty() { - klog.V(4).Infof("Applier made no new progress") - } else { - klog.Infof("Applier made new progress: %s", s.String()) - objStatusMap.Log(klog.V(0)) - } - return errs + return objStatusMap, syncStats } -// Errors returns the errors encountered during the last apply or current apply -// if still running. -// Errors implements the Applier and Destroyer interfaces. -func (a *supervisor) Errors() status.MultiError { - a.errorMux.RLock() - defer a.errorMux.RUnlock() - - if a.errs != nil { - // Return a copy to avoid persisting caller modifications - return status.Wrap(a.errs.Errors()...) - } - return nil +func sendErrorEvent(err error, eventHandler func(Event)) { + eventHandler(ErrorEvent{Error: wrapError(err)}) } -func (a *supervisor) addError(err error) { - if err == nil { - return +func wrapError(err error) status.Error { + if statusErr, ok := err.(status.Error); ok { + return statusErr } - a.errorMux.Lock() - defer a.errorMux.Unlock() - - if _, ok := err.(status.Error); !ok { - // Wrap as an applier.Error to indicate the source of the error - err = Error(err) - } - - a.errs = status.Append(a.errs, err) -} - -func (a *supervisor) invalidateErrors() { - a.errorMux.Lock() - defer a.errorMux.Unlock() - - a.errs = nil + // Wrap as an applier.Error to indicate the source of the error + return Error(err) } // destroyInner triggers a kpt live destroy library call to destroy a set of resources. -func (a *supervisor) destroyInner(ctx context.Context) status.MultiError { - s := stats.NewSyncStats() +func (s *supervisor) destroyInner(ctx context.Context, eventHandler func(Event)) (ObjectStatusMap, *stats.SyncStats) { + syncStats := stats.NewSyncStats() objStatusMap := make(ObjectStatusMap) - eh := eventHandler{ - isDestroy: true, - clientSet: a.clientSet, - } + isDestroy := true options := apply.DestroyerOptions{ - InventoryPolicy: a.policy, + InventoryPolicy: s.policy, // DeleteTimeout defines the timeout for a wait task after a delete task. // DeleteTimeout is a task-level setting instead of an object-level setting. - DeleteTimeout: a.reconcileTimeout, + DeleteTimeout: s.reconcileTimeout, // DeletePropagationPolicy defines what policy to use when deleting // managed objects. 
// Use "Foreground" to ensure owned resources are deleted in the right @@ -681,9 +654,9 @@ func (a *supervisor) destroyInner(ctx context.Context) status.MultiError { // Reset shared mapper before each destroy to invalidate the discovery cache. // This allows for picking up CRD changes. - meta.MaybeResetRESTMapper(a.clientSet.Mapper) + meta.MaybeResetRESTMapper(s.clientSet.Mapper) - events := a.clientSet.KptDestroyer.Run(ctx, a.inventory, options) + events := s.clientSet.KptDestroyer.Run(ctx, s.inventory, options) for e := range events { switch e.Type { case event.InitType: @@ -694,12 +667,12 @@ func (a *supervisor) destroyInner(ctx context.Context) status.MultiError { klog.Info(e.ActionGroupEvent) case event.ErrorType: klog.Info(e.ErrorEvent) - if util.IsRequestTooLargeError(e.ErrorEvent.Err) { - a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) - } else { - a.addError(e.ErrorEvent.Err) + err := e.ErrorEvent.Err + if util.IsRequestTooLargeError(err) { + err = largeResourceGroupError(err, idFromInventory(s.inventory)) } - s.ErrorTypeEvents++ + sendErrorEvent(err, eventHandler) + syncStats.ErrorTypeEvents++ case event.WaitType: // Pending events are sent for any objects that haven't reconciled // when the WaitEvent starts. They're not very useful to the user. @@ -709,56 +682,42 @@ func (a *supervisor) destroyInner(ctx context.Context) status.MultiError { } else { klog.V(1).Info(e.WaitEvent) } - a.addError(eh.processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)) + if err := s.processWaitEvent(e.WaitEvent, syncStats.WaitEvent, objStatusMap, isDestroy); err != nil { + sendErrorEvent(err, eventHandler) + } case event.DeleteType: if e.DeleteEvent.Error != nil { klog.Info(e.DeleteEvent) } else { klog.V(1).Info(e.DeleteEvent) } - a.addError(eh.processDeleteEvent(ctx, e.DeleteEvent, s.DeleteEvent, objStatusMap)) + if err := s.processDeleteEvent(ctx, e.DeleteEvent, syncStats.DeleteEvent, objStatusMap); err != nil { + sendErrorEvent(err, eventHandler) + } default: klog.Infof("Unhandled event (%s): %v", e.Type, e) } } - errs := a.Errors() - if errs == nil { - klog.V(4).Infof("Destroy completed without error: all resources are deleted.") - } - if s.Empty() { - klog.V(4).Infof("Destroyer made no new progress") - } else { - klog.Infof("Destroyer made new progress: %s.", s.String()) - objStatusMap.Log(klog.V(0)) - } - return errs + return objStatusMap, syncStats } -// Apply all managed resource objects and return any errors. +// Apply all managed resource objects and return their status. // Apply implements the Applier interface. -func (a *supervisor) Apply(ctx context.Context, desiredResource []client.Object) status.MultiError { - a.execMux.Lock() - defer a.execMux.Unlock() - - // Ideally we want to avoid invalidating errors that will continue to happen, - // but for now, invalidate all errors until they recur. - // TODO: improve error cache invalidation to make rsync status more stable - a.invalidateErrors() - return a.applyInner(ctx, desiredResource) +func (s *supervisor) Apply(ctx context.Context, eventHandler func(Event), desiredResource []client.Object) (ObjectStatusMap, *stats.SyncStats) { + s.execMux.Lock() + defer s.execMux.Unlock() + + return s.applyInner(ctx, eventHandler, desiredResource) } -// Destroy all managed resource objects and return any errors. +// Destroy all managed resource objects and return their status. // Destroy implements the Destroyer interface. 
-func (a *supervisor) Destroy(ctx context.Context) status.MultiError { - a.execMux.Lock() - defer a.execMux.Unlock() - - // Ideally we want to avoid invalidating errors that will continue to happen, - // but for now, invalidate all errors until they recur. - // TODO: improve error cache invalidation to make rsync status more stable - a.invalidateErrors() - return a.destroyInner(ctx) +func (s *supervisor) Destroy(ctx context.Context, eventHandler func(Event)) (ObjectStatusMap, *stats.SyncStats) { + s.execMux.Lock() + defer s.execMux.Unlock() + + return s.destroyInner(ctx, eventHandler) } // newInventoryUnstructured creates an inventory object as an unstructured. @@ -785,10 +744,10 @@ func InventoryID(name, namespace string) string { // then disables them, one by one, by removing the ConfigSync metadata. // Returns the number of objects which are disabled successfully, and any errors // encountered. -func (h *eventHandler) handleDisabledObjects(ctx context.Context, rg *live.InventoryResourceGroup, objs []client.Object) (uint64, status.MultiError) { +func (s *supervisor) handleDisabledObjects(ctx context.Context, rg *live.InventoryResourceGroup, objs []client.Object) (uint64, status.MultiError) { // disabledCount tracks the number of objects which are disabled successfully var disabledCount uint64 - err := h.removeFromInventory(rg, objs) + err := s.removeFromInventory(rg, objs) if err != nil { if nomosutil.IsRequestTooLargeError(err) { return disabledCount, largeResourceGroupError(err, idFromInventory(rg)) @@ -798,7 +757,7 @@ func (h *eventHandler) handleDisabledObjects(ctx context.Context, rg *live.Inven var errs status.MultiError for _, obj := range objs { id := core.IDOf(obj) - err := h.abandonObject(ctx, obj) + err := s.abandonObject(ctx, obj) handleMetrics(ctx, "unmanage", err) if err != nil { err = fmt.Errorf("failed to remove the Config Sync metadata from %v (%s: %s): %v", @@ -816,8 +775,8 @@ func (h *eventHandler) handleDisabledObjects(ctx context.Context, rg *live.Inven // removeFromInventory removes the specified objects from the inventory, if it // exists. -func (h *eventHandler) removeFromInventory(rg *live.InventoryResourceGroup, objs []client.Object) error { - clusterInv, err := h.clientSet.InvClient.GetClusterInventoryInfo(rg) +func (s *supervisor) removeFromInventory(rg *live.InventoryResourceGroup, objs []client.Object) error { + clusterInv, err := s.clientSet.InvClient.GetClusterInventoryInfo(rg) if err != nil { return err } @@ -838,19 +797,19 @@ func (h *eventHandler) removeFromInventory(rg *live.InventoryResourceGroup, objs if err != nil { return err } - return h.clientSet.InvClient.Replace(rg, newObjs, nil, common.DryRunNone) + return s.clientSet.InvClient.Replace(rg, newObjs, nil, common.DryRunNone) } // abandonObject removes ConfigSync labels and annotations from an object, // disabling management. 
-func (h *eventHandler) abandonObject(ctx context.Context, obj client.Object) error { - gvk, err := kinds.Lookup(obj, h.clientSet.Client.Scheme()) +func (s *supervisor) abandonObject(ctx context.Context, obj client.Object) error { + gvk, err := kinds.Lookup(obj, s.clientSet.Client.Scheme()) if err != nil { return err } uObj := &unstructured.Unstructured{} uObj.SetGroupVersionKind(gvk) - err = h.clientSet.Client.Get(ctx, client.ObjectKeyFromObject(obj), uObj) + err = s.clientSet.Client.Get(ctx, client.ObjectKeyFromObject(obj), uObj) if err != nil { if apierrors.IsNotFound(err) { klog.Warningf("Failed to abandon object: object not found: %s", core.IDOf(obj)) @@ -877,7 +836,7 @@ func (h *eventHandler) abandonObject(ctx context.Context, obj client.Object) err // Use merge-patch instead of server-side-apply, because we don't have // the object's source of truth handy and don't want to take ownership // of all the fields managed by other clients. - return h.clientSet.Client.Patch(ctx, toObj, client.MergeFrom(fromObj), + return s.clientSet.Client.Patch(ctx, toObj, client.MergeFrom(fromObj), client.FieldOwner(configsync.FieldManager)) } return nil diff --git a/pkg/applier/applier_test.go b/pkg/applier/applier_test.go index 9fc7d97d3a..84589bc59f 100644 --- a/pkg/applier/applier_test.go +++ b/pkg/applier/applier_test.go @@ -24,8 +24,10 @@ import ( "github.com/GoogleContainerTools/kpt/pkg/live" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/applier/stats" "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/declared" @@ -77,10 +79,12 @@ func TestApply(t *testing.T) { resourceManager := declared.ResourceManager(syncScope, syncName) deploymentObj := newDeploymentObj() - deploymentID := object.UnstructuredToObjMetadata(deploymentObj) + deploymentObjMeta := object.UnstructuredToObjMetadata(deploymentObj) + deploymentObjID := core.IDOf(deploymentObj) - testObj := newTestObj("test-1") - testID := object.UnstructuredToObjMetadata(testObj) + testObj1 := newTestObj("test-1") + testObj1Meta := object.UnstructuredToObjMetadata(testObj1) + testObj1ID := core.IDOf(testObj1) abandonObj := deploymentObj.DeepCopy() abandonObj.SetName("abandon-me") @@ -99,17 +103,21 @@ func TestApply(t *testing.T) { metadata.ArchLabel: "anything", "example-to-not-delete": "anything", }) + abandonObjID := core.IDOf(abandonObj) testObj2 := newTestObj("test-2") + testObj2ID := core.IDOf(testObj2) testObj3 := newTestObj("test-3") + testObj3ID := core.IDOf(testObj3) - objs := []client.Object{deploymentObj, testObj} + objs := []client.Object{deploymentObj, testObj1} namespaceObj := fake.UnstructuredObject(kinds.Namespace(), core.Name(string(syncScope))) - namespaceID := object.UnstructuredToObjMetadata(namespaceObj) + namespaceObjMeta := object.UnstructuredToObjMetadata(namespaceObj) + namespaceObjID := core.IDOf(namespaceObj) - uid := core.ID{ + inventoryID := core.ID{ GroupKind: live.ResourceGroupGVK.GroupKind(), ObjectKey: client.ObjectKey{ Name: syncName, @@ -122,59 +130,92 @@ func TestApply(t *testing.T) { etcdError := errors.New("etcdserver: request is too large") // satisfies util.IsRequestTooLargeError testcases := []struct { - name string - serverObjs []client.Object - events []event.Event - expectedError status.MultiError - expectedServerObjs []client.Object + name string + serverObjs 
[]client.Object + events []event.Event + expectedError status.MultiError + expectedObjectStatusMap ObjectStatusMap + expectedSyncStats *stats.SyncStats + expectedServerObjs []client.Object }{ { name: "unknown type for some resource", events: []event.Event{ - formApplyEvent(event.ApplyFailed, testObj, applyerror.NewUnknownTypeError(errors.New("unknown type"))), + formApplyEvent(event.ApplyFailed, testObj1, applyerror.NewUnknownTypeError(errors.New("unknown type"))), formApplyEvent(event.ApplyPending, testObj2, nil), }, - expectedError: ErrorForResourceWithResource(errors.New("unknown type"), idFrom(testID), testObj), + expectedError: ErrorForResourceWithResource(errors.New("unknown type"), testObj1ID, testObj1), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationFailed}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationPending}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithApplyEvents(event.ApplyFailed, 1). + WithApplyEvents(event.ApplyPending, 1), }, { name: "conflict error for some resource", events: []event.Event{ - formApplySkipEvent(testID, testObj.DeepCopy(), &inventory.PolicyPreventedActuationError{ + formApplySkipEvent(testObj1Meta, testObj1.DeepCopy(), &inventory.PolicyPreventedActuationError{ Strategy: actuation.ActuationStrategyApply, Policy: inventory.PolicyMustMatch, Status: inventory.NoMatch, }), formApplyEvent(event.ApplyPending, testObj2, nil), }, - expectedError: KptManagementConflictError(testObj), + expectedError: KptManagementConflictError(testObj1), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationSkipped}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationPending}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithApplyEvents(event.ApplySkipped, 1). + WithApplyEvents(event.ApplyPending, 1), }, { name: "inventory object is too large", events: []event.Event{ formErrorEvent(etcdError), }, - expectedError: largeResourceGroupError(etcdError, uid), + expectedError: largeResourceGroupError(etcdError, inventoryID), + expectedObjectStatusMap: ObjectStatusMap{}, + expectedSyncStats: stats.NewSyncStats(). + WithErrorEvents(1), }, { name: "failed to apply", events: []event.Event{ - formApplyEvent(event.ApplyFailed, testObj, applyerror.NewApplyRunError(errors.New("failed apply"))), + formApplyEvent(event.ApplyFailed, testObj1, applyerror.NewApplyRunError(errors.New("failed apply"))), formApplyEvent(event.ApplyPending, testObj2, nil), }, - expectedError: ErrorForResourceWithResource(errors.New("failed apply"), idFrom(testID), testObj), + expectedError: ErrorForResourceWithResource(errors.New("failed apply"), testObj1ID, testObj1), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationFailed}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationPending}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithApplyEvents(event.ApplyFailed, 1). 
+ WithApplyEvents(event.ApplyPending, 1), }, { name: "failed to prune", events: []event.Event{ - formPruneEvent(event.PruneFailed, testObj, errors.New("failed pruning")), + formPruneEvent(event.PruneFailed, testObj1, errors.New("failed pruning")), formPruneEvent(event.PruneSuccessful, testObj2, nil), }, - expectedError: PruneErrorForResource(errors.New("failed pruning"), idFrom(testID)), + expectedError: PruneErrorForResource(errors.New("failed pruning"), testObj1ID), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithPruneEvents(event.PruneFailed, 1). + WithPruneEvents(event.PruneSuccessful, 1), }, { name: "skipped pruning", events: []event.Event{ - formPruneEvent(event.PruneSuccessful, testObj, nil), + formPruneEvent(event.PruneSuccessful, testObj1, nil), formPruneEvent(event.PruneSkipped, namespaceObj, &filter.NamespaceInUseError{ Namespace: "test-namespace", }), @@ -182,49 +223,87 @@ func TestApply(t *testing.T) { }, expectedError: SkipErrorForResource( errors.New("namespace still in use: test-namespace"), - idFrom(namespaceID), + idFrom(namespaceObjMeta), actuation.ActuationStrategyDelete), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + namespaceObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithPruneEvents(event.PruneSuccessful, 2). + WithPruneEvents(event.PruneSkipped, 1), }, { name: "all passed", events: []event.Event{ - formApplyEvent(event.ApplySuccessful, testObj, nil), + formApplyEvent(event.ApplySuccessful, testObj1, nil), formApplyEvent(event.ApplySuccessful, deploymentObj, nil), formApplyEvent(event.ApplyPending, testObj2, nil), formPruneEvent(event.PruneSuccessful, testObj3, nil), }, + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationSucceeded}, + deploymentObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationSucceeded}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationPending}, + testObj3ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithApplyEvents(event.ApplySuccessful, 2). + WithApplyEvents(event.ApplyPending, 1). 
+ WithPruneEvents(event.PruneSuccessful, 1), }, { name: "all failed", events: []event.Event{ - formApplyEvent(event.ApplyFailed, testObj, applyerror.NewUnknownTypeError(errors.New("unknown type"))), + formApplyEvent(event.ApplyFailed, testObj1, applyerror.NewUnknownTypeError(errors.New("unknown type"))), formApplyEvent(event.ApplyFailed, deploymentObj, applyerror.NewApplyRunError(errors.New("failed apply"))), formApplyEvent(event.ApplyPending, testObj2, nil), formPruneEvent(event.PruneSuccessful, testObj3, nil), }, - expectedError: status.Wrap( - ErrorForResourceWithResource(errors.New("unknown type"), idFrom(testID), testObj), - ErrorForResourceWithResource(errors.New("failed apply"), idFrom(deploymentID), deploymentObj)), + expectedError: status.Append( + ErrorForResourceWithResource(errors.New("unknown type"), testObj1ID, testObj1), + ErrorForResourceWithResource(errors.New("failed apply"), deploymentObjID, deploymentObj)), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationFailed}, + deploymentObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationFailed}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationPending}, + testObj3ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithApplyEvents(event.ApplyFailed, 2). + WithApplyEvents(event.ApplyPending, 1). + WithPruneEvents(event.PruneSuccessful, 1), }, { name: "failed dependency during apply", events: []event.Event{ - formApplySkipEventWithDependency(deploymentID, deploymentObj.DeepCopy()), + formApplySkipEventWithDependency(deploymentObjMeta, deploymentObj.DeepCopy()), }, expectedError: SkipErrorForResource( errors.New("dependency apply reconcile timeout: namespace_name_group_kind"), - idFrom(deploymentID), + deploymentObjID, actuation.ActuationStrategyApply), + expectedObjectStatusMap: ObjectStatusMap{ + deploymentObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationSkipped}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithApplyEvents(event.ApplySkipped, 1), }, { name: "failed dependency during prune", events: []event.Event{ - formPruneSkipEventWithDependency(deploymentID), + formPruneSkipEventWithDependency(deploymentObjMeta), }, expectedError: SkipErrorForResource( errors.New("dependent delete actuation failed: namespace_name_group_kind"), - idFrom(deploymentID), + deploymentObjID, actuation.ActuationStrategyDelete), + expectedObjectStatusMap: ObjectStatusMap{ + deploymentObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithPruneEvents(event.PruneSkipped, 1), }, { name: "abandon object", @@ -235,6 +314,11 @@ func TestApply(t *testing.T) { formPruneSkipEventWithDetach(abandonObj), }, expectedError: nil, + expectedObjectStatusMap: ObjectStatusMap{ + abandonObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped}, + }, + expectedSyncStats: stats.NewSyncStats(). 
+ WithPruneEvents(event.PruneSkipped, 1), expectedServerObjs: []client.Object{ func() client.Object { obj := abandonObj.DeepCopy() @@ -280,8 +364,23 @@ func TestApply(t *testing.T) { applier, err := NewNamespaceSupervisor(cs, syncScope, syncName, 5*time.Minute) require.NoError(t, err) - errs := applier.Apply(context.Background(), objs) - testerrors.AssertEqual(t, tc.expectedError, errs) + var errs status.MultiError + eventHandler := func(event Event) { + if errEvent, ok := event.(ErrorEvent); ok { + if errs == nil { + errs = errEvent.Error + } else { + errs = status.Append(errs, errEvent.Error) + } + } + } + + objectStatusMap, syncStats := applier.Apply(context.Background(), eventHandler, objs) + + testutil.AssertEqual(t, tc.expectedError, errs) + testutil.AssertEqual(t, tc.expectedObjectStatusMap, objectStatusMap) + testutil.AssertEqual(t, tc.expectedSyncStats, syncStats) + fakeClient.Check(t, tc.expectedServerObjs...) }) } @@ -419,39 +518,53 @@ func formErrorEvent(err error) event.Event { func TestProcessApplyEvent(t *testing.T) { deploymentObj := newDeploymentObj() - deploymentID := object.UnstructuredToObjMetadata(deploymentObj) - testObj := newTestObj("test-1") - testID := object.UnstructuredToObjMetadata(testObj) + deploymentObjID := core.IDOf(deploymentObj) + testObj1 := newTestObj("test-1") + testObj1ID := core.IDOf(testObj1) ctx := context.Background() - s := stats.NewSyncStats() + syncStats := stats.NewSyncStats() objStatusMap := make(ObjectStatusMap) unknownTypeResources := make(map[core.ID]struct{}) - eh := eventHandler{ - isDestroy: false, - } + s := supervisor{} resourceMap := make(map[core.ID]client.Object) - resourceMap[idFrom(deploymentID)] = deploymentObj - - err := eh.processApplyEvent(ctx, formApplyEvent(event.ApplyFailed, deploymentObj, fmt.Errorf("test error")).ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources, resourceMap) - expectedError := ErrorForResourceWithResource(fmt.Errorf("test error"), idFrom(deploymentID), deploymentObj) - testerrors.AssertEqual(t, expectedError, err, "expected processPruneEvent to error on apply %s", event.ApplyFailed) + resourceMap[deploymentObjID] = deploymentObj + + err := s.processApplyEvent(ctx, formApplyEvent(event.ApplyFailed, deploymentObj, fmt.Errorf("test error")).ApplyEvent, syncStats.ApplyEvent, objStatusMap, unknownTypeResources, resourceMap) + expectedError := ErrorForResourceWithResource(fmt.Errorf("test error"), deploymentObjID, deploymentObj) + testutil.AssertEqual(t, expectedError, err, "expected processPruneEvent to error on apply %s", event.ApplyFailed) + + expectedCSE := v1beta1.ConfigSyncError{ + Code: "2009", + ErrorMessage: "KNV2009: failed to apply Deployment.apps, test-namespace/random-name: test error\n\nsource: namespaces/foo/role.yaml\nnamespace: test-namespace\nmetadata.name: random-name\ngroup: apps\nversion: v1\nkind: Deployment\n\nFor more information, see https://g.co/cloud/acm-errors#knv2009", + Resources: []v1beta1.ResourceRef{{ + SourcePath: "namespaces/foo/role.yaml", + Name: "random-name", + Namespace: "test-namespace", + GVK: metav1.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "Deployment", + }, + }}, + } + testutil.AssertEqual(t, expectedCSE, err.ToCSE(), "expected CSEs to match") - err = eh.processApplyEvent(ctx, formApplyEvent(event.ApplySuccessful, testObj, nil).ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources, resourceMap) + err = s.processApplyEvent(ctx, formApplyEvent(event.ApplySuccessful, testObj1, nil).ApplyEvent, syncStats.ApplyEvent, objStatusMap, 
unknownTypeResources, resourceMap) assert.Nil(t, err, "expected processApplyEvent NOT to error on apply %s", event.ApplySuccessful) expectedApplyStatus := stats.NewSyncStats() expectedApplyStatus.ApplyEvent.Add(event.ApplyFailed) expectedApplyStatus.ApplyEvent.Add(event.ApplySuccessful) - testutil.AssertEqual(t, expectedApplyStatus, s, "expected event stats to match") + testutil.AssertEqual(t, expectedApplyStatus, syncStats, "expected event stats to match") expectedObjStatusMap := ObjectStatusMap{ - idFrom(deploymentID): { + deploymentObjID: { Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationFailed, }, - idFrom(testID): { + testObj1ID: { Strategy: actuation.ActuationStrategyApply, Actuation: actuation.ActuationSucceeded, }, @@ -470,25 +583,24 @@ func TestProcessPruneEvent(t *testing.T) { testID := object.UnstructuredToObjMetadata(testObj) ctx := context.Background() - s := stats.NewSyncStats() + syncStats := stats.NewSyncStats() objStatusMap := make(ObjectStatusMap) cs := &ClientSet{} - eh := eventHandler{ - isDestroy: false, + s := supervisor{ clientSet: cs, } - err := eh.processPruneEvent(ctx, formPruneEvent(event.PruneFailed, deploymentObj, fmt.Errorf("test error")).PruneEvent, s.PruneEvent, objStatusMap) + err := s.processPruneEvent(ctx, formPruneEvent(event.PruneFailed, deploymentObj, fmt.Errorf("test error")).PruneEvent, syncStats.PruneEvent, objStatusMap) expectedError := PruneErrorForResource(fmt.Errorf("test error"), idFrom(deploymentID)) testerrors.AssertEqual(t, expectedError, err, "expected processPruneEvent to error on prune %s", event.PruneFailed) - err = eh.processPruneEvent(ctx, formPruneEvent(event.PruneSuccessful, testObj, nil).PruneEvent, s.PruneEvent, objStatusMap) + err = s.processPruneEvent(ctx, formPruneEvent(event.PruneSuccessful, testObj, nil).PruneEvent, syncStats.PruneEvent, objStatusMap) assert.Nil(t, err, "expected processPruneEvent NOT to error on prune %s", event.PruneSuccessful) expectedApplyStatus := stats.NewSyncStats() expectedApplyStatus.PruneEvent.Add(event.PruneFailed) expectedApplyStatus.PruneEvent.Add(event.PruneSuccessful) - testutil.AssertEqual(t, expectedApplyStatus, s, "expected event stats to match") + testutil.AssertEqual(t, expectedApplyStatus, syncStats, "expected event stats to match") expectedObjStatusMap := ObjectStatusMap{ idFrom(deploymentID): { @@ -583,16 +695,14 @@ func TestProcessWaitEvent(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - p := eventHandler{ - isDestroy: tc.isDestroy, - } - s := stats.NewSyncStats() + s := supervisor{} + syncStats := stats.NewSyncStats() objStatusMap := make(ObjectStatusMap) expectedApplyStatus := stats.NewSyncStats() expectedObjStatusMap := ObjectStatusMap{} for _, e := range tc.events { - err := p.processWaitEvent(formWaitEvent(e.status, e.id).WaitEvent, s.WaitEvent, objStatusMap) + err := s.processWaitEvent(formWaitEvent(e.status, e.id).WaitEvent, syncStats.WaitEvent, objStatusMap, tc.isDestroy) if e.expectedErr == nil { assert.Nil(t, err) } else { @@ -604,7 +714,7 @@ func TestProcessWaitEvent(t *testing.T) { } } - testutil.AssertEqual(t, expectedApplyStatus, s, "expected event stats to match") + testutil.AssertEqual(t, expectedApplyStatus, syncStats, "expected event stats to match") testutil.AssertEqual(t, expectedObjStatusMap, objStatusMap, "expected object status to match") }) } diff --git a/pkg/applier/destroyer_test.go b/pkg/applier/destroyer_test.go index 18b934b315..eb11a1ca52 100644 --- a/pkg/applier/destroyer_test.go +++ 
b/pkg/applier/destroyer_test.go @@ -23,6 +23,7 @@ import ( "github.com/GoogleContainerTools/kpt/pkg/live" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "kpt.dev/configsync/pkg/applier/stats" "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/status" @@ -36,6 +37,7 @@ import ( "sigs.k8s.io/cli-utils/pkg/apply/filter" "sigs.k8s.io/cli-utils/pkg/inventory" "sigs.k8s.io/cli-utils/pkg/object" + "sigs.k8s.io/cli-utils/pkg/testutil" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -61,24 +63,28 @@ func (a *fakeKptDestroyer) Run(_ context.Context, _ inventory.Info, _ apply.Dest } func TestDestroy(t *testing.T) { - deploymentObj := newDeploymentObj() - deploymentObj.SetName("deployment-1") - deploymentID := object.UnstructuredToObjMetadata(deploymentObj) + deploymentObj1 := newDeploymentObj() + deploymentObj1.SetName("deployment-1") + deploymentObj1Meta := object.UnstructuredToObjMetadata(deploymentObj1) + deploymentObj1ID := core.IDOf(deploymentObj1) - deployment2Obj := newDeploymentObj() - deployment2Obj.SetName("deployment-2") - deployment2ID := object.UnstructuredToObjMetadata(deployment2Obj) + deploymentObj2 := newDeploymentObj() + deploymentObj2.SetName("deployment-2") + deploymentObj2Meta := object.UnstructuredToObjMetadata(deploymentObj2) - testObj := newTestObj("test-1") - testID := object.UnstructuredToObjMetadata(testObj) + testObj1 := newTestObj("test-1") + testObj1Meta := object.UnstructuredToObjMetadata(testObj1) + testObj1ID := core.IDOf(testObj1) testObj2 := newTestObj("test-2") + testObj2ID := core.IDOf(testObj2) namespaceObj := fake.UnstructuredObject(kinds.Namespace(), core.Name("test-namespace")) - namespaceID := object.UnstructuredToObjMetadata(namespaceObj) + namespaceMeta := object.UnstructuredToObjMetadata(namespaceObj) + namespaceObjID := core.IDOf(namespaceObj) - uid := core.ID{ + inventoryID := core.ID{ GroupKind: live.ResourceGroupGVK.GroupKind(), ObjectKey: client.ObjectKey{ Name: "rs", @@ -92,22 +98,31 @@ func TestDestroy(t *testing.T) { etcdError := errors.New("etcdserver: request is too large") // satisfies util.IsRequestTooLargeError testcases := []struct { - name string - events []event.Event - multiErr error + name string + events []event.Event + expectedError error + expectedObjectStatusMap ObjectStatusMap + expectedSyncStats *stats.SyncStats }{ { name: "unknown type for some resource", events: []event.Event{ - formDeleteEvent(event.DeleteFailed, testObj, applyerror.NewUnknownTypeError(testError1)), + formDeleteEvent(event.DeleteFailed, testObj1, applyerror.NewUnknownTypeError(testError1)), formDeleteEvent(event.DeletePending, testObj2, nil), }, - multiErr: DeleteErrorForResource(testError1, idFrom(testID)), + expectedError: DeleteErrorForResource(testError1, testObj1ID), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithDeleteEvents(event.DeleteFailed, 1). 
+ WithDeleteEvents(event.DeletePending, 1), }, { name: "conflict error for some resource", events: []event.Event{ - formDeleteSkipEvent(testID, testObj.DeepCopy(), &inventory.PolicyPreventedActuationError{ + formDeleteSkipEvent(testObj1Meta, testObj1.DeepCopy(), &inventory.PolicyPreventedActuationError{ Strategy: actuation.ActuationStrategyDelete, Policy: inventory.PolicyMustMatch, Status: inventory.NoMatch, @@ -116,73 +131,119 @@ func TestDestroy(t *testing.T) { }, // Prunes and Deletes ignore PolicyPreventedActuationErrors. // This allows abandoning of managed objects. - multiErr: nil, + expectedError: nil, + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithDeleteEvents(event.DeleteSkipped, 1). + WithDeleteEvents(event.DeletePending, 1), }, { name: "inventory object is too large", events: []event.Event{ formErrorEvent(etcdError), }, - multiErr: largeResourceGroupError(etcdError, uid), + expectedError: largeResourceGroupError(etcdError, inventoryID), + expectedObjectStatusMap: ObjectStatusMap{}, + expectedSyncStats: stats.NewSyncStats(). + WithErrorEvents(1), }, { name: "failed to delete", events: []event.Event{ - formDeleteEvent(event.DeleteFailed, testObj, testError1), + formDeleteEvent(event.DeleteFailed, testObj1, testError1), formDeleteEvent(event.DeletePending, testObj2, nil), }, - multiErr: DeleteErrorForResource(testError1, idFrom(testID)), + expectedError: DeleteErrorForResource(testError1, testObj1ID), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithDeleteEvents(event.DeleteFailed, 1). + WithDeleteEvents(event.DeletePending, 1), }, { name: "skipped delete", events: []event.Event{ - formDeleteEvent(event.DeleteSuccessful, testObj, nil), + formDeleteEvent(event.DeleteSuccessful, testObj1, nil), formDeleteEvent(event.DeleteSkipped, namespaceObj, &filter.NamespaceInUseError{ Namespace: "test-namespace", }), formDeleteEvent(event.DeleteSuccessful, testObj2, nil), }, - multiErr: SkipErrorForResource( + expectedError: SkipErrorForResource( errors.New("namespace still in use: test-namespace"), - idFrom(namespaceID), + idFrom(namespaceMeta), actuation.ActuationStrategyDelete), + expectedObjectStatusMap: ObjectStatusMap{ + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + namespaceObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped}, + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithDeleteEvents(event.DeleteSuccessful, 2). 
+ WithDeleteEvents(event.DeleteSkipped, 1), }, { name: "all passed", events: []event.Event{ formDeleteEvent(event.DeletePending, testObj2, nil), - formDeleteEvent(event.DeleteSuccessful, testObj, nil), - formDeleteEvent(event.DeleteSuccessful, deploymentObj, nil), + formDeleteEvent(event.DeleteSuccessful, testObj1, nil), + formDeleteEvent(event.DeleteSuccessful, deploymentObj1, nil), + }, + expectedObjectStatusMap: ObjectStatusMap{ + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending}, + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, + deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded}, }, + expectedSyncStats: stats.NewSyncStats(). + WithDeleteEvents(event.DeleteSuccessful, 2). + WithDeleteEvents(event.DeletePending, 1), }, { name: "all failed", events: []event.Event{ formDeleteEvent(event.DeletePending, testObj2, nil), - formDeleteEvent(event.DeleteFailed, testObj, testError1), - formDeleteEvent(event.DeleteFailed, deploymentObj, testError2), + formDeleteEvent(event.DeleteFailed, testObj1, testError1), + formDeleteEvent(event.DeleteFailed, deploymentObj1, testError2), }, - multiErr: status.Wrap( - DeleteErrorForResource(testError1, idFrom(testID)), - DeleteErrorForResource(testError2, idFrom(deploymentID))), + expectedError: status.Wrap( + DeleteErrorForResource(testError1, testObj1ID), + DeleteErrorForResource(testError2, deploymentObj1ID)), + expectedObjectStatusMap: ObjectStatusMap{ + testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending}, + testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed}, + deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed}, + }, + expectedSyncStats: stats.NewSyncStats(). + WithDeleteEvents(event.DeleteFailed, 2). + WithDeleteEvents(event.DeletePending, 1), }, { name: "failed dependency during delete", events: []event.Event{ - formDeleteSkipEventWithDependent(deploymentObj.DeepCopy(), deployment2Obj.DeepCopy()), + formDeleteSkipEventWithDependent(deploymentObj1.DeepCopy(), deploymentObj2.DeepCopy()), }, - multiErr: SkipErrorForResource( + expectedError: SkipErrorForResource( &filter.DependencyPreventedActuationError{ - Object: deploymentID, + Object: deploymentObj1Meta, Strategy: actuation.ActuationStrategyDelete, Relationship: filter.RelationshipDependent, - Relation: deployment2ID, + Relation: deploymentObj2Meta, RelationPhase: filter.PhaseReconcile, RelationActuationStatus: actuation.ActuationSucceeded, RelationReconcileStatus: actuation.ReconcileTimeout, }, - idFrom(deploymentID), + deploymentObj1ID, actuation.ActuationStrategyDelete), + expectedObjectStatusMap: ObjectStatusMap{ + deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped}, + }, + expectedSyncStats: stats.NewSyncStats(). 
+ WithDeleteEvents(event.DeleteSkipped, 1), }, } @@ -198,8 +259,21 @@ func TestDestroy(t *testing.T) { destroyer, err := NewNamespaceSupervisor(cs, "test-namespace", "rs", 5*time.Minute) require.NoError(t, err) - errs := destroyer.Destroy(context.Background()) - testerrors.AssertEqual(t, tc.multiErr, errs) + var errs status.MultiError + eventHandler := func(event Event) { + if errEvent, ok := event.(ErrorEvent); ok { + if errs == nil { + errs = errEvent.Error + } else { + errs = status.Append(errs, errEvent.Error) + } + } + } + + objectStatusMap, syncStats := destroyer.Destroy(context.Background(), eventHandler) + testutil.AssertEqual(t, tc.expectedObjectStatusMap, objectStatusMap) + testutil.AssertEqual(t, tc.expectedSyncStats, syncStats) + testerrors.AssertEqual(t, tc.expectedError, errs) }) } } diff --git a/pkg/applier/fake/applier.go b/pkg/applier/fake/applier.go index 4b12bcab93..ef4a501100 100644 --- a/pkg/applier/fake/applier.go +++ b/pkg/applier/fake/applier.go @@ -19,6 +19,7 @@ import ( "fmt" "kpt.dev/configsync/pkg/applier" + "kpt.dev/configsync/pkg/applier/stats" "kpt.dev/configsync/pkg/status" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -28,12 +29,9 @@ import ( // This is not in kpt.dev/configsync/pkg/testing/fake because that would cause // a import loop (applier -> fake -> applier). type Applier struct { + ApplyCalls int ApplyInputs []ApplierInputs ApplyOutputs []ApplierOutputs - - ApplyCalls, ErrorsCalls int - - currentErrors status.MultiError } // ApplierInputs stores inputs for fake.Applier.Apply() @@ -43,26 +41,27 @@ type ApplierInputs struct { // ApplierOutputs stores outputs for fake.Applier.Apply() type ApplierOutputs struct { - Errors status.MultiError + Errors []status.Error + ObjectStatusMap applier.ObjectStatusMap + SyncStats *stats.SyncStats } // Apply fakes applier.Applier.Apply() -func (a *Applier) Apply(_ context.Context, objects []client.Object) status.MultiError { +func (a *Applier) Apply(_ context.Context, eventHandler func(applier.Event), objects []client.Object) (applier.ObjectStatusMap, *stats.SyncStats) { a.ApplyInputs = append(a.ApplyInputs, ApplierInputs{ Objects: objects, }) if a.ApplyCalls >= len(a.ApplyOutputs) { - panic(fmt.Sprintf("Expected only %d calls to Applier.Apply, but got more. Update Applier.ApplyOutputs if this is expected.", len(a.ApplyOutputs))) + panic(fmt.Sprintf("Expected only %d calls to Applier.Apply, but got more. Update Applier.Outputs if this is expected.", len(a.ApplyOutputs))) } outputs := a.ApplyOutputs[a.ApplyCalls] a.ApplyCalls++ - a.currentErrors = outputs.Errors - return outputs.Errors -} - -// Errors fakes applier.Applier.Errors() -func (a *Applier) Errors() status.MultiError { - return a.currentErrors + for _, err := range outputs.Errors { + eventHandler(applier.ErrorEvent{ + Error: err, + }) + } + return outputs.ObjectStatusMap, outputs.SyncStats } var _ applier.Applier = &Applier{} diff --git a/pkg/applier/stats/stats.go b/pkg/applier/stats/stats.go index 499bfba718..fefcae34b2 100644 --- a/pkg/applier/stats/stats.go +++ b/pkg/applier/stats/stats.go @@ -36,6 +36,14 @@ func (s *PruneEventStats) Add(status event.PruneEventStatus) { s.EventByOp[status]++ } +// Set records a total number of events +func (s *PruneEventStats) Set(status event.PruneEventStatus, count uint64) { + if s.EventByOp == nil { + s.EventByOp = map[event.PruneEventStatus]uint64{} + } + s.EventByOp[status] = count +} + // String returns the stats as a human readable string. 
func (s PruneEventStats) String() string { var strs []string @@ -77,6 +85,14 @@ func (s *DeleteEventStats) Add(status event.DeleteEventStatus) { s.EventByOp[status]++ } +// Set records a total number of events +func (s *DeleteEventStats) Set(status event.DeleteEventStatus, count uint64) { + if s.EventByOp == nil { + s.EventByOp = map[event.DeleteEventStatus]uint64{} + } + s.EventByOp[status] = count +} + // String returns the stats as a human readable string. func (s DeleteEventStats) String() string { var strs []string @@ -119,6 +135,14 @@ func (s *ApplyEventStats) Add(status event.ApplyEventStatus) { s.EventByOp[status]++ } +// Set records a total number of events +func (s *ApplyEventStats) Set(status event.ApplyEventStatus, count uint64) { + if s.EventByOp == nil { + s.EventByOp = map[event.ApplyEventStatus]uint64{} + } + s.EventByOp[status] = count +} + // String returns the stats as a human readable string. func (s ApplyEventStats) String() string { var strs []string @@ -161,6 +185,14 @@ func (s *WaitEventStats) Add(status event.WaitEventStatus) { s.EventByOp[status]++ } +// Set records a total number of events +func (s *WaitEventStats) Set(status event.WaitEventStatus, count uint64) { + if s.EventByOp == nil { + s.EventByOp = map[event.WaitEventStatus]uint64{} + } + s.EventByOp[status] = count +} + // String returns the stats as a human readable string. func (s WaitEventStats) String() string { var strs []string @@ -249,6 +281,49 @@ func (s *SyncStats) Empty() bool { return s == nil || s.ErrorTypeEvents == 0 && s.PruneEvent.Empty() && s.DeleteEvent.Empty() && s.ApplyEvent.Empty() && s.WaitEvent.Empty() && s.DisableObjs.Empty() } +// WithApplyEvents sets the number of times this apply event status occurred. +// Intended for easy test expectation construction. +func (s *SyncStats) WithApplyEvents(status event.ApplyEventStatus, count uint64) *SyncStats { + s.ApplyEvent.Set(status, count) + return s +} + +// WithPruneEvents sets the number of times this prune event status occurred. +// Intended for easy test expectation construction. +func (s *SyncStats) WithPruneEvents(status event.PruneEventStatus, count uint64) *SyncStats { + s.PruneEvent.Set(status, count) + return s +} + +// WithDeleteEvents sets the number of times this delete event status occurred. +// Intended for easy test expectation construction. +func (s *SyncStats) WithDeleteEvents(status event.DeleteEventStatus, count uint64) *SyncStats { + s.DeleteEvent.Set(status, count) + return s +} + +// WithWaitEvent sets the number of times this wait event status occurred. +// Intended for easy test expectation construction. +func (s *SyncStats) WithWaitEvent(status event.WaitEventStatus, count uint64) *SyncStats { + s.WaitEvent.Set(status, count) + return s +} + +// WithDisabledObjStats sets the disabled object stats. +// Intended for easy test expectation construction. +func (s *SyncStats) WithDisabledObjStats(total, succeeded uint64) *SyncStats { + s.DisableObjs.Total = total + s.DisableObjs.Succeeded = succeeded + return s +} + +// WithErrorEvents sets the total number of error events. +// Intended for easy test expectation construction. +func (s *SyncStats) WithErrorEvents(count uint64) *SyncStats { + s.ErrorTypeEvents = count + return s +} + // NewSyncStats constructs a SyncStats with empty event maps. 
func NewSyncStats() *SyncStats { return &SyncStats{ diff --git a/pkg/parse/root_test.go b/pkg/parse/root_test.go index 5f8f10a676..6b2ef910dc 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_test.go @@ -1066,13 +1066,15 @@ func TestRoot_SourceReconcilerErrorsMetricValidation(t *testing.T) { func TestRoot_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { testCases := []struct { name string - applyErrors status.MultiError + applyErrors []status.Error expectedError status.MultiError expectedMetrics []*view.Row }{ { - name: "single reconciler error in sync component", - applyErrors: applier.Error(errors.New("sync error")), + name: "single reconciler error in sync component", + applyErrors: []status.Error{ + applier.Error(errors.New("sync error")), + }, expectedError: applier.Error(errors.New("sync error")), expectedMetrics: []*view.Row{ {Data: &view.LastValueData{Value: 0}, Tags: []tag.Tag{{Key: metrics.KeyComponent, Value: "source"}, {Key: metrics.KeyErrorClass, Value: "1xxx"}}}, @@ -1085,10 +1087,10 @@ func TestRoot_SourceAndSyncReconcilerErrorsMetricValidation(t *testing.T) { }, { name: "multiple reconciler errors in sync component", - applyErrors: status.Wrap( + applyErrors: []status.Error{ applier.Error(errors.New("sync error")), status.InternalError("internal error"), - ), + }, expectedError: status.Wrap( applier.Error(errors.New("sync error")), status.InternalError("internal error"), diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index 322973a52d..bafc73c45a 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -46,8 +46,9 @@ type Updater struct { // tracking them in a ResourceGroup inventory. Applier applier.Applier - errorMux sync.RWMutex + statusMux sync.RWMutex validationErrs status.MultiError + applyErrs status.MultiError watchErrs status.MultiError updateMux sync.RWMutex @@ -65,14 +66,14 @@ func (u *Updater) managementConflict() bool { // Errors returns the latest known set of errors from the updater. // This method is safe to call while Update is running. 
func (u *Updater) Errors() status.MultiError { - u.errorMux.RLock() - defer u.errorMux.RUnlock() + u.statusMux.RLock() + defer u.statusMux.RUnlock() var errs status.MultiError errs = status.Append(errs, u.conflictErrors()) errs = status.Append(errs, u.fightErrors()) errs = status.Append(errs, u.validationErrs) - errs = status.Append(errs, u.Applier.Errors()) + errs = status.Append(errs, u.applyErrors()) errs = status.Append(errs, u.watchErrs) return errs } @@ -98,14 +99,32 @@ func (u *Updater) fightErrors() status.MultiError { } func (u *Updater) setValidationErrs(errs status.MultiError) { - u.errorMux.Lock() - defer u.errorMux.Unlock() + u.statusMux.Lock() + defer u.statusMux.Unlock() u.validationErrs = errs } +func (u *Updater) applyErrors() status.MultiError { + u.statusMux.RLock() + defer u.statusMux.RUnlock() + return u.applyErrs +} + +func (u *Updater) addApplyError(err status.Error) { + u.statusMux.Lock() + defer u.statusMux.Unlock() + u.applyErrs = status.Append(u.applyErrs, err) +} + +func (u *Updater) resetApplyErrors() { + u.statusMux.Lock() + defer u.statusMux.Unlock() + u.applyErrs = nil +} + func (u *Updater) setWatchErrs(errs status.MultiError) { - u.errorMux.Lock() - defer u.errorMux.Unlock() + u.statusMux.Lock() + defer u.statusMux.Unlock() u.watchErrs = errs } @@ -236,14 +255,34 @@ func (u *Updater) declare(ctx context.Context, objs []client.Object, commit stri } func (u *Updater) apply(ctx context.Context, objs []client.Object, commit string) status.MultiError { + // Collect errors into a MultiError + var err status.MultiError + eventHandler := func(event applier.Event) { + if errEvent, ok := event.(applier.ErrorEvent); ok { + if err == nil { + err = errEvent.Error + } else { + err = status.Append(err, errEvent.Error) + } + u.addApplyError(errEvent.Error) + } + } klog.V(1).Info("Applier starting...") start := time.Now() - err := u.Applier.Apply(ctx, objs) + u.resetApplyErrors() + objStatusMap, syncStats := u.Applier.Apply(ctx, eventHandler, objs) + if syncStats.Empty() { + klog.V(4).Info("Applier made no new progress") + } else { + klog.Infof("Applier made new progress: %s", syncStats.String()) + objStatusMap.Log(klog.V(0)) + } metrics.RecordApplyDuration(ctx, metrics.StatusTagKey(err), commit, start) if err != nil { klog.Warningf("Failed to apply declared resources: %v", err) return err } + klog.V(4).Info("Apply completed without error: all resources are up to date.") klog.V(3).Info("Applier stopped") return nil } diff --git a/pkg/reconciler/finalizer/base_finalizer.go b/pkg/reconciler/finalizer/base_finalizer.go new file mode 100644 index 0000000000..745fb20127 --- /dev/null +++ b/pkg/reconciler/finalizer/base_finalizer.go @@ -0,0 +1,61 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package finalizer
+
+import (
+	"context"
+
+	"k8s.io/klog/v2"
+	"kpt.dev/configsync/pkg/applier"
+	"kpt.dev/configsync/pkg/status"
+)
+
+type baseFinalizer struct {
+	Destroyer applier.Destroyer
+}
+
+// destroy calls Destroyer.Destroy, collects errors, and handles logging,
+// similar to Updater.apply.
+func (bf *baseFinalizer) destroy(ctx context.Context) status.MultiError {
+	var err status.MultiError
+	eventHandler := func(event applier.Event) {
+		if errEvent, ok := event.(applier.ErrorEvent); ok {
+			if err == nil {
+				err = errEvent.Error
+			} else {
+				err = status.Append(err, errEvent.Error)
+			}
+		}
+	}
+	klog.V(1).Info("Destroyer starting...")
+	// start := time.Now()
+	objStatusMap, syncStats := bf.Destroyer.Destroy(ctx, eventHandler)
+	if syncStats.Empty() {
+		klog.V(4).Info("Destroyer made no new progress")
+	} else {
+		klog.Infof("Destroyer made new progress: %s", syncStats.String())
+		objStatusMap.Log(klog.V(0))
+	}
+	// TODO: should we have a destroy duration metric?
+	// We don't have the commit here, so we can't send the apply metric.
+	// metrics.RecordApplyDuration(ctx, metrics.StatusTagKey(errs), commit, start)
+	if err != nil {
+		klog.Warningf("Failed to destroy declared resources: %v", err)
+		return err
+	}
+	klog.V(4).Info("Destroyer completed without error: all resources are deleted.")
+	klog.V(3).Info("Destroyer stopped")
+	return nil
+}
diff --git a/pkg/reconciler/finalizer/finalizer.go b/pkg/reconciler/finalizer/finalizer.go
index c2ef932b00..a7b94cac6a 100644
--- a/pkg/reconciler/finalizer/finalizer.go
+++ b/pkg/reconciler/finalizer/finalizer.go
@@ -37,14 +37,18 @@ type Finalizer interface {
 func New(scope declared.Scope, destroyer applier.Destroyer, c client.Client, stopControllers context.CancelFunc, controllersStopped <-chan struct{}) Finalizer {
 	if scope == declared.RootReconciler {
 		return &RootSyncFinalizer{
-			Destroyer:          destroyer,
+			baseFinalizer: baseFinalizer{
+				Destroyer: destroyer,
+			},
 			Client:             c,
 			StopControllers:    stopControllers,
 			ControllersStopped: controllersStopped,
 		}
 	}
 	return &RepoSyncFinalizer{
-		Destroyer:          destroyer,
+		baseFinalizer: baseFinalizer{
+			Destroyer: destroyer,
+		},
 		Client:             c,
 		StopControllers:    stopControllers,
 		ControllersStopped: controllersStopped,
diff --git a/pkg/reconciler/finalizer/reposync_finalizer.go b/pkg/reconciler/finalizer/reposync_finalizer.go
index fe5517098e..53d8f0ddd4 100644
--- a/pkg/reconciler/finalizer/reposync_finalizer.go
+++ b/pkg/reconciler/finalizer/reposync_finalizer.go
@@ -20,7 +20,6 @@ import (
 
 	"k8s.io/klog/v2"
 	"kpt.dev/configsync/pkg/api/configsync/v1beta1"
-	"kpt.dev/configsync/pkg/applier"
 	"kpt.dev/configsync/pkg/reposync"
 	"kpt.dev/configsync/pkg/status"
 	"kpt.dev/configsync/pkg/util/mutate"
@@ -31,8 +30,10 @@ import (
 // to destroy all managed user objects previously applied from source.
 // Implements the Finalizer interface.
 type RepoSyncFinalizer struct {
-	Destroyer applier.Destroyer
-	Client    client.Client
+	baseFinalizer
+
+	// Client used to update RSync spec and status.
+	Client client.Client
 
 	// StopControllers will be called by the Finalize() method to stop the Parser and Remediator.
 	StopControllers context.CancelFunc
@@ -176,7 +177,7 @@ func (f *RepoSyncFinalizer) removeFinalizingCondition(ctx context.Context, syncO
 // deleteManagedObjects uses the destroyer to delete managed objects and then
 // updates the ReconcilerFinalizerFailure condition on the specified object.
func (f *RepoSyncFinalizer) deleteManagedObjects(ctx context.Context, syncObj *v1beta1.RepoSync) error { - destroyErrs := f.Destroyer.Destroy(ctx) + destroyErrs := f.destroy(ctx) // Update the FinalizerFailure condition whether the destroy succeeded or failed if _, updateErr := f.updateFailureCondition(ctx, syncObj, destroyErrs); updateErr != nil { updateErr = fmt.Errorf("updating FinalizerFailure condition: %w", updateErr) diff --git a/pkg/reconciler/finalizer/reposync_finalizer_test.go b/pkg/reconciler/finalizer/reposync_finalizer_test.go index 22b1899780..eaa4a5bd32 100644 --- a/pkg/reconciler/finalizer/reposync_finalizer_test.go +++ b/pkg/reconciler/finalizer/reposync_finalizer_test.go @@ -68,7 +68,7 @@ func TestRepoSyncFinalize(t *testing.T) { name string rsync client.Object setup func(*fake.Client) error - destroyErrs status.MultiError + destroyErrs []status.Error expectedRsyncBeforeDestroy client.Object expectedError error expectedStopped bool @@ -123,7 +123,9 @@ func TestRepoSyncFinalize(t *testing.T) { } return obj }(), - destroyErrs: status.APIServerError(fmt.Errorf("destroy error"), "example message"), + destroyErrs: []status.Error{ + status.APIServerError(fmt.Errorf("destroy error"), "example message"), + }, expectedError: fmt.Errorf( "deleting managed objects: %w", status.APIServerError(fmt.Errorf("destroy error"), "example message")), @@ -282,7 +284,7 @@ func TestRepoSyncFinalize(t *testing.T) { defer close(continueCh) stopped = true } - destroyFunc := func(context.Context) status.MultiError { + destroyFunc := func(context.Context) []status.Error { // Lookup the current RepoSync key := client.ObjectKeyFromObject(repoSync1) rsync := &v1beta1.RepoSync{} @@ -294,7 +296,9 @@ func TestRepoSyncFinalize(t *testing.T) { } fakeDestroyer := newFakeDestroyer(tc.destroyErrs, destroyFunc) finalizer := &RepoSyncFinalizer{ - Destroyer: fakeDestroyer, + baseFinalizer: baseFinalizer{ + Destroyer: fakeDestroyer, + }, Client: fakeClient, StopControllers: stopFunc, ControllersStopped: continueCh, diff --git a/pkg/reconciler/finalizer/rootsync_finalizer.go b/pkg/reconciler/finalizer/rootsync_finalizer.go index 38ebcaa002..24e3a3cd85 100644 --- a/pkg/reconciler/finalizer/rootsync_finalizer.go +++ b/pkg/reconciler/finalizer/rootsync_finalizer.go @@ -20,7 +20,6 @@ import ( "k8s.io/klog/v2" "kpt.dev/configsync/pkg/api/configsync/v1beta1" - "kpt.dev/configsync/pkg/applier" "kpt.dev/configsync/pkg/rootsync" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/util/mutate" @@ -31,8 +30,10 @@ import ( // to destroy all managed user objects previously applied from source. // Implements the Finalizer interface. type RootSyncFinalizer struct { - Destroyer applier.Destroyer - Client client.Client + baseFinalizer + + // Client used to update RSync spec and status. + Client client.Client // StopControllers will be called by the Finalize() method to stop the Parser and Remediator. StopControllers context.CancelFunc @@ -176,7 +177,7 @@ func (f *RootSyncFinalizer) removeFinalizingCondition(ctx context.Context, syncO // deleteManagedObjects uses the destroyer to delete managed objects and then // updates the ReconcilerFinalizerFailure condition on the specified object. 
func (f *RootSyncFinalizer) deleteManagedObjects(ctx context.Context, syncObj *v1beta1.RootSync) error { - destroyErrs := f.Destroyer.Destroy(ctx) + destroyErrs := f.destroy(ctx) // Update the FinalizerFailure condition whether the destroy succeeded or failed if _, updateErr := f.updateFailureCondition(ctx, syncObj, destroyErrs); updateErr != nil { updateErr = fmt.Errorf("updating FinalizerFailure condition: %w", updateErr) diff --git a/pkg/reconciler/finalizer/rootsync_finalizer_test.go b/pkg/reconciler/finalizer/rootsync_finalizer_test.go index af44399d61..2c2a85cd62 100644 --- a/pkg/reconciler/finalizer/rootsync_finalizer_test.go +++ b/pkg/reconciler/finalizer/rootsync_finalizer_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/applier" + "kpt.dev/configsync/pkg/applier/stats" "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/kinds" "kpt.dev/configsync/pkg/metadata" @@ -76,7 +77,7 @@ func TestRootSyncFinalize(t *testing.T) { name string rsync client.Object setup func(*fake.Client) error - destroyErrs status.MultiError + destroyErrs []status.Error expectedRsyncBeforeDestroy client.Object expectedError error expectedStopped bool @@ -131,7 +132,9 @@ func TestRootSyncFinalize(t *testing.T) { } return obj }(), - destroyErrs: status.APIServerError(fmt.Errorf("destroy error"), "example message"), + destroyErrs: []status.Error{ + status.APIServerError(fmt.Errorf("destroy error"), "example message"), + }, expectedError: fmt.Errorf( "deleting managed objects: %w", status.APIServerError(fmt.Errorf("destroy error"), "example message")), @@ -288,7 +291,7 @@ func TestRootSyncFinalize(t *testing.T) { defer close(continueCh) stopped = true } - destroyFunc := func(context.Context) status.MultiError { + destroyFunc := func(context.Context) []status.Error { // Lookup the current RootSync key := client.ObjectKeyFromObject(rootSync1) rsync := &v1beta1.RootSync{} @@ -300,7 +303,9 @@ func TestRootSyncFinalize(t *testing.T) { } fakeDestroyer := newFakeDestroyer(tc.destroyErrs, destroyFunc) finalizer := &RootSyncFinalizer{ - Destroyer: fakeDestroyer, + baseFinalizer: baseFinalizer{ + Destroyer: fakeDestroyer, + }, Client: fakeClient, StopControllers: stopFunc, ControllersStopped: continueCh, @@ -619,26 +624,31 @@ func updateToRemoveFinalizers(ctx context.Context, fakeClient *fake.Client, obj } type fakeDestroyer struct { - errs status.MultiError - destroyFunc func(context.Context) status.MultiError + errors []status.Error + destroyFunc func(context.Context) []status.Error } var _ applier.Destroyer = &fakeDestroyer{} -func newFakeDestroyer(errs status.MultiError, destroyFunc func(context.Context) status.MultiError) *fakeDestroyer { +func newFakeDestroyer(errs []status.Error, destroyFunc func(context.Context) []status.Error) *fakeDestroyer { return &fakeDestroyer{ - errs: errs, + errors: errs, destroyFunc: destroyFunc, } } -func (d *fakeDestroyer) Destroy(ctx context.Context) status.MultiError { +func (d *fakeDestroyer) Destroy(ctx context.Context, eventHandler func(applier.Event)) (applier.ObjectStatusMap, *stats.SyncStats) { + var errs []status.Error if d.destroyFunc != nil { - return d.destroyFunc(ctx) + errs = d.destroyFunc(ctx) + } else { + errs = d.errors } - return d.errs -} - -func (d *fakeDestroyer) Errors() status.MultiError { - return d.errs + for _, err := range errs { + eventHandler(applier.ErrorEvent{ + Error: err, + }) + } + // TODO: test ObjectStatusMap & SyncStats + return 
applier.ObjectStatusMap{}, &stats.SyncStats{}
 }
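
Usage note: the sketch below (not part of the patch) shows how a caller might consume the new eventHandler-based Apply contract, mirroring the pattern already used by Updater.apply and the test handlers above. The example package and the applyOnce helper are hypothetical; only the Apply signature, ErrorEvent, status.Append, ObjectStatusMap.Log, and SyncStats behavior shown in this patch are assumed.

package example // hypothetical caller, for illustration only

import (
	"context"

	"k8s.io/klog/v2"
	"kpt.dev/configsync/pkg/applier"
	"kpt.dev/configsync/pkg/status"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// applyOnce (hypothetical) runs a single apply and aggregates ErrorEvents into
// a MultiError, the same way Updater.apply and baseFinalizer.destroy do above.
func applyOnce(ctx context.Context, a applier.Applier, objs []client.Object) status.MultiError {
	var errs status.MultiError
	eventHandler := func(e applier.Event) {
		if errEvent, ok := e.(applier.ErrorEvent); ok {
			if errs == nil {
				errs = errEvent.Error
			} else {
				errs = status.Append(errs, errEvent.Error)
			}
		}
	}
	// Apply streams errors to eventHandler and returns per-object status plus stats.
	objStatusMap, syncStats := a.Apply(ctx, eventHandler, objs)
	if !syncStats.Empty() {
		klog.Infof("apply progress: %s", syncStats.String())
		objStatusMap.Log(klog.V(0))
	}
	return errs
}

The same handler shape works for Destroyer.Destroy, as baseFinalizer.destroy in this patch shows.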