Skip to content

Commit

Permalink
fix: change register event chan to unbundent chan (apache#1330)
Browse files Browse the repository at this point in the history
  • Loading branch information
LaurenceLiZhixin committed Jul 22, 2021
1 parent 103aa72 commit 3ab0ab6
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/apache/dubbo-go-hessian2 v1.9.2
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.13
github.com/dubbogo/gost v1.11.14
github.com/dubbogo/triple v1.0.1
github.com/emicklei/go-restful/v3 v3.4.0
github.com/frankban/quicktest v1.4.1 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDH
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/dubbo-getty v1.4.3 h1:PCKpryDasKOxwT5MBC6MIMO+0NLOaHF6Xco9YXQw7HI=
github.com/apache/dubbo-getty v1.4.3/go.mod h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
github.com/apache/dubbo-getty v1.4.4 h1:pthYQaCXyjHJ6/SjVwKkX5NhdAqSpUrRL1Z9GowrLdE=
github.com/apache/dubbo-getty v1.4.4/go.mod h1:mcDyiu7M/TVrYDyL8TxDemQkOdvEqqHSQ4jOuYejY1w=
github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/dubbo-go-hessian2 v1.9.2 h1:XuI8KvENSfKiAhiCBS4RNihmQDoPNmGWKT3gTui0p9A=
github.com/apache/dubbo-go-hessian2 v1.9.2/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
Expand Down Expand Up @@ -130,9 +128,8 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.13 h1:sWvK1QbHpPBMmRQJV9qIH3syLegQBQa4xAPof3/Kv5c=
github.com/dubbogo/gost v1.11.13/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.14 h1:9lfcdILOmqTOVAW1fPHa5uf1NrD6jlIOBe4vf8576yQ=
github.com/dubbogo/gost v1.11.14/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.3 h1:2k53mh+1U8h1gFjJ8ykzyP4wNdAdgjc5moD+xVHI/AE=
github.com/dubbogo/net v0.0.3/go.mod h1:B6/ka3g8VzcyrmdCH4VkHP1K0aHeI37FmclS+TCwIBU=
Expand Down Expand Up @@ -430,7 +427,6 @@ github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lN
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
Expand Down
10 changes: 6 additions & 4 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -80,20 +81,20 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {

type configurationListener struct {
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
closeOnce sync.Once
}

// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
}

// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next returns next service event once received
Expand All @@ -104,7 +105,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
Expand Down
10 changes: 6 additions & 4 deletions registry/kubernetes/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -80,19 +81,19 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {

type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
}

// NewConfigurationListener for listening the event of kubernetes.
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
}

// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next returns next service event once received
Expand All @@ -103,7 +104,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
Expand Down
15 changes: 9 additions & 6 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
Expand All @@ -44,7 +45,7 @@ import (
type nacosListener struct {
namingClient *nacosClient.NacosNamingClient
listenUrl *common.URL
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
instanceMap map[string]model.Instance
cacheLock sync.Mutex
done chan struct{}
Expand All @@ -55,9 +56,10 @@ type nacosListener struct {
func NewNacosListener(url *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *config_center.ConfigChangeEvent, 32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
listenUrl: url,
events: gxchan.NewUnboundedChan(32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListen()
return listener, err
Expand Down Expand Up @@ -198,7 +200,7 @@ func (nl *nacosListener) stopListen() error {
}

func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
nl.events <- configType
nl.events.In() <- configType
}

// Next returns the service event from nacos.
Expand All @@ -209,7 +211,8 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl)
return nil, perrors.New("listener stopped")

case e := <-nl.events:
case val := <-nl.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got nacos event %s", e)
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"
)
Expand Down Expand Up @@ -116,7 +117,7 @@ func (l *RegistryDataListener) Close() {
type RegistryConfigurationListener struct {
client *gxzookeeper.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
isClosed bool
close chan struct{}
closeOnce sync.Once
Expand All @@ -129,7 +130,7 @@ func NewRegistryConfigurationListener(client *gxzookeeper.ZookeeperClient, reg *
return &RegistryConfigurationListener{
client: client,
registry: reg,
events: make(chan *config_center.ConfigChangeEvent, 32),
events: gxchan.NewUnboundedChan(32),
isClosed: false,
close: make(chan struct{}, 1),
subscribeURL: conf,
Expand All @@ -138,7 +139,7 @@ func NewRegistryConfigurationListener(client *gxzookeeper.ZookeeperClient, reg *

// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next will observe the registry state and events chan
Expand All @@ -150,7 +151,8 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
case <-l.registry.Done():
logger.Warnf("zk consumer register has quit, so zk event listener exit now. (registry url {%v}", l.registry.BaseRegistry.URL)
return nil, perrors.New("zookeeper registry, (registry url{%v}) stopped")
case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got zk event %s", e)
if e.ConfigType == remoting.EventTypeDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
Expand Down

0 comments on commit 3ab0ab6

Please sign in to comment.