Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add polaris subscribe #2100

Merged
merged 19 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b8c8f5c
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
72c68f5
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Oct 31, 2022
30a8e04
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Oct 31, 2022
47f0b69
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 8, 2022
e78a6f3
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 8, 2022
1587b99
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 8, 2022
18fbee3
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 8, 2022
da71acf
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
8f3c0a5
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
5343be2
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
6a6b659
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
1ad48fd
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
f461883
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
0833961
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
b968e3c
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
acb2d86
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
e4156a2
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
fcfbef8
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
73eff57
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Nov 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 17 additions & 10 deletions registry/polaris/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package polaris

import (
"dubbo.apache.org/dubbo-go/v3/remoting"
"net/url"
"strconv"
)
Expand All @@ -38,29 +39,35 @@ import (
)

type polarisListener struct {
watcher *PolarisServiceWatcher
listenUrl *common.URL
events *gxchan.UnboundedChan
closeCh chan struct{}
watcher *PolarisServiceWatcher
events *gxchan.UnboundedChan
closeCh chan struct{}
}

// NewPolarisListener new polaris listener
func NewPolarisListener(url *common.URL) (*polarisListener, error) {
func NewPolarisListener(watcher *PolarisServiceWatcher) (*polarisListener, error) {
listener := &polarisListener{
listenUrl: url,
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
watcher: watcher,
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
}

listener.startListen()
return listener, nil
}
func (pl *polarisListener) startListen() {
pl.watcher.AddSubscriber(func(et remoting.EventType, ins []model.Instance) {
for i := range ins {
pl.events.In() <- &config_center.ConfigChangeEvent{Value: generateUrl(ins[i]), ConfigType: et}
}
})
}

// Next returns next service event once received
func (pl *polarisListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-pl.closeCh:
logger.Warnf("polaris listener is close!listenUrl:%+v", pl.listenUrl)
logger.Warnf("polaris listener is close")
return nil, perrors.New("listener stopped")
case val := <-pl.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
Expand Down
68 changes: 57 additions & 11 deletions registry/polaris/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func init() {

// newPolarisRegistry will create new instance
func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
sdkCtx, _, err := polaris.GetPolarisConfig(url)
sdkCtx, ns, err := polaris.GetPolarisConfig(url)
if err != nil {
return &polarisRegistry{}, err
}
pRegistry := &polarisRegistry{
namespace: ns,
provider: api.NewProviderAPIByContext(sdkCtx),
consumer: api.NewConsumerAPIByContext(sdkCtx),
lock: &sync.RWMutex{},
registryUrls: make(map[string]*PolarisHeartbeat),
listenerLock: &sync.RWMutex{},
Expand All @@ -67,11 +69,13 @@ func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
}

type polarisRegistry struct {
consumer api.ConsumerAPI
namespace string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 ns 信息在哪里初始化呢?

url *common.URL
provider api.ProviderAPI
lock *sync.RWMutex
registryUrls map[string]*PolarisHeartbeat

watchers map[string]*PolarisServiceWatcher
listenerLock *sync.RWMutex
}

Expand Down Expand Up @@ -147,33 +151,49 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error {

// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
var (
newParam api.WatchServiceRequest
newConsumer api.ConsumerAPI
)

role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
}
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
defer timer.Stop()

req := api.WatchServiceRequest{
WatchServiceRequest: model.WatchServiceRequest{
Key: model.ServiceKey{
Service: common.GetSubscribeName(url),
Namespace: pr.namespace,
},
},
}

for {
listener, err := NewPolarisListener(url)
watcher, err := newPolarisWatcher(&req, pr.consumer)

if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
<-timer.C
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
listener, err := NewPolarisListener(watcher)

if err != nil {
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
<-time.After(time.Duration(RegistryConnDelay) * time.Second)
<-timer.C
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}

watcher, err := newPolarisWatcher(&newParam, newConsumer)
if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {

for {
serviceEvent, err := listener.Next()

if err != nil {
Expand All @@ -199,6 +219,31 @@ func (pr *polarisRegistry) GetURL() *common.URL {
return pr.url
}

func (pr *polarisRegistry) createPolarisWatcher(serviceName string) (*PolarisServiceWatcher, error) {

pr.listenerLock.Lock()
defer pr.listenerLock.Unlock()

if _, exist := pr.watchers[serviceName]; !exist {
subscribeParam := &api.WatchServiceRequest{
WatchServiceRequest: model.WatchServiceRequest{
Key: model.ServiceKey{
Namespace: pr.namespace,
Service: serviceName,
},
},
}

watcher, err := newPolarisWatcher(subscribeParam, pr.consumer)
if err != nil {
return nil, err
}
pr.watchers[serviceName] = watcher
}

return pr.watchers[serviceName], nil
}

// Destroy stop polaris registry.
func (pr *polarisRegistry) Destroy() {
for _, val := range pr.registryUrls {
Expand All @@ -218,7 +263,8 @@ func (pr *polarisRegistry) IsAvailable() bool {
}

// doHeartbeat Since polaris does not support automatic reporting of instance heartbeats, separate logic is
// needed to implement it
//
// needed to implement it
func (pr *polarisRegistry) doHeartbeat(ctx context.Context, ins *api.InstanceRegisterRequest) {
ticker := time.NewTicker(time.Duration(4) * time.Second)

Expand Down
13 changes: 2 additions & 11 deletions remoting/polaris/polaris.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,7 @@ global:
grpc:
maxCallRecvMsgSize: 52428800
statReporter:
enable: true
chain:
- stat2Monitor
- serviceCache
plugin:
stat2Monitor:
metricsReportWindow: 1m
metricsNumBuckets: 12
serviceCache:
reportInterval: 3m
enable: false
consumer:
localCache:
type: inmemory
Expand Down Expand Up @@ -92,4 +83,4 @@ consumer:
type: subscribeLocalChannel
plugin:
subscribeLocalChannel:
channelBufferSize: 50
channelBufferSize: 50