Skip to content

Commit

Permalink
Revert "Reduce BundleDeployment triggering on deployed resources upda…
Browse files Browse the repository at this point in the history
…tes"

This reverts PR rancher#2031
  • Loading branch information
aruiz14 committed Mar 22, 2024
1 parent b5f25c1 commit 2ec31f4
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 262 deletions.
2 changes: 1 addition & 1 deletion .github/scripts/run-integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ KUBEBUILDER_ASSETS=$(setup-envtest use --use-env -p path $ENVTEST_K8S_VERSION)
export KUBEBUILDER_ASSETS

# run integration tests
go test ./integrationtests/agent/...
go test ./integrationtests/...
8 changes: 1 addition & 7 deletions integrationtests/agent/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,7 @@ var _ = BeforeSuite(func() {
Expect(k8sClient).NotTo(BeNil())

specEnvs = make(map[string]*specEnv, 2)
for id, f := range map[string]specResources{
"capabilitybundle": capabilityBundleResources,
"orphanbundle": orphanBundeResources,
"watchertrigger": func() map[string][]v1alpha1.BundleResource {
return nil
},
} {
for id, f := range map[string]specResources{"capabilitybundle": capabilityBundleResources, "orphanbundle": orphanBundeResources} {
namespace, err := utils.NewNamespaceName()
Expect(err).ToNot(HaveOccurred())
fmt.Printf("Creating namespace %s\n", namespace)
Expand Down
192 changes: 0 additions & 192 deletions integrationtests/agent/watcher_trigger_test.go

This file was deleted.

77 changes: 16 additions & 61 deletions internal/cmd/agent/trigger/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@ package trigger
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/rancher/fleet/pkg/durations"
"github.com/rancher/wrangler/v2/pkg/objectset"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"

"github.com/rancher/fleet/pkg/durations"
)

type Trigger struct {
Expand All @@ -28,11 +26,6 @@ type Trigger struct {
triggers map[schema.GroupVersionKind]map[objectset.ObjectKey]map[string]func()
restMapper meta.RESTMapper
client dynamic.Interface

// seenGenerations keeps a registry of the object UIDs and the latest observed generation, if any
// Uses sync.Map for a safe concurrent usage.
// Uses atomic.Int64 as values in order to stick to the first use case described at https://pkg.go.dev/sync#Map
seenGenerations sync.Map
}

func New(ctx context.Context, restMapper meta.RESTMapper, client dynamic.Interface) *Trigger {
Expand Down Expand Up @@ -132,39 +125,10 @@ func (t *Trigger) OnChange(key string, defaultNamespace string, trigger func(),
return nil
}

func (t *Trigger) storeObjectGeneration(uid types.UID, generation int64) {
value := new(atomic.Int64)
value.Store(generation)
t.seenGenerations.Store(uid, value)
}

func (t *Trigger) call(gvk schema.GroupVersionKind, obj metav1.Object, deleted bool) {
// If this type populates Generation metadata, use it to filter events that didn't modify that field
if currentGeneration := obj.GetGeneration(); currentGeneration != 0 {
uid := obj.GetUID()
// if the object is being deleted, just forget about it and execute the callback
if deleted {
t.seenGenerations.Delete(uid)
} else {
// keep a map of UID -> generation, using sync.Map and atomic.Int64 for safe concurrent usage
// - sync.Map entries are never modified after created, a pointer is used as value
// - using atomic.Int64 as values allows safely comparing and updating the current Generation value
if value, ok := t.seenGenerations.Load(uid); !ok {
t.storeObjectGeneration(uid, currentGeneration)
} else {
previous := value.(*atomic.Int64)
// Set current generation and retrieve the previous value. if unchanged, do nothing and return early
if previousGeneration := previous.Swap(currentGeneration); previousGeneration == currentGeneration {
return
}
}
}
}

func (t *Trigger) call(gvk schema.GroupVersionKind, key objectset.ObjectKey) {
t.RLock()
defer t.RUnlock()

key := objectset.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()}
for _, f := range t.triggers[gvk][key] {
f()
}
Expand Down Expand Up @@ -212,27 +176,22 @@ type watcher struct {
}

func (w *watcher) Start(ctx context.Context) {
// resourceVersion is used as a checkpoint if the Watch operation is interrupted.
// the for loop will resume watching with a non-empty resource version to avoid missing or repeating events
resourceVersion := ""
for {
w.Lock()
if w.stopped {
// The Watch operation was intentionally stopped, exit the loop
w.Unlock()
return
break
}
w.Unlock()

// Watch is non-blocking, the response allows consuming the events or stopping
// An error may mean the connection could not be established for some reason
time.Sleep(durations.TriggerSleep)
resp, err := w.client.Resource(w.gvr).Watch(ctx, metav1.ListOptions{
AllowWatchBookmarks: true,
ResourceVersion: resourceVersion,
})
if err != nil {
resourceVersion = ""
time.Sleep(durations.WatchErrorRetrySleep)
continue
}

Expand All @@ -241,25 +200,21 @@ func (w *watcher) Start(ctx context.Context) {
w.Unlock()

for event := range resp.ResultChan() {
// Not all events include a Kubernetes object payload (see the event.Event godoc), filter those out.
obj, err := meta.Accessor(event.Object)
if err != nil {
continue
meta, err := meta.Accessor(event.Object)
var key objectset.ObjectKey
if err == nil {
resourceVersion = meta.GetResourceVersion()
key.Name = meta.GetName()
key.Namespace = meta.GetNamespace()
}

// Store resource version for later resuming if watching is interrupted
resourceVersion = obj.GetResourceVersion()

switch event.Type {
// Just initialize the seen generations.
case watch.Added:
if generation := obj.GetGeneration(); generation != 0 {
w.t.storeObjectGeneration(obj.GetUID(), generation)
}
// Only trigger for Modified or Deleted objects.
case watch.Modified, watch.Deleted:
deleted := event.Type == watch.Deleted
w.t.call(w.gvk, obj, deleted)
fallthrough
case watch.Modified:
fallthrough
case watch.Deleted:
w.t.call(w.gvk, key)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/durations/durations.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
RestConfigTimeout = time.Second * 15
ServiceTokenSleep = time.Second * 2
TokenClusterEnqueueDelay = time.Second * 2
WatchErrorRetrySleep = time.Second * 2
TriggerSleep = time.Second * 2
DefaultCpuPprofPeriod = time.Minute
ReleaseCacheTTL = time.Minute * 5
)

0 comments on commit 2ec31f4

Please sign in to comment.