Skip to content

Commit

Permalink
Merge 9078cae into 3618245
Browse files Browse the repository at this point in the history
  • Loading branch information
little-cui committed May 17, 2018
2 parents 3618245 + 9078cae commit da8ade3
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 59 deletions.
51 changes: 51 additions & 0 deletions pkg/util/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util

import (
"math"
"time"
)

var DefaultBackoff Backoff = &PowerBackoff{
MaxDelay: 30 * time.Second,
InitDelay: 1 * time.Second,
Factor: 1.6,
}

type Backoff interface {
Delay(retries int) time.Duration
}

// delay = min(MaxDelay, InitDelay * power(Factor, retries))
type PowerBackoff struct {
MaxDelay time.Duration
InitDelay time.Duration
Factor float64
}

func (pb *PowerBackoff) Delay(retries int) time.Duration {
if retries <= 0 {
return pb.InitDelay
}

return time.Duration(math.Min(float64(pb.MaxDelay), float64(pb.InitDelay)*math.Pow(pb.Factor, float64(retries))))
}

func GetBackoff() Backoff {
return DefaultBackoff
}
57 changes: 57 additions & 0 deletions pkg/util/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util

import (
"testing"
"time"
)

func TestPowerBackoff_Delay(t *testing.T) {
i, m := time.Second, 30*time.Second
b := &PowerBackoff{
MaxDelay: 30 * time.Second,
InitDelay: 1 * time.Second,
Factor: 1.6,
}
r := b.Delay(-1)
if r != i {
t.Fatalf("TestPowerBackoff_Delay -1 failed, result is %s", r)
}
r = b.Delay(0)
if r != i {
t.Fatalf("TestPowerBackoff_Delay 0 failed, result is %s", r)
}
r = b.Delay(1)
if r != 1600*time.Millisecond {
t.Fatalf("TestPowerBackoff_Delay 1 failed, result is %s", r)
}
r = b.Delay(4)
if r != 6553600*time.Microsecond {
t.Fatalf("TestPowerBackoff_Delay 4 failed, result is %s", r)
}
r = b.Delay(8)
if r != m {
t.Fatalf("TestPowerBackoff_Delay 8 failed, result is %s", r)
}
}

func TestGetBackoff(t *testing.T) {
if GetBackoff() == nil {
t.Fatalf("TestGetBackoff failed")
}
}
21 changes: 11 additions & 10 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,26 @@ func LogPanic(args ...interface{}) {
}

if strings.Index(file, "service-center") > 0 || strings.Index(file, "servicecenter") > 0 {
idx := strings.LastIndex(file, "/")
if idx >= 0 {
file = file[idx+1:]
}
Logger().Errorf(nil, "recover from %s %s():%d! %s", file, method, line, fmt.Sprint(args...))
Logger().Errorf(nil, "recover from %s %s():%d! %s", FileLastName(file), method, line, fmt.Sprint(args...))
return
}
}

file, method, line, _ := GetCaller(0)
idx := strings.LastIndex(file, "/")
if idx >= 0 {
file = file[idx+1:]
}
fmt.Fprintln(os.Stderr, time.Now().Format("2006-01-02T15:04:05.000Z07:00"), "FATAL", "system", os.Getpid(),
fmt.Sprintf("%s %s():%d", file, method, line), fmt.Sprint(args...))
fmt.Sprintf("%s %s():%d", FileLastName(file), method, line), fmt.Sprint(args...))
fmt.Fprintln(os.Stderr, BytesToStringWithNoCopy(debug.Stack()))
}

func FileLastName(file string) string {
if sp1 := strings.LastIndex(file, "/"); sp1 >= 0 {
if sp2 := strings.LastIndex(file[:sp1], "/"); sp2 >= 0 {
file = file[sp2+1:]
}
}
return file
}

func GetCaller(skip int) (string, string, int, bool) {
pc, file, line, ok := runtime.Caller(skip + 1)
method := FormatFuncName(runtime.FuncForPC(pc).Name())
Expand Down
27 changes: 27 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,30 @@ func TestBytesToInt32(t *testing.T) {
t.FailNow()
}
}

