Skip to content

Commit

Permalink
Merge pull request #1144 from ztelur/bugfix-config-change-handle
Browse files Browse the repository at this point in the history
3.0 fix reExporter bug when config-center {applicationName}.configurator data change
  • Loading branch information
AlexStocks committed Apr 11, 2021
2 parents 95c99f0 + d982a87 commit 2ec80b8
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 10 deletions.
4 changes: 2 additions & 2 deletions config/config_loader.go
Expand Up @@ -182,7 +182,7 @@ func loadConsumerConfig() {
if data, err := yaml.MarshalYML(consumerConfig); err != nil {
logger.Errorf("Marshal consumer config err: %s", err.Error())
} else {
if err := ioutil.WriteFile(consumerConfig.CacheFile, data, 0o666); err != nil {
if err := ioutil.WriteFile(consumerConfig.CacheFile, data, 0666); err != nil {
logger.Errorf("Write consumer config cache file err: %s", err.Error())
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func loadProviderConfig() {
if data, err := yaml.MarshalYML(providerConfig); err != nil {
logger.Errorf("Marshal provider config err: %s", err.Error())
} else {
if err := ioutil.WriteFile(providerConfig.CacheFile, data, 0o666); err != nil {
if err := ioutil.WriteFile(providerConfig.CacheFile, data, 0666); err != nil {
logger.Errorf("Write provider config cache file err: %s", err.Error())
}
}
Expand Down
42 changes: 37 additions & 5 deletions registry/protocol/protocol.go
Expand Up @@ -25,6 +25,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)

import (
Expand Down Expand Up @@ -70,7 +71,8 @@ func init() {
extension.SetProtocol("registry", GetProtocol)
}

func getCacheKey(url *common.URL) string {
func getCacheKey(invoker protocol.Invoker) string {
url := getProviderUrl(invoker)
delKeys := gxset.NewSet("dynamic", "enabled")
return url.CloneExceptParams(delKeys).String()
}
Expand Down Expand Up @@ -200,7 +202,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
return nil
}

key := getCacheKey(providerUrl)
key := getCacheKey(invoker)
logger.Infof("The cached exporter keys is %v!", key)
cachedExporter, loaded := proto.bounds.Load(key)
if loaded {
Expand All @@ -221,17 +223,47 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}

func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
url := getProviderUrl(invoker)
key := getCacheKey(url)
key := getCacheKey(invoker)
if oldExporter, loaded := proto.bounds.Load(key); loaded {
wrappedNewInvoker := newWrappedInvoker(invoker, newUrl)
oldExporter.(protocol.Exporter).Unexport()
proto.bounds.Delete(key)
// oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible
if err := registerServiceMap(invoker); err != nil {
logger.Error(err.Error())
}
proto.Export(wrappedNewInvoker)
// TODO: unregister & unsubscribe
}
}

func registerServiceMap(invoker protocol.Invoker) error {
providerUrl := getProviderUrl(invoker)
// the bean.name param of providerUrl is the ServiceConfig id property
// such as dubbo://:20000/org.apache.dubbo.UserProvider?bean.name=UserProvider&cluster=failfast...
id := providerUrl.GetParam(constant.BEAN_NAME_KEY, "")

serviceConfig := config.GetProviderConfig().Services[id]
if serviceConfig == nil {
s := "reExport can not get serviceConfig"
return perrors.New(s)
}
rpcService := config.GetProviderService(id)
if rpcService == nil {
s := "reExport can not get RPCService"
return perrors.New(s)
}

_, err := common.ServiceMap.Register(serviceConfig.InterfaceName,
serviceConfig.Protocol, serviceConfig.Group,
serviceConfig.Version, rpcService)
if err != nil {
s := "reExport can not re register ServiceMap. Error message is " + err.Error()
return perrors.New(s)
}
return nil
}

type overrideSubscribeListener struct {
url *common.URL
originInvoker protocol.Invoker
Expand Down Expand Up @@ -263,7 +295,7 @@ func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent,

func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
providerUrl := getProviderUrl(nl.originInvoker)
key := getCacheKey(providerUrl)
key := getCacheKey(nl.originInvoker)
if exporter, ok := nl.protocol.bounds.Load(key); ok {
currentUrl := exporter.(protocol.Exporter).GetInvoker().GetURL()
// Compatible with the 2.6.x
Expand Down
14 changes: 11 additions & 3 deletions registry/protocol/protocol_test.go
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxset "github.com/dubbogo/gost/container/set"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -245,7 +246,9 @@ func TestExportWithOverrideListener(t *testing.T) {
time.Sleep(1e9)
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
delKeys := gxset.NewSet("dynamic", "enabled")
key := newUrl.CloneExceptParams(delKeys).String()
v2, _ := regProtocol.bounds.Load(key)
assert.NotNil(t, v2)
}

Expand All @@ -265,7 +268,10 @@ func TestExportWithServiceConfig(t *testing.T) {
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")

v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
delKeys := gxset.NewSet("dynamic", "enabled")
key := newUrl.CloneExceptParams(delKeys).String()
v2, _ := regProtocol.bounds.Load(key)

assert.NotNil(t, v2)
}

Expand All @@ -284,7 +290,9 @@ func TestExportWithApplicationConfig(t *testing.T) {

newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
delKeys := gxset.NewSet("dynamic", "enabled")
key := newUrl.CloneExceptParams(delKeys).String()
v2, _ := regProtocol.bounds.Load(key)
assert.NotNil(t, v2)
}

Expand Down

0 comments on commit 2ec80b8

Please sign in to comment.