From ada72bf6e4a8e488275743f0cd188c824582f0b0 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 26 May 2026 21:52:40 +0700 Subject: [PATCH 1/8] chore(logging): demote most info logs to debug Reserve info for process lifecycle and meaningful aggregate outcomes. Per-event/per-request handler lines in deliverymq, publishmq, logmq, and logretention move to debug. Keep "batch persisted" at info. --- internal/deliverymq/messagehandler.go | 10 +++++----- internal/logmq/batchprocessor.go | 4 ++-- internal/logmq/messagehandler.go | 2 +- internal/logretention/ttl.go | 4 ++-- internal/publishmq/eventhandler.go | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 1b2e44730..ad74139b4 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -128,7 +128,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error { return h.handleError(msg, &PreDeliveryError{err: err}) } - h.logger.Ctx(ctx).Info("processing delivery task", + h.logger.Ctx(ctx).Debug("processing delivery task", zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), zap.String("destination_id", task.DestinationID), @@ -146,7 +146,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error { return h.doHandle(ctx, task, destination) }) if err == nil && !executed { - h.logger.Ctx(ctx).Info("delivery task skipped (idempotent)", + h.logger.Ctx(ctx).Debug("delivery task skipped (idempotent)", zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), zap.String("destination_id", task.DestinationID), @@ -410,7 +410,7 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, task } if errors.Is(err, tenantstore.ErrDestinationDeleted) { - logger.Info("destination deleted", fields...) + logger.Debug("destination deleted", fields...) } else { // Unexpected errors like DB connection issues logger.Error("failed to retrieve destination", fields...) @@ -418,14 +418,14 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, task return nil, err } if destination == nil { - h.logger.Ctx(ctx).Info("destination not found", + h.logger.Ctx(ctx).Debug("destination not found", zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), zap.String("destination_id", task.DestinationID)) return nil, tenantstore.ErrDestinationNotFound } if destination.DisabledAt != nil { - h.logger.Ctx(ctx).Info("skipping disabled destination", + h.logger.Ctx(ctx).Debug("skipping disabled destination", zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), zap.String("destination_id", destination.ID), diff --git a/internal/logmq/batchprocessor.go b/internal/logmq/batchprocessor.go index 7dd8a14a5..43a80d953 100644 --- a/internal/logmq/batchprocessor.go +++ b/internal/logmq/batchprocessor.go @@ -80,7 +80,7 @@ func (bp *BatchProcessor) Shutdown() { // processBatch processes a batch of messages. func (bp *BatchProcessor) processBatch(_ string, msgs []*mqs.Message) { logger := bp.logger.Ctx(bp.ctx) - logger.Info("processing batch", zap.Int("message_count", len(msgs))) + logger.Debug("processing batch", zap.Int("message_count", len(msgs))) entries := make([]*models.LogEntry, 0, len(msgs)) validMsgs := make([]*mqs.Message, 0, len(msgs)) @@ -116,7 +116,7 @@ func (bp *BatchProcessor) processBatch(_ string, msgs []*mqs.Message) { continue } - logger.Info("added to batch", + logger.Debug("added to batch", zap.String("message_id", msg.LoggableID), zap.String("event_id", entry.Event.ID), zap.String("attempt_id", entry.Attempt.ID), diff --git a/internal/logmq/messagehandler.go b/internal/logmq/messagehandler.go index af99f3ce1..1fe5aa86a 100644 --- a/internal/logmq/messagehandler.go +++ b/internal/logmq/messagehandler.go @@ -30,7 +30,7 @@ func NewMessageHandler(logger *logging.Logger, batchAdder BatchAdder) consumer.M func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error { logger := h.logger.Ctx(ctx) - logger.Info("logmq handler", + logger.Debug("logmq handler", zap.String("message_id", msg.LoggableID)) h.batchAdder.Add(ctx, msg) return nil diff --git a/internal/logretention/ttl.go b/internal/logretention/ttl.go index 57c0f1684..2e028e80a 100644 --- a/internal/logretention/ttl.go +++ b/internal/logretention/ttl.go @@ -49,7 +49,7 @@ func sync(ctx context.Context, ps policyStore, ls logStoreTTL, desiredTTLDays in return nil } - logger.Info("applying log retention TTL", + logger.Debug("applying log retention TTL", zap.Int("old_ttl_days", persistedTTL), zap.Int("new_ttl_days", desiredTTLDays)) @@ -61,7 +61,7 @@ func sync(ctx context.Context, ps policyStore, ls logStoreTTL, desiredTTLDays in return fmt.Errorf("failed to persist TTL: %w", err) } - logger.Info("log retention TTL applied successfully", + logger.Debug("log retention TTL applied successfully", zap.Int("ttl_days", desiredTTLDays)) return nil diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index 94a8ec55b..4b7689cce 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -118,7 +118,7 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle // Early return if no destinations matched if len(matchedDestinations) == 0 { - logger.Info("no matching destinations", + logger.Debug("no matching destinations", zap.String("event_id", event.ID), zap.String("tenant_id", event.TenantID)) return result, nil From ed53b675c54230d51d1f397acf1a5089b155bea4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 26 May 2026 22:18:48 +0700 Subject: [PATCH 2/8] chore(logging): only log API requests at 400+ Successful 2xx/3xx traffic is expected and high-volume on the hot path; emitting an info line per request scales with RPS for no operational signal that metrics/traces don't already provide. 4xx stays at info (client errors worth surfacing), 5xx stays at error. Note: outpost does not currently persist request metrics, so 4xx visibility in production lives only in this log line. If metrics are added later, the 4xx case can be dropped to debug too. --- internal/apirouter/logger_middleware.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/apirouter/logger_middleware.go b/internal/apirouter/logger_middleware.go index 8b92f492c..2f2215148 100644 --- a/internal/apirouter/logger_middleware.go +++ b/internal/apirouter/logger_middleware.go @@ -41,6 +41,8 @@ func LoggerMiddlewareWithSanitizer(logger *logging.Logger, sanitizer *RequestBod c.Next() + status := c.Writer.Status() + fields := []zap.Field{} fields = append(fields, basicFields(c)...) fields = append(fields, pathFields(c)...) @@ -48,19 +50,21 @@ func LoggerMiddlewareWithSanitizer(logger *logging.Logger, sanitizer *RequestBod fields = append(fields, errorFields(c)...) // Add sanitized request body for 5xx errors - if c.Writer.Status() >= 500 && bufferedBody != nil { + if status >= 500 && bufferedBody != nil { requestBodyFields = getRequestBodyFields(bufferedBody, sanitizer) fields = append(fields, requestBodyFields...) } - if c.Writer.Status() >= 500 { + switch { + case status >= 500: logger.Error("request completed", fields...) - if hub := sentrygin.GetHubFromContext(c); hub != nil { hub.CaptureException(getErrorWithStackTrace(c.Errors.Last().Err)) } - } else { + case status >= 400: logger.Info("request completed", fields...) + default: + logger.Debug("request completed", fields...) } } } From 5fd1eee305bccc261dcbe3b8bc4bee080485d7b1 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 27 May 2026 19:50:15 +0700 Subject: [PATCH 3/8] chore(logging): suppress expected delivery-failure errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Webhook 5xx, timeouts, and connection refusals from destination publish calls are expected operational outcomes — the retry scheduler handles them, the audit log captures the outcome, and the per-attempt record is written to ClickHouse via logmq. Logging them as Error (and again as "consumer handler error" via the consumer wrapper) inflates the error stream with no actionable signal. - Drop the "failed to publish event" Error line in doHandle. - In handleError, return nil for AttemptError wrapping ErrDestinationPublishAttempt so the consumer doesn't log it as an unexpected handler error. Ack/nack semantics are unchanged. - Rename the audit message "event delivered" to "delivery attempt completed" since it fires for both success and failure outcomes. --- internal/deliverymq/messagehandler.go | 20 ++++++++++++-------- internal/deliverymq/messagehandler_test.go | 8 ++++---- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index ad74139b4..2624adb36 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -172,6 +172,17 @@ func (h *messageHandler) handleError(msg *mqs.Message, err error) error { return nil } } + // Attempt errors from a destination's publish call (webhook 5xx, timeout, + // refused, etc.) are expected operational outcomes. Ack semantics are + // already decided above; the failure is captured in the audit log, the + // ClickHouse log entry, and the scheduled retry. Suppress propagation so + // the consumer doesn't log them as unexpected handler errors. + if atmErr, ok := err.(*AttemptError); ok { + var pubErr *destregistry.ErrDestinationPublishAttempt + if errors.As(atmErr.err, &pubErr) { + return nil + } + } return err } @@ -192,13 +203,6 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, recorder.RecordDeliveryResult(false) } - h.logger.Ctx(ctx).Error("failed to publish event", - zap.Error(err), - zap.String("attempt_id", attempt.ID), - zap.String("event_id", task.Event.ID), - zap.String("tenant_id", task.Event.TenantID), - zap.String("destination_id", destination.ID), - zap.String("destination_type", destination.Type)) attemptErr := &AttemptError{err: err} if h.shouldScheduleRetry(task, err) { @@ -257,7 +261,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del attempt.AttemptNumber = task.Attempt attempt.Manual = task.Manual - logger.Audit("event delivered", + logger.Audit("delivery attempt completed", zap.String("attempt_id", attempt.ID), zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 1ddda662a..b9a375029 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -234,7 +234,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { // Handle message err := handler.Handle(context.Background(), msg) - require.Error(t, err) + require.NoError(t, err) // Assert behavior assert.False(t, mockMsg.nacked, "message should not be nacked when scheduling retry") @@ -300,7 +300,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { // Handle message err := handler.Handle(context.Background(), msg) - require.Error(t, err) + require.NoError(t, err) // Assert behavior assert.False(t, mockMsg.nacked, "message should not be nacked for ineligible retry") @@ -807,7 +807,7 @@ func TestManualDelivery_PublishError(t *testing.T) { // Handle message err := handler.Handle(context.Background(), msg) - require.Error(t, err) + require.NoError(t, err) // Assert behavior assert.True(t, mockMsg.acked, "message should be acked") @@ -886,7 +886,7 @@ func TestManualDelivery_PublishError_BudgetExhausted(t *testing.T) { // Handle message err := handler.Handle(context.Background(), msg) - require.Error(t, err) + require.NoError(t, err) // Assert behavior assert.True(t, mockMsg.acked, "message should be acked") From 2fe20d8a67b5bd3e6349a60f21b931f2940e684c Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 27 May 2026 20:02:55 +0700 Subject: [PATCH 4/8] chore(logging): include attempt_duration_ms in delivery attempt audit Measure the wall time spent in publisher.PublishEvent and add it to the "delivery attempt completed" audit line so operators and customers can see per-attempt latency without joining against ClickHouse. --- internal/deliverymq/messagehandler.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 2624adb36..f6a07eaa6 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -190,7 +190,9 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, _, span := h.eventTracer.Deliver(ctx, &task, destination) defer span.End() + attemptStart := time.Now() attempt, err := h.publisher.PublishEvent(ctx, destination, &task.Event) + attemptDuration := time.Since(attemptStart) if err != nil { // If attempt is nil, it means no attempt was made. // This is an unexpected error and considered a pre-delivery error. @@ -213,7 +215,7 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, // without needing an explicit cancel — the new tier's delay takes effect // and the old scheduled retry is gone in a single operation. if retryErr := h.scheduleRetry(ctx, task); retryErr != nil { - return h.logDeliveryResult(ctx, &task, destination, attempt, errors.Join(err, retryErr)) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, errors.Join(err, retryErr)) } } else if task.Manual { // Budget exhausted or not eligible — cancel any lingering scheduled retry. @@ -221,7 +223,7 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, // explicitly cancel to prevent a stale automatic retry from firing. _ = h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)) } - return h.logDeliveryResult(ctx, &task, destination, attempt, attemptErr) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, attemptErr) } // Record delivery success for metrics @@ -241,7 +243,7 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type), zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID))) - return h.logDeliveryResult(ctx, &task, destination, attempt, err) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, err) } logger.Audit("scheduled retry canceled", zap.String("attempt_id", attempt.ID), @@ -251,10 +253,10 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, zap.String("destination_type", destination.Type), zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID))) } - return h.logDeliveryResult(ctx, &task, destination, attempt, nil) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, nil) } -func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attempt *models.Attempt, err error) error { +func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attempt *models.Attempt, attemptDuration time.Duration, err error) error { logger := h.logger.Ctx(ctx) attempt.TenantID = task.Event.TenantID @@ -269,7 +271,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del zap.String("destination_type", destination.Type), zap.String("attempt_status", attempt.Status), zap.Int("attempt", task.Attempt), - zap.Bool("manual", task.Manual)) + zap.Bool("manual", task.Manual), + zap.Int64("attempt_duration_ms", attemptDuration.Milliseconds())) logEntry := models.LogEntry{ Event: &task.Event, From fffd1c55b9d391550b7753080ffa1435d6f96585 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 27 May 2026 20:20:36 +0700 Subject: [PATCH 5/8] chore(logging): collapse delivery & publish audits into wide events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the multi-line audit pattern in deliverymq and publishmq with a single wide-event audit per unit of work. Consumers no longer have to join across lines (or worry about ordering) to reconstruct what happened: the full outcome is in one event. deliverymq emits "delivery.attempted" once per attempt with attempt result, timing (attempt_started_at, attempt_duration_ms), and retry decision (retry_scheduled, retry_backoff_ms, retry_canceled, plus retry_schedule_failed / retry_cancel_failed when relevant). Replaces "delivery attempt completed", "retry scheduled", and "scheduled retry canceled". publishmq emits "event.received" once per Handle call with matched and enqueued destination lists, duplicate flag, received_at, and duration_ms. Replaces "processing event" and per-destination "delivery task enqueued". System-failure ERROR lines (failed to schedule retry, failed to cancel scheduled retry, failed to enqueue delivery task, failed to match event destinations) remain — those are operator-actionable diagnostics that the wide event also flags via boolean fields. --- internal/deliverymq/messagehandler.go | 86 ++++++++++++++++--------- internal/publishmq/eventhandler.go | 90 +++++++++++++++++---------- 2 files changed, 114 insertions(+), 62 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index f6a07eaa6..7fcb17c74 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -186,6 +186,16 @@ func (h *messageHandler) handleError(msg *mqs.Message, err error) error { return err } +// retryOutcome captures the retry-scheduling decisions made during a delivery +// attempt so they can be folded into the single delivery.attempted audit event. +type retryOutcome struct { + scheduled bool + backoff time.Duration + scheduleFailed bool + canceled bool + cancelFailed bool +} + func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, destination *models.Destination) error { _, span := h.eventTracer.Deliver(ctx, &task, destination) defer span.End() @@ -193,6 +203,9 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, attemptStart := time.Now() attempt, err := h.publisher.PublishEvent(ctx, destination, &task.Event) attemptDuration := time.Since(attemptStart) + + var retry retryOutcome + if err != nil { // If attempt is nil, it means no attempt was made. // This is an unexpected error and considered a pre-delivery error. @@ -214,16 +227,24 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, // means manual retries automatically override any pending automatic retry // without needing an explicit cancel — the new tier's delay takes effect // and the old scheduled retry is gone in a single operation. - if retryErr := h.scheduleRetry(ctx, task); retryErr != nil { - return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, errors.Join(err, retryErr)) + backoff, retryErr := h.scheduleRetry(ctx, task) + retry.backoff = backoff + if retryErr != nil { + retry.scheduleFailed = true + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, errors.Join(err, retryErr)) } + retry.scheduled = true } else if task.Manual { // Budget exhausted or not eligible — cancel any lingering scheduled retry. // Unlike the case above, there's no new retry to schedule so we must // explicitly cancel to prevent a stale automatic retry from firing. - _ = h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)) + if cancelErr := h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)); cancelErr == nil { + retry.canceled = true + } else { + retry.cancelFailed = true + } } - return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, attemptErr) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, attemptErr) } // Record delivery success for metrics @@ -233,37 +254,35 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, // Handle successful delivery if task.Manual { - logger := h.logger.Ctx(ctx) - if err := h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)); err != nil { + if cancelErr := h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)); cancelErr != nil { + retry.cancelFailed = true h.logger.Ctx(ctx).Error("failed to cancel scheduled retry", - zap.Error(err), + zap.Error(cancelErr), zap.String("attempt_id", attempt.ID), zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type), zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID))) - return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, err) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, cancelErr) } - logger.Audit("scheduled retry canceled", - zap.String("attempt_id", attempt.ID), - zap.String("event_id", task.Event.ID), - zap.String("tenant_id", task.Event.TenantID), - zap.String("destination_id", destination.ID), - zap.String("destination_type", destination.Type), - zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID))) + retry.canceled = true } - return h.logDeliveryResult(ctx, &task, destination, attempt, attemptDuration, nil) + return h.logDeliveryResult(ctx, &task, destination, attempt, attemptStart, attemptDuration, retry, nil) } -func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attempt *models.Attempt, attemptDuration time.Duration, err error) error { +func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attempt *models.Attempt, attemptStart time.Time, attemptDuration time.Duration, retry retryOutcome, err error) error { logger := h.logger.Ctx(ctx) attempt.TenantID = task.Event.TenantID attempt.AttemptNumber = task.Attempt attempt.Manual = task.Manual - logger.Audit("delivery attempt completed", + // Wide event: one audit per delivery attempt carrying the full outcome + // (attempt result, timing, retry decision). Replaces the separate + // "retry scheduled" and "scheduled retry canceled" audits so consumers + // don't have to join across lines to reconstruct what happened. + fields := []zap.Field{ zap.String("attempt_id", attempt.ID), zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), @@ -272,7 +291,21 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del zap.String("attempt_status", attempt.Status), zap.Int("attempt", task.Attempt), zap.Bool("manual", task.Manual), - zap.Int64("attempt_duration_ms", attemptDuration.Milliseconds())) + zap.Time("attempt_started_at", attemptStart), + zap.Int64("attempt_duration_ms", attemptDuration.Milliseconds()), + zap.Bool("retry_scheduled", retry.scheduled), + zap.Bool("retry_canceled", retry.canceled), + } + if retry.scheduled { + fields = append(fields, zap.Int64("retry_backoff_ms", retry.backoff.Milliseconds())) + } + if retry.scheduleFailed { + fields = append(fields, zap.Bool("retry_schedule_failed", true)) + } + if retry.cancelFailed { + fields = append(fields, zap.Bool("retry_cancel_failed", true)) + } + logger.Audit("delivery.attempted", fields...) logEntry := models.LogEntry{ Event: &task.Event, @@ -370,7 +403,7 @@ func (h *messageHandler) shouldNackDeliveryError(err error) bool { return true // Nack other delivery errors } -func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) error { +func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) (time.Duration, error) { // Attempt is 1-indexed; backoff schedule is 0-indexed. // Clamp to 0 to safely handle any leftover Attempt=0 in-flight tasks. backoffDuration := h.retryBackoff.Duration(max(task.Attempt-1, 0)) @@ -378,7 +411,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, task models.Delivery retryTask := RetryTaskFromDeliveryTask(task) retryTaskStr, err := retryTask.ToString() if err != nil { - return err + return backoffDuration, err } if err := h.retryScheduler.Schedule(ctx, retryTaskStr, backoffDuration, scheduler.WithTaskID(models.RetryID(task.Event.ID, task.DestinationID))); err != nil { @@ -389,17 +422,10 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, task models.Delivery zap.String("destination_id", task.DestinationID), zap.Int("attempt", task.Attempt), zap.Duration("backoff", backoffDuration)) - return err + return backoffDuration, err } - h.logger.Ctx(ctx).Audit("retry scheduled", - zap.String("event_id", task.Event.ID), - zap.String("tenant_id", task.Event.TenantID), - zap.String("destination_id", task.DestinationID), - zap.Int("attempt", task.Attempt), - zap.Duration("backoff", backoffDuration)) - - return nil + return backoffDuration, nil } // ensurePublishableDestination ensures that the destination exists and is in a publishable state. diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index 4b7689cce..14edcd563 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -4,6 +4,8 @@ import ( "context" "errors" "slices" + "sync" + "time" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/emetrics" @@ -66,8 +68,6 @@ func NewEventHandler( var _ EventHandler = (*eventHandler)(nil) func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*HandleResult, error) { - logger := h.logger.Ctx(ctx) - if len(h.topics) > 0 && event.Topic == "" { return nil, ErrRequiredTopic } @@ -75,25 +75,54 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle return nil, ErrInvalidTopic } - logger.Audit("processing event", - zap.String("event_id", event.ID), - zap.String("tenant_id", event.TenantID), - zap.String("destination_id", event.DestinationID), - zap.String("topic", event.Topic)) + logger := h.logger.Ctx(ctx) + receivedAt := time.Now() + + // Wide event state: populated by the rest of Handle and emitted as a single + // audit at the end. Replaces the separate "processing event" and per- + // destination "delivery task enqueued" audits. + var enqueuedMu sync.Mutex + var enqueued []string + var matched []string + var duplicate bool + var enqueueFailed bool + + defer func() { + enqueuedMu.Lock() + enqueuedCopy := append([]string{}, enqueued...) + enqueuedMu.Unlock() + + fields := []zap.Field{ + zap.String("event_id", event.ID), + zap.String("tenant_id", event.TenantID), + zap.String("topic", event.Topic), + zap.Int("matched_destination_count", len(matched)), + zap.Strings("matched_destination_ids", matched), + zap.Int("enqueued_destination_count", len(enqueuedCopy)), + zap.Strings("enqueued_destination_ids", enqueuedCopy), + zap.Bool("duplicate", duplicate), + zap.Time("received_at", receivedAt), + zap.Int64("duration_ms", time.Since(receivedAt).Milliseconds()), + } + if event.DestinationID != "" { + fields = append(fields, zap.String("destination_id", event.DestinationID)) + } + if enqueueFailed { + fields = append(fields, zap.Bool("enqueue_failed", true)) + } + logger.Audit("event.received", fields...) + }() - var matchedDestinations []string var err error // Branch: specific destination vs topic-based matching if event.DestinationID != "" { - // Specific destination path - matchedDestinations, err = h.matchSpecificDestination(ctx, event) + matched, err = h.matchSpecificDestination(ctx, event) if err != nil { return nil, err } } else { - // Topic-based matching path - matchedDestinations, err = h.tenantStore.MatchEvent(ctx, *event) + matched, err = h.tenantStore.MatchEvent(ctx, *event) if err != nil { logger.Error("failed to match event destinations", zap.Error(err), @@ -103,24 +132,20 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle } } - if matchedDestinations == nil { - matchedDestinations = []string{} + if matched == nil { + matched = []string{} } // Stamp matched destinations onto the event for downstream persistence. - event.MatchedDestinationIDs = matchedDestinations + event.MatchedDestinationIDs = matched result := &HandleResult{ EventID: event.ID, Duplicate: false, - DestinationIDs: matchedDestinations, + DestinationIDs: matched, } - // Early return if no destinations matched - if len(matchedDestinations) == 0 { - logger.Debug("no matching destinations", - zap.String("event_id", event.ID), - zap.String("tenant_id", event.TenantID)) + if len(matched) == 0 { return result, nil } @@ -128,22 +153,23 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle executed := false err = h.idempotence.Exec(ctx, idempotencyKeyFromEvent(event), func(ctx context.Context) error { executed = true - return h.doPublish(ctx, event, matchedDestinations) + return h.doPublish(ctx, event, matched, &enqueuedMu, &enqueued) }) if err != nil { + enqueueFailed = true return nil, err } - // Set duplicate flag if not executed (idempotency hit) if !executed { + duplicate = true result.Duplicate = true } return result, nil } -func (h *eventHandler) doPublish(ctx context.Context, event *models.Event, matchedDestinations []string) error { +func (h *eventHandler) doPublish(ctx context.Context, event *models.Event, matchedDestinations []string, enqueuedMu *sync.Mutex, enqueued *[]string) error { _, span := h.eventTracer.Receive(ctx, event) defer span.End() @@ -152,7 +178,13 @@ func (h *eventHandler) doPublish(ctx context.Context, event *models.Event, match var g errgroup.Group for _, destID := range matchedDestinations { g.Go(func() error { - return h.enqueueDeliveryTask(ctx, models.NewDeliveryTask(*event, destID)) + if err := h.enqueueDeliveryTask(ctx, models.NewDeliveryTask(*event, destID)); err != nil { + return err + } + enqueuedMu.Lock() + *enqueued = append(*enqueued, destID) + enqueuedMu.Unlock() + return nil }) } if err := g.Wait(); err != nil { @@ -188,6 +220,7 @@ func (h *eventHandler) matchSpecificDestination(ctx context.Context, event *mode func (h *eventHandler) enqueueDeliveryTask(ctx context.Context, task models.DeliveryTask) error { _, deliverySpan := h.eventTracer.StartDelivery(ctx, &task) + defer deliverySpan.End() if err := h.deliveryMQ.Publish(ctx, task); err != nil { h.logger.Ctx(ctx).Error("failed to enqueue delivery task", zap.Error(err), @@ -195,15 +228,8 @@ func (h *eventHandler) enqueueDeliveryTask(ctx context.Context, task models.Deli zap.String("tenant_id", task.Event.TenantID), zap.String("destination_id", task.DestinationID)) deliverySpan.RecordError(err) - deliverySpan.End() return err } - - h.logger.Ctx(ctx).Audit("delivery task enqueued", - zap.String("event_id", task.Event.ID), - zap.String("tenant_id", task.Event.TenantID), - zap.String("destination_id", task.DestinationID)) - deliverySpan.End() return nil } From 8d6d355a3c1704b86a326d1aeacabda6a1e4524b Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 27 May 2026 20:41:53 +0700 Subject: [PATCH 6/8] chore(logging): enrich wide event fields Add fields requested for observability use cases: delivery.attempted: - topic, attempt_code, retry_id, attempt_max, eligible_for_retry - rename attempt -> attempt_number to disambiguate from attempt_id event.received: - match_failed flag for MatchEvent failures - rename received_at -> event_received_at for *_at field consistency --- internal/deliverymq/messagehandler.go | 7 ++++++- internal/publishmq/eventhandler.go | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 7fcb17c74..3130253e2 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -286,13 +286,18 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del zap.String("attempt_id", attempt.ID), zap.String("event_id", task.Event.ID), zap.String("tenant_id", task.Event.TenantID), + zap.String("topic", task.Event.Topic), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type), zap.String("attempt_status", attempt.Status), - zap.Int("attempt", task.Attempt), + zap.String("attempt_code", attempt.Code), + zap.Int("attempt_number", task.Attempt), + zap.Int("attempt_max", h.retryMaxLimit+1), zap.Bool("manual", task.Manual), + zap.Bool("eligible_for_retry", task.Event.EligibleForRetry), zap.Time("attempt_started_at", attemptStart), zap.Int64("attempt_duration_ms", attemptDuration.Milliseconds()), + zap.String("retry_id", models.RetryID(task.Event.ID, task.DestinationID)), zap.Bool("retry_scheduled", retry.scheduled), zap.Bool("retry_canceled", retry.canceled), } diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index 14edcd563..cb507ee62 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -86,6 +86,7 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle var matched []string var duplicate bool var enqueueFailed bool + var matchFailed bool defer func() { enqueuedMu.Lock() @@ -101,12 +102,15 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle zap.Int("enqueued_destination_count", len(enqueuedCopy)), zap.Strings("enqueued_destination_ids", enqueuedCopy), zap.Bool("duplicate", duplicate), - zap.Time("received_at", receivedAt), + zap.Time("event_received_at", receivedAt), zap.Int64("duration_ms", time.Since(receivedAt).Milliseconds()), } if event.DestinationID != "" { fields = append(fields, zap.String("destination_id", event.DestinationID)) } + if matchFailed { + fields = append(fields, zap.Bool("match_failed", true)) + } if enqueueFailed { fields = append(fields, zap.Bool("enqueue_failed", true)) } @@ -124,6 +128,7 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle } else { matched, err = h.tenantStore.MatchEvent(ctx, *event) if err != nil { + matchFailed = true logger.Error("failed to match event destinations", zap.Error(err), zap.String("event_id", event.ID), From 926f36b474be61c729c253f163ca71170a919e77 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 27 May 2026 22:20:33 +0700 Subject: [PATCH 7/8] chore(logging): route only Audit lines to OTel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the logger so regular Info/Debug/Warn/Error calls go through a plain *zap.Logger (stdout only, no OTel export) and Audit() lines go through a separate otelzap.Logger (stdout + OTel logs SDK). This keeps operator diagnostics and debug noise out of the customer-visible OTel sink while preserving the wide-event audit stream for downstream consumers. Trace correlation in stdout logs is preserved manually via a small traceFields helper that pulls trace_id / span_id from the active span into zap fields — otelzap did this automatically; with the OTel sink gated to Audit() we attach them ourselves. The AuditLog config option / env var is removed; audit logging is now always enabled (audit lines were already a structural part of the event lifecycle, and the wide-event refactor made them the customer- facing source of truth). --- cmd/e2e/configs/basic.go | 1 - internal/apirouter/audit_log_test.go | 4 +- .../logger_middleware_integration_test.go | 3 +- internal/apirouter/router_test.go | 3 +- internal/app/app.go | 1 - internal/config/config.go | 2 - internal/config/logging.go | 1 - internal/logging/logger.go | 100 ++++++++++-------- internal/util/testutil/registry.go | 5 +- internal/util/testutil/testutil.go | 7 +- 10 files changed, 63 insertions(+), 64 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index 927a2fed7..ea924708c 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -78,7 +78,6 @@ func Basic(t *testing.T, opts BasicOpts) config.Config { c.MQs.RabbitMQ.LogQueue = idgen.String() // Test-specific overrides - c.AuditLog = false // Disable audit logging to suppress info-level logs in tests c.PublishMaxConcurrency = 3 c.DeliveryMaxConcurrency = 3 c.LogMaxConcurrency = 3 diff --git a/internal/apirouter/audit_log_test.go b/internal/apirouter/audit_log_test.go index 62a37e7dc..d92a9b617 100644 --- a/internal/apirouter/audit_log_test.go +++ b/internal/apirouter/audit_log_test.go @@ -9,7 +9,6 @@ import ( "github.com/hookdeck/outpost/internal/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uptrace/opentelemetry-go-extra/otelzap" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" ) @@ -17,8 +16,7 @@ import ( func newAuditTest(t *testing.T, opts ...apiTestOption) (*apiTest, *observer.ObservedLogs) { t.Helper() core, logs := observer.New(zap.InfoLevel) - // When auditLogger is nil, Audit() falls through to the main logger. - logger := &logging.Logger{Logger: otelzap.New(zap.New(core))} + logger := logging.NewTestLogger(zap.New(core)) opts = append(opts, withLogger(logger)) return newAPITest(t, opts...), logs } diff --git a/internal/apirouter/logger_middleware_integration_test.go b/internal/apirouter/logger_middleware_integration_test.go index 392b1f943..60062a942 100644 --- a/internal/apirouter/logger_middleware_integration_test.go +++ b/internal/apirouter/logger_middleware_integration_test.go @@ -18,7 +18,6 @@ import ( "github.com/hookdeck/outpost/internal/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uptrace/opentelemetry-go-extra/otelzap" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" ) @@ -99,7 +98,7 @@ func setupTestEnvironment(t *testing.T) (*gin.Engine, *observer.ObservedLogs, de // Create observed logger to capture logs core, logs := observer.New(zap.InfoLevel) - testLogger := &logging.Logger{Logger: otelzap.New(zap.New(core))} + testLogger := logging.NewTestLogger(zap.New(core)) // Create mock metadata with sensitive fields metadataLoader := &mockMetadataLoader{ diff --git a/internal/apirouter/router_test.go b/internal/apirouter/router_test.go index 16a3e0f1c..024bdb374 100644 --- a/internal/apirouter/router_test.go +++ b/internal/apirouter/router_test.go @@ -21,7 +21,6 @@ import ( "github.com/hookdeck/outpost/internal/telemetry" "github.com/hookdeck/outpost/internal/tenantstore" "github.com/hookdeck/outpost/internal/util/testutil" - "github.com/uptrace/opentelemetry-go-extra/otelzap" "go.uber.org/zap" ) @@ -100,7 +99,7 @@ func newAPITest(t *testing.T, opts ...apiTestOption) *apiTest { logger := cfg.logger if logger == nil { - logger = &logging.Logger{Logger: otelzap.New(zap.NewNop())} + logger = logging.NewTestLogger(zap.NewNop()) } ts := cfg.tenantStore ls := logstore.NewMemLogStore() diff --git a/internal/app/app.go b/internal/app/app.go index 87dc64eca..7c1d73d49 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -163,7 +163,6 @@ func (a *App) run(ctx context.Context) error { func (a *App) setupLogger() error { logger, err := logging.NewLogger( logging.WithLogLevel(a.config.LogLevel), - logging.WithAuditLog(a.config.AuditLog), ) if err != nil { return err diff --git a/internal/config/config.go b/internal/config/config.go index 6bfd03625..9bd97c69f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -44,7 +44,6 @@ type Config struct { Service string `yaml:"service" env:"SERVICE" desc:"Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services)." required:"N"` LogLevel string `yaml:"log_level" env:"LOG_LEVEL" desc:"Defines the verbosity of application logs. Common values: 'trace', 'debug', 'info', 'warn', 'error'." required:"N"` - AuditLog bool `yaml:"audit_log" env:"AUDIT_LOG" desc:"Enables or disables audit logging for significant events." required:"N"` OpenTelemetry OpenTelemetryConfig `yaml:"otel"` Telemetry TelemetryConfig `yaml:"telemetry"` @@ -128,7 +127,6 @@ var ( func (c *Config) InitDefaults() { c.APIPort = 3333 c.LogLevel = "info" - c.AuditLog = true c.OpenTelemetry = OpenTelemetryConfig{} c.GinMode = "release" c.Redis = RedisConfig{ diff --git a/internal/config/logging.go b/internal/config/logging.go index 5706faf39..36e23ea91 100644 --- a/internal/config/logging.go +++ b/internal/config/logging.go @@ -31,7 +31,6 @@ func (c *Config) LogConfigurationSummary() []zap.Field { return "none (using defaults and environment variables)" }()), zap.String("log_level", c.LogLevel), - zap.Bool("audit_log", c.AuditLog), zap.String("deployment_id", c.DeploymentID), zap.Strings("topics", c.Topics), zap.String("http_user_agent", c.HTTPUserAgent), diff --git a/internal/logging/logger.go b/internal/logging/logger.go index 77623c310..6b5defa9a 100644 --- a/internal/logging/logger.go +++ b/internal/logging/logger.go @@ -5,23 +5,28 @@ import ( "os" "github.com/uptrace/opentelemetry-go-extra/otelzap" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// Logger splits sinks by purpose: the embedded *zap.Logger handles regular +// operator logs (stdout, no OTel export) while auditLogger is otelzap-wrapped +// so Audit() lines flow to both stdout and the OTel logs SDK. This keeps +// infrastructure errors and debug noise out of customer-visible OTel sinks. type Logger struct { - *otelzap.Logger + *zap.Logger auditLogger *otelzap.Logger } type LoggerWithCtx struct { - *otelzap.LoggerWithCtx - auditLogger *otelzap.LoggerWithCtx + *zap.Logger + ctx context.Context + auditLogger otelzap.LoggerWithCtx } type LoggerOption struct { LogLevel string - AuditLog bool } type Option func(o *LoggerOption) @@ -32,43 +37,37 @@ func WithLogLevel(logLevel string) Option { } } -func WithAuditLog(auditLog bool) Option { - return func(o *LoggerOption) { - o.AuditLog = auditLog - } -} - func NewLogger(opts ...Option) (*Logger, error) { option := &LoggerOption{} for _, opt := range opts { opt(option) } - logger, err := makeLogger(option.LogLevel) + zapLogger, level, err := buildZap(option.LogLevel) if err != nil { return nil, err } - var auditLogger *otelzap.Logger - if option.AuditLog { - auditLogger, err = makeLogger(zap.InfoLevel.String()) - if err != nil { - return nil, err - } + + auditZap, _, err := buildZap(option.LogLevel) + if err != nil { + return nil, err } - return &Logger{Logger: logger, auditLogger: auditLogger}, nil + auditLogger := otelzap.New(auditZap, otelzap.WithMinLevel(level)) + + return &Logger{Logger: zapLogger, auditLogger: auditLogger}, nil } -func makeLogger(logLevel string) (*otelzap.Logger, error) { +func buildZap(logLevel string) (*zap.Logger, zapcore.Level, error) { level, err := zapcore.ParseLevel(logLevel) if err != nil { - return nil, err + return nil, 0, err } zapConfig := zap.NewProductionConfig() zapConfig.Level = zap.NewAtomicLevelAt(level) zapLogger, err := zapConfig.Build() if err != nil { - return nil, err + return nil, 0, err } hostname, err := os.Hostname() if err != nil { @@ -76,39 +75,54 @@ func makeLogger(logLevel string) (*otelzap.Logger, error) { } zapLogger = zapLogger.With(zap.String("host.name", hostname)) - return otelzap.New(zapLogger, - otelzap.WithMinLevel(level), - ), nil + return zapLogger, level, nil } func (l *Logger) Ctx(ctx context.Context) LoggerWithCtx { - loggerWithCtx := l.Logger.Ctx(ctx) - var auditLoggerWithCtx *otelzap.LoggerWithCtx - if l.auditLogger != nil { - auditLoggerWithCtxValue := l.auditLogger.Ctx(ctx) - auditLoggerWithCtx = &auditLoggerWithCtxValue + return LoggerWithCtx{ + Logger: l.Logger.With(traceFields(ctx)...), + ctx: ctx, + auditLogger: l.auditLogger.Ctx(ctx), } - return LoggerWithCtx{LoggerWithCtx: &loggerWithCtx, auditLogger: auditLoggerWithCtx} } -func (l *Logger) getAuditLogger() *otelzap.Logger { - if l.auditLogger != nil { - return l.auditLogger - } - return l.Logger +func (l *Logger) Audit(msg string, fields ...zap.Field) { + l.auditLogger.Info(msg, fields...) } -func (l *Logger) Audit(msg string, fields ...zap.Field) { - l.getAuditLogger().Info(msg, fields...) +func (l LoggerWithCtx) Audit(msg string, fields ...zap.Field) { + l.auditLogger.Info(msg, fields...) } -func (l *LoggerWithCtx) getAuditLogger() *otelzap.LoggerWithCtx { - if l.auditLogger != nil { - return l.auditLogger +func (l LoggerWithCtx) WithOptions(opts ...zap.Option) LoggerWithCtx { + return LoggerWithCtx{ + Logger: l.Logger.WithOptions(opts...), + ctx: l.ctx, + auditLogger: l.auditLogger.WithOptions(opts...), } - return l.LoggerWithCtx } -func (l LoggerWithCtx) Audit(msg string, fields ...zap.Field) { - l.getAuditLogger().Info(msg, fields...) +// NewTestLogger wraps a zap logger for use in tests, providing an audit +// sink that points at the same zap so test output is self-contained. +func NewTestLogger(zapLogger *zap.Logger) *Logger { + return &Logger{ + Logger: zapLogger, + auditLogger: otelzap.New(zapLogger, otelzap.WithMinLevel(zap.InfoLevel)), + } +} + +// traceFields extracts the active span's trace/span IDs from ctx so they +// appear on stdout logs. otelzap did this automatically; with the OTel +// log sink reserved for Audit lines we attach them manually to preserve +// log<->trace correlation in operator-facing logs. +func traceFields(ctx context.Context) []zap.Field { + span := trace.SpanFromContext(ctx) + sc := span.SpanContext() + if !sc.IsValid() { + return nil + } + return []zap.Field{ + zap.String("trace_id", sc.TraceID().String()), + zap.String("span_id", sc.SpanID().String()), + } } diff --git a/internal/util/testutil/registry.go b/internal/util/testutil/registry.go index c8ed8a5bd..a3019eadc 100644 --- a/internal/util/testutil/registry.go +++ b/internal/util/testutil/registry.go @@ -4,15 +4,14 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" "github.com/hookdeck/outpost/internal/logging" - "github.com/uptrace/opentelemetry-go-extra/otelzap" + "go.uber.org/zap" ) var Registry destregistry.Registry func init() { - otelLogger := otelzap.L() Registry = destregistry.NewRegistry(&destregistry.Config{ DestinationMetadataPath: "", - }, &logging.Logger{Logger: otelLogger}) + }, logging.NewTestLogger(zap.NewNop())) destregistrydefault.RegisterDefault(Registry, destregistrydefault.RegisterDefaultDestinationOptions{}) } diff --git a/internal/util/testutil/testutil.go b/internal/util/testutil/testutil.go index 4719b9c0c..26068ff34 100644 --- a/internal/util/testutil/testutil.go +++ b/internal/util/testutil/testutil.go @@ -13,8 +13,6 @@ import ( "github.com/hookdeck/outpost/internal/logging" internalredis "github.com/hookdeck/outpost/internal/redis" "github.com/redis/go-redis/v9" - "github.com/uptrace/opentelemetry-go-extra/otelzap" - "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -80,10 +78,7 @@ func CreateTestRedisClient(t *testing.T) internalredis.Client { func CreateTestLogger(t *testing.T) *logging.Logger { zapLogger := zaptest.NewLogger(t) - logger := otelzap.New(zapLogger, - otelzap.WithMinLevel(zap.InfoLevel), - ) - return &logging.Logger{Logger: logger} + return logging.NewTestLogger(zapLogger) } func RandomString(length int) string { From d9a61ca56f6f522e5fe1f772cf8b6cd4f9b5d7c6 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 28 May 2026 01:01:38 +0700 Subject: [PATCH 8/8] chore(logging): emit event/delivery wide events at info instead of audit Per team discussion: the event lifecycle wide events are operator-relevant diagnostics, not customer-facing audit records. Drop them from the OTel audit sink so we don't push per-event / per-attempt records to the customer-visible stream. Stdout output and field shape unchanged. --- internal/deliverymq/messagehandler.go | 2 +- internal/publishmq/eventhandler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 3130253e2..30f5943a3 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -310,7 +310,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del if retry.cancelFailed { fields = append(fields, zap.Bool("retry_cancel_failed", true)) } - logger.Audit("delivery.attempted", fields...) + logger.Info("delivery.attempted", fields...) logEntry := models.LogEntry{ Event: &task.Event, diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index cb507ee62..9c5fca34f 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -114,7 +114,7 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle if enqueueFailed { fields = append(fields, zap.Bool("enqueue_failed", true)) } - logger.Audit("event.received", fields...) + logger.Info("event.received", fields...) }() var err error