Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix service discovery related issues and add mesh proxy mode support #2022

Merged
merged 34 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2377ea9
fix service discovery related issues and add mesh proxy mode
chickenlj Aug 18, 2022
39e31d0
fix wrong condition check
chickenlj Aug 18, 2022
1a5918c
delete useless code
chickenlj Aug 18, 2022
113c921
delete empty if branch
chickenlj Aug 18, 2022
645dd64
fix service discovery
chickenlj Aug 18, 2022
e9b728e
fix unexpected change
chickenlj Aug 18, 2022
b17a8d0
remove unused code
chickenlj Aug 18, 2022
f368295
fix shadow variable
chickenlj Aug 18, 2022
2c42119
add comment
chickenlj Aug 18, 2022
e31102a
recover provided_by
chickenlj Aug 18, 2022
a2a62c4
add project setting
chickenlj Aug 18, 2022
096db6a
update
chickenlj Aug 18, 2022
5e98864
Merge branch '3.0' into fix-3.0-mesh-proxy
chickenlj Aug 23, 2022
af62e4d
update project description
chickenlj Aug 23, 2022
f271881
format imports
chickenlj Aug 24, 2022
4d61cf9
format import
chickenlj Aug 24, 2022
82f236a
reformat import using goformatter-imports
chickenlj Aug 24, 2022
a668a23
enable default metadata
chickenlj Aug 24, 2022
48fe0cb
fix compilation
chickenlj Aug 24, 2022
8f26c34
skip if metadata factory not provided
chickenlj Aug 24, 2022
d74775b
add some logs
chickenlj Aug 24, 2022
86a1040
delete previous instance if exists
chickenlj Aug 25, 2022
78bff0c
consume tri protocol by default
chickenlj Aug 25, 2022
8f832be
change IT repo
chickenlj Aug 26, 2022
69d164d
fix
chickenlj Aug 26, 2022
5c1fd14
fix directory subscribe
chickenlj Aug 28, 2022
01f06e2
fix service discovery
chickenlj Aug 28, 2022
d96b220
fix service discovery, ignore err when checking mapping
chickenlj Aug 28, 2022
3c7a964
recover integration test configuration
chickenlj Aug 28, 2022
5ee0a81
support 'N/A' registry address
chickenlj Aug 31, 2022
741f579
support 'N/A' registry address
chickenlj Aug 31, 2022
d6d59ae
fix log format
chickenlj Aug 31, 2022
dd2f59f
Optimze code according to comments
chickenlj Sep 6, 2022
ea88557
update base branch
chickenlj Sep 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,29 @@ notifications:
issues: notifications@dubbo.apache.org
pullrequests: notifications@dubbo.apache.org
jira_options: link label link label
github:
homepage: https://dubbo.apache.org/
description: "Dubbo-go is the go language implementation of Dubbo."
chickenlj marked this conversation as resolved.
Show resolved Hide resolved
chickenlj marked this conversation as resolved.
Show resolved Hide resolved
labels:
- go
- rpc
- microservices
- http2
- service-mesh
features:
# Enable wiki for documentation
wiki: true
# Enable issue management
issues: true
# Enable projects for project management boards
projects: true
protected_branches:
master:
# only disable force push
foo: bar
justxuewei marked this conversation as resolved.
Show resolved Hide resolved
3.0:
# only disable force push
foo: bar
3.1:
# only disable force push
foo: bar
13 changes: 9 additions & 4 deletions common/constant/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ package constant

