Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.0 fix reExporter bug when config-center {applicationName}.configurator data change #1144

Merged
merged 2 commits into from Apr 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config_loader.go
Expand Up @@ -182,7 +182,7 @@ func loadConsumerConfig() {
if data, err := yaml.MarshalYML(consumerConfig); err != nil {
logger.Errorf("Marshal consumer config err: %s", err.Error())
} else {
if err := ioutil.WriteFile(consumerConfig.CacheFile, data, 0o666); err != nil {
if err := ioutil.WriteFile(consumerConfig.CacheFile, data, 0666); err != nil {
logger.Errorf("Write consumer config cache file err: %s", err.Error())
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func loadProviderConfig() {
if data, err := yaml.MarshalYML(providerConfig); err != nil {
logger.Errorf("Marshal provider config err: %s", err.Error())
} else {
if err := ioutil.WriteFile(providerConfig.CacheFile, data, 0o666); err != nil {
if err := ioutil.WriteFile(providerConfig.CacheFile, data, 0666); err != nil {
logger.Errorf("Write provider config cache file err: %s", err.Error())
}
}
Expand Down
42 changes: 37 additions & 5 deletions registry/protocol/protocol.go
Expand Up @@ -25,6 +25,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)

import (
Expand Down Expand Up @@ -70,7 +71,8 @@ func init() {
extension.SetProtocol("registry", GetProtocol)
}

func getCacheKey(url *common.URL) string {
func getCacheKey(invoker protocol.Invoker) string {
url := getProviderUrl(invoker)
delKeys := gxset.NewSet("dynamic", "enabled")
return url.CloneExceptParams(delKeys).String()
}
Expand Down Expand Up @@ -200,7 +202,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
return nil
}

key := getCacheKey(providerUrl)
key := getCacheKey(invoker)
logger.Infof("The cached exporter keys is %v!", key)
cachedExporter, loaded := proto.bounds.Load(key)
if loaded {
Expand All @@ -221,17 +223,47 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}

func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
url := getProviderUrl(invoker)
key := getCacheKey(url)
key := getCacheKey(invoker)
if oldExporter, loaded := proto.bounds.Load(key); loaded {
wrappedNewInvoker := newWrappedInvoker(invoker, newUrl)
oldExporter.(protocol.Exporter).Unexport()
proto.bounds.Delete(key)
// oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible
if err := registerServiceMap(invoker); err != nil {
logger.Error(err.Error())
}
proto.Export(wrappedNewInvoker)
// TODO: unregister & unsubscribe
}
}

func registerServiceMap(invoker protocol.Invoker) error {
providerUrl := getProviderUrl(invoker)
// the bean.name param of providerUrl is the ServiceConfig id property
// such as dubbo://:20000/org.apache.dubbo.UserProvider?bean.name=UserProvider&cluster=failfast...
id := providerUrl.GetParam(constant.BEAN_NAME_KEY, "")

serviceConfig := config.GetProviderConfig().Services[id]
if serviceConfig == nil {
s := "reExport can not get serviceConfig"
return perrors.New(s)
}
rpcService := config.GetProviderService(id)
if rpcService == nil {
s := "reExport can not get RPCService"
return perrors.New(s)
}

_, err := common.ServiceMap.Register(serviceConfig.InterfaceName,
serviceConfig.Protocol, serviceConfig.Group,
serviceConfig.Version, rpcService)
if err != nil {
s := "reExport can not re register ServiceMap. Error message is " + err.Error()
return perrors.New(s)
}
return nil
}

type overrideSubscribeListener struct {
url *common.URL
originInvoker protocol.Invoker
Expand Down Expand Up @@ -263,7 +295,7 @@ func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent,

func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
providerUrl := getProviderUrl(nl.originInvoker)
key := getCacheKey(providerUrl)
key := getCacheKey(nl.originInvoker)
if exporter, ok := nl.protocol.bounds.Load(key); ok {
currentUrl := exporter.(protocol.Exporter).GetInvoker().GetURL()
// Compatible with the 2.6.x
Expand Down
14 changes: 11 additions & 3 deletions registry/protocol/protocol_test.go
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxset "github.com/dubbogo/gost/container/set"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -245,7 +246,9 @@ func TestExportWithOverrideListener(t *testing.T) {
time.Sleep(1e9)
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
delKeys := gxset.NewSet("dynamic", "enabled")
key := newUrl.CloneExceptParams(delKeys).String()
v2, _ := regProtocol.bounds.Load(key)
assert.NotNil(t, v2)
}

Expand All @@ -265,7 +268,10 @@ func TestExportWithServiceConfig(t *testing.T) {
newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")

v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
delKeys := gxset.NewSet("dynamic", "enabled")
key := newUrl.CloneExceptParams(delKeys).String()
v2, _ := regProtocol.bounds.Load(key)

assert.NotNil(t, v2)
}

Expand All @@ -284,7 +290,9 @@ func TestExportWithApplicationConfig(t *testing.T) {

newUrl := url.SubURL.Clone()
newUrl.SetParam(constant.CLUSTER_KEY, "mock1")
v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
delKeys := gxset.NewSet("dynamic", "enabled")
key := newUrl.CloneExceptParams(delKeys).String()
v2, _ := regProtocol.bounds.Load(key)
assert.NotNil(t, v2)
}

Expand Down