Skip to content

Commit

Permalink
New features: Add paging option in registry client and metrics key de…
Browse files Browse the repository at this point in the history
…sign. (#158)

* New features: Add paging option in registry client and metrics key design.

* New features: Add paging option in registry client and metrics key design.

* Optimize log print in paging function.

* Change metrics key design and optimize log print.
  • Loading branch information
little-cui authored and asifdxtreme committed Nov 9, 2017
1 parent 71e7458 commit b7710a3
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 29 deletions.
17 changes: 17 additions & 0 deletions server/core/key_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
REGISTRY_LEASE_KEY = "leases"
REGISTRY_DEPENDENCY_KEY = "deps"
REGISTRY_DEPS_RULE_KEY = "dep-rules"
REGISTRY_METRICS_KEY = "metrics"
)

func GetRootKey() string {
Expand Down Expand Up @@ -346,3 +347,19 @@ func GetSystemKey() string {
REGISTRY_SYS_KEY,
}, "/")
}

func GetMetricsRootKey() string {
return util.StringJoin([]string{
GetRootKey(),
REGISTRY_METRICS_KEY,
}, "/")
}

func GenerateMetricsKey(name, utc, tenant string) string {
return util.StringJoin([]string{
GetMetricsRootKey(),
name,
utc,
tenant,
}, "/")
}
47 changes: 27 additions & 20 deletions server/core/registry/etcd/remote_etcd_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
const (
REGISTRY_PLUGIN_ETCD = "etcd"
CONNECT_MANAGER_SERVER_TIMEOUT = 10
DEFAULT_PAGE_COUNT = 4096 // grpc does not allow to transport a large body more then 4MB in a request.
)

var clientTLSConfig *tls.Config
Expand Down Expand Up @@ -219,11 +218,11 @@ func (c *EtcdClient) PutNoOverride(ctx context.Context, opts ...registry.PluginO
util.Logger().Errorf(err, "PutNoOverride %s failed", op.Key)
return false, err
}
util.Logger().Infof("response %s %v %v", op.Key, resp.Succeeded, resp.Revision)
util.Logger().Debugf("response %s %v %v", op.Key, resp.Succeeded, resp.Revision)
return resp.Succeeded, nil
}

