Skip to content

Commit

Permalink
reverseproxy: configurable active health_passes and health_fails (#6154)
Browse files Browse the repository at this point in the history
* reverseproxy: active health check allows configurable health_passes and health_fails

* Need to reset counters after recovery

* rename methods to be more clear that these are coming from active health checks

* do not export methods
  • Loading branch information
ottenhoff committed Mar 20, 2024
1 parent a9768d2 commit e65b97f
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 9 deletions.
2 changes: 2 additions & 0 deletions caddytest/integration/reverseproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func TestReverseProxyHealthCheck(t *testing.T) {
health_port 2021
health_interval 10ms
health_timeout 100ms
health_passes 1
health_fails 1
}
}
`, "caddyfile")
Expand Down
34 changes: 34 additions & 0 deletions modules/caddyhttp/reverseproxy/caddyfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// health_uri <uri>
// health_port <port>
// health_interval <interval>
// health_passes <num>
// health_fails <num>
// health_timeout <duration>
// health_status <status>
// health_body <regexp>
Expand Down Expand Up @@ -447,6 +449,38 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
h.HealthChecks.Active.ExpectBody = d.Val()

case "health_passes":
if !d.NextArg() {
return d.ArgErr()
}
if h.HealthChecks == nil {
h.HealthChecks = new(HealthChecks)
}
if h.HealthChecks.Active == nil {
h.HealthChecks.Active = new(ActiveHealthChecks)
}
passes, err := strconv.Atoi(d.Val())
if err != nil {
return d.Errf("invalid passes count '%s': %v", d.Val(), err)
}
h.HealthChecks.Active.Passes = passes

case "health_fails":
if !d.NextArg() {
return d.ArgErr()
}
if h.HealthChecks == nil {
h.HealthChecks = new(HealthChecks)
}
if h.HealthChecks.Active == nil {
h.HealthChecks.Active = new(ActiveHealthChecks)
}
fails, err := strconv.Atoi(d.Val())
if err != nil {
return d.Errf("invalid fails count '%s': %v", d.Val(), err)
}
h.HealthChecks.Active.Fails = fails

case "max_fails":
if !d.NextArg() {
return d.ArgErr()
Expand Down
56 changes: 49 additions & 7 deletions modules/caddyhttp/reverseproxy/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ type ActiveHealthChecks struct {
// considering it unhealthy (default 5s).
Timeout caddy.Duration `json:"timeout,omitempty"`

// Number of consecutive health check passes before marking
// a previously unhealthy backend as healthy again (default 1).
Passes int `json:"passes,omitempty"`

// Number of consecutive health check failures before marking
// a previously healthy backend as unhealthy (default 1).
Fails int `json:"fails,omitempty"`

// The maximum response body to download from the backend
// during a health check.
MaxSize int64 `json:"max_size,omitempty"`
Expand Down Expand Up @@ -167,6 +175,14 @@ func (a *ActiveHealthChecks) Provision(ctx caddy.Context, h *Handler) error {
}
}

if a.Passes < 1 {
a.Passes = 1
}

if a.Fails < 1 {
a.Fails = 1
}

return nil
}

Expand Down Expand Up @@ -373,9 +389,37 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre
}

markUnhealthy := func() {
// dispatch an event that the host newly became unhealthy
if upstream.setHealthy(false) {
h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr})
// increment failures and then check if it has reached the threshold to mark unhealthy
err := upstream.Host.countHealthFail(1)
if err != nil {
h.HealthChecks.Active.logger.Error("could not count active health failure",
zap.String("host", upstream.Dial),
zap.Error(err))
return
}
if upstream.Host.activeHealthFails() >= h.HealthChecks.Active.Fails {
// dispatch an event that the host newly became unhealthy
if upstream.setHealthy(false) {
h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr})
upstream.Host.resetHealth()
}
}
}

markHealthy := func() {
// increment passes and then check if it has reached the threshold to be healthy
err := upstream.Host.countHealthPass(1)
if err != nil {
h.HealthChecks.Active.logger.Error("could not count active health pass",
zap.String("host", upstream.Dial),
zap.Error(err))
return
}
if upstream.Host.activeHealthPasses() >= h.HealthChecks.Active.Passes {
if upstream.setHealthy(true) {
h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr})
upstream.Host.resetHealth()
}
}
}

Expand Down Expand Up @@ -439,10 +483,8 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre
}

// passed health check parameters, so mark as healthy
if upstream.setHealthy(true) {
h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr))
h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr})
}
h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr))
markHealthy()

return nil
}
Expand Down
42 changes: 40 additions & 2 deletions modules/caddyhttp/reverseproxy/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ func (u *Upstream) fillHost() {
// Host is the basic, in-memory representation of the state of a remote host.
// Its fields are accessed atomically and Host values must not be copied.
type Host struct {
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64
activePasses int64
activeFails int64
}

// NumRequests returns the number of active requests to the upstream.
Expand All @@ -150,6 +152,16 @@ func (h *Host) Fails() int {
return int(atomic.LoadInt64(&h.fails))
}

// activeHealthPasses returns the number of consecutive active health check passes with the upstream.
func (h *Host) activeHealthPasses() int {
return int(atomic.LoadInt64(&h.activePasses))
}

// activeHealthFails returns the number of consecutive active health check failures with the upstream.
func (h *Host) activeHealthFails() int {
return int(atomic.LoadInt64(&h.activeFails))
}

// countRequest mutates the active request count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countRequest(delta int) error {
Expand All @@ -170,6 +182,32 @@ func (h *Host) countFail(delta int) error {
return nil
}

// countHealthPass mutates the recent passes count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countHealthPass(delta int) error {
result := atomic.AddInt64(&h.activePasses, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
return nil
}

// countHealthFail mutates the recent failures count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countHealthFail(delta int) error {
result := atomic.AddInt64(&h.activeFails, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
return nil
}

// resetHealth resets the health check counters.
func (h *Host) resetHealth() {
atomic.StoreInt64(&h.activePasses, 0)
atomic.StoreInt64(&h.activeFails, 0)
}

// healthy returns true if the upstream is not actively marked as unhealthy.
// (This returns the status only from the "active" health checks.)
func (u *Upstream) healthy() bool {
Expand Down

0 comments on commit e65b97f

Please sign in to comment.