Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions cmd/e2e/regressions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,209 @@ func TestE2E_Regression_AutoDisableWithoutCallbackURL(t *testing.T) {
t.Fatal("timed out waiting for destination to be disabled (disabled_at should not be null) - issue #596")
}

// TestE2E_ManualRetryScheduleInteraction is a standalone E2E test that verifies
// manual retries correctly interact with the automatic retry schedule.
//
// Setup:
// - Isolated outpost instance with a scheduled backoff: [2s, 4s] (2 retries, 3 max attempts)
// - Fast log flush (immediate) so logstore queries in the retry handler are reliable
// - Fast retry poll (50ms) so retries fire promptly after their delay expires
// - A destination that always fails (should_err metadata)
//
// Timeline:
//
// t=0s Publish always-failing event
// t~0s Attempt 1 (auto) fails → auto retry scheduled at tier 0 (2s)
// t<2s Trigger manual retry via POST /retry
// t<2s Attempt 2 (manual) fails → cancels pending 2s retry, schedules at tier 1 (4s)
// Key assertion: NO attempt 3 arrives at t~2s (the 2s retry was canceled)
// t~4s Attempt 3 (auto) fires → fails → budget exhausted (3 = 1 initial + 2 retries)
// Key assertion: no attempt 4 arrives (budget exhausted)
//
// What this proves:
// 1. Manual retry gets correct sequential attempt_number (2, derived from logstore)
// 2. Manual retry failure cancels the pending automatic retry (the 2s tier never fires)
// 3. Manual retry failure schedules the next tier (4s) — the schedule advances
// 4. Budget is correctly exhausted after 3 total attempts (1 auto + 1 manual + 1 auto)
// 5. No further retries after budget exhaustion
func TestE2E_ManualRetryScheduleInteraction(t *testing.T) {
	t.Parallel()
	if testing.Short() {
		t.Skip("skipping e2e test")
	}

	// Shared test infrastructure (containers, mock webhook server); cleaned up
	// when the test ends.
	testinfraCleanup := testinfra.Start(t)
	defer testinfraCleanup()
	gin.SetMode(gin.TestMode)
	mockServerBaseURL := testinfra.GetMockServer(t)

	cfg := configs.Basic(t, configs.BasicOpts{
		LogStorage: configs.LogStorageTypeClickHouse,
	})

	// Scheduled backoff: 2s, 4s — enough window to trigger manual retry before
	// the first auto retry fires at 2s, while keeping the test under 10s
	cfg.RetrySchedule = []int{2, 4}
	cfg.RetryPollBackoffMs = 50
	cfg.LogBatchThresholdSeconds = 0 // Immediate flush so logstore queries work

	require.NoError(t, cfg.Validate(config.Flags{}))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run the whole application in-process. appDone lets the deferred cleanup
	// block until the app has fully stopped before test infra is torn down.
	appDone := make(chan struct{})
	go func() {
		defer close(appDone)
		application := app.New(&cfg)
		if err := application.Run(ctx); err != nil {
			log.Println("Application stopped:", err)
		}
	}()
	defer func() {
		cancel()
		<-appDone
	}()

	waitForHealthy(t, cfg.APIPort, 5*time.Second)

	client := newRegressionHTTPClient(cfg.APIKey)
	apiURL := fmt.Sprintf("http://localhost:%d/api/v1", cfg.APIPort)

	// Nanosecond-suffixed IDs keep this test isolated from other parallel tests.
	tenantID := fmt.Sprintf("tenant_manual_%d", time.Now().UnixNano())
	destinationID := fmt.Sprintf("dest_manual_%d", time.Now().UnixNano())
	eventID := fmt.Sprintf("evt_manual_%d", time.Now().UnixNano())
	secret := testSecret

	// Create tenant
	status := client.doJSON(t, http.MethodPut, apiURL+"/tenants/"+tenantID, nil, nil)
	require.Equal(t, 201, status, "failed to create tenant")

	// Configure mock server destination (always returns error)
	status = client.doJSONRaw(t, http.MethodPut, mockServerBaseURL+"/destinations", map[string]any{
		"id":   destinationID,
		"type": "webhook",
		"config": map[string]any{
			"url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID),
		},
		"credentials": map[string]any{
			"secret": secret,
		},
	}, nil)
	require.Equal(t, 200, status, "failed to configure mock server")

	// Create destination
	status = client.doJSON(t, http.MethodPost, apiURL+"/tenants/"+tenantID+"/destinations", map[string]any{
		"id":     destinationID,
		"type":   "webhook",
		"topics": "*",
		"config": map[string]any{
			"url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID),
		},
		"credentials": map[string]any{
			"secret": secret,
		},
	}, nil)
	require.Equal(t, 201, status, "failed to create destination")

	// Publish always-failing event with retry enabled (should_err makes the
	// mock destination return an error on every delivery)
	status = client.doJSON(t, http.MethodPost, apiURL+"/publish", map[string]any{
		"id":                 eventID,
		"tenant_id":          tenantID,
		"topic":              "user.created",
		"eligible_for_retry": true,
		"metadata": map[string]any{
			"should_err": "true",
		},
		"data": map[string]any{
			"test": "manual-retry-schedule",
		},
	}, nil)
	require.Equal(t, 202, status, "failed to publish event")

	// Wait for attempt 1 (initial auto delivery, fails)
	// TODO: extract into a shared standalone poll helper (not tied to basicSuite)
	// Polls the attempts list API (100ms interval) until at least minCount
	// attempts are visible, or fails the test at the timeout.
	pollAttempts := func(t *testing.T, minCount int, timeout time.Duration) []map[string]any {
		t.Helper()
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			var resp struct {
				Models []map[string]any `json:"models"`
			}
			s := client.doJSON(t, http.MethodGet,
				apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID+"&dir=asc", nil, &resp)
			if s == http.StatusOK && len(resp.Models) >= minCount {
				return resp.Models
			}
			time.Sleep(100 * time.Millisecond)
		}
		t.Fatalf("timed out waiting for %d attempts", minCount)
		return nil
	}

	attempts := pollAttempts(t, 1, 5*time.Second)
	// NOTE(review): assumes attempt 1 becomes visible well before the 2s auto
	// retry fires; if logstore flush ever lags past 2s, attempt 2 could already
	// exist here and this Len check would flake — confirm flush latency.
	require.Len(t, attempts, 1, "should have exactly 1 attempt (initial delivery)")
	require.Equal(t, float64(1), attempts[0]["attempt_number"], "initial attempt should be attempt_number=1")

	// Trigger manual retry BEFORE the 2s auto retry fires
	// This should cancel the pending 2s retry and schedule a 4s retry (tier 1)
	status = client.doJSON(t, http.MethodPost, apiURL+"/retry", map[string]any{
		"event_id":       eventID,
		"destination_id": destinationID,
	}, nil)
	require.Equal(t, 202, status, "manual retry should be accepted")

	// Wait for attempt 2 (manual retry, fails)
	attempts = pollAttempts(t, 2, 5*time.Second)
	require.Equal(t, float64(2), attempts[1]["attempt_number"], "manual retry should be attempt_number=2")
	require.Equal(t, true, attempts[1]["manual"], "attempt 2 should be manual=true")

	// Verify the 2s auto retry was canceled: wait 2.5s and confirm no attempt 3 arrived
	// (if the cancel failed, a 3rd attempt would appear at ~t=2s)
	time.Sleep(2500 * time.Millisecond)
	var midResp struct {
		Models []map[string]any `json:"models"`
	}
	client.doJSON(t, http.MethodGet,
		apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID, nil, &midResp)
	require.Len(t, midResp.Models, 2,
		"should still have only 2 attempts at t~2.5s (2s auto retry was canceled)")

	// Wait for attempt 3 (auto retry at tier 1 = 4s from manual retry time)
	attempts = pollAttempts(t, 3, 5*time.Second)
	require.Equal(t, float64(3), attempts[2]["attempt_number"], "auto retry should be attempt_number=3")
	require.NotEqual(t, true, attempts[2]["manual"], "attempt 3 should be auto (not manual)")

	// Verify budget exhaustion: no attempt 4 should arrive
	// Schedule has 2 entries → 2 retries max → 3 total attempts. Budget is exhausted.
	time.Sleep(2 * time.Second)
	var finalResp struct {
		Models []map[string]any `json:"models"`
	}
	client.doJSON(t, http.MethodGet,
		apiURL+"/attempts?tenant_id="+tenantID+"&event_id="+eventID+"&dir=asc", nil, &finalResp)
	require.Len(t, finalResp.Models, 3,
		"should have exactly 3 attempts (budget exhausted: 1 initial + 2 retries)")

	// Assert the full picture: attempt_number is sequential, manual flag is correct
	expected := []struct {
		attemptNumber float64
		manual        bool
	}{
		{1, false}, // initial auto delivery
		{2, true},  // manual retry
		{3, false}, // auto retry (advanced tier)
	}
	for i, exp := range expected {
		atm := finalResp.Models[i]
		require.Equal(t, exp.attemptNumber, atm["attempt_number"],
			"attempt %d: expected attempt_number=%v", i+1, exp.attemptNumber)
		// JSON booleans decode as bool inside map[string]any; a missing or
		// non-bool "manual" field falls back to false via the comma-ok assert.
		manual, _ := atm["manual"].(bool)
		require.Equal(t, exp.manual, manual,
			"attempt %d: expected manual=%v", i+1, exp.manual)
	}
}

