From 3213e73fbfcae2ddecfb71ab256fc09d40d5a16d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 23 Mar 2026 16:02:12 +0700 Subject: [PATCH 1/5] fix: redesign attempt_number semantics for consistent 1-indexed tracking (#662) - Make attempt_number 1-indexed (was 0-indexed) across all delivery paths - Manual retries derive attempt_number from logstore instead of hardcoding - Manual retry failures now interact with retry schedule (cancel + reschedule) - Budget exhaustion on manual retry cancels any lingering scheduled retry Co-Authored-By: Claude Opus 4.6 --- cmd/e2e/regressions_test.go | 185 +++++++++++++++++++++ cmd/e2e/retry_test.go | 11 +- internal/apirouter/retry_handlers.go | 21 ++- internal/apirouter/retry_handlers_test.go | 11 +- internal/deliverymq/messagehandler.go | 24 ++- internal/deliverymq/messagehandler_test.go | 115 +++++++++++-- internal/deliverymq/mock_test.go | 31 +++- internal/models/tasks.go | 14 +- 8 files changed, 369 insertions(+), 43 deletions(-) diff --git a/cmd/e2e/regressions_test.go b/cmd/e2e/regressions_test.go index ceb550957..fa2cace81 100644 --- a/cmd/e2e/regressions_test.go +++ b/cmd/e2e/regressions_test.go @@ -182,6 +182,191 @@ func TestE2E_Regression_AutoDisableWithoutCallbackURL(t *testing.T) { t.Fatal("timed out waiting for destination to be disabled (disabled_at should not be null) - issue #596") } +// TestE2E_ManualRetryScheduleInteraction is a standalone E2E test that verifies +// manual retries correctly interact with the automatic retry schedule. +// +// Setup: +// - Isolated outpost instance with a scheduled backoff: [2s, 4s] (2 retries, 3 max attempts) +// - Fast log flush (immediate) so logstore queries in the retry handler are reliable +// - Fast retry poll (50ms) so retries fire promptly after their delay expires +// - A destination that always fails (should_err metadata) +// +// Timeline: +// +// t=0s Publish always-failing event +// t~0s Attempt 1 (auto) fails → auto retry scheduled at tier 0 (2s) +// t<2s Trigger manual retry via POST /retry +// t~0s Attempt 2 (manual) fails → cancels pending 2s retry, schedules at tier 1 (4s) +// Key assertion: NO attempt 3 arrives at t~2s (the 2s retry was canceled) +// t~4s Attempt 3 (auto) fires → fails → budget exhausted (3 = 1 initial + 2 retries) +// Key assertion: no attempt 4 arrives (budget exhausted) +// +// What this proves: +// 1. Manual retry gets correct sequential attempt_number (2, derived from logstore) +// 2. Manual retry failure cancels the pending automatic retry (the 2s tier never fires) +// 3. Manual retry failure schedules the next tier (4s) — the schedule advances +// 4. Budget is correctly exhausted after 3 total attempts (1 auto + 1 manual + 1 auto) +// 5. 
No further retries after budget exhaustion +func TestE2E_ManualRetryScheduleInteraction(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip("skipping e2e test") + } + + testinfraCleanup := testinfra.Start(t) + defer testinfraCleanup() + gin.SetMode(gin.TestMode) + mockServerBaseURL := testinfra.GetMockServer(t) + + cfg := configs.Basic(t, configs.BasicOpts{ + LogStorage: configs.LogStorageTypeClickHouse, + }) + + // Scheduled backoff: 2s, 4s — enough window to trigger manual retry before + // the first auto retry fires at 2s, while keeping the test under 10s + cfg.RetrySchedule = []int{2, 4} + cfg.RetryPollBackoffMs = 50 + cfg.LogBatchThresholdSeconds = 0 // Immediate flush so logstore queries work + + require.NoError(t, cfg.Validate(config.Flags{})) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + appDone := make(chan struct{}) + go func() { + defer close(appDone) + application := app.New(&cfg) + if err := application.Run(ctx); err != nil { + log.Println("Application stopped:", err) + } + }() + defer func() { + cancel() + <-appDone + }() + + waitForHealthy(t, cfg.APIPort, 5*time.Second) + + client := newRegressionHTTPClient(cfg.APIKey) + apiURL := fmt.Sprintf("http://localhost:%d/api/v1", cfg.APIPort) + + tenantID := fmt.Sprintf("tenant_manual_%d", time.Now().UnixNano()) + destinationID := fmt.Sprintf("dest_manual_%d", time.Now().UnixNano()) + eventID := fmt.Sprintf("evt_manual_%d", time.Now().UnixNano()) + secret := testSecret + + // Create tenant + status := client.doJSON(t, http.MethodPut, apiURL+"/tenants/"+tenantID, nil, nil) + require.Equal(t, 201, status, "failed to create tenant") + + // Configure mock server destination (always returns error) + status = client.doJSONRaw(t, http.MethodPut, mockServerBaseURL+"/destinations", map[string]any{ + "id": destinationID, + "type": "webhook", + "config": map[string]any{ + "url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID), + }, + "credentials": map[string]any{ + "secret": secret, + }, + }, nil) + require.Equal(t, 200, status, "failed to configure mock server") + + // Create destination + status = client.doJSON(t, http.MethodPost, apiURL+"/tenants/"+tenantID+"/destinations", map[string]any{ + "id": destinationID, + "type": "webhook", + "topics": "*", + "config": map[string]any{ + "url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID), + }, + "credentials": map[string]any{ + "secret": secret, + }, + }, nil) + require.Equal(t, 201, status, "failed to create destination") + + // Publish always-failing event with retry enabled + status = client.doJSON(t, http.MethodPost, apiURL+"/publish", map[string]any{ + "id": eventID, + "tenant_id": tenantID, + "topic": "user.created", + "eligible_for_retry": true, + "metadata": map[string]any{ + "should_err": "true", + }, + "data": map[string]any{ + "test": "manual-retry-schedule", + }, + }, nil) + require.Equal(t, 202, status, "failed to publish event") + + // Wait for attempt 1 (initial auto delivery, fails) + // TODO: extract into a shared standalone poll helper (not tied to basicSuite) + pollAttempts := func(t *testing.T, minCount int, timeout time.Duration) []map[string]any { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + var resp struct { + Models []map[string]any `json:"models"` + } + s := client.doJSON(t, http.MethodGet, + apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID+"&dir=asc", nil, &resp) + if s == http.StatusOK && len(resp.Models) >= minCount { + return resp.Models 
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+		t.Fatalf("timed out waiting for %d attempts", minCount)
+		return nil
+	}
+
+	attempts := pollAttempts(t, 1, 5*time.Second)
+	require.Len(t, attempts, 1, "should have exactly 1 attempt (initial delivery)")
+	require.Equal(t, float64(1), attempts[0]["attempt_number"], "initial attempt should be attempt_number=1")
+
+	// Trigger manual retry BEFORE the 2s auto retry fires
+	// This should cancel the pending 2s retry and schedule a 4s retry
+	status = client.doJSON(t, http.MethodPost, apiURL+"/retry", map[string]any{
+		"event_id":       eventID,
+		"destination_id": destinationID,
+	}, nil)
+	require.Equal(t, 202, status, "manual retry should be accepted")
+
+	// Wait for attempt 2 (manual retry, fails)
+	attempts = pollAttempts(t, 2, 5*time.Second)
+	require.Equal(t, float64(2), attempts[1]["attempt_number"], "manual retry should be attempt_number=2")
+	require.Equal(t, true, attempts[1]["manual"], "attempt 2 should be manual=true")
+
+	// Verify the 2s auto retry was canceled: wait 2.5s and confirm no attempt 3 arrived
+	// (if the cancel failed, a 3rd attempt would appear at ~t=2s)
+	time.Sleep(2500 * time.Millisecond)
+	var midResp struct {
+		Models []map[string]any `json:"models"`
+	}
+	client.doJSON(t, http.MethodGet,
+		apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID, nil, &midResp)
+	require.Len(t, midResp.Models, 2,
+		"should still have only 2 attempts at t~2.5s (2s auto retry was canceled)")
+
+	// Wait for attempt 3 (auto retry at tier 1 = 4s from manual retry time)
+	attempts = pollAttempts(t, 3, 5*time.Second)
+	require.Equal(t, float64(3), attempts[2]["attempt_number"], "auto retry should be attempt_number=3")
+	require.NotEqual(t, true, attempts[2]["manual"], "attempt 3 should be auto (not manual)")
+
+	// Verify budget exhaustion: no attempt 4 should arrive
+	// Schedule has 2 entries → 2 retries max → 3 total attempts. Budget is exhausted.
+	time.Sleep(2 * time.Second)
+	var finalResp struct {
+		Models []map[string]any `json:"models"`
+	}
+	client.doJSON(t, http.MethodGet,
+		apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID, nil, &finalResp)
+	require.Len(t, finalResp.Models, 3,
+		"should have exactly 3 attempts (budget exhausted: 1 initial + 2 retries)")
+}
+
 // TestE2E_Regression_RetryRaceCondition verifies that retries are not lost when
 // the retry scheduler queries logstore before the event has been persisted.
func TestE2E_Regression_RetryRaceCondition(t *testing.T) { diff --git a/cmd/e2e/retry_test.go b/cmd/e2e/retry_test.go index 6b9b7ccbb..3d289bc1d 100644 --- a/cmd/e2e/retry_test.go +++ b/cmd/e2e/retry_test.go @@ -35,7 +35,7 @@ func (s *basicSuite) TestRetry_FailedDeliveryAutoRetries() { for i, atm := range resp.Models { s.Equal(float64(i+1), atm["attempt_number"], - "attempt %d should have attempt_number=%d (automated retry increments)", i, i+1) + "attempt %d should have attempt_number=%d (1-indexed, automated retry increments)", i, i+1) } } @@ -51,7 +51,7 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() { // Wait for initial delivery s.waitForNewAttempts(tenant.ID, 1) - // Verify first attempt has attempt_number=1 + // Verify first attempt has attempt_number=1 (1-indexed) var attResp struct { Models []map[string]any `json:"models"` } @@ -75,9 +75,10 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() { s.Require().Equal(http.StatusOK, status) s.Require().Len(verifyResp.Models, 2) - // Both should have attempt_number=1 (manual retry resets) - for _, atm := range verifyResp.Models { - s.Equal(float64(1), atm["attempt_number"]) + // Attempt numbers should be sequential (1-indexed) + for i, atm := range verifyResp.Models { + s.Equal(float64(i+1), atm["attempt_number"], + "attempt %d should have attempt_number=%d", i, i+1) } // Verify one manual=true diff --git a/internal/apirouter/retry_handlers.go b/internal/apirouter/retry_handlers.go index 116d2a628..590239e1d 100644 --- a/internal/apirouter/retry_handlers.go +++ b/internal/apirouter/retry_handlers.go @@ -105,8 +105,25 @@ func (h *RetryHandlers) Retry(c *gin.Context) { return } - // 3. Create and publish manual delivery task - task := models.NewManualDeliveryTask(*event, req.DestinationID) + // 3. Derive attempt number from existing attempts + attemptResp, err := h.logStore.ListAttempt(c.Request.Context(), logstore.ListAttemptRequest{ + TenantIDs: []string{event.TenantID}, + EventIDs: []string{req.EventID}, + DestinationIDs: []string{req.DestinationID}, + Limit: 1, + SortOrder: "desc", + }) + if err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + attemptNumber := 1 + if len(attemptResp.Data) > 0 { + attemptNumber = attemptResp.Data[0].Attempt.AttemptNumber + 1 + } + + // 4. 
Create and publish manual delivery task + task := models.NewManualDeliveryTask(*event, req.DestinationID, attemptNumber) if err := h.deliveryPublisher.Publish(c.Request.Context(), task); err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) diff --git a/internal/apirouter/retry_handlers_test.go b/internal/apirouter/retry_handlers_test.go index 1d4edbb4a..cb2670c90 100644 --- a/internal/apirouter/retry_handlers_test.go +++ b/internal/apirouter/retry_handlers_test.go @@ -23,7 +23,7 @@ func TestAPI_Retry(t *testing.T) { h.tenantStore.UpsertDestination(t.Context(), df.Any(df.WithID("d1"), df.WithTenantID("t1"), df.WithTopics([]string{"*"}))) e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created")) require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ - {Event: e, Attempt: attemptForEvent(e)}, + {Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))}, })) return h } @@ -134,7 +134,7 @@ func TestAPI_Retry(t *testing.T) { // Event belongs to t1 e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created")) require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ - {Event: e, Attempt: attemptForEvent(e)}, + {Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))}, })) // JWT for t2 tries to retry t1's event @@ -154,7 +154,7 @@ func TestAPI_Retry(t *testing.T) { h.tenantStore.UpsertDestination(t.Context(), dest) e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created")) require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ - {Event: e, Attempt: attemptForEvent(e)}, + {Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))}, })) req := h.jsonReq(http.MethodPost, "/api/v1/retry", map[string]any{ @@ -188,7 +188,7 @@ func TestAPI_Retry(t *testing.T) { h.tenantStore.UpsertDestination(t.Context(), dest) e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created")) require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ - {Event: e, Attempt: attemptForEvent(e)}, + {Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))}, })) req := h.jsonReq(http.MethodPost, "/api/v1/retry", map[string]any{ @@ -209,7 +209,7 @@ func TestAPI_Retry(t *testing.T) { // Event has topic "user.created" e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created")) require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ - {Event: e, Attempt: attemptForEvent(e)}, + {Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))}, })) req := h.jsonReq(http.MethodPost, "/api/v1/retry", map[string]any{ @@ -252,6 +252,7 @@ func TestAPI_Retry(t *testing.T) { assert.Equal(t, "e1", task.Event.ID) assert.Equal(t, "t1", task.Event.TenantID) assert.Equal(t, "d1", task.DestinationID) + assert.Equal(t, 2, task.Attempt, "should derive attempt_number=2 from 1 prior attempt") }) t.Run("returns success body", func(t *testing.T) { diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 117a25a53..622e5c1db 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -210,9 +210,20 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask, attemptErr := &AttemptError{err: err} if h.shouldScheduleRetry(task, err) { + // scheduleRetry uses RetryID (event_id:destination_id) as the scheduler + // task ID. 
The scheduler has upsert semantics: scheduling with the same ID + // atomically replaces the existing entry (both timing and payload). This + // means manual retries automatically override any pending automatic retry + // without needing an explicit cancel — the new tier's delay takes effect + // and the old scheduled retry is gone in a single operation. if retryErr := h.scheduleRetry(ctx, task); retryErr != nil { return h.logDeliveryResult(ctx, &task, destination, attempt, errors.Join(err, retryErr)) } + } else if task.Manual { + // Budget exhausted or not eligible — cancel any lingering scheduled retry. + // Unlike the case above, there's no new retry to schedule so we must + // explicitly cancel to prevent a stale automatic retry from firing. + _ = h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID)) } return h.logDeliveryResult(ctx, &task, destination, attempt, attemptErr) } @@ -360,9 +371,6 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De } func (h *messageHandler) shouldScheduleRetry(task models.DeliveryTask, err error) bool { - if task.Manual { - return false - } if !task.Event.EligibleForRetry { return false } @@ -370,7 +378,7 @@ func (h *messageHandler) shouldScheduleRetry(task models.DeliveryTask, err error if !errors.As(err, &pubErr) { return false } - // Attempt starts at 1 for initial attempt, so use <= to allow retryMaxLimit total attempts + // Attempt is 1-indexed: max attempts = 1 (initial) + retryMaxLimit (retries) return task.Attempt <= h.retryMaxLimit } @@ -420,11 +428,9 @@ func (h *messageHandler) shouldNackDeliveryError(err error) bool { } func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) error { - // Backoff expects a 0-based index (0 for first retry, 1 for second, etc.). - // attempt_number changed from 0-based to 1-based without migrating in-flight - // tasks, so clamp to 0 to safely handle any leftover Attempt=0 tasks. - backoffIndex := max(task.Attempt-1, 0) - backoffDuration := h.retryBackoff.Duration(backoffIndex) + // Attempt is 1-indexed; backoff schedule is 0-indexed. + // Clamp to 0 to safely handle any leftover Attempt=0 in-flight tasks. 
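+	// Illustrative mapping (assuming, for example, a schedule of [2s, 4s]):
+	// a failed Attempt=1 (the initial delivery) schedules index 0 → 2s,
+	// and a failed Attempt=2 schedules index 1 → 4s.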
+ backoffDuration := h.retryBackoff.Duration(max(task.Attempt-1, 0)) retryTask := RetryTaskFromDeliveryTask(task) retryTaskStr, err := retryTask.ToString() diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index ebbab75f6..56293087c 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -770,9 +770,10 @@ func TestManualDelivery_Success(t *testing.T) { func TestManualDelivery_PublishError(t *testing.T) { // Test scenario: - // - Manual delivery fails with publish error - // - Should not schedule retry (manual delivery never retries) - // - Should be acked + // - A pending automatic retry exists at tier 0 (1s delay) + // - Manual delivery (attempt 2) fails with publish error + // - The schedule should be atomically overridden to the next tier (attempt 3) + // - Result: exactly 1 scheduled retry at the new tier, no explicit cancel needed // Setup test data tenant := models.Tenant{ID: idgen.String()} @@ -782,9 +783,11 @@ func TestManualDelivery_PublishError(t *testing.T) { event := testutil.EventFactory.Any( testutil.EventFactory.WithTenantID(tenant.ID), testutil.EventFactory.WithDestinationID(destination.ID), - testutil.EventFactory.WithEligibleForRetry(true), // even with retry enabled + testutil.EventFactory.WithEligibleForRetry(true), ) + retryID := models.RetryID(event.ID, destination.ID) + // Setup mocks destGetter := &mockDestinationGetter{dest: &destination} retryScheduler := newMockRetryScheduler() @@ -800,6 +803,15 @@ func TestManualDelivery_PublishError(t *testing.T) { logPublisher := newMockLogPublisher(nil) alertMonitor := newMockAlertMonitor() + // Seed: a pending automatic retry exists (scheduled after attempt 1 failed) + retryScheduler.entries[retryID] = scheduledEntry{task: "old-retry", delay: 1 * time.Second} + + // Backoff schedule: tier N has delay (N+1)s, so we can assert the exact tier + // by checking the delay. Attempt 2 uses backoff index 1 → tier 1 → 2s. 
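+	// (So under this assumed schedule: index 0 → 1s, index 1 → 2s, ..., index 4 → 5s.)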
+ retryBackoff := &backoff.ScheduledBackoff{ + Schedule: []time.Duration{1 * time.Second, 2 * time.Second, 3 * time.Second, 4 * time.Second, 5 * time.Second}, + } + // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), @@ -808,17 +820,18 @@ func TestManualDelivery_PublishError(t *testing.T) { publisher, testutil.NewMockEventTracer(nil), retryScheduler, - &backoff.ConstantBackoff{Interval: 1 * time.Second}, - 10, + retryBackoff, + 5, alertMonitor, idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) - // Create and handle message + // Create and handle message — manual retry as attempt 2 task := models.DeliveryTask{ Event: event, DestinationID: destination.ID, - Manual: true, // Manual delivery + Attempt: 2, + Manual: true, } mockMsg, msg := newDeliveryMockMessage(task) @@ -830,10 +843,90 @@ func TestManualDelivery_PublishError(t *testing.T) { assert.True(t, mockMsg.acked, "message should be acked") assert.False(t, mockMsg.nacked, "message should not be nacked") assert.Equal(t, 1, publisher.current, "should attempt publish once") - assert.Empty(t, retryScheduler.schedules, "should not schedule retry for manual delivery") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + + // Assert retry state: the old pending retry was atomically replaced + require.Len(t, retryScheduler.entries, 1, "should have exactly 1 scheduled retry") + entry, ok := retryScheduler.entries[retryID] + require.True(t, ok, "scheduled retry should use the same RetryID") + assert.NotEqual(t, "old-retry", entry.task, "old retry payload should be replaced") + assert.Equal(t, 2*time.Second, entry.delay, "should schedule at tier 1 (2s), not tier 0 (1s)") +} + +func TestManualDelivery_PublishError_BudgetExhausted(t *testing.T) { + // Test scenario: + // - A pending automatic retry exists (lingering from a previous tier) + // - Manual delivery fails with publish error but retry budget is exhausted + // - The pending retry should be canceled, no new retry scheduled + // - Result: empty retry state + + // Setup test data + tenant := models.Tenant{ID: idgen.String()} + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(tenant.ID), + testutil.EventFactory.WithDestinationID(destination.ID), + testutil.EventFactory.WithEligibleForRetry(true), + ) + + retryID := models.RetryID(event.ID, destination.ID) + + // Setup mocks + destGetter := &mockDestinationGetter{dest: &destination} + retryScheduler := newMockRetryScheduler() + publishErr := &destregistry.ErrDestinationPublishAttempt{ + Err: errors.New("webhook returned 429"), + Provider: "webhook", + Data: map[string]interface{}{ + "error": "publish_failed", + "message": "webhook returned 429", + }, + } + publisher := newMockPublisher([]error{publishErr}) + logPublisher := newMockLogPublisher(nil) + alertMonitor := newMockAlertMonitor() + + // Seed: a pending automatic retry exists (lingering from earlier failure) + retryScheduler.entries[retryID] = scheduledEntry{task: "stale-retry", delay: 5 * time.Second} + + // Setup message handler with retryMaxLimit=3 (max attempts = 4) + handler := deliverymq.NewMessageHandler( + testutil.CreateTestLogger(t), + logPublisher, 
+ destGetter, + publisher, + testutil.NewMockEventTracer(nil), + retryScheduler, + &backoff.ConstantBackoff{Interval: 1 * time.Second}, + 3, + alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), + ) + + // Create and handle message — manual retry as attempt 4 (budget exhausted: 1 + 3 retries) + task := models.DeliveryTask{ + Event: event, + DestinationID: destination.ID, + Attempt: 4, + Manual: true, + } + mockMsg, msg := newDeliveryMockMessage(task) + + // Handle message + err := handler.Handle(context.Background(), msg) + require.Error(t, err) + + // Assert behavior + assert.True(t, mockMsg.acked, "message should be acked") + require.Len(t, logPublisher.entries, 1, "should have one delivery") + assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") + + // Assert retry state: pending retry was canceled, nothing new scheduled + assert.Empty(t, retryScheduler.entries, "should have no scheduled retries after budget exhaustion") } func TestManualDelivery_CancelError(t *testing.T) { @@ -1160,7 +1253,7 @@ func TestManualDelivery_DuplicateRetry(t *testing.T) { ) // Step 1: First manual retry succeeds - task1 := models.NewManualDeliveryTask(event, destination.ID) + task1 := models.NewManualDeliveryTask(event, destination.ID, 1) mockMsg1, msg1 := newDeliveryMockMessage(task1) err := handler.Handle(context.Background(), msg1) require.NoError(t, err) @@ -1169,7 +1262,7 @@ func TestManualDelivery_DuplicateRetry(t *testing.T) { require.Len(t, logPublisher.entries, 1, "first manual retry should log delivery") // Step 2: Second manual retry for same event+destination should also execute - task2 := models.NewManualDeliveryTask(event, destination.ID) + task2 := models.NewManualDeliveryTask(event, destination.ID, 2) mockMsg2, msg2 := newDeliveryMockMessage(task2) err = handler.Handle(context.Background(), msg2) require.NoError(t, err) diff --git a/internal/deliverymq/mock_test.go b/internal/deliverymq/mock_test.go index d90f6ea86..5e650ba60 100644 --- a/internal/deliverymq/mock_test.go +++ b/internal/deliverymq/mock_test.go @@ -180,7 +180,16 @@ func (m *mockLogPublisher) Publish(ctx context.Context, entry models.LogEntry) e return m.err } +// scheduledEntry represents a retry entry in the stateful mock scheduler. +// Mirrors the real scheduler's upsert semantics: Schedule with the same ID +// overwrites the previous entry, Cancel removes it. +type scheduledEntry struct { + task string + delay time.Duration +} + type mockRetryScheduler struct { + // Call-recording fields (used by existing tests) schedules []string taskIDs []string canceled []string @@ -188,6 +197,11 @@ type mockRetryScheduler struct { cancelResp []error scheduleIdx int cancelIdx int + + // Stateful map: tracks the current set of scheduled retries. + // Schedule upserts by ID, Cancel deletes by ID — matching real + // scheduler behavior (RSMQ ZAdd+HSet overwrites, DeleteMessage removes). 
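+	// For example, two Schedule calls carrying the same ID option leave a
+	// single entry holding the second call's task and delay, and a Cancel
+	// with that ID removes the entry entirely.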
+ entries map[string]scheduledEntry } func newMockRetryScheduler() *mockRetryScheduler { @@ -197,6 +211,7 @@ func newMockRetryScheduler() *mockRetryScheduler { canceled: make([]string, 0), scheduleResp: make([]error, 0), cancelResp: make([]error, 0), + entries: make(map[string]scheduledEntry), } } @@ -204,12 +219,14 @@ func (m *mockRetryScheduler) Schedule(ctx context.Context, task string, delay ti m.schedules = append(m.schedules, task) // Capture the task ID by applying the option - if len(opts) > 0 { - options := &scheduler.ScheduleOptions{} - opts[0](options) - if options.ID != "" { - m.taskIDs = append(m.taskIDs, options.ID) - } + options := &scheduler.ScheduleOptions{} + for _, opt := range opts { + opt(options) + } + if options.ID != "" { + m.taskIDs = append(m.taskIDs, options.ID) + // Upsert into stateful map + m.entries[options.ID] = scheduledEntry{task: task, delay: delay} } if m.scheduleIdx < len(m.scheduleResp) { @@ -222,6 +239,8 @@ func (m *mockRetryScheduler) Schedule(ctx context.Context, task string, delay ti func (m *mockRetryScheduler) Cancel(ctx context.Context, taskID string) error { m.canceled = append(m.canceled, taskID) + delete(m.entries, taskID) + if m.cancelIdx < len(m.cancelResp) { err := m.cancelResp[m.cancelIdx] m.cancelIdx++ diff --git a/internal/models/tasks.go b/internal/models/tasks.go index dd54d1cdc..8f1e1d8eb 100644 --- a/internal/models/tasks.go +++ b/internal/models/tasks.go @@ -87,12 +87,16 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask { } // NewManualDeliveryTask creates a new DeliveryTask for a manual retry. +// attemptNumber is the 1-indexed attempt number derived from the count of prior attempts. // Each manual retry gets a unique nonce so separate /retry requests are not deduplicated. -func NewManualDeliveryTask(event Event, destinationID string) DeliveryTask { - task := NewDeliveryTask(event, destinationID) - task.Manual = true - task.Nonce = idgen.String() - return task +func NewManualDeliveryTask(event Event, destinationID string, attemptNumber int) DeliveryTask { + return DeliveryTask{ + Event: event, + DestinationID: destinationID, + Attempt: attemptNumber, + Manual: true, + Nonce: idgen.String(), + } } // LogEntry represents a message for the log queue. From 131a85feaefe843299ccf3b907be873c0ff2f613 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 23 Mar 2026 16:32:32 +0700 Subject: [PATCH 2/5] test: use stateful mock scheduler with tier-aware backoff assertions Replace call-recording assertions with stateful map that mirrors real scheduler upsert/delete semantics. Tests now seed initial retry state and assert the resulting state after handler execution. Use ScheduledBackoff so each tier has a distinct delay, proving the schedule advances to the correct tier. 
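For illustration, the resulting test shape (using the mock names from this
series) is roughly:

    // Seed a pending retry, run the handler, then assert the end state.
    retryScheduler.entries[retryID] = scheduledEntry{task: "old-retry", delay: 1 * time.Second}
    _ = handler.Handle(ctx, msg) // manual retry at Attempt=2 fails
    entry := retryScheduler.entries[retryID]
    assert.Equal(t, 2*time.Second, entry.delay) // advanced from tier 0 (1s) to tier 1 (2s)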
Co-Authored-By: Claude Opus 4.6 --- cmd/e2e/regressions_test.go | 20 +++++++++++++++++++- internal/deliverymq/messagehandler_test.go | 5 +++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/cmd/e2e/regressions_test.go b/cmd/e2e/regressions_test.go index fa2cace81..bf0b60a72 100644 --- a/cmd/e2e/regressions_test.go +++ b/cmd/e2e/regressions_test.go @@ -362,9 +362,27 @@ func TestE2E_ManualRetryScheduleInteraction(t *testing.T) { Models []map[string]any `json:"models"` } client.doJSON(t, http.MethodGet, - apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID, nil, &finalResp) + apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID+"&dir=asc", nil, &finalResp) require.Len(t, finalResp.Models, 3, "should have exactly 3 attempts (budget exhausted: 1 initial + 2 retries)") + + // Assert the full picture: attempt_number is sequential, manual flag is correct + expected := []struct { + attemptNumber float64 + manual bool + }{ + {1, false}, // initial auto delivery + {2, true}, // manual retry + {3, false}, // auto retry (advanced tier) + } + for i, exp := range expected { + atm := finalResp.Models[i] + require.Equal(t, exp.attemptNumber, atm["attempt_number"], + "attempt %d: expected attempt_number=%v", i+1, exp.attemptNumber) + manual, _ := atm["manual"].(bool) + require.Equal(t, exp.manual, manual, + "attempt %d: expected manual=%v", i+1, exp.manual) + } } // TestE2E_Regression_RetryRaceCondition verifies that retries are not lost when diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 56293087c..8a0383919 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -803,7 +803,7 @@ func TestManualDelivery_PublishError(t *testing.T) { logPublisher := newMockLogPublisher(nil) alertMonitor := newMockAlertMonitor() - // Seed: a pending automatic retry exists (scheduled after attempt 1 failed) + // Seed: a pending automatic retry exists (scheduled after attempt 1 failed at tier 0 = 1s) retryScheduler.entries[retryID] = scheduledEntry{task: "old-retry", delay: 1 * time.Second} // Backoff schedule: tier N has delay (N+1)s, so we can assert the exact tier @@ -847,7 +847,8 @@ func TestManualDelivery_PublishError(t *testing.T) { assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) - // Assert retry state: the old pending retry was atomically replaced + // Assert retry state: the old pending retry (tier 0, 1s) was atomically + // replaced with the next tier. Attempt 2 uses backoff index 1 → 2s. require.Len(t, retryScheduler.entries, 1, "should have exactly 1 scheduled retry") entry, ok := retryScheduler.entries[retryID] require.True(t, ok, "scheduled retry should use the same RetryID") From 279c298d247ea11fc101f3b7aca3a6bf08c3bea6 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 23 Mar 2026 21:15:49 +0700 Subject: [PATCH 3/5] fix: derive attempt_number from logstore and unify idempotency key Retry executor now queries logstore for the latest attempt instead of carrying a stale attempt number in the RSMQ message. Idempotency key changed to event_id:destination_id:attempt_number so manual and auto retries with the same attempt number are deduplicated (race protection). 
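The new key is derived exactly as in DeliveryTask.IdempotencyKey below
(strconv is the only new import):

    // One key per attempt: a manual retry and an automatic retry racing to
    // deliver the same attempt number share a key and dedupe each other.
    return t.Event.ID + ":" + t.DestinationID + ":" + strconv.Itoa(t.Attempt)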
Co-Authored-By: Claude Opus 4.6 --- internal/deliverymq/mock_test.go | 56 +++++++++++++++++++++++++++++-- internal/deliverymq/retry.go | 32 +++++++++++++++--- internal/deliverymq/retry_test.go | 9 +++-- internal/models/tasks.go | 20 ++++------- 4 files changed, 93 insertions(+), 24 deletions(-) diff --git a/internal/deliverymq/mock_test.go b/internal/deliverymq/mock_test.go index 5e650ba60..b8294434c 100644 --- a/internal/deliverymq/mock_test.go +++ b/internal/deliverymq/mock_test.go @@ -108,13 +108,15 @@ func (m *mockMultiDestinationGetter) RetrieveDestination(ctx context.Context, te type mockEventGetter struct { events map[string]*models.Event + attempts []*models.Attempt // tracks logged attempts for ListAttempt err error lastRetrievedID string } func newMockEventGetter() *mockEventGetter { return &mockEventGetter{ - events: make(map[string]*models.Event), + events: make(map[string]*models.Event), + attempts: make([]*models.Attempt, 0), } } @@ -135,6 +137,35 @@ func (m *mockEventGetter) RetrieveEvent(ctx context.Context, req logstore.Retrie return m.events[req.EventID], nil } +func (m *mockEventGetter) ListAttempt(ctx context.Context, req logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) { + if m.err != nil { + return logstore.ListAttemptResponse{}, m.err + } + // Filter attempts matching the request criteria + var matched []*logstore.AttemptRecord + for _, a := range m.attempts { + if len(req.EventIDs) > 0 && !contains(req.EventIDs, a.EventID) { + continue + } + if len(req.DestinationIDs) > 0 && !contains(req.DestinationIDs, a.DestinationID) { + continue + } + matched = append(matched, &logstore.AttemptRecord{Attempt: a}) + } + // Sort desc by AttemptNumber (highest first) + for i := 0; i < len(matched); i++ { + for j := i + 1; j < len(matched); j++ { + if matched[j].Attempt.AttemptNumber > matched[i].Attempt.AttemptNumber { + matched[i], matched[j] = matched[j], matched[i] + } + } + } + if req.Limit > 0 && len(matched) > req.Limit { + matched = matched[:req.Limit] + } + return logstore.ListAttemptResponse{Data: matched}, nil +} + // mockDelayedEventGetter simulates the race condition where event is not yet // persisted to logstore when retry scheduler first queries it. // Returns (nil, nil) for the first N calls, then returns the event. 
@@ -163,9 +194,24 @@ func (m *mockDelayedEventGetter) RetrieveEvent(ctx context.Context, req logstore return m.event, nil } +func (m *mockDelayedEventGetter) ListAttempt(ctx context.Context, req logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) { + // Return empty — simulates no prior attempts logged yet (consistent with delayed persistence) + return logstore.ListAttemptResponse{}, nil +} + +func contains(slice []string, s string) bool { + for _, v := range slice { + if v == s { + return true + } + } + return false +} + type mockLogPublisher struct { - err error - entries []models.LogEntry + err error + entries []models.LogEntry + eventGetter *mockEventGetter // if set, feed logged attempts to this getter } func newMockLogPublisher(err error) *mockLogPublisher { @@ -177,6 +223,10 @@ func newMockLogPublisher(err error) *mockLogPublisher { func (m *mockLogPublisher) Publish(ctx context.Context, entry models.LogEntry) error { m.entries = append(m.entries, entry) + // Feed attempt to mockEventGetter so ListAttempt returns correct data + if m.eventGetter != nil && entry.Attempt != nil { + m.eventGetter.attempts = append(m.eventGetter.attempts, entry.Attempt) + } return m.err } diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go index 66f1e791b..e0d361294 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -19,6 +19,7 @@ import ( // This is defined separately from EventGetter in messagehandler.go to avoid circular dependencies. type RetryEventGetter interface { RetrieveEvent(ctx context.Context, request logstore.RetrieveEventRequest) (*models.Event, error) + ListAttempt(ctx context.Context, request logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) } // RetrySchedulerOption is a functional option for configuring the retry scheduler. 
@@ -103,7 +104,30 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d return fmt.Errorf("event not found in logstore") } - deliveryTask := retryTask.ToDeliveryTask(*event) + // Derive attempt number from logstore (single source of truth) + attemptResp, err := eventGetter.ListAttempt(ctx, logstore.ListAttemptRequest{ + TenantIDs: []string{retryTask.TenantID}, + EventIDs: []string{retryTask.EventID}, + DestinationIDs: []string{retryTask.DestinationID}, + Limit: 1, + SortOrder: "desc", + }) + if err != nil { + if logger != nil { + logger.Ctx(ctx).Error("failed to list attempts for retry", + zap.Error(err), + zap.String("event_id", retryTask.EventID), + zap.String("tenant_id", retryTask.TenantID), + zap.String("destination_id", retryTask.DestinationID)) + } + return err + } + attemptNumber := 1 + if len(attemptResp.Data) > 0 { + attemptNumber = attemptResp.Data[0].Attempt.AttemptNumber + 1 + } + + deliveryTask := retryTask.ToDeliveryTask(*event, attemptNumber) if err := deliverymq.Publish(ctx, deliveryTask); err != nil { return err } @@ -125,7 +149,6 @@ type RetryTask struct { EventID string TenantID string DestinationID string - Attempt int Telemetry *models.DeliveryTelemetry } @@ -141,9 +164,9 @@ func (m *RetryTask) FromString(str string) error { return json.Unmarshal([]byte(str), &m) } -func (m *RetryTask) ToDeliveryTask(event models.Event) models.DeliveryTask { +func (m *RetryTask) ToDeliveryTask(event models.Event, attemptNumber int) models.DeliveryTask { return models.DeliveryTask{ - Attempt: m.Attempt, + Attempt: attemptNumber, DestinationID: m.DestinationID, Event: event, Telemetry: m.Telemetry, @@ -155,7 +178,6 @@ func RetryTaskFromDeliveryTask(task models.DeliveryTask) RetryTask { EventID: task.Event.ID, TenantID: task.Event.TenantID, DestinationID: task.DestinationID, - Attempt: task.Attempt + 1, Telemetry: task.Telemetry, } } diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 86257a962..298f3951f 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -195,13 +195,15 @@ func TestDeliveryMQRetry_EligibleForRetryTrue(t *testing.T) { }) eventGetter := newMockEventGetter() eventGetter.registerEvent(&event) + logPublisher := newMockLogPublisher(nil) + logPublisher.eventGetter = eventGetter suite := &RetryDeliveryMQSuite{ ctx: ctx, mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, publisher: publisher, eventGetter: eventGetter, - logPublisher: newMockLogPublisher(nil), + logPublisher: logPublisher, destGetter: &mockDestinationGetter{dest: &destination}, alertMonitor: newMockAlertMonitor(), retryMaxCount: 10, @@ -321,13 +323,15 @@ func TestDeliveryMQRetry_RetryMaxCount(t *testing.T) { }) eventGetter := newMockEventGetter() eventGetter.registerEvent(&event) + logPublisher := newMockLogPublisher(nil) + logPublisher.eventGetter = eventGetter suite := &RetryDeliveryMQSuite{ ctx: ctx, mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, publisher: publisher, eventGetter: eventGetter, - logPublisher: newMockLogPublisher(nil), + logPublisher: logPublisher, destGetter: &mockDestinationGetter{dest: &destination}, alertMonitor: newMockAlertMonitor(), retryMaxCount: 2, // 1 initial + 2 retries = 3 total attempts @@ -529,6 +533,7 @@ func TestRetryScheduler_EventFetchSuccess(t *testing.T) { eventGetter.registerEvent(&event) logPublisher := newMockLogPublisher(nil) + logPublisher.eventGetter = eventGetter suite := 
&RetryDeliveryMQSuite{ ctx: ctx, diff --git a/internal/models/tasks.go b/internal/models/tasks.go index 8f1e1d8eb..da4ea837b 100644 --- a/internal/models/tasks.go +++ b/internal/models/tasks.go @@ -2,8 +2,8 @@ package models import ( "encoding/json" + "strconv" - "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/mqs" ) @@ -39,7 +39,6 @@ type DeliveryTask struct { DestinationID string `json:"destination_id"` Attempt int `json:"attempt"` Manual bool `json:"manual"` - Nonce string `json:"nonce,omitempty"` Telemetry *DeliveryTelemetry `json:"telemetry,omitempty"` } @@ -58,17 +57,12 @@ func (t *DeliveryTask) ToMessage() (*mqs.Message, error) { } // IdempotencyKey returns the key used for idempotency checks. -// Manual retries include a nonce so each /retry request gets its own idempotency key, -// while MQ redeliveries of the same message (same nonce) are still deduplicated. -// Nonce was added to fix a regression from #653 where removing DeliveryEvent.ID -// made the manual retry idempotency key static per event+destination. +// Uses event_id:destination_id:attempt_number so that: +// - Manual and auto retries with the same attempt_number are deduplicated (race protection) +// - Each new attempt gets a fresh key (no need to clear on failure) +// - MQ redeliveries of the same message are still deduplicated func (t *DeliveryTask) IdempotencyKey() string { - if t.Manual { - return t.Event.ID + ":" + t.DestinationID + ":manual:" + t.Nonce - } - // Non-manual deliveries share a key per event+destination. On failure, the - // idempotency key is cleared so the scheduled retry can execute with the same key. - return t.Event.ID + ":" + t.DestinationID + return t.Event.ID + ":" + t.DestinationID + ":" + strconv.Itoa(t.Attempt) } // RetryID returns the ID used for scheduling and canceling retries. @@ -88,14 +82,12 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask { // NewManualDeliveryTask creates a new DeliveryTask for a manual retry. // attemptNumber is the 1-indexed attempt number derived from the count of prior attempts. -// Each manual retry gets a unique nonce so separate /retry requests are not deduplicated. func NewManualDeliveryTask(event Event, destinationID string, attemptNumber int) DeliveryTask { return DeliveryTask{ Event: event, DestinationID: destinationID, Attempt: attemptNumber, Manual: true, - Nonce: idgen.String(), } } From 6970a1be27f08ef93419fd7d938468e4cc5512d0 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 24 Mar 2026 00:26:12 +0700 Subject: [PATCH 4/5] refactor: use single ListAttempt query for both event data and attempt number ListAttempt already returns the associated Event on each AttemptRecord. Replace the two-query pattern (RetrieveEvent + ListAttempt) with a single ListAttempt call in both the retry executor and the API retry handler. Remove RetrieveEvent from RetryEventGetter interface. Co-Authored-By: Claude Opus 4.6 --- internal/apirouter/retry_handlers.go | 41 +++++++--------- internal/deliverymq/mock_test.go | 72 +++++++++++----------------- internal/deliverymq/retry.go | 56 +++++++--------------- internal/deliverymq/retry_test.go | 15 ++---- 4 files changed, 66 insertions(+), 118 deletions(-) diff --git a/internal/apirouter/retry_handlers.go b/internal/apirouter/retry_handlers.go index 590239e1d..bc85bd1fb 100644 --- a/internal/apirouter/retry_handlers.go +++ b/internal/apirouter/retry_handlers.go @@ -54,20 +54,30 @@ func (h *RetryHandlers) Retry(c *gin.Context) { tenantID := tenantIDFromContext(c) - // 1. 
Look up event by ID - event, err := h.logStore.RetrieveEvent(c.Request.Context(), logstore.RetrieveEventRequest{ - TenantID: tenantID, - EventID: req.EventID, - }) + // 1. Look up prior attempt (includes event data) — single logstore query + listReq := logstore.ListAttemptRequest{ + EventIDs: []string{req.EventID}, + DestinationIDs: []string{req.DestinationID}, + Limit: 1, + SortOrder: "desc", + } + if tenantID != "" { + listReq.TenantIDs = []string{tenantID} + } + attemptResp, err := h.logStore.ListAttempt(c.Request.Context(), listReq) if err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } - if event == nil { + if len(attemptResp.Data) == 0 { AbortWithError(c, http.StatusNotFound, NewErrNotFound("event")) return } + record := attemptResp.Data[0] + event := record.Event + attemptNumber := record.Attempt.AttemptNumber + 1 + // Authz: JWT tenant can only retry their own events if tenant := tenantFromContext(c); tenant != nil { if event.TenantID != tenant.ID { @@ -105,24 +115,7 @@ func (h *RetryHandlers) Retry(c *gin.Context) { return } - // 3. Derive attempt number from existing attempts - attemptResp, err := h.logStore.ListAttempt(c.Request.Context(), logstore.ListAttemptRequest{ - TenantIDs: []string{event.TenantID}, - EventIDs: []string{req.EventID}, - DestinationIDs: []string{req.DestinationID}, - Limit: 1, - SortOrder: "desc", - }) - if err != nil { - AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) - return - } - attemptNumber := 1 - if len(attemptResp.Data) > 0 { - attemptNumber = attemptResp.Data[0].Attempt.AttemptNumber + 1 - } - - // 4. Create and publish manual delivery task + // 3. Create and publish manual delivery task task := models.NewManualDeliveryTask(*event, req.DestinationID, attemptNumber) if err := h.deliveryPublisher.Publish(c.Request.Context(), task); err != nil { diff --git a/internal/deliverymq/mock_test.go b/internal/deliverymq/mock_test.go index b8294434c..e64f21c8c 100644 --- a/internal/deliverymq/mock_test.go +++ b/internal/deliverymq/mock_test.go @@ -107,50 +107,34 @@ func (m *mockMultiDestinationGetter) RetrieveDestination(ctx context.Context, te } type mockEventGetter struct { - events map[string]*models.Event - attempts []*models.Attempt // tracks logged attempts for ListAttempt - err error - lastRetrievedID string + records []*logstore.AttemptRecord // tracks logged attempts with event data + err error } func newMockEventGetter() *mockEventGetter { return &mockEventGetter{ - events: make(map[string]*models.Event), - attempts: make([]*models.Attempt, 0), + records: make([]*logstore.AttemptRecord, 0), } } -func (m *mockEventGetter) registerEvent(event *models.Event) { - m.events[event.ID] = event -} - -func (m *mockEventGetter) clearError() { - m.err = nil -} - -func (m *mockEventGetter) RetrieveEvent(ctx context.Context, req logstore.RetrieveEventRequest) (*models.Event, error) { - if m.err != nil { - return nil, m.err - } - m.lastRetrievedID = req.EventID - // Match actual logstore behavior: return (nil, nil) when event not found - return m.events[req.EventID], nil +func (m *mockEventGetter) addRecord(attempt *models.Attempt, event *models.Event) { + m.records = append(m.records, &logstore.AttemptRecord{Attempt: attempt, Event: event}) } func (m *mockEventGetter) ListAttempt(ctx context.Context, req logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) { if m.err != nil { return logstore.ListAttemptResponse{}, m.err } - // Filter attempts matching the request criteria + 
// Filter records matching the request criteria var matched []*logstore.AttemptRecord - for _, a := range m.attempts { - if len(req.EventIDs) > 0 && !contains(req.EventIDs, a.EventID) { + for _, r := range m.records { + if len(req.EventIDs) > 0 && !contains(req.EventIDs, r.Attempt.EventID) { continue } - if len(req.DestinationIDs) > 0 && !contains(req.DestinationIDs, a.DestinationID) { + if len(req.DestinationIDs) > 0 && !contains(req.DestinationIDs, r.Attempt.DestinationID) { continue } - matched = append(matched, &logstore.AttemptRecord{Attempt: a}) + matched = append(matched, r) } // Sort desc by AttemptNumber (highest first) for i := 0; i < len(matched); i++ { @@ -166,37 +150,39 @@ func (m *mockEventGetter) ListAttempt(ctx context.Context, req logstore.ListAtte return logstore.ListAttemptResponse{Data: matched}, nil } -// mockDelayedEventGetter simulates the race condition where event is not yet -// persisted to logstore when retry scheduler first queries it. -// Returns (nil, nil) for the first N calls, then returns the event. +// mockDelayedEventGetter simulates the race condition where logstore data is not yet +// persisted when retry scheduler first queries it. +// Returns empty for the first N calls, then returns the attempt record. type mockDelayedEventGetter struct { - event *models.Event + record *logstore.AttemptRecord callCount int - returnAfterCall int // Return event after this many calls + returnAfterCall int // Return record after this many calls mu sync.Mutex } func newMockDelayedEventGetter(event *models.Event, returnAfterCall int) *mockDelayedEventGetter { return &mockDelayedEventGetter{ - event: event, + record: &logstore.AttemptRecord{ + Attempt: &models.Attempt{ + EventID: event.ID, + DestinationID: event.DestinationID, + AttemptNumber: 0, + }, + Event: event, + }, returnAfterCall: returnAfterCall, } } -func (m *mockDelayedEventGetter) RetrieveEvent(ctx context.Context, req logstore.RetrieveEventRequest) (*models.Event, error) { +func (m *mockDelayedEventGetter) ListAttempt(ctx context.Context, req logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) { m.mu.Lock() defer m.mu.Unlock() m.callCount++ if m.callCount <= m.returnAfterCall { - // Simulate event not yet persisted - return nil, nil + // Simulate data not yet persisted + return logstore.ListAttemptResponse{}, nil } - return m.event, nil -} - -func (m *mockDelayedEventGetter) ListAttempt(ctx context.Context, req logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) { - // Return empty — simulates no prior attempts logged yet (consistent with delayed persistence) - return logstore.ListAttemptResponse{}, nil + return logstore.ListAttemptResponse{Data: []*logstore.AttemptRecord{m.record}}, nil } func contains(slice []string, s string) bool { @@ -223,9 +209,9 @@ func newMockLogPublisher(err error) *mockLogPublisher { func (m *mockLogPublisher) Publish(ctx context.Context, entry models.LogEntry) error { m.entries = append(m.entries, entry) - // Feed attempt to mockEventGetter so ListAttempt returns correct data + // Feed attempt+event to mockEventGetter so ListAttempt returns correct data if m.eventGetter != nil && entry.Attempt != nil { - m.eventGetter.attempts = append(m.eventGetter.attempts, entry.Attempt) + m.eventGetter.addRecord(entry.Attempt, entry.Event) } return m.err } diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go index e0d361294..8196316e1 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -15,10 +15,9 @@ import ( 
"go.uber.org/zap" ) -// RetryEventGetter is the interface for fetching events from logstore. -// This is defined separately from EventGetter in messagehandler.go to avoid circular dependencies. +// RetryEventGetter is the interface for fetching prior attempts from logstore. +// ListAttempt returns both the attempt and the associated event data. type RetryEventGetter interface { - RetrieveEvent(ctx context.Context, request logstore.RetrieveEventRequest) (*models.Event, error) ListAttempt(ctx context.Context, request logstore.ListAttemptRequest) (logstore.ListAttemptResponse, error) } @@ -73,38 +72,9 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d return err } - // Fetch full event data from logstore - event, err := eventGetter.RetrieveEvent(ctx, logstore.RetrieveEventRequest{ - TenantID: retryTask.TenantID, - EventID: retryTask.EventID, - }) - if err != nil { - // Returning an error leaves the message in the RSMQ queue. After the - // visibility timeout expires, the message becomes visible again and will - // be reprocessed. This handles both transient DB errors and the race - // condition where logmq hasn't flushed the event yet. - if logger != nil { - logger.Ctx(ctx).Error("failed to fetch event for retry", - zap.Error(err), - zap.String("event_id", retryTask.EventID), - zap.String("tenant_id", retryTask.TenantID), - zap.String("destination_id", retryTask.DestinationID)) - } - return err - } - if event == nil { - // Event not found - may be race condition with logmq batching delay. - // Return error so scheduler retries later. - if logger != nil { - logger.Ctx(ctx).Warn("event not found in logstore, will retry", - zap.String("event_id", retryTask.EventID), - zap.String("tenant_id", retryTask.TenantID), - zap.String("destination_id", retryTask.DestinationID)) - } - return fmt.Errorf("event not found in logstore") - } - - // Derive attempt number from logstore (single source of truth) + // Fetch prior attempt from logstore (single source of truth for both + // event data and attempt number). A retry always has at least one prior + // attempt — the one that failed and triggered this retry. attemptResp, err := eventGetter.ListAttempt(ctx, logstore.ListAttemptRequest{ TenantIDs: []string{retryTask.TenantID}, EventIDs: []string{retryTask.EventID}, @@ -122,12 +92,20 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d } return err } - attemptNumber := 1 - if len(attemptResp.Data) > 0 { - attemptNumber = attemptResp.Data[0].Attempt.AttemptNumber + 1 + if len(attemptResp.Data) == 0 { + // No prior attempt found — may be race condition with logmq batching delay. + // Return error so scheduler retries later. 
+ if logger != nil { + logger.Ctx(ctx).Warn("no prior attempt found in logstore, will retry", + zap.String("event_id", retryTask.EventID), + zap.String("tenant_id", retryTask.TenantID), + zap.String("destination_id", retryTask.DestinationID)) + } + return fmt.Errorf("no prior attempt found in logstore") } - deliveryTask := retryTask.ToDeliveryTask(*event, attemptNumber) + record := attemptResp.Data[0] + deliveryTask := retryTask.ToDeliveryTask(*record.Event, record.Attempt.AttemptNumber+1) if err := deliverymq.Publish(ctx, deliveryTask); err != nil { return err } diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 298f3951f..0d350f887 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -132,7 +132,6 @@ func TestDeliveryMQRetry_EligibleForRetryFalse(t *testing.T) { }, }) eventGetter := newMockEventGetter() - eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ ctx: ctx, @@ -194,7 +193,6 @@ func TestDeliveryMQRetry_EligibleForRetryTrue(t *testing.T) { nil, // succeeds on 3rd try }) eventGetter := newMockEventGetter() - eventGetter.registerEvent(&event) logPublisher := newMockLogPublisher(nil) logPublisher.eventGetter = eventGetter @@ -253,7 +251,6 @@ func TestDeliveryMQRetry_SystemError(t *testing.T) { // Setup mocks with system error destGetter := &mockDestinationGetter{err: errors.New("destination lookup failed")} eventGetter := newMockEventGetter() - eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ ctx: ctx, @@ -322,7 +319,6 @@ func TestDeliveryMQRetry_RetryMaxCount(t *testing.T) { }, // 4th attempt should never happen }) eventGetter := newMockEventGetter() - eventGetter.registerEvent(&event) logPublisher := newMockLogPublisher(nil) logPublisher.eventGetter = eventGetter @@ -385,10 +381,8 @@ func TestRetryScheduler_EventNotFound(t *testing.T) { }, }) - // Event getter does NOT have the event registered - // This simulates event being deleted from logstore before retry + // Event getter has no records — simulates data not yet in logstore eventGetter := newMockEventGetter() - // Intentionally NOT calling: eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ ctx: ctx, @@ -421,8 +415,8 @@ func TestRetryScheduler_EventNotFound(t *testing.T) { // 50ms backoff + 10ms poll = 60ms minimum for retry time.Sleep(200 * time.Millisecond) - // Should only have 1 attempt - the retry was skipped because event not found - assert.Equal(t, 1, publisher.Current(), "should skip retry when event not found in logstore (returns nil, nil)") + // Should only have 1 attempt - the retry was skipped because no prior attempt found + assert.Equal(t, 1, publisher.Current(), "should skip retry when no prior attempt found in logstore") } func TestRetryScheduler_EventFetchError(t *testing.T) { @@ -459,7 +453,6 @@ func TestRetryScheduler_EventFetchError(t *testing.T) { // Event getter returns error (simulating transient DB error) eventGetter := newMockEventGetter() - eventGetter.registerEvent(&event) eventGetter.err = errors.New("database connection error") suite := &RetryDeliveryMQSuite{ @@ -528,9 +521,7 @@ func TestRetryScheduler_EventFetchSuccess(t *testing.T) { nil, // Second attempt succeeds }) - // Event getter has the event registered eventGetter := newMockEventGetter() - eventGetter.registerEvent(&event) logPublisher := newMockLogPublisher(nil) logPublisher.eventGetter = eventGetter From 3416478e20e8638ea93c228dc83d7c9a8806e27a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 24 Mar 2026 00:45:49 
+0700 Subject: [PATCH 5/5] fix: skip mqinfra integration tests in short mode EnsureLocalStack/EnsureRabbitMQ/EnsureGCP were called before testinfra.Start(t) had a chance to skip, causing container startups in CI even with -short. Add testutil.CheckIntegrationTest(t) at the top of each test function so the skip runs first. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/mqinfra/mqinfra_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/mqinfra/mqinfra_test.go b/internal/mqinfra/mqinfra_test.go index 539b1e601..3d9eda8b2 100644 --- a/internal/mqinfra/mqinfra_test.go +++ b/internal/mqinfra/mqinfra_test.go @@ -182,6 +182,7 @@ func testMQInfra(t *testing.T, mqConfig *Config, dlqConfig *Config) { } func TestIntegrationMQInfra_RabbitMQ(t *testing.T) { + testutil.CheckIntegrationTest(t) exchange := idgen.String() queue := idgen.String() @@ -225,6 +226,7 @@ func TestIntegrationMQInfra_RabbitMQ(t *testing.T) { } func TestIntegrationMQInfra_AWSSQS(t *testing.T) { + testutil.CheckIntegrationTest(t) q := idgen.String() testMQInfra(t, @@ -274,6 +276,7 @@ func TestIntegrationMQInfra_AWSSQS(t *testing.T) { } func TestIntegrationMQInfra_GCPPubSub(t *testing.T) { + testutil.CheckIntegrationTest(t) // Set PUBSUB_EMULATOR_HOST environment variable testinfra.EnsureGCP()