From 072ae4da9aeb50e65f469a578da2df08acff6cdf Mon Sep 17 00:00:00 2001 From: bryanhe-bupt Date: Tue, 25 Feb 2020 17:45:36 +0800 Subject: [PATCH] feature: bcs-k8s-watch supports to sync dato to storage in external network. issue #377 --- bcs-k8s/bcs-k8s-watch/app/app.go | 4 ++-- bcs-k8s/bcs-k8s-watch/app/bcs/cluster.go | 8 ++++---- bcs-k8s/bcs-k8s-watch/app/bcs/service.go | 12 ++++++++++-- bcs-k8s/bcs-k8s-watch/app/options/options.go | 3 +++ 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/bcs-k8s/bcs-k8s-watch/app/app.go b/bcs-k8s/bcs-k8s-watch/app/app.go index 49276ba53c..81b6c2d93b 100644 --- a/bcs-k8s/bcs-k8s-watch/app/app.go +++ b/bcs-k8s/bcs-k8s-watch/app/app.go @@ -220,14 +220,14 @@ func RunAsLeader(stopChan <-chan struct{}, config *options.WatchConfig, clusterI bcsTLSConfig := config.BCS.TLS glog.Info("getting storage service now...") - storageService, storageServiceZKRD, err := bcs.GetStorageService(zkHosts, bcsTLSConfig, config.BCS.CustomStorageEndpoints) + storageService, storageServiceZKRD, err := bcs.GetStorageService(zkHosts, bcsTLSConfig, config.BCS.CustomStorageEndpoints, config.BCS.IsExternal) if err != nil { panic(err) } glog.Info("get storage service done") glog.Info("getting netservice now...") - netservice, netserviceZKRD, err := bcs.GetNetService(netServiceZKHosts, bcsTLSConfig, config.BCS.CustomNetServiceEndpoints) + netservice, netserviceZKRD, err := bcs.GetNetService(netServiceZKHosts, bcsTLSConfig, config.BCS.CustomNetServiceEndpoints, false) if err != nil { panic(err) } diff --git a/bcs-k8s/bcs-k8s-watch/app/bcs/cluster.go b/bcs-k8s/bcs-k8s-watch/app/bcs/cluster.go index 01ca7bb2bb..cad73a1869 100644 --- a/bcs-k8s/bcs-k8s-watch/app/bcs/cluster.go +++ b/bcs-k8s/bcs-k8s-watch/app/bcs/cluster.go @@ -136,7 +136,7 @@ func GetClusterID(zkHosts string, hostIP string, bcsTLSConfig options.TLS) (stri } // GetStorageService returns storage InnerService object for discovery. -func GetStorageService(zkHosts string, bcsTLSConfig options.TLS, customEndpoints []string) (*InnerService, *RegisterDiscover.RegDiscover, error) { +func GetStorageService(zkHosts string, bcsTLSConfig options.TLS, customEndpoints []string, isExternal bool) (*InnerService, *RegisterDiscover.RegDiscover, error) { discovery := RegisterDiscover.NewRegDiscoverEx(zkHosts, 5*time.Second) if err := discovery.Start(); err != nil { return nil, nil, fmt.Errorf("get storage service from ZK failed, %+v", err) @@ -151,14 +151,14 @@ func GetStorageService(zkHosts string, bcsTLSConfig options.TLS, customEndpoints return nil, nil, fmt.Errorf("discover storage service failed, %+v", err) } - storageService := NewInnerService(types.BCS_MODULE_STORAGE, eventChan, customEndpoints) + storageService := NewInnerService(types.BCS_MODULE_STORAGE, eventChan, customEndpoints, isExternal) go storageService.Watch(bcsTLSConfig) return storageService, discovery, nil } // GetNetService returns netservice InnerService object for discovery. -func GetNetService(zkHosts string, bcsTLSConfig options.TLS, customEndpoints []string) (*InnerService, *RegisterDiscover.RegDiscover, error) { +func GetNetService(zkHosts string, bcsTLSConfig options.TLS, customEndpoints []string, isExternal bool) (*InnerService, *RegisterDiscover.RegDiscover, error) { discovery := RegisterDiscover.NewRegDiscoverEx(zkHosts, 5*time.Second) if err := discovery.Start(); err != nil { return nil, nil, fmt.Errorf("get netservice from ZK failed, %+v", err) @@ -174,7 +174,7 @@ func GetNetService(zkHosts string, bcsTLSConfig options.TLS, customEndpoints []s return nil, nil, fmt.Errorf("discover netservice failed, %+v", err) } - netService := NewInnerService(types.BCS_MODULE_NETSERVICE, eventChan, customEndpoints) + netService := NewInnerService(types.BCS_MODULE_NETSERVICE, eventChan, customEndpoints, isExternal) go netService.Watch(bcsTLSConfig) return netService, discovery, nil diff --git a/bcs-k8s/bcs-k8s-watch/app/bcs/service.go b/bcs-k8s/bcs-k8s-watch/app/bcs/service.go index 928b06f4b3..0aa04387ce 100644 --- a/bcs-k8s/bcs-k8s-watch/app/bcs/service.go +++ b/bcs-k8s/bcs-k8s-watch/app/bcs/service.go @@ -54,17 +54,19 @@ type InnerService struct { eventChan <-chan *RegisterDiscover.DiscoverEvent servers map[string]*HTTPClientConfig customEndpoints []string + isExternal bool } // NewInnerService creates a new serviceName InnerService instance for discovery. func NewInnerService(serviceName string, eventChan <-chan *RegisterDiscover.DiscoverEvent, - customEndpoints []string) *InnerService { + customEndpoints []string, isExternal bool) *InnerService { svc := &InnerService{ name: serviceName, eventChan: eventChan, servers: make(map[string]*HTTPClientConfig), customEndpoints: customEndpoints, + isExternal: isExternal, } return svc @@ -148,7 +150,13 @@ func (s *InnerService) update(servers []string, bcsTLSConfig options.TLS) { continue } - address := fmt.Sprintf("%s://%s:%d", serverInfo.Scheme, serverInfo.IP, serverInfo.Port) + var address string + if s.isExternal { + address = fmt.Sprintf("%s://%s:%d", serverInfo.Scheme, serverInfo.ExternalIp, serverInfo.ExternalPort) + } else { + address = fmt.Sprintf("%s://%s:%d", serverInfo.Scheme, serverInfo.IP, serverInfo.Port) + } + currentServers[address] = "" if _, exists := s.servers[address]; !exists { diff --git a/bcs-k8s/bcs-k8s-watch/app/options/options.go b/bcs-k8s/bcs-k8s-watch/app/options/options.go index 4d4d3e59d9..38e1614d3f 100644 --- a/bcs-k8s/bcs-k8s-watch/app/options/options.go +++ b/bcs-k8s/bcs-k8s-watch/app/options/options.go @@ -69,6 +69,9 @@ type BCSConfig struct { // CustomNetServiceEndpoints is custom target netservice endpoints. CustomNetServiceEndpoints []string `json:"custom-netservice-endpoints"` + + // whether the k8s cluster and bcs-k8s-watch is in external network + IsExternal bool `json:"is-external"` } type K8sConfig struct {