diff --git a/seqdelay.go b/seqdelay.go index 44c06d2..eb006cb 100644 --- a/seqdelay.go +++ b/seqdelay.go @@ -125,6 +125,13 @@ func (q *Queue) recover(ctx context.Context) error { if err != nil { continue } + + // Pre-fetch the ready list once so StateReady checks below are O(1). + // Best-effort: on error we treat the set as empty, which means orphans + // get re-enqueued (safe) and live IDs may get duplicated (harmless — + // the duplicate just becomes a stale ID once the original is popped). + readySet, _ := q.store.ReadyListIDs(ctx, topic) + for _, task := range tasks { switch task.State { case StateDelayed: @@ -142,7 +149,12 @@ func (q *Queue) recover(ctx context.Context) error { q.wheel.add(remaining, task.ID, task.Topic) } case StateReady: - // Already in the ready list — nothing to do. + // If the ID isn't on the ready list, the previous process + // crashed between BLPOP and the state update — re-enqueue so + // the task isn't lost. + if _, ok := readySet[task.ID]; !ok { + _ = q.store.RequeueReady(ctx, topic, task.ID) + } case StateFinished, StateCancelled: _ = q.store.CleanupIndex(ctx, topic, task.ID) } diff --git a/seqdelay_test.go b/seqdelay_test.go index 25ef6ad..e2099d2 100644 --- a/seqdelay_test.go +++ b/seqdelay_test.go @@ -356,3 +356,90 @@ func TestQueue_OnExpire_Conflict(t *testing.T) { t.Errorf("expected ErrTopicConflict, got %v", err) } } + +// --------------------------------------------------------------------------- +// Recover: orphan StateReady tasks +// --------------------------------------------------------------------------- + +// TestQueue_Recover_OrphanReadyTask plants the state a previous process leaves +// behind when it crashes between BLPOP and the state update inside PopTask: +// the task is stored as StateReady, but its ID is no longer on the ready list. +// recover() should re-enqueue the orphan so it isn't lost. +func TestQueue_Recover_OrphanReadyTask(t *testing.T) { + client := getTestRedis(t) + defer client.Close() + + ctx := context.Background() + s := newStore(client) + + // Persist a task in StateReady without pushing the ID onto the ready list. + orphan := newTestTask("orphan-topic", "orphan-1") + orphan.State = StateReady + if err := s.SaveTask(ctx, orphan); err != nil { + t.Fatalf("SaveTask: %v", err) + } + + // Sanity: ready list is empty, task is in StateReady. + if n, _ := client.LLen(ctx, readyKey(orphan.Topic)).Result(); n != 0 { + t.Fatalf("ready list should start empty, got %d", n) + } + + q, err := New( + WithRedisClient(client), + WithTickInterval(10*time.Millisecond), + WithPopTimeout(2*time.Second), + ) + if err != nil { + t.Fatalf("New: %v", err) + } + if err := q.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + defer q.Shutdown(ctx) //nolint:errcheck + + // After Start (which calls recover), the orphan ID should be back on the + // ready list and Pop should return it. + popped, err := q.Pop(ctx, orphan.Topic) + if err != nil { + t.Fatalf("Pop: %v", err) + } + if popped == nil { + t.Fatal("Pop returned nil — orphan was not recovered") + } + if popped.ID != orphan.ID { + t.Errorf("recovered wrong task: got %q, want %q", popped.ID, orphan.ID) + } +} + +// TestQueue_Recover_LiveReadyTaskNotDuplicated guards against the inverse: +// when a StateReady task's ID is already on the ready list, recover() must +// not push a duplicate. +func TestQueue_Recover_LiveReadyTaskNotDuplicated(t *testing.T) { + client := getTestRedis(t) + defer client.Close() + + ctx := context.Background() + s := newStore(client) + + live := newTestTask("live-topic", "live-1") + live.State = StateReady + if err := s.SaveTask(ctx, live); err != nil { + t.Fatalf("SaveTask: %v", err) + } + if err := client.RPush(ctx, readyKey(live.Topic), live.ID).Err(); err != nil { + t.Fatalf("RPush: %v", err) + } + + q, err := New(WithRedisClient(client), WithTickInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("New: %v", err) + } + if err := q.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + defer q.Shutdown(ctx) //nolint:errcheck + + if n, _ := client.LLen(ctx, readyKey(live.Topic)).Result(); n != 1 { + t.Errorf("ready list should still hold exactly 1 entry, got %d", n) + } +} diff --git a/store.go b/store.go index 39ae544..5269ba0 100644 --- a/store.go +++ b/store.go @@ -369,6 +369,31 @@ func (s *store) PopTask(ctx context.Context, topic string, timeout time.Duration return t, nil } +// --------------------------------------------------------------------------- +// ReadyListIDs / RequeueReady +// --------------------------------------------------------------------------- + +// ReadyListIDs returns the set of task IDs currently sitting on the topic's +// ready list. Used by recover() to detect orphan StateReady tasks whose IDs +// went missing because a previous process crashed mid-Pop. +func (s *store) ReadyListIDs(ctx context.Context, topic string) (map[string]struct{}, error) { + ids, err := s.client.LRange(ctx, readyKey(topic), 0, -1).Result() + if err != nil { + return nil, err + } + set := make(map[string]struct{}, len(ids)) + for _, id := range ids { + set[id] = struct{}{} + } + return set, nil +} + +// RequeueReady appends id to the topic's ready list. Idempotency is the +// caller's responsibility (recover checks ReadyListIDs first). +func (s *store) RequeueReady(ctx context.Context, topic, id string) error { + return s.client.RPush(ctx, readyKey(topic), id).Err() +} + // --------------------------------------------------------------------------- // LoadTopicTasks // ---------------------------------------------------------------------------