Skip to content

Commit

Permalink
Add: use multiple service config in service configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Aug 19, 2018
1 parent 8de316e commit fc8c917
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 80 deletions.
6 changes: 6 additions & 0 deletions change_log.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
## develop history ##
---

- 2018/08/19
> Feature
* use multiple service config in service configuration
* do not register consumer
* version: v1.0.2

- 2018/08/16
> Feature
* Add gxpool.watcher filter
Expand Down
47 changes: 23 additions & 24 deletions micro/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Client struct {
}

// NewServer initialize a micro service consumer
func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOption) (*Client, error) {
func NewClient(conf *rpc.ClientConfig, regConf *ConsumerRegistryConfig, opts ...ClientOption) (*Client, error) {
var (
err error
rpcClient *rpc.Client
Expand All @@ -61,17 +61,17 @@ func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOp
return nil, jerrors.Trace(err)
}

addrList := strings.Split(regConf.Addr, ",")
regAddrList := strings.Split(regConf.RegAddr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
Expand All @@ -83,8 +83,7 @@ func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOp
}

serviceAttrFilter := gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Provider,
Role: gxregistry.SRT_Provider,
}
gxctx := gxcontext.NewValuesContext(nil)
gxctx.Set(gxpool.GxfilterServiceAttrKey, serviceAttrFilter)
Expand All @@ -97,29 +96,29 @@ func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOp
return nil, jerrors.Trace(err)
}

service := gxregistry.Service{
Attr: &gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Consumer,
Protocol: regConf.Codec,
},
Nodes: []*gxregistry.Node{
&gxregistry.Node{
ID: regConf.NodeID,
Address: conf.Host,
// Port: 0,
},
},
}
if err = registry.Register(service); err != nil {
return nil, jerrors.Trace(err)
}
// service := gxregistry.Service{
// Attr: &gxregistry.ServiceAttr{
// Group: regConf.IDC,
// Role: gxregistry.SRT_Consumer,
// Protocol: regConf.Codec,
// },
// Nodes: []*gxregistry.Node{
// &gxregistry.Node{
// ID: regConf.NodeID,
// Address: conf.Host,
// // Port: 0,
// },
// },
// }
// if err = registry.Register(service); err != nil {
// return nil, jerrors.Trace(err)
// }

clt := &Client{
Client: rpcClient,
registry: registry,
attr: gxregistry.ServiceAttr{
Group: regConf.IDC,
Group: regConf.Group,
},
filter: filter,
svcMap: make(map[gxregistry.ServiceAttr]*gxfilter.ServiceArray),
Expand Down
101 changes: 86 additions & 15 deletions micro/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package micro

import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/container/set/strset"
jerrors "github.com/juju/errors"
"github.com/scylladb/go-set/strset"
)

