Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support specifying MaxBytes in range request #14810

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,11 @@
"type": "string",
"format": "int64"
},
"max_bytes": {
"description": "max_bytes is a limit on the total size of key-value pairs returned for the\nrequest. When max_bytes is set to 0, it is treated as no limit.",
"type": "string",
"format": "int64"
},
"max_create_revision": {
"description": "max_create_revision is the upper bound for returned key create revisions; all keys with\ngreater create revisions will be filtered away.",
"type": "string",
Expand Down
594 changes: 316 additions & 278 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ message RangeRequest {
// max_create_revision is the upper bound for returned key create revisions; all keys with
// greater create revisions will be filtered away.
int64 max_create_revision = 13 [(versionpb.etcd_version_field)="3.1"];

// max_bytes is a limit on the total size of key-value pairs returned for the
// request. When max_bytes is set to 0, it is treated as no limit.
int64 max_bytes = 14 [(versionpb.etcd_version_field)="3.6"];
}

message RangeResponse {
Expand Down
12 changes: 12 additions & 0 deletions client/v3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Op struct {

// for range
limit int64
maxBytes int64
sort *SortOption
serializable bool
keysOnly bool
Expand Down Expand Up @@ -153,6 +154,7 @@ func (op Op) toRangeRequest() *pb.RangeRequest {
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
MaxBytes: op.maxBytes,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
Expand Down Expand Up @@ -263,6 +265,8 @@ func OpDelete(key string, opts ...OpOption) Op {
panic("unexpected filter in delete")
case ret.createdNotify:
panic("unexpected createdNotify in delete")
case ret.maxBytes != 0:
panic("unexpected maxBytes in delete")
}
return ret
}
Expand Down Expand Up @@ -292,6 +296,8 @@ func OpPut(key, val string, opts ...OpOption) Op {
panic("unexpected filter in put")
case ret.createdNotify:
panic("unexpected createdNotify in put")
case ret.maxBytes != 0:
panic("unexpected maxBytes in delete")
}
return ret
}
Expand Down Expand Up @@ -319,6 +325,8 @@ func opWatch(key string, opts ...OpOption) Op {
panic("unexpected mod revision filter in watch")
case ret.minCreateRev != 0, ret.maxCreateRev != 0:
panic("unexpected create revision filter in watch")
case ret.maxBytes != 0:
panic("unexpected maxBytes in delete")
}
return ret
}
Expand All @@ -341,6 +349,10 @@ func WithLease(leaseID LeaseID) OpOption {
// If WithLimit is given a 0 limit, it is treated as no limit.
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }

// WithMaxBytes limits the size in bytes of results to return from 'Get' request.
// If WithMaxBytes is given a 0 limit, it is treated as no limit.
func WithMaxBytes(n int64) OpOption { return func(op *Op) { op.maxBytes = n } }

// WithRev specifies the store revision for 'Get' request.
// Or the start revision of 'Watch' request.
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
Expand Down
3 changes: 3 additions & 0 deletions etcdctl/ctlv3/command/get_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
var (
getConsistency string
getLimit int64
getMaxBytes int64
getSortOrder string
getSortTarget string
getPrefix bool
Expand All @@ -49,6 +50,7 @@ func NewGetCommand() *cobra.Command {
cmd.Flags().StringVar(&getSortOrder, "order", "", "Order of results; ASCEND or DESCEND (ASCEND by default)")
cmd.Flags().StringVar(&getSortTarget, "sort-by", "", "Sort target; CREATE, KEY, MODIFY, VALUE, or VERSION")
cmd.Flags().Int64Var(&getLimit, "limit", 0, "Maximum number of results")
cmd.Flags().Int64Var(&getMaxBytes, "max-bytes", 0, "Maximum bytes of results")
cmd.Flags().BoolVar(&getPrefix, "prefix", false, "Get keys with matching prefix")
cmd.Flags().BoolVar(&getFromKey, "from-key", false, "Get keys that are greater than or equal to the given key using byte compare")
cmd.Flags().Int64Var(&getRev, "rev", 0, "Specify the kv revision")
Expand Down Expand Up @@ -126,6 +128,7 @@ func getGetOp(args []string) (string, []clientv3.OpOption) {
}

opts = append(opts, clientv3.WithLimit(getLimit))
opts = append(opts, clientv3.WithMaxBytes(getMaxBytes))
if getRev > 0 {
opts = append(opts, clientv3.WithRev(getRev))
}
Expand Down
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ etcdserverpb.RangeRequest.count_only: ""
etcdserverpb.RangeRequest.key: ""
etcdserverpb.RangeRequest.keys_only: ""
etcdserverpb.RangeRequest.limit: ""
etcdserverpb.RangeRequest.max_bytes: "3.6"
etcdserverpb.RangeRequest.max_create_revision: "3.1"
etcdserverpb.RangeRequest.max_mod_revision: "3.1"
etcdserverpb.RangeRequest.min_create_revision: "3.1"
Expand Down
27 changes: 19 additions & 8 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,20 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txnRead mvcc.TxnRead
defer txnRead.End()
}

limit := r.Limit
limit, maxBytes := r.Limit, r.MaxBytes
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
// fetch everything; sort and truncate afterwards
limit = 0
}
if limit > 0 {
// fetch one extra for 'more' flag
limit = limit + 1
maxBytes = 0
}

ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
Limit: limit,
MaxBytes: maxBytes,
Rev: r.Revision,
Count: r.CountOnly,
}

rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
Expand Down Expand Up @@ -201,10 +199,23 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txnRead mvcc.TxnRead
}
}

resp.More = rr.More
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
linxiulei marked this conversation as resolved.
Show resolved Hide resolved
if r.MaxBytes > 0 && maxBytes == 0 {
var totalBytes int64
for i, kv := range rr.KVs {
totalBytes += int64(kv.Size())
if totalBytes > r.MaxBytes {
resp.More = true
rr.KVs = rr.KVs[:i]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the code correctly, it means we might be sending '0' items, it is, we might stuck completely,
giving empty responses and retries.

I think we should prioritize giving at least 1 item in such situation. And be very explicit that in this situation the maxBytes contract can be exceeded.
But this is the exact semantic discussion we should have with k8s api machinery team @mborsz @lavalamp

We also need to have tests for this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you understood correctly. The initial version was to return the result with size just exceeding the maxBytes so at least one item is returned. But this made the maxBytes a bit ambiguous.

Given the max object size is 1.5MB, we can warm users the maxBytes should be bigger than 1.5MB to avoid returning 0 items. WDYT?

break
}
}
}

trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
Expand Down
8 changes: 5 additions & 3 deletions server/storage/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ import (
)

type RangeOptions struct {
Limit int64
Rev int64
Count bool
Limit int64
MaxBytes int64
Rev int64
Count bool
}

type RangeResult struct {
KVs []mvccpb.KeyValue
Rev int64
Count int
More bool
}

type ReadView interface {
Expand Down
87 changes: 66 additions & 21 deletions server/storage/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

// Functional tests for features implemented in v3 store. It treats v3 store
Expand Down Expand Up @@ -225,34 +226,78 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
limit int64
wcounts int64
wkvs []mvccpb.KeyValue
wmore bool
}{
// no limit
{-1, 3, kvs},
{-1, 3, kvs, false},
// no limit
{0, 3, kvs},
{1, 3, kvs[:1]},
{2, 3, kvs[:2]},
{3, 3, kvs},
{100, 3, kvs},
{0, 3, kvs, false},
{1, 3, kvs[:1], true},
{2, 3, kvs[:2], true},
{3, 3, kvs, false},
{100, 3, kvs, false},
}
for i, tt := range tests {
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit})
if err != nil {
t.Fatalf("#%d: range error (%v)", i, err)
}
if !reflect.DeepEqual(r.KVs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, tt.wkvs)
}
if r.Rev != wrev {
t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
}
if tt.limit <= 0 || int(tt.limit) > len(kvs) {
if r.Count != len(kvs) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
tt := tt
t.Run(fmt.Sprint(i), func(t *testing.T) {
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit})
require.Nil(t, err)
require.Equal(t, tt.wkvs, r.KVs)
require.Equal(t, wrev, r.Rev)
if tt.limit <= 0 || int(tt.limit) > len(kvs) {
require.Equal(t, len(kvs), r.Count)
} else {
require.Equal(t, int(tt.wcounts), r.Count)
}
} else if r.Count != int(tt.wcounts) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
require.Equal(t, tt.wmore, r.More)
})
}
}

func TestKVRangeMaxBytes(t *testing.T) { testKVRangeMaxBytes(t, normalRangeFunc) }
func TestKVTxnRangeMaxBytes(t *testing.T) { testKVRangeMaxBytes(t, txnRangeFunc) }

func testKVRangeMaxBytes(t *testing.T, f rangeFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)

kvsSize := func(kvs []mvccpb.KeyValue) int64 {
var s int
for _, kv := range kvs {
s += kv.Size()
}
return int64(s)
}

