From 4ba4aba5841cb0dbce1605bc627881b77e384690 Mon Sep 17 00:00:00 2001
From: bergzhao <13611129507@163.com>
Date: Fri, 29 May 2020 16:36:53 +0800
Subject: [PATCH] fix: time.Ticker not stopped, causing high CPU usage #478
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The components below create time.Ticker instances with time.NewTicker but
never call Stop(). An unstopped ticker keeps its underlying timer active, so
tickers accumulate in these long-running daemons and drive CPU usage up
(#478). Add defer ticker.Stop() immediately after each time.NewTicker call so
the ticker is released when the surrounding loop exits. The newly added
bcs-check and bcs-sd-prometheus discovery sources follow the same pattern.

---
 bcs-common/common/blog/glog/glog.go | 4 +-
 bcs-common/pkg/master/zookeeper.go | 3 +
 bcs-common/pkg/reflector/reflector.go | 2 +
 bcs-common/pkg/storage/zookeeper/util.go | 2 +
 .../driver/cbs/controller.go | 1 +
 bcs-mesos/bcs-check/bcscheck/bcscheck.go | 411 ++++++++++++++++++
 .../console-proxy/manager/console.go | 1 +
 .../app/bcs_executor.go | 2 +
 .../connection/fakeconn.go | 1 +
 .../container/cni/cni_pod.go | 1 +
 .../container/cnm/cnm_pod.go | 1 +
 .../executor/executor.go | 2 +
 .../bcs-container-executor/healthcheck/cmd.go | 1 +
 .../healthcheck/http.go | 1 +
 .../bcs-container-executor/healthcheck/tcp.go | 1 +
 .../hpacontroller/controller/autoscaler.go | 2 +
 .../metrics/resources/resourcesCollector.go | 1 +
 .../hpacontroller/scaler/bcs-mesos.go | 1 +
 .../mesosdriver/mesosdriver.go | 2 +
 bcs-mesos/bcs-mesos-watch/app/watch.go | 2 +
 .../bcs-mesos-watch/cluster/etcd/configmap.go | 1 +
 .../cluster/etcd/deployment.go | 1 +
 .../bcs-mesos-watch/cluster/etcd/endpoint.go | 1 +
 .../cluster/etcd/exportservice.go | 1 +
 .../bcs-mesos-watch/cluster/etcd/secret.go | 1 +
 .../bcs-mesos-watch/cluster/etcd/service.go | 1 +
 .../bcs-mesos-watch/cluster/mesos/app.go | 2 +
 .../cluster/mesos/configmap.go | 1 +
 .../cluster/mesos/deployment.go | 1 +
 .../bcs-mesos-watch/cluster/mesos/endpoint.go | 1 +
 .../cluster/mesos/exportservice.go | 1 +
 .../bcs-mesos-watch/cluster/mesos/mesos.go | 1 +
 .../bcs-mesos-watch/cluster/mesos/secret.go | 1 +
 .../bcs-mesos-watch/cluster/mesos/service.go | 1 +
 .../cluster/mesos/taskgroup.go | 2 +
 bcs-mesos/bcs-mesos-watch/storage/cc.go | 1 +
 .../bcs-mesos-watch/storage/channelproxy.go | 2 +-
 .../src/manager/sched/offer/offerpool.go | 1 +
 .../src/manager/sched/scheduler/bcs_event.go | 2 +
 .../src/manager/sched/scheduler/scheduler.go | 3 +
 .../manager/sched/scheduler/service_mgr.go | 1 +
 .../src/plugin/bin/ip-resources/ipResource.go | 1 +
 .../plugin/dynamicPlugin/dynamicPlugin.go | 1 +
 .../controller/controller.go | 1 +
 .../controller/networkpolicy.go | 1 +
 bcs-services/bcs-api/auth/bkiam/cache.go | 1 +
 .../bcs-api/pkg/server/proxier/proxier.go | 1 +
 .../pkg/processor/processor.go | 1 +
 .../bcsscheduler/controller/zkController.go | 1 +
 .../pkg/processor/processor.go | 1 +
 .../bcs-loadbalance/app/eventprocessor.go | 2 +
 bcs-services/bcs-loadbalance/app/reflector.go | 1 +
 bcs-services/bcs-loadbalance/clear/clear.go | 1 +
 .../bcs-logbeat-sidecar/sidecar/controller.go | 2 +
 .../pkg/watch/cluster-dynamic.go | 1 +
 .../pkg/watch/cluster-endpoint.go | 1 +
 .../pkg/watch/cluster-metric.go | 1 +
 .../bcs-metricservice/pkg/watch/watch.go | 2 +
 .../storage/zookeeper/zookeeper.go | 1 +
 .../network-detection/network-detection.go | 1 +
 .../discovery/bcs-mesos-discovery.go | 183 ++++++++
 .../discovery/bcs-service-discovery.go | 151 +++++++
 .../storage/actions/v1http/alarms/utils.go | 2 +
 .../storage/actions/v1http/events/utils.go | 2 +
 .../app/user-manager/v1http/permission.go | 1 +
 65 files changed, 828 insertions(+), 2 deletions(-)
 create mode 100644 bcs-mesos/bcs-check/bcscheck/bcscheck.go
 create mode 100644 bcs-services/bcs-sd-prometheus/discovery/bcs-mesos-discovery.go
 create
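The sketch below illustrates the pattern this patch applies after every
time.NewTicker call; it is not part of the diff, and the names leakyLoop and
fixedLoop are invented for illustration only. The old code creates a ticker
and never stops it, leaving the underlying timer running after the loop
returns; the fix defers Stop() as soon as the ticker is created.

package main

import (
	"context"
	"fmt"
	"time"
)

// leakyLoop is the old pattern: the ticker is never stopped, so its
// underlying timer keeps running after the loop returns.
func leakyLoop(ctx context.Context) {
	tick := time.NewTicker(time.Second)
	for {
		select {
		case <-ctx.Done():
			return // tick leaks here
		case <-tick.C:
			fmt.Println("periodic work")
		}
	}
}

// fixedLoop is the pattern used throughout this patch: defer Stop()
// right after NewTicker, releasing the ticker when the loop exits.
func fixedLoop(ctx context.Context) {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			fmt.Println("periodic work")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	go leakyLoop(ctx) // shown only for contrast with fixedLoop
	fixedLoop(ctx)
}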
mode 100644 bcs-services/bcs-sd-prometheus/discovery/bcs-service-discovery.go diff --git a/bcs-common/common/blog/glog/glog.go b/bcs-common/common/blog/glog/glog.go index c1c13286c7..4668176b66 100644 --- a/bcs-common/common/blog/glog/glog.go +++ b/bcs-common/common/blog/glog/glog.go @@ -886,7 +886,9 @@ const flushInterval = 30 * time.Second // flushDaemon periodically flushes the log file buffers. func (l *loggingT) flushDaemon() { - for _ = range time.NewTicker(flushInterval).C { + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + for _ = range ticker.C { l.lockAndFlushAll() } } diff --git a/bcs-common/pkg/master/zookeeper.go b/bcs-common/pkg/master/zookeeper.go index 8bc52bfce4..279d6b65e7 100644 --- a/bcs-common/pkg/master/zookeeper.go +++ b/bcs-common/pkg/master/zookeeper.go @@ -263,6 +263,9 @@ func (zk *ZookeeperMaster) masterLoop() { func (zk *ZookeeperMaster) healthLoop() { masterTick := time.NewTicker(time.Second * 2) selfTick := time.NewTicker(time.Second * 30) + defer masterTick.Stop() + defer selfTick.Stop() + for { select { case <-zk.exitCxt.Done(): diff --git a/bcs-common/pkg/reflector/reflector.go b/bcs-common/pkg/reflector/reflector.go index 84901e799c..a4191a41a8 100644 --- a/bcs-common/pkg/reflector/reflector.go +++ b/bcs-common/pkg/reflector/reflector.go @@ -73,8 +73,10 @@ func (r *Reflector) Run() { blog.V(3).Infof("%s first resynchronization & watch success, register all ticker", r.name) //create ticker for data object resync syncTick := time.NewTicker(r.syncPeriod) + defer syncTick.Stop() //create ticker check stable watcher watchTick := time.NewTicker(time.Second * 2) + defer watchTick.Stop() for { select { case <-r.cxt.Done(): diff --git a/bcs-common/pkg/storage/zookeeper/util.go b/bcs-common/pkg/storage/zookeeper/util.go index 30281a91f6..cc497d44cc 100644 --- a/bcs-common/pkg/storage/zookeeper/util.go +++ b/bcs-common/pkg/storage/zookeeper/util.go @@ -115,6 +115,7 @@ func (n *Node) Run() { go n.childrenLoop() } tick := time.NewTicker(time.Second * 3) + defer tick.Stop() for { if n.isStopped { return @@ -208,6 +209,7 @@ func (n *Node) selfLoop() { } //wait for next event forceTick := time.NewTicker(time.Second * 300) + defer forceTick.Stop() for { select { case <-n.watchCxt.Done(): diff --git a/bcs-k8s/bcs-k8s-csi-tencentcloud/driver/cbs/controller.go b/bcs-k8s/bcs-k8s-csi-tencentcloud/driver/cbs/controller.go index b9fb7f8577..59a5f09179 100644 --- a/bcs-k8s/bcs-k8s-csi-tencentcloud/driver/cbs/controller.go +++ b/bcs-k8s/bcs-k8s-csi-tencentcloud/driver/cbs/controller.go @@ -297,6 +297,7 @@ func (ctrl *cbsController) CreateVolume(ctx context.Context, req *csi.CreateVolu disk := new(cbs.Disk) ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) defer cancel() diff --git a/bcs-mesos/bcs-check/bcscheck/bcscheck.go b/bcs-mesos/bcs-check/bcscheck/bcscheck.go new file mode 100644 index 0000000000..ba48ad53f7 --- /dev/null +++ b/bcs-mesos/bcs-check/bcscheck/bcscheck.go @@ -0,0 +1,411 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. 
You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package bcscheck + +import ( + rd "bk-bcs/bcs-common/common/RegisterDiscover" + "bk-bcs/bcs-common/common/blog" + "bk-bcs/bcs-common/common/metric" + commtype "bk-bcs/bcs-common/common/types" + "bk-bcs/bcs-common/common/version" + "bk-bcs/bcs-mesos/bcs-check/bcscheck/config" + "bk-bcs/bcs-mesos/bcs-check/bcscheck/manager" + "bk-bcs/bcs-mesos/bcs-check/bcscheck/task" + "encoding/json" + "fmt" + "golang.org/x/net/context" + "os" + "runtime" + "time" +) + +type HealthCheckServer struct { + conf config.HealthCheckConfig + + manager manager.Manager + + task task.TaskManager + + isMaster bool + Role metric.RoleType + + cxt context.Context + cancel context.CancelFunc +} + +func NewHealthCheckServer(conf config.HealthCheckConfig) *HealthCheckServer { + cxt, cancel := context.WithCancel(context.Background()) + + by, _ := json.Marshal(conf) + blog.V(3).Infof("NewHealthCheckServer conf %s", string(by)) + + s := &HealthCheckServer{ + conf: conf, + cxt: cxt, + cancel: cancel, + } + + return s +} + +func (s *HealthCheckServer) initManager() { + s.manager = manager.NewManager(s.cxt, s.conf) +} + +func (s *HealthCheckServer) initTaskManeger() { + s.task = task.NewTaskManager(s.cxt, s.conf, s.manager) +} + +func (s *HealthCheckServer) initClusterId() { + + if s.conf.Cluster == "" { + blog.Error("healthcheck cluster unknown") + os.Exit(1) + } + + blog.Infof("healthcheck run for cluster %s", s.conf.Cluster) + +} + +func (s *HealthCheckServer) Run() { + s.initClusterId() + go s.regDiscover() + go s.regBcsDiscover() + go s.runMetric() +} + +func (s *HealthCheckServer) Stop() { + blog.Info("HealthCheckServer stopped") + s.cancel() +} + +func (s *HealthCheckServer) masterStart() { + blog.Infof("health check role change: slave --> master") + + s.isMaster = true + + blog.Info("Manager run ...") + s.initManager() + go s.manager.Run() + + blog.Info("TaskManager run ...") + s.initTaskManeger() + go s.task.Run() +} + +func (s *HealthCheckServer) slaveStop() { + blog.Infof("health check role change: master --> slave") + + s.isMaster = false + s.Stop() + + cxt, cancel := context.WithCancel(context.Background()) + s.cxt = cxt + s.cancel = cancel +} + +func (s *HealthCheckServer) regDiscover() { + + blog.Info("HealthCheckServer to do register ...") + + // register service + regDiscv := rd.NewRegDiscoverEx(s.conf.RegDiscvSvr, time.Second*10) + if regDiscv == nil { + blog.Error("NewRegDiscover(%s) return nil, redo after 3 second ...", s.conf.RegDiscvSvr) + time.Sleep(3 * time.Second) + + go s.regDiscover() + return + } + blog.Info("NewRegDiscover(%s) succ", s.conf.RegDiscvSvr) + + err := regDiscv.Start() + if err != nil { + blog.Error("regDiscv start error(%s), redo after 3 second ...", err.Error()) + time.Sleep(3 * time.Second) + + go s.regDiscover() + return + } + blog.Info("RegDiscover start succ") + + defer func() { + err = regDiscv.Stop() + if err != nil { + blog.Errorf("regDiscv stop error %s", err.Error()) + } + }() + + host, err := os.Hostname() + if err != nil { + blog.Error("health check get hostname err: %s", err.Error()) + host = "UNKOWN" + } + var regInfo commtype.BcsCheckInfo + regInfo.ServerInfo.Cluster 
= s.conf.Cluster + regInfo.ServerInfo.IP = s.conf.Address + regInfo.ServerInfo.HostName = host + regInfo.ServerInfo.Pid = os.Getpid() + regInfo.ServerInfo.Version = version.GetVersion() + regInfo.ServerInfo.MetricPort = s.conf.MetricPort + + key := fmt.Sprintf("%s/%s/%s.%d", commtype.BCS_SERV_BASEPATH, commtype.BCS_MODULE_Check, + s.conf.Address, os.Getpid()) + + //key = commtype.BCS_SERV_BASEPATH + "/" + commtype.BCS_MODULE_Check + "/" + regInfo.Cluster + "/" + s.conf.Address + + data, err := json.Marshal(regInfo) + if err != nil { + blog.Error("json Marshal error(%s)", err.Error()) + return + } + + err = regDiscv.RegisterService(key, []byte(data)) + if err != nil { + blog.Error("RegisterService(%s) error(%s), redo after 3 second ...", key, err.Error()) + time.Sleep(3 * time.Second) + go s.regDiscover() + return + } + + blog.Info("RegisterService(%s:%s) succ", key, data) + + discvPath := commtype.BCS_SERV_BASEPATH + "/" + commtype.BCS_MODULE_Check + + discvEvent, err := regDiscv.DiscoverService(discvPath) + if err != nil { + blog.Error("DiscoverService(%s) error(%s), redo after 3 second ...", discvPath, err.Error()) + time.Sleep(3 * time.Second) + go s.regDiscover() + return + } + + blog.Info("DiscoverService(%s) succ", discvPath) + + tick := time.NewTicker(180 * time.Second) + defer tick.Stop() + for { + select { + case <-tick.C: + blog.Info("tick: health check(cluster:%s %s) running, current goroutine num (%d)", + s.conf.Cluster, s.conf.Address, runtime.NumGoroutine()) + + case event := <-discvEvent: + blog.Info("get discover event") + + if event.Err != nil { + blog.Error("get discover event err:%s, redo after 3 second ...", event.Err.Error()) + time.Sleep(3 * time.Second) + go s.regDiscover() + return + } + + isRegstered := false + isMaster := false + + for i, server := range event.Server { + blog.Info("discovered : server[%d]: %s %s", i, event.Key, server) + if server == string(data) { + blog.Info("discovered : server[%d] is myself", i) + isRegstered = true + } + + if i == 0 && server == string(data) { + isMaster = true + blog.Info("discoved : I am master") + } + } + + if isRegstered == false { + blog.Warn("drive is not regestered in zk, do register after 3 second ...") + time.Sleep(3 * time.Second) + s.Role = metric.UnknownRole + go s.regDiscover() + return + } + + //from slave change to master + if isMaster && !s.isMaster { + s.Role = metric.MasterRole + s.masterStart() + } + + if !isMaster && s.isMaster { + s.Role = metric.SlaveRole + s.slaveStop() + } + + } // end select + } // end for +} + +func (s *HealthCheckServer) regBcsDiscover() { + + blog.Info("HealthCheckServer to do bcs register ...") + + clusterId := s.conf.Cluster + + // register service + regDiscv := rd.NewRegDiscoverEx(s.conf.BcsDiscvSvr, time.Second*10) + if regDiscv == nil { + blog.Error("NewRegDiscover(%s) return nil, redo after 3 second ...", s.conf.BcsDiscvSvr) + time.Sleep(3 * time.Second) + + go s.regDiscover() + return + } + blog.Info("NewRegDiscover(%s) succ", s.conf.BcsDiscvSvr) + + err := regDiscv.Start() + if err != nil { + blog.Error("regDiscv start error(%s), redo after 3 second ...", err.Error()) + time.Sleep(3 * time.Second) + + go s.regDiscover() + return + } + blog.Info("RegDiscover start succ") + + defer func() { + err = regDiscv.Stop() + if err != nil { + blog.Errorf("regDiscv stop error %s", err.Error()) + } + }() + + host, err := os.Hostname() + if err != nil { + blog.Error("health check get hostname err: %s", err.Error()) + host = "UNKOWN" + } + var regInfo commtype.BcsCheckInfo + 
regInfo.ServerInfo.Cluster = s.conf.Cluster + regInfo.ServerInfo.IP = s.conf.Address + regInfo.ServerInfo.HostName = host + regInfo.ServerInfo.Pid = os.Getpid() + regInfo.ServerInfo.Version = version.GetVersion() + regInfo.ServerInfo.MetricPort = s.conf.MetricPort + + key := fmt.Sprintf("%s/%s/%s/%s.%d", commtype.BCS_SERV_BASEPATH, commtype.BCS_MODULE_Check, clusterId, + s.conf.Address, os.Getpid()) + + //key = commtype.BCS_SERV_BASEPATH + "/" + commtype.BCS_MODULE_Check + "/" + regInfo.Cluster + "/" + s.conf.Address + + data, err := json.Marshal(regInfo) + if err != nil { + blog.Error("json Marshal error(%s)", err.Error()) + return + } + + err = regDiscv.RegisterService(key, []byte(data)) + if err != nil { + blog.Error("RegisterService(%s) error(%s), redo after 3 second ...", key, err.Error()) + time.Sleep(3 * time.Second) + go s.regDiscover() + return + } + + blog.Info("RegisterService(%s:%s) succ", key, data) + + discvPath := commtype.BCS_SERV_BASEPATH + "/" + commtype.BCS_MODULE_Check + "/" + clusterId + + discvEvent, err := regDiscv.DiscoverService(discvPath) + if err != nil { + blog.Error("DiscoverService(%s) error(%s), redo after 3 second ...", discvPath, err.Error()) + time.Sleep(3 * time.Second) + go s.regDiscover() + return + } + + blog.Info("DiscoverService(%s) succ", discvPath) + + tick := time.NewTicker(180 * time.Second) + defer tick.Stop() + for { + select { + case <-tick.C: + blog.Info("tick: health check(cluster:%s %s) running, current goroutine num (%d)", + s.conf.Cluster, s.conf.Address, runtime.NumGoroutine()) + + case event := <-discvEvent: + blog.Info("get discover event") + + if event.Err != nil { + blog.Error("get discover event err:%s, redo after 3 second ...", event.Err.Error()) + time.Sleep(3 * time.Second) + go s.regDiscover() + return + } + + isRegstered := false + + for i, server := range event.Server { + blog.Info("discovered : server[%d]: %s %s", i, event.Key, server) + if server == string(data) { + blog.Info("discovered : server[%d] is myself", i) + isRegstered = true + } + } + + if isRegstered == false { + blog.Warn("drive is not regestered in zk, do register after 3 second ...") + time.Sleep(3 * time.Second) + + go s.regDiscover() + return + } + + } // end select + } // end for + +} + +func (s *HealthCheckServer) runMetric() { + + metricConf := metric.Config{ + RunMode: metric.Master_Slave_Mode, + ModuleName: commtype.BCS_MODULE_Check, + MetricPort: s.conf.MetricPort, + IP: s.conf.Address, + SvrCaFile: s.conf.ServCert.CAFile, + SvrCertFile: s.conf.ServCert.CertFile, + SvrKeyFile: s.conf.ServCert.KeyFile, + SvrKeyPwd: s.conf.ServCert.CertPasswd, + ClusterID: s.conf.Cluster, + } + + healthFunc := func() metric.HealthMeta { + var isHealthy bool + var msg string + if s.Role != metric.UnknownRole { + isHealthy = true + } else { + msg = fmt.Sprintf("bcs-check %s register zk %s failed", s.conf.Address, s.conf.RegDiscvSvr) + } + + return metric.HealthMeta{ + CurrentRole: s.Role, + IsHealthy: isHealthy, + Message: msg, + } + } + + if err := metric.NewMetricController( + metricConf, + healthFunc); nil != err { + blog.Errorf("run metric fail: %s", err.Error()) + } + + blog.Infof("run metric ok") +} diff --git a/bcs-mesos/bcs-consoleproxy/console-proxy/manager/console.go b/bcs-mesos/bcs-consoleproxy/console-proxy/manager/console.go index 47c31e7ccb..1f6b325d03 100644 --- a/bcs-mesos/bcs-consoleproxy/console-proxy/manager/console.go +++ b/bcs-mesos/bcs-consoleproxy/console-proxy/manager/console.go @@ -146,6 +146,7 @@ func (m *manager) StartExec(w http.ResponseWriter, 
r *http.Request, conf *types. //}) ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() go func() { for { select { diff --git a/bcs-mesos/bcs-container-executor/app/bcs_executor.go b/bcs-mesos/bcs-container-executor/app/bcs_executor.go index c035f2a76b..3f63a460f3 100644 --- a/bcs-mesos/bcs-container-executor/app/bcs_executor.go +++ b/bcs-mesos/bcs-container-executor/app/bcs_executor.go @@ -359,6 +359,7 @@ func (executor *BcsExecutor) LaunchTaskGroup(driver exec.ExecutorDriver, taskGro stopCh := make(chan struct{}) go func() { ticker := time.NewTicker(time.Minute) + defer ticker.Stop() for { select { @@ -580,6 +581,7 @@ func (executor *BcsExecutor) monitorPod() { }() tick := time.NewTicker(1 * time.Second) + defer tick.Stop() reporting := 0 for { select { diff --git a/bcs-mesos/bcs-container-executor/connection/fakeconn.go b/bcs-mesos/bcs-container-executor/connection/fakeconn.go index 3df60aa6e9..aca4caa0b1 100644 --- a/bcs-mesos/bcs-container-executor/connection/fakeconn.go +++ b/bcs-mesos/bcs-container-executor/connection/fakeconn.go @@ -83,6 +83,7 @@ func (httpConn *FakeConnection) slaveSimulation() { //send Shutdown in 10 second //send KillTask in 10 second ? tick := time.NewTicker(1 * time.Second) + defer tick.Stop() fmt.Fprintln(os.Stdout, "enter slave message sending loop") i := 0 for { diff --git a/bcs-mesos/bcs-container-executor/container/cni/cni_pod.go b/bcs-mesos/bcs-container-executor/container/cni/cni_pod.go index a42493f545..0d32a948e2 100644 --- a/bcs-mesos/bcs-container-executor/container/cni/cni_pod.go +++ b/bcs-mesos/bcs-container-executor/container/cni/cni_pod.go @@ -619,6 +619,7 @@ func (p *CNIPod) containersWatch(cxt context.Context) { } tick := time.NewTicker(defaultPodWatchInterval * time.Second) + defer tick.Stop() for { select { case <-cxt.Done(): diff --git a/bcs-mesos/bcs-container-executor/container/cnm/cnm_pod.go b/bcs-mesos/bcs-container-executor/container/cnm/cnm_pod.go index bc6fb7b263..8b6ba3555c 100644 --- a/bcs-mesos/bcs-container-executor/container/cnm/cnm_pod.go +++ b/bcs-mesos/bcs-container-executor/container/cnm/cnm_pod.go @@ -569,6 +569,7 @@ func (p *DockerPod) containersWatch(cxt context.Context) { } tick := time.NewTicker(defaultPodWatchInterval * time.Second) + defer tick.Stop() //total := defaultErrTolerate * len(p.runningContainer) for { select { diff --git a/bcs-mesos/bcs-container-executor/executor/executor.go b/bcs-mesos/bcs-container-executor/executor/executor.go index d39b7236f6..03f7fbddb0 100644 --- a/bcs-mesos/bcs-container-executor/executor/executor.go +++ b/bcs-mesos/bcs-container-executor/executor/executor.go @@ -343,7 +343,9 @@ func (driver *BcsExecutorDriver) Stop() (mesos.Status, error) { logs.Infoln("ExecutorDriver is under connection, wait slave reply acknowledged") //check all update info acknowledged checkTick := time.NewTicker(500 * time.Microsecond) + defer checkTick.Stop() timeoutTick := time.NewTicker(5 * time.Second) + defer timeoutTick.Stop() for driver.updates != nil && driver.connected { //if connection lost, no need to wait acknowledgement select { diff --git a/bcs-mesos/bcs-container-executor/healthcheck/cmd.go b/bcs-mesos/bcs-container-executor/healthcheck/cmd.go index 6e048c83e3..85878ff1c3 100644 --- a/bcs-mesos/bcs-container-executor/healthcheck/cmd.go +++ b/bcs-mesos/bcs-container-executor/healthcheck/cmd.go @@ -86,6 +86,7 @@ func (check *CommandChecker) Start() { check.check() tick := time.NewTicker(time.Duration(int64(check.mechanism.IntervalSeconds)) * time.Second) + defer tick.Stop() for { select { 
case <-check.cxt.Done(): diff --git a/bcs-mesos/bcs-container-executor/healthcheck/http.go b/bcs-mesos/bcs-container-executor/healthcheck/http.go index 2b0d813181..36ec83d735 100644 --- a/bcs-mesos/bcs-container-executor/healthcheck/http.go +++ b/bcs-mesos/bcs-container-executor/healthcheck/http.go @@ -88,6 +88,7 @@ func (check *HTTPChecker) Start() { check.check() tick := time.NewTicker(time.Duration(int64(check.mechanism.IntervalSeconds)) * time.Second) + defer tick.Stop() for { select { case <-check.cxt.Done(): diff --git a/bcs-mesos/bcs-container-executor/healthcheck/tcp.go b/bcs-mesos/bcs-container-executor/healthcheck/tcp.go index 4a9a6c7e81..b8edec2650 100644 --- a/bcs-mesos/bcs-container-executor/healthcheck/tcp.go +++ b/bcs-mesos/bcs-container-executor/healthcheck/tcp.go @@ -79,6 +79,7 @@ func (check *TCPChecker) Start() { check.check() tick := time.NewTicker(time.Duration(int64(check.mechanism.IntervalSeconds)) * time.Second) + defer tick.Stop() for { select { case <-check.cxt.Done(): diff --git a/bcs-mesos/bcs-hpacontroller/hpacontroller/controller/autoscaler.go b/bcs-mesos/bcs-hpacontroller/hpacontroller/controller/autoscaler.go index 3212b172c0..9b91402f1c 100644 --- a/bcs-mesos/bcs-hpacontroller/hpacontroller/controller/autoscaler.go +++ b/bcs-mesos/bcs-hpacontroller/hpacontroller/controller/autoscaler.go @@ -84,6 +84,7 @@ func (auto *Autoscaler) Start() error { //ticker list zk autoscalers and sync these autoscalers to workqueue func (auto *Autoscaler) tickerSyncAutoscalerQueue() { ticker := time.NewTicker(time.Second * time.Duration(auto.config.MetricsSyncPeriod)) + defer ticker.Stop() for { @@ -162,6 +163,7 @@ func (auto *Autoscaler) tickerSyncAutoscalerQueue() { func (auto *Autoscaler) tickerHandlerAutoscaler() { ticker := time.NewTicker(time.Second * time.Duration(auto.config.MetricsSyncPeriod)) + defer ticker.Stop() for { diff --git a/bcs-mesos/bcs-hpacontroller/hpacontroller/metrics/resources/resourcesCollector.go b/bcs-mesos/bcs-hpacontroller/hpacontroller/metrics/resources/resourcesCollector.go index 260bd08ae4..4a8ff3326a 100644 --- a/bcs-mesos/bcs-hpacontroller/hpacontroller/metrics/resources/resourcesCollector.go +++ b/bcs-mesos/bcs-hpacontroller/hpacontroller/metrics/resources/resourcesCollector.go @@ -116,6 +116,7 @@ func (collector *resourcesCollector) getMemoryMetricsInfo() metrics.TaskgroupMet func (collector *resourcesCollector) tickerCollectorMetrics() { ticker := time.NewTicker(time.Second * time.Duration(collector.collectMetricsWindow)) + defer ticker.Stop() for { select { diff --git a/bcs-mesos/bcs-hpacontroller/hpacontroller/scaler/bcs-mesos.go b/bcs-mesos/bcs-hpacontroller/hpacontroller/scaler/bcs-mesos.go index d26100a51b..8a77039df7 100644 --- a/bcs-mesos/bcs-hpacontroller/hpacontroller/scaler/bcs-mesos.go +++ b/bcs-mesos/bcs-hpacontroller/hpacontroller/scaler/bcs-mesos.go @@ -134,6 +134,7 @@ func (r *bcsMesosScaler) discvMesosdriver() { blog.Infof("watch mesosdriver under (%s: %s)", MesosDiscv, discvPath) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-mesos-driver/mesosdriver/mesosdriver.go b/bcs-mesos/bcs-mesos-driver/mesosdriver/mesosdriver.go index 36093cfad1..801998af01 100644 --- a/bcs-mesos/bcs-mesos-driver/mesosdriver/mesosdriver.go +++ b/bcs-mesos/bcs-mesos-driver/mesosdriver/mesosdriver.go @@ -268,6 +268,7 @@ func (m *MesosDriver) RegDiscover() { blog.Info("DiscoverService(%s) succ", discvPath) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { 
case <-tick.C: @@ -336,6 +337,7 @@ func (m *MesosDriver) DiscvScheduler() { blog.Infof("watch scheduler under (%s: %s), current goroutine num(%d)", MesosDiscv, discvPath, runtime.NumGoroutine()) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-mesos-watch/app/watch.go b/bcs-mesos/bcs-mesos-watch/app/watch.go index 199d7d13c1..44c89b4a4d 100644 --- a/bcs-mesos/bcs-mesos-watch/app/watch.go +++ b/bcs-mesos/bcs-mesos-watch/app/watch.go @@ -266,6 +266,7 @@ func runServer(rdCxt context.Context, cfg *types.CmdConfig, storage storage.Stor appRole := "slave" tick := time.NewTicker(60 * time.Second) + defer tick.Stop() for { select { @@ -439,6 +440,7 @@ func RefreshDCHost(rfCxt context.Context, cfg *types.CmdConfig, storage storage. blog.Info("DiscoverService(%s) succ", discvPath) tick := time.NewTicker(120 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-mesos-watch/cluster/etcd/configmap.go b/bcs-mesos/bcs-mesos-watch/cluster/etcd/configmap.go index f3fb6fb5cc..bbafc865d8 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/etcd/configmap.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/etcd/configmap.go @@ -72,6 +72,7 @@ func (watch *ConfigMapWatch) Work() { watch.ProcessAllConfigmaps() tick := time.NewTicker(12 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/etcd/deployment.go b/bcs-mesos/bcs-mesos-watch/cluster/etcd/deployment.go index d6ab97ea0e..5171593c5c 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/etcd/deployment.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/etcd/deployment.go @@ -76,6 +76,7 @@ func (watch *DeploymentWatch) Work() { watch.ProcessAllDeployments() tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/etcd/endpoint.go b/bcs-mesos/bcs-mesos-watch/cluster/etcd/endpoint.go index d2f2a65616..04e3efe080 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/etcd/endpoint.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/etcd/endpoint.go @@ -71,6 +71,7 @@ type EndpointWatch struct { func (watch *EndpointWatch) Work() { watch.ProcessAllEndpoints() tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/etcd/exportservice.go b/bcs-mesos/bcs-mesos-watch/cluster/etcd/exportservice.go index 103d86a6bd..6f5cc19835 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/etcd/exportservice.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/etcd/exportservice.go @@ -103,6 +103,7 @@ func (watch *ExportServiceWatch) worker(cxt context.Context) { blog.Infof("ExportServiceWatch start work") tick := time.NewTicker(120 * time.Second) + defer tick.Stop() for { select { case <-cxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/etcd/secret.go b/bcs-mesos/bcs-mesos-watch/cluster/etcd/secret.go index 405d39f392..6da7f82db2 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/etcd/secret.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/etcd/secret.go @@ -72,6 +72,7 @@ func (watch *SecretWatch) Work() { watch.ProcessAllSecrets() tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/etcd/service.go b/bcs-mesos/bcs-mesos-watch/cluster/etcd/service.go index b0a140bdab..c8479956ef 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/etcd/service.go +++ 
b/bcs-mesos/bcs-mesos-watch/cluster/etcd/service.go @@ -71,6 +71,7 @@ func (watch *ServiceWatch) Work() { watch.ProcessAllServices() tick := time.NewTicker(8 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/app.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/app.go index bdafdbaf6f..3598249d15 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/app.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/app.go @@ -140,6 +140,7 @@ func (app *AppWatch) pathWatch(cxt context.Context, path string) { app.handleAppList(cxt, path, children) tick := time.NewTicker(240 * time.Second) + defer tick.Stop() for { select { case <-tick.C: @@ -222,6 +223,7 @@ func (app *AppWatch) appNodeWatch(cxt context.Context, apppath string, ns string blog.V(3).Infof("appwatch wath app ID(%s)", ID) tick := time.NewTicker(240 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/configmap.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/configmap.go index 970c5902ba..6b95d55311 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/configmap.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/configmap.go @@ -89,6 +89,7 @@ type ConfigMapWatch struct { func (watch *ConfigMapWatch) Work() { watch.ProcessAllConfigmaps() tick := time.NewTicker(12 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/deployment.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/deployment.go index b75507a38e..4f734bb258 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/deployment.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/deployment.go @@ -91,6 +91,7 @@ type DeploymentWatch struct { func (watch *DeploymentWatch) Work() { watch.ProcessAllDeployments() tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/endpoint.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/endpoint.go index a42a9411cc..c84be31e1d 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/endpoint.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/endpoint.go @@ -69,6 +69,7 @@ type EndpointWatch struct { func (watch *EndpointWatch) Work() { watch.ProcessAllEndpoints() tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/exportservice.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/exportservice.go index e9bfc3c75b..713be253ab 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/exportservice.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/exportservice.go @@ -98,6 +98,7 @@ func (watch *ExportServiceWatch) postData(data *types.BcsSyncData) { func (watch *ExportServiceWatch) worker(cxt context.Context) { tick := time.NewTicker(120 * time.Second) + defer tick.Stop() for { select { case <-cxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/mesos.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/mesos.go index 87087f503c..fe781438ff 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/mesos.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/mesos.go @@ -467,6 +467,7 @@ func (ms *MesosCluster) Run(cxt context.Context) { //ready to start zk connection monitor tick := time.NewTicker(5 * time.Second) + defer tick.Stop() for { select { case <-cxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/secret.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/secret.go 
index 171b9fadb5..cfb397f5d0 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/secret.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/secret.go @@ -69,6 +69,7 @@ type SecretWatch struct { func (watch *SecretWatch) Work() { watch.ProcessAllSecrets() tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/service.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/service.go index f8ef21489b..96a84b9166 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/service.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/service.go @@ -68,6 +68,7 @@ type ServiceWatch struct { func (watch *ServiceWatch) Work() { watch.ProcessAllServices() tick := time.NewTicker(8 * time.Second) + defer tick.Stop() for { select { case <-watch.cancelCxt.Done(): diff --git a/bcs-mesos/bcs-mesos-watch/cluster/mesos/taskgroup.go b/bcs-mesos/bcs-mesos-watch/cluster/mesos/taskgroup.go index 332c748f36..b011e395ac 100644 --- a/bcs-mesos/bcs-mesos-watch/cluster/mesos/taskgroup.go +++ b/bcs-mesos/bcs-mesos-watch/cluster/mesos/taskgroup.go @@ -130,6 +130,7 @@ func (task *TaskGroupWatch) pathWatch(cxt context.Context, path string) { //watch children node event tick := time.NewTicker(240 * time.Second) + defer tick.Stop() for { select { case <-tick.C: @@ -208,6 +209,7 @@ func (task *TaskGroupWatch) taskGroupNodeWatch(cxt context.Context, taskpath str ID := path.Base(taskpath) tick := time.NewTicker(240 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-mesos-watch/storage/cc.go b/bcs-mesos/bcs-mesos-watch/storage/cc.go index 209bfd7712..6f287bdb06 100644 --- a/bcs-mesos/bcs-mesos-watch/storage/cc.go +++ b/bcs-mesos/bcs-mesos-watch/storage/cc.go @@ -299,6 +299,7 @@ func (cc *CCStorage) Run(cxt context.Context) error { func (cc *CCStorage) Worker() { blog.Info("CCStorage ready to go into worker!") tick := time.NewTicker(120 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-mesos-watch/storage/channelproxy.go b/bcs-mesos/bcs-mesos-watch/storage/channelproxy.go index 58ae5c79f0..3ab69f7b7d 100644 --- a/bcs-mesos/bcs-mesos-watch/storage/channelproxy.go +++ b/bcs-mesos/bcs-mesos-watch/storage/channelproxy.go @@ -32,7 +32,7 @@ type ChannelProxy struct { func (proxy *ChannelProxy) Run(cxt context.Context) { tick := time.NewTicker(300 * time.Second) - + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go b/bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go index 215be6f1c3..8554fe0592 100644 --- a/bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go +++ b/bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go @@ -609,6 +609,7 @@ func (p *offerPool) setInnerOffersAttributes(offers []*mesos.Offer) { func (p *offerPool) checkOffers() { tick := time.NewTicker(1 * time.Second) + defer tick.Stop() for { select { diff --git a/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/bcs_event.go b/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/bcs_event.go index dbe31eb420..4522f739e6 100644 --- a/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/bcs_event.go +++ b/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/bcs_event.go @@ -137,6 +137,7 @@ func (e *bcsEventManager) discvstorage() { blog.Infof("watch storage under (%s: %s), current goroutine num(%d)", e.bcsZk, discvPath, runtime.NumGoroutine()) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { 
select { case <-tick.C: @@ -187,6 +188,7 @@ func (e *bcsEventManager) discvstorage() { func (e *bcsEventManager) handleEventQueue() { tick := time.NewTicker(time.Second * 10) + defer tick.Stop() var err error diff --git a/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/scheduler.go b/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/scheduler.go index 74502af51b..76e278ae21 100644 --- a/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/scheduler.go +++ b/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/scheduler.go @@ -358,6 +358,7 @@ func (s *Scheduler) discvMesos() { blog.Info("watch mesos master under (%s: %s)", MesosDiscv, discvPath) tick := time.NewTicker(120 * time.Second) + defer tick.Stop() for { select { //case <-rdCxt.Done(): @@ -585,6 +586,7 @@ func (s *Scheduler) regDiscove() { blog.Info("scheduler DiscoverService(%s:%s) succ", s.config.RegDiscvSvr, discvPath) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { case <-tick.C: @@ -715,6 +717,7 @@ func (s *Scheduler) registerBCS() { blog.Info("BCS register discove path(%s) succ", discvPath) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/service_mgr.go b/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/service_mgr.go index 11b577b876..efae8bcfdd 100644 --- a/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/service_mgr.go +++ b/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/service_mgr.go @@ -173,6 +173,7 @@ func (mgr *ServiceMgr) postData(data *ServiceSyncData) { // This function will process events of service add, delete and update func (mgr *ServiceMgr) Worker() { tick := time.NewTicker(300 * time.Second) + defer tick.Stop() for { select { case req := <-mgr.msgQueue: diff --git a/bcs-mesos/bcs-scheduler/src/plugin/bin/ip-resources/ipResource.go b/bcs-mesos/bcs-scheduler/src/plugin/bin/ip-resources/ipResource.go index f155eb1285..9b0730c8ac 100644 --- a/bcs-mesos/bcs-scheduler/src/plugin/bin/ip-resources/ipResource.go +++ b/bcs-mesos/bcs-scheduler/src/plugin/bin/ip-resources/ipResource.go @@ -271,6 +271,7 @@ func discvNetservice() { blog.Infof("plugin ipResources watch netservice under (%s: %s)", conf.BcsZk, discvPath) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { case <-cxt.Done(): diff --git a/bcs-mesos/bcs-scheduler/src/pluginManager/plugin/dynamicPlugin/dynamicPlugin.go b/bcs-mesos/bcs-scheduler/src/pluginManager/plugin/dynamicPlugin/dynamicPlugin.go index f31c7383a2..1f4420fd97 100644 --- a/bcs-mesos/bcs-scheduler/src/pluginManager/plugin/dynamicPlugin/dynamicPlugin.go +++ b/bcs-mesos/bcs-scheduler/src/pluginManager/plugin/dynamicPlugin/dynamicPlugin.go @@ -118,6 +118,7 @@ func (p *dynamicPlugin) GetHostAttributes(para *typesplugin.HostPluginParameter) }() ticker := time.NewTicker(time.Second * time.Duration(p.timeout)) + defer ticker.Stop() select { case <-ticker.C: diff --git a/bcs-network/bcs-cloudnetwork/cloud-network-agent/controller/controller.go b/bcs-network/bcs-cloudnetwork/cloud-network-agent/controller/controller.go index 0a0cf438da..94c78eef53 100644 --- a/bcs-network/bcs-cloudnetwork/cloud-network-agent/controller/controller.go +++ b/bcs-network/bcs-cloudnetwork/cloud-network-agent/controller/controller.go @@ -493,6 +493,7 @@ func (nc *NetworkController) Run(ctx context.Context, wg *sync.WaitGroup) { } tick := time.NewTicker(time.Duration(nc.options.CheckInterval) * time.Second) + defer tick.Stop() for { select { case <-tick.C: 
diff --git a/bcs-network/bcs-networkpolicy/controller/networkpolicy.go b/bcs-network/bcs-networkpolicy/controller/networkpolicy.go index 4dbb8c5a04..ed08d7a159 100644 --- a/bcs-network/bcs-networkpolicy/controller/networkpolicy.go +++ b/bcs-network/bcs-networkpolicy/controller/networkpolicy.go @@ -150,6 +150,7 @@ type namedPort2eps map[string]protocol2eps func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { t := time.NewTicker(npc.syncPeriod) defer t.Stop() + defer t.Stop() defer wg.Done() blog.Info("Starting network policy controller") diff --git a/bcs-services/bcs-api/auth/bkiam/cache.go b/bcs-services/bcs-api/auth/bkiam/cache.go index 54c33d3f1e..79a28bd9a7 100644 --- a/bcs-services/bcs-api/auth/bkiam/cache.go +++ b/bcs-services/bcs-api/auth/bkiam/cache.go @@ -109,6 +109,7 @@ func (tc *TokenCache) Get(tokenKey string) (*auth.Token, error) { func (tc *TokenCache) start() { blog.Infof("TokenCache sync start") ticker := time.NewTicker(time.Duration(tc.conf.BKIamAuth.AuthTokenSyncTime) * time.Second) + defer ticker.Stop() for { select { diff --git a/bcs-services/bcs-api/pkg/server/proxier/proxier.go b/bcs-services/bcs-api/pkg/server/proxier/proxier.go index 23a89988d0..0b9e483756 100644 --- a/bcs-services/bcs-api/pkg/server/proxier/proxier.go +++ b/bcs-services/bcs-api/pkg/server/proxier/proxier.go @@ -262,6 +262,7 @@ func (f *ReverseProxyDispatcher) InitializeHandlerForCluster(clusterId string, r func (f *ReverseProxyDispatcher) StartClusterAddressesPoller(clusterId string) { refreshTicker := time.NewTicker(60 * time.Second) + defer refreshTicker.Stop() upstreamServer := f.availableSrvStore[clusterId] for { select { diff --git a/bcs-services/bcs-clb-controller/pkg/processor/processor.go b/bcs-services/bcs-clb-controller/pkg/processor/processor.go index b29f6ffc9a..33bfd36cdc 100644 --- a/bcs-services/bcs-clb-controller/pkg/processor/processor.go +++ b/bcs-services/bcs-clb-controller/pkg/processor/processor.go @@ -160,6 +160,7 @@ func (p *Processor) Init() error { func (p *Processor) Run() { updateTick := time.NewTicker(time.Second * time.Duration(p.opt.UpdatePeriod)) + defer updateTick.Stop() for { select { case <-p.stopCh: diff --git a/bcs-services/bcs-dns/plugin/bcsscheduler/controller/zkController.go b/bcs-services/bcs-dns/plugin/bcsscheduler/controller/zkController.go index e617fa1024..90d6fac7eb 100644 --- a/bcs-services/bcs-dns/plugin/bcsscheduler/controller/zkController.go +++ b/bcs-services/bcs-dns/plugin/bcsscheduler/controller/zkController.go @@ -135,6 +135,7 @@ func (ctrl *ZkController) RunController(stopCh <-chan struct{}) error { //create resync event for all data tick := time.NewTicker(time.Second * time.Duration(ctrl.resyncperiod)) + defer tick.Stop() for { select { case <-ctrl.conCxt.Done(): diff --git a/bcs-services/bcs-gw-controller/pkg/processor/processor.go b/bcs-services/bcs-gw-controller/pkg/processor/processor.go index 3ff3de649a..2b5d39af3a 100644 --- a/bcs-services/bcs-gw-controller/pkg/processor/processor.go +++ b/bcs-services/bcs-gw-controller/pkg/processor/processor.go @@ -131,6 +131,7 @@ func (p *Processor) SetUpdated() {} // Run run processor loop func (p *Processor) Run() { updateTick := time.NewTicker(time.Second * time.Duration(p.opt.UpdatePeriod)) + defer updateTick.Stop() for { select { case <-p.rootCtx.Done(): diff --git a/bcs-services/bcs-loadbalance/app/eventprocessor.go b/bcs-services/bcs-loadbalance/app/eventprocessor.go index 02d8aed5dd..316287d664 100644 --- a/bcs-services/bcs-loadbalance/app/eventprocessor.go +++ 
b/bcs-services/bcs-loadbalance/app/eventprocessor.go @@ -185,7 +185,9 @@ func (lp *LBEventProcessor) Start() error { // run main loop func (lp *LBEventProcessor) run() { updateTick := time.NewTicker(time.Second * time.Duration(int64(lp.config.CfgCheckPeriod))) + defer updateTick.Stop() syncTick := time.NewTicker(time.Second * time.Duration(int64(lp.config.SyncPeriod))) + defer syncTick.Stop() for { select { case <-lp.exit: diff --git a/bcs-services/bcs-loadbalance/app/reflector.go b/bcs-services/bcs-loadbalance/app/reflector.go index 376df796aa..32007a43b0 100644 --- a/bcs-services/bcs-loadbalance/app/reflector.go +++ b/bcs-services/bcs-loadbalance/app/reflector.go @@ -610,6 +610,7 @@ func (reflector *ServiceReflector) dataNodeWatch(node string) { //run run ticker for sync all data in zookeeper to local cache func (reflector *ServiceReflector) run() { tick := time.NewTicker(time.Second * time.Duration(reflector.syncPeriod)) + defer tick.Stop() blog.Infof("Entry Ticker to sync total data") for { select { diff --git a/bcs-services/bcs-loadbalance/clear/clear.go b/bcs-services/bcs-loadbalance/clear/clear.go index f79eef38cf..12774bcbd1 100644 --- a/bcs-services/bcs-loadbalance/clear/clear.go +++ b/bcs-services/bcs-loadbalance/clear/clear.go @@ -60,6 +60,7 @@ func (cm *Manager) Start() { //and cacth the signal to exit func (cm *Manager) run() { tick := time.NewTicker(time.Second * time.Duration(int64(120))) + defer tick.Stop() for { select { case <-cm.exit: diff --git a/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go b/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go index c04bf24f45..fd446a4284 100644 --- a/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go +++ b/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go @@ -273,6 +273,8 @@ func (s *SidecarController) produceContainerLogConf(c *docker.Container) { } } return + } else { + blog.Infof("container %s log config %s changed, from(%s)->to(%s)", c.ID, logConf.confPath, string(logConf.data), string(by)) } blog.Infof("container %s log config %s changed, from(%s)->to(%s)", c.ID, logConf.confPath, string(logConf.data), string(by)) } else { diff --git a/bcs-services/bcs-metricservice/pkg/watch/cluster-dynamic.go b/bcs-services/bcs-metricservice/pkg/watch/cluster-dynamic.go index 17ea22e93b..ba2dbf6ae5 100644 --- a/bcs-services/bcs-metricservice/pkg/watch/cluster-dynamic.go +++ b/bcs-services/bcs-metricservice/pkg/watch/cluster-dynamic.go @@ -28,6 +28,7 @@ import ( func (cw *ClusterWatcher) dynamicManager() { syncTick := time.NewTicker(30 * time.Second) + defer syncTick.Stop() ctx, cancel := context.WithCancel(cw.ctx) cw.syncDynamic() go cw.watchDynamic(ctx) diff --git a/bcs-services/bcs-metricservice/pkg/watch/cluster-endpoint.go b/bcs-services/bcs-metricservice/pkg/watch/cluster-endpoint.go index c08a3286fc..2974958368 100644 --- a/bcs-services/bcs-metricservice/pkg/watch/cluster-endpoint.go +++ b/bcs-services/bcs-metricservice/pkg/watch/cluster-endpoint.go @@ -24,6 +24,7 @@ import ( func (cw *ClusterWatcher) endpointManger() { syncTick := time.NewTicker(30 * time.Second) + defer syncTick.Stop() endpoints := cw.config.EndpointWatchPath event := cw.zk.Watch(endpoints) diff --git a/bcs-services/bcs-metricservice/pkg/watch/cluster-metric.go b/bcs-services/bcs-metricservice/pkg/watch/cluster-metric.go index 81dfb1711a..92d33d7bad 100644 --- a/bcs-services/bcs-metricservice/pkg/watch/cluster-metric.go +++ b/bcs-services/bcs-metricservice/pkg/watch/cluster-metric.go @@ -26,6 +26,7 @@ import ( func (cw *ClusterWatcher) 
metricManager() { syncTick := time.NewTicker(30 * time.Second) + defer syncTick.Stop() ctx, cancel := context.WithCancel(cw.ctx) cw.syncMetric() go cw.watchMetric(ctx) diff --git a/bcs-services/bcs-metricservice/pkg/watch/watch.go b/bcs-services/bcs-metricservice/pkg/watch/watch.go index 98289bdc11..f2a454ec4e 100644 --- a/bcs-services/bcs-metricservice/pkg/watch/watch.go +++ b/bcs-services/bcs-metricservice/pkg/watch/watch.go @@ -61,7 +61,9 @@ func (w *Watcher) manager() { blog.Infof("launch watcher manager") defer blog.Infof("shut down watcher manager") logTick := time.NewTicker(180 * time.Second) + defer logTick.Stop() syncTick := time.NewTicker(3 * time.Second) + defer syncTick.Stop() for { select { diff --git a/bcs-services/bcs-netservice/storage/zookeeper/zookeeper.go b/bcs-services/bcs-netservice/storage/zookeeper/zookeeper.go index 19051aa0e1..134f709acf 100644 --- a/bcs-services/bcs-netservice/storage/zookeeper/zookeeper.go +++ b/bcs-services/bcs-netservice/storage/zookeeper/zookeeper.go @@ -228,6 +228,7 @@ func (zks *zkStorage) RegisterAndWatch(path string, data []byte) error { go func() { tick := time.NewTicker(20 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-services/bcs-network-detection/network-detection/network-detection.go b/bcs-services/bcs-network-detection/network-detection/network-detection.go index 3aaae46e52..d280f8dde0 100644 --- a/bcs-services/bcs-network-detection/network-detection/network-detection.go +++ b/bcs-services/bcs-network-detection/network-detection/network-detection.go @@ -337,6 +337,7 @@ func (n *NetworkDetection) regDiscover() { blog.Info("DiscoverService(%s) succ", discvPath) tick := time.NewTicker(180 * time.Second) + defer tick.Stop() for { select { case <-tick.C: diff --git a/bcs-services/bcs-sd-prometheus/discovery/bcs-mesos-discovery.go b/bcs-services/bcs-sd-prometheus/discovery/bcs-mesos-discovery.go new file mode 100644 index 0000000000..cba7ccebcc --- /dev/null +++ b/bcs-services/bcs-sd-prometheus/discovery/bcs-mesos-discovery.go @@ -0,0 +1,183 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package discovery + +import ( + "fmt" + "path" + "time" + + "bk-bcs/bcs-common/common/blog" + commtypes "bk-bcs/bcs-common/common/types" + moduleDiscovery "bk-bcs/bcs-common/pkg/module-discovery" + "bk-bcs/bcs-services/bcs-sd-prometheus/types" +) + +type bcsMesosDiscovery struct { + zkAddr string + sdFilePath string + + eventHandler EventHandleFunc + moduleDiscovery moduleDiscovery.ModuleDiscovery + module string +} + +// new bcs mesos module service discovery +func NewBcsMesosDiscovery(zkAddr string, promFilePrefix string, module string) (Discovery, error) { + disc := &bcsMesosDiscovery{ + zkAddr: zkAddr, + sdFilePath: path.Join(promFilePrefix, fmt.Sprintf("%s%s", module, DiscoveryFileName)), + module: module, + } + + return disc, nil +} + +// start discovery +func (disc *bcsMesosDiscovery) Start() error { + var err error + disc.moduleDiscovery, err = moduleDiscovery.NewMesosDiscovery(disc.zkAddr) + if err != nil { + return err + } + disc.moduleDiscovery.RegisterEventFunc(disc.handleEventFunc) + go disc.syncTickerPromSdConfig() + return nil +} + +// get the discovery key +func (disc *bcsMesosDiscovery) GetDiscoveryKey() string { + return disc.module +} + +// get prometheus service discovery config +func (disc *bcsMesosDiscovery) GetPrometheusSdConfig() ([]*types.PrometheusSdConfig, error) { + promConfigs := make([]*types.PrometheusSdConfig, 0) + servs, err := disc.moduleDiscovery.GetModuleServers(disc.module) + if err != nil { + blog.Errorf("discovery %s get disc.module %s error %s", disc.module, disc.module, err.Error()) + return nil, err + } + + for _, serv := range servs { + var conf *types.PrometheusSdConfig + switch disc.module { + case commtypes.BCS_MODULE_SCHEDULER: + ser, ok := serv.(*commtypes.SchedulerServInfo) + if !ok { + blog.Errorf("discovery %s disc.module %s failed convert to SchedulerServInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + case commtypes.BCS_MODULE_MESOSAPISERVER: + ser, ok := serv.(*commtypes.BcsMesosApiserverInfo) + if !ok { + blog.Errorf("discovery %s disc.module %s failed convert to MesosDriverServInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + case commtypes.BCS_MODULE_MESOSDATAWATCH: + ser, ok := serv.(*commtypes.MesosDataWatchServInfo) + if !ok { + blog.Errorf("discovery %s disc.module %s failed convert to MesosDataWatchServInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + case commtypes.BCS_MODULE_DNS: + ser, ok := serv.(*commtypes.DNSInfo) + if !ok { + blog.Errorf("discovery %s disc.module %s failed convert to DNSInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + case commtypes.BCS_MODULE_LOADBALANCE: + ser, ok := serv.(*commtypes.LoadBalanceInfo) + if !ok { + blog.Errorf("discovery %s disc.module %s failed convert to DNSInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: 
[]string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + default: + blog.Errorf("discovery %s disc.module %s not found", disc.module, disc.module) + } + + if conf != nil { + promConfigs = append(promConfigs, conf) + } + + } + + return promConfigs, nil +} + +// get prometheus sd config file path +func (disc *bcsMesosDiscovery) GetPromSdConfigFile() string { + return disc.sdFilePath +} + +//register event handle function +func (disc *bcsMesosDiscovery) RegisterEventFunc(handleFunc EventHandleFunc) { + disc.eventHandler = handleFunc +} + +func (disc *bcsMesosDiscovery) handleEventFunc(module string) { + blog.Infof("discovery %s handle module %s event", disc.module, module) + disc.eventHandler(disc.GetDiscoveryKey()) +} + +func (disc *bcsMesosDiscovery) syncTickerPromSdConfig() { + ticker := time.NewTicker(time.Minute * 5) + defer ticker.Stop() + + select { + case <-ticker.C: + blog.V(3).Infof("ticker sync prometheus service discovery config") + disc.eventHandler(disc.GetDiscoveryKey()) + } +} diff --git a/bcs-services/bcs-sd-prometheus/discovery/bcs-service-discovery.go b/bcs-services/bcs-sd-prometheus/discovery/bcs-service-discovery.go new file mode 100644 index 0000000000..9d3622fee7 --- /dev/null +++ b/bcs-services/bcs-sd-prometheus/discovery/bcs-service-discovery.go @@ -0,0 +1,151 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package discovery + +import ( + "fmt" + "path" + "time" + + "bk-bcs/bcs-common/common/blog" + commtypes "bk-bcs/bcs-common/common/types" + moduleDiscovery "bk-bcs/bcs-common/pkg/module-discovery" + "bk-bcs/bcs-services/bcs-sd-prometheus/types" +) + +type bcsServiceDiscovery struct { + zkAddr string + sdFilePath string + + eventHandler EventHandleFunc + moduleDiscovery moduleDiscovery.ModuleDiscovery + module string +} + +// new bcs service module service discovery +func NewBcsServiceDiscovery(zkAddr string, promFilePrefix string, module string) (Discovery, error) { + disc := &bcsServiceDiscovery{ + zkAddr: zkAddr, + sdFilePath: path.Join(promFilePrefix, fmt.Sprintf("%s%s", module, DiscoveryFileName)), + module: module, + } + + return disc, nil +} + +// start +func (disc *bcsServiceDiscovery) Start() error { + var err error + disc.moduleDiscovery, err = moduleDiscovery.NewServiceDiscovery(disc.zkAddr) + if err != nil { + return err + } + disc.moduleDiscovery.RegisterEventFunc(disc.handleEventFunc) + go disc.syncTickerPromSdConfig() + return nil +} + +func (disc *bcsServiceDiscovery) GetDiscoveryKey() string { + return disc.module +} + +func (disc *bcsServiceDiscovery) GetPrometheusSdConfig() ([]*types.PrometheusSdConfig, error) { + promConfigs := make([]*types.PrometheusSdConfig, 0) + servs, err := disc.moduleDiscovery.GetModuleServers(disc.module) + if err != nil { + blog.Errorf("discovery %s get module %s error %s", disc.module, disc.module, err.Error()) + return nil, err + } + + for _, serv := range servs { + var conf *types.PrometheusSdConfig + switch disc.module { + case commtypes.BCS_MODULE_APISERVER: + ser, ok := serv.(*commtypes.APIServInfo) + if !ok { + blog.Errorf("discovery %s module %s failed convert to APIServInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + case commtypes.BCS_MODULE_STORAGE: + ser, ok := serv.(*commtypes.BcsStorageInfo) + if !ok { + blog.Errorf("discovery %s module %s failed convert to BcsStorageInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + case commtypes.BCS_MODULE_NETSERVICE: + ser, ok := serv.(*commtypes.NetServiceInfo) + if !ok { + blog.Errorf("discovery %s module %s failed convert to NetServiceInfo", disc.module, disc.module) + break + } + + conf = &types.PrometheusSdConfig{ + Targets: []string{fmt.Sprintf("%s:%d", ser.IP, ser.MetricPort)}, + Labels: map[string]string{ + DefaultBcsModuleLabelKey: disc.module, + }, + } + + default: + blog.Errorf("discovery %s module %s not found", disc.module, disc.module) + } + + if conf != nil { + promConfigs = append(promConfigs, conf) + } + + } + + return promConfigs, nil +} + +func (disc *bcsServiceDiscovery) GetPromSdConfigFile() string { + return disc.sdFilePath +} + +func (disc *bcsServiceDiscovery) RegisterEventFunc(handleFunc EventHandleFunc) { + disc.eventHandler = handleFunc +} + +func (disc *bcsServiceDiscovery) handleEventFunc(module string) { + blog.Infof("discovery %s handle module %s event", disc.module, module) + disc.eventHandler(disc.GetDiscoveryKey()) +} + +func (disc *bcsServiceDiscovery) syncTickerPromSdConfig() { + ticker := time.NewTicker(time.Minute * 5) + defer ticker.Stop() + + select { + case <-ticker.C: + 
blog.V(3).Infof("ticker sync prometheus service discovery config") + disc.eventHandler(disc.GetDiscoveryKey()) + } +} diff --git a/bcs-services/bcs-storage/storage/actions/v1http/alarms/utils.go b/bcs-services/bcs-storage/storage/actions/v1http/alarms/utils.go index 36d1a03a4e..4c170b0fcc 100644 --- a/bcs-services/bcs-storage/storage/actions/v1http/alarms/utils.go +++ b/bcs-services/bcs-storage/storage/actions/v1http/alarms/utils.go @@ -65,7 +65,9 @@ func cleanAlarmOutCap(maxCaps int) { clusterPool := make(map[string]int) reportTick := time.NewTicker(30 * time.Minute) + defer reportTick.Stop() refreshTick := time.NewTicker(24 * time.Hour) + defer refreshTick.Stop() for { select { case <-reportTick.C: diff --git a/bcs-services/bcs-storage/storage/actions/v1http/events/utils.go b/bcs-services/bcs-storage/storage/actions/v1http/events/utils.go index 55dac43916..9d41edcdc9 100644 --- a/bcs-services/bcs-storage/storage/actions/v1http/events/utils.go +++ b/bcs-services/bcs-storage/storage/actions/v1http/events/utils.go @@ -67,7 +67,9 @@ func cleanEventOutCap(maxCaps int) { clusterPool := make(map[string]int) reportTick := time.NewTicker(30 * time.Minute) + defer reportTick.Stop() refreshTick := time.NewTicker(24 * time.Hour) + defer refreshTick.Stop() for { select { case <-reportTick.C: diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go index 52843f9a70..338f356ed6 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go @@ -104,6 +104,7 @@ func initCache() { mutex = new(sync.RWMutex) var ura []UserResourceAction ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() for { sqlstore.GCoreDB.Table("bcs_user_resource_roles").Select("bcs_user_resource_roles.user_id, bcs_user_resource_roles.resource_type, bcs_user_resource_roles.resource, bcs_roles.actions"). Joins("left join bcs_roles on bcs_user_resource_roles.role_id = bcs_roles.id").Scan(&ura)