Skip to content

Commit

Permalink
Merge pull request #421 from ztelur/feature-health-check-2
Browse files Browse the repository at this point in the history
[feature] health check
  • Loading branch information
AlexStocks committed May 22, 2022
2 parents bf77bc3 + 985541e commit a274007
Show file tree
Hide file tree
Showing 21 changed files with 516 additions and 66 deletions.
1 change: 0 additions & 1 deletion .github/workflows/go.sum

This file was deleted.

59 changes: 59 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster

import (
"github.com/apache/dubbo-go-pixiu/pkg/cluster/healthcheck"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

// Cluster bundles a cluster configuration with the optional health checker
// that NewCluster creates for it.
type Cluster struct {
// HealthCheck stays nil when the configuration declares no health checks.
HealthCheck *healthcheck.HealthChecker
// Config is the configuration this cluster was built from.
Config *model.ClusterConfig
}

// NewCluster builds a Cluster for clusterConfig. When the configuration lists
// at least one health check, a checker is created from the first entry and
// started before the cluster is returned; any further entries are ignored.
func NewCluster(clusterConfig *model.ClusterConfig) *Cluster {
	cluster := &Cluster{Config: clusterConfig}
	if len(clusterConfig.HealthChecks) == 0 {
		return cluster
	}

	// Only the first configured health check is honored.
	cluster.HealthCheck = healthcheck.CreateHealthCheck(clusterConfig, clusterConfig.HealthChecks[0])
	cluster.HealthCheck.Start()
	return cluster
}

// Stop shuts down the cluster's health checker, if one was created.
func (c *Cluster) Stop() {
	checker := c.HealthCheck
	if checker == nil {
		return
	}
	checker.Stop()
}

// RemoveEndpoint stops health checking for endpoint. It does nothing when the
// cluster has no health checker.
func (c *Cluster) RemoveEndpoint(endpoint *model.Endpoint) {
	checker := c.HealthCheck
	if checker == nil {
		return
	}
	checker.StopOne(endpoint)
}

// AddEndpoint starts health checking for endpoint. It does nothing when the
// cluster has no health checker.
func (c *Cluster) AddEndpoint(endpoint *model.Endpoint) {
	checker := c.HealthCheck
	if checker == nil {
		return
	}
	checker.StartOne(endpoint)
}
288 changes: 288 additions & 0 deletions pkg/cluster/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package healthcheck

import (
"runtime/debug"
"sync"
"sync/atomic"
"time"
)

import (
gxtime "github.com/dubbogo/gost/time"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

// Fallback values applied by CreateHealthCheck when the corresponding
// configuration value is missing or cannot be parsed.
const (
// DefaultTimeout is used when the configured timeout fails to parse.
DefaultTimeout = time.Second
// DefaultInterval is used when the configured interval fails to parse.
DefaultInterval = 3 * time.Second

// DefaultHealthyThreshold replaces a healthy threshold of zero.
DefaultHealthyThreshold uint32 = 5
// DefaultUnhealthyThreshold replaces an unhealthy threshold of zero.
DefaultUnhealthyThreshold uint32 = 5
// DefaultFirstInterval is the fallback delay before the first check.
DefaultFirstInterval = 5 * time.Second
)

// HealthChecker owns one EndpointChecker per endpoint address of a cluster
// and carries the settings shared by all of them.
type HealthChecker struct {
// checkers maps an endpoint address to its running checker.
checkers map[string]*EndpointChecker
// sessionConfig is copied verbatim from the health check configuration.
sessionConfig map[string]interface{}
// check config
// timeout bounds a single probe.
timeout time.Duration
// intervalBase is the delay between two consecutive probes.
intervalBase time.Duration
healthyThreshold uint32
// initialDelay is the delay before the first probe of an endpoint.
initialDelay time.Duration
// cluster is the configuration whose endpoints are checked.
cluster *model.ClusterConfig
unhealthyThreshold uint32
}

// EndpointChecker runs the periodic health check for a single endpoint and
// records the outcome on that endpoint.
type EndpointChecker struct {
// endpoint is the target whose UnHealthy flag is updated.
endpoint *model.Endpoint
// HealthChecker is the owning checker that supplies timeout, interval and thresholds.
HealthChecker *HealthChecker
// TCP checker, can extend to http, grpc, dubbo or other protocol checker
tcpChecker *TCPChecker
// resp carries probe results from OnCheck to the loop in Start.
resp chan checkResponse
// timeout carries timeout notifications from OnTimeout to the loop in Start.
timeout chan bool
// checkID is the sequence number of the current check round; accessed atomically.
checkID uint64
// stop is closed by Stop to terminate the loop in Start.
stop chan struct{}
// checkTimer schedules the next OnCheck call.
checkTimer *gxtime.Timer
// checkTimeout schedules OnTimeout for the probe in flight; nil when no probe is pending.
checkTimeout *gxtime.Timer
// unHealthCount counts consecutive timeouts.
unHealthCount uint32
// healthCount counts consecutive successful probes.
healthCount uint32
// NOTE(review): threshold is not set by newChecker, so it appears to stay zero — confirm.
threshold uint32

// once guards the close of stop so Stop may be called repeatedly.
once sync.Once
}

// checkResponse is the result of one probe, sent from OnCheck to Start.
type checkResponse struct {
// ID is the check round the probe belongs to (the value of checkID when it fired).
ID uint64
// Healthy reports whether the probe succeeded.
Healthy bool
}

// CreateHealthCheck builds a HealthChecker for cluster from cfg.
//
// Durations that fail to parse are logged and replaced by the package
// defaults; thresholds of zero are replaced by their defaults as well. The
// returned checker has an empty registry; call Start to begin checking.
func CreateHealthCheck(cluster *model.ClusterConfig, cfg model.HealthCheckConfig) *HealthChecker {
	timeout := DefaultTimeout
	if parsed, err := time.ParseDuration(cfg.TimeoutConfig); err == nil {
		timeout = parsed
	} else {
		logger.Infof("[health check] timeout parse duration error %s", err)
	}

	interval := DefaultInterval
	if parsed, err := time.ParseDuration(cfg.IntervalConfig); err == nil {
		interval = parsed
	} else {
		logger.Infof("[health check] interval parse duration error %s", err)
	}

	// NOTE(review): the initial delay is parsed from cfg.IntervalConfig, the
	// same field as the interval above. This looks like a copy-paste slip;
	// confirm which HealthCheckConfig field should supply the initial delay.
	initialDelay := DefaultFirstInterval
	if parsed, err := time.ParseDuration(cfg.IntervalConfig); err == nil {
		initialDelay = parsed
	} else {
		logger.Infof("[health check] initialDelay parse duration error %s", err)
	}

	checker := &HealthChecker{
		sessionConfig:      cfg.SessionConfig,
		cluster:            cluster,
		timeout:            timeout,
		intervalBase:       interval,
		healthyThreshold:   cfg.HealthyThreshold,
		unhealthyThreshold: cfg.UnhealthyThreshold,
		initialDelay:       initialDelay,
		checkers:           make(map[string]*EndpointChecker),
	}
	// A zero threshold means "not configured".
	if checker.unhealthyThreshold == 0 {
		checker.unhealthyThreshold = DefaultUnhealthyThreshold
	}
	if checker.healthyThreshold == 0 {
		checker.healthyThreshold = DefaultHealthyThreshold
	}
	return checker
}

// Start begins health checking for every endpoint currently listed in the
// cluster configuration.
func (hc *HealthChecker) Start() {
	endpoints := hc.cluster.Endpoints
	for i := range endpoints {
		hc.startCheck(endpoints[i])
	}
}

// Stop terminates every running endpoint checker and empties the registry.
//
// It walks the checkers map rather than the cluster's endpoint list, so a
// checker is stopped even when its endpoint is not (or no longer) present in
// the configuration.
func (hc *HealthChecker) Stop() {
	// Deleting the current key while ranging over a map is permitted in Go.
	for addr, c := range hc.checkers {
		c.Stop()
		delete(hc.checkers, addr)
		logger.Infof("[health check] stop the health check session for %s", addr)
	}
}

// StopOne stops and unregisters the checker for endpoint, if one exists.
func (hc *HealthChecker) StopOne(endpoint *model.Endpoint) {
hc.stopCheck(endpoint)
}

// StartOne registers and starts a checker for endpoint unless its address is
// already being checked.
func (hc *HealthChecker) StartOne(endpoint *model.Endpoint) {
hc.startCheck(endpoint)
}

// startCheck registers and launches a checker for endpoint. An endpoint whose
// address already has a registered checker is left untouched.
func (hc *HealthChecker) startCheck(endpoint *model.Endpoint) {
	addr := endpoint.Address.GetAddress()
	if _, ok := hc.checkers[addr]; ok {
		return
	}

	c := newChecker(endpoint, hc)
	hc.checkers[addr] = c
	// EndpointChecker.Start loops until the checker is stopped, so it has to
	// run on its own goroutine; calling it inline would block here forever and
	// no further endpoint would ever be started.
	go c.Start()
	logger.Infof("[health check] create a health check session for %s", addr)
}

// stopCheck stops the checker registered for endpoint's address and removes
// it from the registry. It does nothing when no checker is registered.
func (hc *HealthChecker) stopCheck(endpoint *model.Endpoint) {
	addr := endpoint.Address.GetAddress()
	c, ok := hc.checkers[addr]
	if !ok {
		return
	}

	c.Stop()
	delete(hc.checkers, addr)
	// The message previously claimed a session was being created.
	logger.Infof("[health check] stop the health check session for %s", addr)
}

// newChecker assembles an EndpointChecker for endpoint owned by hc. The probe
// uses hc's timeout; all channels are unbuffered.
func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker {
	probe := newTcpChecker(endpoint, hc.timeout)

	checker := new(EndpointChecker)
	checker.tcpChecker = probe
	checker.endpoint = endpoint
	checker.HealthChecker = hc
	checker.resp = make(chan checkResponse)
	checker.timeout = make(chan bool)
	checker.stop = make(chan struct{})
	return checker
}

// newTcpChecker returns a TCP probe aimed at endpoint's address that gives up
// after timeout.
func newTcpChecker(endpoint *model.Endpoint, timeout time.Duration) *TCPChecker {
	probe := new(TCPChecker)
	probe.addr = endpoint.Address.GetAddress()
	probe.timeout = timeout
	return probe
}

// getCheckInterval returns the delay between two consecutive probes, which is
// the configured base interval.
func (hc *HealthChecker) getCheckInterval() time.Duration {
return hc.intervalBase
}

// Start runs the check loop for this endpoint and blocks until Stop is called.
//
// The first probe is scheduled after the owner's initial delay. Each round
// then waits for a probe result on c.resp or a timeout signal on c.timeout,
// updates the health counters accordingly and schedules the next probe after
// the check interval. Panics inside the loop are recovered and logged.
func (c *EndpointChecker) Start() {
	defer func() {
		if r := recover(); r != nil {
			logger.Warnf("[health check] node checker panic %v\n%s", r, string(debug.Stack()))
		}
		// Both timers may be nil at this point: checkTimeout is only created
		// by the first OnCheck and is reset to nil after every answered
		// probe. Guard the calls so cleanup itself cannot panic.
		if c.checkTimer != nil {
			c.checkTimer.Stop()
		}
		if c.checkTimeout != nil {
			c.checkTimeout.Stop()
		}
	}()
	c.checkTimer = gxtime.AfterFunc(c.HealthChecker.initialDelay, c.OnCheck)
	for {
		select {
		case <-c.stop:
			return
		default:
			// prepare a check
			currentID := atomic.AddUint64(&c.checkID, 1)
			select {
			case <-c.stop:
				return
			case resp := <-c.resp:
				// Results that belong to an earlier round are dropped.
				if resp.ID == currentID {
					if c.checkTimeout != nil {
						c.checkTimeout.Stop()
						c.checkTimeout = nil
					}
					if resp.Healthy {
						c.HandleSuccess()
					} else {
						c.HandleFailure(false)
					}
					c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck)
				}
			case <-c.timeout:
				if c.checkTimer != nil {
					c.checkTimer.Stop()
				}
				c.HandleFailure(true)
				c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck)
				logger.Infof("[health check] receive a timeout response at id: %d", currentID)
			}
		}
	}
}

