diff --git a/pkg/kvstore/store/syncstore_test.go b/pkg/kvstore/store/syncstore_test.go index 9b0fdd79e25b..3da329749ad8 100644 --- a/pkg/kvstore/store/syncstore_test.go +++ b/pkg/kvstore/store/syncstore_test.go @@ -18,6 +18,12 @@ import ( "github.com/cilium/cilium/pkg/option" ) +// Configure a generous timeout to prevent flakes when running in a noisy CI environment. +var ( + tick = 10 * time.Millisecond + timeout = 5 * time.Second +) + type KVPair struct{ Key, Value string } func NewKVPair(key, value string) *KVPair { return &KVPair{Key: key, Value: value} } @@ -99,7 +105,7 @@ func (frl *fakeRateLimiter) Forget(item interface{}) { } func (frl *fakeRateLimiter) NumRequeues(item interface{}) int { return 0 } -func eventually(in <-chan *KVPair, timeout time.Duration) *KVPair { +func eventually(in <-chan *KVPair) *KVPair { select { case kv := <-in: return kv @@ -128,37 +134,37 @@ func TestWorkqueueSyncStore(t *testing.T) { // Upserts should trigger the corresponding backend operation. store.UpsertKey(ctx, NewKVPair("key1", "value1")) store.UpsertKey(ctx, NewKVPair("key2", "value2")) - require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated)) // Unless the pair is already part of the known state. store.UpsertKey(ctx, NewKVPair("key1", "value1")) store.UpsertKey(ctx, NewKVPair("key3", "value3")) - require.Equal(t, NewKVPair("/foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key3", "value3"), eventually(backend.updated)) // Upserts for the same key should be coalescenced. In this case, it is guaranteed // to happen since the first upsert blocks until we read from the channel. store.UpsertKey(ctx, NewKVPair("key4", "value4")) store.UpsertKey(ctx, NewKVPair("key1", "valueA")) store.UpsertKey(ctx, NewKVPair("key1", "valueB")) - require.Equal(t, NewKVPair("/foo/bar/key4", "value4"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("/foo/bar/key1", "valueB"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key4", "value4"), eventually(backend.updated)) + require.Equal(t, NewKVPair("/foo/bar/key1", "valueB"), eventually(backend.updated)) // Deletions should trigger the corresponding backend operation, only if known to exist. store.DeleteKey(ctx, NewKVPair("key5", "")) store.DeleteKey(ctx, NewKVPair("key4", "")) - require.Equal(t, NewKVPair("/foo/bar/key4", ""), eventually(backend.deleted, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key4", ""), eventually(backend.deleted)) // Both upserts and deletes should be retried in case an error is returned by the client backend.errorsOnUpdate["/foo/bar/key1"] = 1 store.UpsertKey(ctx, NewKVPair("key1", "valueC")) - require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated, 250*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated)) + require.Equal(t, NewKVPair("/foo/bar/key1", "valueC"), eventually(backend.updated)) backend.errorsOnDelete["/foo/bar/key2"] = 1 store.DeleteKey(ctx, NewKVPair("key2", "")) - require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted, 100*time.Millisecond)) - require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted, 250*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted)) + require.Equal(t, NewKVPair("/foo/bar/key2", ""), eventually(backend.deleted)) } func TestWorkqueueSyncStoreWithoutLease(t *testing.T) { @@ -181,7 +187,7 @@ func TestWorkqueueSyncStoreWithoutLease(t *testing.T) { // The fake backend checks whether the lease setting corresponds to the expected // value, and emits a KVPair with the error message in case it does not match store.UpsertKey(ctx, NewKVPair("key1", "value1")) - require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated)) } func TestWorkqueueSyncStoreWithRateLimiter(t *testing.T) { @@ -206,10 +212,10 @@ func TestWorkqueueSyncStoreWithRateLimiter(t *testing.T) { // Assert that the custom rate limiter has been correctly called store.UpsertKey(ctx, NewKVPair("key1", "value1")) - require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("key1", ""), eventually(limiter.whenCalled, 100*time.Millisecond)) - require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("key1", ""), eventually(limiter.forgetCalled, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("key1", ""), eventually(limiter.whenCalled)) + require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("key1", ""), eventually(limiter.forgetCalled)) } func TestWorkqueueSyncStoreWithWorkers(t *testing.T) { @@ -230,15 +236,15 @@ func TestWorkqueueSyncStoreWithWorkers(t *testing.T) { }() store.UpsertKey(ctx, NewKVPair("key1", "value1")) - require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key1", "value1"), eventually(backend.updated)) // Since the Update() and Delete() functions implemented by the fake backend // block until we read from the correposponding channel, reading in reversed // order the elements from the two channels requires at least two workers store.DeleteKey(ctx, NewKVPair("key1", "value1")) store.UpsertKey(ctx, NewKVPair("key2", "value2")) - require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("/foo/bar/key1", ""), eventually(backend.deleted, 100*time.Millisecond)) + require.Equal(t, NewKVPair("/foo/bar/key2", "value2"), eventually(backend.updated)) + require.Equal(t, NewKVPair("/foo/bar/key1", ""), eventually(backend.deleted)) } func TestWorkqueueSyncStoreSynced(t *testing.T) { @@ -274,12 +280,12 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) { store.Synced(ctx, callback, callback) store.UpsertKey(ctx, NewKVPair("key3", "value3")) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key) - require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated)) + require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key) + require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated)) + require.Equal(t, NewKVPair("callback/executed", ""), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated)) }, WSSWithSourceClusterName("qux"))) t.Run("key-override", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) { @@ -288,10 +294,10 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) { store.Synced(ctx) store.UpsertKey(ctx, NewKVPair("key3", "value3")) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, "cilium/synced/qux/override", eventually(backend.updated, 100*time.Millisecond).Key) - require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond)) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated)) + require.Equal(t, "cilium/synced/qux/override", eventually(backend.updated).Key) + require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated)) }, WSSWithSourceClusterName("qux"), WSSWithSyncedKeyOverride("override"))) t.Run("key-upsertion-failure", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) { @@ -302,12 +308,12 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) { store.Synced(ctx) store.UpsertKey(ctx, NewKVPair("key3", "value3")) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 250*time.Millisecond)) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key3", "value3"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) // The synced key shall be created only once key1 has been successfully upserted. - require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key) + require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key) }, WSSWithSourceClusterName("qux"))) t.Run("synced-upsertion-failure", runnable(func(t *testing.T, ctx context.Context, backend *fakeBackend, store SyncStore) { @@ -317,10 +323,10 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) { store.UpsertKey(ctx, NewKVPair("key2", "value2")) store.Synced(ctx) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 100*time.Millisecond).Key) - require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 250*time.Millisecond).Key) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key2", "value2"), eventually(backend.updated)) + require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key) + require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key) }, WSSWithSourceClusterName("qux"))) // Assert that the synced key is created only after key1 has been successfully upserted also in case there are multiple workers @@ -329,10 +335,10 @@ func TestWorkqueueSyncStoreSynced(t *testing.T) { store.UpsertKey(ctx, NewKVPair("key1", "value1")) store.Synced(ctx) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond)) - require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated, 1000*time.Millisecond)) - require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated, 500*time.Millisecond).Key) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, NewKVPair("foo/bar/key1", "value1"), eventually(backend.updated)) + require.Equal(t, "cilium/synced/qux/foo/bar", eventually(backend.updated).Key) }, WSSWithSourceClusterName("qux"), WSSWithWorkers(10))) } @@ -387,19 +393,22 @@ func TestWorkqueueSyncStoreMetrics(t *testing.T) { }() // The metric should reflect the updated queue size (one in this case, since one element has been processed, and - // another is being processed --- stuck performing Update()). - require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", "valueA"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, float64(1), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo"))) + // another is being processed --- stuck performing Update()). We need to assert this "eventually", because we have + // no guarantee that the processing of the second element has already started. + require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", "valueA"), eventually(backend.updated)) + require.Eventually(t, func() bool { + return testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")) == 1 + }, timeout, tick, "Incorrect metric value (expected: 1)") // Deleting one element, the queue size should grow by one (the worker is still stuck in the Update() call). store.DeleteKey(ctx, NewKVPair("key1", "")) require.Equal(t, float64(2), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo"))) backend.errorsOnUpdate["cilium/state/nodes/v1/key3"] = 1 - require.Equal(t, NewKVPair("cilium/state/nodes/v1/key2", "value2"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated, 100*time.Millisecond)) - require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", ""), eventually(backend.deleted, 100*time.Millisecond)) - require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated, 250*time.Millisecond)) + require.Equal(t, NewKVPair("cilium/state/nodes/v1/key2", "value2"), eventually(backend.updated)) + require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated)) + require.Equal(t, NewKVPair("cilium/state/nodes/v1/key1", ""), eventually(backend.deleted)) + require.Equal(t, NewKVPair("cilium/state/nodes/v1/key3", "value3"), eventually(backend.updated)) store.Synced(ctx, func(ctx context.Context) { // When the callback is executed, the store should be synced @@ -410,7 +419,7 @@ func TestWorkqueueSyncStoreMetrics(t *testing.T) { // The store should not yet be synced, as the synced entry has not yet been written to the kvstore. require.Equal(t, metrics.BoolToFloat64(false), testutil.ToFloat64(metrics.KVStoreInitialSyncCompleted.WithLabelValues("nodes/v1", "foo", "write"))) - require.Equal(t, "cilium/synced/foo/cilium/state/nodes/v1", eventually(backend.updated, 100*time.Millisecond).Key) + require.Equal(t, "cilium/synced/foo/cilium/state/nodes/v1", eventually(backend.updated).Key) // Once all elements have been processed, the metric should be zero. require.Equal(t, float64(0), testutil.ToFloat64(metrics.KVStoreSyncQueueSize.WithLabelValues("nodes/v1", "foo")))