Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DynamoDB getAllRecords logic when 1MB query limit is reached #10726

Merged
merged 11 commits into from
Mar 4, 2022
15 changes: 11 additions & 4 deletions lib/backend/dynamo/dynamodbbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,
if len(endKey) == 0 {
return nil, trace.BadParameter("missing parameter endKey")
}
if limit <= 0 {
limit = backend.DefaultRangeLimit
}

result, err := b.getAllRecords(ctx, startKey, endKey, limit)
if err != nil {
return nil, trace.Wrap(err)
Expand All @@ -383,6 +387,7 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,

func (b *Backend) getAllRecords(ctx context.Context, startKey []byte, endKey []byte, limit int) (*getResult, error) {
var result getResult

// this code is being extra careful here not to introduce endless loop
// by some unfortunate series of events
for i := 0; i < backend.DefaultRangeLimit/100; i++ {
Expand All @@ -391,7 +396,9 @@ func (b *Backend) getAllRecords(ctx context.Context, startKey []byte, endKey []b
return nil, trace.Wrap(err)
}
result.records = append(result.records, re.records...)
if len(result.records) >= limit || len(re.lastEvaluatedKey) == 0 {
// If the limit was exceeded or there are no more records to fetch return the current result
// otherwise updated lastEvaluatedKey and proceed with obtaining new records.
if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 {
smallinsky marked this conversation as resolved.
Show resolved Hide resolved
if len(result.records) == backend.DefaultRangeLimit {
b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit)
}
Expand Down Expand Up @@ -744,12 +751,12 @@ func (b *Backend) getRecords(ctx context.Context, startKey, endKey string, limit

// isExpired returns 'true' if the given object (record) has a TTL and
// it's due.
func (r *record) isExpired() bool {
func (r *record) isExpired(now time.Time) bool {
if r.Expires == nil {
return false
}
expiryDateUTC := time.Unix(*r.Expires, 0).UTC()
return time.Now().UTC().After(expiryDateUTC)
return now.UTC().After(expiryDateUTC)
}

func removeDuplicates(elements []record) []record {
Expand Down Expand Up @@ -868,7 +875,7 @@ func (b *Backend) getKey(ctx context.Context, key []byte) (*record, error) {
return nil, trace.WrapWithMessage(err, "failed to unmarshal dynamo item %q", string(key))
}
// Check if key expired, if expired delete it
if r.isExpired() {
if r.isExpired(b.clock.Now()) {
if err := b.deleteKey(ctx, key); err != nil {
b.Warnf("Failed deleting expired key %q: %v", key, err)
}
Expand Down
7 changes: 4 additions & 3 deletions lib/backend/dynamo/dynamodbbk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"testing"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestDynamoDB(t *testing.T) {
if err != nil {
return nil, nil, trace.Wrap(err)
}
clock := clockwork.NewFakeClock()
clock := clockwork.NewFakeClockAt(time.Now())
uut.clock = clock
return uut, clock, nil
}
Expand Down
76 changes: 75 additions & 1 deletion lib/backend/test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -140,7 +141,6 @@ func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) {
t.Run("Events", func(t *testing.T) {
testEvents(t, newBackend)
})

t.Run("WatchersClose", func(t *testing.T) {
testWatchersClose(t, newBackend)
})
Expand All @@ -156,6 +156,14 @@ func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) {
t.Run("Mirror", func(t *testing.T) {
testMirror(t, newBackend)
})

t.Run("FetchLimit", func(t *testing.T) {
testFetchLimit(t, newBackend)
})

t.Run("Limit", func(t *testing.T) {
testLimit(t, newBackend)
})
}

// RequireItems asserts that the supplied `actual` items collection matches
Expand Down Expand Up @@ -572,6 +580,72 @@ func testEvents(t *testing.T, newBackend Constructor) {
requireEvent(t, watcher, types.OpDelete, item.Key, 2*time.Second)
}

// testFetchLimit tests fetch max items size limit.
func testFetchLimit(t *testing.T, newBackend Constructor) {
uut, _, err := newBackend()
require.NoError(t, err)
defer func() { require.NoError(t, uut.Close()) }()

prefix := MakePrefix()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Allocate 65KB buffer.
buff := make([]byte, 1<<16)
itemsCount := 20
// Fill the backend with events that total size is greater than 1MB (65KB * 20 > 1MB).
for i := 0; i < itemsCount; i++ {
item := &backend.Item{Key: prefix(fmt.Sprintf("/db/database%d", i)), Value: buff}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
}

result, err := uut.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), backend.NoLimit)
require.NoError(t, err)
require.Equal(t, itemsCount, len(result.Items))
}

// testLimit tests limit.
func testLimit(t *testing.T, newBackend Constructor) {
uut, clock, err := newBackend()
require.NoError(t, err)
defer func() { require.NoError(t, uut.Close()) }()

prefix := MakePrefix()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

item := &backend.Item{
Key: prefix("/db/database_tail_item"),
Value: []byte("data"),
Expires: clock.Now().Add(time.Minute),
}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
for i := 0; i < 10; i++ {
item := &backend.Item{
Key: prefix(fmt.Sprintf("/db/database%d", i)),
Value: []byte("data"),
Expires: clock.Now().Add(time.Second * 10),
}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
}
clock.Advance(time.Second * 20)

item = &backend.Item{
Key: prefix("/db/database_head_item"),
Value: []byte("data"),
Expires: clock.Now().Add(time.Minute),
}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)

result, err := uut.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), 2)
require.NoError(t, err)
require.Equal(t, 2, len(result.Items))
}

// requireEvent asserts that a given event type with the given key is emitted
// by a watcher within the supplied timeout, returning that event for further
// inspection if successful.
Expand Down