From 0bd38ad6aa0cc4ba6a432fff04e3eaf5908c79f7 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Tue, 23 Jan 2018 15:27:02 +0900 Subject: [PATCH] *: make Range() fine grained The current Range() tries to read the entire requested range of keys in a single read transaction. This can make writer transactions wait for a long time, which is observed as high latency spikes. To solve this problem, this commit lets Range() split its read into multiple fine-grained read transactions. In the intervals between these read transactions, concurrent write RPCs (e.g. Put(), DeleteRange()) get a chance to start their transactions. This commit also adds a new option `--range-max-keys-once` to etcd. With this option, users can specify the maximum number of keys read in a single transaction during Range(). --- embed/config.go | 1 + embed/etcd.go | 1 + etcdmain/config.go | 1 + etcdserver/apply.go | 85 +++++++++++++++++++++++++++++++++++++------- etcdserver/config.go | 2 ++ 5 files changed, 77 insertions(+), 13 deletions(-) diff --git a/embed/config.go b/embed/config.go index 105165a5fc1a..d6a4f6d5ec53 100644 --- a/embed/config.go +++ b/embed/config.go @@ -118,6 +118,7 @@ type Config struct { SnapCount uint64 `json:"snapshot-count"` AutoCompactionRetention string `json:"auto-compaction-retention"` AutoCompactionMode string `json:"auto-compaction-mode"` + RangeMaxKeysOnce uint64 `json:"range-max-keys-once"` // TickMs is the number of milliseconds between heartbeat ticks. // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). 
diff --git a/embed/etcd.go b/embed/etcd.go index c1a94bc457c6..119c1c5b0f03 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -181,6 +181,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, Debug: cfg.Debug, + RangeMaxKeysOnce: cfg.RangeMaxKeysOnce, } if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { diff --git a/etcdmain/config.go b/etcdmain/config.go index 471c73480fb2..2e616dd683bf 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -203,6 +203,7 @@ func newConfig() *config { fs.StringVar(&cfg.ec.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") fs.StringVar(&cfg.ec.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.") + fs.Uint64Var(&cfg.ec.RangeMaxKeysOnce, "range-max-keys-once", 0, "A number of maximum keys which can be read in a single read transaction during range request. 0 means the maximum number is not limited.") // pprof profiler via HTTP fs.BoolVar(&cfg.ec.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. 
Address is at client URL + \"/debug/pprof/\"") diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 93f5ed04cf5d..671732ff03cf 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -17,6 +17,7 @@ package etcdserver import ( "bytes" "context" + "runtime" "sort" "time" @@ -236,15 +237,21 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ return resp, nil } +func nextKey(key []byte) []byte { + for i := len(key) - 1; 0 <= i; i-- { + if key[i] < 0xff { + key[i]++ + return key[:i+1] + } + } + + return []byte{0} +} + func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} - if txn == nil { - txn = a.s.kv.Read() - defer txn.End() - } - limit := r.Limit if r.SortOrder != pb.RangeRequest_NONE || r.MinModRevision != 0 || r.MaxModRevision != 0 || @@ -257,15 +264,67 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang limit = limit + 1 } - ro := mvcc.RangeOptions{ - Limit: limit, - Rev: r.Revision, - Count: r.CountOnly, - } + var rr *mvcc.RangeResult + var err error + rangeEnd := mkGteRange(r.RangeEnd) + + if txn != nil { + ro := mvcc.RangeOptions{ + Limit: limit, + Rev: r.Revision, + Count: r.CountOnly, + } - rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro) - if err != nil { - return nil, err + rr, err = txn.Range(r.Key, rangeEnd, ro) + if err != nil { + return nil, err + } + } else { + rev := r.Revision + if rev == 0 { + rev = a.s.kv.Rev() + } + + rr = &mvcc.RangeResult{ + KVs: make([]mvccpb.KeyValue, 0), + Rev: rev, + } + + txnlimit := int64(a.s.Cfg.RangeMaxKeysOnce) + if limit == 0 || txnlimit != 0 && txnlimit < limit { + limit = txnlimit + } + startKey := r.Key + noEnd := bytes.Compare(rangeEnd, []byte{0}) != 0 + for noEnd || bytes.Compare(startKey, rangeEnd) == -1 { + txn := a.s.kv.Read() + + ro := mvcc.RangeOptions{ + Limit: limit, + Rev: rev, + Count: r.CountOnly, + } + + rrpart, 
err := txn.Range(startKey, rangeEnd, ro) + txn.End() + if err != nil { + return nil, err + } + + if rrpart.Count == 0 { + break + } + + rr.KVs = append(rr.KVs, rrpart.KVs...) + rr.Count += len(rrpart.KVs) + + startKey = nextKey(rrpart.KVs[len(rrpart.KVs)-1].Key) + if bytes.Compare(startKey, []byte{0}) == 0 { + break + } + + runtime.Gosched() + } } if r.MaxModRevision != 0 { diff --git a/etcdserver/config.go b/etcdserver/config.go index 056af745dad0..fa55a84ac394 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -72,6 +72,8 @@ type ServerConfig struct { CorruptCheckTime time.Duration Debug bool + + RangeMaxKeysOnce uint64 } // VerifyBootstrap sanity-checks the initial config for bootstrap case