// nolint
const (
ConfigFileEnvKey = "DUBBO_GO_CONFIG_PATH" // key of environment variable dubbogo configure file path
AppLogConfFile = "AppLogConfFile"
PodNameEnvKey = "POD_NAME"
PodNamespaceEnvKey = "POD_NAMESPACE"
ConfigFileEnvKey = "DUBBO_GO_CONFIG_PATH" // key of environment variable dubbogo configure file path
AppLogConfFile = "AppLogConfFile"
PodNameEnvKey = "POD_NAME"
PodNamespaceEnvKey = "POD_NAMESPACE"
ClusterDomainKey = "CLUSTER_DOMAIN"
DefaultClusterDomain = "cluster.local"
DefaultNamespace = "default"
SVC = "svc"
DefaultMeshPort = 80
)
2 changes: 1 addition & 1 deletion common/metadata_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (si *ServiceInfo) GetMethods() []string {
s := si.Params[constant.MethodsKey]
return strings.Split(s, ",")
}
methods := make([]string, 8)
methods := make([]string, 0)
chickenlj marked this conversation as resolved.
Show resolved Hide resolved
for k, _ := range si.Params {
ms := strings.Index(k, ".")
if ms > 0 {
Expand Down
40 changes: 36 additions & 4 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func WithMethods(methods []string) Option {
}
}

// WithParams sets params for URL
// WithParams deep copy the params in the argument into params of the target URL
func WithParams(params url.Values) Option {
return func(url *URL) {
url.params = params
url.SetParams(params)
}
}

Expand Down Expand Up @@ -190,6 +190,13 @@ func WithPath(path string) Option {
}
}

// WithInterface sets interface param for URL
func WithInterface(v string) Option {
return func(url *URL) {
url.SetParam(constant.InterfaceKey, v)
}
}

