Skip to content

Commit

Permalink
feat!: migrate all stuff with breaking changes (#500)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Oct 24, 2022
1 parent 4658757 commit 6ebee4c
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 55 deletions.
3 changes: 3 additions & 0 deletions pkg/client/etcdv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ func (w *Watch) IncipientKeyValues() []*mvccpb.KeyValue {
func (client *Client) WatchPrefix(ctx context.Context, prefix string) (*Watch, error) {
resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
client.config.logger.Error("client.Get failed", xlog.FieldErr(err))
return nil, err
}

client.config.logger.Debug("client.Get finished", xlog.FieldKey(prefix), xlog.FieldValueAny(resp))

var w = &Watch{
revision: resp.Header.Revision,
eventChan: make(chan *clientv3.Event, 100),
Expand Down
8 changes: 5 additions & 3 deletions pkg/client/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"google.golang.org/grpc"
)

type ClientConn = grpc.ClientConn

func init() {
// conf.OnLoaded(func(c *conf.Configuration) {
// xgrpclog.SetLogger(xlog.Jupiter().With(xlog.FieldMod("grpc")))
Expand All @@ -36,7 +38,7 @@ func newGRPCClient(config *Config) *grpc.ClientConn {
var dialOptions = config.dialOptions
logger := config.logger.With(
xlog.FieldMod("client.grpc"),
xlog.FieldAddr(config.Address),
xlog.FieldAddr(config.Addr),
)
// 默认配置使用block
if config.Block {
Expand All @@ -60,10 +62,10 @@ func newGRPCClient(config *Config) *grpc.ClientConn {
svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, config.BalancerName)
dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(svcCfg))

cc, err := grpc.DialContext(ctx, config.Address, dialOptions...)
cc, err := grpc.DialContext(ctx, config.Addr, dialOptions...)

if err != nil {
if config.OnDialError == "panic" {
if config.Level == "panic" {
logger.Panic("dial grpc server", xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(err))
} else {
logger.Error("dial grpc server", xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func TestConfigBlockTrue(t *testing.T) {
assert.True(t, flag)
}()
cfg := DefaultConfig()
cfg.OnDialError = "panic"
cfg.Level = "panic"
newGRPCClient(cfg)
})
}

func TestConfigBlockFalse(t *testing.T) {
t.Run("test no address and no block", func(t *testing.T) {
cfg := DefaultConfig()
cfg.OnDialError = "panic"
cfg.Level = "panic"
cfg.Block = false
conn := newGRPCClient(cfg)
assert.Equal(t, conn.GetState().String(), "IDLE")
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/grpc/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestMain(m *testing.M) {
time.Sleep(200 * time.Millisecond)

cfg := DefaultConfig()
cfg.Address = l.Addr().String()
cfg.Addr = l.Addr().String()

conn := newGRPCClient(cfg)
directClient = testproto.NewGreeterClient(conn)
Expand Down
8 changes: 4 additions & 4 deletions pkg/client/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (
type Config struct {
Name string // config's name
BalancerName string
Address string
Addr string
Block bool
DialTimeout time.Duration
ReadTimeout time.Duration
Direct bool
OnDialError string // panic | error
Level string // panic | error
KeepAlive *keepalive.ClientParameters
RegistryConfig string

Expand Down Expand Up @@ -66,7 +66,7 @@ func DefaultConfig() *Config {
DialTimeout: time.Second * 3,
ReadTimeout: cast.ToDuration("1s"),
SlowThreshold: cast.ToDuration("600ms"),
OnDialError: "panic",
Level: "panic",
AccessInterceptorLevel: "info",
Block: true,
KeepAlive: &keepalive.ClientParameters{
Expand Down Expand Up @@ -111,7 +111,7 @@ func (config *Config) WithDialOption(opts ...grpc.DialOption) *Config {
func (config *Config) Build() *grpc.ClientConn {
if config.Debug {
config.dialOptions = append(config.dialOptions,
grpc.WithChainUnaryInterceptor(debugUnaryClientInterceptor(config.Address)),
grpc.WithChainUnaryInterceptor(debugUnaryClientInterceptor(config.Addr)),
)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/client/grpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestConfig(t *testing.T) {
config := StdConfig("test")
assert.Equal(t, "swr", config.BalancerName)
assert.Equal(t, time.Second*10, config.DialTimeout)
assert.Equal(t, "127.0.0.1:9091", config.Address)
assert.Equal(t, "127.0.0.1:9091", config.Addr)
assert.Equal(t, false, config.Direct)
assert.Equal(t, "panic", config.OnDialError)
assert.Equal(t, "panic", config.Level)
})
}
11 changes: 10 additions & 1 deletion pkg/client/grpc/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package resolver

import (
"context"
"strings"

"github.com/douyu/jupiter/pkg/core/constant"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/util/xgo"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)
Expand All @@ -42,8 +44,13 @@ type baseBuilder struct {
func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
reg := etcdv3.RawConfig(b.registryConfig).MustSingleton()

endpoints, err := reg.WatchServices(context.Background(), target.Endpoint, "grpc")
if !strings.HasSuffix(target.Endpoint, "/") {
target.Endpoint += "/"
}

endpoints, err := reg.WatchServices(context.Background(), target.Endpoint)
if err != nil {
xlog.Jupiter().Error("watch services failed", xlog.FieldErr(err))
return nil, err
}

Expand All @@ -52,6 +59,8 @@ func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
for {
select {
case endpoint := <-endpoints:
xlog.Jupiter().Debug("watch services finished", xlog.FieldValueAny(endpoint))

var state = resolver.State{
Addresses: make([]resolver.Address, 0),
Attributes: attributes.
Expand Down
76 changes: 47 additions & 29 deletions pkg/registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,16 @@ const (
defaultRetryTimes = 3
// defaultKeepAliveTimeout is the default timeout for keepalive requests.
defaultRegisterTimeout = 5 * time.Second
// servicePrefix is the prefix of service key
servicePrefix = "%s:%s:%s:%s/"
// registerService is servicePrefix+host:port
registerService = servicePrefix + "%s"
)

var _ registry.Registry = new(etcdv3Registry)

func newETCDRegistry(config *Config) (*etcdv3Registry, error) {
if config.logger == nil {
config.logger = xlog.Jupiter()
}
config.logger = config.logger.With(xlog.FieldMod(ecode.ModRegistryETCD), xlog.FieldAddrAny(config.Config.Endpoints))
etcdv3Client, err := config.Config.Build()
etcdv3Client, err := config.Config.Singleton()
if err != nil {
config.logger.Error("create etcdv3 client", xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(err))
return nil, err
Expand Down Expand Up @@ -101,11 +99,11 @@ func (reg *etcdv3Registry) UnregisterService(ctx context.Context, info *server.S
}

// ListServices list service registered in registry with name `name`
func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {
target := fmt.Sprintf(servicePrefix, scheme, name, "v1", pkg.AppMode())
getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())
func (reg *etcdv3Registry) ListServices(ctx context.Context, prefix string) (services []*server.ServiceInfo, err error) {
getResp, getErr := reg.client.Get(ctx, prefix, clientv3.WithPrefix())
if getErr != nil {
reg.logger.Error(ecode.MsgWatchRequestErr, xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(getErr), xlog.FieldAddr(target))
reg.logger.Error("reg.client.Get failed",
xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(getErr), xlog.FieldAddr(prefix))
return nil, getErr
}

Expand All @@ -125,10 +123,10 @@ func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme
}

// WatchServices watch service change event, then return address list
func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {
prefix := fmt.Sprintf(servicePrefix, scheme, name, "v1", pkg.AppMode())
func (reg *etcdv3Registry) WatchServices(ctx context.Context, prefix string) (chan registry.Endpoints, error) {
watch, err := reg.client.WatchPrefix(context.Background(), prefix)
if err != nil {
reg.logger.Error("reg.client.WatchPrefix failed", xlog.FieldErrKind(ecode.MsgWatchRequestErr), xlog.FieldErr(err), xlog.FieldAddr(prefix))
return nil, err
}

Expand All @@ -140,6 +138,8 @@ func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, schem
ProviderConfigs: make(map[string]registry.ProviderConfig),
}

scheme := getScheme(prefix)

for _, kv := range watch.IncipientKeyValues() {
updateAddrList(al, prefix, scheme, kv)
}
Expand Down Expand Up @@ -216,10 +216,10 @@ func (reg *etcdv3Registry) registerMetric(ctx context.Context, info *server.Serv
return nil
}

metric := "/prometheus/job/%s/%s/%s"
metric := "/prometheus/job/%s/%s"

val := info.Address
key := fmt.Sprintf(metric, info.Name, pkg.HostName(), val)
key := fmt.Sprintf(metric, info.Name, pkg.HostName())

return reg.registerKV(ctx, key, val)

Expand All @@ -237,9 +237,9 @@ func (reg *etcdv3Registry) registerKV(ctx context.Context, key, val string) erro
// opOptions = append(opOptions, clientv3.WithSerializable())
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
// 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力
lease, err := reg.getLeaseID(ctx)
lease, err := reg.getOrGrantLeaseID(ctx)
if err != nil {
reg.logger.Error("getSession", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err),
reg.logger.Error("getSession failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err),
xlog.FieldKeyAny(key), xlog.FieldValueAny(val))
return err
}
Expand All @@ -262,7 +262,7 @@ func (reg *etcdv3Registry) registerKV(ctx context.Context, key, val string) erro
return nil
}

func (reg *etcdv3Registry) getLeaseID(ctx context.Context) (clientv3.LeaseID, error) {
func (reg *etcdv3Registry) getOrGrantLeaseID(ctx context.Context) (clientv3.LeaseID, error) {
reg.rmu.Lock()
defer reg.rmu.Unlock()

Expand All @@ -281,21 +281,35 @@ func (reg *etcdv3Registry) getLeaseID(ctx context.Context) (clientv3.LeaseID, er
return grant.ID, nil
}

func (reg *etcdv3Registry) getLeaseID() clientv3.LeaseID {
reg.rmu.RLock()
defer reg.rmu.RUnlock()

return reg.leaseID
}

func (reg *etcdv3Registry) setLeaseID(leaseId clientv3.LeaseID) {
reg.rmu.Lock()
defer reg.rmu.Unlock()

reg.leaseID = leaseId
}

// doKeepAlive periodically sends keep alive requests to etcd server.
// when the keep alive request fails or timeout, it will try to re-establish the lease.
func (reg *etcdv3Registry) doKeepalive(ctx context.Context) {

reg.logger.Debug("start keepalive...")

kac, err := reg.client.KeepAlive(ctx, reg.leaseID)
kac, err := reg.client.KeepAlive(ctx, reg.getLeaseID())
if err != nil {
reg.leaseID = 0
reg.setLeaseID(0)
reg.logger.Error("reg.client.KeepAlive failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err))
}

for {
// we should register again, because the leaseID is 0
if reg.leaseID == 0 {
if reg.getLeaseID() == 0 {
cancelCtx, cancel := context.WithCancel(ctx)

done := make(chan struct{}, 1)
Expand All @@ -311,14 +325,14 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) {
done <- struct{}{}
}()

// wait keepalive success
// wait registerAllKvs success
select {
case <-time.After(defaultRegisterTimeout):
// when timeout happens
// we should cancel the context and retry again
cancel()
// mark leaseID as 0 to retry register
reg.leaseID = 0
reg.setLeaseID(0)

continue
case <-done:
Expand All @@ -327,31 +341,31 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) {
}

// try do keepalive again
// when error or timeout happens, just continue
kac, err = reg.client.KeepAlive(ctx, reg.leaseID)
// when error or timeout happens, just continue and try again
kac, err = reg.client.KeepAlive(ctx, reg.getLeaseID())
if err != nil {
reg.logger.Error("reg.client.KeepAlive failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err))

time.Sleep(defaultRegisterTimeout)
continue
}

reg.logger.Debug("reg.client.KeepAlive finished", xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID)))
reg.logger.Debug("reg.client.KeepAlive finished", xlog.String("leaseid", fmt.Sprintf("%x", reg.getLeaseID())))
}

select {
case data, ok := <-kac:
if !ok {
// when error happens
// mark leaseID as 0 to retry register
reg.leaseID = 0
reg.setLeaseID(0)

reg.logger.Debug("need to retry registration", xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID)))
reg.logger.Debug("need to retry registration", xlog.String("leaseid", fmt.Sprintf("%x", reg.getLeaseID())))

continue
}

// just record detailed keepalive info
reg.logger.Debug("do keepalive", xlog.Any("data", data), xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID)))
reg.logger.Debug("do keepalive", xlog.Any("data", data), xlog.String("leaseid", fmt.Sprintf("%x", reg.getLeaseID())))
case <-reg.ctx.Done():
reg.logger.Debug("exit keepalive")

Expand All @@ -361,7 +375,7 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) {
}

func (reg *etcdv3Registry) registerKey(info *server.ServiceInfo) string {
return fmt.Sprintf(registerService, info.Scheme, info.Name, "v1", pkg.AppMode(), info.Address)
return info.RegistryName()
}

func (reg *etcdv3Registry) registerValue(info *server.ServiceInfo) string {
Expand Down Expand Up @@ -437,3 +451,7 @@ func isIPPort(addr string) bool {
_, _, err := net.SplitHostPort(addr)
return err == nil
}

func getScheme(prefix string) string {
return strings.Split(prefix, ":")[0]
}
Loading

0 comments on commit 6ebee4c

Please sign in to comment.