Skip to content

Commit

Permalink
local mode, metadata mapping is reported using registration information
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 committed Nov 24, 2021
1 parent 03cf5fd commit dd0efbf
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 9 deletions.
2 changes: 1 addition & 1 deletion config/config_loader.go
Expand Up @@ -96,7 +96,7 @@ func registerServiceInstance() {
}
// publish metadata to remote
if GetApplicationConfig().MetadataType == constant.RemoteMetadataStorageType {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishMetadata(GetApplicationConfig().Name)
}
}
Expand Down
54 changes: 54 additions & 0 deletions config/instance/registry_metadata_report.go
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package instance

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
)

var (
regInstances = make(map[string]report.MetadataReport, 8)
mux sync.RWMutex
)

// GetMetadataReportInstanceByReg obtain metadata report instance through registry config
func GetMetadataReportInstanceByReg(protocol string) report.MetadataReport {
mux.RLock()
defer mux.RUnlock()
regInstance, ok := regInstances[protocol]
if !ok {
return nil
}
return regInstance
}

// SetMetadataReportInstanceByReg set metadata reporting instances by registering urls
func SetMetadataReportInstanceByReg(url *common.URL) {
mux.Lock()
defer mux.Unlock()
if _, ok := regInstances[url.Protocol]; ok {
return
}
regInstances[url.Protocol] = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url)
}
64 changes: 64 additions & 0 deletions config/instance/registry_metadata_report_test.go
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package instance

import (
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
)

func TestGetMetadataReportInstanceByReg(t *testing.T) {
t.Run("nil", func(t *testing.T) {
reportInstance := GetMetadataReportInstanceByReg("mock")
assert.Nil(t, reportInstance)
})

t.Run("normal", func(t *testing.T) {
u, _ := common.NewURL("mock://127.0.0.1")
extension.SetMetadataReportFactory("mock", func() factory.MetadataReportFactory {
return &mockMetadataReportFactory{}
})
SetMetadataReportInstanceByReg(u)
reportInstance := GetMetadataReportInstanceByReg("mock")
assert.NotNil(t, reportInstance)
})
}

