Skip to content

Commit

Permalink
Merge 9c6aceb into b63555b
Browse files Browse the repository at this point in the history
  • Loading branch information
ChinX committed Feb 8, 2020
2 parents b63555b + 9c6aceb commit c051e7a
Show file tree
Hide file tree
Showing 22 changed files with 1,398 additions and 380 deletions.
56 changes: 56 additions & 0 deletions etc/conf/syncer.yaml
@@ -0,0 +1,56 @@
# run mode, supports (single, cluster)
mode: signle
# node name, must be unique on the network
node: syncer-node
# Cluster name, clustering by this name
cluster: syncer-cluster
dataDir: ./syncer-data/
listener:
# Address used to network with other Syncers in LAN
bindAddr: 0.0.0.0:30190
# Address used to network with other Syncers in WAN
advertiseAddr: ""
# Address used to synchronize data with other Syncers
rpcAddr: 0.0.0.0:30191
# Address used to communicate with other cluster peers
peerAddr: 127.0.0.1:30192
tlsMount:
enabled: false
name: syncer
join:
enabled: false
# Address to join the network by specifying at least one existing member
address: 127.0.0.1:30190
# Limit the maximum of RetryJoin, default is 0, means no limit
retryMax: 3
retryInterval: 30s
task:
kind: ticker
params:
# Time interval between timing tasks, default is 30s
- key: interval
value: 30s
registry:
plugin: servicecenter
address: http://127.0.0.1:30100
tlsMount:
enabled: false
name: servicecenter
tlsConfigs:
- name: syncer
verifyPeer: true
minVersion: TLSv1.2
caFile: ./certs/trust.cer
certFile: ./certs/server.cer
keyFile: ./certs/server_key.pem
passphrase: ""
ciphers:
- TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
- TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
- TLS_RSA_WITH_AES_256_GCM_SHA384
- TLS_RSA_WITH_AES_128_GCM_SHA256
- name: servicecenter
verifyPeer: false
caFile: ./certs/trust.cer
certFile: ./certs/server.cer
keyFile: ./certs/server_key.pem
4 changes: 3 additions & 1 deletion go.mod
Expand Up @@ -73,8 +73,9 @@ require (
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/openzipkin/zipkin-go-opentracing v0.3.3-0.20180123190626-6bb822a7f15f
github.com/pborman/uuid v1.2.0 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.1-0.20170628125436-ab4214782d02
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e // indirect
Expand All @@ -86,6 +87,7 @@ require (
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spf13/cobra v0.0.0-20170624150100-4d647c8944eb
github.com/spf13/pflag v1.0.0
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/ugorji/go v1.1.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down
44 changes: 27 additions & 17 deletions syncer/cmd/daemon.go
Expand Up @@ -26,7 +26,7 @@ import (
)

var (
conf = config.DefaultConfig()
conf = &config.Config{}
configFile = ""
)

Expand All @@ -42,28 +42,28 @@ func init() {
syncerCmd.Flags().StringVar(&conf.Mode, "mode", conf.Mode,
"run mode")

syncerCmd.Flags().StringVar(&conf.NodeName, "node", conf.NodeName,
syncerCmd.Flags().StringVar(&conf.Node, "node", conf.Node,
"node name")

syncerCmd.Flags().StringVar(&conf.BindAddr, "bind-addr", conf.BindAddr,
"address to bind listeners to")
syncerCmd.Flags().StringVar(&conf.Cluster, "cluster", conf.Cluster,
"cluster name")

syncerCmd.Flags().StringVar(&conf.RPCAddr, "rpc-addr", conf.RPCAddr,
"port to bind RPC listener to")
syncerCmd.Flags().StringVar(&conf.Listener.BindAddr, "bind-addr", conf.Listener.BindAddr,
"address used to network with other Syncers")

syncerCmd.Flags().StringVar(&conf.JoinAddr, "join-addr", conf.JoinAddr,
"address to join the cluster by specifying at least one existing member")
syncerCmd.Flags().StringVar(&conf.Listener.RPCAddr, "rpc-addr", conf.Listener.RPCAddr,
"port used to synchronize data with other Syncers")

syncerCmd.Flags().StringVar(&conf.SC.Addr, "sc-addr", conf.SC.Addr,
"address to monitor the service-center")
syncerCmd.Flags().StringVar(&conf.Listener.PeerAddr, "peer-addr", conf.Listener.PeerAddr,
"port used to communicate with other cluster members")

syncerCmd.Flags().StringVar(&conf.ClusterName, "cluster-name", conf.ClusterName,
"name to group members into cluster")
syncerCmd.Flags().StringVar(&conf.Join.Address, "join", "",
"address to join the network by specifying at least one existing member")

syncerCmd.Flags().IntVar(&conf.ClusterPort, "cluster-port", conf.ClusterPort,
"port to communicate between cluster members")
syncerCmd.Flags().StringVar(&conf.Registry.Address, "registry", conf.Registry.Address,
"address to monitor the registry")

syncerCmd.Flags().StringVar(&conf.SC.Plugin, "sc-plugin", conf.SC.Plugin,
syncerCmd.Flags().StringVar(&conf.Registry.Plugin, "plugin", conf.Registry.Plugin,
"plugin name of servicecenter")

syncerCmd.Flags().StringVar(&configFile, "config", "",
Expand All @@ -72,16 +72,26 @@ func init() {

// runSyncer Runs the Syncer service.
func runSyncer(cmd *cobra.Command, args []string) {
if conf.Join.Address != "" {
conf.Join.Enabled = true
}

defaultConfig := config.DefaultConfig()

if configFile != "" {
fromFile, err := config.LoadConfig(configFile)
if err != nil {
log.Errorf(err, "load config file failed")
return
}
conf.Merge(fromFile)
if fromFile != nil {
*defaultConfig = config.Merge(*defaultConfig, *fromFile)
}
}

if err := conf.Verify(); err != nil {
*conf = config.Merge(*defaultConfig, *conf)
err := config.Verify(conf)
if err != nil {
log.Errorf(err, "verify syncer config failed")
return
}
Expand Down
155 changes: 36 additions & 119 deletions syncer/config/config.go
Expand Up @@ -17,93 +17,58 @@
package config

import (
"crypto/md5"
"fmt"
"io/ioutil"
"os"
"strings"
"path/filepath"
"strconv"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/etcd"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
_ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
"github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/util/uuid"
)

var (
DefaultDCPort = 30100
DefaultClusterPort = 30192
DefaultTickerInterval = 30
DefaultConfigPath = "./conf/config.yaml"

syncerName = ""
servicecenterName = "servicecenter"
)

// Config is the configuration that can be set for Syncer. Some of these
// configurations are exposed as command-line flags.
type Config struct {
// Wraps the serf config
*serf.Config

// Wraps the etcd config
Etcd *etcd.Config
LogFile string `yaml:"log_file"`

// JoinAddr The management address of one gossip pool member.
JoinAddr string `yaml:"join_addr"`
TickerInterval int `yaml:"ticker_interval"`
Profile string `yaml:"profile"`
EnableCompression bool `yaml:"enable_compression"`
AutoSync bool `yaml:"auto_sync"`
TLSConfig *TLSConfig `yaml:"tls_config"`
SC *ServiceCenter `yaml:"servicecenter"`
}

// ServiceCenter configuration
type ServiceCenter struct {
// Addr servicecenter address, which is the service registry address.
// Cluster mode is supported, and multiple addresses are separated by an English ",".
Addr string `yaml:"addr"`
Plugin string `yaml:"plugin"`
TLSConfig *TLSConfig `yaml:"tls_config"`
Endpoints []string `yaml:"-"`
}

// DefaultConfig returns the default config
func DefaultConfig() *Config {
serfConf := serf.DefaultConfig()
etcdConf := etcd.DefaultConfig()
hostname, err := os.Hostname()
if err != nil {
log.Errorf(err, "Error determining hostname: %s", err)
return nil
hostname = string(uuid.NewUUID())
}
serfConf.NodeName = hostname
etcdConf.Name = hostname

return &Config{
TickerInterval: DefaultTickerInterval,
Config: serfConf,
Etcd: etcdConf,
TLSConfig: DefaultTLSConfig(),
SC: &ServiceCenter{
Addr: fmt.Sprintf("127.0.0.1:%d", DefaultDCPort),
Plugin: servicecenter.PluginName,
TLSConfig: NewTLSConfig(servicecenterName),
Mode: ModeSingle,
Node: hostname,
DataDir: defaultDataDir + hostname,
Listener: Listener{
BindAddr: "0.0.0.0:" + strconv.Itoa(defaultBindPort),
RPCAddr: "0.0.0.0:" + strconv.Itoa(defaultRPCPort),
PeerAddr: "127.0.0.1:" + strconv.Itoa(defaultPeerPort),
},
Task: Task{
Kind: "ticker",
Params: []Label{
{
Key: defaultTaskKey,
Value: defaultTaskValue,
},
},
},
Registry: Registry{
Address: "http://127.0.0.1:30100",
Plugin: defaultDCPluginName,
},
}
}

// LoadConfig loads configuration from file
func LoadConfig(filepath string) (*Config, error) {
if filepath == "" {
filepath = DefaultConfigPath
return nil, nil
}
if !(utils.IsFileExist(filepath)) {
err := fmt.Errorf("file is not exist")
err := errors.New("file is not exist")
log.Errorf(err, "Load config from %s failed", filepath)
return nil, err
}
Expand All @@ -123,63 +88,15 @@ func LoadConfig(filepath string) (*Config, error) {
return conf, nil
}

// Merge other configuration into the current configuration
func (c *Config) Merge(other *Config) {
if c.TLSConfig != nil && other.TLSConfig != nil {
c.TLSConfig.Merge(syncerName, other.TLSConfig)
}

if c.SC != nil && c.SC.TLSConfig != nil && other.SC != nil && other.SC.TLSConfig != nil {
c.SC.TLSConfig.Merge(servicecenterName, other.SC.TLSConfig)
}
}

// Verify Provide config verification
func (c *Config) Verify() error {
ip, port, err := utils.SplitHostPort(c.BindAddr, serf.DefaultBindPort)
if err != nil {
return err
}
if ip == "127.0.0.1" {
c.BindAddr = fmt.Sprintf("0.0.0.0:%d", port)
}

ip, port, err = utils.SplitHostPort(c.RPCAddr, serf.DefaultRPCPort)
if err != nil {
return err
}
c.RPCPort = port
if ip == "127.0.0.1" {
c.RPCAddr = fmt.Sprintf("0.0.0.0:%d", c.RPCPort)
}

if c.JoinAddr != "" {
c.RetryJoin = strings.Split(c.JoinAddr, ",")
}

if c.ClusterName == "" {
c.ClusterName = fmt.Sprintf("%x", md5.Sum([]byte(c.SC.Addr)))
}

c.TLSEnabled = c.TLSConfig.Enabled

c.SC.Endpoints = strings.Split(c.SC.Addr, ",")

c.Etcd.SetName(c.NodeName)
return nil
func (c *Config) GetTLSConfig(name string) *TLSConfig {
return findInTLSConfigs(c.TLSConfigs, name)
}

func (sc *ServiceCenter) SCConfigOps() []plugins.SCConfigOption {
opts := []plugins.SCConfigOption{plugins.WithEndpoints(strings.Split(sc.Addr, ","))}
if sc.TLSConfig.Enabled {
opts = append(opts,
plugins.WithTLSEnabled(sc.TLSConfig.Enabled),
plugins.WithTLSVerifyPeer(sc.TLSConfig.VerifyPeer),
plugins.WithTLSPassphrase(sc.TLSConfig.Passphrase),
plugins.WithTLSCAFile(sc.TLSConfig.CAFile),
plugins.WithTLSCertFile(sc.TLSConfig.CertFile),
plugins.WithTLSKeyFile(sc.TLSConfig.KeyFile),
)
func pathFromSSLEnvOrDefault(server, path string) string {
env := os.Getenv(defaultEnvSSLRoot)
if len(env) == 0 {
wd, _ := os.Getwd()
return filepath.Join(wd, defaultCertsDir, server, path)
}
return opts
return os.ExpandEnv(filepath.Join("$"+defaultEnvSSLRoot, server, path))
}

0 comments on commit c051e7a

Please sign in to comment.