Skip to content

Commit

Permalink
icinga2: restart catch-up-phase on error
Browse files Browse the repository at this point in the history
The catch-up-phase logic was extended to also propagate back an error
if the state was left unsuccessfully. In this case - and not if another
catch-up-phase was requested and the old phase worker got canceled -
another attempt will be made.
  • Loading branch information
oxzi committed Jan 12, 2024
1 parent f66f726 commit d6c4d36
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 26 deletions.
95 changes: 72 additions & 23 deletions internal/icinga2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package icinga2

import (
"context"
"errors"
"github.com/icinga/icinga-notifications/internal/event"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -18,6 +19,15 @@ type eventMsg struct {
apiTime time.Time
}

// catchupEventMsg propagates either an eventMsg or an error back from the catch-up worker.
//
// The type must be used as a sum-type like data structure holding either an eventMsg pointer or an error. The error
// should have a higher precedence than the eventMsg.
type catchupEventMsg struct {
*eventMsg
error
}

// Client for the Icinga 2 Event Stream API with support for other Icinga 2 APIs to gather additional information and
// perform a catch-up of unknown events either when starting up to or in case of a connection loss.
//
Expand Down Expand Up @@ -181,11 +191,14 @@ func (client *Client) buildAcknowledgementEvent(ctx context.Context, host, servi
// startCatchupWorkers launches goroutines for catching up the Icinga 2 API state.
//
// Each event will be sent to the returned channel. When all launched workers have finished - either because all are
// done or one has failed and the others were interrupted -, the channel will be closed. Those workers honor a context
// derived from the Client.Ctx and would either stop when this context is done or when the context.CancelFunc is called.
func (client *Client) startCatchupWorkers() (chan *eventMsg, context.CancelFunc) {
// done or one has failed and the others were interrupted -, the channel will be closed. In case of a failure, _one_
// final error will be sent back.
//
// Those workers honor a context derived from the Client.Ctx and would either stop when this context is done or when the
// context.CancelFunc is called.
func (client *Client) startCatchupWorkers() (chan *catchupEventMsg, context.CancelFunc) {
startTime := time.Now()
eventMsgCh := make(chan *eventMsg)
catchupEventCh := make(chan *catchupEventMsg)

// Unfortunately, the errgroup context is hidden, that's why another context is necessary.
ctx, cancel := context.WithCancel(client.Ctx)
Expand All @@ -195,7 +208,7 @@ func (client *Client) startCatchupWorkers() (chan *eventMsg, context.CancelFunc)
for _, objType := range objTypes {
objType := objType // https://go.dev/doc/faq#closures_and_goroutines
group.Go(func() error {
err := client.checkMissedChanges(groupCtx, objType, eventMsgCh)
err := client.checkMissedChanges(groupCtx, objType, catchupEventCh)
if err != nil {
client.Logger.Errorw("Catch-up-phase event worker failed", zap.String("object type", objType), zap.Error(err))
}
Expand All @@ -205,17 +218,27 @@ func (client *Client) startCatchupWorkers() (chan *eventMsg, context.CancelFunc)

go func() {
err := group.Wait()
if err != nil {
client.Logger.Errorw("Catching up the API failed", zap.Error(err), zap.Duration("duration", time.Since(startTime)))
} else {
if err == nil {
client.Logger.Infow("Catching up the API has finished", zap.Duration("duration", time.Since(startTime)))
} else if errors.Is(err, context.Canceled) {
// The context is either canceled when the Client got canceled or, more likely, when another catchup-worker
// was requested. In the first case, the already sent messages will be discarded as the worker's main loop
// was left. In the other case, the message buffers will be reset to an empty state.
client.Logger.Warnw("Catching up the API was interrupted", zap.Duration("duration", time.Since(startTime)))
} else {
client.Logger.Errorw("Catching up the API failed", zap.Error(err), zap.Duration("duration", time.Since(startTime)))

select {
case <-ctx.Done():
case catchupEventCh <- &catchupEventMsg{error: err}:
}
}

cancel()
close(eventMsgCh)
close(catchupEventCh)
}()

return eventMsgCh, cancel
return catchupEventCh, cancel
}

// worker is the Client's main background worker, taking care of event.Event dispatching and mode switching.
Expand All @@ -226,19 +249,30 @@ func (client *Client) startCatchupWorkers() (chan *eventMsg, context.CancelFunc)
// Communication takes place over the eventDispatcherEventStream and catchupPhaseRequest channels.
func (client *Client) worker() {
var (
// catchupEventCh emits events generated during the catch-up-phase from catch-up-workers. It will be closed when
// catching up is done, which indicates the select below to switch phases. When this variable is nil, this
// Client is in the normal operating phase.
catchupEventCh chan *eventMsg
// catchupEventCh either emits events generated during the catch-up-phase from catch-up-workers or one final
// error if something went wrong. It will be closed when catching up is done, which indicates the select below
// to switch phases. When this variable is nil, this Client is in the normal operating phase.
catchupEventCh chan *catchupEventMsg
// catchupCancel cancels, if not nil, all running catch-up-workers, e.g., when restarting catching-up.
catchupCancel context.CancelFunc

// catchupBuffer holds Event Stream events to be replayed after the catch-up-phase has finished.
catchupBuffer = make([]*event.Event, 0)
// catchupCache maps event.Events.Name to API time to skip replaying outdated events.
catchupCache = make(map[string]time.Time)

// catchupErr might hold an error received from catchupEventCh, indicating another catch-up-phase run.
catchupErr error
)

// catchupReset resets all catchup variables to their initial empty state.
catchupReset := func() {
catchupEventCh, catchupCancel = nil, nil
catchupBuffer = make([]*event.Event, 0)
catchupCache = make(map[string]time.Time)
catchupErr = nil
}

// catchupCacheUpdate updates the catchupCache if this eventMsg seems to be the latest of its kind.
catchupCacheUpdate := func(ev *eventMsg) {
ts, ok := catchupCache[ev.event.Name]
Expand All @@ -258,21 +292,28 @@ func (client *Client) worker() {
client.Logger.Warn("Switching to catch-up-phase was requested while already catching up, restarting phase")

// Drain the old catch-up-phase producer channel until it is closed as its context will be canceled.
go func(catchupEventCh chan *eventMsg) {
go func(catchupEventCh chan *catchupEventMsg) {
for _, ok := <-catchupEventCh; ok; {
}
}(catchupEventCh)
catchupCancel()
}

client.Logger.Info("Worker enters catch-up-phase, start caching up on Event Stream events")
catchupReset()
catchupEventCh, catchupCancel = client.startCatchupWorkers()

case ev, ok := <-catchupEventCh:
case catchupMsg, ok := <-catchupEventCh:
// Process an incoming event
if ok {
client.CallbackFn(ev.event)
catchupCacheUpdate(ev)
if ok && catchupMsg.error == nil {
client.CallbackFn(catchupMsg.eventMsg.event)
catchupCacheUpdate(catchupMsg.eventMsg)
break
}

// Store an incoming error as the catchupErr to be processed below
if ok && catchupMsg.error != nil {
catchupErr = catchupMsg.error
break
}

Expand All @@ -295,11 +336,19 @@ func (client *Client) worker() {
break
}

client.Logger.Info("Worker leaves catch-up-phase, returning to normal operation")
if catchupErr != nil {
client.Logger.Warnw("Worker leaves catch-up-phase with an error, another attempt will be made", zap.Error(catchupErr))
go func() {
select {
case <-client.Ctx.Done():
case client.catchupPhaseRequest <- struct{}{}:
}
}()
} else {
client.Logger.Info("Worker leaves catch-up-phase, returning to normal operation")
}

catchupEventCh, catchupCancel = nil, nil
catchupBuffer = make([]*event.Event, 0)
catchupCache = make(map[string]time.Time)
catchupReset()

case ev := <-client.eventDispatcherEventStream:
// During catch-up-phase, buffer Event Stream events
Expand Down
6 changes: 3 additions & 3 deletions internal/icinga2/client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (client *Client) fetchAcknowledgementComment(ctx context.Context, host, ser
//
// If the object's acknowledgement field is non-zero, an Acknowledgement Event will be constructed following the Host or
// Service object. Each event will be delivered to the channel.
func (client *Client) checkMissedChanges(ctx context.Context, objType string, eventCh chan *eventMsg) error {
func (client *Client) checkMissedChanges(ctx context.Context, objType string, catchupEventCh chan *catchupEventMsg) error {
jsonRaw, err := client.queryObjectsApiDirect(ctx, objType, "")
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +236,7 @@ func (client *Client) checkMissedChanges(ctx context.Context, objType string, ev
select {
case <-ctx.Done():
return ctx.Err()
case eventCh <- &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time()}:
case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time()}}:
stateChangeEvents++
}

Expand All @@ -263,7 +263,7 @@ func (client *Client) checkMissedChanges(ctx context.Context, objType string, ev
select {
case <-ctx.Done():
return ctx.Err()
case eventCh <- &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time()}:
case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time()}}:
acknowledgementEvents++
}
}
Expand Down

0 comments on commit d6c4d36

Please sign in to comment.