Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event handler to Applier/Destroyer #1199

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
305 changes: 132 additions & 173 deletions pkg/applier/applier.go

Large diffs are not rendered by default.

232 changes: 171 additions & 61 deletions pkg/applier/applier_test.go

Large diffs are not rendered by default.

148 changes: 111 additions & 37 deletions pkg/applier/destroyer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/GoogleContainerTools/kpt/pkg/live"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"kpt.dev/configsync/pkg/applier/stats"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/status"
Expand All @@ -36,6 +37,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -61,24 +63,28 @@ func (a *fakeKptDestroyer) Run(_ context.Context, _ inventory.Info, _ apply.Dest
}

func TestDestroy(t *testing.T) {
deploymentObj := newDeploymentObj()
deploymentObj.SetName("deployment-1")
deploymentID := object.UnstructuredToObjMetadata(deploymentObj)
deploymentObj1 := newDeploymentObj()
deploymentObj1.SetName("deployment-1")
deploymentObj1Meta := object.UnstructuredToObjMetadata(deploymentObj1)
deploymentObj1ID := core.IDOf(deploymentObj1)

deployment2Obj := newDeploymentObj()
deployment2Obj.SetName("deployment-2")
deployment2ID := object.UnstructuredToObjMetadata(deployment2Obj)
deploymentObj2 := newDeploymentObj()
deploymentObj2.SetName("deployment-2")
deploymentObj2Meta := object.UnstructuredToObjMetadata(deploymentObj2)

testObj := newTestObj("test-1")
testID := object.UnstructuredToObjMetadata(testObj)
testObj1 := newTestObj("test-1")
testObj1Meta := object.UnstructuredToObjMetadata(testObj1)
testObj1ID := core.IDOf(testObj1)

testObj2 := newTestObj("test-2")
testObj2ID := core.IDOf(testObj2)

namespaceObj := fake.UnstructuredObject(kinds.Namespace(),
core.Name("test-namespace"))
namespaceID := object.UnstructuredToObjMetadata(namespaceObj)
namespaceMeta := object.UnstructuredToObjMetadata(namespaceObj)
namespaceObjID := core.IDOf(namespaceObj)

uid := core.ID{
inventoryID := core.ID{
GroupKind: live.ResourceGroupGVK.GroupKind(),
ObjectKey: client.ObjectKey{
Name: "rs",
Expand All @@ -92,22 +98,31 @@ func TestDestroy(t *testing.T) {
etcdError := errors.New("etcdserver: request is too large") // satisfies util.IsRequestTooLargeError

testcases := []struct {
name string
events []event.Event
multiErr error
name string
events []event.Event
expectedError error
expectedObjectStatusMap ObjectStatusMap
expectedSyncStats *stats.SyncStats
}{
{
name: "unknown type for some resource",
events: []event.Event{
formDeleteEvent(event.DeleteFailed, testObj, applyerror.NewUnknownTypeError(testError1)),
formDeleteEvent(event.DeleteFailed, testObj1, applyerror.NewUnknownTypeError(testError1)),
formDeleteEvent(event.DeletePending, testObj2, nil),
},
multiErr: DeleteErrorForResource(testError1, idFrom(testID)),
expectedError: DeleteErrorForResource(testError1, testObj1ID),
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteFailed, 1).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "conflict error for some resource",
events: []event.Event{
formDeleteSkipEvent(testID, testObj.DeepCopy(), &inventory.PolicyPreventedActuationError{
formDeleteSkipEvent(testObj1Meta, testObj1.DeepCopy(), &inventory.PolicyPreventedActuationError{
Strategy: actuation.ActuationStrategyDelete,
Policy: inventory.PolicyMustMatch,
Status: inventory.NoMatch,
Expand All @@ -116,73 +131,119 @@ func TestDestroy(t *testing.T) {
},
// Prunes and Deletes ignore PolicyPreventedActuationErrors.
// This allows abandoning of managed objects.
multiErr: nil,
expectedError: nil,
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSkipped, 1).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "inventory object is too large",
events: []event.Event{
formErrorEvent(etcdError),
},
multiErr: largeResourceGroupError(etcdError, uid),
expectedError: largeResourceGroupError(etcdError, inventoryID),
expectedObjectStatusMap: ObjectStatusMap{},
expectedSyncStats: stats.NewSyncStats().
WithErrorEvents(1),
},
{
name: "failed to delete",
events: []event.Event{
formDeleteEvent(event.DeleteFailed, testObj, testError1),
formDeleteEvent(event.DeleteFailed, testObj1, testError1),
formDeleteEvent(event.DeletePending, testObj2, nil),
},
multiErr: DeleteErrorForResource(testError1, idFrom(testID)),
expectedError: DeleteErrorForResource(testError1, testObj1ID),
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteFailed, 1).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "skipped delete",
events: []event.Event{
formDeleteEvent(event.DeleteSuccessful, testObj, nil),
formDeleteEvent(event.DeleteSuccessful, testObj1, nil),
formDeleteEvent(event.DeleteSkipped, namespaceObj, &filter.NamespaceInUseError{
Namespace: "test-namespace",
}),
formDeleteEvent(event.DeleteSuccessful, testObj2, nil),
},
multiErr: SkipErrorForResource(
expectedError: SkipErrorForResource(
errors.New("namespace still in use: test-namespace"),
idFrom(namespaceID),
idFrom(namespaceMeta),
actuation.ActuationStrategyDelete),
expectedObjectStatusMap: ObjectStatusMap{
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
namespaceObjID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped},
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSuccessful, 2).
WithDeleteEvents(event.DeleteSkipped, 1),
},
{
name: "all passed",
events: []event.Event{
formDeleteEvent(event.DeletePending, testObj2, nil),
formDeleteEvent(event.DeleteSuccessful, testObj, nil),
formDeleteEvent(event.DeleteSuccessful, deploymentObj, nil),
formDeleteEvent(event.DeleteSuccessful, testObj1, nil),
formDeleteEvent(event.DeleteSuccessful, deploymentObj1, nil),
},
expectedObjectStatusMap: ObjectStatusMap{
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSucceeded},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSuccessful, 2).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "all failed",
events: []event.Event{
formDeleteEvent(event.DeletePending, testObj2, nil),
formDeleteEvent(event.DeleteFailed, testObj, testError1),
formDeleteEvent(event.DeleteFailed, deploymentObj, testError2),
formDeleteEvent(event.DeleteFailed, testObj1, testError1),
formDeleteEvent(event.DeleteFailed, deploymentObj1, testError2),
},
multiErr: status.Wrap(
DeleteErrorForResource(testError1, idFrom(testID)),
DeleteErrorForResource(testError2, idFrom(deploymentID))),
expectedError: status.Wrap(
DeleteErrorForResource(testError1, testObj1ID),
DeleteErrorForResource(testError2, deploymentObj1ID)),
expectedObjectStatusMap: ObjectStatusMap{
testObj2ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationPending},
testObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationFailed},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteFailed, 2).
WithDeleteEvents(event.DeletePending, 1),
},
{
name: "failed dependency during delete",
events: []event.Event{
formDeleteSkipEventWithDependent(deploymentObj.DeepCopy(), deployment2Obj.DeepCopy()),
formDeleteSkipEventWithDependent(deploymentObj1.DeepCopy(), deploymentObj2.DeepCopy()),
},
multiErr: SkipErrorForResource(
expectedError: SkipErrorForResource(
&filter.DependencyPreventedActuationError{
Object: deploymentID,
Object: deploymentObj1Meta,
Strategy: actuation.ActuationStrategyDelete,
Relationship: filter.RelationshipDependent,
Relation: deployment2ID,
Relation: deploymentObj2Meta,
RelationPhase: filter.PhaseReconcile,
RelationActuationStatus: actuation.ActuationSucceeded,
RelationReconcileStatus: actuation.ReconcileTimeout,
},
idFrom(deploymentID),
deploymentObj1ID,
actuation.ActuationStrategyDelete),
expectedObjectStatusMap: ObjectStatusMap{
deploymentObj1ID: &ObjectStatus{Strategy: actuation.ActuationStrategyDelete, Actuation: actuation.ActuationSkipped},
},
expectedSyncStats: stats.NewSyncStats().
WithDeleteEvents(event.DeleteSkipped, 1),
},
}

Expand All @@ -198,8 +259,21 @@ func TestDestroy(t *testing.T) {
destroyer, err := NewNamespaceSupervisor(cs, "test-namespace", "rs", 5*time.Minute)
require.NoError(t, err)

errs := destroyer.Destroy(context.Background())
testerrors.AssertEqual(t, tc.multiErr, errs)
var errs status.MultiError
eventHandler := func(event Event) {
if errEvent, ok := event.(ErrorEvent); ok {
if errs == nil {
errs = errEvent.Error
} else {
errs = status.Append(errs, errEvent.Error)
}
}
}

objectStatusMap, syncStats := destroyer.Destroy(context.Background(), eventHandler)
testutil.AssertEqual(t, tc.expectedObjectStatusMap, objectStatusMap)
testutil.AssertEqual(t, tc.expectedSyncStats, syncStats)
testerrors.AssertEqual(t, tc.expectedError, errs)
})
}
}
Expand Down
27 changes: 13 additions & 14 deletions pkg/applier/fake/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"kpt.dev/configsync/pkg/applier"
"kpt.dev/configsync/pkg/applier/stats"
"kpt.dev/configsync/pkg/status"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -28,12 +29,9 @@ import (
// This is not in kpt.dev/configsync/pkg/testing/fake because that would cause
// a import loop (applier -> fake -> applier).
type Applier struct {
ApplyCalls int
ApplyInputs []ApplierInputs
ApplyOutputs []ApplierOutputs

ApplyCalls, ErrorsCalls int

currentErrors status.MultiError
}

// ApplierInputs stores inputs for fake.Applier.Apply()
Expand All @@ -43,26 +41,27 @@ type ApplierInputs struct {

// ApplierOutputs stores outputs for fake.Applier.Apply()
type ApplierOutputs struct {
Errors status.MultiError
Errors []status.Error
ObjectStatusMap applier.ObjectStatusMap
SyncStats *stats.SyncStats
}

// Apply fakes applier.Applier.Apply()
func (a *Applier) Apply(_ context.Context, objects []client.Object) status.MultiError {
func (a *Applier) Apply(_ context.Context, eventHandler func(applier.Event), objects []client.Object) (applier.ObjectStatusMap, *stats.SyncStats) {
a.ApplyInputs = append(a.ApplyInputs, ApplierInputs{
Objects: objects,
})
if a.ApplyCalls >= len(a.ApplyOutputs) {
panic(fmt.Sprintf("Expected only %d calls to Applier.Apply, but got more. Update Applier.ApplyOutputs if this is expected.", len(a.ApplyOutputs)))
panic(fmt.Sprintf("Expected only %d calls to Applier.Apply, but got more. Update Applier.Outputs if this is expected.", len(a.ApplyOutputs)))
}
outputs := a.ApplyOutputs[a.ApplyCalls]
a.ApplyCalls++
a.currentErrors = outputs.Errors
return outputs.Errors
}

// Errors fakes applier.Applier.Errors()
func (a *Applier) Errors() status.MultiError {
return a.currentErrors
for _, err := range outputs.Errors {
eventHandler(applier.ErrorEvent{
Error: err,
})
}
return outputs.ObjectStatusMap, outputs.SyncStats
}

var _ applier.Applier = &Applier{}