Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
smallinsky committed Mar 2, 2022
1 parent 246e15e commit 554e636
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 15 deletions.
12 changes: 6 additions & 6 deletions lib/backend/dynamo/dynamodbbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,25 +383,25 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,

func (b *Backend) getAllRecords(ctx context.Context, startKey []byte, endKey []byte, limit int) (*getResult, error) {
var result getResult
limitRemaining := limit
// this code is being extra careful here not to introduce endless loop
// by some unfortunate series of events
for i := 0; i < backend.DefaultRangeLimit/100; i++ {
re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limit, result.lastEvaluatedKey)
if limit > 0 {
limitRemaining = limit - len(result.records)
}
re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limitRemaining, result.lastEvaluatedKey)
if err != nil {
return nil, trace.Wrap(err)
}
result.records = append(result.records, re.records...)
if limit != 0 && len(result.records) >= limit {
if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 {
if len(result.records) == backend.DefaultRangeLimit {
b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit)
}
return &result, nil
}
if len(re.lastEvaluatedKey) == 0 {
result.lastEvaluatedKey = nil
return &result, nil
}

result.lastEvaluatedKey = re.lastEvaluatedKey
}
return nil, trace.BadParameter("backend entered endless loop")
Expand Down
7 changes: 4 additions & 3 deletions lib/backend/dynamo/dynamodbbk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"testing"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestDynamoDB(t *testing.T) {
if err != nil {
return nil, nil, trace.Wrap(err)
}
clock := clockwork.NewFakeClock()
clock := clockwork.NewFakeClockAt(time.Now())
uut.clock = clock
return uut, clock, nil
}
Expand Down
41 changes: 35 additions & 6 deletions lib/backend/test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -156,6 +157,10 @@ func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) {
t.Run("Mirror", func(t *testing.T) {
testMirror(t, newBackend)
})

t.Run("FetchLimit", func(t *testing.T) {
testFetchLimit(t, newBackend)
})
}

// RequireItems asserts that the supplied `actual` items collection matches
Expand Down Expand Up @@ -548,7 +553,7 @@ func testEvents(t *testing.T, newBackend Constructor) {
item = &backend.Item{
Key: prefix("c"),
Value: []byte("val"),
Expires: clock.Now().Add(1 * time.Second),
Expires: clock.Now().Add(3 * time.Second),
}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
Expand All @@ -561,17 +566,41 @@ func testEvents(t *testing.T, newBackend Constructor) {
e = requireEvent(t, watcher, types.OpPut, item.Key, eventTimeout)
require.Equal(t, item.Value, e.Item.Value)

// Wait a few seconds for the item to expire.
clock.Advance(3 * time.Second)

// Make sure item has been removed.
_, err = uut.Get(ctx, item.Key)
require.Error(t, err)
require.Eventually(t, func() bool {
_, err = uut.Get(ctx, item.Key)
return trace.IsNotFound(err)
}, time.Second*4, time.Millisecond*200, "Failed to ensure that item %q has been deleted", item.Key)

// Make sure a DELETE event is emitted.
requireEvent(t, watcher, types.OpDelete, item.Key, 2*time.Second)
}

// testFetchLimit tests fetch max items size limit.
func testFetchLimit(t *testing.T, newBackend Constructor) {
uut, _, err := newBackend()
require.NoError(t, err)
defer func() { require.NoError(t, uut.Close()) }()

prefix := MakePrefix()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Allocate 4KB buffer.
buff := make([]byte, 1<<16)
itemsCount := 20
// Fill the backend with events that total size is greater than 1MB (4KB * 20 > 1MB).
for i := 0; i < itemsCount; i++ {
item := &backend.Item{Key: prefix(fmt.Sprintf("/db/database%d", i)), Value: buff}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
}

result, err := uut.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), backend.NoLimit)
require.NoError(t, err)
require.Equal(t, itemsCount, len(result.Items))
}

// requireEvent asserts that a given event type with the given key is emitted
// by a watcher within the supplied timeout, returning that event for further
// inspection if successful.
Expand Down

0 comments on commit 554e636

Please sign in to comment.