// Stop signals the loop in Start to exit by closing c.stop. The sync.Once
// guard ensures the channel is closed only once, so repeated calls are safe.
func (c *EndpointChecker) Stop() {
c.once.Do(func() {
close(c.stop)
})
}

// HandleSuccess records a successful probe. It resets the failure streak and
// marks the endpoint healthy once the number of consecutive successes reaches
// the owner's configured healthy threshold.
func (c *EndpointChecker) HandleSuccess() {
	c.unHealthCount = 0
	c.healthCount++
	// Compare against the configured threshold; the checker's own threshold
	// field is never populated and would always read as zero.
	if c.healthCount >= c.HealthChecker.healthyThreshold {
		c.handleHealth()
	}
}

// HandleFailure records a failed probe. A timeout is counted through
// HandleTimeout, while any other failure marks the endpoint unhealthy right
// away.
func (c *EndpointChecker) HandleFailure(timeout bool) {
	if !timeout {
		c.handleUnHealth()
		return
	}
	c.HandleTimeout()
}

// HandleTimeout records a timed-out probe. It resets the success streak and
// marks the endpoint unhealthy once the number of consecutive timeouts
// reaches the owner's configured unhealthy threshold.
func (c *EndpointChecker) HandleTimeout() {
	c.healthCount = 0
	c.unHealthCount++
	// Compare against the configured threshold; the checker's own threshold
	// field is never populated and would always read as zero.
	if c.unHealthCount >= c.HealthChecker.unhealthyThreshold {
		c.handleUnHealth()
	}
}

