From 554e636d8a8d5d47fa1d4cb1dfd1003f94fe7ec7 Mon Sep 17 00:00:00 2001 From: Marek Smolinski Date: Wed, 2 Mar 2022 14:58:39 +0100 Subject: [PATCH] update --- lib/backend/dynamo/dynamodbbk.go | 12 ++++---- lib/backend/dynamo/dynamodbbk_test.go | 7 +++-- lib/backend/test/suite.go | 41 +++++++++++++++++++++++---- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/lib/backend/dynamo/dynamodbbk.go b/lib/backend/dynamo/dynamodbbk.go index e7e15a078dd..ca34013a998 100644 --- a/lib/backend/dynamo/dynamodbbk.go +++ b/lib/backend/dynamo/dynamodbbk.go @@ -383,25 +383,25 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte, func (b *Backend) getAllRecords(ctx context.Context, startKey []byte, endKey []byte, limit int) (*getResult, error) { var result getResult + limitRemaining := limit // this code is being extra careful here not to introduce endless loop // by some unfortunate series of events for i := 0; i < backend.DefaultRangeLimit/100; i++ { - re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limit, result.lastEvaluatedKey) + if limit > 0 { + limitRemaining = limit - len(result.records) + } + re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limitRemaining, result.lastEvaluatedKey) if err != nil { return nil, trace.Wrap(err) } result.records = append(result.records, re.records...) - if limit != 0 && len(result.records) >= limit { + if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 { if len(result.records) == backend.DefaultRangeLimit { b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit) } - return &result, nil - } - if len(re.lastEvaluatedKey) == 0 { result.lastEvaluatedKey = nil return &result, nil } - result.lastEvaluatedKey = re.lastEvaluatedKey } return nil, trace.BadParameter("backend entered endless loop") diff --git a/lib/backend/dynamo/dynamodbbk_test.go b/lib/backend/dynamo/dynamodbbk_test.go index d2b336cd244..aba11048f0c 100644 --- a/lib/backend/dynamo/dynamodbbk_test.go +++ b/lib/backend/dynamo/dynamodbbk_test.go @@ -22,11 +22,12 @@ import ( "testing" "time" + "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/test" "github.com/gravitational/teleport/lib/utils" - "github.com/gravitational/trace" - "github.com/jonboulle/clockwork" ) func TestMain(m *testing.M) { @@ -70,7 +71,7 @@ func TestDynamoDB(t *testing.T) { if err != nil { return nil, nil, trace.Wrap(err) } - clock := clockwork.NewFakeClock() + clock := clockwork.NewFakeClockAt(time.Now()) uut.clock = clock return uut, clock, nil } diff --git a/lib/backend/test/suite.go b/lib/backend/test/suite.go index 195236eba1b..dcc9655e22b 100644 --- a/lib/backend/test/suite.go +++ b/lib/backend/test/suite.go @@ -22,6 +22,7 @@ import ( "context" "encoding/hex" "errors" + "fmt" "math/rand" "sync" "sync/atomic" @@ -156,6 +157,10 @@ func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) { t.Run("Mirror", func(t *testing.T) { testMirror(t, newBackend) }) + + t.Run("FetchLimit", func(t *testing.T) { + testFetchLimit(t, newBackend) + }) } // RequireItems asserts that the supplied `actual` items collection matches @@ -548,7 +553,7 @@ func testEvents(t *testing.T, newBackend Constructor) { item = &backend.Item{ Key: prefix("c"), Value: []byte("val"), - Expires: clock.Now().Add(1 * time.Second), + Expires: clock.Now().Add(3 * time.Second), } _, err = uut.Put(ctx, *item) require.NoError(t, err) @@ -561,17 +566,41 @@ func testEvents(t *testing.T, newBackend Constructor) { e = requireEvent(t, watcher, types.OpPut, item.Key, eventTimeout) require.Equal(t, item.Value, e.Item.Value) - // Wait a few seconds for the item to expire. - clock.Advance(3 * time.Second) - // Make sure item has been removed. - _, err = uut.Get(ctx, item.Key) - require.Error(t, err) + require.Eventually(t, func() bool { + _, err = uut.Get(ctx, item.Key) + return trace.IsNotFound(err) + }, time.Second*4, time.Millisecond*200, "Failed to ensure that item %q has been deleted", item.Key) // Make sure a DELETE event is emitted. requireEvent(t, watcher, types.OpDelete, item.Key, 2*time.Second) } +// testFetchLimit tests fetch max items size limit. +func testFetchLimit(t *testing.T, newBackend Constructor) { + uut, _, err := newBackend() + require.NoError(t, err) + defer func() { require.NoError(t, uut.Close()) }() + + prefix := MakePrefix() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Allocate 4KB buffer. + buff := make([]byte, 1<<16) + itemsCount := 20 + // Fill the backend with events that total size is greater than 1MB (4KB * 20 > 1MB). + for i := 0; i < itemsCount; i++ { + item := &backend.Item{Key: prefix(fmt.Sprintf("/db/database%d", i)), Value: buff} + _, err = uut.Put(ctx, *item) + require.NoError(t, err) + } + + result, err := uut.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), backend.NoLimit) + require.NoError(t, err) + require.Equal(t, itemsCount, len(result.Items)) +} + // requireEvent asserts that a given event type with the given key is emitted // by a watcher within the supplied timeout, returning that event for further // inspection if successful.