icinga2: Rework catch-up-worker processing
oxzi committed Mar 21, 2024
1 parent aaae894 commit 786d287
Showing 2 changed files with 57 additions and 33 deletions.
81 changes: 52 additions & 29 deletions internal/icinga2/client.go
@@ -3,10 +3,10 @@ package icinga2
import (
    "context"
    "errors"
+   "github.com/google/uuid"
    "github.com/icinga/icinga-notifications/internal/event"
    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"
-   "math"
    "net/http"
    "net/url"
    "time"
@@ -200,10 +200,18 @@ func (client *Client) buildAcknowledgementEvent(ctx context.Context, host, servi
//
// The startup time might be delayed through the parameter. This lets the goroutines sleep to rate-limit reconnection
// attempts during network hiccups.
+//
+// To distinguish different catch-up-phase workers - for example, when one worker was canceled by its context and
+// another one was just started - all log their debug messages with a UUID.
func (client *Client) startCatchupWorkers(delay time.Duration) (chan *catchupEventMsg, context.CancelFunc) {
+   workerId := uuid.New()
    startTime := time.Now()
    catchupEventCh := make(chan *catchupEventMsg)

+   client.Logger.Debugw("Catch-up-phase worker has started",
+       zap.Stringer("worker", workerId),
+       zap.Duration("delay", delay))
+
    // Unfortunately, the errgroup context is hidden; that's why another context is necessary.
    ctx, cancel := context.WithCancel(client.Ctx)
    group, groupCtx := errgroup.WithContext(ctx)
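The comment above reflects a real constraint of golang.org/x/sync/errgroup: WithContext returns a context that is canceled once any goroutine passed to Go fails, but the corresponding CancelFunc is never exposed, so a caller who wants to stop the group early must wrap its own cancelable context around it. A minimal, self-contained sketch of that pattern (startWorkers and its jobs are hypothetical names, not code from this repository):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// startWorkers wraps errgroup.WithContext in an extra context.WithCancel so
// the caller receives an explicit handle to stop all workers early.
func startWorkers(parent context.Context, jobs ...func(context.Context) error) (context.CancelFunc, func() error) {
	ctx, cancel := context.WithCancel(parent)
	group, groupCtx := errgroup.WithContext(ctx)
	for _, job := range jobs {
		job := job // capture the loop variable (pre-Go 1.22 semantics)
		group.Go(func() error { return job(groupCtx) })
	}
	return cancel, group.Wait
}

func main() {
	cancel, wait := startWorkers(context.Background(),
		func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() })
	cancel()            // stops the worker through the wrapper context
	fmt.Println(wait()) // prints "context canceled"
}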
@@ -219,8 +227,11 @@
            }

            err := client.checkMissedChanges(groupCtx, objType, catchupEventCh)
-           if err != nil {
-               client.Logger.Errorw("Catch-up-phase event worker failed", zap.String("object type", objType), zap.Error(err))
+           if err != nil && !errors.Is(err, context.Canceled) {
+               client.Logger.Debugw("Catch-up-phase event worker failed",
+                   zap.Stringer("worker", workerId),
+                   zap.String("object type", objType),
+                   zap.Error(err))
            }
            return err
        })
@@ -229,14 +240,21 @@
    go func() {
        err := group.Wait()
        if err == nil {
-           client.Logger.Infow("Catching up the API has finished", zap.Duration("duration", time.Since(startTime)))
+           client.Logger.Debugw("Catching up the API has finished",
+               zap.Stringer("worker", workerId),
+               zap.Duration("duration", time.Since(startTime)))
        } else if errors.Is(err, context.Canceled) {
-           // The context is either canceled when the Client got canceled or, more likely, when another catchup-worker
+           // The context is either canceled when the Client got canceled or, more likely, when another catch-up-worker
            // was requested. In the first case, the already sent messages will be discarded as the worker's main loop
            // was left. In the other case, the message buffers will be reset to an empty state.
-           client.Logger.Warnw("Catching up the API was interrupted", zap.Duration("duration", time.Since(startTime)))
+           client.Logger.Debugw("Catching up the API was interrupted",
+               zap.Stringer("worker", workerId),
+               zap.Duration("duration", time.Since(startTime)))
        } else {
-           client.Logger.Errorw("Catching up the API failed", zap.Error(err), zap.Duration("duration", time.Since(startTime)))
+           client.Logger.Debugw("Catching up the API failed",
+               zap.Stringer("worker", workerId),
+               zap.Error(err),
+               zap.Duration("duration", time.Since(startTime)))

            select {
            case <-ctx.Done():
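Because an old worker can still be winding down while its replacement is already running, every log line above carries the worker's UUID. A runnable sketch of that tagging (the logger setup is illustrative, not the Client's actual configuration):

package main

import (
	"time"

	"github.com/google/uuid"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment() // the development config logs at debug level
	sugar := logger.Sugar()

	// One UUID per worker generation: every message from this generation
	// carries the same "worker" field, keeping overlapping workers apart.
	workerId := uuid.New()
	sugar.Debugw("Catch-up-phase worker has started",
		zap.Stringer("worker", workerId),
		zap.Duration("delay", 5*time.Second))
}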
@@ -271,9 +289,8 @@ func (client *Client) worker() {
        // catchupCache maps event.Events.Name to API time to skip replaying outdated events.
        catchupCache = make(map[string]time.Time)

-       // catchupFailCounter indicates how many prior catch-up-phase attempts have failed. It will be used to
-       // rate limit catch-up-phase restarts.
-       catchupFailCounter int
+       // catchupWorkerDelay slows down future catch-up-phase workers if prior attempts have failed.
+       catchupWorkerDelay time.Duration
    )

    // catchupReset resets all catchup variables to their initial empty state.
@@ -291,22 +308,26 @@
        }
    }

+   // catchupWorkerStart starts a catch-up-phase worker and stops already running workers, if necessary.
+   catchupWorkerStart := func() {
+       if catchupEventCh != nil {
+           client.Logger.Debug("Switching to catch-up-phase was requested while still catching up, stopping old worker")
+           catchupCancel()
+       }
+
+       client.Logger.Info("Worker enters catch-up-phase, start catching up on Event Stream events")
+       catchupReset()
+       catchupEventCh, catchupCancel = client.startCatchupWorkers(catchupWorkerDelay)
+   }
+
    for {
        select {
        case <-client.Ctx.Done():
            client.Logger.Warnw("Closing down main worker as context is finished", zap.Error(client.Ctx.Err()))
            return

        case <-client.catchupPhaseRequest:
-           if catchupEventCh != nil {
-               client.Logger.Warn("Switching to catch-up-phase was requested while already catching up, restarting phase")
-               catchupCancel()
-           }
-
-           client.Logger.Info("Worker enters catch-up-phase, start catching up on Event Stream events")
-           catchupReset()
-           catchupEventCh, catchupCancel = client.startCatchupWorkers(
-               min(3*time.Minute, time.Duration(math.Exp2(float64(catchupFailCounter))-1)*time.Second))
+           catchupWorkerStart()

        case catchupMsg, ok := <-catchupEventCh:
            // Process an incoming event
@@ -318,15 +339,17 @@

            // Abort and restart the catch-up-phase when receiving an error.
            if ok && catchupMsg.error != nil {
-               client.Logger.Warnw("Worker leaves catch-up-phase with an error, another attempt will be made", zap.Error(catchupMsg.error))
-               go func() {
-                   select {
-                   case <-client.Ctx.Done():
-                   case client.catchupPhaseRequest <- struct{}{}:
-                   }
-               }()
-               catchupReset()
-               catchupFailCounter++
+               if catchupWorkerDelay == 0 {
+                   catchupWorkerDelay = time.Second
+               } else {
+                   catchupWorkerDelay = min(3*time.Minute, 2*catchupWorkerDelay)
+               }
+
+               client.Logger.Warnw("Catch-up-phase was interrupted by an error, another attempt will be made",
+                   zap.Error(catchupMsg.error),
+                   zap.Duration("delay", catchupWorkerDelay))
+
+               catchupWorkerStart()
                break
            }

@@ -351,7 +374,7 @@

            client.Logger.Info("Worker leaves catch-up-phase, returning to normal operation")
            catchupReset()
-           catchupFailCounter = 0
+           catchupWorkerDelay = 0

        case ev := <-client.eventDispatcherEventStream:
            // During catch-up-phase, buffer Event Stream events
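The two hunks above replace the failure counter with the delay itself: previously the delay was recomputed as 2^n - 1 seconds from catchupFailCounter, now catchupWorkerDelay starts at one second on the first failure, doubles on every further failure, is capped at three minutes, and drops back to zero once a catch-up-phase succeeds. A standalone sketch of the resulting schedule (assumes Go 1.21+ for the built-in min):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Delay before the n-th retry: 1s, 2s, 4s, ... capped at three minutes.
	// A successful catch-up resets the delay to zero, i.e. instant restarts.
	var delay time.Duration
	for failure := 1; failure <= 10; failure++ {
		if delay == 0 {
			delay = time.Second
		} else {
			delay = min(3*time.Minute, 2*delay)
		}
		fmt.Printf("failure %2d -> next delay %v\n", failure, delay)
	}
}

Running this prints 1s, 2s, 4s, 8s, 16s, 32s, 1m4s, 2m8s, 3m0s, 3m0s.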
9 changes: 5 additions & 4 deletions internal/icinga2/client_api.go
@@ -11,7 +11,6 @@ import (
    "github.com/icinga/icinga-notifications/internal/event"
    "go.uber.org/zap"
    "io"
-   "math"
    "net/http"
    "net/url"
    "slices"
@@ -293,7 +292,7 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
        return nil, err
    }

-   for i := 0; ; i++ {
+   for retryDelay := time.Second; ; retryDelay = min(3*time.Minute, 2*retryDelay) {
        // Always ensure a unique queue name to mitigate possible naming conflicts.
        queueNameRndBuff := make([]byte, 16)
        _, _ = rand.Read(queueNameRndBuff)
@@ -330,7 +329,9 @@
        httpClient := &http.Client{Transport: &client.ApiHttpTransport}
        res, err := httpClient.Do(req)
        if err != nil {
-           client.Logger.Warnw("Establishing an Event Stream API connection failed, will be retried", zap.Error(err))
+           client.Logger.Warnw("Establishing an Event Stream API connection failed, will be retried",
+               zap.Error(err),
+               zap.Duration("delay", retryDelay))
            return
        }

@@ -361,7 +362,7 @@
        // Rate limit API reconnections: slow down for successive failed attempts but limit to three minutes.
        // 1s, 2s, 4s, 8s, 16s, 32s, 1m4s, 2m8s, 3m, 3m, 3m, ...
        select {
-       case <-time.After(min(3*time.Minute, time.Duration(math.Exp2(float64(i)))*time.Second)):
+       case <-time.After(retryDelay):
        case <-client.Ctx.Done():
            return nil, client.Ctx.Err()
        }
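connectEventStream now carries the retry delay directly in its for statement instead of deriving it from a loop counter with math.Exp2, which also lets the delay be logged alongside the failure. A minimal sketch of that loop shape (connectOnce is a hypothetical stand-in for the actual connection attempt; assumes Go 1.21+):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithRetry retries connectOnce until it succeeds or ctx is canceled,
// sleeping 1s, 2s, 4s, ... between attempts, capped at three minutes.
func connectWithRetry(ctx context.Context, connectOnce func() error) error {
	for retryDelay := time.Second; ; retryDelay = min(3*time.Minute, 2*retryDelay) {
		if err := connectOnce(); err == nil {
			return nil
		}
		select {
		case <-time.After(retryDelay): // wait, then retry with the doubled delay
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	attempts := 0
	err := connectWithRetry(context.Background(), func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>, after waiting 1s + 2s
}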