// handleHealth clears both streak counters and flags the endpoint as healthy.
func (c *EndpointChecker) handleHealth() {
	c.healthCount, c.unHealthCount = 0, 0
	c.endpoint.UnHealthy = false
}

// handleUnHealth clears both streak counters and flags the endpoint as
// unhealthy.
func (c *EndpointChecker) handleUnHealth() {
	c.healthCount, c.unHealthCount = 0, 0
	c.endpoint.UnHealthy = true
}

// OnCheck is the timer callback that performs one probe. It arms the timeout
// timer, runs the TCP check and hands the result, tagged with the current
// check ID, to the loop in Start.
func (c *EndpointChecker) OnCheck() {
	// Skip the probe entirely when the checker has already been stopped.
	select {
	case <-c.stop:
		return
	default:
	}

	id := atomic.LoadUint64(&c.checkID)
	if c.checkTimeout != nil {
		c.checkTimeout.Stop()
	}
	c.checkTimeout = gxtime.AfterFunc(c.HealthChecker.timeout, c.OnTimeout)

	result := checkResponse{
		ID:      id,
		Healthy: c.tcpChecker.CheckHealth(),
	}
	// c.resp is unbuffered: once Start has returned nobody receives from it,
	// so give up on stop instead of blocking this callback forever.
	select {
	case c.resp <- result:
	case <-c.stop:
	}
}

// OnTimeout is the timer callback armed by OnCheck. It tells the loop in
// Start that the probe in flight exceeded the configured timeout.
func (c *EndpointChecker) OnTimeout() {
	// c.timeout is unbuffered: once Start has returned nobody receives from
	// it, so give up on stop instead of blocking this callback forever.
	select {
	case c.timeout <- true:
	case <-c.stop:
	}
}
43 changes: 43 additions & 0 deletions pkg/cluster/healthcheck/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthcheck

import (
"net"
"time"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

// TCPChecker probes an endpoint by opening a TCP connection to it.
type TCPChecker struct {
// addr is the address passed to the dialer.
addr string
// timeout bounds the dial attempt.
timeout time.Duration
}

// CheckHealth reports whether a TCP connection to the checker's address can
// be established within its timeout. A failed dial is logged; a successful
// connection is closed immediately.
func (s *TCPChecker) CheckHealth() bool {
	conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
	if err == nil {
		conn.Close()
		return true
	}
	logger.Infof("[health check] tcp checker for host %s error: %v", s.addr, err)
	return false
}

// OnTimeout is a no-op: the TCP checker takes no action of its own on timeout.
func (s *TCPChecker) OnTimeout() {}
Loading

0 comments on commit a274007

Please sign in to comment.