func TestFileLastName(t *testing.T) {
n := FileLastName("")
if n != "" {
t.Fatal("TestFileLastName '' failed", n)
}
n = FileLastName("a")
if n != "a" {
t.Fatal("TestFileLastName 'a' failed", n)
}
n = FileLastName("a/b")
if n != "a/b" {
t.Fatal("TestFileLastName 'a/b' failed", n)
}
n = FileLastName("a/b/c")
if n != "b/c" {
t.Fatal("TestFileLastName 'b/c' failed", n)
}
n = FileLastName("b/")
if n != "b/" {
t.Fatal("TestFileLastName 'b' failed", n)
}
n = FileLastName("/")
if n != "/" {
t.Fatal("TestFileLastName 'b' failed", n)
}
}
6 changes: 1 addition & 5 deletions server/core/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
var (
registryInstance registry.Registry
singletonLock sync.Mutex
wait_delay = []int{1, 1, 5, 10, 20, 30, 60}
)

const (
Expand Down Expand Up @@ -65,10 +64,7 @@ func Registry() registry.Registry {
return registryInstance
}

if i >= len(wait_delay) {
i = len(wait_delay) - 1
}
t := time.Duration(wait_delay[i]) * time.Second
t := util.GetBackoff().Delay(i)
util.Logger().Errorf(nil, "initialize service center failed, retry after %s", t)
<-time.After(t)
}
Expand Down
16 changes: 9 additions & 7 deletions server/core/backend/store/cache_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,23 @@ func (c *KvCacher) needDeferHandle(evts []KvEvent) bool {

func (c *KvCacher) refresh(ctx context.Context) {
util.Logger().Debugf("start to list and watch %s", c.Cfg)
retries := 0
for {
start := time.Now()
c.ListAndWatch(ctx)
watchDuration := time.Since(start)
nextPeriod := minWaitInterval
if watchDuration > 0 && c.Cfg.Period > watchDuration {
nextPeriod = c.Cfg.Period - watchDuration
if err := c.ListAndWatch(ctx); err != nil {
nextPeriod = util.GetBackoff().Delay(retries)
retries++
} else {
retries = 0

ReportCacheMetrics(c.Name(), "raw", c.cache.RLock())
c.cache.RUnlock()
}
select {
case <-ctx.Done():
util.Logger().Debugf("stop to list and watch %s", c.Cfg)
return
case <-time.After(nextPeriod):
ReportCacheMetrics(c.Name(), "raw", c.cache.RLock())
c.cache.RUnlock()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/core/backend/store/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ const (
const (
DEFAULT_COMPACT_TIMES = 3
DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute
minWaitInterval = 100 * time.Millisecond
minWaitInterval = 1 * time.Second
eventBlockSize = 1000
)

Expand Down
38 changes: 23 additions & 15 deletions server/core/backend/store/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package store

import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/apache/incubator-servicecomb-service-center/server/core"
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
Expand All @@ -35,6 +36,7 @@ type Indexer struct {
cacher Cacher
goroutine *util.GoRoutine
ready chan struct{}
lastMaxSize int
prefixIndex map[string]map[string]struct{}
prefixBuildQueue chan KvEvent
prefixLock sync.RWMutex
Expand All @@ -46,7 +48,8 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (

key := util.BytesToStringWithNoCopy(op.Key)

if op.Mode == registry.MODE_NO_CACHE ||
if !core.ServerInfo.Config.EnableCache ||
op.Mode == registry.MODE_NO_CACHE ||
op.Revision > 0 ||
(op.Offset >= 0 && op.Limit > 0) {
util.Logger().Debugf("search %s match special options, request etcd server, opts: %s",
Expand Down Expand Up @@ -195,18 +198,27 @@ func (i *Indexer) buildIndex() {
default:
i.addPrefixKey(prefix, key)
}
i.prefixLock.Unlock()

util.LogNilOrWarnf(t, "too long to rebuild(action: %s) index[%d], key is %s",
evt.Type, key, len(i.prefixIndex))
case <-time.After(10 * time.Second):
i.prefixLock.Lock()
if time.Now().Sub(lastCompactTime) >= DEFAULT_COMPACT_TIMEOUT {
// compact
initSize, l := DEFAULT_CACHE_INIT_SIZE, len(i.prefixIndex)
if i.lastMaxSize < l {
i.lastMaxSize = l
}
if initSize >= l &&
i.lastMaxSize >= initSize*DEFAULT_COMPACT_TIMES &&
time.Now().Sub(lastCompactTime) >= DEFAULT_COMPACT_TIMEOUT {
i.compact()
i.lastMaxSize = l
lastCompactTime = time.Now()
}

// report metrics
ReportCacheMetrics(i.cacher.Name(), "index", i.prefixIndex)

i.prefixLock.Unlock()

util.LogNilOrWarnf(t, "too long to rebuild(action: %s) index[%d], key is %s",
evt.Type, key, len(i.prefixIndex))
}
}
util.Logger().Debugf("the goroutine building index %s is stopped", i.cacher.Name())
Expand All @@ -215,11 +227,7 @@ func (i *Indexer) buildIndex() {
}

func (i *Indexer) compact() {
l := len(i.prefixIndex)
if l < DEFAULT_CACHE_INIT_SIZE {
l = DEFAULT_CACHE_INIT_SIZE
}
n := make(map[string]map[string]struct{}, l)
n := make(map[string]map[string]struct{}, DEFAULT_CACHE_INIT_SIZE)
for k, v := range i.prefixIndex {
c, ok := n[k]
if !ok {
Expand All @@ -232,8 +240,8 @@ func (i *Indexer) compact() {
}
i.prefixIndex = n

util.Logger().Infof("index %s(%s): compact root capacity to size %d",
i.cacher.Name(), DEFAULT_COMPACT_TIMEOUT, l)
util.Logger().Infof("index %s: compact root capacity to size %d",
i.cacher.Name(), DEFAULT_CACHE_INIT_SIZE)
}

func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
Expand Down Expand Up @@ -301,7 +309,7 @@ func (i *Indexer) Run() {
i.isClose = false
i.prefixLock.Unlock()

if _, ok := i.cacher.(*nullCacher); ok {
if !core.ServerInfo.Config.EnableCache {
util.SafeCloseChan(i.ready)
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/core/backend/store/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error
return fmt.Errorf("unknown event %s", resp)
}

util.Logger().Infof("watch prefix %s, start rev %d+1, event: %s", lw.Prefix, rev, resp)
util.Logger().Infof("caught event %s, watch prefix %s, start rev %d+1,", resp, lw.Prefix, rev)

lw.setRevision(resp.Revision)

Expand Down
4 changes: 2 additions & 2 deletions server/plugin/infra/registry/embededetcd/embededetcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

var embedTLSConfig *tls.Config

const START_MANAGER_SERVER_TIMEOUT = 10
const START_MANAGER_SERVER_TIMEOUT = 10 * time.Second

func init() {
mgr.RegisterPlugin(mgr.Plugin{mgr.REGISTRY, "embeded_etcd", getEmbedInstance})
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
}

func (s *EtcdEmbed) ReadyNotify() {
timeout := START_MANAGER_SERVER_TIMEOUT * time.Second
timeout := START_MANAGER_SERVER_TIMEOUT
select {
case <-s.Embed.Server.ReadyNotify():
close(s.ready)
Expand Down
9 changes: 1 addition & 8 deletions server/plugin/infra/registry/etcd/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/coreos/pkg/capnslog"
"runtime"
"strings"
)

// clientLogger implement from grcplog.LoggerV2s and capnslog.Formatter
Expand All @@ -45,13 +44,7 @@ func (l *clientLogger) getCaller(depth int) string {
if !ok {
return "???"
}

if sp1 := strings.LastIndex(file, "/"); sp1 >= 0 {
if sp2 := strings.LastIndex(file[:sp1], "/"); sp2 >= 0 {
file = file[sp2+1:]
}
}
return fmt.Sprintf("%s:%d", file, line)
return fmt.Sprintf("%s:%d", util.FileLastName(file), line)
}

func (l *clientLogger) Flush() {
Expand Down

0 comments on commit da8ade3

Please sign in to comment.