Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Make TestCachedDiscovery more robust #2110

Merged
merged 1 commit into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 13 additions & 31 deletions cluster/kubernetes/cached_disco.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,33 +60,6 @@ func (d *cachedDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*
// the only avenue of a change to the API resources in a running
// system is CRDs being added, updated or deleted.
func MakeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}) discovery.CachedDiscoveryInterface {
result, _, _ := makeCachedDiscovery(d, c, shutdown, makeInvalidatingHandler)
return result
}

// ---

func makeInvalidatingHandler(cached discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler {
var handler toolscache.ResourceEventHandler = toolscache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
cached.Invalidate()
},
UpdateFunc: func(_, _ interface{}) {
cached.Invalidate()
},
DeleteFunc: func(_ interface{}) {
cached.Invalidate()
},
}
return handler
}

type makeHandle func(discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler

// makeCachedDiscovery constructs a cached discovery client, with more
// flexibility than MakeCachedDiscovery; e.g., with extra handlers for
// testing.
func makeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}, handlerFn makeHandle) (*cachedDiscovery, toolscache.Store, toolscache.Controller) {
cachedDisco := &cachedDiscovery{CachedDiscoveryInterface: memory.NewMemCacheClient(d)}
// We have an empty cache, so it's _a priori_ invalid. (Yes, that's the zero value, but better safe than sorry)
cachedDisco.Invalidate()
Expand All @@ -100,12 +73,21 @@ func makeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdo
return crdClient.Watch(options)
},
}

handler := handlerFn(cachedDisco)
store, controller := toolscache.NewInformer(lw, &crdv1beta1.CustomResourceDefinition{}, 0, handler)
handle := toolscache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
cachedDisco.Invalidate()
},
UpdateFunc: func(_, _ interface{}) {
cachedDisco.Invalidate()
},
DeleteFunc: func(_ interface{}) {
cachedDisco.Invalidate()
},
}
_, controller := toolscache.NewInformer(lw, &crdv1beta1.CustomResourceDefinition{}, 0, handle)
go cachedDisco.invalidatePeriodically(shutdown)
go controller.Run(shutdown)
return cachedDisco, store, controller
return cachedDisco
}

func (d *cachedDiscovery) invalidatePeriodically(shutdown <-chan struct{}) {
Expand Down
73 changes: 16 additions & 57 deletions cluster/kubernetes/cached_disco_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
toolscache "k8s.io/client-go/tools/cache"
)

type chainHandler struct {
first toolscache.ResourceEventHandler
next toolscache.ResourceEventHandler
}

func (h chainHandler) OnAdd(obj interface{}) {
h.first.OnAdd(obj)
h.next.OnAdd(obj)
}

func (h chainHandler) OnUpdate(old, new interface{}) {
h.first.OnUpdate(old, new)
h.next.OnUpdate(old, new)
}

func (h chainHandler) OnDelete(old interface{}) {
h.first.OnDelete(old)
h.next.OnDelete(old)
}

func TestCachedDiscovery(t *testing.T) {
coreClient := makeFakeClient()

Expand All @@ -55,20 +34,7 @@ func TestCachedDiscovery(t *testing.T) {
shutdown := make(chan struct{})
defer close(shutdown)

// this extra handler means we can synchronise on the add later
// being processed
allowAdd := make(chan interface{})

addHandler := toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
allowAdd <- obj
},
}
makeHandler := func(d discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler {
return chainHandler{first: makeInvalidatingHandler(d), next: addHandler}
}

cachedDisco, store, _ := makeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown, makeHandler)
cachedDisco := MakeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown)

saved := getDefaultNamespace
getDefaultNamespace = func() (string, error) { return "bar-ns", nil }
Expand Down Expand Up @@ -109,26 +75,19 @@ func TestCachedDiscovery(t *testing.T) {
}

// Wait for the update to "go through"
select {
case <-allowAdd:
break
case <-time.After(time.Second):
t.Fatal("timed out waiting for Add to happen")
}

_, exists, err := store.Get(myCRD)
if err != nil {
t.Error(err)
}
if !exists {
t.Error("does not exist")
}

namespaced, err = namespacer.lookupNamespaced("foo/v1", "Custom", nil)
if err != nil {
t.Fatal(err)
}
if namespaced {
t.Error("got true from lookupNamespaced, expecting false (after changing it)")
c := time.After(time.Second)
loop:
for {
select {
default:
namespaced, err = namespacer.lookupNamespaced("foo/v1", "Custom", nil)
assert.NoError(t, err)
if !namespaced {
break loop
}
time.Sleep(10 * time.Millisecond)
case <-c:
t.Fatal("timed out waiting for Update to happen")
}
}
}