Skip to content
1 change: 0 additions & 1 deletion cmd/e2e/configs/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {
c.MQs.RabbitMQ.LogQueue = idgen.String()

// Test-specific overrides
c.AuditLog = false // Disable audit logging to suppress info-level logs in tests
c.PublishMaxConcurrency = 3
c.DeliveryMaxConcurrency = 3
c.LogMaxConcurrency = 3
Expand Down
4 changes: 1 addition & 3 deletions internal/apirouter/audit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ import (
"github.com/hookdeck/outpost/internal/logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uptrace/opentelemetry-go-extra/otelzap"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

func newAuditTest(t *testing.T, opts ...apiTestOption) (*apiTest, *observer.ObservedLogs) {
t.Helper()
core, logs := observer.New(zap.InfoLevel)
// When auditLogger is nil, Audit() falls through to the main logger.
logger := &logging.Logger{Logger: otelzap.New(zap.New(core))}
logger := logging.NewTestLogger(zap.New(core))
opts = append(opts, withLogger(logger))
return newAPITest(t, opts...), logs
}
Expand Down
12 changes: 8 additions & 4 deletions internal/apirouter/logger_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,30 @@ func LoggerMiddlewareWithSanitizer(logger *logging.Logger, sanitizer *RequestBod

c.Next()

status := c.Writer.Status()

fields := []zap.Field{}
fields = append(fields, basicFields(c)...)
fields = append(fields, pathFields(c)...)
fields = append(fields, queryFields(c)...)
fields = append(fields, errorFields(c)...)

// Add sanitized request body for 5xx errors
if c.Writer.Status() >= 500 && bufferedBody != nil {
if status >= 500 && bufferedBody != nil {
requestBodyFields = getRequestBodyFields(bufferedBody, sanitizer)
fields = append(fields, requestBodyFields...)
}

if c.Writer.Status() >= 500 {
switch {
case status >= 500:
logger.Error("request completed", fields...)

if hub := sentrygin.GetHubFromContext(c); hub != nil {
hub.CaptureException(getErrorWithStackTrace(c.Errors.Last().Err))
}
} else {
case status >= 400:
logger.Info("request completed", fields...)
default:
logger.Debug("request completed", fields...)
}
}
Comment thread
alexluong marked this conversation as resolved.
}
Expand Down
3 changes: 1 addition & 2 deletions internal/apirouter/logger_middleware_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/hookdeck/outpost/internal/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uptrace/opentelemetry-go-extra/otelzap"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
Expand Down Expand Up @@ -99,7 +98,7 @@ func setupTestEnvironment(t *testing.T) (*gin.Engine, *observer.ObservedLogs, de

// Create observed logger to capture logs
core, logs := observer.New(zap.InfoLevel)
testLogger := &logging.Logger{Logger: otelzap.New(zap.New(core))}
testLogger := logging.NewTestLogger(zap.New(core))

// Create mock metadata with sensitive fields
metadataLoader := &mockMetadataLoader{
Expand Down
3 changes: 1 addition & 2 deletions internal/apirouter/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/hookdeck/outpost/internal/telemetry"
"github.com/hookdeck/outpost/internal/tenantstore"
"github.com/hookdeck/outpost/internal/util/testutil"
"github.com/uptrace/opentelemetry-go-extra/otelzap"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -100,7 +99,7 @@ func newAPITest(t *testing.T, opts ...apiTestOption) *apiTest {

logger := cfg.logger
if logger == nil {
logger = &logging.Logger{Logger: otelzap.New(zap.NewNop())}
logger = logging.NewTestLogger(zap.NewNop())
}
ts := cfg.tenantStore
ls := logstore.NewMemLogStore()
Expand Down
1 change: 0 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func (a *App) run(ctx context.Context) error {
func (a *App) setupLogger() error {
logger, err := logging.NewLogger(
logging.WithLogLevel(a.config.LogLevel),
logging.WithAuditLog(a.config.AuditLog),
)
if err != nil {
return err
Expand Down
2 changes: 0 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Config struct {

Service string `yaml:"service" env:"SERVICE" desc:"Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services)." required:"N"`
LogLevel string `yaml:"log_level" env:"LOG_LEVEL" desc:"Defines the verbosity of application logs. Common values: 'trace', 'debug', 'info', 'warn', 'error'." required:"N"`
AuditLog bool `yaml:"audit_log" env:"AUDIT_LOG" desc:"Enables or disables audit logging for significant events." required:"N"`
OpenTelemetry OpenTelemetryConfig `yaml:"otel"`
Telemetry TelemetryConfig `yaml:"telemetry"`

Expand Down Expand Up @@ -128,7 +127,6 @@ var (
func (c *Config) InitDefaults() {
c.APIPort = 3333
c.LogLevel = "info"
c.AuditLog = true
c.OpenTelemetry = OpenTelemetryConfig{}
c.GinMode = "release"
c.Redis = RedisConfig{
Expand Down
1 change: 0 additions & 1 deletion internal/config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func (c *Config) LogConfigurationSummary() []zap.Field {
return "none (using defaults and environment variables)"
}()),
zap.String("log_level", c.LogLevel),
zap.Bool("audit_log", c.AuditLog),
zap.String("deployment_id", c.DeploymentID),
zap.Strings("topics", c.Topics),
zap.String("http_user_agent", c.HTTPUserAgent),
Expand Down
124 changes: 81 additions & 43 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
return h.handleError(msg, &PreDeliveryError{err: err})
}

h.logger.Ctx(ctx).Info("processing delivery task",
h.logger.Ctx(ctx).Debug("processing delivery task",
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", task.DestinationID),
Expand All @@ -146,7 +146,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
return h.doHandle(ctx, task, destination)
})
if err == nil && !executed {
h.logger.Ctx(ctx).Info("delivery task skipped (idempotent)",
h.logger.Ctx(ctx).Debug("delivery task skipped (idempotent)",
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", task.DestinationID),
Expand All @@ -172,14 +172,40 @@ func (h *messageHandler) handleError(msg *mqs.Message, err error) error {
return nil
}
}
// Attempt errors from a destination's publish call (webhook 5xx, timeout,
// refused, etc.) are expected operational outcomes. Ack semantics are
// already decided above; the failure is captured in the audit log, the
// ClickHouse log entry, and the scheduled retry. Suppress propagation so
// the consumer doesn't log them as unexpected handler errors.
if atmErr, ok := err.(*AttemptError); ok {
var pubErr *destregistry.ErrDestinationPublishAttempt
if errors.As(atmErr.err, &pubErr) {
return nil
}
}
return err
}

// retryOutcome records what happened to the retry schedule while a single
// delivery attempt was being handled. The handler fills it in as it goes and
// passes it to logDeliveryResult, which reports it as fields on the one
// "delivery.attempted" log entry rather than as separate log lines.
type retryOutcome struct {
	// scheduled is set only after scheduleRetry returned without an error.
	scheduled bool

	// backoff is the delay reported by scheduleRetry. It is stored before the
	// scheduling error is checked, so it may be non-zero even when scheduled
	// is false.
	backoff time.Duration

	// scheduleFailed: scheduleRetry returned an error.
	// canceled:       the retry scheduler's Cancel call succeeded.
	// cancelFailed:   the retry scheduler's Cancel call returned an error.
	scheduleFailed, canceled, cancelFailed bool
}

func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, destination *models.Destination) error {
_, span := h.eventTracer.Deliver(ctx, &task, destination)
defer span.End()

attemptStart := time.Now()
attempt, err := h.publisher.PublishEvent(ctx, destination, &task.Event)
attemptDuration := time.Since(attemptStart)

var retry retryOutcome

if err != nil {
// If attempt is nil, it means no attempt was made.
// This is an unexpected error and considered a pre-delivery error.
Expand All @@ -192,13 +218,6 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask,
recorder.RecordDeliveryResult(false)
}

h.logger.Ctx(ctx).Error("failed to publish event",
zap.Error(err),
zap.String("attempt_id", attempt.ID),
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", destination.ID),
zap.String("destination_type", destination.Type))
attemptErr := &AttemptError{err: err}

if h.shouldScheduleRetry(task, err) {
Expand All @@ -208,16 +227,24 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask,
// means manual retries automatically override any pending automatic retry
// without needing an explicit cancel — the new tier's delay takes effect
// and the old scheduled retry is gone in a single operation.
if retryErr := h.scheduleRetry(ctx, task); retryErr != nil {
return h.logDeliveryResult(ctx, &task, destination, attempt, errors.Join(err, retryErr))
backoff, retryErr := h.scheduleRetry(ctx, task)
retry.backoff = backoff
if retryErr != nil {
retry.scheduleFailed = true
return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, errors.Join(err, retryErr))
}
retry.scheduled = true
} else if task.Manual {
// Budget exhausted or not eligible — cancel any lingering scheduled retry.
// Unlike the case above, there's no new retry to schedule so we must
// explicitly cancel to prevent a stale automatic retry from firing.
_ = h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID))
if cancelErr := h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)); cancelErr == nil {
retry.canceled = true
} else {
retry.cancelFailed = true
}
}
return h.logDeliveryResult(ctx, &task, destination, attempt, attemptErr)
return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, attemptErr)
}

// Record delivery success for metrics
Expand All @@ -227,45 +254,63 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask,

// Handle successful delivery
if task.Manual {
logger := h.logger.Ctx(ctx)
if err := h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)); err != nil {
if cancelErr := h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)); cancelErr != nil {
retry.cancelFailed = true
h.logger.Ctx(ctx).Error("failed to cancel scheduled retry",
zap.Error(err),
zap.Error(cancelErr),
zap.String("attempt_id", attempt.ID),
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", destination.ID),
zap.String("destination_type", destination.Type),
zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID)))
return h.logDeliveryResult(ctx, &task, destination, attempt, err)
return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, cancelErr)
}
logger.Audit("scheduled retry canceled",
zap.String("attempt_id", attempt.ID),
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", destination.ID),
zap.String("destination_type", destination.Type),
zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID)))
retry.canceled = true
}
return h.logDeliveryResult(ctx, &task, destination, attempt, nil)
return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, nil)
}

