Skip to content
Permalink
Browse files
SpringCloud subscribe control by user
  • Loading branch information
PhilYue committed May 15, 2022
1 parent 0357b4a commit 8bccdc27655414234951329c6350ae773b96979b
Showing 6 changed files with 55 additions and 18 deletions.
@@ -44,7 +44,6 @@ const (

Nacos = "nacos"
Zookeeper = "zookeeper"
Consul = "consul"
)

func init() {
@@ -73,15 +72,28 @@ type (
Registry *model.RemoteConfig `yaml:"registry" json:"registry" default:"registry"`
FreshInterval time.Duration `yaml:"freshInterval" json:"freshInterval" default:"freshInterval"`
Services []string `yaml:"services" json:"services" default:"services"`
// todo configuration the discovery config, like `zookeeper.discovery.root = "/services"`
//Discovery *model.DiscoveryConfig `yaml:"discovery" json:"discovery" default:"discovery"`
// SubscribePolicy subscribe config,
// - adapting : if there is no any Services (App) names, fetch All services from registry center
// - definitely : fetch services by the config Services (App) names
SubscribePolicy string `yaml:"subscribe-policy" json:"subscribe-policy" default:"adapting"`
}

Service struct {
Name string
}

SubscribePolicy int
)

const (
Adapting SubscribePolicy = iota
Definitely
)

func (sp SubscribePolicy) String() string {
return [...]string{"adapting", "definitely"}[sp]
}

// Kind return plugin kind
func (p *CloudPlugin) Kind() string {
return Kind
@@ -94,26 +106,24 @@ func (p *CloudPlugin) CreateAdapter(ad *model.Adapter) (adapter.Adapter, error)

// Start start the adapter
func (a *CloudAdapter) Start() {

// do not block the main goroutine
// init get all service instance
err := a.firstFetch()
if err != nil {
logger.Errorf("init fetch service fail", err.Error())
//return
}

// background sync service instance from remote
err = a.backgroundSyncPeriod()
if err != nil {
logger.Errorf("init periodicity fetch service task fail", err.Error())
//return
}

// watch then fetch is more safety for consistent but there is background fresh mechanism
err = a.watch()
if err != nil {
logger.Errorf("init watch the register fail", err.Error())
//return
}
}

@@ -136,7 +146,6 @@ func (a *CloudAdapter) Apply() error {
switch strings.ToLower(a.cfg.Registry.Protocol) {
case Nacos:
sd, err = nacos.NewNacosServiceDiscovery(a.cfg.Services, a.cfg.Registry, a)
case Consul:
case Zookeeper:
sd, err = zookeeper.NewZKServiceDiscovery(a.cfg.Services, a.cfg.Registry, a)
default:
@@ -208,10 +217,18 @@ func (a *CloudAdapter) fetchServiceByConfig() ([]servicediscovery.ServiceInstanc
var instances []servicediscovery.ServiceInstance
var err error
// if configure specific services, then fetch those service instance only
if len(a.cfg.Services) > 0 {
instances, err = a.sd.QueryServicesByName(a.cfg.Services)
if a.subscribeServiceDefinitely() {
if len(a.cfg.Services) > 0 {
instances, err = a.sd.QueryServicesByName(a.cfg.Services)
} else {
logger.Warnf("No any Service(App) need Subscribe, config the Service(App) Names or make the `subscribe-policy: adapting` pls.")
}
} else {
instances, err = a.sd.QueryAllServices()
if len(a.cfg.Services) > 0 {
instances, err = a.sd.QueryServicesByName(a.cfg.Services)
} else {
instances, err = a.sd.QueryAllServices()
}
}

if err != nil {
@@ -347,8 +364,15 @@ func (a *CloudAdapter) stop() error {
err := a.sd.Unsubscribe()
if err != nil {
logger.Errorf("unsubscribe registry fail ", err.Error())
//return err
}
close(a.stopChan)
return nil
}

func (a *CloudAdapter) subscribeServiceDefinitely() bool {
return strings.EqualFold(a.cfg.SubscribePolicy, Definitely.String())
}

func (a *CloudAdapter) subscribeServiceAdapting() bool {

Check failure on line 376 in pkg/adapter/springcloud/cloud.go

GitHub Actions / review

pkg/adapter/springcloud/cloud.go#L376

[golangci] reported by reviewdog 🐶
func `(*CloudAdapter).subscribeServiceAdapting` is unused (unused)

Raw Output:
pkg/adapter/springcloud/cloud.go:376:24: func `(*CloudAdapter).subscribeServiceAdapting` is unused (unused)
func (a *CloudAdapter) subscribeServiceAdapting() bool {
                       ^
return strings.EqualFold(a.cfg.SubscribePolicy, Adapting.String())
}
@@ -90,6 +90,7 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
clusterManager := server.GetClusterManager()
endpoint := clusterManager.PickEndpoint(clusterName)
if endpoint == nil {
logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
bt, _ := json.Marshal(http.ErrResponse{Message: "cluster not found endpoint"})
hc.SendLocalReply(http3.StatusServiceUnavailable, bt)
return filter.Stop
@@ -98,7 +98,7 @@ func (f *Filter) Encode(c *http.HttpContext) filter.FilterStatus {
atomic.AddInt64(&totalElapsed, latency.Nanoseconds())
atomic.AddInt64(&totalCount, 1)

logger.Infof("[Metric] [UPSTREAM] receive request | %d | %s | %s | %s | ", c.GetStatusCode(), latency, c.GetMethod(), c.GetUrl())
logger.Debugf("[Metric] [UPSTREAM] receive request | %d | %s | %s | %s | ", c.GetStatusCode(), latency, c.GetMethod(), c.GetUrl())
return filter.Continue
}

@@ -19,7 +19,8 @@ package model

// Adapter the adapter plugin for manage cluster or router
type Adapter struct {
ID string `yaml:"id" json:"id"`
Name string `yaml:"name" json:"name"` // Name the adapter unique name
Config map[string]interface{} `yaml:"config" json:"config" mapstructure:"config"` // Config adapter config
ID string `yaml:"id" json:"id"`
Name string `yaml:"name" json:"name"` // Name the adapter unique name
Enabled string `yaml:"enabled" json:"enabled" default:"true"`
Config map[string]interface{} `yaml:"config" json:"config" mapstructure:"config"` // Config adapter config
}
@@ -17,6 +17,10 @@

package server

import (
"strings"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/adapter"
"github.com/apache/dubbo-go-pixiu/pkg/common/yaml"
@@ -57,6 +61,11 @@ func (am *AdapterManager) initAdapters() {
logger.Error("initAdapters get plugin error %s", err)
}

if !strings.EqualFold(f.Enabled, "true") {
logger.Warnf("the Adapter %s will stop starting, by config of Enabled : %s", f.Name, f.Enabled)
return
}

hf, err := hp.CreateAdapter(f)
if err != nil {
logger.Error("initFilterIfNeed create adapter error %s", err)
@@ -55,11 +55,13 @@ static_resources:
adapters:
- id: "springcloud"
name: "dgp.adapter.springcloud"
# enabled: false
config:
freshInterval: 60s # 刷新配置时间
# services:
# - user-service
# - auth-service
subscribe-policy: definitely # adapting-订阅根据系统自适应策略,definitely-服务订阅根据用户配置的services
services:
- user-service
- auth-service
registry:
name: naocs
protocol: nacos

0 comments on commit 8bccdc2

Please sign in to comment.