From 3812f96cd2eb66e2ad3ce63f7878f20b6f8c3add Mon Sep 17 00:00:00 2001 From: Randy Date: Wed, 13 Apr 2022 17:22:39 +0800 Subject: [PATCH 1/9] health check --- pkg/cluster/healthcheck/healthcheck.go | 195 +++++++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 pkg/cluster/healthcheck/healthcheck.go diff --git a/pkg/cluster/healthcheck/healthcheck.go b/pkg/cluster/healthcheck/healthcheck.go new file mode 100644 index 000000000..5d24e1a36 --- /dev/null +++ b/pkg/cluster/healthcheck/healthcheck.go @@ -0,0 +1,195 @@ +package healthcheck + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" + gxtime "github.com/dubbogo/gost/time" + "net" + "runtime/debug" + "sync/atomic" + "time" +) + +// healthChecker is a basic implementation of a health checker. +// we use different implementations of types.Session to implement different health checker +type healthChecker struct { + // + checkers map[string]*sessionChecker + sessionConfig map[string]interface{} + // check config + timeout time.Duration + intervalBase time.Duration + intervalJitter time.Duration + healthyThreshold uint32 + initialDelay time.Duration + cluster *model.Cluster + unhealthyThreshold uint32 + localProcessHealthy int64 +} + +// EndpointChecker is a wrapper of types.HealthCheckSession for health check +type EndpointChecker struct { + endpoint *model.Endpoint + HealthChecker *healthChecker + // + tcpChecker *TCPChecker + resp chan checkResponse + timeout chan bool + checkID uint64 + stop chan struct{} + checkTimer *gxtime.Timer + checkTimeout *gxtime.Timer + unHealthCount uint32 + healthCount uint32 +} + +type checkResponse struct { + ID uint64 + Healthy bool +} + +func (hc *healthChecker) start() { + // each endpoint + for _, h := range hc.cluster.Endpoints { + hc.startCheck(h) + } +} + +func (hc *healthChecker) startCheck(endpoint *model.Endpoint) { + addr := endpoint.Address.GetAddress() + if _, ok := hc.checkers[addr]; !ok { + c := newChecker(endpoint, hc) + hc.checkers[addr] = c + c.Start() + atomic.AddInt64(&hc.localProcessHealthy, 1) // default host is healthy + logger.Infof("[health check] create a health check session for %s", addr) + } +} + +func newChecker(endpoint *model.Endpoint, hc *healthChecker) *EndpointChecker { + c := &EndpointChecker{ + tcpChecker: newTcpChecker(endpoint), + endpoint: endpoint, + HealthChecker: hc, + resp: make(chan checkResponse), + timeout: make(chan bool), + stop: make(chan struct{}), + } + return c +} + +func newTcpChecker(endpoint *model.Endpoint) *TCPChecker { + return &TCPChecker{ + addr: endpoint.Address.GetAddress(), + } +} + +func (c *EndpointChecker) Start() { + defer func() { + if r := recover(); r != nil { + logger.Warnf("[health check] node checker panic %v\n%s", r, string(debug.Stack())) + } + // stop all the timer when start is finished + c.checkTimer.Stop() + c.checkTimeout.Stop() + }() + c.checkTimer = gxtime.AfterFunc(c.HealthChecker.initialDelay, c.OnCheck) + for { + select { + case <-c.stop: + return + default: + // prepare a check + currentID := atomic.AddUint64(&c.checkID, 1) + select { + case <-c.stop: + return + case resp := <-c.resp: + // if the ID is not equal, means we receive a timeout for this ID, ignore the response + if resp.ID == currentID { + c.checkTimeout.Stop() + if resp.Healthy { + c.HandleSuccess() + } else { + c.HandleFailure(types.FailureActive) + } + // next health checker + c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) + } + case <-c.timeout: + c.checkTimer.Stop() + c.HandleFailure(types.FailureNetwork) + // next health checker + c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) + logger.Infof("[health check] receive a timeout response at id: %d", currentID) + + } + } + } +} + +func (c *EndpointChecker) Stop() { + close(c.stop) +} + +func (c *EndpointChecker) HandleSuccess() { + c.unHealthCount = 0 + changed := false + if c.endpoint.ContainHealthFlag(api.FAILED_ACTIVE_HC) { + c.healthCount++ + // check the threshold + if c.healthCount == c.HealthChecker.healthyThreshold { + changed = true + c.Host.ClearHealthFlag(api.FAILED_ACTIVE_HC) + } + } + c.HealthChecker.incHealthy(c.Host, changed) +} + +func (c *EndpointChecker) HandleFailure(reason types.FailureType) { + c.healthCount = 0 + changed := false + if !c.Host.ContainHealthFlag(api.FAILED_ACTIVE_HC) { + c.unHealthCount++ + // check the threshold + if c.unHealthCount == c.HealthChecker.unhealthyThreshold { + changed = true + c.Host.SetHealthFlag(api.FAILED_ACTIVE_HC) + } + } + c.HealthChecker.decHealthy(c.Host, reason, changed) +} + +func (c *EndpointChecker) OnCheck() { + // record current id + id := atomic.LoadUint64(&c.checkID) + c.HealthChecker.stats.attempt.Inc(1) + // start a timeout before check health + c.checkTimeout.Stop() + c.checkTimeout = utils.NewTimer(c.HealthChecker.timeout, c.OnTimeout) + c.resp <- checkResponse{ + ID: id, + Healthy: c.Session.CheckHealth(), + } +} + +func (c *EndpointChecker) OnTimeout() { + c.timeout <- true +} + +type TCPChecker struct { + addr string +} + +func (s *TCPChecker) CheckHealth() bool { + // default dial timeout, maybe already timeout by checker + conn, err := net.DialTimeout("tcp", s.addr, 30*time.Second) + if err != nil { + logger.Error("[health check] tcp checker for host %s error: %v", s.addr, err) + return false + } + conn.Close() + return true +} + +func (s *TCPChecker) OnTimeout() {} From 58561a89dd59f11c5b3ea9d59daefaa28d646279 Mon Sep 17 00:00:00 2001 From: randy Date: Thu, 14 Apr 2022 22:18:33 +0800 Subject: [PATCH 2/9] fix comments --- pkg/model/health.go | 10 ++-- pkg/upstream/healthchecker/healthchecker.go | 60 +++++++++++++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 pkg/upstream/healthchecker/healthchecker.go diff --git a/pkg/model/health.go b/pkg/model/health.go index d2c18b379..a3332fc3e 100644 --- a/pkg/model/health.go +++ b/pkg/model/health.go @@ -18,11 +18,13 @@ package model // HealthCheck -type HealthCheck struct{} +type HealthCheckConfig struct { + Protocol string `json:"protocol,omitempty"` +} // HttpHealthCheck type HttpHealthCheck struct { - HealthCheck + HealthCheckConfig Host string Path string UseHttp2 bool @@ -31,14 +33,14 @@ type HttpHealthCheck struct { // GrpcHealthCheck type GrpcHealthCheck struct { - HealthCheck + HealthCheckConfig ServiceName string Authority string } // CustomHealthCheck type CustomHealthCheck struct { - HealthCheck + HealthCheckConfig Name string Config interface{} } diff --git a/pkg/upstream/healthchecker/healthchecker.go b/pkg/upstream/healthchecker/healthchecker.go new file mode 100644 index 000000000..facf9830d --- /dev/null +++ b/pkg/upstream/healthchecker/healthchecker.go @@ -0,0 +1,60 @@ +package healthchecker + +import ( + "fmt" + "github.com/apache/dubbo-go-pixiu/pkg/context/http" + "github.com/apache/dubbo-go-pixiu/pkg/model" + "github.com/pkg/errors" +) + +type ( + // HealthCheckerFactory describe health checker factory + HealthCheckerFactory interface { + CreateHealthChecker(cfg map[string]interface{}, endpoint model.Endpoint) HealthChecker + } + + // HealthChecker upstream cluster health checker + HealthChecker interface { + // CheckHealth check health + CheckHealth() bool + } +) + +var ( + healthCheckerFactoryRegistry = map[string]HealthCheckerFactory{} +) + +// Register registers health checker factory. +func RegisterHealthCheckerFactory(name string, f HealthCheckerFactory) { + if name == "" { + panic(fmt.Errorf("%T: empty name", f)) + } + + existedFactory, existed := healthCheckerFactoryRegistry[name] + if existed { + panic(fmt.Errorf("%T and %T got same factory: %s", f, existedFactory, name)) + } + + healthCheckerFactoryRegistry[name] = f +} + +// GetHttpHealthCheckerFactory get factory by kind +func GetHttpHealthCheckerFactory(name string) (HealthCheckerFactory, error) { + existedFilter, existed := healthCheckerFactoryRegistry[name] + if existed { + return existedFilter, nil + } + return nil, errors.Errorf("factory not found %s", name) +} + +// CreateHealthCheck is a extendable function that can create different health checker +// by different health check session. +// The Default session is TCPDial session +func CreateHealthCheck(cfg model.HealthCheckConfig) HealthChecker { + f, ok := GetHttpHealthCheckerFactory(cfg.Protocol) + if !ok { + // not registered, use default session factory + f = &TCPDialSessionFactory{} + } + return newHealthChecker(cfg, f) +} From 060e5ef5cfe526ed0c2dcd6470ae47d1547d351b Mon Sep 17 00:00:00 2001 From: randy Date: Sun, 24 Apr 2022 16:19:51 +0800 Subject: [PATCH 3/9] health check --- pkg/adapter/xds/cds.go | 6 +- pkg/adapter/xds/cds_test.go | 8 +- pkg/adapter/xds/xds.go | 4 +- pkg/adapter/xds/xds_test.go | 12 +- pkg/cluster/cluster.go | 41 +++++ pkg/cluster/healthcheck/healthcheck.go | 169 ++++++++++++------ pkg/cluster/healthcheck/tcp.go | 39 ++++ pkg/cluster/loadbalancer/load_balancer.go | 2 +- .../loadbalancer/rand/load_balancer_rand.go | 2 +- .../loadbalancer/roundrobin/round_robin.go | 2 +- pkg/config/config_load_test.go | 2 +- pkg/model/bootstrap.go | 10 +- pkg/model/cluster.go | 19 +- pkg/model/health.go | 13 +- pkg/server/cluster_manager.go | 32 ++-- pkg/server/cluster_manager_test.go | 4 +- samples/http/simple/pixiu/conf.yaml | 6 + 17 files changed, 270 insertions(+), 101 deletions(-) create mode 100644 pkg/cluster/cluster.go create mode 100644 pkg/cluster/healthcheck/tcp.go diff --git a/pkg/adapter/xds/cds.go b/pkg/adapter/xds/cds.go index dade74ef9..1d30cb340 100644 --- a/pkg/adapter/xds/cds.go +++ b/pkg/adapter/xds/cds.go @@ -135,8 +135,8 @@ func (c *CdsManager) removeClusters(toRemoveList map[string]struct{}) { c.removeCluster(removeClusters) } -func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.Cluster { - return &model.Cluster{ +func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.ClusterConfig { + return &model.ClusterConfig{ Name: cluster.Name, TypeStr: cluster.TypeStr, Type: c.makeClusterType(cluster), @@ -179,7 +179,7 @@ func (c *CdsManager) makeAddress(endpoint *xdspb.Endpoint) model.SocketAddress { } } -func (c *CdsManager) makeHealthChecks(checks []*xdspb.HealthCheck) (result []model.HealthCheck) { +func (c *CdsManager) makeHealthChecks(checks []*xdspb.HealthCheck) (result []model.HealthCheckConfig) { //todo implement me after fix model.HealthCheck type define //result = make([]model.HealthCheck, 0, len(checks)) //for _, check := range checks { diff --git a/pkg/adapter/xds/cds_test.go b/pkg/adapter/xds/cds_test.go index 930e5ce27..49ba4072d 100644 --- a/pkg/adapter/xds/cds_test.go +++ b/pkg/adapter/xds/cds_test.go @@ -88,8 +88,8 @@ func TestCdsManager_Fetch(t *testing.T) { var fetchResult []*apiclient.ProtoAny var fetchError error var cluster = map[string]struct{}{} - var updateCluster *model.Cluster - var addCluster *model.Cluster + var updateCluster *model.ClusterConfig + var addCluster *model.ClusterConfig xdsConfig := getCdsConfig() //var deltaResult chan *apiclient.DeltaResources //var deltaErr error @@ -103,10 +103,10 @@ func TestCdsManager_Fetch(t *testing.T) { _, ok := cluster[clusterName] return ok }) - supermonkey.Patch((*server.ClusterManager).UpdateCluster, func(_ *server.ClusterManager, new *model.Cluster) { + supermonkey.Patch((*server.ClusterManager).UpdateCluster, func(_ *server.ClusterManager, new *model.ClusterConfig) { updateCluster = new }) - supermonkey.Patch((*server.ClusterManager).AddCluster, func(_ *server.ClusterManager, c *model.Cluster) { + supermonkey.Patch((*server.ClusterManager).AddCluster, func(_ *server.ClusterManager, c *model.ClusterConfig) { addCluster = c }) supermonkey.Patch((*server.ClusterManager).RemoveCluster, func(_ *server.ClusterManager, names []string) { diff --git a/pkg/adapter/xds/xds.go b/pkg/adapter/xds/xds.go index 6094c5c8e..e750f437e 100644 --- a/pkg/adapter/xds/xds.go +++ b/pkg/adapter/xds/xds.go @@ -79,7 +79,7 @@ type ( GrpcCluster struct { name string //cluster name - config *model.Cluster + config *model.ClusterConfig once sync.Once conn *grpc.ClientConn } @@ -95,7 +95,7 @@ func (g *GrpcClusterManager) GetGrpcCluster(name string) (apiclient.GrpcCluster, grpcCluster := load.(*GrpcCluster) // grpcClusterManager only return grpcCluster, nil } - var clusterCfg *model.Cluster + var clusterCfg *model.ClusterConfig for _, cfg := range g.store.Config { if cfg.Name == name { clusterCfg = cfg diff --git a/pkg/adapter/xds/xds_test.go b/pkg/adapter/xds/xds_test.go index c2e581b6f..321af27bd 100644 --- a/pkg/adapter/xds/xds_test.go +++ b/pkg/adapter/xds/xds_test.go @@ -43,7 +43,7 @@ import ( ) func TestGrpcClusterManager_GetGrpcCluster(t *testing.T) { - cluster := &model.Cluster{ + cluster := &model.ClusterConfig{ Name: "cluster-1", TypeStr: "GRPC", Endpoints: []*model.Endpoint{ @@ -71,14 +71,14 @@ func TestGrpcClusterManager_GetGrpcCluster(t *testing.T) { {"test-simple", fields{ clusters: &sync.Map{}, store: &server.ClusterStore{ - Config: []*model.Cluster{cluster}, + Config: []*model.ClusterConfig{cluster}, Version: 1, }, }, args{name: "cluster-1"}, nil}, {"test-not-exist", fields{ clusters: &sync.Map{}, store: &server.ClusterStore{ - Config: []*model.Cluster{cluster}, + Config: []*model.ClusterConfig{cluster}, Version: 1, }, }, args{name: "cluster-2"}, ErrClusterNotFound}, @@ -106,7 +106,7 @@ func TestGrpcClusterManager_GetGrpcCluster(t *testing.T) { } func TestGrpcCluster_GetConnect(t *testing.T) { - cluster := &model.Cluster{ + cluster := &model.ClusterConfig{ Name: "cluster-1", TypeStr: "GRPC", Endpoints: []*model.Endpoint{ @@ -150,7 +150,7 @@ func TestGrpcCluster_GetConnect(t *testing.T) { } func TestAdapter_createApiManager(t *testing.T) { - cluster := &model.Cluster{ + cluster := &model.ClusterConfig{ Name: "cluster-1", TypeStr: "GRPC", Endpoints: []*model.Endpoint{ @@ -200,7 +200,7 @@ func TestAdapter_createApiManager(t *testing.T) { }) monkey.Patch((*server.ClusterManager).CloneStore, func(_ *server.ClusterManager) (*server.ClusterStore, error) { return &server.ClusterStore{ - Config: []*model.Cluster{cluster}, + Config: []*model.ClusterConfig{cluster}, Version: 1, }, nil }) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 000000000..98057db94 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/cluster/healthcheck" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +type Cluster struct { + HealthCheck *healthcheck.HealthChecker + Config *model.ClusterConfig +} + +func NewCluster(clusterConfig *model.ClusterConfig) *Cluster { + c := &Cluster{ + Config: clusterConfig, + } + + // only handle one health checker + if len(c.Config.HealthChecks) != 0 { + c.HealthCheck = healthcheck.CreateHealthCheck(clusterConfig, c.Config.HealthChecks[0]) + c.HealthCheck.Start() + } + return c +} diff --git a/pkg/cluster/healthcheck/healthcheck.go b/pkg/cluster/healthcheck/healthcheck.go index 5d24e1a36..d4b1d9462 100644 --- a/pkg/cluster/healthcheck/healthcheck.go +++ b/pkg/cluster/healthcheck/healthcheck.go @@ -1,20 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package healthcheck import ( "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" gxtime "github.com/dubbogo/gost/time" - "net" "runtime/debug" "sync/atomic" "time" ) -// healthChecker is a basic implementation of a health checker. -// we use different implementations of types.Session to implement different health checker -type healthChecker struct { - // - checkers map[string]*sessionChecker +const ( + DefaultTimeout = time.Second + DefaultInterval = 15 * time.Second + DefaultIntervalJitter = 5 * time.Millisecond + + DefaultHealthyThreshold uint32 = 1 + DefaultUnhealthyThreshold uint32 = 1 + DefaultFirstInterval = time.Second +) + +type HealthChecker struct { + checkers map[string]*EndpointChecker sessionConfig map[string]interface{} // check config timeout time.Duration @@ -22,7 +44,7 @@ type healthChecker struct { intervalJitter time.Duration healthyThreshold uint32 initialDelay time.Duration - cluster *model.Cluster + cluster *model.ClusterConfig unhealthyThreshold uint32 localProcessHealthy int64 } @@ -30,7 +52,7 @@ type healthChecker struct { // EndpointChecker is a wrapper of types.HealthCheckSession for health check type EndpointChecker struct { endpoint *model.Endpoint - HealthChecker *healthChecker + HealthChecker *HealthChecker // tcpChecker *TCPChecker resp chan checkResponse @@ -41,6 +63,7 @@ type EndpointChecker struct { checkTimeout *gxtime.Timer unHealthCount uint32 healthCount uint32 + threshold uint32 } type checkResponse struct { @@ -48,14 +71,56 @@ type checkResponse struct { Healthy bool } -func (hc *healthChecker) start() { +func CreateHealthCheck(cluster *model.ClusterConfig, cfg model.HealthCheckConfig) *HealthChecker { + timeout := cfg.TimeoutConfig + if cfg.TimeoutConfig == 0 { + timeout = DefaultTimeout + } + interval := cfg.IntervalConfig + if cfg.IntervalConfig == 0 { + interval = DefaultInterval + } + unhealthyThreshold := cfg.UnhealthyThreshold + if unhealthyThreshold == 0 { + unhealthyThreshold = DefaultUnhealthyThreshold + } + healthyThreshold := cfg.HealthyThreshold + if healthyThreshold == 0 { + healthyThreshold = DefaultHealthyThreshold + } + intervalJitter := cfg.IntervalJitterConfig + if intervalJitter == 0 { + intervalJitter = DefaultIntervalJitter + } + initialDelay := DefaultFirstInterval + if cfg.InitialDelaySeconds.Microseconds() > 0 { + initialDelay = cfg.InitialDelaySeconds + } + + hc := &HealthChecker{ + // cfg + sessionConfig: cfg.SessionConfig, + cluster: cluster, + timeout: timeout, + intervalBase: interval, + intervalJitter: intervalJitter, + healthyThreshold: healthyThreshold, + unhealthyThreshold: unhealthyThreshold, + initialDelay: initialDelay, + checkers: make(map[string]*EndpointChecker), + } + + return hc +} + +func (hc *HealthChecker) Start() { // each endpoint for _, h := range hc.cluster.Endpoints { hc.startCheck(h) } } -func (hc *healthChecker) startCheck(endpoint *model.Endpoint) { +func (hc *HealthChecker) startCheck(endpoint *model.Endpoint) { addr := endpoint.Address.GetAddress() if _, ok := hc.checkers[addr]; !ok { c := newChecker(endpoint, hc) @@ -66,7 +131,7 @@ func (hc *healthChecker) startCheck(endpoint *model.Endpoint) { } } -func newChecker(endpoint *model.Endpoint, hc *healthChecker) *EndpointChecker { +func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker { c := &EndpointChecker{ tcpChecker: newTcpChecker(endpoint), endpoint: endpoint, @@ -84,6 +149,11 @@ func newTcpChecker(endpoint *model.Endpoint) *TCPChecker { } } +func (hc *HealthChecker) getCheckInterval() time.Duration { + interval := hc.intervalBase + return interval +} + func (c *EndpointChecker) Start() { defer func() { if r := recover(); r != nil { @@ -105,20 +175,19 @@ func (c *EndpointChecker) Start() { case <-c.stop: return case resp := <-c.resp: - // if the ID is not equal, means we receive a timeout for this ID, ignore the response if resp.ID == currentID { c.checkTimeout.Stop() if resp.Healthy { c.HandleSuccess() } else { - c.HandleFailure(types.FailureActive) + c.HandleFailure(false) } // next health checker c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) } case <-c.timeout: c.checkTimer.Stop() - c.HandleFailure(types.FailureNetwork) + c.HandleFailure(true) // next health checker c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) logger.Infof("[health check] receive a timeout response at id: %d", currentID) @@ -134,62 +203,54 @@ func (c *EndpointChecker) Stop() { func (c *EndpointChecker) HandleSuccess() { c.unHealthCount = 0 - changed := false - if c.endpoint.ContainHealthFlag(api.FAILED_ACTIVE_HC) { - c.healthCount++ - // check the threshold - if c.healthCount == c.HealthChecker.healthyThreshold { - changed = true - c.Host.ClearHealthFlag(api.FAILED_ACTIVE_HC) - } + c.healthCount++ + if c.healthCount > c.threshold { + c.handleHealth() + } +} + +func (c *EndpointChecker) HandleFailure(isTimeout bool) { + if isTimeout { + c.HandleTimeout() + } else { + c.handleUnHealth() } - c.HealthChecker.incHealthy(c.Host, changed) } -func (c *EndpointChecker) HandleFailure(reason types.FailureType) { +func (c *EndpointChecker) HandleTimeout() { c.healthCount = 0 - changed := false - if !c.Host.ContainHealthFlag(api.FAILED_ACTIVE_HC) { - c.unHealthCount++ - // check the threshold - if c.unHealthCount == c.HealthChecker.unhealthyThreshold { - changed = true - c.Host.SetHealthFlag(api.FAILED_ACTIVE_HC) - } + c.unHealthCount++ + if c.unHealthCount > c.threshold { + c.handleUnHealth() } - c.HealthChecker.decHealthy(c.Host, reason, changed) +} + +func (c *EndpointChecker) handleHealth() { + c.healthCount = 0 + c.unHealthCount = 0 + c.endpoint.Healthy = true +} + +func (c *EndpointChecker) handleUnHealth() { + c.healthCount = 0 + c.unHealthCount = 0 + c.endpoint.Healthy = false } func (c *EndpointChecker) OnCheck() { // record current id id := atomic.LoadUint64(&c.checkID) - c.HealthChecker.stats.attempt.Inc(1) // start a timeout before check health - c.checkTimeout.Stop() - c.checkTimeout = utils.NewTimer(c.HealthChecker.timeout, c.OnTimeout) + if c.checkTimeout != nil { + c.checkTimeout.Stop() + } + c.checkTimeout = gxtime.AfterFunc(c.HealthChecker.timeout, c.OnTimeout) c.resp <- checkResponse{ ID: id, - Healthy: c.Session.CheckHealth(), + Healthy: c.tcpChecker.CheckHealth(), } } func (c *EndpointChecker) OnTimeout() { c.timeout <- true } - -type TCPChecker struct { - addr string -} - -func (s *TCPChecker) CheckHealth() bool { - // default dial timeout, maybe already timeout by checker - conn, err := net.DialTimeout("tcp", s.addr, 30*time.Second) - if err != nil { - logger.Error("[health check] tcp checker for host %s error: %v", s.addr, err) - return false - } - conn.Close() - return true -} - -func (s *TCPChecker) OnTimeout() {} diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/tcp.go new file mode 100644 index 000000000..a2c264106 --- /dev/null +++ b/pkg/cluster/healthcheck/tcp.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package healthcheck + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "net" + "time" +) + +type TCPChecker struct { + addr string +} + +func (s *TCPChecker) CheckHealth() bool { + conn, err := net.DialTimeout("tcp", s.addr, 30*time.Second) + if err != nil { + logger.Error("[health check] tcp checker for host %s error: %v", s.addr, err) + return false + } + conn.Close() + return true +} + +func (s *TCPChecker) OnTimeout() {} diff --git a/pkg/cluster/loadbalancer/load_balancer.go b/pkg/cluster/loadbalancer/load_balancer.go index fb2e3e3fb..eb7d6e98e 100644 --- a/pkg/cluster/loadbalancer/load_balancer.go +++ b/pkg/cluster/loadbalancer/load_balancer.go @@ -22,7 +22,7 @@ import ( ) type LoadBalancer interface { - Handler(c *model.Cluster) *model.Endpoint + Handler(c *model.ClusterConfig) *model.Endpoint } // LoadBalancerStrategy load balancer strategy mode diff --git a/pkg/cluster/loadbalancer/rand/load_balancer_rand.go b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go index 1c7899c6e..fc7a83521 100644 --- a/pkg/cluster/loadbalancer/rand/load_balancer_rand.go +++ b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go @@ -32,6 +32,6 @@ func init() { type Rand struct{} -func (Rand) Handler(c *model.Cluster) *model.Endpoint { +func (Rand) Handler(c *model.ClusterConfig) *model.Endpoint { return c.Endpoints[rand.Intn(len(c.Endpoints))] } diff --git a/pkg/cluster/loadbalancer/roundrobin/round_robin.go b/pkg/cluster/loadbalancer/roundrobin/round_robin.go index 138b62fa6..fb0584492 100644 --- a/pkg/cluster/loadbalancer/roundrobin/round_robin.go +++ b/pkg/cluster/loadbalancer/roundrobin/round_robin.go @@ -28,7 +28,7 @@ func init() { type RoundRobin struct{} -func (RoundRobin) Handler(c *model.Cluster) *model.Endpoint { +func (RoundRobin) Handler(c *model.ClusterConfig) *model.Endpoint { lens := len(c.Endpoints) if c.PrePickEndpointIndex >= lens { c.PrePickEndpointIndex = 0 diff --git a/pkg/config/config_load_test.go b/pkg/config/config_load_test.go index b83d6bbb4..2ab0b0d1b 100644 --- a/pkg/config/config_load_test.go +++ b/pkg/config/config_load_test.go @@ -107,7 +107,7 @@ func TestMain(m *testing.M) { }, }, }, - Clusters: []*model.Cluster{ + Clusters: []*model.ClusterConfig{ { Name: "test_dubbo", TypeStr: "EDS", diff --git a/pkg/model/bootstrap.go b/pkg/model/bootstrap.go index 7f1405218..f33fb5005 100644 --- a/pkg/model/bootstrap.go +++ b/pkg/model/bootstrap.go @@ -60,11 +60,11 @@ func (bs *Bootstrap) ExistCluster(name string) bool { // StaticResources type StaticResources struct { - Listeners []*Listener `yaml:"listeners" json:"listeners" mapstructure:"listeners"` - Clusters []*Cluster `yaml:"clusters" json:"clusters" mapstructure:"clusters"` - Adapters []*Adapter `yaml:"adapters" json:"adapters" mapstructure:"adapters"` - ShutdownConfig *ShutdownConfig `yaml:"shutdown_config" json:"shutdown_config" mapstructure:"shutdown_config"` - PprofConf PprofConf `yaml:"pprofConf" json:"pprofConf" mapstructure:"pprofConf"` + Listeners []*Listener `yaml:"listeners" json:"listeners" mapstructure:"listeners"` + Clusters []*ClusterConfig `yaml:"clusters" json:"clusters" mapstructure:"clusters"` + Adapters []*Adapter `yaml:"adapters" json:"adapters" mapstructure:"adapters"` + ShutdownConfig *ShutdownConfig `yaml:"shutdown_config" json:"shutdown_config" mapstructure:"shutdown_config"` + PprofConf PprofConf `yaml:"pprofConf" json:"pprofConf" mapstructure:"pprofConf"` } // DynamicResources config the dynamic resource source diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go index fdf4f22b0..c31b4815a 100644 --- a/pkg/model/cluster.go +++ b/pkg/model/cluster.go @@ -46,15 +46,15 @@ var ( ) type ( - // Cluster a single upstream cluster - Cluster struct { - Name string `yaml:"name" json:"name"` // Name the cluster unique name - TypeStr string `yaml:"type" json:"type"` // Type the cluster discovery type string value - Type DiscoveryType `yaml:"-" json:"-"` // Type the cluster discovery type - EdsClusterConfig EdsClusterConfig `yaml:"eds_cluster_config" json:"eds_cluster_config" mapstructure:"eds_cluster_config"` - LbStr LbPolicyType `yaml:"lb_policy" json:"lb_policy"` // Lb the cluster select node used loadBalance policy - HealthChecks []HealthCheck `yaml:"health_checks" json:"health_checks"` - Endpoints []*Endpoint `yaml:"endpoints" json:"endpoints"` + // ClusterConfig a single upstream cluster + ClusterConfig struct { + Name string `yaml:"name" json:"name"` // Name the cluster unique name + TypeStr string `yaml:"type" json:"type"` // Type the cluster discovery type string value + Type DiscoveryType `yaml:"-" json:"-"` // Type the cluster discovery type + EdsClusterConfig EdsClusterConfig `yaml:"eds_cluster_config" json:"eds_cluster_config" mapstructure:"eds_cluster_config"` + LbStr LbPolicyType `yaml:"lb_policy" json:"lb_policy"` // Lb the cluster select node used loadBalance policy + HealthChecks []HealthCheckConfig `yaml:"health_checks" json:"health_checks"` + Endpoints []*Endpoint `yaml:"endpoints" json:"endpoints"` PrePickEndpointIndex int } @@ -82,5 +82,6 @@ type ( Name string `yaml:"name" json:"name"` // Name the cluster unique name Address SocketAddress `yaml:"socket_address" json:"socket_address" mapstructure:"socket_address"` // Address socket address Metadata map[string]string `yaml:"meta" json:"meta"` // Metadata extra info such as label or other meta data + Healthy bool } ) diff --git a/pkg/model/health.go b/pkg/model/health.go index a3332fc3e..c0b554c66 100644 --- a/pkg/model/health.go +++ b/pkg/model/health.go @@ -17,9 +17,20 @@ package model +import "time" + // HealthCheck type HealthCheckConfig struct { - Protocol string `json:"protocol,omitempty"` + Protocol string `json:"protocol,omitempty"` + TimeoutConfig time.Duration `json:"timeout,omitempty"` + IntervalConfig time.Duration `json:"interval,omitempty"` + IntervalJitterConfig time.Duration `json:"interval_jitter,omitempty"` + InitialDelaySeconds time.Duration `json:"initial_delay_seconds,omitempty"` + HealthyThreshold uint32 `json:"healthy_threshold,omitempty"` + UnhealthyThreshold uint32 `json:"unhealthy_threshold,omitempty"` + ServiceName string `json:"service_name,omitempty"` + SessionConfig map[string]interface{} `json:"check_config,omitempty"` + CommonCallbacks []string `json:"common_callbacks,omitempty"` } // HttpHealthCheck diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go index 9933ff57c..0efc5cf44 100644 --- a/pkg/server/cluster_manager.go +++ b/pkg/server/cluster_manager.go @@ -18,6 +18,7 @@ package server import ( + "github.com/apache/dubbo-go-pixiu/pkg/cluster" "sync" "sync/atomic" ) @@ -34,21 +35,30 @@ type ( rw sync.RWMutex store *ClusterStore - //cConfig []*model.Cluster + //cConfig []*model.ClusterConfig } // ClusterStore store for cluster array ClusterStore struct { - Config []*model.Cluster `yaml:"config" json:"config"` - Version int32 `yaml:"version" json:"version"` + Config []*model.ClusterConfig `yaml:"config" json:"config"` + Version int32 `yaml:"version" json:"version"` + clustersMap map[string]*cluster.Cluster } ) func CreateDefaultClusterManager(bs *model.Bootstrap) *ClusterManager { - return &ClusterManager{store: &ClusterStore{Config: bs.StaticResources.Clusters}} + return &ClusterManager{store: newClusterStore(bs)} } -func (cm *ClusterManager) AddCluster(c *model.Cluster) { +func newClusterStore(bs *model.Bootstrap) *ClusterStore { + store := &ClusterStore{} + for _, cluster := range bs.StaticResources.Clusters { + store.AddCluster(cluster) + } + return store +} + +func (cm *ClusterManager) AddCluster(c *model.ClusterConfig) { cm.rw.Lock() defer cm.rw.Unlock() @@ -56,7 +66,7 @@ func (cm *ClusterManager) AddCluster(c *model.Cluster) { cm.store.AddCluster(c) } -func (cm *ClusterManager) UpdateCluster(new *model.Cluster) { +func (cm *ClusterManager) UpdateCluster(new *model.ClusterConfig) { cm.rw.Lock() defer cm.rw.Unlock() @@ -127,7 +137,7 @@ func (cm *ClusterManager) PickEndpoint(clusterName string) *model.Endpoint { return nil } -func pickOneEndpoint(c *model.Cluster) *model.Endpoint { +func pickOneEndpoint(c *model.ClusterConfig) *model.Endpoint { if c.Endpoints == nil || len(c.Endpoints) == 0 { return nil } @@ -174,12 +184,12 @@ func (cm *ClusterManager) HasCluster(clusterName string) bool { return cm.store.HasCluster(clusterName) } -func (s *ClusterStore) AddCluster(c *model.Cluster) { - +func (s *ClusterStore) AddCluster(c *model.ClusterConfig) { s.Config = append(s.Config, c) + s.clustersMap[c.Name] = cluster.NewCluster(c) } -func (s *ClusterStore) UpdateCluster(new *model.Cluster) { +func (s *ClusterStore) UpdateCluster(new *model.ClusterConfig) { for i, c := range s.Config { if c.Name == new.Name { @@ -210,7 +220,7 @@ func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint) } // cluster create - c := &model.Cluster{Name: clusterName, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{endpoint}} + c := &model.ClusterConfig{Name: clusterName, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{endpoint}} // not call AddCluster, because lock is not reenter s.Config = append(s.Config, c) } diff --git a/pkg/server/cluster_manager_test.go b/pkg/server/cluster_manager_test.go index 33d561a36..6d0680fff 100644 --- a/pkg/server/cluster_manager_test.go +++ b/pkg/server/cluster_manager_test.go @@ -32,7 +32,7 @@ import ( func TestClusterManager(t *testing.T) { bs := &model.Bootstrap{ StaticResources: model.StaticResources{ - Clusters: []*model.Cluster{ + Clusters: []*model.ClusterConfig{ { Name: "test", Endpoints: []*model.Endpoint{ @@ -49,7 +49,7 @@ func TestClusterManager(t *testing.T) { cm := CreateDefaultClusterManager(bs) assert.Equal(t, len(cm.store.Config), 1) - cm.AddCluster(&model.Cluster{ + cm.AddCluster(&model.ClusterConfig{ Name: "test2", Endpoints: []*model.Endpoint{ { diff --git a/samples/http/simple/pixiu/conf.yaml b/samples/http/simple/pixiu/conf.yaml index ad238c180..827bd7e80 100644 --- a/samples/http/simple/pixiu/conf.yaml +++ b/samples/http/simple/pixiu/conf.yaml @@ -61,6 +61,12 @@ static_resources: socket_address: address: 127.0.0.1 port: 1314 + health_checks: + - protocol: "tcp" + timeout: 100 + interval: 100 + healthy_threshold: 4 + unhealthy_threshold: 4 shutdown_config: timeout: "60s" step_timeout: "10s" From ce5d79dbd00f27b1d99f5fe4b9146c7390f7ba61 Mon Sep 17 00:00:00 2001 From: randy Date: Sun, 1 May 2022 18:02:33 +0800 Subject: [PATCH 4/9] health check --- pkg/cluster/cluster.go | 18 +++ pkg/cluster/healthcheck/healthcheck.go | 118 +++++++++++------- pkg/cluster/healthcheck/tcp.go | 7 +- .../loadbalancer/rand/load_balancer_rand.go | 2 +- .../loadbalancer/roundrobin/round_robin.go | 5 +- pkg/model/cluster.go | 21 +++- pkg/model/health.go | 21 ++-- pkg/server/cluster_manager.go | 17 +++ samples/http/simple/pixiu/conf.yaml | 4 +- 9 files changed, 142 insertions(+), 71 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 98057db94..173cd26e5 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -39,3 +39,21 @@ func NewCluster(clusterConfig *model.ClusterConfig) *Cluster { } return c } + +func (c *Cluster) Stop() { + if c.HealthCheck != nil { + c.HealthCheck.Stop() + } +} + +func (c *Cluster) RemoveEndpoint(endpoint *model.Endpoint) { + if c.HealthCheck != nil { + c.HealthCheck.StopOne(endpoint) + } +} + +func (c *Cluster) AddEndpoint(endpoint *model.Endpoint) { + if c.HealthCheck != nil { + c.HealthCheck.StartOne(endpoint) + } +} diff --git a/pkg/cluster/healthcheck/healthcheck.go b/pkg/cluster/healthcheck/healthcheck.go index d4b1d9462..a8b3f435d 100644 --- a/pkg/cluster/healthcheck/healthcheck.go +++ b/pkg/cluster/healthcheck/healthcheck.go @@ -17,36 +17,39 @@ package healthcheck import ( - "github.com/apache/dubbo-go-pixiu/pkg/logger" - "github.com/apache/dubbo-go-pixiu/pkg/model" - gxtime "github.com/dubbogo/gost/time" "runtime/debug" "sync/atomic" "time" ) +import ( + gxtime "github.com/dubbogo/gost/time" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + const ( - DefaultTimeout = time.Second - DefaultInterval = 15 * time.Second - DefaultIntervalJitter = 5 * time.Millisecond + DefaultTimeout = time.Second + DefaultInterval = 3 * time.Second - DefaultHealthyThreshold uint32 = 1 - DefaultUnhealthyThreshold uint32 = 1 - DefaultFirstInterval = time.Second + DefaultHealthyThreshold uint32 = 5 + DefaultUnhealthyThreshold uint32 = 5 + DefaultFirstInterval = 5 * time.Second ) type HealthChecker struct { checkers map[string]*EndpointChecker sessionConfig map[string]interface{} // check config - timeout time.Duration - intervalBase time.Duration - intervalJitter time.Duration - healthyThreshold uint32 - initialDelay time.Duration - cluster *model.ClusterConfig - unhealthyThreshold uint32 - localProcessHealthy int64 + timeout time.Duration + intervalBase time.Duration + healthyThreshold uint32 + initialDelay time.Duration + cluster *model.ClusterConfig + unhealthyThreshold uint32 } // EndpointChecker is a wrapper of types.HealthCheckSession for health check @@ -72,14 +75,25 @@ type checkResponse struct { } func CreateHealthCheck(cluster *model.ClusterConfig, cfg model.HealthCheckConfig) *HealthChecker { - timeout := cfg.TimeoutConfig - if cfg.TimeoutConfig == 0 { + + timeout, err := time.ParseDuration(cfg.TimeoutConfig) + if err != nil { + logger.Infof("[health check] timeout parse duration error %s", cfg.TimeoutConfig) timeout = DefaultTimeout } - interval := cfg.IntervalConfig - if cfg.IntervalConfig == 0 { + + interval, err := time.ParseDuration(cfg.IntervalConfig) + if err != nil { + logger.Infof("[health check] interval parse duration error %s", cfg.TimeoutConfig) interval = DefaultInterval } + + initialDelay, err := time.ParseDuration(cfg.IntervalConfig) + if err != nil { + logger.Infof("[health check] initialDelay parse duration error %s", cfg.TimeoutConfig) + initialDelay = DefaultFirstInterval + } + unhealthyThreshold := cfg.UnhealthyThreshold if unhealthyThreshold == 0 { unhealthyThreshold = DefaultUnhealthyThreshold @@ -88,22 +102,12 @@ func CreateHealthCheck(cluster *model.ClusterConfig, cfg model.HealthCheckConfig if healthyThreshold == 0 { healthyThreshold = DefaultHealthyThreshold } - intervalJitter := cfg.IntervalJitterConfig - if intervalJitter == 0 { - intervalJitter = DefaultIntervalJitter - } - initialDelay := DefaultFirstInterval - if cfg.InitialDelaySeconds.Microseconds() > 0 { - initialDelay = cfg.InitialDelaySeconds - } hc := &HealthChecker{ - // cfg sessionConfig: cfg.SessionConfig, cluster: cluster, timeout: timeout, intervalBase: interval, - intervalJitter: intervalJitter, healthyThreshold: healthyThreshold, unhealthyThreshold: unhealthyThreshold, initialDelay: initialDelay, @@ -120,20 +124,42 @@ func (hc *HealthChecker) Start() { } } +func (hc *HealthChecker) Stop() { + for _, h := range hc.cluster.Endpoints { + hc.stopCheck(h) + } +} + +func (hc *HealthChecker) StopOne(endpoint *model.Endpoint) { + hc.stopCheck(endpoint) +} + +func (hc *HealthChecker) StartOne(endpoint *model.Endpoint) { + hc.startCheck(endpoint) +} + func (hc *HealthChecker) startCheck(endpoint *model.Endpoint) { addr := endpoint.Address.GetAddress() if _, ok := hc.checkers[addr]; !ok { c := newChecker(endpoint, hc) hc.checkers[addr] = c c.Start() - atomic.AddInt64(&hc.localProcessHealthy, 1) // default host is healthy + logger.Infof("[health check] create a health check session for %s", addr) + } +} + +func (hc *HealthChecker) stopCheck(endpoint *model.Endpoint) { + addr := endpoint.Address.GetAddress() + if c, ok := hc.checkers[addr]; ok { + c.Stop() + delete(hc.checkers, addr) logger.Infof("[health check] create a health check session for %s", addr) } } func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker { c := &EndpointChecker{ - tcpChecker: newTcpChecker(endpoint), + tcpChecker: newTcpChecker(endpoint, hc.timeout), endpoint: endpoint, HealthChecker: hc, resp: make(chan checkResponse), @@ -143,15 +169,15 @@ func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker { return c } -func newTcpChecker(endpoint *model.Endpoint) *TCPChecker { +func newTcpChecker(endpoint *model.Endpoint, timeout time.Duration) *TCPChecker { return &TCPChecker{ - addr: endpoint.Address.GetAddress(), + addr: endpoint.Address.GetAddress(), + timeout: timeout, } } func (hc *HealthChecker) getCheckInterval() time.Duration { - interval := hc.intervalBase - return interval + return hc.intervalBase } func (c *EndpointChecker) Start() { @@ -159,7 +185,6 @@ func (c *EndpointChecker) Start() { if r := recover(); r != nil { logger.Warnf("[health check] node checker panic %v\n%s", r, string(debug.Stack())) } - // stop all the timer when start is finished c.checkTimer.Stop() c.checkTimeout.Stop() }() @@ -176,19 +201,22 @@ func (c *EndpointChecker) Start() { return case resp := <-c.resp: if resp.ID == currentID { - c.checkTimeout.Stop() + if c.checkTimeout != nil { + c.checkTimeout.Stop() + c.checkTimeout = nil + } if resp.Healthy { c.HandleSuccess() } else { c.HandleFailure(false) } - // next health checker c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) } case <-c.timeout: - c.checkTimer.Stop() + if c.checkTimer != nil { + c.checkTimer.Stop() + } c.HandleFailure(true) - // next health checker c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) logger.Infof("[health check] receive a timeout response at id: %d", currentID) @@ -228,19 +256,17 @@ func (c *EndpointChecker) HandleTimeout() { func (c *EndpointChecker) handleHealth() { c.healthCount = 0 c.unHealthCount = 0 - c.endpoint.Healthy = true + c.endpoint.UnHealthy = false } func (c *EndpointChecker) handleUnHealth() { c.healthCount = 0 c.unHealthCount = 0 - c.endpoint.Healthy = false + c.endpoint.UnHealthy = true } func (c *EndpointChecker) OnCheck() { - // record current id id := atomic.LoadUint64(&c.checkID) - // start a timeout before check health if c.checkTimeout != nil { c.checkTimeout.Stop() } diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/tcp.go index a2c264106..b15460a85 100644 --- a/pkg/cluster/healthcheck/tcp.go +++ b/pkg/cluster/healthcheck/tcp.go @@ -23,13 +23,14 @@ import ( ) type TCPChecker struct { - addr string + addr string + timeout time.Duration } func (s *TCPChecker) CheckHealth() bool { - conn, err := net.DialTimeout("tcp", s.addr, 30*time.Second) + conn, err := net.DialTimeout("tcp", s.addr, s.timeout) if err != nil { - logger.Error("[health check] tcp checker for host %s error: %v", s.addr, err) + logger.Infof("[health check] tcp checker for host %s error: %v", s.addr, err) return false } conn.Close() diff --git a/pkg/cluster/loadbalancer/rand/load_balancer_rand.go b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go index fc7a83521..976bc2818 100644 --- a/pkg/cluster/loadbalancer/rand/load_balancer_rand.go +++ b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go @@ -33,5 +33,5 @@ func init() { type Rand struct{} func (Rand) Handler(c *model.ClusterConfig) *model.Endpoint { - return c.Endpoints[rand.Intn(len(c.Endpoints))] + return c.GetEndpoint(true)[rand.Intn(len(c.Endpoints))] } diff --git a/pkg/cluster/loadbalancer/roundrobin/round_robin.go b/pkg/cluster/loadbalancer/roundrobin/round_robin.go index fb0584492..dd2e1a937 100644 --- a/pkg/cluster/loadbalancer/roundrobin/round_robin.go +++ b/pkg/cluster/loadbalancer/roundrobin/round_robin.go @@ -29,11 +29,12 @@ func init() { type RoundRobin struct{} func (RoundRobin) Handler(c *model.ClusterConfig) *model.Endpoint { - lens := len(c.Endpoints) + endpoints := c.GetEndpoint(true) + lens := len(endpoints) if c.PrePickEndpointIndex >= lens { c.PrePickEndpointIndex = 0 } - e := c.Endpoints[c.PrePickEndpointIndex] + e := endpoints[c.PrePickEndpointIndex] c.PrePickEndpointIndex = (c.PrePickEndpointIndex + 1) % lens return e } diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go index c31b4815a..0d83eff3c 100644 --- a/pkg/model/cluster.go +++ b/pkg/model/cluster.go @@ -78,10 +78,21 @@ type ( // Endpoint Endpoint struct { - ID string `yaml:"ID" json:"ID"` // ID indicate one endpoint - Name string `yaml:"name" json:"name"` // Name the cluster unique name - Address SocketAddress `yaml:"socket_address" json:"socket_address" mapstructure:"socket_address"` // Address socket address - Metadata map[string]string `yaml:"meta" json:"meta"` // Metadata extra info such as label or other meta data - Healthy bool + ID string `yaml:"ID" json:"ID"` // ID indicate one endpoint + Name string `yaml:"name" json:"name"` // Name the cluster unique name + Address SocketAddress `yaml:"socket_address" json:"socket_address" mapstructure:"socket_address"` // Address socket address + Metadata map[string]string `yaml:"meta" json:"meta"` // Metadata extra info such as label or other meta data + UnHealthy bool } ) + +func (c *ClusterConfig) GetEndpoint(mustHealth bool) []*Endpoint { + var endpoints []*Endpoint + for _, e := range c.Endpoints { + // select all endpoint or endpoint is health + if !mustHealth || !e.UnHealthy { + endpoints = append(endpoints, e) + } + } + return endpoints +} diff --git a/pkg/model/health.go b/pkg/model/health.go index c0b554c66..df0eb396d 100644 --- a/pkg/model/health.go +++ b/pkg/model/health.go @@ -17,20 +17,17 @@ package model -import "time" - // HealthCheck type HealthCheckConfig struct { - Protocol string `json:"protocol,omitempty"` - TimeoutConfig time.Duration `json:"timeout,omitempty"` - IntervalConfig time.Duration `json:"interval,omitempty"` - IntervalJitterConfig time.Duration `json:"interval_jitter,omitempty"` - InitialDelaySeconds time.Duration `json:"initial_delay_seconds,omitempty"` - HealthyThreshold uint32 `json:"healthy_threshold,omitempty"` - UnhealthyThreshold uint32 `json:"unhealthy_threshold,omitempty"` - ServiceName string `json:"service_name,omitempty"` - SessionConfig map[string]interface{} `json:"check_config,omitempty"` - CommonCallbacks []string `json:"common_callbacks,omitempty"` + Protocol string `json:"protocol,omitempty"` + TimeoutConfig string `json:"timeout,omitempty"` + IntervalConfig string `json:"interval,omitempty"` + InitialDelaySeconds string `json:"initial_delay_seconds,omitempty"` + HealthyThreshold uint32 `json:"healthy_threshold,omitempty"` + UnhealthyThreshold uint32 `json:"unhealthy_threshold,omitempty"` + ServiceName string `json:"service_name,omitempty"` + SessionConfig map[string]interface{} `json:"check_config,omitempty"` + CommonCallbacks []string `json:"common_callbacks,omitempty"` } // HttpHealthCheck diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go index 0efc5cf44..3bc6b0de9 100644 --- a/pkg/server/cluster_manager.go +++ b/pkg/server/cluster_manager.go @@ -18,6 +18,7 @@ package server import ( + "fmt" "github.com/apache/dubbo-go-pixiu/pkg/cluster" "sync" "sync/atomic" @@ -30,6 +31,9 @@ import ( "github.com/apache/dubbo-go-pixiu/pkg/model" ) +// generate cluster name for unnamed cluster +var clusterIndex int32 = 1 + type ( ClusterManager struct { rw sync.RWMutex @@ -163,7 +167,10 @@ func (cm *ClusterManager) RemoveCluster(namesToDel []string) { } for _, name := range namesToDel { // suppose resource to remove and clusters is few if name == cluster.Name { + removed := cm.store.Config[i] + cm.store.clustersMap[removed.Name].Stop() cm.store.Config[i] = nil + delete(cm.store.clustersMap, removed.Name) } } } @@ -185,6 +192,10 @@ func (cm *ClusterManager) HasCluster(clusterName string) bool { } func (s *ClusterStore) AddCluster(c *model.ClusterConfig) { + if c.Name == "" { + index := atomic.AddInt32(&clusterIndex, 1) + c.Name = fmt.Sprintf("cluster%d", index) + } s.Config = append(s.Config, c) s.clustersMap[c.Name] = cluster.NewCluster(c) } @@ -201,20 +212,24 @@ func (s *ClusterStore) UpdateCluster(new *model.ClusterConfig) { } func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint) { + cluster := s.clustersMap[clusterName] for _, c := range s.Config { if c.Name == clusterName { for _, e := range c.Endpoints { // endpoint update if e.ID == endpoint.ID { + cluster.RemoveEndpoint(e) e.Name = endpoint.Name e.Metadata = endpoint.Metadata e.Address = endpoint.Address + cluster.AddEndpoint(e) return } } // endpoint create c.Endpoints = append(c.Endpoints, endpoint) + cluster.AddEndpoint(endpoint) return } } @@ -226,11 +241,13 @@ func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint) } func (s *ClusterStore) DeleteEndpoint(clusterName string, endpointID string) { + cluster := s.clustersMap[clusterName] for _, c := range s.Config { if c.Name == clusterName { for i, e := range c.Endpoints { if e.ID == endpointID { + cluster.RemoveEndpoint(e) c.Endpoints = append(c.Endpoints[:i], c.Endpoints[i+1:]...) return } diff --git a/samples/http/simple/pixiu/conf.yaml b/samples/http/simple/pixiu/conf.yaml index 827bd7e80..6b3898111 100644 --- a/samples/http/simple/pixiu/conf.yaml +++ b/samples/http/simple/pixiu/conf.yaml @@ -63,8 +63,8 @@ static_resources: port: 1314 health_checks: - protocol: "tcp" - timeout: 100 - interval: 100 + timeout: 1s + interval: 2s healthy_threshold: 4 unhealthy_threshold: 4 shutdown_config: From 019527522a71ed362b57e4ddb92fc9a64bec1e72 Mon Sep 17 00:00:00 2001 From: randy Date: Sun, 1 May 2022 18:05:47 +0800 Subject: [PATCH 5/9] health check --- .github/workflows/go.sum | 1 - pkg/cluster/healthcheck/tcp.go | 5 +- pkg/server/cluster_manager.go | 2 +- pkg/upstream/healthchecker/healthchecker.go | 60 --------------------- 4 files changed, 5 insertions(+), 63 deletions(-) delete mode 100644 .github/workflows/go.sum delete mode 100644 pkg/upstream/healthchecker/healthchecker.go diff --git a/.github/workflows/go.sum b/.github/workflows/go.sum deleted file mode 100644 index 1e010d95c..000000000 --- a/.github/workflows/go.sum +++ /dev/null @@ -1 +0,0 @@ -github.com/dubbogo/dubbo-go-pixiu-filter v0.1.3/go.mod h1:d6SDK5BHl/QCvg84BN+g6LZS9QzVqnI2+yw0NBu0uac= diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/tcp.go index b15460a85..7d12670a6 100644 --- a/pkg/cluster/healthcheck/tcp.go +++ b/pkg/cluster/healthcheck/tcp.go @@ -17,11 +17,14 @@ package healthcheck import ( - "github.com/apache/dubbo-go-pixiu/pkg/logger" "net" "time" ) +import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" +) + type TCPChecker struct { addr string timeout time.Duration diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go index 3bc6b0de9..821b7a49c 100644 --- a/pkg/server/cluster_manager.go +++ b/pkg/server/cluster_manager.go @@ -19,12 +19,12 @@ package server import ( "fmt" - "github.com/apache/dubbo-go-pixiu/pkg/cluster" "sync" "sync/atomic" ) import ( + "github.com/apache/dubbo-go-pixiu/pkg/cluster" "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer" "github.com/apache/dubbo-go-pixiu/pkg/common/yaml" "github.com/apache/dubbo-go-pixiu/pkg/logger" diff --git a/pkg/upstream/healthchecker/healthchecker.go b/pkg/upstream/healthchecker/healthchecker.go deleted file mode 100644 index facf9830d..000000000 --- a/pkg/upstream/healthchecker/healthchecker.go +++ /dev/null @@ -1,60 +0,0 @@ -package healthchecker - -import ( - "fmt" - "github.com/apache/dubbo-go-pixiu/pkg/context/http" - "github.com/apache/dubbo-go-pixiu/pkg/model" - "github.com/pkg/errors" -) - -type ( - // HealthCheckerFactory describe health checker factory - HealthCheckerFactory interface { - CreateHealthChecker(cfg map[string]interface{}, endpoint model.Endpoint) HealthChecker - } - - // HealthChecker upstream cluster health checker - HealthChecker interface { - // CheckHealth check health - CheckHealth() bool - } -) - -var ( - healthCheckerFactoryRegistry = map[string]HealthCheckerFactory{} -) - -// Register registers health checker factory. -func RegisterHealthCheckerFactory(name string, f HealthCheckerFactory) { - if name == "" { - panic(fmt.Errorf("%T: empty name", f)) - } - - existedFactory, existed := healthCheckerFactoryRegistry[name] - if existed { - panic(fmt.Errorf("%T and %T got same factory: %s", f, existedFactory, name)) - } - - healthCheckerFactoryRegistry[name] = f -} - -// GetHttpHealthCheckerFactory get factory by kind -func GetHttpHealthCheckerFactory(name string) (HealthCheckerFactory, error) { - existedFilter, existed := healthCheckerFactoryRegistry[name] - if existed { - return existedFilter, nil - } - return nil, errors.Errorf("factory not found %s", name) -} - -// CreateHealthCheck is a extendable function that can create different health checker -// by different health check session. -// The Default session is TCPDial session -func CreateHealthCheck(cfg model.HealthCheckConfig) HealthChecker { - f, ok := GetHttpHealthCheckerFactory(cfg.Protocol) - if !ok { - // not registered, use default session factory - f = &TCPDialSessionFactory{} - } - return newHealthChecker(cfg, f) -} From 4ee6f67fc256dd3b847d7c1f09536fd272681c84 Mon Sep 17 00:00:00 2001 From: randy Date: Tue, 10 May 2022 15:34:11 +0800 Subject: [PATCH 6/9] fix ci --- pkg/cluster/healthcheck/healthcheck.go | 6 +++--- pkg/config/xds/apiclient/grpc.go | 4 ++-- pkg/server/cluster_manager.go | 2 +- pkg/server/controls/controls.go | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/cluster/healthcheck/healthcheck.go b/pkg/cluster/healthcheck/healthcheck.go index a8b3f435d..ec658b853 100644 --- a/pkg/cluster/healthcheck/healthcheck.go +++ b/pkg/cluster/healthcheck/healthcheck.go @@ -56,7 +56,7 @@ type HealthChecker struct { type EndpointChecker struct { endpoint *model.Endpoint HealthChecker *HealthChecker - // + // TCP checker, can extend to http, grpc, dubbo or other protocol checker tcpChecker *TCPChecker resp chan checkResponse timeout chan bool @@ -237,8 +237,8 @@ func (c *EndpointChecker) HandleSuccess() { } } -func (c *EndpointChecker) HandleFailure(isTimeout bool) { - if isTimeout { +func (c *EndpointChecker) HandleFailure(timeout bool) { + if timeout { c.HandleTimeout() } else { c.handleUnHealth() diff --git a/pkg/config/xds/apiclient/grpc.go b/pkg/config/xds/apiclient/grpc.go index 996ad8a5c..4f674a6f4 100644 --- a/pkg/config/xds/apiclient/grpc.go +++ b/pkg/config/xds/apiclient/grpc.go @@ -288,7 +288,7 @@ type GRPCClusterManager struct { type GRPCCluster struct { name string //cluster name - config *model.Cluster + config *model.ClusterConfig once sync.Once conn *grpc.ClientConn } @@ -309,7 +309,7 @@ func (g *GRPCClusterManager) GetGrpcCluster(name string) (*GRPCCluster, error) { return nil, errors.WithMessagef(err, "clone cluster store failed") } - var clusterCfg *model.Cluster + var clusterCfg *model.ClusterConfig for _, cfg := range store.Config() { if cfg.Name == name { clusterCfg = cfg diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go index 31533d5cc..5e1eeac41 100644 --- a/pkg/server/cluster_manager.go +++ b/pkg/server/cluster_manager.go @@ -56,7 +56,7 @@ type ( } ) -func (x *xdsControlStore) Config() []*model.Cluster { +func (x *xdsControlStore) Config() []*model.ClusterConfig { return x.ClusterStore.Config } diff --git a/pkg/server/controls/controls.go b/pkg/server/controls/controls.go index 4bebfcc39..355f49bf9 100644 --- a/pkg/server/controls/controls.go +++ b/pkg/server/controls/controls.go @@ -25,8 +25,8 @@ type ( ClusterManager interface { RemoveCluster(names []string) HasCluster(name string) bool - UpdateCluster(cluster *model.Cluster) - AddCluster(cluster *model.Cluster) + UpdateCluster(cluster *model.ClusterConfig) + AddCluster(cluster *model.ClusterConfig) CloneXdsControlStore() (ClusterStore, error) } @@ -42,7 +42,7 @@ type ( } ClusterStore interface { - Config() []*model.Cluster + Config() []*model.ClusterConfig HasCluster(name string) bool } ) From 52262f58e6f2d6f445dd4a6aa9d6ffa94ab1496a Mon Sep 17 00:00:00 2001 From: randy Date: Tue, 10 May 2022 15:51:24 +0800 Subject: [PATCH 7/9] fix ci --- pkg/config/xds/apiclient/grpc_test.go | 6 +++--- pkg/config/xds/cds_test.go | 4 ++-- pkg/config/xds/xds_test.go | 4 ++-- pkg/server/controls/mocks/mocks.go | 8 ++++---- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/config/xds/apiclient/grpc_test.go b/pkg/config/xds/apiclient/grpc_test.go index 49d5bc806..fa3c3cc5e 100644 --- a/pkg/config/xds/apiclient/grpc_test.go +++ b/pkg/config/xds/apiclient/grpc_test.go @@ -41,7 +41,7 @@ import ( ) func TestGRPCClusterManager_GetGrpcCluster(t *testing.T) { - cluster := &model.Cluster{ + cluster := &model.ClusterConfig{ Name: "cluster-1", TypeStr: "GRPC", Endpoints: []*model.Endpoint{ @@ -67,7 +67,7 @@ func TestGRPCClusterManager_GetGrpcCluster(t *testing.T) { Return(true) clusterMg.EXPECT().CloneXdsControlStore().DoAndReturn(func() (controls.ClusterStore, error) { store := mocks.NewMockClusterStore(ctrl) - store.EXPECT().Config().Return([]*model.Cluster{cluster}) + store.EXPECT().Config().Return([]*model.ClusterConfig{cluster}) return store, nil }) clusterMg.EXPECT().HasCluster("cluster-2").Return(false) @@ -109,7 +109,7 @@ func TestGRPCClusterManager_GetGrpcCluster(t *testing.T) { } func TestGRPCCluster_GetConnect(t *testing.T) { - cluster := &model.Cluster{ + cluster := &model.ClusterConfig{ Name: "cluster-1", TypeStr: "GRPC", Endpoints: []*model.Endpoint{ diff --git a/pkg/config/xds/cds_test.go b/pkg/config/xds/cds_test.go index 8c1ba2564..53d65684a 100644 --- a/pkg/config/xds/cds_test.go +++ b/pkg/config/xds/cds_test.go @@ -110,10 +110,10 @@ func TestCdsManager_Fetch(t *testing.T) { return ok }) - clusterMg.EXPECT().UpdateCluster(gomock.Any()).AnyTimes().Do(func(new *model.Cluster) { + clusterMg.EXPECT().UpdateCluster(gomock.Any()).AnyTimes().Do(func(new *model.ClusterConfig) { updateCluster = new }) - clusterMg.EXPECT().AddCluster(gomock.Any()).AnyTimes().Do(func(c *model.Cluster) { + clusterMg.EXPECT().AddCluster(gomock.Any()).AnyTimes().Do(func(c *model.ClusterConfig) { addCluster = c }) clusterMg.EXPECT().RemoveCluster(gomock.Any()).AnyTimes() diff --git a/pkg/config/xds/xds_test.go b/pkg/config/xds/xds_test.go index dc2045ff8..3bd71bcc9 100644 --- a/pkg/config/xds/xds_test.go +++ b/pkg/config/xds/xds_test.go @@ -45,7 +45,7 @@ import ( ) func TestAdapter_createApiManager(t *testing.T) { - cluster := &model.Cluster{ + cluster := &model.ClusterConfig{ Name: "cluster-1", TypeStr: "GRPC", Endpoints: []*model.Endpoint{ @@ -118,7 +118,7 @@ func TestAdapter_createApiManager(t *testing.T) { Return(true) clusterMg.EXPECT().CloneXdsControlStore().DoAndReturn(func() (controls.ClusterStore, error) { store := mocks.NewMockClusterStore(ctrl) - store.EXPECT().Config().Return([]*model.Cluster{cluster}) + store.EXPECT().Config().Return([]*model.ClusterConfig{cluster}) return store, nil }) clusterMg.EXPECT().HasCluster("cluster-2").Return(false) diff --git a/pkg/server/controls/mocks/mocks.go b/pkg/server/controls/mocks/mocks.go index a91ab0e77..630bee259 100644 --- a/pkg/server/controls/mocks/mocks.go +++ b/pkg/server/controls/mocks/mocks.go @@ -54,7 +54,7 @@ func (m *MockClusterManager) EXPECT() *MockClusterManagerMockRecorder { } // AddCluster mocks base method. -func (m *MockClusterManager) AddCluster(cluster *model.Cluster) { +func (m *MockClusterManager) AddCluster(cluster *model.ClusterConfig) { m.ctrl.T.Helper() m.ctrl.Call(m, "AddCluster", cluster) } @@ -107,7 +107,7 @@ func (mr *MockClusterManagerMockRecorder) RemoveCluster(names interface{}) *gomo } // UpdateCluster mocks base method. -func (m *MockClusterManager) UpdateCluster(cluster *model.Cluster) { +func (m *MockClusterManager) UpdateCluster(cluster *model.ClusterConfig) { m.ctrl.T.Helper() m.ctrl.Call(m, "UpdateCluster", cluster) } @@ -256,10 +256,10 @@ func (m *MockClusterStore) EXPECT() *MockClusterStoreMockRecorder { } // Config mocks base method. -func (m *MockClusterStore) Config() []*model.Cluster { +func (m *MockClusterStore) Config() []*model.ClusterConfig { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Config") - ret0, _ := ret[0].([]*model.Cluster) + ret0, _ := ret[0].([]*model.ClusterConfig) return ret0 } From b5616d2a34afa3f10a10ccfea92ce220bdef3b8f Mon Sep 17 00:00:00 2001 From: randy Date: Tue, 10 May 2022 17:11:03 +0800 Subject: [PATCH 8/9] fix ci --- pkg/server/cluster_manager.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go index 5e1eeac41..287bdaa72 100644 --- a/pkg/server/cluster_manager.go +++ b/pkg/server/cluster_manager.go @@ -71,7 +71,9 @@ func CreateDefaultClusterManager(bs *model.Bootstrap) *ClusterManager { } func newClusterStore(bs *model.Bootstrap) *ClusterStore { - store := &ClusterStore{} + store := &ClusterStore{ + clustersMap: map[string]*cluster.Cluster{}, + } for _, cluster := range bs.StaticResources.Clusters { store.AddCluster(cluster) } @@ -119,7 +121,9 @@ func (cm *ClusterManager) CloneStore() (*ClusterStore, error) { return nil, err } - c := &ClusterStore{} + c := &ClusterStore{ + clustersMap: map[string]*cluster.Cluster{}, + } if err := yaml.UnmarshalYML(b, c); err != nil { return nil, err } From 985541ebe3753d63885e07f4731fdd64fd4e79bf Mon Sep 17 00:00:00 2001 From: randy Date: Thu, 19 May 2022 22:23:13 +0800 Subject: [PATCH 9/9] fix ci --- pkg/cluster/healthcheck/healthcheck.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/healthcheck/healthcheck.go b/pkg/cluster/healthcheck/healthcheck.go index ec658b853..f2f6e5a59 100644 --- a/pkg/cluster/healthcheck/healthcheck.go +++ b/pkg/cluster/healthcheck/healthcheck.go @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package healthcheck import ( "runtime/debug" + "sync" "sync/atomic" "time" ) @@ -67,6 +69,8 @@ type EndpointChecker struct { unHealthCount uint32 healthCount uint32 threshold uint32 + + once sync.Once } type checkResponse struct { @@ -78,19 +82,19 @@ func CreateHealthCheck(cluster *model.ClusterConfig, cfg model.HealthCheckConfig timeout, err := time.ParseDuration(cfg.TimeoutConfig) if err != nil { - logger.Infof("[health check] timeout parse duration error %s", cfg.TimeoutConfig) + logger.Infof("[health check] timeout parse duration error %s", err) timeout = DefaultTimeout } interval, err := time.ParseDuration(cfg.IntervalConfig) if err != nil { - logger.Infof("[health check] interval parse duration error %s", cfg.TimeoutConfig) + logger.Infof("[health check] interval parse duration error %s", err) interval = DefaultInterval } initialDelay, err := time.ParseDuration(cfg.IntervalConfig) if err != nil { - logger.Infof("[health check] initialDelay parse duration error %s", cfg.TimeoutConfig) + logger.Infof("[health check] initialDelay parse duration error %s", err) initialDelay = DefaultFirstInterval } @@ -226,7 +230,9 @@ func (c *EndpointChecker) Start() { } func (c *EndpointChecker) Stop() { - close(c.stop) + c.once.Do(func() { + close(c.stop) + }) } func (c *EndpointChecker) HandleSuccess() {