kvstore: fix TestWorkqueueSyncStoreMetrics flake #25706

Merged
merged 1 commit into from May 26, 2023
109 changes: 59 additions & 50 deletions pkg/kvstore/store/syncstore_test.go
@@ -18,6 +18,12 @@ import (
"github.com/cilium/cilium/pkg/option"
)

// Configure a generous timeout to prevent flakes when running in a noisy CI environment.
var (
tick = 10 * time.Millisecond
timeout = 5 * time.Second
)

type KVPair struct{ Key, Value string }

func NewKVPair(key, value string) *KVPair { return &KVPair{Key: key, Value: value} }
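
Note on the fix: the shared tick and timeout above replace the fixed 100-1000 ms deadlines scattered through the old assertions, trading tight per-call budgets for a single generous one that tolerates noisy CI runners. A minimal sketch of the polling pattern they enable, using a throwaway gauge (hypothetical, for illustration only; assumes the prometheus, testutil, and require imports):

func TestPollingPatternSketch(t *testing.T) {
	// Illustrative gauge; the real test polls metrics.KVStoreSyncQueueSize.
	queueSize := prometheus.NewGauge(prometheus.GaugeOpts{Name: "queue_size"})
	queueSize.Set(1)

	// Retry every tick, failing only once the shared timeout elapses.
	require.Eventually(t, func() bool {
		return testutil.ToFloat64(queueSize) == 1
	}, timeout, tick, "queue size never converged to 1")
}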
@@ -99,7 +105,7 @@ func (frl *fakeRateLimiter) Forget(item interface{}) {
}
func (frl *fakeRateLimiter) NumRequeues(item interface{}) int { return 0 }

func eventually(in <-chan *KVPair, timeout time.Duration) *KVPair {
func eventually(in <-chan *KVPair) *KVPair {
select {
case kv := <-in:
return kv
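
The helper body is truncated by the diff; under the reasonable assumption that the dropped parameter is simply replaced by the package-level timeout, the complete helper would look roughly like this (a sketch, not the verbatim elided code):

func eventually(in <-chan *KVPair) *KVPair {
	select {
	case kv := <-in:
		return kv
	case <-time.After(timeout):
		// Sentinel pair so the subsequent require.Equal fails with a
		// readable message instead of hanging the test forever.
		return NewKVPair("error", "timed out waiting for KVPair")
	}
}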
@@ -128,37 +134,37 @@ func TestWorkqueueSyncStore(t *testing.T) {
// Upserts should trigger the corresponding backend operation.
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.UpsertKey(ctx, NewKVPair("key2", "value2"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated))

// Unless the pair is already part of the known state.
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.UpsertKey(ctx, NewKVPair("key3", "value3"))
require.Equal(t, NewKVPair("/foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key3", "value3"), eventually(backend.updated))

// Upserts for the same key should be coalesced. In this case, it is guaranteed
// to happen since the first upsert blocks until we read from the channel.
store.UpsertKey(ctx, NewKVPair("key4", "value4"))
store.UpsertKey(ctx, NewKVPair("key1", "valueA"))
store.UpsertKey(ctx, NewKVPair("key1", "valueB"))
require.Equal(t, NewKVPair("/foo/bar/key4", "value4"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueB"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key4", "value4"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueB"), eventually(backend.updated))

// Deletions should trigger the corresponding backend operation, only if known to exist.
store.DeleteKey(ctx, NewKVPair("key5", ""))
store.DeleteKey(ctx, NewKVPair("key4", ""))
require.Equal(t, NewKVPair("/foo/bar/key4", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key4", ""), eventually(backend.deleted))

// Both upserts and deletes should be retried in case an error is returned by the client
backend.errorsOnUpdate["/foo/bar/key1"] = 1
store.UpsertKey(ctx, NewKVPair("key1", "valueC"))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated, 250*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated))

backend.errorsOnDelete["/foo/bar/key2"] = 1
store.DeleteKey(ctx, NewKVPair("key2", ""))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted, 250*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted))
require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted))
}
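
The coalescing asserted above ("valueA" for key1 never reaches the backend, only "valueB") is the standard deduplication behavior of client-go's workqueue, which the WorkqueueSyncStore name suggests backs this store. A self-contained illustration (hypothetical, not part of this PR):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()
	q.Add("key1") // enqueued
	q.Add("key1") // coalesced: "key1" is already pending, nothing is added
	fmt.Println(q.Len()) // prints 1
}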

func TestWorkqueueSyncStoreWithoutLease(t *testing.T) {
@@ -181,7 +187,7 @@ func TestWorkqueueSyncStoreWithoutLease(t *testing.T) {
// The fake backend checks whether the lease setting corresponds to the expected
// value, and emits a KVPair with the error message in case it does not match
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
}

func TestWorkqueueSyncStoreWithRateLimiter(t *testing.T) {
@@ -206,10 +212,10 @@ func TestWorkqueueSyncStoreWithRateLimiter(t *testing.T) {

// Assert that the custom rate limiter has been correctly called
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.whenCalled, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.forgetCalled, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.whenCalled))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("key1", ""), eventually(limiter.forgetCalled))
}
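
Only Forget and NumRequeues of the fake rate limiter are visible in the hunk further up; the whenCalled and forgetCalled channels read here imply a shape along the following lines. This is an assumed sketch of the elided type, satisfying client-go's workqueue.RateLimiter interface:

type fakeRateLimiter struct {
	whenCalled   chan *KVPair // assumed: receives one pair per When() call
	forgetCalled chan *KVPair // assumed: receives one pair per Forget() call
}

func (frl *fakeRateLimiter) When(item interface{}) time.Duration {
	frl.whenCalled <- NewKVPair(fmt.Sprintf("%v", item), "")
	return 0 // no delay: the test only cares that the limiter was consulted
}

func (frl *fakeRateLimiter) Forget(item interface{}) {
	frl.forgetCalled <- NewKVPair(fmt.Sprintf("%v", item), "")
}

func (frl *fakeRateLimiter) NumRequeues(item interface{}) int { return 0 }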

func TestWorkqueueSyncStoreWithWorkers(t *testing.T) {
@@ -230,15 +236,15 @@ func TestWorkqueueSyncStoreWithWorkers(t *testing.T) {
}()

store.UpsertKey(ctx, NewKVPair("key1", "value1"))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated))

// Since the Update() and Delete() functions implemented by the fake backend
// block until we read from the corresponding channel, reading the elements
// from the two channels in reverse order requires at least two workers.
store.DeleteKey(ctx, NewKVPair("key1", "value1"))
store.UpsertKey(ctx, NewKVPair("key2", "value2"))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key1", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, NewKVPair("/foo/bar/key1", ""), eventually(backend.deleted))
}
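
The two-workers requirement follows from the fake backend's blocking design mentioned in the comment: every Update and Delete publishes on an unbuffered channel and blocks until the test consumes it. A sketch of the assumed Update implementation (the real one is elided from this diff and also handles injected errors):

func (fb *fakeBackend) Update(ctx context.Context, key string, value []byte, lease bool) error {
	// Blocks until the test reads backend.updated; with a single worker,
	// a pending Delete could therefore never be reached if the test drains
	// the channels in the opposite order.
	fb.updated <- NewKVPair(key, string(value))
	return nil
}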

func TestWorkqueueSyncStoreSynced(t *testing.T) {
@@ -274,12 +280,12 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) {
store.Synced(ctx, callback, callback)
store.UpsertKey(ctx, NewKVPair("key3", "value3"))

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated))
require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated))
}, WSSWithSourceClusterName("qux")))

t.Run("key-override", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) {
@@ -288,10 +294,10 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) {
store.Synced(ctx)
store.UpsertKey(ctx, NewKVPair("key3", "value3"))

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, "cilium/synced/qux/override", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/override", eventually(backend.updated).Key)
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated))
}, WSSWithSourceClusterName("qux"), WSSWithSyncedKeyOverride("override")))

t.Run("key-upsertion-failure", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) {
@@ -302,12 +308,12 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) {
store.Synced(ctx)
store.UpsertKey(ctx, NewKVPair("key3", "value3"))

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 250*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
// The synced key shall be created only once key1 has been successfully upserted.
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
}, WSSWithSourceClusterName("qux")))

t.Run("synced-upsertion-failure", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) {
@@ -317,10 +323,10 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) {
store.UpsertKey(ctx, NewKVPair("key2", "value2"))
store.Synced(ctx)

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 250*time.Millisecond).Key)
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
}, WSSWithSourceClusterName("qux")))

// Assert that the synced key is created only after key1 has been successfully upserted, even when there are multiple workers
@@ -329,10 +335,10 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) {
store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.Synced(ctx)

require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 500*time.Millisecond).Key)
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated))
require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key)
}, WSSWithSourceClusterName("qux"), WSSWithWorkers(10)))
}
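
Taken together, these subtests pin down the ordering contract of Synced: the per-cluster synced key is written only after every upsert queued before the Synced call has succeeded, and any callbacks run only after that write lands. In hypothetical caller terms (inside one of the runnable subtests above):

store.UpsertKey(ctx, NewKVPair("key1", "value1"))
store.Synced(ctx, func(ctx context.Context) {
	// By the time this callback runs, both key1 and the synced key are
	// in the kvstore, so peers can treat the initial sync as complete.
})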

@@ -387,19 +393,22 @@ func TestWorkqueueSyncStoreMetrics(t *testing.T) {
}()

// The metric should reflect the updated queue size (one in this case, since one element has been processed, and
// another is being processed --- stuck performing Update()).
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", "valueA"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, float64(1), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))
// another is being processed --- stuck performing Update()). We need to assert this "eventually", because we have
// no guarantee that the processing of the second element has already started.
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", "valueA"), eventually(backend.updated))
require.Eventually(t, func() bool {
return testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")) == 1
}, timeout, tick, "Incorrect metric value (expected: 1)")

// After deleting one element, the queue size should grow by one (the worker is still stuck in the Update() call).
store.DeleteKey(ctx, NewKVPair("key1", ""))
require.Equal(t, float64(2), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))

backend.errorsOnUpdate["cilium/state/nodes/v1/key3"] = 1
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key2", "value2"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated, 100*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", ""), eventually(backend.deleted, 100*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated, 250*time.Millisecond))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key2", "value2"), eventually(backend.updated))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", ""), eventually(backend.deleted))
require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated))

store.Synced(ctx, func(ctx context.Context) {
// When the callback is executed, the store should be synced
@@ -410,7 +419,7 @@ func TestWorkqueueSyncStoreMetrics(t *testing.T) {
// The store should not yet be synced, as the synced entry has not yet been written to the kvstore.
require.Equal(t, metrics.BoolToFloat64(false),
testutil.ToFloat64(metrics.KVStoreInitialSyncCompleted.WithLabelValues("nodes/v1", "foo", "write")))
require.Equal(t, "cilium/synced/foo/cilium/state/nodes/v1", eventually(backend.updated, 100*time.Millisecond).Key)
require.Equal(t, "cilium/synced/foo/cilium/state/nodes/v1", eventually(backend.updated).Key)

// Once all elements have been processed, the metric should be zero.
require.Equal(t, float64(0), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))