Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SC plugin manager and Add service-center error codes. #167

Merged
merged 13 commits into from
Nov 20, 2017
4 changes: 0 additions & 4 deletions etc/conf/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ httpport = 30100
# 'etcd' means app running as an etcd agent
# 'embeded_etcd' means app running as an etcd server
registry_plugin = etcd
# Indicates the time in seconds that service center waits since it receive
# the last heartbeat before it remove the instance from registry service
registry_lease_duration = 30s

# registry address
# registry_plugin equals to 'embeded_etcd', example:
Expand Down Expand Up @@ -46,7 +43,6 @@ auth_plugin = ""

#support om, manage
auditlog_plugin = ""
audit_log_dir = ""

#Rate-limit options
#ttl=m, s, ms
Expand Down
2 changes: 1 addition & 1 deletion integration/microservices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ var _ = Describe("MicroService Api Test", func() {
resp, _ := scclient.Do(req)
respbody, _ := ioutil.ReadAll(resp.Body)
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(string(respbody)).To(Equal("{}"))
Expect(strings.TrimSpace(string(respbody))).To(Equal("{}"))
})
})

Expand Down
10 changes: 7 additions & 3 deletions integration/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,17 @@ var _ = Describe("MicroService Api Test", func() {
Expect(resp.StatusCode).To(Equal(http.StatusOK))

//Duplicate Request
bodyBuf = bytes.NewReader(body)
req, _ = http.NewRequest(POST, SCURL+url, bodyBuf)
req.Header.Set("X-Domain-Name", "default")
resp, err = scclient.Do(req)
Expect(err).To(BeNil())
defer resp.Body.Close()

Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
Expect(resp.StatusCode).To(Equal(http.StatusOK))

respbody, _ := ioutil.ReadAll(resp.Body)
Expect(strings.TrimSpace(string(respbody))).To(Equal("{}"))
})
})

Expand Down Expand Up @@ -243,7 +247,7 @@ var _ = Describe("MicroService Api Test", func() {
resp, _ := scclient.Do(req)
respbody, _ := ioutil.ReadAll(resp.Body)
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(string(respbody)).To(Equal("{}"))
Expect(strings.TrimSpace(string(respbody))).To(Equal("{}"))
})

