Skip to content

Commit

Permalink
register not use metadata configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 committed Nov 9, 2021
1 parent 27aaaa9 commit 0b33524
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 67 deletions.
11 changes: 7 additions & 4 deletions common/extension/service_discovery.go
Expand Up @@ -22,26 +22,29 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/registry"
)

var discoveryCreatorMap = make(map[string]func() (registry.ServiceDiscovery, error), 4)
var discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4)

// SetServiceDiscovery will store the @creator and @name
// protocol indicate the implementation, like nacos
// the name like nacos-1...
func SetServiceDiscovery(protocol string, creator func() (registry.ServiceDiscovery, error)) {
func SetServiceDiscovery(protocol string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) {
discoveryCreatorMap[protocol] = creator
}

// GetServiceDiscovery will return the registry.ServiceDiscovery
// protocol indicate the implementation, like nacos
// the name like nacos-1...
// if not found, or initialize instance failed, it will return error.
func GetServiceDiscovery(protocol string) (registry.ServiceDiscovery, error) {
func GetServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
protocol := url.GetParam(constant.RegistryKey, "")
creator, ok := discoveryCreatorMap[protocol]
if !ok {
return nil, perrors.New("Could not find the service discovery with discovery protocol: " + protocol)
}
return creator()
return creator(url)
}
2 changes: 1 addition & 1 deletion config/config_center_config.go
Expand Up @@ -57,7 +57,7 @@ type CenterConfig struct {
Group string `default:"dubbo" yaml:"group" json:"group,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty"`
Namespace string `yaml:"namespace" json:"namespace,omitempty"`
AppID string `default:"dubbo" yaml:"app-id" json:"app-id,omitempty"`
Timeout string `default:"10s" yaml:"timeout" json:"timeout,omitempty"`
Params map[string]string `yaml:"params" json:"parameters,omitempty"`
Expand Down
6 changes: 3 additions & 3 deletions config_center/nacos/impl.go
Expand Up @@ -63,10 +63,10 @@ type nacosDynamicConfiguration struct {

func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) {
c := &nacosDynamicConfiguration{
rootPath: "/" + url.GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/config",
url: url,
done: make(chan struct{}),
url: url,
done: make(chan struct{}),
}
c.GetURL().SetParam(constant.NacosNamespaceID, url.GetParam(constant.ConfigNamespaceKey, ""))
err := ValidateNacosClient(c)
if err != nil {
logger.Errorf("nacos configClient start error ,error message is %v", err)
Expand Down
2 changes: 1 addition & 1 deletion metadata/service/exporter/configurable/exporter.go
Expand Up @@ -69,7 +69,7 @@ func (exporter *MetadataServiceExporter) Export(url *common.URL) error {
SetGroup(config.GetApplicationConfig().Name).
SetVersion(version).
SetProxyFactoryKey(constant.DefaultKey).
SetMetadataType(constant.RemoteMetadataStorageType).
SetMetadataType(config.GetApplicationConfig().MetadataType).
Build()
exporter.ServiceConfig.Implement(exporter.metadataService)
err := exporter.ServiceConfig.Export()
Expand Down
36 changes: 15 additions & 21 deletions registry/etcdv3/service_discovery.go
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"
"sync"
"time"
)

import (
Expand All @@ -33,10 +32,10 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/etcdv3"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (e *etcdV3ServiceDiscovery) GetHealthyInstancesByPage(serviceName string, o
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// Batch get all instances by the specified service names
// GetRequestInstances Batch get all instances by the specified service names
func (e *etcdV3ServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
Expand Down Expand Up @@ -278,7 +277,7 @@ func (e *etcdV3ServiceDiscovery) registerServiceWatcher(serviceName string) erro
return nil
}

// when child data change should DispatchEventByServiceName
// DataChange when child data change should DispatchEventByServiceName
func (e *etcdV3ServiceDiscovery) DataChange(eventType remoting.Event) bool {
if eventType.Action == remoting.EventTypeUpdate {
instance := &registry.DefaultServiceInstance{}
Expand All @@ -304,32 +303,27 @@ func (e *etcdV3ServiceDiscovery) DataChange(eventType remoting.Event) bool {
}

// newEtcdv3ServiceDiscovery
func newEtcdV3ServiceDiscovery() (registry.ServiceDiscovery, error) {
func newEtcdV3ServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
initLock.Lock()
defer initLock.Unlock()

metadataReportConfig := config.GetMetadataReportConfg()
timeout := url.GetParamDuration(constant.RegistryTimeoutKey, constant.DefaultRegTimeout)

to, err := time.ParseDuration(metadataReportConfig.Timeout)
if err != nil {
logger.Errorf("timeout config %v is invalid,err is %v", metadataReportConfig.Timeout, err.Error())
return nil, err
}
logger.Infof("etcd address is: %v,timeout is:%s", metadataReportConfig.Timeout, to.String())
logger.Infof("etcd address is: %v,timeout is:%s", url.Location, timeout.String())

client := etcdv3.NewServiceDiscoveryClient(
gxetcd.WithName(gxetcd.RegistryETCDV3Client),
gxetcd.WithTimeout(to),
gxetcd.WithEndpoints(strings.Split(metadataReportConfig.Address, ",")...),
gxetcd.WithTimeout(timeout),
gxetcd.WithEndpoints(strings.Split(url.Location, ",")...),
)

descriptor := fmt.Sprintf("etcd-service-discovery[%s]", metadataReportConfig.Address)
descriptor := fmt.Sprintf("etcd-service-discovery[%s]", url.Location)

return &etcdV3ServiceDiscovery{
descriptor,
client,
nil,
gxset.NewSet(),
make(map[string]*etcdv3.EventListener),
make(map[string]*gxset.HashSet)}, nil
descriptor: descriptor,
client: client,
serviceInstance: nil,
services: gxset.NewSet(),
childListenerMap: make(map[string]*etcdv3.EventListener),
instanceListenerMap: make(map[string]*gxset.HashSet)}, nil
}
7 changes: 2 additions & 5 deletions registry/file/service_discovery.go
Expand Up @@ -38,7 +38,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/config_center/file"
"dubbo.apache.org/dubbo-go/v3/registry"
Expand All @@ -56,19 +55,17 @@ type fileSystemServiceDiscovery struct {
fileMap map[string]string
}

func newFileSystemServiceDiscovery() (registry.ServiceDiscovery, error) {
if config.GetMetadataReportConfg().Protocol != constant.FileKey {
func newFileSystemServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
if url.Protocol != constant.FileKey {
return nil, perrors.New("could not init the instance because the config is invalid")
}

rp, err := file.Home()
if err != nil {
return nil, perrors.WithStack(err)
}

fdcf := extension.GetConfigCenterFactory(constant.FileKey)
p := path.Join(rp, ".dubbo", constant.RegistryKey)
url, _ := common.NewURL("")
url.AddParamAvoidNil(file.ConfigCenterDirParamName, p)
c, err := fdcf.GetDynamicConfiguration(url)
if err != nil {
Expand Down
24 changes: 11 additions & 13 deletions registry/nacos/service_discovery.go
Expand Up @@ -19,7 +19,6 @@ package nacos

import (
"fmt"
"net/url"
"sync"
)

Expand All @@ -39,7 +38,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
Expand Down Expand Up @@ -330,22 +328,22 @@ func (n *nacosServiceDiscovery) String() string {
}

// newNacosServiceDiscovery will create new service discovery instance
func newNacosServiceDiscovery() (registry.ServiceDiscovery, error) {
metadataReportConfig := config.GetMetadataReportConfg()
url := common.NewURLWithOptions(
common.WithParams(make(url.Values)),
common.WithPassword(metadataReportConfig.Password),
common.WithUsername(metadataReportConfig.Username),
common.WithParamsValue(constant.RegistryTimeoutKey, metadataReportConfig.Timeout))
url.Location = metadataReportConfig.Address
client, err := nacos.NewNacosClientByUrl(url)
func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
discoveryUrl := common.NewURLWithOptions(
common.WithParams(url.GetParams()),
common.WithParamsValue(constant.TimeoutKey, url.GetParam(constant.RegistryTimeoutKey, constant.DefaultRegTimeout)),
common.WithParamsValue(constant.NacosUsername, url.Username),
common.WithParamsValue(constant.NacosPassword, url.Password),
common.WithParamsValue(constant.NacosNamespaceID, url.GetParam(constant.NamespaceKey, "")))
discoveryUrl.Location = url.Location
client, err := nacos.NewNacosClientByUrl(discoveryUrl)
if err != nil {
return nil, perrors.WithMessage(err, "create nacos namingClient failed.")
}

descriptor := fmt.Sprintf("nacos-service-discovery[%s]", metadataReportConfig.Address)
descriptor := fmt.Sprintf("nacos-service-discovery[%s]", discoveryUrl.Location)

group := metadataReportConfig.Group
group := discoveryUrl.Group()
if len(group) == 0 {
group = defaultGroup
}
Expand Down
3 changes: 1 addition & 2 deletions registry/servicediscovery/service_discovery_registry.go
Expand Up @@ -118,8 +118,7 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registr
}

func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
sdcName := url.GetParam(constant.RegistryKey, "")
originServiceDiscovery, err := extension.GetServiceDiscovery(sdcName)
originServiceDiscovery, err := extension.GetServiceDiscovery(url)
if err != nil {
return nil, perrors.WithMessage(err, "Create service discovery fialed")
}
Expand Down
19 changes: 6 additions & 13 deletions registry/zookeeper/service_discovery.go
Expand Up @@ -19,7 +19,6 @@ package zookeeper

import (
"fmt"
"net/url"
"strconv"
"strings"
"sync"
Expand All @@ -36,13 +35,16 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"
)

const (
rootPath = "/services"
)

// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.ZookeeperKey, newZookeeperServiceDiscovery)
Expand All @@ -63,22 +65,13 @@ type zookeeperServiceDiscovery struct {
}

// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
func newZookeeperServiceDiscovery() (registry.ServiceDiscovery, error) {
metadataReportConfig := config.GetMetadataReportConfg()
rootPath := "/services"
url := common.NewURLWithOptions(
common.WithParams(make(url.Values)),
common.WithPassword(metadataReportConfig.Password),
common.WithUsername(metadataReportConfig.Username),
common.WithParamsValue(constant.RegistryTimeoutKey, metadataReportConfig.Timeout))
url.Location = metadataReportConfig.Address
func newZookeeperServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
zksd := &zookeeperServiceDiscovery{
url: url,
rootPath: rootPath,
instanceListenerMap: make(map[string]*gxset.HashSet),
}
err := zookeeper.ValidateZookeeperClient(zksd, url.Location)
if err != nil {
if err := zookeeper.ValidateZookeeperClient(zksd, url.Location); err != nil {
return nil, err
}
zksd.WaitGroup().Add(1) // zk client start successful, then wg +1
Expand Down
2 changes: 1 addition & 1 deletion remoting/nacos/builder.go
Expand Up @@ -72,7 +72,7 @@ func GetNacosConfig(url *common.URL) ([]nacosConstant.ServerConfig, nacosConstan
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{IpAddr: ip, Port: uint64(port)})
}

timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout)
timeout := url.GetParamDuration(constant.TimeoutKey, constant.DefaultRegTimeout)

clientConfig := nacosConstant.ClientConfig{
TimeoutMs: uint64(int32(timeout / time.Millisecond)),
Expand Down
6 changes: 3 additions & 3 deletions remoting/nacos/builder_test.go
Expand Up @@ -101,7 +101,7 @@ func TestTimeoutConfig(t *testing.T) {

t.Run("right timeout", func(t *testing.T) {

regurlMap.Set(constant.ConfigTimeoutKey, "5s")
regurlMap.Set(constant.RegistryTimeoutKey, "5s")

newURL, _ := common.NewURL("registry://console.nacos.io:80", common.WithParams(regurlMap))

Expand All @@ -112,7 +112,7 @@ func TestTimeoutConfig(t *testing.T) {
})

t.Run("invalid timeout", func(t *testing.T) {
regurlMap.Set(constant.ConfigTimeoutKey, "5ab")
regurlMap.Set(constant.RegistryTimeoutKey, "5ab")

newURL, _ := common.NewURL("registry://console.nacos.io:80", common.WithParams(regurlMap))
_, cc, err := GetNacosConfig(newURL)
Expand All @@ -130,7 +130,7 @@ func getRegUrl() *common.URL {
// regurlMap.Set(constant.NacosUsername, "nacos")
// regurlMap.Set(constant.NacosPassword, "nacos")
regurlMap.Set(constant.NacosNamespaceID, "nacos")
regurlMap.Set(constant.ConfigTimeoutKey, "5s")
regurlMap.Set(constant.RegistryTimeoutKey, "5s")

regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParams(regurlMap))

Expand Down

0 comments on commit 0b33524

Please sign in to comment.