Add TELEPORT_PLUGIN_FAIL_FAST environment variable
hugoShaka committed Aug 8, 2023
1 parent 6d98014 commit f56a732
Showing 5 changed files with 41 additions and 20 deletions.
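For context on the new behavior: the environment variable is only consulted when FailFast is not already set in the job Config, and its value is parsed with strconv.ParseBool, so "1", "t", "true", "TRUE", and similar all enable it. Below is a minimal standalone sketch of that lookup using only the standard library; the helper name failFastFromEnv is illustrative, and the actual change inlines this logic in NewJobWithEvents further down.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// failFastFromEnv mirrors the lookup added in this commit: the variable is
// only consulted when fail-fast was not already requested in the config, and
// any value accepted by strconv.ParseBool turns it on.
func failFastFromEnv(configFailFast bool) (bool, error) {
	v := os.Getenv("TELEPORT_PLUGIN_FAIL_FAST")
	if configFailFast || v == "" {
		return configFailFast, nil
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("failed to parse content %q of the TELEPORT_PLUGIN_FAIL_FAST environment variable: %w", v, err)
	}
	return enabled, nil
}

func main() {
	enabled, err := failFastFromEnv(false)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("fail fast enabled:", enabled)
}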
5 changes: 4 additions & 1 deletion integrations/access/common/app.go
@@ -177,14 +177,17 @@ func (a *BaseApp) run(ctx context.Context) error {
if err := a.init(ctx); err != nil {
return trace.Wrap(err)
}
watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.apiClient,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
ok, err := watcherJob.WaitReady(ctx)
if err != nil {
5 changes: 4 additions & 1 deletion integrations/access/jira/app.go
@@ -135,14 +135,17 @@ func (a *App) run(ctx context.Context) error {
}
}

watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.teleport,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
watcherOk, err := watcherJob.WaitReady(ctx)
if err != nil {
5 changes: 4 additions & 1 deletion integrations/access/pagerduty/app.go
@@ -113,14 +113,17 @@ func (a *App) run(ctx context.Context) error {
return trace.Wrap(err)
}

watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.teleport,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
ok, err := watcherJob.WaitReady(ctx)
if err != nil {
4 changes: 3 additions & 1 deletion integrations/lib/watcherjob/helpers_test.go
@@ -119,7 +119,9 @@ func NewMockEventsProcess(ctx context.Context, t *testing.T, config Config, fn E
assert.NoError(t, process.Shutdown(ctx))
process.Close()
})
process.eventsJob = NewJobWithEvents(&process.Events, config, fn)
var err error
process.eventsJob, err = NewJobWithEvents(&process.Events, config, fn)
require.NoError(t, err)
process.SpawnCriticalJob(process.eventsJob)
require.NoError(t, process.Events.WaitSomeWatchers(ctx))
process.Events.Fire(types.Event{Type: types.OpInit})
42 changes: 26 additions & 16 deletions integrations/lib/watcherjob/watcherjob.go
@@ -18,6 +18,8 @@ import (
"context"
"errors"
"io"
"os"
"strconv"
"time"

"github.com/gravitational/trace"
@@ -30,15 +32,19 @@ import (
"github.com/gravitational/teleport/integrations/lib/logger"
)

const DefaultMaxConcurrency = 128
const DefaultEventFuncTimeout = time.Second * 5
const (
DefaultMaxConcurrency = 128
DefaultEventFuncTimeout = time.Second * 5
failFastEnvVarName = "TELEPORT_PLUGIN_FAIL_FAST"
)

type EventFunc func(context.Context, types.Event) error

type Config struct {
Watch types.Watch
MaxConcurrency int
EventFuncTimeout time.Duration
FailFast bool
}

type job struct {
@@ -54,17 +60,24 @@ type eventKey struct {
name string
}

func NewJob(client teleport.Client, config Config, fn EventFunc) lib.ServiceJob {
func NewJob(client teleport.Client, config Config, fn EventFunc) (lib.ServiceJob, error) {
return NewJobWithEvents(client, config, fn)
}

func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.ServiceJob {
func NewJobWithEvents(events types.Events, config Config, fn EventFunc) (lib.ServiceJob, error) {
if config.MaxConcurrency == 0 {
config.MaxConcurrency = DefaultMaxConcurrency
}
if config.EventFuncTimeout == 0 {
config.EventFuncTimeout = DefaultEventFuncTimeout
}
if flagVar := os.Getenv(failFastEnvVarName); !config.FailFast && flagVar != "" {
flag, err := strconv.ParseBool(flagVar)
if err != nil {
return nil, trace.WrapWithMessage(err, "failed to parse content '%s' of the %s environment variable", flagVar, failFastEnvVarName)
}
config.FailFast = flag
}
job := job{
events: events,
config: config,
@@ -95,18 +108,15 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv

switch {
case trace.IsConnectionProblem(err):
log.WithError(err).Error("Failed to connect to Teleport Auth server. Reconnecting...")
if config.FailFast {
return trace.WrapWithMessage(err, "Connection problem detected. Exiting as fail fast is on.")
}
log.WithError(err).Error("Connection problem detected. Attempting to reconnect.")
case errors.Is(err, io.EOF):
log.WithError(err).Error("Watcher stream closed. Reconnecting...")
if config.FailFast {
return trace.WrapWithMessage(err, "Watcher stream closed. Exiting as fail fast is on.")
}
log.WithError(err).Error("Watcher stream closed. Attempting to reconnect.")
case lib.IsCanceled(err):
log.Debug("Watcher context is canceled")
// Context cancellation is not an error
@@ -123,7 +133,7 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
}
}
})
return job
return job, nil
}

// watchEvents spawns a watcher and reads events from it.
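Because NewJob and NewJobWithEvents now return an error alongside the job, external callers need the same small adjustment the bundled plugins receive above. A sketch of the updated call pattern follows, assuming the import paths match the repository layout and using a placeholder event handler.

package main

import (
	"context"
	"time"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/api/types"
	"github.com/gravitational/teleport/integrations/lib"
	"github.com/gravitational/teleport/integrations/lib/watcherjob"
)

// newAccessRequestWatcher builds a watcher job for access-request events and
// propagates the constructor error introduced by this commit. The events
// source is supplied by the caller; setting FailFast here has the same effect
// as exporting TELEPORT_PLUGIN_FAIL_FAST=true.
func newAccessRequestWatcher(events types.Events) (lib.ServiceJob, error) {
	onEvent := func(ctx context.Context, event types.Event) error {
		// Handle the access-request event here.
		return nil
	}
	job, err := watcherjob.NewJobWithEvents(
		events,
		watcherjob.Config{
			Watch:            types.Watch{Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}}},
			EventFuncTimeout: 5 * time.Second,
			FailFast:         true, // exit on a broken connection instead of retrying forever
		},
		onEvent,
	)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	return job, nil
}

func main() {}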