func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attempt *models.Attempt, err error) error {
func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attempt *models.Attempt, attemptStart time.Time, attemptDuration time.Duration, retry retryOutcome, err error) error {
logger := h.logger.Ctx(ctx)

attempt.TenantID = task.Event.TenantID
attempt.AttemptNumber = task.Attempt
attempt.Manual = task.Manual

logger.Audit("event delivered",
// Wide event: one audit per delivery attempt carrying the full outcome
// (attempt result, timing, retry decision). Replaces the separate
// "retry scheduled" and "scheduled retry canceled" audits so consumers
// don't have to join across lines to reconstruct what happened.
fields := []zap.Field{
zap.String("attempt_id", attempt.ID),
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("topic", task.Event.Topic),
zap.String("destination_id", destination.ID),
zap.String("destination_type", destination.Type),
zap.String("attempt_status", attempt.Status),
zap.Int("attempt", task.Attempt),
zap.Bool("manual", task.Manual))
zap.String("attempt_code", attempt.Code),
zap.Int("attempt_number", task.Attempt),
zap.Int("attempt_max", h.retryMaxLimit+1),
zap.Bool("manual", task.Manual),
zap.Bool("eligible_for_retry", task.Event.EligibleForRetry),
zap.Time("attempt_started_at", attemptStart),
zap.Int64("attempt_duration_ms", attemptDuration.Milliseconds()),
zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID)),
zap.Bool("retry_scheduled", retry.scheduled),
zap.Bool("retry_canceled", retry.canceled),
}
if retry.scheduled {
fields = append(fields, zap.Int64("retry_backoff_ms", retry.backoff.Milliseconds()))
}
if retry.scheduleFailed {
fields = append(fields, zap.Bool("retry_schedule_failed", true))
}
if retry.cancelFailed {
fields = append(fields, zap.Bool("retry_cancel_failed", true))
}
logger.Info("delivery.attempted", fields...)