const (
Expand All @@ -14,26 +14,64 @@ var (
registryArray = strset.New("zookeeper", "etcd")
)

type ServiceConfig struct {
LocalHost string `default:"127.0.0.1" yaml:"local_host" json:"local_host, omitempty"`
LocalPort int `default:"10001" yaml:"local_port" json:"local_port, omitempty"`
Group string `default:"idc-bj" yaml:"group" json:"group,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
Protocol string `default:"json" yaml:"protocol" json:"protocol,omitempty"`
Service string `default:"test" yaml:"service" json:"service,omitempty"`
Version string `default:"v1" yaml:"version" json:"version,omitempty"`
}

// CheckValidity check parameter validity
func (c *ServiceConfig) CheckValidity() error {
if len(c.LocalHost) == 0 {
return jerrors.Errorf(ErrIllegalConf+"local host %s", c.LocalHost)
}

if c.LocalPort <= 0 || 65535 < c.LocalPort {
return jerrors.Errorf(ErrIllegalConf+"local port %s", c.LocalPort)
}

if len(c.Group) == 0 {
return jerrors.Errorf(ErrIllegalConf+"group %s", c.Group)
}

if len(c.NodeID) == 0 {
return jerrors.Errorf(ErrIllegalConf+"node id %s", c.NodeID)
}

if codec := rpc.GetCodecType(c.Protocol); codec == rpc.CodecUnknown {
return jerrors.Errorf(ErrIllegalConf+"protocol type %s", c.Protocol)
}

if len(c.Service) == 0 {
return jerrors.Errorf(ErrIllegalConf+"service %s", c.Service)
}

if len(c.Version) == 0 {
return jerrors.Errorf(ErrIllegalConf+"service version %s", c.Version)
}

return nil
}

// RegistryConfig provides configuration for registry
type RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"root" json:"root,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
Codec string `default:"json" yaml:"codec" json:"codec,omitempty"`
codec rpc.CodecType
RegAddr string `default:"127.0.0.1:2181" yaml:"reg_addr" json:"reg_addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_timeout" json:"keepalive_timeout,omitempty"`
Root string `default:"/getty" yaml:"root" json:"root,omitempty"`
}

// CheckValidity check parameter validity
func (c *RegistryConfig) CheckValidity() error {
if !registryArray.Has(c.Type) {
return jerrors.Errorf(ErrIllegalConf+"registry type %s", c.Type)
}

if len(c.Addr) == 0 {
return jerrors.Errorf(ErrIllegalConf+"registry addr %s", c.Addr)
if len(c.RegAddr) == 0 {
return jerrors.Errorf(ErrIllegalConf+"registry addr %s", c.RegAddr)
}

if c.KeepaliveTimeout < 0 {
Expand All @@ -44,12 +82,45 @@ func (c *RegistryConfig) CheckValidity() error {
return jerrors.Errorf(ErrIllegalConf+"root %s", c.Root)
}

if len(c.NodeID) == 0 {
return jerrors.Errorf(ErrIllegalConf+"node id %s", c.NodeID)
return nil
}

// ProviderRegistryConfig provides provider configuration for registry
type ProviderRegistryConfig struct {
RegistryConfig `yaml:"basic" json:"basic,omitempty"`
ServiceArray []ServiceConfig `default:"" yaml:"service_array" json:"service_array,omitempty"`
}

// CheckValidity check parameter validity
func (c *ProviderRegistryConfig) CheckValidity() error {
if err := c.RegistryConfig.CheckValidity(); err != nil {
return jerrors.Trace(err)
}

for idx := 0; idx < len(c.ServiceArray); idx++ {
if err := c.ServiceArray[idx].CheckValidity(); err != nil {
return jerrors.Errorf(ErrIllegalConf+"service reference config, idx:%d, err:%s",
idx, jerrors.ErrorStack(err))
}
}

return nil
}

// ConsumerRegistryConfig provides consumer configuration for registry
type ConsumerRegistryConfig struct {
RegistryConfig `yaml:"basic" json:"basic,omitempty"`
Group string `default:"idc-bj" yaml:"group" json:"group,omitempty"`
}

// CheckValidity check parameter validity
func (c *ConsumerRegistryConfig) CheckValidity() error {
if err := c.RegistryConfig.CheckValidity(); err != nil {
return jerrors.Trace(err)
}

if c.codec = rpc.GetCodecType(c.Codec); c.codec == rpc.CodecUnknown {
return jerrors.Errorf(ErrIllegalConf+"codec type %s", c.Codec)
if len(c.Group) == 0 {
return jerrors.Errorf(ErrIllegalConf+"group %s", c.Group)
}

return nil
Expand Down
79 changes: 51 additions & 28 deletions micro/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package micro

import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/strings"
"net"
"strconv"
"strings"
"time"
Expand All @@ -19,19 +21,17 @@ import (
type Server struct {
*rpc.Server
// registry
regConf RegistryConfig
regConf ProviderRegistryConfig
registry gxregistry.Registry
attr gxregistry.ServiceAttr
nodes []*gxregistry.Node
}

// NewServer initialize a micro service provider
func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error) {
func NewServer(conf *rpc.ServerConfig, regConf *ProviderRegistryConfig) (*Server, error) {
var (
err error
rpcServer *rpc.Server
registry gxregistry.Registry
nodes []*gxregistry.Node
)

if err = regConf.CheckValidity(); err != nil {
Expand All @@ -42,17 +42,17 @@ func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error)
return nil, jerrors.Trace(err)
}

addrList := strings.Split(regConf.Addr, ",")
regAddrList := strings.Split(regConf.RegAddr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
Expand All @@ -63,47 +63,70 @@ func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error)
return nil, jerrors.Trace(err)
}

var localAddrArr []string
for _, p := range conf.Ports {
port, err := strconv.Atoi(p)
if err != nil {
return nil, jerrors.Trace(err)
}

if port <= 0 || 65535 < port {
return nil, jerrors.Errorf("illegal port %s", p)
}

nodes = append(nodes,
&gxregistry.Node{
// use host port as part of NodeID to defeat the case: on process listens on many ports
ID: regConf.NodeID + "@" + gxnet.HostAddress(conf.Host, port),
Address: conf.Host,
Port: int32(port),
},
)
localAddrArr = append(localAddrArr, net.JoinHostPort(conf.Host, p))
}

for _, svr := range regConf.ServiceArray {
addr := gxnet.HostAddress(svr.LocalHost, svr.LocalPort)
if ok := gxstrings.Contains(localAddrArr, addr); !ok {
return nil, jerrors.Errorf("can not find ServiceConfig addr %s in conf address array %#v",
addr, localAddrArr)
}
}

return &Server{
Server: rpcServer,
regConf: *regConf,
registry: registry,
nodes: nodes,

attr: gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Provider,
Protocol: regConf.Codec,
},
}, nil
}

// Register the @rcvr
func (s *Server) Register(rcvr rpc.GettyRPCService) error {
if err := s.Server.Register(rcvr); err != nil {
return jerrors.Trace(err)
}
var (
flag bool
attr gxregistry.ServiceAttr
)

attr := s.attr
attr.Role = gxregistry.SRT_Provider
attr.Service = rcvr.Service()
attr.Version = rcvr.Version()
service := gxregistry.Service{Attr: &attr, Nodes: s.nodes}
if err := s.registry.Register(service); err != nil {
for _, c := range s.regConf.ServiceArray {
if c.Service == rcvr.Service() && c.Version == rcvr.Version() {
flag = true
attr.Group = c.Group
attr.Protocol = c.Protocol

service := gxregistry.Service{Attr: &attr}
service.Nodes = append(service.Nodes,
&gxregistry.Node{
ID: c.NodeID,
Address: c.LocalHost,
Port: int32(c.LocalPort),
},
)
if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err)
}
}
}
if !flag {
return jerrors.Errorf("can not find @rcvr{service:%s, version:%s} in registry config:%#v",
rcvr.Service(), rcvr.Version(), s.regConf)
}

if err := s.Server.Register(rcvr); err != nil {
return jerrors.Trace(err)
}

Expand Down
Loading

0 comments on commit fc8c917

Please sign in to comment.