// TestE2E_Regression_RetryRaceCondition verifies that retries are not lost when
// the retry scheduler queries logstore before the event has been persisted.
func TestE2E_Regression_RetryRaceCondition(t *testing.T) {
Expand Down
11 changes: 6 additions & 5 deletions cmd/e2e/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *basicSuite) TestRetry_FailedDeliveryAutoRetries() {

for i, atm := range resp.Models {
s.Equal(float64(i+1), atm["attempt_number"],
"attempt %d should have attempt_number=%d (automated retry increments)", i, i+1)
"attempt %d should have attempt_number=%d (1-indexed, automated retry increments)", i, i+1)
}
}

Expand All @@ -51,7 +51,7 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
// Wait for initial delivery
s.waitForNewAttempts(tenant.ID, 1)

// Verify first attempt has attempt_number=1
// Verify first attempt has attempt_number=1 (1-indexed)
var attResp struct {
Models []map[string]any `json:"models"`
}
Expand All @@ -75,9 +75,10 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
s.Require().Equal(http.StatusOK, status)
s.Require().Len(verifyResp.Models, 2)

// Both should have attempt_number=1 (manual retry resets)
for _, atm := range verifyResp.Models {
s.Equal(float64(1), atm["attempt_number"])
// Attempt numbers should be sequential (1-indexed)
for i, atm := range verifyResp.Models {
s.Equal(float64(i+1), atm["attempt_number"],
"attempt %d should have attempt_number=%d", i, i+1)
}

// Verify one manual=true
Expand Down
24 changes: 17 additions & 7 deletions internal/apirouter/retry_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,30 @@ func (h *RetryHandlers) Retry(c *gin.Context) {

tenantID := tenantIDFromContext(c)

// 1. Look up event by ID
event, err := h.logStore.RetrieveEvent(c.Request.Context(), logstore.RetrieveEventRequest{
TenantID: tenantID,
EventID: req.EventID,
})
// 1. Look up prior attempt (includes event data) — single logstore query
listReq := logstore.ListAttemptRequest{
EventIDs: []string{req.EventID},
DestinationIDs: []string{req.DestinationID},
Limit: 1,
SortOrder: "desc",
}
if tenantID != "" {
listReq.TenantIDs = []string{tenantID}
}
attemptResp, err := h.logStore.ListAttempt(c.Request.Context(), listReq)
if err != nil {
AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err))
return
}
if event == nil {
if len(attemptResp.Data) == 0 {
AbortWithError(c, http.StatusNotFound, NewErrNotFound("event"))
return
}

record := attemptResp.Data[0]
event := record.Event
attemptNumber := record.Attempt.AttemptNumber + 1

// Authz: JWT tenant can only retry their own events
if tenant := tenantFromContext(c); tenant != nil {
if event.TenantID != tenant.ID {
Expand Down Expand Up @@ -106,7 +116,7 @@ func (h *RetryHandlers) Retry(c *gin.Context) {
}

// 3. Create and publish manual delivery task
task := models.NewManualDeliveryTask(*event, req.DestinationID)
task := models.NewManualDeliveryTask(*event, req.DestinationID, attemptNumber)

if err := h.deliveryPublisher.Publish(c.Request.Context(), task); err != nil {
AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err))
Expand Down
11 changes: 6 additions & 5 deletions internal/apirouter/retry_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestAPI_Retry(t *testing.T) {
h.tenantStore.UpsertDestination(t.Context(), df.Any(df.WithID("d1"), df.WithTenantID("t1"), df.WithTopics([]string{"*"})))
e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created"))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e, Attempt: attemptForEvent(e)},
{Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))},
}))
return h
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestAPI_Retry(t *testing.T) {
// Event belongs to t1
e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created"))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e, Attempt: attemptForEvent(e)},
{Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))},
}))

