kvstore: fix TestWorkqueueSyncStoreMetrics flake
This commit addresses a flake observed in the CI:

Error Trace:	/home/travis/gopath/src/github.com/cilium/cilium/pkg/kvstore/store/syncstore_test.go:392
Error:      	Not equal:
              	expected: 1
             	actual  : 2
Test:       	TestWorkqueueSyncStoreMetrics

This flake was caused by a race condition between setting and reading
the metric, and it is addressed by asserting that the metric *eventually*
matches the expected value.

Additionally, all timeouts are raised to prevent flakes when running in
a noisy CI environment.

Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
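
For context, the fix swaps a one-shot equality check on a Prometheus gauge for testify's polling assertion. Below is a minimal, self-contained sketch of the pattern; the gauge and all names are placeholders for illustration, not Cilium code — the actual change appears in the diff further down.

package example

import (
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

// TestEventualMetric reproduces the flake-fix pattern: a goroutine updates
// the gauge concurrently with the test reading it, so a single read races.
func TestEventualMetric(t *testing.T) {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "queue_size"})

	go gauge.Set(1) // stands in for the worker racing with the assertion

	// Flaky form: require.Equal(t, float64(1), testutil.ToFloat64(gauge))
	// Robust form: poll until the metric converges or the timeout expires.
	require.Eventually(t, func() bool {
		return testutil.ToFloat64(gauge) == 1
	}, 5*time.Second, 10*time.Millisecond, "Incorrect metric value (expected: 1)")
}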
giorio94 authored and squeed committed May 26, 2023
1 parent 14faf27 commit 85a9ed2
Showing 1 changed file with 59 additions and 50 deletions.
diff --git a/pkg/kvstore/store/syncstore_test.go b/pkg/kvstore/store/syncstore_test.go
--- a/pkg/kvstore/store/syncstore_test.go
+++ b/pkg/kvstore/store/syncstore_test.go
@@ -18,6 +18,12 @@ import (
"github.com/cilium/cilium/pkg/option"
)

+// Configure a generous timeout to prevent flakes when running in a noisy CI environment.
+var (
+	tick    = 10 * time.Millisecond
+	timeout = 5 * time.Second
+)

type KVPair struct{ Key, Value string }

func NewKVPair(key, value string) *KVPair { return &KVPair{Key: key, Value: value} }
@@ -99,7 +105,7 @@ func (frl *fakeRateLimiter) Forget(item interface{}) {
}
func (frl *fakeRateLimiter) NumRequeues(item interface{}) int { return 0 }

-func eventually(in <-chan *KVPair, timeout time.Duration) *KVPair {
+func eventually(in <-chan *KVPair) *KVPair {
select {
case kv := <-in:
return kv
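(The remainder of the helper is collapsed in the diff; presumably the select falls back to the shared package-level timeout, along these lines — a sketch of the elided branch, not the verbatim source:)

func eventually(in <-chan *KVPair) *KVPair {
	select {
	case kv := <-in:
		return kv
	case <-time.After(timeout): // shared timeout from the var block above
		return NewKVPair("error", "timed out") // placeholder sentinel value
	}
}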
@@ -128,37 +134,37 @@ func TestWorkqueueSyncStore(t *testing.T) {
// Upserts should trigger the corresponding backend operation.
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.UpsertKey(ctx, NewKVPair("key2", "value2"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated))

// Unless the pair is already part of the known state.
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.UpsertKey(ctx, NewKVPair("key3", "value3"))
require.Equal(t, NewKVPair("/foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key3", "value3"), eventually(backend.updated))

// Upserts for the same key should be coalesced. In this case, it is guaranteed
// to happen since the first upsert blocks until we read from the channel.
store.UpsertKey(ctx, NewKVPair("key4", "value4"))
store.UpsertKey(ctx, NewKVPair("key1", "valueA"))
store.UpsertKey(ctx, NewKVPair("key1", "valueB"))
require.Equal(t, NewKVPair("/foo/bar/key4", "value4"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueB"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key4", "value4"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueB"), eventually(backend.updated))

// Deletions should trigger the corresponding backend operation, only if known to exist.
store.DeleteKey(ctx, NewKVPair("key5", ""))
store.DeleteKey(ctx, NewKVPair("key4", ""))
require.Equal(t, NewKVPair("/foo/bar/key4", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key4", ""), eventually(backend.deleted))

// Both upserts and deletes should be retried in case an error is returned by the client
backend.errorsOnUpdate["/foo/bar/key1"] = 1
store.UpsertKey(ctx, NewKVPair("key1", "valueC"))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated, 250*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated))

backend.errorsOnDelete["/foo/bar/key2"] = 1
store.DeleteKey(ctx, NewKVPair("key2", ""))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted, 250*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted))
}

func TestWorkqueueSyncStoreWithoutLease(t *testing.T) {
@@ -181,7 +187,7 @@ func TestWorkqueueSyncStoreWithoutLease(t *testing.T) {
// The fake backend checks whether the lease setting corresponds to the expected
// value, and emits a KVPair with the error message in case it does not match
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
}

func TestWorkqueueSyncStoreWithRateLimiter(t *testing.T) {
@@ -206,10 +212,10 @@

// Assert that the custom rate limiter has been correctly called
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.whenCalled, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.forgetCalled, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.whenCalled))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.forgetCalled))
}

func TestWorkqueueSyncStoreWithWorkers(t *testing.T) {
@@ -230,15 +236,15 @@
}()

store.UpsertKey(ctx, NewKVPair("key1", "value1"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))

// Since the Update() and Delete() functions implemented by the fake backend
// block until we read from the corresponding channel, reading the elements
// from the two channels in reverse order requires at least two workers.
store.DeleteKey(ctx, NewKVPair("key1", "value1"))
store.UpsertKey(ctx, NewKVPair("key2", "value2"))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key1", ""), eventually(backend.deleted))
}

func TestWorkqueueSyncStoreSynced(t *testing.T) {
@@ -274,12 +280,12 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) {
store.Synced(ctx, callback, callback)
store.UpsertKey(ctx, NewKVPair("key3", "value3"))

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated))
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated))
}, WSSWithSourceClusterName("qux")))

t.Run("key-override", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) {
@@ -288,10 +294,10 @@
store.Synced(ctx)
store.UpsertKey(ctx, NewKVPair("key3", "value3"))

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, "cilium/synced/qux/override", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/override", eventually(backend.updated).Key)
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated))
}, WSSWithSourceClusterName("qux"), WSSWithSyncedKeyOverride("override")))

t.Run("key-upsertion-failure", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) {
@@ -302,12 +308,12 @@
store.Synced(ctx)
store.UpsertKey(ctx, NewKVPair("key3", "value3"))

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 250*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
// The synced key shall be created only once key1 has been successfully upserted.
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
}, WSSWithSourceClusterName("qux")))

t.Run("synced-upsertion-failure", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) {
@@ -317,10 +323,10 @@
store.UpsertKey(ctx, NewKVPair("key2", "value2"))
store.Synced(ctx)

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 250*time.Millisecond).Key)
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
}, WSSWithSourceClusterName("qux")))

// Assert that the synced key is created only after key1 has been successfully upserted also in case there are multiple workers
@@ -329,10 +335,10 @@
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.Synced(ctx)

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 500*time.Millisecond).Key)
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
}, WSSWithSourceClusterName("qux"), WSSWithWorkers(10)))
}

@@ -387,19 +393,22 @@ func TestWorkqueueSyncStoreMetrics(t *testing.T) {
}()

// The metric should reflect the updated queue size (one in this case, since one element has been processed, and
-// another is being processed --- stuck performing Update()).
-require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", "valueA"), eventually(backend.updated, 100*time.Millisecond))
-require.Equal(t, float64(1), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))
+// another is being processed --- stuck performing Update()). We need to assert this "eventually", because we have
+// no guarantee that the processing of the second element has already started.
+require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", "valueA"), eventually(backend.updated))
+require.Eventually(t, func() bool {
+	return testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")) == 1
+}, timeout, tick, "Incorrect metric value (expected: 1)")

// Deleting one element, the queue size should grow by one (the worker is still stuck in the Update() call).
store.DeleteKey(ctx, NewKVPair("key1", ""))
require.Equal(t, float64(2), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))

backend.errorsOnUpdate["cilium/state/nodes/v1/key3"] = 1
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated, 250*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key2", "value2"), eventually(backend.updated))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", ""), eventually(backend.deleted))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated))

store.Synced(ctx, func(ctx context.Context) {
// When the callback is executed, the store should be synced
@@ -410,7 +419,7 @@
// The store should not yet be synced, as the synced entry has not yet been written to the kvstore.
require.Equal(t, metrics.BoolToFloat64(false),
testutil.ToFloat64(metrics.KVStoreInitialSyncCompleted.WithLabelValues("nodes/v1", "foo", "write")))
require.Equal(t, "cilium/synced/foo/cilium/state/nodes/v1", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, "cilium/synced/foo/cilium/state/nodes/v1", eventually(backend.updated).Key)

// Once all elements have been processed, the metric should be zero.
require.Equal(t, float64(0), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))