func (c *EtcdClient) paging(ctx context.Context, op registry.PluginOp, countPerPage int) (*clientv3.GetResponse, error) {
func (c *EtcdClient) paging(ctx context.Context, op registry.PluginOp) (*clientv3.GetResponse, error) {
var etcdResp *clientv3.GetResponse
key := util.BytesToStringWithNoCopy(op.Key)

Expand All @@ -235,8 +234,8 @@ func (c *EtcdClient) paging(ctx context.Context, op registry.PluginOp, countPerP
return nil, err
}

recordCount := int(coutResp.Count)
if recordCount < countPerPage {
recordCount := coutResp.Count
if op.Offset == -1 && recordCount < op.Limit {
return nil, nil // no paging
}

Expand All @@ -245,45 +244,53 @@ func (c *EtcdClient) paging(ctx context.Context, op registry.PluginOp, countPerP
tempOp.Prefix = false
tempOp.SortOrder = registry.SORT_ASCEND
tempOp.EndKey = op.Key
if len(op.EndKey) > 0 {
tempOp.EndKey = op.EndKey
}
tempOp.Revision = coutResp.Header.Revision

etcdResp = coutResp
etcdResp.Kvs = make([]*mvccpb.KeyValue, 0, etcdResp.Count)

pageCount := recordCount / countPerPage
remainCount := recordCount % countPerPage
pageCount := recordCount / op.Limit
remainCount := recordCount % op.Limit
if remainCount > 0 {
pageCount++
}

baseOps := []clientv3.OpOption{}
baseOps = append(baseOps, c.toGetRequest(tempOp)...)

for i := 0; i < pageCount; i++ {
limit := countPerPage
if i == pageCount-1 {
nextKey := key
for i := int64(0); i < pageCount; i++ {
limit := op.Limit
if remainCount > 0 && i == pageCount-1 {
limit = remainCount
}
ops := append(baseOps, clientv3.WithLimit(int64(limit)))
recordResp, err := c.Client.Get(ctx, key, ops...)
recordResp, err := c.Client.Get(ctx, nextKey, ops...)
if err != nil {
return nil, err
}
l := int64(len(recordResp.Kvs))
nextKey := recordResp.Kvs[l-1].Key
key = clientv3.GetPrefixRangeEnd(util.BytesToStringWithNoCopy(nextKey))
nextKey = clientv3.GetPrefixRangeEnd(util.BytesToStringWithNoCopy(recordResp.Kvs[l-1].Key))

if op.Offset >= 0 && (op.Offset < i*op.Limit || op.Offset >= (i+1)*op.Limit) {
continue
}
etcdResp.Kvs = append(etcdResp.Kvs, recordResp.Kvs...)
}

util.LogInfoOrWarnf(start, "get too many KeyValues(%s) from etcdserver, now paging.(%d vs %d)",
key, recordCount, countPerPage)
if op.Offset == -1 {
util.LogInfoOrWarnf(start, "get too many KeyValues(%s) from etcdserver, now paging.(%d vs %d)",
key, recordCount, op.Limit)
}

// too slow
if op.SortOrder == registry.SORT_DESCEND {
t := time.Now()
var last int
for i := 0; i < recordCount; i++ {
last = recordCount - i - 1
for i := int64(0); i < recordCount; i++ {
last := recordCount - i - 1
if last <= i {
break
}
Expand All @@ -307,8 +314,8 @@ func (c *EtcdClient) Do(ctx context.Context, opts ...registry.PluginOpOption) (*
var etcdResp *clientv3.GetResponse
key := util.BytesToStringWithNoCopy(op.Key)

if op.Prefix && !op.CountOnly {
etcdResp, err = c.paging(ctx, op, DEFAULT_PAGE_COUNT)
if (op.Prefix || len(op.EndKey) > 0) && !op.CountOnly {
etcdResp, err = c.paging(ctx, op)
if err != nil {
break
}
Expand Down
17 changes: 13 additions & 4 deletions server/core/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ const (
REQUEST_TIMEOUT = 300

MAX_TXN_NUMBER_ONE_TIME = 128

DEFAULT_PAGE_COUNT = 4096 // grpc does not allow to transport a large body more then 4MB in a request.
)

type Registry interface {
Expand Down Expand Up @@ -195,14 +197,16 @@ type PluginOp struct {
IgnoreLease bool
Mode CacheMode
WatchCallback WatchCallback
Offset int64
Limit int64
}

func (op PluginOp) String() string {
return fmt.Sprintf(
"{action: %s, key: %s, end: %s, val: %d, prefix: %t, prev: %t, lease: %d, keyOnly: %t, countOnly: %t, sort: %s, rev: %d, ignoreLease: %t, mode: %s}",
op.Action, op.Key, op.EndKey, len(util.BytesToStringWithNoCopy(op.Value)),
"{mode: %s, action: %s, key: %s, end: %s, val: %d, prefix: %t, prev: %t, lease: %d, keyOnly: %t, countOnly: %t, sort: %s, rev: %d, ignoreLease: %t, offset: %d, limit: %d}",
op.Mode, op.Action, op.Key, op.EndKey, len(util.BytesToStringWithNoCopy(op.Value)),
op.Prefix, op.PrevKV, op.Lease, op.KeyOnly, op.CountOnly,
op.SortOrder, op.Revision, op.IgnoreLease, op.Mode,
op.SortOrder, op.Revision, op.IgnoreLease, op.Offset, op.Limit,
)
}

Expand Down Expand Up @@ -236,7 +240,8 @@ func WithWatchCallback(f WatchCallback) PluginOpOption {
func WithStrKey(key string) PluginOpOption { return WithKey(util.StringToBytesWithNoCopy(key)) }
func WithStrEndKey(key string) PluginOpOption { return WithEndKey(util.StringToBytesWithNoCopy(key)) }
func WithStrValue(value string) PluginOpOption { return WithValue(util.StringToBytesWithNoCopy(value)) }

func WithOffset(i int64) PluginOpOption { return func(op *PluginOp) { op.Offset = i } }
func WithLimit(i int64) PluginOpOption { return func(op *PluginOp) { op.Limit = i } }
func WatchPrefixOpOptions(key string) []PluginOpOption {
return []PluginOpOption{GET, WithStrKey(key), WithPrefix(), WithPrevKv()}
}
Expand All @@ -260,6 +265,10 @@ func OptionsToOp(opts ...PluginOpOption) (op PluginOp) {
for _, opt := range opts {
opt(&op)
}
if op.Limit == 0 {
op.Offset = -1
op.Limit = DEFAULT_PAGE_COUNT
}
return
}

Expand Down
12 changes: 7 additions & 5 deletions server/core/registry/store/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (

key := util.BytesToStringWithNoCopy(op.Key)

if op.Mode == registry.MODE_NO_CACHE || op.Revision > 0 {
util.Logger().Debugf("search %s match WitchNoCache or WitchRev, request etcd server, key: %s",
i.cacheType, key)
if op.Mode == registry.MODE_NO_CACHE ||
op.Revision > 0 ||
(op.Offset >= 0 && op.Limit > 0) {
util.Logger().Debugf("search %s match special options, request etcd server, opts: %s",
i.cacheType, op)
return registry.GetRegisterCenter().Do(ctx, opts...)
}

if op.Prefix {
resp, err := i.searchPrefixKeyWithCache(ctx, &op)
resp, err := i.searchPrefixKeyWithCache(ctx, op)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,7 +118,7 @@ func (i *Indexer) Cache() Cache {
return i.cacher.Cache()
}

func (i *Indexer) searchPrefixKeyWithCache(ctx context.Context, op *registry.PluginOp) (*registry.PluginResponse, error) {
func (i *Indexer) searchPrefixKeyWithCache(ctx context.Context, op registry.PluginOp) (*registry.PluginResponse, error) {
resp := &registry.PluginResponse{
Action: op.Action,
Kvs: []*mvccpb.KeyValue{},
Expand Down

0 comments on commit b7710a3

Please sign in to comment.