It("Get Rules for Invalid MicroService", func() {
Expand Down Expand Up @@ -482,7 +486,7 @@ var _ = Describe("MicroService Api Test", func() {
resp, _ = scclient.Do(req)
respbody, _ = ioutil.ReadAll(resp.Body)
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(string(respbody)).To(Equal("{}"))
Expect(strings.TrimSpace(string(respbody))).To(Equal("{}"))
})

It("Delete MicroService rules with non-exsisting ruleID", func() {
Expand Down
11 changes: 6 additions & 5 deletions pkg/etcdsync/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package etcdsync
import (
"fmt"
"github.com/ServiceComb/service-center/pkg/util"
"github.com/ServiceComb/service-center/server/core/registry"
"github.com/ServiceComb/service-center/server/core/backend"
"github.com/ServiceComb/service-center/server/infra/registry"
"github.com/coreos/etcd/client"
"golang.org/x/net/context"
"io"
Expand Down Expand Up @@ -127,13 +128,13 @@ func (m *Locker) Lock() error {

putOpts := opts
if m.builder.ttl > 0 {
leaseID, err := registry.GetRegisterCenter().LeaseGrant(m.builder.ctx, m.builder.ttl)
leaseID, err := backend.Registry().LeaseGrant(m.builder.ctx, m.builder.ttl)
if err != nil {
return err
}
putOpts = append(opts, registry.WithLease(leaseID))
}
success, err := registry.GetRegisterCenter().PutNoOverride(m.builder.ctx, putOpts...)
success, err := backend.Registry().PutNoOverride(m.builder.ctx, putOpts...)
if err == nil && success {
util.Logger().Infof("Create Lock OK, key=%s, id=%s", m.builder.key, m.id)
return nil
Expand All @@ -142,7 +143,7 @@ func (m *Locker) Lock() error {

ctx, cancel := context.WithTimeout(m.builder.ctx, defaultTTL*time.Second)
go func() {
err := registry.GetRegisterCenter().Watch(ctx,
err := backend.Registry().Watch(ctx,
registry.WithStrKey(m.builder.key),
registry.WithWatchCallback(
func(message string, evt *registry.PluginResponse) error {
Expand Down Expand Up @@ -179,7 +180,7 @@ func (m *Locker) Unlock() (err error) {
registry.WithStrKey(m.builder.key)}

for i := 1; i <= defaultTry; i++ {
_, err = registry.GetRegisterCenter().Do(m.builder.ctx, opts...)
_, err = backend.Registry().Do(m.builder.ctx, opts...)
if err == nil {
if !IsDebug {
m.builder.mutex.Unlock()
Expand Down
5 changes: 2 additions & 3 deletions pkg/rest/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
"encoding/pem"
"github.com/ServiceComb/service-center/pkg/tlsutil"
"github.com/ServiceComb/service-center/pkg/util"
"github.com/ServiceComb/service-center/server/infra/security"
"github.com/astaxie/beego"
"github.com/ServiceComb/service-center/server/plugin"
"io/ioutil"
"time"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func getX509CACertPool() (caCertPool *x509.CertPool, err error) {
func loadTLSCertificate() (tlsCert []tls.Certificate, err error) {
certFile, keyFile := tlsutil.GetServerSSLConfig().CertFile, tlsutil.GetServerSSLConfig().KeyFile
passphase := tlsutil.GetServerSSLConfig().KeyPassphase
plainPassphase, err := security.CipherPlugins[beego.AppConfig.DefaultString("cipher_plugin", "default")]().Decrypt(passphase)
plainPassphase, err := plugin.Plugins().Cipher().Decrypt(passphase)
if err != nil {
util.Logger().Errorf(err, "decrypt ssl passphase(%d) failed.", len(passphase))
plainPassphase = ""
Expand Down
5 changes: 5 additions & 0 deletions pkg/uuid/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/binary"
"encoding/hex"
"net"
"strings"
"sync"
"time"
"unsafe"
Expand Down Expand Up @@ -165,3 +166,7 @@ func NewV1() UUID {

return u
}

func GenerateUuid() string {
return strings.Replace(NewV1().String(), string(DASH), "", -1)
}
20 changes: 14 additions & 6 deletions server/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,29 @@
package bootstrap

import _ "github.com/ServiceComb/service-center/server/core" // initialize
// cipher
import _ "github.com/ServiceComb/service-center/server/plugin/infra/security/plain"

// registry
import _ "github.com/ServiceComb/service-center/server/core/registry/etcd"
import _ "github.com/ServiceComb/service-center/server/core/registry/embededetcd"

// rest
import _ "github.com/ServiceComb/service-center/server/rest/controller/v3"
import _ "github.com/ServiceComb/service-center/server/rest/controller/v4"

// registry
import _ "github.com/ServiceComb/service-center/server/plugin/infra/registry/etcd"
import _ "github.com/ServiceComb/service-center/server/plugin/infra/registry/embededetcd"

// cipher
import _ "github.com/ServiceComb/service-center/server/plugin/infra/security/buildin"

// quota
import _ "github.com/ServiceComb/service-center/server/plugin/infra/quota/buildin"
import _ "github.com/ServiceComb/service-center/server/plugin/infra/quota/unlimit"

// auth
import _ "github.com/ServiceComb/service-center/server/plugin/infra/auth/buildin"
import _ "github.com/ServiceComb/service-center/server/plugin/infra/auth/dynamic"

// uuid
import _ "github.com/ServiceComb/service-center/server/plugin/infra/uuid/dynamic"

import (
"github.com/ServiceComb/service-center/pkg/util"
"github.com/ServiceComb/service-center/server/handler/auth"
Expand Down
95 changes: 95 additions & 0 deletions server/core/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//Copyright 2017 Huawei Technologies Co., Ltd
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
package backend

import (
"errors"
"github.com/ServiceComb/service-center/pkg/util"
"github.com/ServiceComb/service-center/server/infra/registry"
"github.com/ServiceComb/service-center/server/plugin"
"golang.org/x/net/context"
"sync"
"time"
)

var (
registryInstance registry.Registry
singletonLock sync.Mutex
wait_delay = []int{1, 1, 5, 10, 20, 30, 60}
)

const (
MAX_TXN_NUMBER_ONE_TIME = 128
)

func New() (registry.Registry, error) {
instance := plugin.Plugins().Registry()
if instance == nil {
return nil, errors.New("register center client plugin does not exist")
}
select {
case err := <-instance.Err():
plugin.Plugins().Reload(plugin.REGISTRY)
return nil, err
case <-instance.Ready():
}
return instance, nil
}

func Registry() registry.Registry {
if registryInstance == nil {
singletonLock.Lock()
for i := 0; registryInstance == nil; i++ {
inst, err := New()
if err != nil {
util.Logger().Errorf(err, "get register center client failed")
}
registryInstance = inst

if registryInstance != nil {
singletonLock.Unlock()
return registryInstance
}

if i >= len(wait_delay) {
i = len(wait_delay) - 1
}
t := time.Duration(wait_delay[i]) * time.Second
util.Logger().Errorf(nil, "initialize service center failed, retry after %s", t)
<-time.After(t)
}
singletonLock.Unlock()
}
return registryInstance
}

func BatchCommit(ctx context.Context, opts []registry.PluginOp) error {
lenOpts := len(opts)
tmpLen := lenOpts
tmpOpts := []registry.PluginOp{}
var err error
for i := 0; tmpLen > 0; i++ {
tmpLen = lenOpts - (i+1)*MAX_TXN_NUMBER_ONE_TIME
if tmpLen > 0 {
tmpOpts = opts[i*MAX_TXN_NUMBER_ONE_TIME : (i+1)*MAX_TXN_NUMBER_ONE_TIME]
} else {
tmpOpts = opts[i*MAX_TXN_NUMBER_ONE_TIME : lenOpts]
}
_, err = Registry().Txn(ctx, tmpOpts)
if err != nil {
return err
}
}
return nil
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ package store

import (
"github.com/ServiceComb/service-center/pkg/util"
"github.com/ServiceComb/service-center/server/core/backend"
"github.com/ServiceComb/service-center/server/core/proto"
"github.com/ServiceComb/service-center/server/core/registry"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"sync"
Expand Down Expand Up @@ -193,14 +193,13 @@ func (c *KvCacher) doList(listOps *ListOptions) error {
c.lastRev = c.lw.Revision()
c.sync(c.filter(c.lastRev, kvs))

util.LogNilOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lastRev)
util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lastRev)

return nil
}

func (c *KvCacher) doWatch(listOps *ListOptions) error {
watcher := c.lw.Watch(listOps)
util.Logger().Debugf("finish to new watcher, key %s, start rev: %d+1", c.Cfg.Key, c.lastRev)
return c.handleWatcher(watcher)
}

Expand Down Expand Up @@ -482,7 +481,7 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {

func (c *KvCacher) run() {
c.goroute.Do(func(stopCh <-chan struct{}) {
util.Logger().Infof("start to list and watch %s", c.Cfg)
util.Logger().Debugf("start to list and watch %s", c.Cfg)
ctx, cancel := context.WithCancel(context.Background())
c.goroute.Do(func(stopCh <-chan struct{}) {
defer cancel()
Expand Down Expand Up @@ -520,8 +519,6 @@ func (c *KvCacher) Stop() {
c.goroute.Close(true)

util.SafeCloseChan(c.ready)

util.Logger().Debugf("cacher is stopped, %s", c.Cfg)
}

func (c *KvCacher) Ready() <-chan struct{} {
Expand All @@ -548,7 +545,7 @@ func NewKvCacher(opts ...KvCacherCfgOption) Cacher {
Cfg: cfg,
ready: make(chan struct{}),
lw: ListWatcher{
Client: registry.GetRegisterCenter(),
Client: backend.Registry(),
Key: cfg.Key,
},
goroute: util.NewGo(make(chan struct{})),
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ package store

import (
"github.com/ServiceComb/service-center/pkg/util"
"github.com/ServiceComb/service-center/server/core/backend"
pb "github.com/ServiceComb/service-center/server/core/proto"
"github.com/ServiceComb/service-center/server/core/registry"
"github.com/ServiceComb/service-center/server/infra/registry"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"strings"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (
(op.Offset >= 0 && op.Limit > 0) {
util.Logger().Debugf("search %s match special options, request etcd server, opts: %s",
i.cacheType, op)
return registry.GetRegisterCenter().Do(ctx, opts...)
return backend.Registry().Do(ctx, opts...)
}

if op.Prefix {
Expand All @@ -75,7 +76,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (

util.Logger().Debugf("can not find any key from %s cache with prefix, request etcd server, key: %s",
i.cacheType, key)
return registry.GetRegisterCenter().Do(ctx, opts...)
return backend.Registry().Do(ctx, opts...)
}

resp := &registry.PluginResponse{
Expand All @@ -95,7 +96,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (
}

util.Logger().Debugf("%s cache does not store this key, request etcd server, key: %s", i.cacheType, key)
return registry.GetRegisterCenter().Do(ctx, opts...)
return backend.Registry().Do(ctx, opts...)
}

cacheData := i.Cache().Data(key)
Expand All @@ -106,7 +107,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (

util.Logger().Debugf("do not match any key in %s cache store, request etcd server, key: %s",
i.cacheType, key)
return registry.GetRegisterCenter().Do(ctx, opts...)
return backend.Registry().Do(ctx, opts...)
}

resp.Count = 1
Expand Down Expand Up @@ -302,8 +303,6 @@ func (i *Indexer) Stop() {
close(i.prefixBuildQueue)

util.SafeCloseChan(i.ready)

util.Logger().Debugf("%s indexer is stopped", i.cacheType)
}

func (i *Indexer) Ready() <-chan struct{} {
Expand Down