Skip to content

Commit

Permalink
feat (registry/polaris): fix conversations
Browse files Browse the repository at this point in the history
1.add heartbeat report
2.fix conversations
  • Loading branch information
huyuanxin committed Feb 21, 2022
1 parent 88a31a9 commit 369cccb
Showing 1 changed file with 43 additions and 14 deletions.
57 changes: 43 additions & 14 deletions contrib/registry/polaris/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package polaris

import (
"context"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"

"github.com/go-kratos/kratos/v2/log"

"github.com/go-kratos/kratos/v2/registry"
"github.com/polarismesh/polaris-go/api"
"github.com/polarismesh/polaris-go/pkg/model"
Expand Down Expand Up @@ -62,47 +63,47 @@ type Registry struct {
provider api.ProviderAPI
}

// WithNamespace with default Namespace option.
// WithNamespace with Namespace option.
func WithNamespace(namespace string) Option {
return func(o *options) { o.Namespace = namespace }
}

// WithServiceToken with default ServiceToken option.
// WithServiceToken with ServiceToken option.
func WithServiceToken(serviceToken string) Option {
return func(o *options) { o.ServiceToken = serviceToken }
}

// WithProtocol with default Protocol option.
// WithProtocol with Protocol option.
func WithProtocol(protocol string) Option {
return func(o *options) { o.Protocol = &protocol }
}

// WithDefaultWeight with default Weight option.
func WithDefaultWeight(weight int) Option {
// WithWeight with Weight option.
func WithWeight(weight int) Option {
return func(o *options) { o.Weight = weight }
}

// WithHealthy with default Healthy option.
// WithHealthy with Healthy option.
func WithHealthy(healthy bool) Option {
return func(o *options) { o.Healthy = healthy }
}

// WithIsolate with default Isolate option.
// WithIsolate with Isolate option.
func WithIsolate(isolate bool) Option {
return func(o *options) { o.Isolate = isolate }
}

// WithDefaultTTL with default TTL option.
func WithDefaultTTL(TTL int) Option {
// WithTTL with TTL option.
func WithTTL(TTL int) Option {
return func(o *options) { o.TTL = TTL }
}

// WithTimeout with default Timeout option.
// WithTimeout with Timeout option.
func WithTimeout(timeout time.Duration) Option {
return func(o *options) { o.Timeout = timeout }
}

// WithRetryCount with default RetryCount option.
// WithRetryCount with RetryCount option.
func WithRetryCount(retryCount int) Option {
return func(o *options) { o.RetryCount = retryCount }
}
Expand Down Expand Up @@ -167,7 +168,6 @@ func (r *Registry) Register(_ context.Context, serviceInstance *registry.Service
rmd["version"] = serviceInstance.Version
}
// Register
fmt.Println(serviceInstance.Name + u.Scheme)
service, err := r.provider.Register(
&api.InstanceRegisterRequest{
InstanceRegisterRequest: model.InstanceRegisterRequest{
Expand All @@ -191,7 +191,36 @@ func (r *Registry) Register(_ context.Context, serviceInstance *registry.Service
if err != nil {
return err
}
ids = append(ids, service.InstanceID)
instanceID := service.InstanceID

// start heartbeat report
go func() {
ticker := time.NewTicker(time.Second * time.Duration(r.opt.TTL))
defer ticker.Stop()

for {
<-ticker.C

err = r.provider.Heartbeat(&api.InstanceHeartbeatRequest{
InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{
Service: serviceInstance.Name + u.Scheme,
Namespace: r.opt.Namespace,
Host: host,
Port: portNum,
ServiceToken: r.opt.ServiceToken,
InstanceID: instanceID,
Timeout: &r.opt.Timeout,
RetryCount: &r.opt.RetryCount,
},
})
if err != nil {
log.Error(err.Error())
continue
}
}
}()

ids = append(ids, instanceID)
}
// need to set InstanceID for Deregister
serviceInstance.ID = strings.Join(ids, _instanceIDSeparator)
Expand Down

0 comments on commit 369cccb

Please sign in to comment.