Skip to content

Commit

Permalink
*: add a client side mechanism for limiting max number of keys in a s…
Browse files Browse the repository at this point in the history
…ingle range txn

This commit adds a similar mechanism of making a read txn smaller in
the client side. This is good for reducing peak memory usage of the
server.

This commit adds a new flag --max-range-keys-once to `etcdctl
get`. With specifying a value with the option, users can use this
feature during the execution of the command.
  • Loading branch information
mitake committed Jan 31, 2018
1 parent 0bd38ad commit 203ffe8
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 37 deletions.
55 changes: 54 additions & 1 deletion clientv3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package clientv3

import (
"bytes"
"context"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/keyutil"

"google.golang.org/grpc"
)
Expand Down Expand Up @@ -141,12 +143,63 @@ func (kv *kv) Txn(ctx context.Context) Txn {
}
}

func (kv *kv) doRange(ctx context.Context, op Op) (*pb.RangeResponse, error) {
var resp pb.RangeResponse
resp.Header = &pb.ResponseHeader{}

totalLimit := op.limit
if op.limit == 0 || op.rangeMaxKeysOnce != 0 && op.rangeMaxKeysOnce < op.limit {
op.limit = op.rangeMaxKeysOnce
}

rr := op.toRangeRequest()

startKey := rr.Key
rangeEnd := keyutil.MkGteRange(op.end)
noEnd := bytes.Compare(rangeEnd, []byte{0}) != 0

rev := rr.Revision

for noEnd || bytes.Compare(startKey, rangeEnd) == -1 {
r, err := kv.remote.Range(ctx, rr, kv.callOpts...)
if err != nil {
return nil, err
}

if resp.Header == nil || rev == 0 {
// revision wasn't specified by the caller, let's use the revision of the first result
rev = r.Header.Revision
rr.Revision = rev

resp.Header = r.Header // reuse first response's header
}

if resp.Count == 0 {
break
}

resp.Kvs = append(resp.Kvs, r.Kvs...)
resp.Count += int64(len(r.Kvs))

if totalLimit != 0 && resp.Count == totalLimit {
break
}

startKey = keyutil.NextKey(r.Kvs[len(r.Kvs)-1].Key)
if bytes.Compare(startKey, []byte{0}) == 0 {
break
}
}

return &resp, nil
}

func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
resp, err = kv.doRange(ctx, op)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
Expand Down
7 changes: 6 additions & 1 deletion clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type Op struct {
maxCreateRev int64

// for range, watch
rev int64
rev int64
rangeMaxKeysOnce int64

// for watch, put, delete
prevKV bool
Expand Down Expand Up @@ -311,6 +312,10 @@ func WithLease(leaseID LeaseID) OpOption {
// If WithLimit is given a 0 limit, it is treated as no limit.
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }

// WithRangeMaxKeysOnce limits the number of read keys in a single read transaction
// during Range() RPC. If WithRangeMaxKeysOnce is given a 0 limit, it is treated as no limit.
func WithRangeMaxKeysOnce(n int64) OpOption { return func(op *Op) { op.rangeMaxKeysOnce = n } }

// WithRev specifies the store revision for 'Get' request.
// Or the start revision of 'Watch' request.
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
Expand Down
25 changes: 16 additions & 9 deletions etcdctl/ctlv3/command/get_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
)

var (
getConsistency string
getLimit int64
getSortOrder string
getSortTarget string
getPrefix bool
getFromKey bool
getRev int64
getKeysOnly bool
printValueOnly bool
getConsistency string
getLimit int64
getSortOrder string
getSortTarget string
getPrefix bool
getFromKey bool
getRev int64
getKeysOnly bool
printValueOnly bool
getRangeMaxKeysOnce int64
)

// NewGetCommand returns the cobra command for "get".
Expand All @@ -51,6 +52,8 @@ func NewGetCommand() *cobra.Command {
cmd.Flags().Int64Var(&getRev, "rev", 0, "Specify the kv revision")
cmd.Flags().BoolVar(&getKeysOnly, "keys-only", false, "Get only the keys")
cmd.Flags().BoolVar(&printValueOnly, "print-value-only", false, `Only write values when using the "simple" output format`)
cmd.Flags().Int64Var(&getRangeMaxKeysOnce, "range-max-keys-once", 0, "Specify the maximum number of read keys in a single read transaction")

return cmd
}

Expand Down Expand Up @@ -159,5 +162,9 @@ func getGetOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) {
opts = append(opts, clientv3.WithKeysOnly())
}

if getRangeMaxKeysOnce != 0 {
opts = append(opts, clientv3.WithRangeMaxKeysOnce(getRangeMaxKeysOnce))
}

return key, opts
}
31 changes: 5 additions & 26 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/internal/lease"
"github.com/coreos/etcd/internal/mvcc"
"github.com/coreos/etcd/internal/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/keyutil"
"github.com/coreos/etcd/pkg/types"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd)
end := keyutil.MkGteRange(dr.RangeEnd)

if txn == nil {
txn = a.s.kv.Write()
Expand All @@ -237,17 +238,6 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
return resp, nil
}

func nextKey(key []byte) []byte {
for i := len(key) - 1; 0 <= i; i-- {
if key[i] < 0xff {
key[i]++
return key[:i+1]
}
}

return []byte{0}
}

func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
Expand All @@ -266,7 +256,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang

var rr *mvcc.RangeResult
var err error
rangeEnd := mkGteRange(r.RangeEnd)
rangeEnd := keyutil.MkGteRange(r.RangeEnd)

if txn != nil {
ro := mvcc.RangeOptions{
Expand Down Expand Up @@ -318,7 +308,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
rr.KVs = append(rr.KVs, rrpart.KVs...)
rr.Count += len(rrpart.KVs)

startKey = nextKey(rrpart.KVs[len(rrpart.KVs)-1].Key)
startKey = keyutil.NextKey(rrpart.KVs[len(rrpart.KVs)-1].Key)
if bytes.Compare(startKey, []byte{0}) == 0 {
break
}
Expand Down Expand Up @@ -491,7 +481,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
// * rewrite rules for common patterns:
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
// * caching
rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
rr, err := rv.Range(c.Key, keyutil.MkGteRange(c.RangeEnd), mvcc.RangeOptions{})
if err != nil {
return false
}
Expand Down Expand Up @@ -974,17 +964,6 @@ func compareInt64(a, b int64) int {
}
}

// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}

func noSideEffect(r *pb.InternalRaftRequest) bool {
return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/keyutil/keyutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package keyutil exports utility functions related to etcd3's key name.

package keyutil

// NextKey returns a name of next key of the given key. The key and the next key don't have any keys between them.
func NextKey(key []byte) []byte {
for i := len(key) - 1; 0 <= i; i-- {
if key[i] < 0xff {
key[i]++
return key[:i+1]
}
}

return []byte{0}
}

// MkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func MkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}

0 comments on commit 203ffe8

Please sign in to comment.