From ae3607762446e0a3f2b48d7337c9c806d40403f1 Mon Sep 17 00:00:00 2001 From: guoquanwei <31917049+guoquanwei@users.noreply.github.com> Date: Fri, 10 May 2024 15:43:47 +0800 Subject: [PATCH 1/2] fix(contrib/registry/nacos):Abnormal blocking of the Subscribe and Next methods. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This submission can solve the channel blocking problem during service discovery, as well as the channel message missing problem when the GRPC idle state changes.  The first scenario is when both the configuration center and registration center are initialized at the same time during service startup, and the SubscribeCallback method will be triggered twice, throwing the error "over time discovering the creation of observers", resulting in a discovery failure. The second scenario occurs after NotLoadCacheAtStart: true. When grpc exits the idle state, calling newWatcher again will not trigger the SubscribeCallback method, causing the Next function to be blocked and the service to be unable to communicate. --- contrib/registry/nacos/watcher.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/contrib/registry/nacos/watcher.go b/contrib/registry/nacos/watcher.go index 6fd59b378a2..c60376697ce 100644 --- a/contrib/registry/nacos/watcher.go +++ b/contrib/registry/nacos/watcher.go @@ -41,10 +41,15 @@ func newWatcher(ctx context.Context, cli naming_client.INamingClient, serviceNam Clusters: clusters, GroupName: groupName, SubscribeCallback: func(services []model.SubscribeService, err error) { - w.watchChan <- struct{}{} + if len(w.watchChan) == 0 { + w.watchChan <- struct{}{} + } }, } e := w.cli.Subscribe(w.subscribeParam) + if len(w.watchChan) == 0 { + w.watchChan <- struct{}{} + } return w, e } From b1aa0dacb26e65e6b6b5c9e349f54acbad532f13 Mon Sep 17 00:00:00 2001 From: guoquanwei <31917049+guoquanwei@users.noreply.github.com> Date: Fri, 10 May 2024 17:05:23 +0800 Subject: [PATCH 2/2] Another implementation, the effect remains unchanged --- contrib/registry/nacos/watcher.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/contrib/registry/nacos/watcher.go b/contrib/registry/nacos/watcher.go index c60376697ce..7f1dc84663a 100644 --- a/contrib/registry/nacos/watcher.go +++ b/contrib/registry/nacos/watcher.go @@ -41,14 +41,16 @@ func newWatcher(ctx context.Context, cli naming_client.INamingClient, serviceNam Clusters: clusters, GroupName: groupName, SubscribeCallback: func(services []model.SubscribeService, err error) { - if len(w.watchChan) == 0 { - w.watchChan <- struct{}{} + select { + case w.watchChan <- struct{}{}: + default: } }, } e := w.cli.Subscribe(w.subscribeParam) - if len(w.watchChan) == 0 { - w.watchChan <- struct{}{} + select { + case w.watchChan <- struct{}{}: + default: } return w, e }