From 7eb6b1a01ef703d71cd85fcceb58e9db17be8190 Mon Sep 17 00:00:00 2001 From: gocronx Date: Fri, 8 May 2026 23:13:24 +0800 Subject: [PATCH] fix(recover): re-enqueue orphan StateReady tasks (#3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a process crashes between BLPOP and the state update inside PopTask, the task ID is removed from the ready list but the task in Redis is still in StateReady. recover() previously skipped such tasks on the assumption that StateReady implied ready-list membership, leaving the task as an orphan until its TTL expired. This change has recover() pre-fetch each topic's ready list once and RPUSH any StateReady task whose ID is missing. Tests cover both the orphan and live-task cases — the live-task test guards against duplicate enqueues on restart. --- seqdelay.go | 14 +++++++- seqdelay_test.go | 87 ++++++++++++++++++++++++++++++++++++++++++++++++ store.go | 25 ++++++++++++++ 3 files changed, 125 insertions(+), 1 deletion(-) diff --git a/seqdelay.go b/seqdelay.go index 44c06d2..eb006cb 100644 --- a/seqdelay.go +++ b/seqdelay.go @@ -125,6 +125,13 @@ func (q *Queue) recover(ctx context.Context) error { if err != nil { continue } + + // Pre-fetch the ready list once so StateReady checks below are O(1). + // Best-effort: on error we treat the set as empty, which means orphans + // get re-enqueued (safe) and live IDs may get duplicated (harmless — + // the duplicate just becomes a stale ID once the original is popped). + readySet, _ := q.store.ReadyListIDs(ctx, topic) + for _, task := range tasks { switch task.State { case StateDelayed: @@ -142,7 +149,12 @@ func (q *Queue) recover(ctx context.Context) error { q.wheel.add(remaining, task.ID, task.Topic) } case StateReady: - // Already in the ready list — nothing to do. + // If the ID isn't on the ready list, the previous process + // crashed between BLPOP and the state update — re-enqueue so + // the task isn't lost. 
+ if _, ok := readySet[task.ID]; !ok { + _ = q.store.RequeueReady(ctx, topic, task.ID) + } case StateFinished, StateCancelled: _ = q.store.CleanupIndex(ctx, topic, task.ID) } diff --git a/seqdelay_test.go b/seqdelay_test.go index 25ef6ad..e2099d2 100644 --- a/seqdelay_test.go +++ b/seqdelay_test.go @@ -356,3 +356,90 @@ func TestQueue_OnExpire_Conflict(t *testing.T) { t.Errorf("expected ErrTopicConflict, got %v", err) } } + +// --------------------------------------------------------------------------- +// Recover: orphan StateReady tasks +// --------------------------------------------------------------------------- + +// TestQueue_Recover_OrphanReadyTask plants the state a previous process leaves +// behind when it crashes between BLPOP and the state update inside PopTask: +// the task is stored as StateReady, but its ID is no longer on the ready list. +// recover() should re-enqueue the orphan so it isn't lost. +func TestQueue_Recover_OrphanReadyTask(t *testing.T) { + client := getTestRedis(t) + defer client.Close() + + ctx := context.Background() + s := newStore(client) + + // Persist a task in StateReady without pushing the ID onto the ready list. + orphan := newTestTask("orphan-topic", "orphan-1") + orphan.State = StateReady + if err := s.SaveTask(ctx, orphan); err != nil { + t.Fatalf("SaveTask: %v", err) + } + + // Sanity: the ready list starts empty, so the StateReady task is an orphan. + if n, _ := client.LLen(ctx, readyKey(orphan.Topic)).Result(); n != 0 { + t.Fatalf("ready list should start empty, got %d", n) + } + + q, err := New( + WithRedisClient(client), + WithTickInterval(10*time.Millisecond), + WithPopTimeout(2*time.Second), + ) + if err != nil { + t.Fatalf("New: %v", err) + } + if err := q.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + defer q.Shutdown(ctx) //nolint:errcheck + + // After Start (which calls recover), the orphan ID should be back on the + // ready list and Pop should return it.
+ popped, err := q.Pop(ctx, orphan.Topic) + if err != nil { + t.Fatalf("Pop: %v", err) + } + if popped == nil { + t.Fatal("Pop returned nil — orphan was not recovered") + } + if popped.ID != orphan.ID { + t.Errorf("recovered wrong task: got %q, want %q", popped.ID, orphan.ID) + } +} + +// TestQueue_Recover_LiveReadyTaskNotDuplicated guards against the inverse: +// when a StateReady task's ID is already on the ready list, recover() must +// not push a duplicate. +func TestQueue_Recover_LiveReadyTaskNotDuplicated(t *testing.T) { + client := getTestRedis(t) + defer client.Close() + + ctx := context.Background() + s := newStore(client) + + live := newTestTask("live-topic", "live-1") + live.State = StateReady + if err := s.SaveTask(ctx, live); err != nil { + t.Fatalf("SaveTask: %v", err) + } + if err := client.RPush(ctx, readyKey(live.Topic), live.ID).Err(); err != nil { + t.Fatalf("RPush: %v", err) + } + + q, err := New(WithRedisClient(client), WithTickInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("New: %v", err) + } + if err := q.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + defer q.Shutdown(ctx) //nolint:errcheck + + if n, _ := client.LLen(ctx, readyKey(live.Topic)).Result(); n != 1 { + t.Errorf("ready list should still hold exactly 1 entry, got %d", n) + } +} diff --git a/store.go b/store.go index 39ae544..5269ba0 100644 --- a/store.go +++ b/store.go @@ -369,6 +369,31 @@ func (s *store) PopTask(ctx context.Context, topic string, timeout time.Duration return t, nil } +// --------------------------------------------------------------------------- +// ReadyListIDs / RequeueReady +// --------------------------------------------------------------------------- + +// ReadyListIDs returns the set of task IDs currently sitting on the topic's +// ready list. Used by recover() to detect orphan StateReady tasks whose IDs +// went missing because a previous process crashed mid-Pop. 
+func (s *store) ReadyListIDs(ctx context.Context, topic string) (map[string]struct{}, error) { + ids, err := s.client.LRange(ctx, readyKey(topic), 0, -1).Result() + if err != nil { + return nil, err + } + set := make(map[string]struct{}, len(ids)) + for _, id := range ids { + set[id] = struct{}{} + } + return set, nil +} + +// RequeueReady appends id to the topic's ready list. Idempotency is the +// caller's responsibility (recover checks ReadyListIDs first). +func (s *store) RequeueReady(ctx context.Context, topic, id string) error { + return s.client.RPush(ctx, readyKey(topic), id).Err() +} + // --------------------------------------------------------------------------- // LoadTopicTasks // ---------------------------------------------------------------------------