From cf8f8774e323ae7cf02655cbc722eb660c7f180f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Wed, 19 Oct 2022 17:37:20 +0800 Subject: [PATCH] feat: improve registry implementation (#482) --- pkg/client/etcdv3/client.go | 3 ++- pkg/client/grpc/client.go | 14 ++++++++++++++ pkg/client/grpc/config.go | 27 +++++++++++++++------------ pkg/client/grpc/config_test.go | 2 +- pkg/client/grpc/resolver/resolver.go | 21 ++++++++++++--------- pkg/registry/etcdv3/config.go | 6 +++--- pkg/registry/init.go | 15 +++++++++++---- pkg/util/xgo/init.go | 3 ++- 8 files changed, 60 insertions(+), 31 deletions(-) diff --git a/pkg/client/etcdv3/client.go b/pkg/client/etcdv3/client.go index e068c01dcd..ac11124301 100644 --- a/pkg/client/etcdv3/client.go +++ b/pkg/client/etcdv3/client.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/douyu/jupiter/pkg/ecode" "github.com/douyu/jupiter/pkg/xlog" grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" mvccpb "go.etcd.io/etcd/api/v3/mvccpb" @@ -112,7 +113,7 @@ func newClient(config *Config) (*Client, error) { client, err := clientv3.New(conf) if err != nil { - // config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config)) + config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config)) return nil, fmt.Errorf("client etcd start failed: %v", err) } diff --git a/pkg/client/grpc/client.go b/pkg/client/grpc/client.go index ccad4b3420..47519be853 100644 --- a/pkg/client/grpc/client.go +++ b/pkg/client/grpc/client.go @@ -19,11 +19,21 @@ import ( "fmt" "time" + "github.com/douyu/jupiter/pkg/client/grpc/resolver" + "github.com/douyu/jupiter/pkg/conf" "github.com/douyu/jupiter/pkg/ecode" "github.com/douyu/jupiter/pkg/xlog" + "go.uber.org/zap/zapgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" ) +func init() { + conf.OnLoaded(func(c *conf.Configuration) { + grpclog.SetLoggerV2(zapgrpc.NewLogger(xlog.Jupiter().With(xlog.FieldMod("grpc")))) + }) +} + func newGRPCClient(config *Config) *grpc.ClientConn { var ctx = context.Background() var dialOptions = config.dialOptions @@ -46,6 +56,10 @@ func newGRPCClient(config *Config) *grpc.ClientConn { dialOptions = append(dialOptions, grpc.WithKeepaliveParams(*config.KeepAlive)) } + dialOptions = append(dialOptions, grpc.WithResolvers( + resolver.NewEtcdBuilder("etcd", config.RegistryConfig)), + ) + svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, config.BalancerName) dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(svcCfg)) diff --git a/pkg/client/grpc/config.go b/pkg/client/grpc/config.go index 5a5a8cd387..fa70925eb7 100644 --- a/pkg/client/grpc/config.go +++ b/pkg/client/grpc/config.go @@ -30,17 +30,19 @@ import ( // Config ... type Config struct { - Name string // config's name - BalancerName string - Address string - Block bool - DialTimeout time.Duration - ReadTimeout time.Duration - Direct bool - OnDialError string // panic | error - KeepAlive *keepalive.ClientParameters - logger *xlog.Logger - dialOptions []grpc.DialOption + Name string // config's name + BalancerName string + Address string + Block bool + DialTimeout time.Duration + ReadTimeout time.Duration + Direct bool + OnDialError string // panic | error + KeepAlive *keepalive.ClientParameters + RegistryConfig string + + logger *xlog.Logger + dialOptions []grpc.DialOption SlowThreshold time.Duration @@ -67,12 +69,13 @@ func DefaultConfig() *Config { OnDialError: "panic", AccessInterceptorLevel: "info", Block: true, + RegistryConfig: "jupiter.registry.default", } } // StdConfig ... func StdConfig(name string) *Config { - return RawConfig("jupiter.client." + name) + return RawConfig("jupiter.grpc." + name) } // RawConfig ... diff --git a/pkg/client/grpc/config_test.go b/pkg/client/grpc/config_test.go index 096885d6fb..c79ae19040 100644 --- a/pkg/client/grpc/config_test.go +++ b/pkg/client/grpc/config_test.go @@ -26,7 +26,7 @@ import ( func TestConfig(t *testing.T) { var configStr = ` -[jupiter.client.test] +[jupiter.grpc.test] balancerName="swr" address="127.0.0.1:9091" dialTimeout="10s" diff --git a/pkg/client/grpc/resolver/resolver.go b/pkg/client/grpc/resolver/resolver.go index c714c75800..c61d2ba4de 100644 --- a/pkg/client/grpc/resolver/resolver.go +++ b/pkg/client/grpc/resolver/resolver.go @@ -18,28 +18,31 @@ import ( "context" "github.com/douyu/jupiter/pkg/constant" - "github.com/douyu/jupiter/pkg/registry" + "github.com/douyu/jupiter/pkg/registry/etcdv3" "github.com/douyu/jupiter/pkg/util/xgo" "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" ) -// Register ... -func Register(name string, reg registry.Registry) { - resolver.Register(&baseBuilder{ - name: name, - reg: reg, - }) +// NewEtcdBuilder returns a new etcdv3 resolver builder. +func NewEtcdBuilder(name string, registryConfig string) resolver.Builder { + return &baseBuilder{ + name: name, + registryConfig: registryConfig, + } } type baseBuilder struct { name string - reg registry.Registry + + registryConfig string } // Build ... func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc") + reg := etcdv3.RawConfig(b.registryConfig).MustSingleton() + + endpoints, err := reg.WatchServices(context.Background(), target.Endpoint, "grpc") if err != nil { return nil, err } diff --git a/pkg/registry/etcdv3/config.go b/pkg/registry/etcdv3/config.go index e6d5732b37..fcecd46a10 100644 --- a/pkg/registry/etcdv3/config.go +++ b/pkg/registry/etcdv3/config.go @@ -51,7 +51,7 @@ func DefaultConfig() *Config { return &Config{ Config: etcdv3.DefaultConfig(), ReadTimeout: time.Second * 3, - Prefix: "jupiter", + Prefix: "wsd-reg", logger: xlog.Jupiter(), ServiceTTL: 0, } @@ -84,7 +84,7 @@ func (config Config) MustBuild() registry.Registry { } func (config *Config) Singleton() (registry.Registry, error) { - if val, ok := singleton.Load(constant.ModuleClientEtcd, "etcdv3"); ok { + if val, ok := singleton.Load(constant.ModuleClientEtcd, config.ConfigKey); ok { return val.(registry.Registry), nil } @@ -93,7 +93,7 @@ func (config *Config) Singleton() (registry.Registry, error) { return nil, err } - singleton.Store(constant.ModuleClientEtcd, "etcdv3", reg) + singleton.Store(constant.ModuleClientEtcd, config.ConfigKey, reg) return reg, nil } diff --git a/pkg/registry/init.go b/pkg/registry/init.go index f892bc88e3..1496526116 100644 --- a/pkg/registry/init.go +++ b/pkg/registry/init.go @@ -18,6 +18,7 @@ import ( "log" "github.com/douyu/jupiter/pkg/conf" + "github.com/douyu/jupiter/pkg/xlog" ) // var _registerers = sync.Map{} @@ -35,10 +36,10 @@ var DefaultRegisterer Registry = &Local{} func init() { // 初始化注册中心 conf.OnLoaded(func(c *conf.Configuration) { - log.Print("hook config, init registry") + xlog.Jupiter().Sugar().Info("hook config, init registry") var config Config if err := c.UnmarshalKey("jupiter.registry", &config); err != nil { - log.Printf("hook config, read registry config failed: %v", err) + xlog.Jupiter().Sugar().Infof("hook config, read registry config failed: %v", err) return } @@ -47,13 +48,19 @@ func init() { if itemKind == "" { itemKind = "etcdv3" } + + if item.ConfigKey == "" { + item.ConfigKey = "jupiter.registry.default" + } + build, ok := registryBuilder[itemKind] if !ok { - log.Printf("invalid registry kind: %s", itemKind) + xlog.Jupiter().Sugar().Infof("invalid registry kind: %s", itemKind) continue } + + xlog.Jupiter().Sugar().Infof("build registrerer %s with config: %s", name, item.ConfigKey) DefaultRegisterer = build(item.ConfigKey) - log.Printf("build registrerer %s with config: %s", name, item.ConfigKey) } }) } diff --git a/pkg/util/xgo/init.go b/pkg/util/xgo/init.go index 0460c89511..ca67bbeab9 100644 --- a/pkg/util/xgo/init.go +++ b/pkg/util/xgo/init.go @@ -25,7 +25,7 @@ import ( ) var ( - _logger = xlog.Jupiter().With(zap.String("mod", "xgo")) + _logger = xlog.Jupiter().With(zap.String("mod", "xgo")).WithOptions(zap.AddStacktrace(zap.ErrorLevel)) ) func try(fn func() error, cleaner func()) (ret error) { @@ -35,6 +35,7 @@ func try(fn func() error, cleaner func()) (ret error) { defer func() { if err := recover(); err != nil { _, file, line, _ := runtime.Caller(2) + _logger.Error("recover", zap.Any("err", err), zap.String("line", fmt.Sprintf("%s:%d", file, line))) if _, ok := err.(error); ok { ret = err.(error)