Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve registry implementation #482

Merged
merged 3 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/client/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/douyu/jupiter/pkg/ecode"
"github.com/douyu/jupiter/pkg/xlog"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -112,7 +113,7 @@ func newClient(config *Config) (*Client, error) {
client, err := clientv3.New(conf)

if err != nil {
// config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config))
config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config))
return nil, fmt.Errorf("client etcd start failed: %v", err)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ import (
"fmt"
"time"

"github.com/douyu/jupiter/pkg/client/grpc/resolver"
"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/ecode"
"github.com/douyu/jupiter/pkg/xlog"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)

func init() {
conf.OnLoaded(func(c *conf.Configuration) {
grpclog.SetLoggerV2(zapgrpc.NewLogger(xlog.Jupiter().With(xlog.FieldMod("grpc"))))
})
}

func newGRPCClient(config *Config) *grpc.ClientConn {
var ctx = context.Background()
var dialOptions = config.dialOptions
Expand All @@ -46,6 +56,10 @@ func newGRPCClient(config *Config) *grpc.ClientConn {
dialOptions = append(dialOptions, grpc.WithKeepaliveParams(*config.KeepAlive))
}

dialOptions = append(dialOptions, grpc.WithResolvers(
resolver.NewEtcdBuilder("etcd", config.RegistryConfig)),
)

svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, config.BalancerName)
dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(svcCfg))

Expand Down
27 changes: 15 additions & 12 deletions pkg/client/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@ import (

// Config ...
type Config struct {
Name string // config's name
BalancerName string
Address string
Block bool
DialTimeout time.Duration
ReadTimeout time.Duration
Direct bool
OnDialError string // panic | error
KeepAlive *keepalive.ClientParameters
logger *xlog.Logger
dialOptions []grpc.DialOption
Name string // config's name
BalancerName string
Address string
Block bool
DialTimeout time.Duration
ReadTimeout time.Duration
Direct bool
OnDialError string // panic | error
KeepAlive *keepalive.ClientParameters
RegistryConfig string

logger *xlog.Logger
dialOptions []grpc.DialOption

SlowThreshold time.Duration

Expand All @@ -67,12 +69,13 @@ func DefaultConfig() *Config {
OnDialError: "panic",
AccessInterceptorLevel: "info",
Block: true,
RegistryConfig: "jupiter.registry.default",
}
}

// StdConfig ...
func StdConfig(name string) *Config {
return RawConfig("jupiter.client." + name)
return RawConfig("jupiter.grpc." + name)
}

// RawConfig ...
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/grpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func TestConfig(t *testing.T) {
var configStr = `
[jupiter.client.test]
[jupiter.grpc.test]
balancerName="swr"
address="127.0.0.1:9091"
dialTimeout="10s"
Expand Down
21 changes: 12 additions & 9 deletions pkg/client/grpc/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@ import (
"context"

"github.com/douyu/jupiter/pkg/constant"
"github.com/douyu/jupiter/pkg/registry"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/util/xgo"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

// Register ...
func Register(name string, reg registry.Registry) {
resolver.Register(&baseBuilder{
name: name,
reg: reg,
})
// NewEtcdBuilder returns a new etcdv3 resolver builder.
func NewEtcdBuilder(name string, registryConfig string) resolver.Builder {
return &baseBuilder{
name: name,
registryConfig: registryConfig,
}
}

type baseBuilder struct {
name string
reg registry.Registry

registryConfig string
}

// Build ...
func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")
reg := etcdv3.RawConfig(b.registryConfig).MustSingleton()

endpoints, err := reg.WatchServices(context.Background(), target.Endpoint, "grpc")
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/registry/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func DefaultConfig() *Config {
return &Config{
Config: etcdv3.DefaultConfig(),
ReadTimeout: time.Second * 3,
Prefix: "jupiter",
Prefix: "wsd-reg",
logger: xlog.Jupiter(),
ServiceTTL: 0,
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (config Config) MustBuild() registry.Registry {
}

func (config *Config) Singleton() (registry.Registry, error) {
if val, ok := singleton.Load(constant.ModuleClientEtcd, "etcdv3"); ok {
if val, ok := singleton.Load(constant.ModuleClientEtcd, config.ConfigKey); ok {
return val.(registry.Registry), nil
}

Expand All @@ -93,7 +93,7 @@ func (config *Config) Singleton() (registry.Registry, error) {
return nil, err
}

singleton.Store(constant.ModuleClientEtcd, "etcdv3", reg)
singleton.Store(constant.ModuleClientEtcd, config.ConfigKey, reg)

return reg, nil
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/registry/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"log"

"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/xlog"
)

// var _registerers = sync.Map{}
Expand All @@ -35,10 +36,10 @@ var DefaultRegisterer Registry = &Local{}
func init() {
// 初始化注册中心
conf.OnLoaded(func(c *conf.Configuration) {
log.Print("hook config, init registry")
xlog.Jupiter().Sugar().Info("hook config, init registry")
var config Config
if err := c.UnmarshalKey("jupiter.registry", &config); err != nil {
log.Printf("hook config, read registry config failed: %v", err)
xlog.Jupiter().Sugar().Infof("hook config, read registry config failed: %v", err)
return
}

Expand All @@ -47,13 +48,19 @@ func init() {
if itemKind == "" {
itemKind = "etcdv3"
}

if item.ConfigKey == "" {
item.ConfigKey = "jupiter.registry.default"
}

build, ok := registryBuilder[itemKind]
if !ok {
log.Printf("invalid registry kind: %s", itemKind)
xlog.Jupiter().Sugar().Infof("invalid registry kind: %s", itemKind)
continue
}

xlog.Jupiter().Sugar().Infof("build registrerer %s with config: %s", name, item.ConfigKey)
DefaultRegisterer = build(item.ConfigKey)
log.Printf("build registrerer %s with config: %s", name, item.ConfigKey)
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/xgo/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

var (
_logger = xlog.Jupiter().With(zap.String("mod", "xgo"))
_logger = xlog.Jupiter().With(zap.String("mod", "xgo")).WithOptions(zap.AddStacktrace(zap.ErrorLevel))
)

func try(fn func() error, cleaner func()) (ret error) {
Expand All @@ -35,6 +35,7 @@ func try(fn func() error, cleaner func()) (ret error) {
defer func() {
if err := recover(); err != nil {
_, file, line, _ := runtime.Caller(2)

_logger.Error("recover", zap.Any("err", err), zap.String("line", fmt.Sprintf("%s:%d", file, line)))
if _, ok := err.(error); ok {
ret = err.(error)
Expand Down