wrev := int64(4)
tests := []struct {
maxBytes int64
wcounts int64
wkvs []mvccpb.KeyValue
wmore bool
}{
// no limit
{-1, 3, kvs, false},
// no limit
{0, 3, kvs, false},
{kvsSize(kvs[:1]), 3, kvs[:1], true},
{kvsSize(kvs[:2]), 3, kvs[:2], true},
{kvsSize(kvs), 3, kvs, false},
{1000, 3, kvs, false},
}
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprint(i), func(t *testing.T) {
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{MaxBytes: tt.maxBytes})
require.Nil(t, err)
require.Equal(t, tt.wkvs, r.KVs)
require.Equal(t, wrev, r.Rev)
require.Equal(t, len(kvs), r.Count)
require.Equal(t, tt.wmore, r.More)
})
}
}

Expand Down
10 changes: 8 additions & 2 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
}

limit := int(ro.Limit)
if limit <= 0 || limit > len(revpairs) {
if limit <= 0 || limit >= len(revpairs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no-op

limit = len(revpairs)
}

kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
var totalBytes int64
for i, revpair := range revpairs[:len(kvs)] {
select {
case <-ctx.Done():
Expand All @@ -115,6 +116,11 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
zap.Int("len-values", len(vs)),
)
}
totalBytes += int64(len(vs[0]))
if ro.MaxBytes > 0 && totalBytes > ro.MaxBytes {
kvs = kvs[:i]
break
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
Expand All @@ -123,7 +129,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
}
}
tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
return &RangeResult{KVs: kvs, Count: total, Rev: curRev, More: total > len(kvs)}, nil
linxiulei marked this conversation as resolved.
Show resolved Hide resolved
}

func (tr *storeTxnRead) End() {
Expand Down
1 change: 1 addition & 0 deletions tests/common/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestKVGet(t *testing.T) {
{begin: "foo", options: config.GetOptions{Prefix: true}, wkv: []string{"foo", "foo/abc"}},
{begin: "foo", options: config.GetOptions{FromKey: true}, wkv: []string{"foo", "foo/abc", "fop"}},
{begin: "", options: config.GetOptions{Prefix: true, Limit: 2}, wkv: wantKvs[:2]},
{begin: "", options: config.GetOptions{Prefix: true, MaxBytes: 42}, wkv: wantKvs[:3]},
{begin: "", options: config.GetOptions{Prefix: true, Order: clientv3.SortAscend, SortBy: clientv3.SortByModRevision}, wkv: wantKvs},
{begin: "", options: config.GetOptions{Prefix: true, Order: clientv3.SortAscend, SortBy: clientv3.SortByVersion}, wkv: kvsByVersion},
{begin: "", options: config.GetOptions{Prefix: true, Order: clientv3.SortNone, SortBy: clientv3.SortByCreateRevision}, wkv: wantKvs},
Expand Down
1 change: 1 addition & 0 deletions tests/e2e/ctl_v3_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func getTest(cx ctlCtx) {
{[]string{"", "--from-key"}, kvs},
{[]string{"key", "--prefix"}, kvs},
{[]string{"key", "--prefix", "--limit=2"}, kvs[:2]},
{[]string{"key", "--prefix", "--max-bytes=36"}, kvs[:2]},
{[]string{"key", "--prefix", "--order=ASCEND", "--sort-by=MODIFY"}, kvs},
{[]string{"key", "--prefix", "--order=ASCEND", "--sort-by=VERSION"}, kvs},
{[]string{"key", "--prefix", "--sort-by=CREATE"}, kvs}, // ASCEND by default
Expand Down
1 change: 1 addition & 0 deletions tests/framework/config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type GetOptions struct {
Prefix bool
FromKey bool
Limit int
MaxBytes int
Order clientv3.SortOrder
SortBy clientv3.SortTarget
Timeout time.Duration
Expand Down
3 changes: 3 additions & 0 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (ctl *EtcdctlV3) Get(ctx context.Context, key string, o config.GetOptions)
if o.Limit != 0 {
args = append(args, fmt.Sprintf("--limit=%d", o.Limit))
}
if o.MaxBytes != 0 {
args = append(args, fmt.Sprintf("--max-bytes=%d", o.MaxBytes))
}
if o.FromKey {
args = append(args, "--from-key")
}
Expand Down
3 changes: 3 additions & 0 deletions tests/framework/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func (c integrationClient) Get(ctx context.Context, key string, o config.GetOpti
if o.Limit != 0 {
clientOpts = append(clientOpts, clientv3.WithLimit(int64(o.Limit)))
}
if o.MaxBytes != 0 {
clientOpts = append(clientOpts, clientv3.WithMaxBytes(int64(o.MaxBytes)))
}
if o.FromKey {
clientOpts = append(clientOpts, clientv3.WithFromKey())
}
Expand Down