// JWT for t2 tries to retry t1's event
Expand All @@ -154,7 +154,7 @@ func TestAPI_Retry(t *testing.T) {
h.tenantStore.UpsertDestination(t.Context(), dest)
e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created"))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e, Attempt: attemptForEvent(e)},
{Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))},
}))

req := h.jsonReq(http.MethodPost, "/api/v1/retry", map[string]any{
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestAPI_Retry(t *testing.T) {
h.tenantStore.UpsertDestination(t.Context(), dest)
e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created"))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e, Attempt: attemptForEvent(e)},
{Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))},
}))

req := h.jsonReq(http.MethodPost, "/api/v1/retry", map[string]any{
Expand All @@ -209,7 +209,7 @@ func TestAPI_Retry(t *testing.T) {
// Event has topic "user.created"
e := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"), ef.WithTopic("user.created"))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e, Attempt: attemptForEvent(e)},
{Event: e, Attempt: attemptForEvent(e, af.WithDestinationID("d1"))},
}))

req := h.jsonReq(http.MethodPost, "/api/v1/retry", map[string]any{
Expand Down Expand Up @@ -252,6 +252,7 @@ func TestAPI_Retry(t *testing.T) {
assert.Equal(t, "e1", task.Event.ID)
assert.Equal(t, "t1", task.Event.TenantID)
assert.Equal(t, "d1", task.DestinationID)
assert.Equal(t, 2, task.Attempt, "should derive attempt_number=2 from 1 prior attempt")
})

t.Run("returns success body", func(t *testing.T) {
Expand Down
24 changes: 15 additions & 9 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,20 @@ func (h *messageHandler) doHandle(ctx context.Context, task models.DeliveryTask,
attemptErr := &AttemptError{err: err}

if h.shouldScheduleRetry(task, err) {
// scheduleRetry uses RetryID (event_id:destination_id) as the scheduler
// task ID. The scheduler has upsert semantics: scheduling with the same ID
// atomically replaces the existing entry (both timing and payload). This
// means manual retries automatically override any pending automatic retry
// without needing an explicit cancel — the new tier's delay takes effect
// and the old scheduled retry is gone in a single operation.
if retryErr := h.scheduleRetry(ctx, task); retryErr != nil {
return h.logDeliveryResult(ctx, &task, destination, attempt, errors.Join(err, retryErr))
}
} else if task.Manual {
// Budget exhausted or not eligible — cancel any lingering scheduled retry.
// Unlike the case above, there's no new retry to schedule so we must
// explicitly cancel to prevent a stale automatic retry from firing.
_ = h.retryScheduler.Cancel(ctx, models.RetryID(task.Event.ID, task.DestinationID))
}
return h.logDeliveryResult(ctx, &task, destination, attempt, attemptErr)
}
Expand Down Expand Up @@ -360,17 +371,14 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De
}

func (h *messageHandler) shouldScheduleRetry(task models.DeliveryTask, err error) bool {
if task.Manual {
return false
}
if !task.Event.EligibleForRetry {
return false
}
var pubErr *destregistry.ErrDestinationPublishAttempt
if !errors.As(err, &pubErr) {
return false
}
// Attempt starts at 1 for initial attempt, so use <= to allow retryMaxLimit total attempts
// Attempt is 1-indexed: max attempts = 1 (initial) + retryMaxLimit (retries)
return task.Attempt <= h.retryMaxLimit
}

Expand Down Expand Up @@ -420,11 +428,9 @@ func (h *messageHandler) shouldNackDeliveryError(err error) bool {
}

func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) error {
// Backoff expects a 0-based index (0 for first retry, 1 for second, etc.).
// attempt_number changed from 0-based to 1-based without migrating in-flight
// tasks, so clamp to 0 to safely handle any leftover Attempt=0 tasks.
backoffIndex := max(task.Attempt-1, 0)
backoffDuration := h.retryBackoff.Duration(backoffIndex)
// Attempt is 1-indexed; backoff schedule is 0-indexed.
// Clamp to 0 to safely handle any leftover Attempt=0 in-flight tasks.
backoffDuration := h.retryBackoff.Duration(max(task.Attempt-1, 0))

retryTask := RetryTaskFromDeliveryTask(task)
retryTaskStr, err := retryTask.ToString()
Expand Down
Loading
Loading