Skip to content

Commit

Permalink
fix: When grpc ends idle mode, it needs to continue to obtain the lat…
Browse files Browse the repository at this point in the history
…est instance of the service

* Add test cases

* Delete serviceSet when serviceSet has no watcher

* The context of resolve is controlled independently

* resolve use set context

* Remove test declare and do not use it

* gofmt
  • Loading branch information
harbourlga committed Feb 21, 2024
1 parent a556a2b commit e6b07ce
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 3 deletions.
8 changes: 5 additions & 3 deletions contrib/registry/consul/registry.go
Expand Up @@ -190,6 +190,7 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
services: &atomic.Value{},
serviceName: name,
}
set.ctx, set.cancel = context.WithCancel(context.Background())
r.registry[name] = set
}

Expand All @@ -208,10 +209,8 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
// otherwise the initial data may be blocked forever during the watch.
w.event <- struct{}{}
}

if !ok {
err := r.resolve(ctx, set)
if err != nil {
if err := r.resolve(set.ctx, set); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -249,6 +248,9 @@ func (r *Registry) resolve(ctx context.Context, ss *serviceSet) error {
}
idx = tmpIdx
case <-ctx.Done():
r.lock.Lock()
delete(r.registry, ss.serviceName)
r.lock.Unlock()
return
}
}
Expand Down
206 changes: 206 additions & 0 deletions contrib/registry/consul/registry_test.go
Expand Up @@ -413,6 +413,212 @@ func TestRegistry_Watch(t *testing.T) {
}
}

func TestRegistry_IdleAndWatch(t *testing.T) {
addr := fmt.Sprintf("%s:9091", getIntranetIP())

time.Sleep(time.Millisecond * 100)
cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second})
if err != nil {
t.Fatalf("create consul client failed: %v", err)
}

r := New(cli, []Option{
WithHealthCheck(false),
}...)

instance1 := &registry.ServiceInstance{
ID: "1",
Name: "server-1",
Version: "v0.0.1",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}

type args struct {
ctx context.Context
instance *registry.ServiceInstance
}

tests := []struct {
name string
args args
want []*registry.ServiceInstance
wantErr bool
}{
{
name: "many client, one idle",
args: args{
ctx: context.Background(),
instance: instance1,
},
want: []*registry.ServiceInstance{instance1},
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err = r.Register(tt.args.ctx, tt.args.instance)
if err != nil {
t.Error(err)
}
defer func() {
err = r.Deregister(tt.args.ctx, tt.args.instance)
if err != nil {
t.Error(err)
}
}()

for i := 0; i < 10; i++ {
canceledCtx, _ := context.WithCancel(context.Background())
watch, err := r.Watch(canceledCtx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
go func(i int) {
// first
service, err := watch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
return
}
// instance changes
service, err = watch.Next()
if i == 9 {
return
}
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
return
}
if !reflect.DeepEqual(service, tt.want) {
t.Errorf("GetService() got = %v, want %v", service, tt.want)
}
// t.Logf("service:%v, i:%d", service, i)
}(i)
if i == 9 {
time.Sleep(time.Second * 3)
// become idle, close watcher
watch.Stop()
}
}
time.Sleep(2 * time.Second)
change := tt.args.instance
change.Version = "v0.0.2"
err = r.Register(tt.args.ctx, change)
if err != nil {
t.Error(err)
}
time.Sleep(1 * time.Second)
})
}
}

func TestRegistry_IdleAndWatch2(t *testing.T) {
addr := fmt.Sprintf("%s:9091", getIntranetIP())

time.Sleep(time.Millisecond * 100)
cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second})
if err != nil {
t.Fatalf("create consul client failed: %v", err)
}

instance1 := &registry.ServiceInstance{
ID: "1",
Name: "server-1",
Version: "v0.0.1",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
type args struct {
ctx context.Context
opts []Option
instance *registry.ServiceInstance
}

tests := []struct {
name string
args args
want []*registry.ServiceInstance
wantErr bool
}{
{
name: "all clients are idle, create a new one",
args: args{
ctx: context.Background(),
instance: instance1,
opts: []Option{
WithHealthCheck(false),
},
},
want: []*registry.ServiceInstance{instance1},
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := New(cli, tt.args.opts...)

err = r.Register(tt.args.ctx, tt.args.instance)
if err != nil {
t.Error(err)
}
defer func() {
err = r.Deregister(tt.args.ctx, tt.args.instance)
if err != nil {
t.Error(err)
}
}()

ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 10; i++ {
watchCtx, _ := context.WithCancel(context.Background())
stopCtx, _ := context.WithCancel(ctx)
watch, err := r.Watch(watchCtx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
go func(i int) {
// first
service, err := watch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
return
}
_, err = watch.Next()
if err == nil {
t.Errorf("watch exit exception:%d ", i)
}
}(i)
go func() {
<-stopCtx.Done()
watch.Stop()
}()
}
time.Sleep(time.Second * 3)
cancel()
time.Sleep(time.Second * 2)
// Everything is idle. Add new watch.
watchCtx, _ := context.WithCancel(context.Background())
watch, err := r.Watch(watchCtx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
service, err := watch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
return
}
if !reflect.DeepEqual(service, tt.want) {
t.Errorf("GetService() got = %v, want %v", service, tt.want)
}
})
}
}

func getIntranetIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions contrib/registry/consul/service.go
@@ -1,6 +1,7 @@
package consul

import (
"context"
"sync"
"sync/atomic"

Expand All @@ -12,6 +13,9 @@ type serviceSet struct {
watcher map[*watcher]struct{}
services *atomic.Value
lock sync.RWMutex

ctx context.Context
cancel context.CancelFunc
}

func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) {
Expand Down
4 changes: 4 additions & 0 deletions contrib/registry/consul/watcher.go
Expand Up @@ -36,5 +36,9 @@ func (w *watcher) Stop() error {
w.set.lock.Lock()
defer w.set.lock.Unlock()
delete(w.set.watcher, w)
// close resolve
if len(w.set.watcher) == 0 {
w.set.cancel()
}
return nil
}

0 comments on commit e6b07ce

Please sign in to comment.