Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor etcd registry protocol #485

Merged
merged 5 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions pkg/ecode/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
package ecode

import (
"encoding/json"
"net/http"
"sync"

"github.com/douyu/jupiter/pkg/governor"
"github.com/douyu/jupiter/pkg/xlog"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
Expand All @@ -38,20 +35,6 @@ var (
OK = add(int(codes.OK), "OK")
)

func init() {
// status code list
governor.HandleFunc("/status/code/list", func(w http.ResponseWriter, r *http.Request) {
var rets = make(map[int]*spbStatus)
_codes.Range(func(key, val interface{}) bool {
code := key.(int)
status := val.(*spbStatus)
rets[code] = status
return true
})
_ = json.NewEncoder(w).Encode(rets)
})
}

// Add ...
func Add(code int, message string) *spbStatus {
if code > maxCustomizeCode {
Expand Down
11 changes: 11 additions & 0 deletions pkg/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"

"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/constant"
)

Expand Down Expand Up @@ -42,7 +43,17 @@ func SetAppLogDir(logDir string) {
appLogDir = logDir
}

// AppMode returns the current application mode.
func AppMode() string {
confMode := conf.GetString(constant.ConfigKey("mode"))
if appMode == "" {
if confMode == "" {
return "unkown-mode"
}

return confMode
}

return appMode
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/douyu/jupiter/pkg/registry"
"github.com/douyu/jupiter/pkg/singleton"
"github.com/douyu/jupiter/pkg/xlog"
"github.com/spf13/cast"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -53,7 +54,7 @@ func DefaultConfig() *Config {
ReadTimeout: time.Second * 3,
Prefix: "wsd-reg",
logger: xlog.Jupiter(),
ServiceTTL: 0,
ServiceTTL: cast.ToDuration("20s"),
}
}

Expand Down
181 changes: 40 additions & 141 deletions pkg/registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"encoding/json"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"

"github.com/douyu/jupiter/pkg"
"github.com/douyu/jupiter/pkg/client/etcdv3"
"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/constant"
"github.com/douyu/jupiter/pkg/ecode"
"github.com/douyu/jupiter/pkg/registry"
Expand All @@ -46,6 +46,12 @@ type etcdv3Registry struct {
sessions map[string]*concurrency.Session
}

const (
servicePrefix = "%s:%s:%s:%s/"
// schema:appname:version:mode/host:port
registerService = "%s:%s:%s:%s/%s"
)

func newETCDRegistry(config *Config) (*etcdv3Registry, error) {
if config.logger == nil {
config.logger = xlog.Jupiter()
Expand Down Expand Up @@ -84,7 +90,7 @@ func (reg *etcdv3Registry) UnregisterService(ctx context.Context, info *server.S

// ListServices list service registered in registry with name `name`
func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {
target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)
target := fmt.Sprintf(servicePrefix, scheme, name, "v1", conf.GetString("app.mode"))
getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())
if getErr != nil {
reg.logger.Error(ecode.MsgWatchRequestErr, xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(getErr), xlog.FieldAddr(target))
Expand All @@ -105,7 +111,7 @@ func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme

// WatchServices watch service change event, then return address list
func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {
prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)
prefix := fmt.Sprintf(servicePrefix, scheme, name, "v1", pkg.AppMode())
watch, err := reg.client.WatchPrefix(context.Background(), prefix)
if err != nil {
return nil, err
Expand Down Expand Up @@ -213,8 +219,8 @@ func (reg *etcdv3Registry) registerMetric(ctx context.Context, info *server.Serv
opOptions := make([]clientv3.OpOption, 0)
// opOptions = append(opOptions, clientv3.WithSerializable())
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
//todo ctx without timeout for same as service life?
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
// 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力
sess, err := reg.getSession(info.Name, concurrency.WithTTL(int(ttl)))
if err != nil {
return err
}
Expand Down Expand Up @@ -244,8 +250,8 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service
opOptions := make([]clientv3.OpOption, 0)
// opOptions = append(opOptions, clientv3.WithSerializable())
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
//todo ctx without timeout for same as service life?
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
// 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力
sess, err := reg.getSession(info.Name, concurrency.WithTTL(int(ttl)))
if err != nil {
return err
}
Expand All @@ -262,19 +268,21 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service
}

func (reg *etcdv3Registry) getSession(k string, opts ...concurrency.SessionOption) (*concurrency.Session, error) {
reg.rmu.RLock()
// 需要对整个方法加锁,防止并发创建session
reg.rmu.Lock()
defer reg.rmu.Unlock()
sess, ok := reg.sessions[k]
reg.rmu.RUnlock()
if ok {
return sess, nil
}

sess, err := concurrency.NewSession(reg.client.Client, opts...)
if err != nil {
xlog.Jupiter().Error("create session failed", xlog.FieldKeyAny(k))
return sess, err
}
reg.rmu.Lock()
reg.sessions[k] = sess
reg.rmu.Unlock()
xlog.Jupiter().Info("create session", xlog.FieldKeyAny(k), xlog.FieldValueAny(sess))
return sess, nil
}

Expand All @@ -296,43 +304,23 @@ func (reg *etcdv3Registry) delSession(k string) error {
}

func (reg *etcdv3Registry) registerKey(info *server.ServiceInfo) string {
return registry.GetServiceKey(reg.Prefix, info)
return fmt.Sprintf(registerService, info.Scheme, info.Name, "v1", pkg.AppMode(), info.Address)
}

func (reg *etcdv3Registry) registerValue(info *server.ServiceInfo) string {
return registry.GetServiceValue(info)
update := Update{
Op: Add,
Addr: info.Address,
}

val, _ := json.Marshal(update)

return string(val)
}

func deleteAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccpb.KeyValue) {
for _, kv := range kvs {
var addr = strings.TrimPrefix(string(kv.Key), prefix)
if strings.HasPrefix(addr, "providers/"+scheme) {
// 解析服务注册键
addr = strings.TrimPrefix(addr, "providers/")
if addr == "" {
continue
}
uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
delete(al.Nodes, uri.String())
}

if strings.HasPrefix(addr, "configurators/"+scheme) {
// 解析服务配置键
addr = strings.TrimPrefix(addr, "configurators/")
if addr == "" {
continue
}
uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
delete(al.RouteConfigs, uri.String())
}

if isIPPort(addr) {
// 直接删除addr 因为Delete操作的value值为空
Expand All @@ -345,117 +333,28 @@ func deleteAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccp
func updateAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccpb.KeyValue) {
for _, kv := range kvs {
var addr = strings.TrimPrefix(string(kv.Key), prefix)
switch {
// 解析服务注册键
case strings.HasPrefix(addr, "providers/"+scheme):
addr = strings.TrimPrefix(addr, "providers/")
uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
var serviceInfo server.ServiceInfo
if err := json.Unmarshal(kv.Value, &serviceInfo); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
if serviceInfo.Enable {
al.Nodes[uri.String()] = serviceInfo
} else {
delete(al.Nodes, uri.String())
}

case strings.HasPrefix(addr, "configurators/"+scheme):
addr = strings.TrimPrefix(addr, "configurators/")

uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
if isIPPort(addr) {
var meta Update
if err := json.Unmarshal(kv.Value, &meta); err != nil {
xlog.Jupiter().Error("unmarshal meta", xlog.FieldErr(err),
xlog.FieldExtMessage("value", string(kv.Value), "key", string(kv.Key)))
continue
}

if strings.HasPrefix(uri.Path, "/routes/") { // 路由配置
var routeConfig registry.RouteConfig
if err := json.Unmarshal(kv.Value, &routeConfig); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
routeConfig.ID = strings.TrimPrefix(uri.Path, "/routes/")
routeConfig.Scheme = uri.Scheme
routeConfig.Host = uri.Host
al.RouteConfigs[uri.String()] = routeConfig
}

if strings.HasPrefix(uri.Path, "/providers/") {
var providerConfig registry.ProviderConfig
if err := json.Unmarshal(kv.Value, &providerConfig); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
providerConfig.ID = strings.TrimPrefix(uri.Path, "/providers/")
providerConfig.Scheme = uri.Scheme
providerConfig.Host = uri.Host
al.ProviderConfigs[uri.String()] = providerConfig
}

if strings.HasPrefix(uri.Path, "/consumers/") {
var consumerConfig registry.ConsumerConfig
if err := json.Unmarshal(kv.Value, &consumerConfig); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
switch meta.Op {
case Add:
al.Nodes[addr] = server.ServiceInfo{
Address: addr,
}
consumerConfig.ID = strings.TrimPrefix(uri.Path, "/consumers/")
consumerConfig.Scheme = uri.Scheme
consumerConfig.Host = uri.Host
al.ConsumerConfigs[uri.String()] = consumerConfig
case Delete:
delete(al.Nodes, addr)
}
}
}
}


func isIPPort(addr string) bool {
_, _, err := net.SplitHostPort(addr)
return err == nil
}

/*
key: /jupiter/main/configurator/grpc:///routes/1
val:
{
"upstream": { // 客户端配置
"nodes": { // 按照node负载均衡
"127.0.0.1:1980": 1,
"127.0.0.1:1981": 4
},
"group": { // 按照group负载均衡
"red": 2,
"green": 1
}
},
"uri": "/hello",
"deployment": "open_api"
}

key: /jupiter/main/configurator/grpc://127.0.0.1/routes/2
val:
{
"upstream": { // 客户端配置
"nodes": { // 按照node负载均衡
"127.0.0.1:1980": 1,
"127.0.0.1:1981": 1
},
"group": { // 按照group负载均衡
"red": 1,
"green": 2
}
},
"uri": "/hello",
"deployment": "core_api" // 部署组
}

key: /jupiter/main/configurator/grpc:///consumers/client-demo
val:
{

}
*/
36 changes: 36 additions & 0 deletions pkg/registry/etcdv3/update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 Douyu
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdv3

type Operation uint8

const (
// Add indicates an Endpoint is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)

// Update defines a name resolution update. Notice that it is not valid having both
// empty string Addr and nil Metadata in an Update.
type Update struct {
// Op indicates the operation of the update.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}
}
Loading