diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index f9d98cee..70fcd01e 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -72,7 +72,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `LOG_LEVEL` | Defines the verbosity of application logs. Common values: 'trace', 'debug', 'info', 'warn', 'error'. | `info` | No | | `LOG_MAX_CONCURRENCY` | Maximum number of log writing operations to process concurrently. | `1` | No | | `MAX_DESTINATIONS_PER_TENANT` | Maximum number of destinations allowed per tenant/organization. | `20` | No | -| `MAX_RETRY_LIMIT` | Maximum number of retry attempts for a single event delivery before giving up. | `10` | No | +| `MAX_RETRY_LIMIT` | Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided. | `10` | No | | `ORGANIZATION_NAME` | Name of the organization, used for display purposes and potentially in user agent strings. | `nil` | No | | `OTEL_EXPORTER` | Specifies the OTLP exporter to use for this telemetry type (e.g., 'otlp'). Typically used with environment variables like OTEL_EXPORTER_OTLP_TRACES_ENDPOINT. | `nil` | Conditional | | `OTEL_PROTOCOL` | Specifies the OTLP protocol ('grpc' or 'http') for this telemetry type. Typically used with environment variables like OTEL_EXPORTER_OTLP_TRACES_PROTOCOL. | `nil` | Conditional | @@ -114,7 +114,8 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `REDIS_PASSWORD` | Password for Redis authentication, if required by the server. | `nil` | Yes | | `REDIS_PORT` | Port number for the Redis server. | `6379` | Yes | | `REDIS_TLS_ENABLED` | Enable TLS encryption for Redis connection. | `false` | No | -| `RETRY_INTERVAL_SECONDS` | Interval in seconds between delivery retry attempts for failed webhooks. | `30` | No | +| `RETRY_INTERVAL_SECONDS` | Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided. | `30` | No | +| `RETRY_SCHEDULE` | Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h. | `[]` | No | | `SERVICE` | Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services). | `nil` | No | | `TELEMETRY_BATCH_INTERVAL` | Maximum time in seconds to wait before sending a batch of telemetry events if batch size is not reached. | `5` | No | | `TELEMETRY_BATCH_SIZE` | Maximum number of telemetry events to batch before sending. | `100` | No | @@ -526,12 +527,15 @@ redis: tls_enabled: false -# Interval in seconds between delivery retry attempts for failed webhooks. +# Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided. retry_interval_seconds: 30 -# Maximum number of retry attempts for a single event delivery before giving up. +# Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided. retry_max_limit: 10 +# Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h. +retry_schedule: [] + # Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services). service: "" diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index a99d82d5..7f1d62b0 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -4,7 +4,7 @@ import "time" type Backoff interface { // Duration returns the duration to wait before retrying the operation. - // Duration accepts the numeber of times the operation has been retried. + // Duration accepts the number of times the operation has been retried. // If the operation has never been retried, the number should be 0. Duration(int) time.Duration } @@ -45,3 +45,22 @@ var _ Backoff = &ConstantBackoff{} func (b *ConstantBackoff) Duration(retries int) time.Duration { return b.Interval } + +// ScheduledBackoff uses a predefined schedule of delays for each retry attempt. +// If the retry attempt exceeds the schedule length, it returns the last value. +type ScheduledBackoff struct { + Schedule []time.Duration +} + +var _ Backoff = &ScheduledBackoff{} + +func (b *ScheduledBackoff) Duration(retries int) time.Duration { + if len(b.Schedule) == 0 { + return 0 + } + if retries >= len(b.Schedule) { + // Return last value for attempts beyond schedule + return b.Schedule[len(b.Schedule)-1] + } + return b.Schedule[retries] +} diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go index b827aba6..099eb0bf 100644 --- a/internal/backoff/backoff_test.go +++ b/internal/backoff/backoff_test.go @@ -87,3 +87,53 @@ func TestBackoff_Constant(t *testing.T) { } testBackoff(t, "ConstantBackoff{Interval:30*time.Second}", bo, testCases) } + +func TestBackoff_Scheduled(t *testing.T) { + t.Parallel() + + t.Run("CustomSchedule", func(t *testing.T) { + bo := &backoff.ScheduledBackoff{ + Schedule: []time.Duration{ + 5 * time.Second, + 1 * time.Minute, + 10 * time.Minute, + 1 * time.Hour, + 2 * time.Hour, + }, + } + testCases := []testCase{ + {0, 5 * time.Second}, + {1, 1 * time.Minute}, + {2, 10 * time.Minute}, + {3, 1 * time.Hour}, + {4, 2 * time.Hour}, + {5, 2 * time.Hour}, // Beyond schedule, returns last value + {10, 2 * time.Hour}, // Beyond schedule, returns last value + } + testBackoff(t, "ScheduledBackoff{Custom}", bo, testCases) + }) + + t.Run("EmptySchedule", func(t *testing.T) { + bo := &backoff.ScheduledBackoff{ + Schedule: []time.Duration{}, + } + testCases := []testCase{ + {0, 0}, + {1, 0}, + {5, 0}, + } + testBackoff(t, "ScheduledBackoff{Empty}", bo, testCases) + }) + + t.Run("SingleElement", func(t *testing.T) { + bo := &backoff.ScheduledBackoff{ + Schedule: []time.Duration{1 * time.Minute}, + } + testCases := []testCase{ + {0, 1 * time.Minute}, + {1, 1 * time.Minute}, // Beyond schedule, returns last value + {5, 1 * time.Minute}, // Beyond schedule, returns last value + } + testBackoff(t, "ScheduledBackoff{Single}", bo, testCases) + }) +} diff --git a/internal/config/config.go b/internal/config/config.go index e82b9142..7c4114d6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "strings" + "time" "github.com/caarlos0/env/v9" + "github.com/hookdeck/outpost/internal/backoff" "github.com/hookdeck/outpost/internal/migrator" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/telemetry" @@ -73,8 +75,9 @@ type Config struct { LogMaxConcurrency int `yaml:"log_max_concurrency" env:"LOG_MAX_CONCURRENCY" desc:"Maximum number of log writing operations to process concurrently." required:"N"` // Delivery Retry - RetryIntervalSeconds int `yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds between delivery retry attempts for failed webhooks." required:"N"` - RetryMaxLimit int `yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up." required:"N"` + RetrySchedule []int `yaml:"retry_schedule" env:"RETRY_SCHEDULE" envSeparator:"," desc:"Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h." required:"N"` + RetryIntervalSeconds int `yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided." required:"N"` + RetryMaxLimit int `yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided." required:"N"` // Event Delivery MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"` @@ -100,14 +103,14 @@ type Config struct { } var ( - ErrMismatchedServiceType = errors.New("config validation error: service type mismatch") - ErrInvalidServiceType = errors.New("config validation error: invalid service type") - ErrMissingRedis = errors.New("config validation error: redis configuration is required") - ErrMissingLogStorage = errors.New("config validation error: log storage must be provided") - ErrMissingMQs = errors.New("config validation error: message queue configuration is required") - ErrMissingAESSecret = errors.New("config validation error: AES encryption secret is required") - ErrInvalidPortalProxyURL = errors.New("config validation error: invalid portal proxy url") - ErrInvalidDeploymentID = errors.New("config validation error: deployment_id must contain only alphanumeric characters, hyphens, and underscores (max 64 characters)") + ErrMismatchedServiceType = errors.New("config validation error: service type mismatch") + ErrInvalidServiceType = errors.New("config validation error: invalid service type") + ErrMissingRedis = errors.New("config validation error: redis configuration is required") + ErrMissingLogStorage = errors.New("config validation error: log storage must be provided") + ErrMissingMQs = errors.New("config validation error: message queue configuration is required") + ErrMissingAESSecret = errors.New("config validation error: AES encryption secret is required") + ErrInvalidPortalProxyURL = errors.New("config validation error: invalid portal proxy url") + ErrInvalidDeploymentID = errors.New("config validation error: deployment_id must contain only alphanumeric characters, hyphens, and underscores (max 64 characters)") ) func (c *Config) InitDefaults() { @@ -149,6 +152,7 @@ func (c *Config) InitDefaults() { c.PublishMaxConcurrency = 1 c.DeliveryMaxConcurrency = 1 c.LogMaxConcurrency = 1 + c.RetrySchedule = []int{} // Empty by default, falls back to exponential backoff c.RetryIntervalSeconds = 30 c.RetryMaxLimit = 10 c.MaxDestinationsPerTenant = 20 @@ -365,6 +369,23 @@ func (c *Config) ConfigFilePath() string { return c.configPath } +// GetRetryBackoff returns the configured backoff strategy based on retry configuration +func (c *Config) GetRetryBackoff() (backoff.Backoff, int) { + if len(c.RetrySchedule) > 0 { + // Use scheduled backoff if retry_schedule is provided + schedule := make([]time.Duration, len(c.RetrySchedule)) + for i, seconds := range c.RetrySchedule { + schedule[i] = time.Duration(seconds) * time.Second + } + return &backoff.ScheduledBackoff{Schedule: schedule}, c.RetryMaxLimit + } + // Fall back to exponential backoff + return &backoff.ExponentialBackoff{ + Interval: time.Duration(c.RetryIntervalSeconds) * time.Second, + Base: 2, + }, c.RetryMaxLimit +} + type TelemetryConfig struct { Disabled bool `yaml:"disabled" env:"DISABLE_TELEMETRY" desc:"Disables telemetry within the 'telemetry' block (Hookdeck usage stats and Sentry). Can be overridden by the global 'disable_telemetry' flag at the root of the configuration." required:"N"` BatchSize int `yaml:"batch_size" env:"TELEMETRY_BATCH_SIZE" desc:"Maximum number of telemetry events to batch before sending." required:"N"` diff --git a/internal/config/config_retry_test.go b/internal/config/config_retry_test.go new file mode 100644 index 00000000..7ca81ba6 --- /dev/null +++ b/internal/config/config_retry_test.go @@ -0,0 +1,107 @@ +package config_test + +import ( + "testing" + + "github.com/hookdeck/outpost/internal/config" + "github.com/stretchr/testify/assert" +) + +func TestRetrySchedule(t *testing.T) { + tests := []struct { + name string + files map[string][]byte + envVars map[string]string + wantSchedule []int + wantInterval int + wantMaxLimit int + }{ + { + name: "default empty retry schedule", + files: map[string][]byte{}, + envVars: map[string]string{}, + wantSchedule: []int{}, + wantInterval: 30, // default exponential backoff interval + wantMaxLimit: 10, // default max limit + }, + { + name: "yaml retry schedule overrides max limit", + files: map[string][]byte{ + "config.yaml": []byte(` +retry_schedule: [5, 300, 1800, 7200, 18000, 36000, 36000] +`), + }, + envVars: map[string]string{ + "CONFIG": "config.yaml", + }, + wantSchedule: []int{5, 300, 1800, 7200, 18000, 36000, 36000}, + wantInterval: 30, // still have default even though not used + wantMaxLimit: 7, // overridden to schedule length + }, + { + name: "env retry schedule overrides yaml and max limit", + files: map[string][]byte{ + "config.yaml": []byte(` +retry_schedule: [10, 20, 30] +`), + }, + envVars: map[string]string{ + "CONFIG": "config.yaml", + "RETRY_SCHEDULE": "5,300,1800", + }, + wantSchedule: []int{5, 300, 1800}, + wantInterval: 30, + wantMaxLimit: 3, // overridden to env schedule length + }, + { + name: "retry_interval_seconds without retry_schedule", + files: map[string][]byte{ + "config.yaml": []byte(` +retry_interval_seconds: 60 +`), + }, + envVars: map[string]string{ + "CONFIG": "config.yaml", + }, + wantSchedule: []int{}, + wantInterval: 60, + wantMaxLimit: 10, // default max limit used + }, + { + name: "both retry_schedule and retry_interval_seconds set", + files: map[string][]byte{ + "config.yaml": []byte(` +retry_schedule: [5, 300, 1800] +retry_interval_seconds: 60 +`), + }, + envVars: map[string]string{ + "CONFIG": "config.yaml", + }, + wantSchedule: []int{5, 300, 1800}, + wantInterval: 60, // both present, schedule takes precedence + wantMaxLimit: 3, // overridden to schedule length + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockOS := &mockOS{ + files: tt.files, + envVars: tt.envVars, + } + + mockOS.envVars["API_KEY"] = "test-key" + mockOS.envVars["API_JWT_SECRET"] = "test-jwt-secret" + mockOS.envVars["AES_ENCRYPTION_SECRET"] = "test-aes-secret-16b" + mockOS.envVars["POSTGRES_URL"] = "postgres://localhost:5432/test" + mockOS.envVars["RABBITMQ_SERVER_URL"] = "amqp://localhost:5672" + + cfg, err := config.ParseWithOS(config.Flags{}, mockOS) + assert.NoError(t, err) + assert.Equal(t, tt.wantSchedule, cfg.RetrySchedule) + assert.Equal(t, tt.wantInterval, cfg.RetryIntervalSeconds) + assert.Equal(t, tt.wantMaxLimit, cfg.RetryMaxLimit) + }) + } +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go index c4caf4c7..5270c834 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -63,6 +63,7 @@ func TestDefaultValues(t *testing.T) { assert.Equal(t, 1, cfg.PublishMaxConcurrency) assert.Equal(t, 1, cfg.DeliveryMaxConcurrency) assert.Equal(t, 1, cfg.LogMaxConcurrency) + assert.Equal(t, []int{}, cfg.RetrySchedule) assert.Equal(t, 30, cfg.RetryIntervalSeconds) assert.Equal(t, 10, cfg.RetryMaxLimit) assert.Equal(t, 20, cfg.MaxDestinationsPerTenant) diff --git a/internal/config/validation.go b/internal/config/validation.go index b0a4cc2a..705ef3e8 100644 --- a/internal/config/validation.go +++ b/internal/config/validation.go @@ -45,6 +45,10 @@ func (c *Config) Validate(flags Flags) error { return err } + if err := c.validateRetryConfiguration(); err != nil { + return err + } + // Mark as validated if we get here c.validated = true return nil @@ -167,3 +171,12 @@ func (c *Config) validateDeploymentID() error { return nil } + +// validateRetryConfiguration validates and adjusts the retry configuration +func (c *Config) validateRetryConfiguration() error { + // If retry_schedule is provided, override retry_max_limit to match schedule length + if len(c.RetrySchedule) > 0 { + c.RetryMaxLimit = len(c.RetrySchedule) + } + return nil +} diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 4dabfe20..8a13b31d 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -7,7 +7,6 @@ import ( "time" "github.com/hookdeck/outpost/internal/alert" - "github.com/hookdeck/outpost/internal/backoff" "github.com/hookdeck/outpost/internal/config" "github.com/hookdeck/outpost/internal/consumer" "github.com/hookdeck/outpost/internal/deliverymq" @@ -140,6 +139,8 @@ func NewService(ctx context.Context, alert.WithDeploymentID(cfg.DeploymentID), ) + retryBackoff, retryMaxLimit := cfg.GetRetryBackoff() + handler = deliverymq.NewMessageHandler( logger, redisClient, @@ -149,11 +150,8 @@ func NewService(ctx context.Context, registry, eventTracer, retryScheduler, - &backoff.ExponentialBackoff{ - Interval: time.Duration(cfg.RetryIntervalSeconds) * time.Second, - Base: 2, - }, - cfg.RetryMaxLimit, + retryBackoff, + retryMaxLimit, alertMonitor, ) }