Skip to content

Commit

Permalink
*: make Range() fine grained
Browse files Browse the repository at this point in the history
Currently, Range() tries to read the entire requested range of keys in a
single read transaction. This can make writer transactions wait for a
long time, which can be observed as high latency spikes. To solve
this problem, this commit lets Range() split its read transaction in a
fine-grained manner. In the intervals between the read transactions,
concurrent write RPCs (e.g. Put(), DeleteRange()) have a chance to
start their transactions.

This commit also adds a new option `--range-max-keys-once` to
etcd. With the option, users can specify the maximum number of keys
read in a single transaction during Range().
  • Loading branch information
mitake committed Jan 31, 2018
1 parent 42ef97d commit 0bd38ad
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 13 deletions.
1 change: 1 addition & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Config struct {
SnapCount uint64 `json:"snapshot-count"`
AutoCompactionRetention string `json:"auto-compaction-retention"`
AutoCompactionMode string `json:"auto-compaction-mode"`
RangeMaxKeysOnce uint64 `json:"range-max-keys-once"`

// TickMs is the number of milliseconds between heartbeat ticks.
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
Debug: cfg.Debug,
RangeMaxKeysOnce: cfg.RangeMaxKeysOnce,
}

if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func newConfig() *config {

fs.StringVar(&cfg.ec.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.ec.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.")
fs.Uint64Var(&cfg.ec.RangeMaxKeysOnce, "range-max-keys-once", 0, "A number of maximum keys which can be read in a single read transaction during range request. 0 means the maximum number is not limited.")

// pprof profiler via HTTP
fs.BoolVar(&cfg.ec.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
Expand Down
85 changes: 72 additions & 13 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package etcdserver
import (
"bytes"
"context"
"runtime"
"sort"
"time"

Expand Down Expand Up @@ -236,15 +237,21 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
return resp, nil
}

// nextKey returns the smallest key that sorts strictly after key in
// lexicographic byte order, which is key followed by a single 0x00 byte.
//
// The result is always a freshly allocated slice and key itself is never
// modified. This matters because callers pass in the Key of a KeyValue that
// is also part of the response being assembled; bumping its last byte in
// place would corrupt the returned key.
//
// Appending 0x00 (rather than incrementing the last byte) guarantees that
// keys which extend key, e.g. "abcd" after "abc", are not skipped when the
// result is used as the start of the next range read.
//
// For an empty or nil key the result is []byte{0}.
func nextKey(key []byte) []byte {
	// make zero-fills the slice, so the extra trailing byte is already 0x00.
	next := make([]byte, len(key)+1)
	copy(next, key)
	return next
}

func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

if txn == nil {
txn = a.s.kv.Read()
defer txn.End()
}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
Expand All @@ -257,15 +264,67 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
limit = limit + 1
}

ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
}
var rr *mvcc.RangeResult
var err error
rangeEnd := mkGteRange(r.RangeEnd)

if txn != nil {
ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
}

rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
rr, err = txn.Range(r.Key, rangeEnd, ro)
if err != nil {
return nil, err
}
} else {
rev := r.Revision
if rev == 0 {
rev = a.s.kv.Rev()
}

rr = &mvcc.RangeResult{
KVs: make([]mvccpb.KeyValue, 0),
Rev: rev,
}

txnlimit := int64(a.s.Cfg.RangeMaxKeysOnce)
if limit == 0 || txnlimit != 0 && txnlimit < limit {
limit = txnlimit
}
startKey := r.Key
noEnd := bytes.Compare(rangeEnd, []byte{0}) != 0
for noEnd || bytes.Compare(startKey, rangeEnd) == -1 {
txn := a.s.kv.Read()

ro := mvcc.RangeOptions{
Limit: limit,
Rev: rev,
Count: r.CountOnly,
}

rrpart, err := txn.Range(startKey, rangeEnd, ro)
txn.End()
if err != nil {
return nil, err
}

if rrpart.Count == 0 {
break
}

rr.KVs = append(rr.KVs, rrpart.KVs...)
rr.Count += len(rrpart.KVs)

startKey = nextKey(rrpart.KVs[len(rrpart.KVs)-1].Key)
if bytes.Compare(startKey, []byte{0}) == 0 {
break
}

runtime.Gosched()
}
}

if r.MaxModRevision != 0 {
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type ServerConfig struct {
CorruptCheckTime time.Duration

Debug bool

RangeMaxKeysOnce uint64
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down

0 comments on commit 0bd38ad

Please sign in to comment.