func TestSetMetadataReportInstanceByReg(t *testing.T) {

t.Run("exist", func(t *testing.T) {
regInstances = make(map[string]report.MetadataReport, 8)
u, _ := common.NewURL("mock://127.0.0.1")
extension.SetMetadataReportFactory("mock", func() factory.MetadataReportFactory {
return &mockMetadataReportFactory{}
})
SetMetadataReportInstanceByReg(u)
SetMetadataReportInstanceByReg(u)
assert.True(t, len(regInstances) == 1)
})
}
1 change: 1 addition & 0 deletions config/metadata_report_config.go
Expand Up @@ -61,6 +61,7 @@ func (mc *MetadataReportConfig) ToUrl() (*common.URL, error) {
common.WithPassword(mc.Password),
common.WithLocation(mc.Address),
common.WithProtocol(mc.Protocol),
common.WithParamsValue(constant.TimeoutKey, mc.Timeout),
common.WithParamsValue(constant.MetadataReportGroupKey, mc.Group),
common.WithParamsValue(constant.MetadataReportNamespaceKey, mc.Namespace),
common.WithParamsValue(constant.MetadataTypeKey, mc.metadataType),
Expand Down
35 changes: 33 additions & 2 deletions config/registry_config.go
Expand Up @@ -25,13 +25,16 @@ import (

import (
"github.com/creasty/defaults"

perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config/instance"
"dubbo.apache.org/dubbo-go/v3/registry"
)

Expand Down Expand Up @@ -67,8 +70,7 @@ func (c *RegistryConfig) Init() error {
if err := defaults.Set(c); err != nil {
return err
}
c.translateRegistryAddress()
return verify(c)
return c.startRegistryConfig()
}

func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
Expand All @@ -89,6 +91,35 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
return urlMap
}

func (c *RegistryConfig) startRegistryConfig() error {
c.translateRegistryAddress()
if GetApplicationConfig().MetadataType == constant.DefaultMetadataStorageType && c.RegistryType == constant.ServiceKey {
if tmpUrl, err := c.toMetadataReportUrl(); err == nil {
instance.SetMetadataReportInstanceByReg(tmpUrl)
} else {
return perrors.Wrap(err, "Start RegistryConfig failed.")
}
}
return verify(c)
}

// toMetadataReportUrl translate the registry configuration to the metadata reporting url
func (c *RegistryConfig) toMetadataReportUrl() (*common.URL, error) {
res, err := common.NewURL(c.Address,
common.WithLocation(c.Address),
common.WithProtocol(c.Protocol),
common.WithUsername(c.Username),
common.WithPassword(c.Password),
common.WithParamsValue(constant.TimeoutKey, c.Timeout),
common.WithParamsValue(constant.MetadataReportGroupKey, c.Group),
common.WithParamsValue(constant.MetadataReportNamespaceKey, c.Namespace),
)
if err != nil || len(res.Protocol) == 0 {
return nil, perrors.New("Invalid Registry Config.")
}
return res, nil
}

//translateRegistryAddress translate registry address
// eg:address=nacos://127.0.0.1:8848 will return 127.0.0.1:8848 and protocol will set nacos
func (c *RegistryConfig) translateRegistryAddress() string {
Expand Down
8 changes: 7 additions & 1 deletion config/service_config.go
Expand Up @@ -227,7 +227,7 @@ func (svc *ServiceConfig) Export() error {
svc.cacheMutex.Unlock()

for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL
setRegistrySubURL(ivkURL, regUrl)
invoker := proxyFactory.GetInvoker(regUrl)
exporter := svc.cacheProtocol.Export(invoker)
if exporter == nil {
Expand Down Expand Up @@ -259,6 +259,12 @@ func (svc *ServiceConfig) Export() error {
return nil
}

//setRegistrySubURL set registry sub url is ivkURl
func setRegistrySubURL(ivkURL *common.URL, regUrl *common.URL) {
ivkURL.AddParam(constant.RegistryKey, regUrl.GetParam(constant.RegistryKey, ""))
regUrl.SubURL = ivkURL
}

//loadProtocol filter protocols by ids
func loadProtocol(protocolIds []string, protocols map[string]*ProtocolConfig) []*ProtocolConfig {
returnProtocols := make([]*ProtocolConfig, 0, len(protocols))
Expand Down
18 changes: 17 additions & 1 deletion metadata/mapping/metadata/service_name_mapping.go
Expand Up @@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config/instance"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
)

const (
Expand All @@ -61,7 +62,11 @@ func (d *MetadataServiceNameMapping) Map(url *common.URL) error {
}

appName := config.GetApplicationConfig().Name
metadataReport := instance.GetMetadataReportInstance()

metadataReport := getMetaDataReport(url.GetParam(constant.RegistryKey, ""))
if metadataReport == nil {
return perrors.New("get metadata report instance is nil")
}
err := metadataReport.RegisterServiceAppMapping(serviceInterface, defaultGroup, appName)
if err != nil {
return perrors.WithStack(err)
Expand Down Expand Up @@ -95,3 +100,14 @@ func GetNameMappingInstance() mapping.ServiceNameMapping {
})
return serviceNameMappingInstance
}

// getMetaDataReport obtain metadata reporting instances through registration protocol
// if the metadata type is remote, obtain the instance from the metadata report configuration
func getMetaDataReport(protocol string) report.MetadataReport {
var metadataReport report.MetadataReport
if config.GetApplicationConfig().MetadataType == constant.RemoteMetadataStorageType {
metadataReport = instance.GetMetadataReportInstance()
return metadataReport
}
return instance.GetMetadataReportInstanceByReg(protocol)
}
2 changes: 1 addition & 1 deletion metadata/report/etcd/report.go
Expand Up @@ -179,7 +179,7 @@ type etcdMetadataReportFactory struct{}

// CreateMetadataReport get the MetadataReport instance of etcd
func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout)
timeout := url.GetParamDuration(constant.TimeoutKey, constant.DefaultRegTimeout)
addresses := strings.Split(url.Location, ",")
client, err := gxetcd.NewClient(gxetcd.MetadataETCDV3Client, addresses, timeout, 1)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions metadata/report/nacos/report.go
Expand Up @@ -255,6 +255,7 @@ type nacosMetadataReportFactory struct{}

// nolint
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
url.SetParam(constant.NacosNamespaceID, url.GetParam(constant.MetadataReportNamespaceKey, ""))
client, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("Could not create nacos metadata report. URL: %s", url.String())
Expand Down
5 changes: 2 additions & 3 deletions metadata/report/zookeeper/report.go
Expand Up @@ -20,7 +20,6 @@ package zookeeper
import (
"encoding/json"
"strings"
"time"
)

import (
Expand Down Expand Up @@ -184,13 +183,13 @@ func (mf *zookeeperMetadataReportFactory) CreateMetadataReport(url *common.URL)
"zookeeperMetadataReport",
strings.Split(url.Location, ","),
false,
gxzookeeper.WithZkTimeOut(15*time.Second),
gxzookeeper.WithZkTimeOut(url.GetParamDuration(constant.TimeoutKey, "15s")),
)
if err != nil {
panic(err)
}

rootDir := url.GetParam(constant.GroupKey, "dubbo")
rootDir := url.GetParam(constant.MetadataReportGroupKey, "dubbo")
if !strings.HasPrefix(rootDir, constant.PathSeparator) {
rootDir = constant.PathSeparator + rootDir
}
Expand Down

0 comments on commit dd0efbf

Please sign in to comment.