Checks to passing/critical only after reaching a consecutive success/failure threshold

A check may be configured to become passing/critical only once a specified number of consecutive
checks return passing/critical. Until that threshold is reached, the status stays the same as before.
This feature is available for HTTP, TCP, gRPC, Docker, and Monitor checks.
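
To illustrate the intended behavior (this sketch is not part of the commit), here is a minimal example built on the StatusHandler introduced in agent/checks/check.go below. It assumes a test file inside the checks package, since updateCheck is unexported; the mock notifier, check ID, and threshold values are purely illustrative.

package checks

import (
    "fmt"
    "io/ioutil"
    "log"

    "github.com/hashicorp/consul/api"
    "github.com/hashicorp/consul/types"
)

// mockNotifier is a hypothetical CheckNotifier that records the last status it was given.
type mockNotifier struct {
    lastStatus string
}

func (m *mockNotifier) UpdateCheck(checkID types.CheckID, status, output string) {
    m.lastStatus = status
}

func ExampleStatusHandler() {
    notifier := &mockNotifier{}
    logger := log.New(ioutil.Discard, "", 0)

    // Require 1 success before passing and 3 consecutive failures before critical.
    handler := NewStatusHandler(notifier, logger, 1, 3)

    // Counters start at their thresholds, so the very first result is applied.
    handler.updateCheck("web", api.HealthPassing, "ok")
    fmt.Println(notifier.lastStatus)

    // Two failures are below the threshold of 3: the stored status stays passing.
    handler.updateCheck("web", api.HealthCritical, "connection refused")
    handler.updateCheck("web", api.HealthCritical, "connection refused")
    fmt.Println(notifier.lastStatus)

    // The third consecutive failure reaches the threshold and flips the status.
    handler.updateCheck("web", api.HealthCritical, "connection refused")
    fmt.Println(notifier.lastStatus)

    // Output:
    // passing
    // passing
    // critical
}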
PHBourquin authored and Paul-Hadrien Bourquin committed Oct 14, 2019
1 parent 9795345 commit c40afbf
Showing 10 changed files with 326 additions and 117 deletions.
23 changes: 13 additions & 10 deletions agent/agent.go
@@ -2627,6 +2627,8 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
}

statusHandler := checks.NewStatusHandler(a.State, a.logger, chkType.SuccessBeforePassing, chkType.FailuresBeforeCritical)

switch {

case chkType.IsTTL():
@@ -2667,7 +2669,6 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tlsClientConfig := a.tlsConfigurator.OutgoingTLSConfigForCheck(chkType.TLSSkipVerify)

http := &checks.CheckHTTP{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
HTTP: chkType.HTTP,
@@ -2678,6 +2679,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
TLSClientConfig: tlsClientConfig,
StatusHandler: statusHandler,
}

if proxy != nil && proxy.Proxy.Expose.Checks {
@@ -2704,13 +2706,13 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

tcp := &checks.CheckTCP{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
StatusHandler: statusHandler,
}
tcp.Start()
a.checkTCPs[check.CheckID] = tcp
@@ -2732,14 +2734,14 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

grpc := &checks.CheckGRPC{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
GRPC: chkType.GRPC,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
TLSClientConfig: tlsClientConfig,
StatusHandler: statusHandler,
}

if proxy != nil && proxy.Proxy.Expose.Checks {
@@ -2776,7 +2778,6 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

dockerCheck := &checks.CheckDocker{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
DockerContainerID: chkType.DockerContainerID,
@@ -2785,6 +2786,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Interval: chkType.Interval,
Logger: a.logger,
Client: a.dockerClient,
StatusHandler: statusHandler,
}
if prev := a.checkDockers[check.CheckID]; prev != nil {
prev.Stop()
@@ -2811,6 +2813,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Timeout: chkType.Timeout,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
StatusHandler: statusHandler,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
120 changes: 78 additions & 42 deletions agent/checks/check.go
@@ -3,7 +3,6 @@ package checks
import (
"crypto/tls"
"fmt"
"github.com/hashicorp/consul/agent/structs"
"io"
"io/ioutil"
"log"
@@ -15,6 +14,8 @@ import (
"syscall"
"time"

"github.com/hashicorp/consul/agent/structs"

"github.com/armon/circbuf"
"github.com/hashicorp/consul/agent/exec"
"github.com/hashicorp/consul/api"
@@ -56,6 +57,7 @@ type CheckNotifier interface {
// CheckMonitor is used to periodically invoke a script to
// determine the health of a given check. It is compatible with
// nagios plugins and expects the output in the same format.
// Supports failures_before_critical and success_before_passing.
type CheckMonitor struct {
Notify CheckNotifier
CheckID types.CheckID
@@ -66,6 +68,7 @@ type CheckMonitor struct {
Timeout time.Duration
Logger *log.Logger
OutputMaxSize int
StatusHandler *StatusHandler

stop bool
stopCh chan struct{}
@@ -184,8 +187,7 @@ func (c *CheckMonitor) check() {
// Check if the check passed
outputStr := truncateAndLogOutput()
if err == nil {
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, outputStr)
return
}

@@ -195,16 +197,14 @@ func (c *CheckMonitor) check() {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
code := status.ExitStatus()
if code == 1 {
c.Logger.Printf("[WARN] agent: Check %q is now warning", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, outputStr)
return
}
}
}

// Set the health as critical
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, outputStr)
}

// CheckTTL is used to apply a TTL to check status,
@@ -308,8 +308,8 @@ func (c *CheckTTL) SetStatus(status, output string) string {
// The check is warning if the response code is 429.
// The check is critical if the response code is anything else
// or if the request returns an error
// Supports failures_before_critical and success_before_passing.
type CheckHTTP struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
HTTP string
@@ -320,6 +320,7 @@ type CheckHTTP struct {
Logger *log.Logger
TLSClientConfig *tls.Config
OutputMaxSize int
StatusHandler *StatusHandler

httpClient *http.Client
stop bool
@@ -418,8 +419,7 @@ func (c *CheckHTTP) check() {

req, err := http.NewRequest(method, target, nil)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}

@@ -443,8 +443,7 @@ func (c *CheckHTTP) check() {

resp, err := c.httpClient.Do(req)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
defer resp.Body.Close()
@@ -460,35 +459,31 @@ func (c *CheckHTTP) check() {

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// PASSING (2xx)
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result)

c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, result)
} else if resp.StatusCode == 429 {
// WARNING
// 429 Too Many Requests (RFC 6585)
// The user has sent too many requests in a given amount of time.
c.Logger.Printf("[WARN] agent: Check %q is now warning", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result)

c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, result)
} else {
// CRITICAL
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, result)
}
}

// CheckTCP is used to periodically make an TCP/UDP connection to
// determine the health of a given check.
// The check is passing if the connection succeeds
// The check is critical if the connection returns an error
// Supports failures_before_critical and success_before_passing.
type CheckTCP struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
CheckID types.CheckID
ServiceID string
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
StatusHandler *StatusHandler

dialer *net.Dialer
stop bool
@@ -549,20 +544,19 @@ func (c *CheckTCP) check() {
conn, err := c.dialer.Dial(`tcp`, c.TCP)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q socket connection failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
conn.Close()
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}

// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
// with nagios plugins and expects the output in the same format.
// Supports failures_before_critical and success_before_passing.
type CheckDocker struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
Script string
@@ -572,6 +566,7 @@ type CheckDocker struct {
Interval time.Duration
Logger *log.Logger
Client *DockerClient
StatusHandler *StatusHandler

stop chan struct{}
}
@@ -633,12 +628,7 @@ func (c *CheckDocker) check() {
}
c.Logger.Printf("[TRACE] agent: Check %q output: %s", c.CheckID, out)
}

if status == api.HealthCritical {
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
}

c.Notify.UpdateCheck(c.CheckID, status, out)
c.StatusHandler.updateCheck(c.CheckID, status, out)
}

func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
@@ -681,15 +671,16 @@ func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
// The check is passing if returned status is SERVING.
// The check is critical if connection fails or returned status is
// not SERVING.
// Supports failures_before_critical and success_before_passing.
type CheckGRPC struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
GRPC string
Interval time.Duration
Timeout time.Duration
TLSClientConfig *tls.Config
Logger *log.Logger
StatusHandler *StatusHandler

probe *GrpcHealthProbe
stop bool
@@ -747,11 +738,9 @@ func (c *CheckGRPC) check() {

err := c.probe.Check(target)
if err != nil {
c.Logger.Printf("[DEBUG] agent: Check %q failed: %s", c.CheckID, err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
} else {
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target))
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target))
}
}

@@ -763,3 +752,50 @@ func (c *CheckGRPC) Stop() {
close(c.stopCh)
}
}

// StatusHandler keeps track of successive error/success counts and ensures
// that the status is only set to critical/passing once the number of consecutive
// events reaches the given threshold.
type StatusHandler struct {
inner CheckNotifier
logger *log.Logger
successBeforePassing int
successCounter int
failuresBeforeCritical int
failuresCounter int
}

// NewStatusHandler sets the counters to their thresholds so that the status is updated immediately after the first check.
func NewStatusHandler(inner CheckNotifier, logger *log.Logger, successBeforePassing, failuresBeforeCritical int) *StatusHandler {
return &StatusHandler{
logger: logger,
inner: inner,
successBeforePassing: successBeforePassing,
successCounter: successBeforePassing,
failuresBeforeCritical: failuresBeforeCritical,
failuresCounter: failuresBeforeCritical,
}
}

func (s *StatusHandler) updateCheck(checkID types.CheckID, status, output string) {

if status == api.HealthPassing || status == api.HealthWarning {
s.successCounter++
s.failuresCounter = 0
if s.successCounter >= s.successBeforePassing {
s.logger.Printf("[DEBUG] agent: Check %q is %q", checkID, status)
s.inner.UpdateCheck(checkID, status, output)
return
}
s.logger.Printf("[WARN] agent: Check %q was %q but has not reached success threshold %d/%d", checkID, status, s.successCounter, s.successBeforePassing)
} else {
s.failuresCounter++
s.successCounter = 0
if s.failuresCounter >= s.failuresBeforeCritical {
s.logger.Printf("[WARN] agent: Check %q is now critical", checkID)
s.inner.UpdateCheck(checkID, status, output)
return
}
s.logger.Printf("[WARN] agent: Check %q failed but has not reached failure threshold %d/%d", checkID, s.failuresCounter, s.failuresBeforeCritical)
}
}
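
As a follow-up usage sketch (also not part of the commit), the handler can be wired into one of the check types exactly as the agent does above; the struct fields come from the diff, while the notifier, address, and intervals here are illustrative.

package main

import (
    "log"
    "os"
    "time"

    "github.com/hashicorp/consul/agent/checks"
    "github.com/hashicorp/consul/types"
)

// logNotifier is a hypothetical CheckNotifier that just logs status transitions.
type logNotifier struct{}

func (logNotifier) UpdateCheck(checkID types.CheckID, status, output string) {
    log.Printf("check %s is now %s: %s", checkID, status, output)
}

func main() {
    logger := log.New(os.Stderr, "", log.LstdFlags)

    // Report passing after 1 success, but critical only after 2 consecutive failures.
    handler := checks.NewStatusHandler(logNotifier{}, logger, 1, 2)

    tcp := &checks.CheckTCP{
        CheckID:       types.CheckID("redis-tcp"),
        TCP:           "127.0.0.1:6379",
        Interval:      10 * time.Second,
        Timeout:       time.Second,
        Logger:        logger,
        StatusHandler: handler,
    }
    tcp.Start()
    defer tcp.Stop()

    // Let a few probe intervals elapse before exiting.
    time.Sleep(time.Minute)
}

The design keeps the individual check types unchanged apart from routing their results through updateCheck, so the dampening logic lives in a single place.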