Skip to content

Commit

Permalink
feat: improve registry implementation (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Oct 19, 2022
1 parent 3f036d0 commit cf8f877
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 31 deletions.
3 changes: 2 additions & 1 deletion pkg/client/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/douyu/jupiter/pkg/ecode"
"github.com/douyu/jupiter/pkg/xlog"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -112,7 +113,7 @@ func newClient(config *Config) (*Client, error) {
client, err := clientv3.New(conf)

if err != nil {
// config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config))
config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config))
return nil, fmt.Errorf("client etcd start failed: %v", err)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ import (
"fmt"
"time"

"github.com/douyu/jupiter/pkg/client/grpc/resolver"
"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/ecode"
"github.com/douyu/jupiter/pkg/xlog"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)

func init() {
conf.OnLoaded(func(c *conf.Configuration) {
grpclog.SetLoggerV2(zapgrpc.NewLogger(xlog.Jupiter().With(xlog.FieldMod("grpc"))))
})
}

func newGRPCClient(config *Config) *grpc.ClientConn {
var ctx = context.Background()
var dialOptions = config.dialOptions
Expand All @@ -46,6 +56,10 @@ func newGRPCClient(config *Config) *grpc.ClientConn {
dialOptions = append(dialOptions, grpc.WithKeepaliveParams(*config.KeepAlive))
}

dialOptions = append(dialOptions, grpc.WithResolvers(
resolver.NewEtcdBuilder("etcd", config.RegistryConfig)),
)

svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, config.BalancerName)
dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(svcCfg))

Expand Down
27 changes: 15 additions & 12 deletions pkg/client/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@ import (

// Config ...
type Config struct {
Name string // config's name
BalancerName string
Address string
Block bool
DialTimeout time.Duration
ReadTimeout time.Duration
Direct bool
OnDialError string // panic | error
KeepAlive *keepalive.ClientParameters
logger *xlog.Logger
dialOptions []grpc.DialOption
Name string // config's name
BalancerName string
Address string
Block bool
DialTimeout time.Duration
ReadTimeout time.Duration
Direct bool
OnDialError string // panic | error
KeepAlive *keepalive.ClientParameters
RegistryConfig string

logger *xlog.Logger
dialOptions []grpc.DialOption

SlowThreshold time.Duration

Expand All @@ -67,12 +69,13 @@ func DefaultConfig() *Config {
OnDialError: "panic",
AccessInterceptorLevel: "info",
Block: true,
RegistryConfig: "jupiter.registry.default",
}
}

// StdConfig ...
func StdConfig(name string) *Config {
return RawConfig("jupiter.client." + name)
return RawConfig("jupiter.grpc." + name)
}

// RawConfig ...
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/grpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func TestConfig(t *testing.T) {
var configStr = `
[jupiter.client.test]
[jupiter.grpc.test]
balancerName="swr"
address="127.0.0.1:9091"
dialTimeout="10s"
Expand Down
21 changes: 12 additions & 9 deletions pkg/client/grpc/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@ import (
"context"

"github.com/douyu/jupiter/pkg/constant"
"github.com/douyu/jupiter/pkg/registry"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/util/xgo"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

// Register ...
func Register(name string, reg registry.Registry) {
resolver.Register(&baseBuilder{
name: name,
reg: reg,
})
// NewEtcdBuilder returns a new etcdv3 resolver builder.
func NewEtcdBuilder(name string, registryConfig string) resolver.Builder {
return &baseBuilder{
name: name,
registryConfig: registryConfig,
}
}

type baseBuilder struct {
name string
reg registry.Registry

registryConfig string
}

// Build ...
func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")
reg := etcdv3.RawConfig(b.registryConfig).MustSingleton()

endpoints, err := reg.WatchServices(context.Background(), target.Endpoint, "grpc")
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/registry/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func DefaultConfig() *Config {
return &Config{
Config: etcdv3.DefaultConfig(),
ReadTimeout: time.Second * 3,
Prefix: "jupiter",
Prefix: "wsd-reg",
logger: xlog.Jupiter(),
ServiceTTL: 0,
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (config Config) MustBuild() registry.Registry {
}

func (config *Config) Singleton() (registry.Registry, error) {
if val, ok := singleton.Load(constant.ModuleClientEtcd, "etcdv3"); ok {
if val, ok := singleton.Load(constant.ModuleClientEtcd, config.ConfigKey); ok {
return val.(registry.Registry), nil
}

Expand All @@ -93,7 +93,7 @@ func (config *Config) Singleton() (registry.Registry, error) {
return nil, err
}

singleton.Store(constant.ModuleClientEtcd, "etcdv3", reg)
singleton.Store(constant.ModuleClientEtcd, config.ConfigKey, reg)

return reg, nil
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/registry/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"log"

"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/xlog"
)

// var _registerers = sync.Map{}
Expand All @@ -35,10 +36,10 @@ var DefaultRegisterer Registry = &Local{}
func init() {
// 初始化注册中心
conf.OnLoaded(func(c *conf.Configuration) {
log.Print("hook config, init registry")
xlog.Jupiter().Sugar().Info("hook config, init registry")
var config Config
if err := c.UnmarshalKey("jupiter.registry", &config); err != nil {
log.Printf("hook config, read registry config failed: %v", err)
xlog.Jupiter().Sugar().Infof("hook config, read registry config failed: %v", err)
return
}

Expand All @@ -47,13 +48,19 @@ func init() {
if itemKind == "" {
itemKind = "etcdv3"
}

if item.ConfigKey == "" {
item.ConfigKey = "jupiter.registry.default"
}

build, ok := registryBuilder[itemKind]
if !ok {
log.Printf("invalid registry kind: %s", itemKind)
xlog.Jupiter().Sugar().Infof("invalid registry kind: %s", itemKind)
continue
}

xlog.Jupiter().Sugar().Infof("build registrerer %s with config: %s", name, item.ConfigKey)
DefaultRegisterer = build(item.ConfigKey)
log.Printf("build registrerer %s with config: %s", name, item.ConfigKey)
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/xgo/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

var (
_logger = xlog.Jupiter().With(zap.String("mod", "xgo"))
_logger = xlog.Jupiter().With(zap.String("mod", "xgo")).WithOptions(zap.AddStacktrace(zap.ErrorLevel))
)

func try(fn func() error, cleaner func()) (ret error) {
Expand All @@ -35,6 +35,7 @@ func try(fn func() error, cleaner func()) (ret error) {
defer func() {
if err := recover(); err != nil {
_, file, line, _ := runtime.Caller(2)

_logger.Error("recover", zap.Any("err", err), zap.String("line", fmt.Sprintf("%s:%d", file, line)))
if _, ok := err.(error); ok {
ret = err.(error)
Expand Down

0 comments on commit cf8f877

Please sign in to comment.