Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions pkg/controller/generic_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (gr *GenericReconciler) processNamespacedResources(
gr.logger.Info("reconcileNamespaceResources",
"Reconciling group of", len(objects), "objects with labels", label,
"in the namespace", ns.name)
err := gr.reconcileGroupOfObjects(ctx, objects, ns.name)
err := gr.reconcileGroupOfObjects(objects, ns.uid)
if err != nil {
return fmt.Errorf(
"reconciling related objects with labels '%s': %w", label, err,
Expand All @@ -307,15 +307,13 @@ func (gr *GenericReconciler) processNamespacedResources(
return nil
}

func (gr *GenericReconciler) reconcileGroupOfObjects(ctx context.Context,
objs []*unstructured.Unstructured, namespace string) error {
func (gr *GenericReconciler) reconcileGroupOfObjects(objs []*unstructured.Unstructured, namespaceUID string) error {

if gr.allObjectsValidated(objs) {
if gr.allObjectsValidated(objs, namespaceUID) {
gr.logger.Info("reconcileGroupOfObjects", "All objects are validated", "Nothing to do")
return nil
}

namespaceUID := gr.watchNamespaces.getNamespaceUID(namespace)
cliObjects := make([]client.Object, 0, len(objs))
for _, o := range objs {
typedClientObject, err := gr.unstructuredToTyped(o)
Expand All @@ -330,21 +328,21 @@ func (gr *GenericReconciler) reconcileGroupOfObjects(ctx context.Context,
return fmt.Errorf("running validations: %w", err)
}
for _, o := range objs {
gr.objectValidationCache.store(o, outcome)
gr.objectValidationCache.store(o, namespaceUID, outcome)
}

return nil
}

// allObjectsValidated checks whether all unstructured objects passed as argument are validated
// and thus present in the cache
func (gr *GenericReconciler) allObjectsValidated(objs []*unstructured.Unstructured) bool {
func (gr *GenericReconciler) allObjectsValidated(objs []*unstructured.Unstructured, namespaceID string) bool {
allObjectsValidated := true
// we must be sure that all objects in the given group are cached (validated)
// see DVO-103
for _, o := range objs {
gr.currentObjects.store(o, "")
if !gr.objectValidationCache.objectAlreadyValidated(o) {
gr.currentObjects.store(o, namespaceID, "")
if !gr.objectValidationCache.objectAlreadyValidated(o, namespaceID) {
allObjectsValidated = false
}
}
Expand Down Expand Up @@ -384,7 +382,7 @@ func (gr *GenericReconciler) handleResourceDeletions() {
Kind: k.kind,
Name: k.name,
Namespace: k.namespace,
NamespaceUID: gr.watchNamespaces.getNamespaceUID(k.namespace),
NamespaceUID: k.nsID,
UID: v.uid,
}

Expand Down
22 changes: 15 additions & 7 deletions pkg/controller/generic_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,11 +869,12 @@ func TestProcessNamespacedResources(t *testing.T) {
err = testReconciler.processNamespacedResources(context.Background(), tt.gvks, tt.namespaces)
assert.NoError(t, err)
for _, o := range tt.objects {
vr, ok := testReconciler.objectValidationCache.retrieve(o)
namespaceID := testReconciler.watchNamespaces.getNamespaceUID(o.GetNamespace())
vr, ok := testReconciler.objectValidationCache.retrieve(o, namespaceID)
assert.True(t, ok, "can't find object %v in the validation cache", o)
assert.Equal(t, string(o.GetUID()), vr.uid)

co, ok := testReconciler.currentObjects.retrieve(o)
co, ok := testReconciler.currentObjects.retrieve(o, namespaceID)
assert.True(t, ok, "can't find object %v in the current objects", o)
assert.Equal(t, string(o.GetUID()), co.uid)
}
Expand Down Expand Up @@ -987,26 +988,33 @@ func TestHandleResourceDeletions(t *testing.T) {

// store the test objects in the caches
for _, co := range tt.testCurrentObjects {
testReconciler.currentObjects.store(co, validations.ObjectNeedsImprovement)
testReconciler.currentObjects.store(co,
testReconciler.watchNamespaces.getNamespaceUID(co.GetNamespace()),
validations.ObjectNeedsImprovement)
}
for _, co := range tt.testValidatedObjects {
testReconciler.objectValidationCache.store(co, validations.ObjectNeedsImprovement)
testReconciler.objectValidationCache.store(co,
testReconciler.watchNamespaces.getNamespaceUID(co.GetNamespace()),
validations.ObjectNeedsImprovement)
}
testReconciler.handleResourceDeletions()
// currentObjects should be always empty after calling handleResourceDeletions
for _, co := range tt.testCurrentObjects {
_, ok := testReconciler.currentObjects.retrieve(co)
_, ok := testReconciler.currentObjects.retrieve(co,
testReconciler.watchNamespaces.getNamespaceUID(co.GetNamespace()))
assert.False(t, ok)
}

if tt.expectedValidatedObjects == nil {
for _, vo := range tt.testValidatedObjects {
_, ok := testReconciler.objectValidationCache.retrieve(vo)
_, ok := testReconciler.objectValidationCache.retrieve(vo,
testReconciler.watchNamespaces.getNamespaceUID(vo.GetNamespace()))
assert.False(t, ok)
}
} else {
for _, vo := range tt.expectedValidatedObjects {
_, ok := testReconciler.objectValidationCache.retrieve(vo)
_, ok := testReconciler.objectValidationCache.retrieve(vo,
testReconciler.watchNamespaces.getNamespaceUID(vo.GetNamespace()))
assert.True(t, ok)
}
}
Expand Down
28 changes: 15 additions & 13 deletions pkg/controller/validationscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
)

type validationKey struct {
group, version, kind, namespace, name string
uid types.UID
group, version, kind string
name, namespace, nsID string
uid types.UID
}

type resourceVersion string
Expand All @@ -19,14 +20,15 @@ func newResourceversionVal(str string) resourceVersion {

// newValidationKey returns a unique identifier for the given
// object suitable for hashing.
func newValidationKey(obj client.Object) validationKey {
func newValidationKey(obj client.Object, nsID string) validationKey {
gvk := obj.GetObjectKind().GroupVersionKind()
return validationKey{
group: gvk.Group,
version: gvk.Version,
kind: gvk.Kind,
namespace: obj.GetNamespace(),
name: obj.GetName(),
namespace: obj.GetNamespace(),
nsID: nsID,
uid: obj.GetUID(),
}
}
Expand Down Expand Up @@ -67,8 +69,8 @@ func (vc *validationCache) has(key validationKey) bool {
// store caches a 'ValidationOutcome' for the given 'Object'.
// constraint: cached outcomes will be updated in-place for a given object and
// consecutive updates will not preserve previous state.
func (vc *validationCache) store(obj client.Object, outcome validations.ValidationOutcome) {
key := newValidationKey(obj)
func (vc *validationCache) store(obj client.Object, nsID string, outcome validations.ValidationOutcome) {
key := newValidationKey(obj, nsID)
(*vc)[key] = newValidationResource(
newResourceversionVal(obj.GetResourceVersion()),
string(obj.GetUID()),
Expand All @@ -85,8 +87,8 @@ func (vc *validationCache) drain() {
// remove uncaches the 'ValidationOutcome' for the
// given object if it exists and performs a noop
// if it does not.
func (vc *validationCache) remove(obj client.Object) {
key := newValidationKey(obj)
func (vc *validationCache) remove(obj client.Object, nsID string) {
key := newValidationKey(obj, nsID)
vc.removeKey(key)
}

Expand All @@ -98,8 +100,8 @@ func (vc *validationCache) removeKey(key validationKey) {
// retrieve returns a tuple of 'validationResource' (if present)
// and 'ok' which returns 'true' if a 'validationResource' exists
// for the given 'Object' and 'false' otherwise.
func (vc *validationCache) retrieve(obj client.Object) (*validationResource, bool) {
key := newValidationKey(obj)
func (vc *validationCache) retrieve(obj client.Object, nsID string) (*validationResource, bool) {
key := newValidationKey(obj, nsID)
val, exists := (*vc)[key]
return val, exists
}
Expand All @@ -110,15 +112,15 @@ func (vc *validationCache) retrieve(obj client.Object) (*validationResource, boo
// If the 'ResourceVersion' of an existing 'Object' is stale the cached
// 'ValidationOutcome' is removed and 'false' is returned. In all other
// cases 'false' is returned.
func (vc *validationCache) objectAlreadyValidated(obj client.Object) bool {
validationOutcome, ok := vc.retrieve(obj)
func (vc *validationCache) objectAlreadyValidated(obj client.Object, nsID string) bool {
validationOutcome, ok := vc.retrieve(obj, nsID)
if !ok {
return false
}
storedResourceVersion := validationOutcome.version
currentResourceVersion := obj.GetResourceVersion()
if string(storedResourceVersion) != currentResourceVersion {
vc.remove(obj)
vc.remove(obj, nsID)
return false
}
return true
Expand Down
30 changes: 15 additions & 15 deletions pkg/controller/validationscache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func TestValidationsCache(t *testing.T) {
}}

// When
mock.store(&mockClientObject, "mock_outcome")
mock.store(&mockClientObject, "", "mock_outcome")

// Assert
expected := newValidationResource(newResourceversionVal("mock_version"), "mock_uid", "mock_outcome")
assert.Equal(t, expected, (*mock)[newValidationKey(&mockClientObject)])
assert.Equal(t, expected, (*mock)[newValidationKey(&mockClientObject, "")])
})

t.Run("objectAlreadyValidated : key does not exist", func(t *testing.T) {
Expand All @@ -45,7 +45,7 @@ func TestValidationsCache(t *testing.T) {
}}

// When
test := mock.objectAlreadyValidated(&mockClientObject)
test := mock.objectAlreadyValidated(&mockClientObject, "")

// Assert
assert.False(t, test)
Expand All @@ -58,12 +58,12 @@ func TestValidationsCache(t *testing.T) {
ResourceVersion: "mock_version",
UID: "mock_uid",
}}
mock.store(&mockClientObject, "mock_outcome")
toBeRemovedKey := newValidationKey(&mockClientObject)
mock.store(&mockClientObject, "", "mock_outcome")
toBeRemovedKey := newValidationKey(&mockClientObject, "")

// When
mockClientObject.ResourceVersion = "mock_new_version"
test := mock.objectAlreadyValidated(&mockClientObject)
test := mock.objectAlreadyValidated(&mockClientObject, "")

// Assert
assert.False(t, test)
Expand All @@ -77,10 +77,10 @@ func TestValidationsCache(t *testing.T) {
ResourceVersion: "mock_version",
UID: "mock_uid",
}}
mock.store(&mockClientObject, "mock_outcome")
mock.store(&mockClientObject, "", "mock_outcome")

// When
test := mock.objectAlreadyValidated(&mockClientObject)
test := mock.objectAlreadyValidated(&mockClientObject, "")

// Assert
assert.True(t, test)
Expand All @@ -103,14 +103,14 @@ func TestValidationsCache(t *testing.T) {
UID: "bar345",
},
}
testCache.store(&dep1, validations.ObjectNeedsImprovement)
testCache.store(&dep2, validations.ObjectValid)
testCache.store(&dep1, "", validations.ObjectNeedsImprovement)
testCache.store(&dep2, "", validations.ObjectValid)

resource1, exists := testCache.retrieve(&dep1)
resource1, exists := testCache.retrieve(&dep1, "")
assert.True(t, exists)
assert.Equal(t, validations.ObjectNeedsImprovement, resource1.outcome)

resource2, exists := testCache.retrieve(&dep2)
resource2, exists := testCache.retrieve(&dep2, "")
assert.True(t, exists)
assert.Equal(t, validations.ObjectValid, resource2.outcome)
})
Expand All @@ -128,19 +128,19 @@ func Benchmark_ValidationCache(b *testing.B) {

for i := 0; i < b.N; i++ {
name := fmt.Sprintf("test-%d", i)
vc.store(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}, validations.ObjectValid)
vc.store(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}, "", validations.ObjectValid)
}
printMemoryInfo(fmt.Sprintf("Memory consumption after storing %d items in the cache", b.N))
for i := 0; i < b.N; i++ {
name := fmt.Sprintf("test-%d", i)
vc.remove(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}})
vc.remove(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}, "")
}
runtime.GC()
printMemoryInfo("Memory consumption after removing the items ")

for i := 0; i < b.N; i++ {
name := fmt.Sprintf("test-%d", i)
vc.store(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}, validations.ObjectValid)
vc.store(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}, "", validations.ObjectValid)
}
printMemoryInfo(fmt.Sprintf("Memory consumption after storing %d items again", b.N))
}