// WithLocation sets location for URL
func WithLocation(location string) Option {
return func(url *URL) {
Expand Down Expand Up @@ -294,6 +301,14 @@ func (c *URL) Version() string {
return c.GetParam(constant.VersionKey, "")
}

// Address with format "ip:port"
func (c *URL) Address() string {
if c.Port == "" {
return c.Ip
}
return c.Ip + ":" + c.Port
}

// URLEqual judge @URL and @c is equal or not.
func (c *URL) URLEqual(url *URL) bool {
tmpC := c.Clone()
Expand Down Expand Up @@ -515,6 +530,23 @@ func (c *URL) GetParam(s string, d string) string {
return r
}

// GetParamNoDefault gets value by key, return nil,false if no value found mapping to the key
func (c *URL) GetParamNoDefault(s string) (string, bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数名气很奇怪。在Go里面,我们知道map之类的会有一些default值,所以我大概也能明白你的用意。但 (string, bool) 中的 bool 就是 go 的风格,能够说明 key 是否存在,所以改名叫 GetParam 就可以了。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改名叫 GetParam 就可以

GetParam 这个名字已经被占用了,并且没有 bool 返回值,没啥好办法了。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might consider using GetParamWithoutDefault to replace that.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetNonDefaultParam怎么样

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetNonDefaultParam怎么样

Agree with u. Really a better func name.

c.paramsLock.RLock()
defer c.paramsLock.RUnlock()

var r string
ok := true
if len(c.params) > 0 {
r = c.params.Get(s)
}
if len(r) == 0 {
ok = false
}

return r, ok
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是不是多余了?直接判断是否为空字符串不就好了吗?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return r, r == ""

}

// GetParams gets values
func (c *URL) GetParams() url.Values {
return c.params
Expand Down Expand Up @@ -693,7 +725,7 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// iterator the referenceURL if serviceURL not have the key ,merge in
// referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group
for key, value := range referenceURL.GetParams() {
if v := mergedURL.GetParam(key, ""); len(v) == 0 {
if _, ok := mergedURL.GetParamNoDefault(key); !ok {
if len(value) > 0 {
params[key] = value
}
Expand All @@ -704,7 +736,7 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey})

// remote timestamp
if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 {
if v, ok := serviceURL.GetParamNoDefault(constant.TimestampKey); !ok {
params[constant.RemoteTimestampKey] = []string{v}
params[constant.TimestampKey] = []string{referenceURL.GetParam(constant.TimestampKey, "")}
}
Expand Down
2 changes: 1 addition & 1 deletion common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestNewURLWithOptions(t *testing.T) {
assert.Equal(t, loopbackAddress, u.Ip)
assert.Equal(t, "8080", u.Port)
assert.Equal(t, methods, u.Methods)
assert.Equal(t, params, u.params)
assert.Equal(t, 2, len(u.params))
}

func TestURL(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type ConsumerConfig struct {
TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
FilterConf interface{} `yaml:"filter-conf" json:"filter-conf,omitempty" property:"filter-conf"`
MaxWaitTimeForServiceDiscovery string `default:"3s" yaml:"max-wait-time-for-service-discovery" json:"max-wait-time-for-service-discovery,omitempty" property:"max-wait-time-for-service-discovery"`
MeshEnabled bool `yaml:"mesh-enabled" json:"mesh-enabled,omitempty" property:"mesh-enabled"`
rootConfig *RootConfig
}

Expand Down Expand Up @@ -239,6 +240,11 @@ func (ccb *ConsumerConfigBuilder) SetFilterConf(filterConf interface{}) *Consume
return ccb
}

func (ccb *ConsumerConfigBuilder) SetMeshEnabled(meshEnabled bool) *ConsumerConfigBuilder {
ccb.consumerConfig.MeshEnabled = meshEnabled
return ccb
}

func (ccb *ConsumerConfigBuilder) SetRootConfig(rootConfig *RootConfig) *ConsumerConfigBuilder {
ccb.consumerConfig.rootConfig = rootConfig
return ccb
Expand Down
98 changes: 70 additions & 28 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ package config

import (
"fmt"
constant2 "github.com/dubbogo/triple/pkg/common/constant"
"net/url"
"os"
"strconv"
"time"
)

import (
"github.com/creasty/defaults"

"github.com/dubbogo/gost/log/logger"
gxstrings "github.com/dubbogo/gost/strings"
)

Expand All @@ -43,34 +46,34 @@ import (

// ReferenceConfig is the configuration of service consumer
type ReferenceConfig struct {
pxy *proxy.Proxy
id string
InterfaceName string `yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
URL string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
ProvidedBy string `yaml:"provided_by" json:"provided_by,omitempty" property:"provided_by"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"`
TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"`

rootConfig *RootConfig
metaDataType string
pxy *proxy.Proxy
id string
InterfaceName string `yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
URL string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
ProvidedBy string `yaml:"provided_by" json:"provided_by,omitempty" property:"provided_by"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"`
TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"`
rootConfig *RootConfig
metaDataType string
MeshProviderPort int `yaml:"mesh-provider-port" json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"`
}

func (rc *ReferenceConfig) Prefix() string {
Expand Down Expand Up @@ -120,6 +123,41 @@ func (rc *ReferenceConfig) Init(root *RootConfig) error {
return verify(rc)
}

func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}

func updateOrCreateMeshURL(rc *ReferenceConfig) {
if rc.URL != "" {
logger.Infof("URL specified explicitly %v", rc.URL)
}

if !rc.rootConfig.Consumer.MeshEnabled {
return
}
if rc.Protocol != constant2.TRIPLE {
panic(fmt.Sprintf("Mesh mode enabled, Triple protocol expected but %v protocol found!", rc.Protocol))
}
if rc.ProvidedBy == "" {
panic(fmt.Sprintf("Mesh mode enabled, provided-by should not be empty!"))
}

podNamespace := getEnv(constant.PodNamespaceEnvKey, constant.DefaultNamespace)
clusterDomain := getEnv(constant.ClusterDomainKey, constant.DefaultClusterDomain)

var meshPort int
if rc.MeshProviderPort > 0 {
meshPort = rc.MeshProviderPort
} else {
meshPort = constant.DefaultMeshPort
}

rc.URL = "tri://" + rc.ProvidedBy + "." + podNamespace + constant.SVC + clusterDomain + ":" + strconv.Itoa(meshPort)
}

// Refer retrieves invokers from urls.
func (rc *ReferenceConfig) Refer(srv interface{}) {
// If adaptive service is enabled,
Expand All @@ -144,6 +182,9 @@ func (rc *ReferenceConfig) Refer(srv interface{}) {
}
rc.postProcessConfig(cfgURL)

// if mesh-enabled is set
updateOrCreateMeshURL(rc)

// retrieving urls from config, and appending the urls to rc.urls
if rc.URL != "" { // use user-specific urls
/*
Expand Down Expand Up @@ -173,6 +214,7 @@ func (rc *ReferenceConfig) Refer(srv interface{}) {
// replace params of serviceURL with params of cfgUrl
// other stuff, e.g. IP, port, etc., are same as serviceURL
newURL := common.MergeURL(serviceURL, cfgURL)
newURL.AddParam("peer", "true")
rc.urls = append(rc.urls, newURL)
}
}
Expand Down