Skip to content

Commit

Permalink
resource: Fix flaky test due to missing Done call
Browse files Browse the repository at this point in the history
The workaround in 5fa2ac4 was faulty: it did not call ev.Done on the
received event, so no further events were received and the context
eventually timed out. The problem was reproduced reliably locally by
adding short time.Sleep calls to pushUpdate and to the initial listing
in order to trigger the race that causes the double updates.

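To remedy this, add the missing ev.Done call, handle the timeout
gracefully by checking whether the events channel was closed, and
switch the assertions from assert to require so that they FailNow
immediately instead of letting a follow-up failure (e.g. a nil
dereference) mask the original problem.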

Fixes: #24696
Fixes: 5fa2ac4 ("resource: Work around a rare race in initial sync")

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki authored and julianwiedmann committed Jun 2, 2023
1 parent 96685b3 commit e76d40f
Showing 1 changed file with 46 additions and 33 deletions.
79 changes: 46 additions & 33 deletions pkg/k8s/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -125,26 +126,31 @@ func TestResource_WithFakeClient(t *testing.T) {
}

// First event should be the node (initial set)
ev := <-events
assert.Equal(t, resource.Upsert, ev.Kind)
assert.Equal(t, ev.Key.Name, nodeName)
assert.Equal(t, ev.Object.GetName(), node.Name)
assert.Equal(t, ev.Object.Status.Phase, node.Status.Phase)
ev, ok := <-events
require.True(t, ok)
require.Equal(t, resource.Upsert, ev.Kind)
require.Equal(t, ev.Key.Name, nodeName)
require.Equal(t, ev.Object.GetName(), node.Name)
require.Equal(t, ev.Object.Status.Phase, node.Status.Phase)
ev.Done(nil)

// Second should be a sync.
//
// We work around the rare race condition in which we see the same
// upsert event twice due to it being inserted into store but our
// upsert event twice due to it being inserted into store while Resource's
// Add handler finishes after the initial listing has been processed (#23079).
// Proper fix is to make sure updates to store happen synchronously with queueing
// and subscribing.
ev = <-events
// and subscribing which requires locking around updates to the store (e.g. fork
// of informer in style of pkg/k8s/informer).
ev, ok = <-events
require.True(t, ok, "events channel closed unexpectedly")
if ev.Kind == resource.Upsert {
t.Logf("Ignored duplicate upsert event")
ev.Done(nil)
ev = <-events
}
assert.Equal(t, resource.Sync, ev.Kind)
assert.Nil(t, ev.Object)
require.Equal(t, resource.Sync, ev.Kind)
require.Nil(t, ev.Object)
ev.Done(nil)

// After sync event we can also use Store() without it blocking.
Expand All @@ -161,10 +167,11 @@ func TestResource_WithFakeClient(t *testing.T) {
corev1.SchemeGroupVersion.WithResource("nodes"),
node.DeepCopy(), "")

ev = <-events
assert.Equal(t, resource.Upsert, ev.Kind)
assert.Equal(t, ev.Key.Name, nodeName)
assert.Equal(t, ev.Object.Status.Phase, corev1.NodePhase("update1"))
ev, ok = <-events
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Upsert, ev.Kind)
require.Equal(t, ev.Key.Name, nodeName)
require.Equal(t, ev.Object.Status.Phase, corev1.NodePhase("update1"))
ev.Done(nil)

// Test that multiple events for the same key are coalesced.
Expand All @@ -176,12 +183,14 @@ func TestResource_WithFakeClient(t *testing.T) {
ctx2, cancel2 := context.WithCancel(ctx)
events2 := nodes.Events(ctx2)

ev2 := <-events2
assert.Equal(t, resource.Upsert, ev2.Kind)
ev2, ok := <-events2
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Upsert, ev2.Kind)
ev2.Done(nil)

ev2 = <-events2
assert.Equal(t, resource.Sync, ev2.Kind)
ev2, ok = <-events2
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Sync, ev2.Kind)
ev2.Done(nil)

for i := 2; i <= 10; i++ {
Expand All @@ -191,9 +200,10 @@ func TestResource_WithFakeClient(t *testing.T) {
fakeClient.KubernetesFakeClientset.Tracker().Update(
corev1.SchemeGroupVersion.WithResource("nodes"),
node.DeepCopy(), "")
ev2 := <-events2
assert.Equal(t, resource.Upsert, ev2.Kind)
assert.Equal(t, version, ev2.Object.ResourceVersion)
ev2, ok := <-events2
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Upsert, ev2.Kind)
require.Equal(t, version, ev2.Object.ResourceVersion)
ev2.Done(nil)
}
cancel2()
Expand All @@ -203,15 +213,17 @@ func TestResource_WithFakeClient(t *testing.T) {

// We should now see either just the last change, or one intermediate change
// and the last change.
ev = <-events
assert.Equal(t, resource.Upsert, ev.Kind)
assert.Equal(t, nodeName, ev.Key.Name)
ev, ok = <-events
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Upsert, ev.Kind)
require.Equal(t, nodeName, ev.Key.Name)
ev.Done(nil)
if ev.Object.ResourceVersion != node.ObjectMeta.ResourceVersion {
ev = <-events
assert.Equal(t, resource.Upsert, ev.Kind)
assert.Equal(t, nodeName, ev.Key.Name)
assert.Equal(t, node.ObjectMeta.ResourceVersion, ev.Object.ResourceVersion)
ev, ok = <-events
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Upsert, ev.Kind)
require.Equal(t, nodeName, ev.Key.Name)
require.Equal(t, node.ObjectMeta.ResourceVersion, ev.Object.ResourceVersion)
ev.Done(nil)
}

Expand All @@ -220,17 +232,18 @@ func TestResource_WithFakeClient(t *testing.T) {
corev1.SchemeGroupVersion.WithResource("nodes"),
"", "some-node")

ev = <-events
assert.Equal(t, resource.Delete, ev.Kind)
assert.Equal(t, nodeName, ev.Key.Name)
assert.Equal(t, node.ObjectMeta.ResourceVersion, ev.Object.ResourceVersion)
ev, ok = <-events
require.True(t, ok, "events channel closed unexpectedly")
require.Equal(t, resource.Delete, ev.Kind)
require.Equal(t, nodeName, ev.Key.Name)
require.Equal(t, node.ObjectMeta.ResourceVersion, ev.Object.ResourceVersion)
ev.Done(nil)

// Cancel the subscriber context and verify that the stream gets completed.
cancel()

// No more events should be observed.
ev, ok := <-events
ev, ok = <-events
if ok {
t.Fatalf("unexpected event still in stream: %v", ev)
}
Expand Down

0 comments on commit e76d40f

Please sign in to comment.