Skip to content

Commit

Permalink
Add event handler to Applier/Destroyer
Browse files Browse the repository at this point in the history
Use an injected event handler function to avoid needing to cache
errors in the supervisor. Instead, cache errors in the Updater,
which already has a lock for this.

This event handler simplifies the Applier/Destroyer API and makes
it easy to add more events in the future, like for inventory updates.
  • Loading branch information
karlkfi committed May 3, 2024
1 parent ee8d669 commit 0d2c829
Show file tree
Hide file tree
Showing 13 changed files with 617 additions and 260 deletions.
214 changes: 93 additions & 121 deletions pkg/applier/applier.go

Large diffs are not rendered by default.

205 changes: 160 additions & 45 deletions pkg/applier/applier_test.go

Large diffs are not rendered by default.

148 changes: 111 additions & 37 deletions pkg/applier/destroyer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/GoogleContainerTools/kpt/pkg/live"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"kpt.dev/configsync/pkg/applier/stats"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/status"
Expand All @@ -36,6 +37,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -61,24 +63,28 @@ func (a *fakeKptDestroyer) Run(_ context.Context, _ inventory.Info, _ apply.Dest
}

func TestDestroy(t *testing.T) {
deploymentObj := newDeploymentObj()
deploymentObj.SetName("deployment-1")
deploymentID := object.UnstructuredToObjMetadata(deploymentObj)
deploymentObj1 := newDeploymentObj()
deploymentObj1.SetName("deployment-1")
deploymentObj1Meta := object.UnstructuredToObjMetadata(deploymentObj1)
deploymentObj1ID := core.IDOf(deploymentObj1)

deployment2Obj := newDeploymentObj()
deployment2Obj.SetName("deployment-2")
deployment2ID := object.UnstructuredToObjMetadata(deployment2Obj)
deploymentObj2 := newDeploymentObj()
deploymentObj2.SetName("deployment-2")
deploymentObj2Meta := object.UnstructuredToObjMetadata(deploymentObj2)

testObj := newTestObj("test-1")
testID := object.UnstructuredToObjMetadata(testObj)
testObj1 := newTestObj("test-1")
testObj1Meta := object.UnstructuredToObjMetadata(testObj1)
testObj1ID := core.IDOf(testObj1)

testObj2 := newTestObj("test-2")
testObj2ID := core.IDOf(testObj2)

namespaceObj := fake.UnstructuredObject(kinds.Namespace(),
core.Name("test-namespace"))
namespaceID := object.UnstructuredToObjMetadata(namespaceObj)
namespaceMeta := object.UnstructuredToObjMetadata(namespaceObj)
namespaceObjID := core.IDOf(namespaceObj)

uid := core.ID{
inventoryID := core.ID{
GroupKind: live.ResourceGroupGVK.GroupKind(),
ObjectKey: client.ObjectKey{
Name: "rs",
Expand All @@ -92,22 +98,31 @@ func TestDestroy(t *testing.T) {
etcdError := errors.New("etcdserver: request is too large") // satisfies util.IsRequestTooLargeError

testcases := []struct {
name string
events []event.Event
multiErr error
name string
events []event.Event
expectedError error
expectedObjectStatusMap ObjectStatusMap
expectedSyncStats *stats.SyncStats
}{
{
name: "unknown type for some resource",
events: []event.Event{
formDeleteEvent(event.DeleteFailed, testObj, applyerror.NewUnknownTypeError(testError1)),
formDeleteEvent(event.DeleteFailed, testObj1, applyerror.NewUnknownTypeError(testError1)),
formDeleteEvent(event.DeletePending, testObj2, nil),
},
multiErr: DeleteErrorForResource(testError1, idFrom(testID)),
expectedError: DeleteErrorForResource(testError1, testObj1ID),
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteFailed, 1).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "conflict error for some resource",
events: []event.Event{
formDeleteSkipEvent(testID, testObj.DeepCopy(), &inventory.PolicyPreventedActuationError{
formDeleteSkipEvent(testObj1Meta, testObj1.DeepCopy(), &inventory.PolicyPreventedActuationError{
Strategy: actuation.ActuationStrategyDelete,
Policy: inventory.PolicyMustMatch,
Status: inventory.NoMatch,
Expand All @@ -116,73 +131,119 @@ func TestDestroy(t *testing.T) {
},
// Prunes and Deletes ignore PolicyPreventedActuationErrors.
// This allows abandoning of managed objects.
multiErr: nil,
expectedError: nil,
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSkipped, 1).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "inventory object is too large",
events: []event.Event{
formErrorEvent(etcdError),
},
multiErr: largeResourceGroupError(etcdError, uid),
expectedError: largeResourceGroupError(etcdError, inventoryID),
expectedObjectStatusMap: ObjectStatusMap{},
expectedSyncStats: stats.NewSyncStats().
WithErrorEvents(1),
},
{
name: "failed to delete",
events: []event.Event{
formDeleteEvent(event.DeleteFailed, testObj, testError1),
formDeleteEvent(event.DeleteFailed, testObj1, testError1),
formDeleteEvent(event.DeletePending, testObj2, nil),
},
multiErr: DeleteErrorForResource(testError1, idFrom(testID)),
expectedError: DeleteErrorForResource(testError1, testObj1ID),
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteFailed, 1).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "skipped delete",
events: []event.Event{
formDeleteEvent(event.DeleteSuccessful, testObj, nil),
formDeleteEvent(event.DeleteSuccessful, testObj1, nil),
formDeleteEvent(event.DeleteSkipped, namespaceObj, &filter.NamespaceInUseError{
Namespace: "test-namespace",
}),
formDeleteEvent(event.DeleteSuccessful, testObj2, nil),
},
multiErr: SkipErrorForResource(
expectedError: SkipErrorForResource(
errors.New("namespace still in use: test-namespace"),
idFrom(namespaceID),
idFrom(namespaceMeta),
actuation.ActuationStrategyDelete),
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
namespaceObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSuccessful, 2).
WithDeleteEvents(event.DeleteSkipped, 1),
},
{
name: "all passed",
events: []event.Event{
formDeleteEvent(event.DeletePending, testObj2, nil),
formDeleteEvent(event.DeleteSuccessful, testObj, nil),
formDeleteEvent(event.DeleteSuccessful, deploymentObj, nil),
formDeleteEvent(event.DeleteSuccessful, testObj1, nil),
formDeleteEvent(event.DeleteSuccessful, deploymentObj1, nil),
},
expectedObjectStatusMap: ObjectStatusMap{
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSuccessful, 2).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "all failed",
events: []event.Event{
formDeleteEvent(event.DeletePending, testObj2, nil),
formDeleteEvent(event.DeleteFailed, testObj, testError1),
formDeleteEvent(event.DeleteFailed, deploymentObj, testError2),
formDeleteEvent(event.DeleteFailed, testObj1, testError1),
formDeleteEvent(event.DeleteFailed, deploymentObj1, testError2),
},
multiErr: status.Wrap(
DeleteErrorForResource(testError1, idFrom(testID)),
DeleteErrorForResource(testError2, idFrom(deploymentID))),
expectedError: status.Wrap(
DeleteErrorForResource(testError1, testObj1ID),
DeleteErrorForResource(testError2, deploymentObj1ID)),
expectedObjectStatusMap: ObjectStatusMap{
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteFailed, 2).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "failed dependency during delete",
events: []event.Event{
formDeleteSkipEventWithDependent(deploymentObj.DeepCopy(), deployment2Obj.DeepCopy()),
formDeleteSkipEventWithDependent(deploymentObj1.DeepCopy(), deploymentObj2.DeepCopy()),
},
multiErr: SkipErrorForResource(
expectedError: SkipErrorForResource(
&filter.DependencyPreventedActuationError{
Object: deploymentID,
Object: deploymentObj1Meta,
Strategy: actuation.ActuationStrategyDelete,
Relationship: filter.RelationshipDependent,
Relation: deployment2ID,
Relation: deploymentObj2Meta,
RelationPhase: filter.PhaseReconcile,
RelationActuationStatus: actuation.ActuationSucceeded,
RelationReconcileStatus: actuation.ReconcileTimeout,
},
idFrom(deploymentID),
deploymentObj1ID,
actuation.ActuationStrategyDelete),
expectedObjectStatusMap: ObjectStatusMap{
deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSkipped, 1),
},
}

Expand All @@ -198,8 +259,21 @@ func TestDestroy(t *testing.T) {
destroyer, err := NewNamespaceSupervisor(cs, "test-namespace", "rs", 5*time.Minute)
require.NoError(t, err)

errs := destroyer.Destroy(context.Background())
testerrors.AssertEqual(t, tc.multiErr, errs)
var errs status.MultiError
eventHandler := func(event Event) {
if errEvent, ok := event.(ErrorEvent); ok {
if errs == nil {
errs = errEvent.Error
} else {
errs = status.Append(errs, errEvent.Error)
}
}
}

objectStatusMap, syncStats := destroyer.Destroy(context.Background(), eventHandler)
testutil.AssertEqual(t, tc.expectedObjectStatusMap, objectStatusMap)
testutil.AssertEqual(t, tc.expectedSyncStats, syncStats)
testerrors.AssertEqual(t, tc.expectedError, errs)
})
}
}
Expand Down
27 changes: 13 additions & 14 deletions pkg/applier/fake/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"kpt.dev/configsync/pkg/applier"
"kpt.dev/configsync/pkg/applier/stats"
"kpt.dev/configsync/pkg/status"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -28,12 +29,9 @@ import (
// This is not in kpt.dev/configsync/pkg/testing/fake because that would cause
// a import loop (applier -> fake -> applier).
type Applier struct {
ApplyCalls int
ApplyInputs []ApplierInputs
ApplyOutputs []ApplierOutputs

ApplyCalls, ErrorsCalls int

currentErrors status.MultiError
}

// ApplierInputs stores inputs for fake.Applier.Apply()
Expand All @@ -43,26 +41,27 @@ type ApplierInputs struct {

// ApplierOutputs stores outputs for fake.Applier.Apply()
type ApplierOutputs struct {
Errors status.MultiError
Errors []status.Error
ObjectStatusMap applier.ObjectStatusMap
SyncStats *stats.SyncStats
}

// Apply fakes applier.Applier.Apply()
func (a *Applier) Apply(_ context.Context, objects []client.Object) status.MultiError {
func (a *Applier) Apply(_ context.Context, eventHandler func(applier.Event), objects []client.Object) (applier.ObjectStatusMap, *stats.SyncStats) {
a.ApplyInputs = append(a.ApplyInputs, ApplierInputs{
Objects: objects,
})
if a.ApplyCalls >= len(a.ApplyOutputs) {
panic(fmt.Sprintf("Expected only %d calls to Applier.Apply, but got more. Update Applier.ApplyOutputs if this is expected.", len(a.ApplyOutputs)))
panic(fmt.Sprintf("Expected only %d calls to Applier.Apply, but got more. Update Applier.Outputs if this is expected.", len(a.ApplyOutputs)))
}
outputs := a.ApplyOutputs[a.ApplyCalls]
a.ApplyCalls++
a.currentErrors = outputs.Errors
return outputs.Errors
}

// Errors fakes applier.Applier.Errors()
func (a *Applier) Errors() status.MultiError {
return a.currentErrors
for _, err := range outputs.Errors {
eventHandler(applier.ErrorEvent{
Error: err,
})
}
return outputs.ObjectStatusMap, outputs.SyncStats
}

var _ applier.Applier = &Applier{}

0 comments on commit 0d2c829

Please sign in to comment.