logEntry := models.LogEntry{
Event: &task.Event,
Expand Down Expand Up @@ -363,15 +408,15 @@ func (h *messageHandler) shouldNackDeliveryError(err error) bool {
return true // Nack other delivery errors
}

func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) error {
func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) (time.Duration, error) {
// Attempt is 1-indexed; backoff schedule is 0-indexed.
// Clamp to 0 to safely handle any leftover Attempt=0 in-flight tasks.
backoffDuration := h.retryBackoff.Duration(max(task.Attempt-1, 0))

retryTask := RetryTaskFromDeliveryTask(task)
retryTaskStr, err := retryTask.ToString()
if err != nil {
return err
return backoffDuration, err
}

if err := h.retryScheduler.Schedule(ctx, retryTaskStr, backoffDuration, scheduler.WithTaskID(models.RetryID(task.Event.ID, task.DestinationID))); err != nil {
Expand All @@ -382,17 +427,10 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, task models.Delivery
zap.String("destination_id", task.DestinationID),
zap.Int("attempt", task.Attempt),
zap.Duration("backoff", backoffDuration))
return err
return backoffDuration, err
}

h.logger.Ctx(ctx).Audit("retry scheduled",
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", task.DestinationID),
zap.Int("attempt", task.Attempt),
zap.Duration("backoff", backoffDuration))

return nil
return backoffDuration, nil
}

// ensurePublishableDestination ensures that the destination exists and is in a publishable state.
Expand All @@ -410,22 +448,22 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, task
}

if errors.Is(err, tenantstore.ErrDestinationDeleted) {
logger.Info("destination deleted", fields...)
logger.Debug("destination deleted", fields...)
} else {
// Unexpected errors like DB connection issues
logger.Error("failed to retrieve destination", fields...)
}
return nil, err
}
if destination == nil {
h.logger.Ctx(ctx).Info("destination not found",
h.logger.Ctx(ctx).Debug("destination not found",
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", task.DestinationID))
return nil, tenantstore.ErrDestinationNotFound
}
if destination.DisabledAt != nil {
h.logger.Ctx(ctx).Info("skipping disabled destination",
h.logger.Ctx(ctx).Debug("skipping disabled destination",
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand Down
Loading
Loading