diff --git a/clientv3/kv.go b/clientv3/kv.go index 5a7469bd4c91..b14a3efbdf85 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -15,9 +15,11 @@ package clientv3 import ( + "bytes" "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/keyutil" "google.golang.org/grpc" ) @@ -141,12 +143,63 @@ func (kv *kv) Txn(ctx context.Context) Txn { } } +func (kv *kv) doRange(ctx context.Context, op Op) (*pb.RangeResponse, error) { + var resp pb.RangeResponse + resp.Header = &pb.ResponseHeader{} + + totalLimit := op.limit + if op.limit == 0 || op.rangeMaxKeysOnce != 0 && op.rangeMaxKeysOnce < op.limit { + op.limit = op.rangeMaxKeysOnce + } + + rr := op.toRangeRequest() + + startKey := rr.Key + rangeEnd := keyutil.MkGteRange(op.end) + noEnd := bytes.Compare(rangeEnd, []byte{0}) != 0 + + rev := rr.Revision + + for noEnd || bytes.Compare(startKey, rangeEnd) == -1 { + r, err := kv.remote.Range(ctx, rr, kv.callOpts...) + if err != nil { + return nil, err + } + + if resp.Header == nil || rev == 0 { + // revision wasn't specified by the caller, let's use the revision of the first result + rev = r.Header.Revision + rr.Revision = rev + + resp.Header = r.Header // reuse first response's header + } + + if resp.Count == 0 { + break + } + + resp.Kvs = append(resp.Kvs, r.Kvs...) + resp.Count += int64(len(r.Kvs)) + + if totalLimit != 0 && resp.Count == totalLimit { + break + } + + startKey = keyutil.NextKey(r.Kvs[len(r.Kvs)-1].Key) + if bytes.Compare(startKey, []byte{0}) == 0 { + break + } + } + + return &resp, nil +} + func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { var err error switch op.t { case tRange: var resp *pb.RangeResponse - resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...) + resp, err = kv.doRange(ctx, op) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } diff --git a/clientv3/op.go b/clientv3/op.go index c6ec5bf5200c..be9c03f48863 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -48,7 +48,8 @@ type Op struct { maxCreateRev int64 // for range, watch - rev int64 + rev int64 + rangeMaxKeysOnce int64 // for watch, put, delete prevKV bool @@ -311,6 +312,10 @@ func WithLease(leaseID LeaseID) OpOption { // If WithLimit is given a 0 limit, it is treated as no limit. func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } } +// WithRangeMaxKeysOnce limits the number of read keys in a single read transaction +// during Range() RPC. If WithRangeMaxKeysOnce is given a 0 limit, it is treated as no limit. +func WithRangeMaxKeysOnce(n int64) OpOption { return func(op *Op) { op.rangeMaxKeysOnce = n } } + // WithRev specifies the store revision for 'Get' request. // Or the start revision of 'Watch' request. func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } } diff --git a/etcdctl/ctlv3/command/get_command.go b/etcdctl/ctlv3/command/get_command.go index 5f9c74f3c552..a6d35c1a2ddc 100644 --- a/etcdctl/ctlv3/command/get_command.go +++ b/etcdctl/ctlv3/command/get_command.go @@ -23,15 +23,16 @@ import ( ) var ( - getConsistency string - getLimit int64 - getSortOrder string - getSortTarget string - getPrefix bool - getFromKey bool - getRev int64 - getKeysOnly bool - printValueOnly bool + getConsistency string + getLimit int64 + getSortOrder string + getSortTarget string + getPrefix bool + getFromKey bool + getRev int64 + getKeysOnly bool + printValueOnly bool + getRangeMaxKeysOnce int64 ) // NewGetCommand returns the cobra command for "get". @@ -51,6 +52,8 @@ func NewGetCommand() *cobra.Command { cmd.Flags().Int64Var(&getRev, "rev", 0, "Specify the kv revision") cmd.Flags().BoolVar(&getKeysOnly, "keys-only", false, "Get only the keys") cmd.Flags().BoolVar(&printValueOnly, "print-value-only", false, `Only write values when using the "simple" output format`) + cmd.Flags().Int64Var(&getRangeMaxKeysOnce, "range-max-keys-once", 0, "Specify the maximum number of read keys in a single read transaction") + return cmd } @@ -159,5 +162,9 @@ func getGetOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) { opts = append(opts, clientv3.WithKeysOnly()) } + if getRangeMaxKeysOnce != 0 { + opts = append(opts, clientv3.WithRangeMaxKeysOnce(getRangeMaxKeysOnce)) + } + return key, opts } diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 671732ff03cf..04edbf611945 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -26,6 +26,7 @@ import ( "github.com/coreos/etcd/internal/lease" "github.com/coreos/etcd/internal/mvcc" "github.com/coreos/etcd/internal/mvcc/mvccpb" + "github.com/coreos/etcd/pkg/keyutil" "github.com/coreos/etcd/pkg/types" "github.com/gogo/protobuf/proto" @@ -213,7 +214,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} - end := mkGteRange(dr.RangeEnd) + end := keyutil.MkGteRange(dr.RangeEnd) if txn == nil { txn = a.s.kv.Write() @@ -237,17 +238,6 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ return resp, nil } -func nextKey(key []byte) []byte { - for i := len(key) - 1; 0 <= i; i-- { - if key[i] < 0xff { - key[i]++ - return key[:i+1] - } - } - - return []byte{0} -} - func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} @@ -266,7 +256,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang var rr *mvcc.RangeResult var err error - rangeEnd := mkGteRange(r.RangeEnd) + rangeEnd := keyutil.MkGteRange(r.RangeEnd) if txn != nil { ro := mvcc.RangeOptions{ @@ -318,7 +308,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang rr.KVs = append(rr.KVs, rrpart.KVs...) rr.Count += len(rrpart.KVs) - startKey = nextKey(rrpart.KVs[len(rrpart.KVs)-1].Key) + startKey = keyutil.NextKey(rrpart.KVs[len(rrpart.KVs)-1].Key) if bytes.Compare(startKey, []byte{0}) == 0 { break } @@ -491,7 +481,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool { // * rewrite rules for common patterns: // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0" // * caching - rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{}) + rr, err := rv.Range(c.Key, keyutil.MkGteRange(c.RangeEnd), mvcc.RangeOptions{}) if err != nil { return false } @@ -974,17 +964,6 @@ func compareInt64(a, b int64) int { } } -// mkGteRange determines if the range end is a >= range. This works around grpc -// sending empty byte strings as nil; >= is encoded in the range end as '\0'. -// If it is a GTE range, then []byte{} is returned to indicate the empty byte -// string (vs nil being no byte string). -func mkGteRange(rangeEnd []byte) []byte { - if len(rangeEnd) == 1 && rangeEnd[0] == 0 { - return []byte{} - } - return rangeEnd -} - func noSideEffect(r *pb.InternalRaftRequest) bool { return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil } diff --git a/pkg/keyutil/keyutil.go b/pkg/keyutil/keyutil.go new file mode 100644 index 000000000000..3794773a4459 --- /dev/null +++ b/pkg/keyutil/keyutil.go @@ -0,0 +1,40 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package keyutil exports utility functions related to etcd3's key name. + +package keyutil + +// NextKey returns a name of next key of the given key. The key and the next key don't have any keys between them. +func NextKey(key []byte) []byte { + for i := len(key) - 1; 0 <= i; i-- { + if key[i] < 0xff { + key[i]++ + return key[:i+1] + } + } + + return []byte{0} +} + +// MkGteRange determines if the range end is a >= range. This works around grpc +// sending empty byte strings as nil; >= is encoded in the range end as '\0'. +// If it is a GTE range, then []byte{} is returned to indicate the empty byte +// string (vs nil being no byte string). +func MkGteRange(rangeEnd []byte) []byte { + if len(rangeEnd) == 1 && rangeEnd[0] == 0 { + return []byte{} + } + return rangeEnd +}