From 1ebd992641d28531d892309a62f2c4a0dde36127 Mon Sep 17 00:00:00 2001 From: shark Date: Tue, 12 May 2026 17:13:00 +0800 Subject: [PATCH 1/2] feat: update seed script --- scripts/seed.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/scripts/seed.py b/scripts/seed.py index e11c66c..ac12bca 100755 --- a/scripts/seed.py +++ b/scripts/seed.py @@ -85,37 +85,31 @@ "slug": "厨房取物", "description": "从冰箱取出物品并放到备菜台上。", "version": "1.0.0", - "skill_sequence": ["move-base", "open-fridge", "pick-item", "place-item"], }, { "slug": "快递分拣", "description": "在快递站扫描并分拣到达的包裹。", "version": "1.0.0", - "skill_sequence": ["move-base", "scan-qr", "sort-package"], }, { "slug": "卫浴快清", "description": "卫生间洗手池的快速清洁流程。", "version": "1.0.0", - "skill_sequence": ["move-base", "clean-sink"], }, { "slug": "卧室整理", "description": "通过整理床铺来完成卧室收纳。", "version": "1.0.0", - "skill_sequence": ["move-base", "make-bed"], }, { "slug": "折叠毛巾", "description": "在卫生间折叠毛巾并整齐摆放。", "version": "1.0.0", - "skill_sequence": ["move-base", "fold-towel", "place-item"], }, { "slug": "移动水瓶", "description": "将水瓶移动到厨房指定位置。", "version": "1.0.0", - "skill_sequence": ["move-base", "pick-item", "place-item"], }, ] @@ -390,7 +384,6 @@ def _ensure_sop(sop): payload = { "slug": slug, "version": version, - "skill_sequence": list(sop.get("skill_sequence") or []), } if sop.get("description"): payload["description"] = sop["description"] From 91a4d971b2fe497d154e56526be9d801d59cf81a Mon Sep 17 00:00:00 2001 From: shark Date: Tue, 12 May 2026 20:09:17 +0800 Subject: [PATCH 2/2] feat(sync): persist cloud sync queue state --- .../cloud-sync-persistent-queue-design.md | 383 ++++++++++++++++++ internal/services/sync_worker.go | 292 ++++++++++--- internal/services/sync_worker_test.go | 254 +++++++++++- 3 files changed, 869 insertions(+), 60 deletions(-) create mode 100644 docs/designs/cloud-sync-persistent-queue-design.md diff --git a/docs/designs/cloud-sync-persistent-queue-design.md 
b/docs/designs/cloud-sync-persistent-queue-design.md new file mode 100644 index 0000000..b97e4fa --- /dev/null +++ b/docs/designs/cloud-sync-persistent-queue-design.md @@ -0,0 +1,383 @@ + + +# Cloud Sync Persistent Queue Design + +## 1. Problem + +Synapse Cloud Sync Center exposes a queued status: + +```text +Failed -> Queued -> Syncing -> Synced +``` + +However, Keystone manual retry does not currently persist a queued state. + +When an admin retries failed sync jobs: + +1. Synapse calls `POST /api/v1/sync/episodes/:id`. +2. Keystone validates the episode and calls `SyncWorker.EnqueueEpisodeManual()`. +3. The worker places the episode into an in-memory channel. +4. When a worker goroutine claims the job, Keystone writes or updates + `sync_logs.status = 'in_progress'` directly. + +As a result, the API request can be accepted and the upload can eventually +complete, while Synapse never observes `sync_logs.status = 'pending'`. With +`KEYSTONE_SYNC_MAX_CONCURRENT=2`, retrying four failed episodes often appears as: + +```text +2 syncing, 2 still failed, then all synced +``` + +instead of: + +```text +2 syncing, 2 queued, then all synced +``` + +This is a backend state-model gap, not a configuration issue. + +## 2. Current Behavior + +### 2.1 Manual Retry + +Current single episode retry path: + +```text +POST /sync/episodes/:id + -> SyncHandler.TriggerEpisodeSync() + -> SyncWorker.EnqueueEpisodeManual() + -> enqueueCh + -> jobCh + -> acquireSyncLogWithMode() + -> sync_logs.status = 'in_progress' +``` + +Important properties: + +- `pending` is not written before the API returns `202 Accepted`. +- In-memory `enqueuedEpisode` prevents duplicate scheduling only within one + running Keystone process. +- `manual=true` intentionally bypasses exhausted automatic retry and active + backoff checks. +- An episode with latest `pending` or `in_progress` is rejected as already active. + +### 2.2 Automatic Retry + +The polling worker periodically: + +1. 
Finds failed rows whose `next_retry_at` is due and whose `attempt_count` is + below `MaxRetries`. +2. Dispatches them to worker goroutines. +3. Reuses the failed `sync_logs` row and changes it directly to `in_progress`. + +Automatic retry also does not create an observable queued period. + +### 2.3 Frontend Listing + +Cloud Sync Center lists one row per episode using the latest `sync_logs` row. +Its queued count is the number of latest rows whose status is `pending`. + +Therefore, a job that only exists in memory cannot be counted as queued. + +## 3. Goals + +- Make `sync_logs.status = 'pending'` a real, persistent queued state. +- Make manual retry visible immediately after the API returns `202 Accepted`. +- Preserve manual retry semantics: operators can retry exhausted or backoff + failures explicitly. +- Preserve automatic retry limits and backoff behavior. +- Allow queued work to recover after Keystone restarts. +- Keep the change scoped to the existing `sync_logs` model unless stronger + queue semantics become necessary later. + +## 4. Non-Goals + +- Do not introduce a new `sync_queue` table in the first implementation. +- Do not change cloud upload protocol behavior. +- Do not change episode QA eligibility rules. +- Do not make batch trigger a forced retry-all-failures operation unless a + separate API contract is designed. + +## 5. Recommended Design + +Use the existing `sync_logs.status = 'pending'` as the durable sync queue state. 
+ +### 5.1 State Model + +```text +No sync log + -> pending + -> in_progress + -> completed + +failed + -> pending + -> in_progress + -> completed + +failed + -> pending + -> in_progress + -> failed +``` + +Automatic retry remains bounded: + +```text +failed(attempt_count < max_retries, next_retry_at due) + -> pending +``` + +Manual retry remains operator-forced: + +```text +failed(exhausted or still in backoff) + -> new pending attempt chain +``` + +### 5.2 Manual Enqueue + +`EnqueueEpisodeManual()` should persist a pending row before returning success. + +The operation must be transactionally protected: + +1. Lock the parent `episodes` row with `FOR UPDATE`. +2. Load the latest `sync_logs` row for the episode. +3. Reject latest `pending` or `in_progress`. +4. Reject if `episodes.cloud_synced = TRUE`. +5. If the latest row is `failed` and is exhausted or still in backoff, + insert a new `sync_logs` row with `status = 'pending'` (a fresh chain). +6. If there is no sync log, insert a new `pending` row. +7. Commit before returning `202 Accepted`. + +The in-memory enqueue channel should become an acceleration path only. If the +channel is full after the pending row has been persisted, the API can still +return accepted because the polling loop can recover the pending job. + +### 5.3 Worker Dispatch + +The worker loop should process DB-backed queued work before discovering new work: + +1. Dispatch latest `pending` sync logs. +2. Promote due failed rows to `pending`, then dispatch them. +3. Discover approved, unsynced episodes with no active/latest completed log. + +This makes the queue restart-safe. If Keystone crashes after writing `pending` +but before placing the job into memory, the next polling cycle will pick it up. 
+ +### 5.4 Claiming Pending Rows + +Worker goroutines should claim a pending row using an atomic DB transition: + +```sql +UPDATE sync_logs +SET status = 'in_progress', + started_at = ?, + source_path = ?, + error_message = NULL, + duration_sec = NULL, + completed_at = NULL, + next_retry_at = NULL, + attempt_count = ? +WHERE id = ? + AND status = 'pending' +``` + +The worker must check `RowsAffected == 1`. + +This protects against duplicate execution if multiple dispatchers or future +Keystone instances see the same pending row. + +### 5.5 Attempt Count Semantics + +Recommended semantics: + +- A fresh pending row uses `attempt_count = 0`; claim changes it to `1`. +- A retryable due failed row can be reused by changing `failed -> pending` and + preserving its current `attempt_count`; claim increments it to the next + attempt number. +- `in_progress.attempt_count >= 1` means an upload attempt has been claimed by a + worker. +- `failed.attempt_count` records the number of failed attempts in the current + attempt chain. +- Manual retry after exhausted/backoff failure creates a new row with + `attempt_count = 0`, then claim changes it to `1`. + +This preserves the existing automatic retry counter while still making fresh +manual attempt chains clearly visible. + +### 5.6 Frontend Behavior + +Synapse can keep the current status model: + +- `pending`: queued +- `in_progress`: syncing +- `completed`: synced +- `failed`: failed + +After a successful manual retry: + +1. Optimistically mark the affected row as `pending`. +2. Refresh counts and the current page. +3. Poll every 2 to 3 seconds while any latest row is `pending` or `in_progress`. +4. Stop polling once no active work remains. + +The frontend should continue treating the backend summary endpoint as the source +of truth. + +## 6. Risks and Mitigations + +### 6.1 Duplicate Pending Rows + +Risk: two admins retry the same failed episode at the same time. 
+ +Mitigation: + +- Lock the `episodes` row before inspecting or inserting `sync_logs`. +- Reject latest `pending` or `in_progress`. +- Keep the in-memory duplicate marker as an optimization, not as the only guard. + +### 6.2 Pending Rows Stuck Forever + +Risk: API writes `pending`, Keystone crashes before enqueueing in memory. + +Mitigation: + +- Poll and dispatch latest `pending` rows from the database. +- Treat in-memory enqueue as best-effort acceleration. + +### 6.3 In-Progress Rows Stuck Forever + +Risk: Keystone crashes after claiming a row as `in_progress`. + +Mitigation: + +- Add stale `in_progress` recovery in a follow-up step. +- A row can be considered stale when `started_at` exceeds a conservative timeout + derived from request timeout, OSS timeout, and a buffer. +- Stale rows can be marked `failed` with a retryable error and `next_retry_at` + set by the normal backoff function. + +This risk already exists today; making `pending` durable makes it more visible. + +### 6.4 Automatic and Manual Retry Confusion + +Risk: manual retry bypass rules accidentally affect automatic retry. + +Mitigation: + +- Keep explicit `manual` mode in the enqueue/acquire path. +- Automatic retry can only promote failed rows when `attempt_count < MaxRetries` + and `next_retry_at <= NOW()`. +- Manual retry can create a new pending attempt chain even when backoff is active + or the latest chain is exhausted. + +### 6.5 Queue Full API Semantics + +Risk: current code returns `429 queue_full` when the in-memory channel is full. + +Mitigation: + +- After durable pending is introduced, a full channel should not fail the API if + the DB write succeeded. +- Return `202 Accepted`; the polling loop will dispatch the pending job later. + +### 6.6 Listing Sort Semantics + +Risk: `started_at` currently acts as both queue time and start time. + +Mitigation: + +- For the first implementation, keep using `started_at` as the row ordering time. 
+- If stronger audit semantics are needed, add `queued_at` in a later migration. + +### 6.7 Multi-Instance Keystone + +Risk: future multi-instance deployments may have multiple workers scanning the +same pending rows. + +Mitigation: + +- Use conditional `pending -> in_progress` updates and require + `RowsAffected == 1`. +- Avoid assuming the in-memory marker is globally authoritative. + +## 7. Implementation Plan + +### Phase 1: Durable Manual Queue + +- Add a worker method to create or reuse a pending sync log transactionally. +- Change `EnqueueEpisodeManual()` to persist pending before returning success. +- Dispatch latest pending rows during each poll. +- Keep existing `sync_logs` schema. +- Keep existing summary/history APIs. + +Expected user-visible behavior: + +```text +Retry 4 failed episodes with MaxConcurrent=2 + -> 2 in_progress, 2 pending + -> pending rows become in_progress as workers free up + -> completed or failed +``` + +### Phase 2: Automatic Retry Alignment + +- Change due failed-row retry from direct dispatch to `failed -> pending`. +- Preserve `MaxRetries` and `next_retry_at` checks. +- Ensure exhausted automatic retry rows remain failed until manually retried. + +### Phase 3: Stale In-Progress Recovery + +- Detect `in_progress` rows older than a conservative timeout. +- Mark stale rows failed with a clear error message. +- Let the normal retry/backoff mechanism decide when to retry. + +### Phase 4: Frontend Polling Polish + +- Add short polling while active work exists. +- Keep retry buttons visible only on latest failed rows. +- Keep backend summary results as the source of truth after optimistic updates. + +## 8. Test Plan + +### Backend Unit Tests + +- Manual retry creates a pending row for a failed exhausted episode. +- Manual retry rejects an episode whose latest row is pending. +- Manual retry rejects an episode whose latest row is in_progress. +- Two concurrent manual retries result in one pending row. 
+- Polling dispatches existing pending rows after process restart simulation. +- Worker claim changes exactly one pending row to in_progress. +- Automatic retry promotes due failed rows to pending only below MaxRetries. +- Exhausted automatic failures are not promoted without manual retry. + +### Backend Integration Tests + +- Retry four failed episodes with `MaxConcurrent=2`; observe two active and two + queued before completion. +- Kill and restart Keystone after pending rows are written; verify pending rows + are dispatched after restart. +- Simulate upload failure; verify failed status, attempt count, and + `next_retry_at`. + +### Frontend Verification + +- Cloud Sync Center queued count increases immediately after retry. +- Rows transition from failed to queued to syncing to synced or failed. +- Refreshing the page does not lose queued rows. +- Polling stops when no pending or in_progress rows remain. + +## 9. Recommendation + +Implement Phase 1 first. It fixes the operator-visible problem with a small, +contained backend change and does not require a new database table. + +Do not ship durable pending without DB-backed duplicate protection and polling +recovery. Those two pieces are required for correctness; otherwise the system +can trade an invisible in-memory queue for stuck or duplicated persistent queue +entries. diff --git a/internal/services/sync_worker.go b/internal/services/sync_worker.go index 48639d5..46e93d9 100644 --- a/internal/services/sync_worker.go +++ b/internal/services/sync_worker.go @@ -80,6 +80,10 @@ var ( ErrSyncAlreadyInProgress = errors.New("sync already in progress") // ErrSyncWorkerNotRunning is returned when Start has not been called or after Stop. 
ErrSyncWorkerNotRunning = errors.New("sync worker is not running") + + errSyncRetryBackoffActive = errors.New("sync retry backoff active") + errSyncRetryExhausted = errors.New("sync retry max retries exceeded") + errSyncAlreadyCompleted = errors.New("sync already completed") ) // NewSyncWorker creates a new sync worker. Call Start() to begin background processing. @@ -199,7 +203,14 @@ func (w *SyncWorker) EnqueueEpisode(ctx context.Context, episodeID int64) error // EnqueueEpisodeManual adds a specific episode ID for immediate sync processing, // allowing explicit API-triggered retries even after automatic retries are exhausted. func (w *SyncWorker) EnqueueEpisodeManual(ctx context.Context, episodeID int64) error { - return w.enqueueEpisode(ctx, episodeID, true) + if !w.running.Load() { + return ErrSyncWorkerNotRunning + } + if err := w.persistPendingSyncLog(ctx, episodeID, true); err != nil { + return err + } + w.enqueuePersistedEpisode(ctx, syncEnqueueRequest{episodeID: episodeID, manual: true}) + return nil } func (w *SyncWorker) enqueueEpisode(ctx context.Context, episodeID int64, manual bool) error { @@ -207,12 +218,6 @@ func (w *SyncWorker) enqueueEpisode(ctx context.Context, episodeID int64, manual return ErrSyncWorkerNotRunning } - if manual { - if err := w.validateEpisodeForManualEnqueue(ctx, episodeID); err != nil { - return err - } - } - if !w.tryMarkEnqueued(episodeID) { return ErrEpisodeAlreadyEnqueued } @@ -229,38 +234,165 @@ func (w *SyncWorker) enqueueEpisode(ctx context.Context, episodeID int64, manual } } -func (w *SyncWorker) validateEpisodeForManualEnqueue(ctx context.Context, episodeID int64) error { +func (w *SyncWorker) enqueuePersistedEpisode(ctx context.Context, req syncEnqueueRequest) { + if !w.tryMarkEnqueued(req.episodeID) { + return + } + + select { + case w.enqueueCh <- req: + case <-ctx.Done(): + w.unmarkEnqueued(req.episodeID) + default: + w.unmarkEnqueued(req.episodeID) + logger.Printf("[SYNC-WORKER] Persistent enqueue for episode 
%d will be recovered by polling", req.episodeID) + } +} + +func (w *SyncWorker) persistPendingSyncLog(ctx context.Context, episodeID int64, manual bool) error { if w.db == nil { return nil } + tx, err := w.db.BeginTxx(ctx, nil) + if err != nil { + return fmt.Errorf("begin pending sync_log transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + lockClause := txLockClause(tx) + var episode struct { + ID int64 `db:"id"` + CloudSynced bool `db:"cloud_synced"` + } + if err := tx.GetContext(ctx, &episode, ` + SELECT id, cloud_synced + FROM episodes + WHERE id = ? AND deleted_at IS NULL + `+lockClause, episodeID); err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("episode %d not found", episodeID) + } + return fmt.Errorf("lock episode %d: %w", episodeID, err) + } + if episode.CloudSynced { + return fmt.Errorf("episode %d already synced", episodeID) + } + + var activeCount int + if err := tx.GetContext(ctx, &activeCount, ` + SELECT COUNT(*) + FROM sync_logs + WHERE episode_id = ? + AND status IN ('pending', 'in_progress') + `, episodeID); err != nil { + return fmt.Errorf("query active sync_log count: %w", err) + } + if activeCount > 0 { + return fmt.Errorf("%w for episode %d", ErrSyncAlreadyInProgress, episodeID) + } + var latest struct { + ID int64 `db:"id"` Status string `db:"status"` NextRetry sql.NullTime `db:"next_retry_at"` AttemptCount int `db:"attempt_count"` } - err := w.db.GetContext(ctx, &latest, ` - SELECT sl.status, sl.next_retry_at, sl.attempt_count - FROM sync_logs sl - INNER JOIN ( - SELECT episode_id, MAX(id) AS latest_id - FROM sync_logs - GROUP BY episode_id - ) t ON sl.episode_id = t.episode_id AND sl.id = t.latest_id - WHERE sl.episode_id = ? - `, episodeID) + err = tx.GetContext(ctx, &latest, ` + SELECT id, status, next_retry_at, attempt_count + FROM sync_logs + WHERE episode_id = ? 
+ ORDER BY id DESC + LIMIT 1 + `+lockClause, episodeID) if err == sql.ErrNoRows { - return nil + if err := insertPendingSyncLog(ctx, tx, episodeID, time.Now().UTC(), 0); err != nil { + return err + } + return tx.Commit() } if err != nil { - return fmt.Errorf("query latest sync_log: %w", err) + return fmt.Errorf("lock latest sync_log: %w", err) } - if latest.Status == "pending" || latest.Status == "in_progress" { + + now := time.Now().UTC() + switch latest.Status { + case "pending", "in_progress": return fmt.Errorf("%w for episode %d", ErrSyncAlreadyInProgress, episodeID) + case "completed": + return fmt.Errorf("%w for episode %d", errSyncAlreadyCompleted, episodeID) + case "failed": + retryDue := !latest.NextRetry.Valid || !latest.NextRetry.Time.After(now) + if latest.AttemptCount < w.cfg.MaxRetries && retryDue { + if err := promoteFailedSyncLogToPending(ctx, tx, latest.ID, now); err != nil { + return err + } + return tx.Commit() + } + if !manual && latest.AttemptCount >= w.cfg.MaxRetries { + return fmt.Errorf("%w for episode %d", errSyncRetryExhausted, episodeID) + } + if !manual && !retryDue { + return fmt.Errorf("%w for episode %d", errSyncRetryBackoffActive, episodeID) + } + if err := insertPendingSyncLog(ctx, tx, episodeID, now, 0); err != nil { + return err + } + return tx.Commit() + default: + return fmt.Errorf("unknown sync status %q for episode %d", latest.Status, episodeID) + } +} + +func insertPendingSyncLog(ctx context.Context, tx *sqlx.Tx, episodeID int64, queuedAt time.Time, attemptCount int) error { + if _, err := tx.ExecContext(ctx, ` + INSERT INTO sync_logs (episode_id, status, attempt_count, started_at) + VALUES (?, 'pending', ?, ?) 
+ `, episodeID, attemptCount, queuedAt); err != nil { + return fmt.Errorf("insert pending sync_log: %w", err) + } + return nil +} + +func promoteFailedSyncLogToPending(ctx context.Context, tx *sqlx.Tx, syncLogID int64, queuedAt time.Time) error { + res, err := tx.ExecContext(ctx, ` + UPDATE sync_logs + SET status = 'pending', + started_at = ?, + error_message = NULL, + duration_sec = NULL, + completed_at = NULL, + next_retry_at = NULL + WHERE id = ? + AND status = 'failed' + `, queuedAt, syncLogID) + if err != nil { + return fmt.Errorf("promote failed sync_log to pending: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("promote failed sync_log rows affected: %w", err) + } + if n != 1 { + return fmt.Errorf("promote failed sync_log %d lost state", syncLogID) } return nil } +func txLockClause(tx *sqlx.Tx) string { + if tx.DriverName() == "sqlite" { + return "" + } + return " FOR UPDATE" +} + +func isSkippablePendingError(err error) bool { + return errors.Is(err, ErrSyncAlreadyInProgress) || + errors.Is(err, errSyncRetryBackoffActive) || + errors.Is(err, errSyncRetryExhausted) || + errors.Is(err, errSyncAlreadyCompleted) +} + // EnqueuePendingEpisodes scans for all approved but un-synced episodes and enqueues them. // Returns the number of episodes enqueued. 
func (w *SyncWorker) EnqueuePendingEpisodes(ctx context.Context) (int, error) { @@ -268,22 +400,21 @@ func (w *SyncWorker) EnqueuePendingEpisodes(ctx context.Context) (int, error) { return 0, ErrSyncWorkerNotRunning } - ids, err := w.findPendingEpisodes(ctx, true) + ids, err := w.findPendingEpisodes(ctx, false) if err != nil { return 0, err } count := 0 for _, id := range ids { - if !w.tryMarkEnqueued(id) { + if err := w.persistPendingSyncLog(ctx, id, false); err != nil { + if isSkippablePendingError(err) { + continue + } + logger.Printf("[SYNC-WORKER] Failed to persist pending sync for episode %d: %v", id, err) continue } - select { - case w.enqueueCh <- syncEnqueueRequest{episodeID: id, manual: false}: - count++ - default: - w.unmarkEnqueued(id) - logger.Printf("[SYNC-WORKER] Enqueue channel full, skipping episode %d", id) - } + count++ + w.enqueuePersistedEpisode(ctx, syncEnqueueRequest{episodeID: id, manual: false}) } return count, nil } @@ -417,10 +548,13 @@ func (w *SyncWorker) finalizeRun() { } func (w *SyncWorker) pollAndProcess(ctx context.Context) { - // First, retry any failed episodes that are due + // Recover persisted queued rows first; enqueueCh is only an acceleration path. + w.dispatchPendingSyncLogs(ctx) + + // Then, retry any failed episodes that are due. w.retryFailedEpisodes(ctx) - // Then, find new pending episodes + // Finally, find newly eligible episodes and persist them as queued work. 
ids, err := w.findPendingEpisodes(ctx, false) if err != nil { logger.Printf("[SYNC-WORKER] Failed to find pending episodes: %v", err) @@ -434,13 +568,57 @@ func (w *SyncWorker) pollAndProcess(ctx context.Context) { logger.Printf("[SYNC-WORKER] Found %d episodes to sync", len(ids)) for _, id := range ids { - if !w.tryMarkEnqueued(id) { + if err := w.persistPendingSyncLog(ctx, id, false); err != nil { + if isSkippablePendingError(err) { + continue + } + logger.Printf("[SYNC-WORKER] Failed to persist pending sync for episode %d: %v", id, err) continue } - w.dispatchJob(ctx, syncEnqueueRequest{episodeID: id, manual: false}) + w.dispatchPersistedJob(ctx, syncEnqueueRequest{episodeID: id, manual: false}) } } +func (w *SyncWorker) dispatchPendingSyncLogs(ctx context.Context) { + ids, err := w.findPendingSyncLogEpisodes(ctx) + if err != nil { + logger.Printf("[SYNC-WORKER] Failed to find queued sync logs: %v", err) + return + } + for _, id := range ids { + w.dispatchPersistedJob(ctx, syncEnqueueRequest{episodeID: id, manual: false}) + } +} + +func (w *SyncWorker) dispatchPersistedJob(ctx context.Context, req syncEnqueueRequest) { + if !w.tryMarkEnqueued(req.episodeID) { + return + } + w.dispatchJob(ctx, req) +} + +func (w *SyncWorker) findPendingSyncLogEpisodes(ctx context.Context) ([]int64, error) { + var ids []int64 + if err := w.db.SelectContext(ctx, &ids, ` + SELECT latest_log.episode_id + FROM sync_logs latest_log + INNER JOIN ( + SELECT episode_id, MAX(id) AS latest_id + FROM sync_logs + GROUP BY episode_id + ) latest ON latest_log.episode_id = latest.episode_id AND latest_log.id = latest.latest_id + INNER JOIN episodes e ON e.id = latest_log.episode_id + WHERE latest_log.status = 'pending' + AND e.cloud_synced = FALSE + AND e.deleted_at IS NULL + ORDER BY latest_log.started_at ASC, latest_log.id ASC + LIMIT ? 
+ `, w.cfg.BatchSize); err != nil { + return nil, fmt.Errorf("query pending sync logs: %w", err) + } + return ids, nil +} + func (w *SyncWorker) findPendingEpisodes(ctx context.Context, includeExhaustedFailures bool) ([]int64, error) { var ids []int64 var err error @@ -496,6 +674,7 @@ func (w *SyncWorker) findPendingEpisodes(ctx context.Context, includeExhaustedFa func (w *SyncWorker) retryFailedEpisodes(ctx context.Context) { var ids []int64 + now := time.Now().UTC() err := w.db.SelectContext(ctx, &ids, ` SELECT sl.episode_id FROM sync_logs sl @@ -506,15 +685,15 @@ func (w *SyncWorker) retryFailedEpisodes(ctx context.Context) { ) t ON sl.episode_id = t.episode_id AND sl.id = t.latest_id WHERE sl.status = 'failed' AND sl.attempt_count < ? - AND (sl.next_retry_at IS NULL OR sl.next_retry_at <= NOW()) + AND (sl.next_retry_at IS NULL OR sl.next_retry_at <= ?) AND NOT EXISTS ( SELECT 1 FROM sync_logs sl2 WHERE sl2.episode_id = sl.episode_id AND sl2.status IN ('pending', 'in_progress') - ) + ) ORDER BY sl.started_at ASC LIMIT ? - `, w.cfg.MaxRetries, w.cfg.BatchSize) + `, w.cfg.MaxRetries, now, w.cfg.BatchSize) if err != nil { logger.Printf("[SYNC-WORKER] Failed to query retryable episodes: %v", err) return @@ -525,10 +704,14 @@ func (w *SyncWorker) retryFailedEpisodes(ctx context.Context) { } for _, id := range ids { - if !w.tryMarkEnqueued(id) { + if err := w.persistPendingSyncLog(ctx, id, false); err != nil { + if isSkippablePendingError(err) { + continue + } + logger.Printf("[SYNC-WORKER] Failed to queue retry for episode %d: %v", id, err) continue } - w.dispatchJob(ctx, syncEnqueueRequest{episodeID: id, manual: false}) + w.dispatchPersistedJob(ctx, syncEnqueueRequest{episodeID: id, manual: false}) } } @@ -616,14 +799,15 @@ func (w *SyncWorker) acquireSyncLogWithMode(ctx context.Context, episodeID int64 } defer func() { _ = tx.Rollback() }() + lockClause := txLockClause(tx) + // Serialize per episode even when sync_logs is empty for this episode. 
var lockedEpisodeID int64 if err := tx.GetContext(ctx, &lockedEpisodeID, ` SELECT id FROM episodes WHERE id = ? AND deleted_at IS NULL - FOR UPDATE - `, episodeID); err != nil { + `+lockClause, episodeID); err != nil { if err == sql.ErrNoRows { return 0, 0, fmt.Errorf("episode %d not found", episodeID) } @@ -636,23 +820,23 @@ func (w *SyncWorker) acquireSyncLogWithMode(ctx context.Context, episodeID int64 NextRetry sql.NullTime `db:"next_retry_at"` AttemptCount int `db:"attempt_count"` } - err = tx.GetContext(ctx, &latest, ` - SELECT sl.id, sl.status, sl.next_retry_at, sl.attempt_count - FROM sync_logs sl - INNER JOIN ( - SELECT episode_id, MAX(id) AS latest_id - FROM sync_logs - GROUP BY episode_id - ) t ON sl.episode_id = t.episode_id AND sl.id = t.latest_id - WHERE sl.episode_id = ? - FOR UPDATE - `, episodeID) + latestQuery := ` + SELECT sl.id, sl.status, sl.next_retry_at, sl.attempt_count + FROM sync_logs sl + INNER JOIN ( + SELECT episode_id, MAX(id) AS latest_id + FROM sync_logs + GROUP BY episode_id + ) t ON sl.episode_id = t.episode_id AND sl.id = t.latest_id + WHERE sl.episode_id = ? 
+ ` + lockClause + err = tx.GetContext(ctx, &latest, latestQuery, episodeID) if err == nil { now := time.Now().UTC() switch latest.Status { case "pending": - claimedAttemptCount := latest.AttemptCount - if claimedAttemptCount < 1 { + claimedAttemptCount := latest.AttemptCount + 1 + if latest.AttemptCount < 1 { claimedAttemptCount = 1 } res, updErr := tx.ExecContext(ctx, ` diff --git a/internal/services/sync_worker_test.go b/internal/services/sync_worker_test.go index 6957789..2a34575 100644 --- a/internal/services/sync_worker_test.go +++ b/internal/services/sync_worker_test.go @@ -146,6 +146,17 @@ func TestEnqueueEpisodeManual_AllowsExhaustedRetryEpisode(t *testing.T) { t.Fatalf("manual enqueue failed: %v", err) } + latest := latestSyncLogForSyncWorkerTest(t, db, 10) + if latest.Status != "pending" { + t.Fatalf("latest status = %q, want pending", latest.Status) + } + if latest.AttemptCount != 0 { + t.Fatalf("latest attempt_count = %d, want 0 for fresh manual chain", latest.AttemptCount) + } + if count := countSyncLogsForSyncWorkerTest(t, db, 10); count != 2 { + t.Fatalf("sync log count = %d, want failed history plus fresh pending", count) + } + select { case got := <-w.enqueueCh: if got.episodeID != 10 { @@ -159,6 +170,35 @@ func TestEnqueueEpisodeManual_AllowsExhaustedRetryEpisode(t *testing.T) { } } +func TestEnqueueEpisodeManual_PromotesDueFailureToPending(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + enqueueCh: make(chan syncEnqueueRequest, 1), + enqueuedEpisode: make(map[int64]struct{}), + } + w.running.Store(true) + + insertEpisodeForSyncWorkerTest(t, db, 13, "approved", false) + insertSyncLogForSyncWorkerTest(t, db, 13, "failed", 1) + + if err := w.EnqueueEpisodeManual(context.Background(), 13); err != nil { + t.Fatalf("manual enqueue failed: %v", err) + } + + latest := latestSyncLogForSyncWorkerTest(t, db, 13) + if latest.Status != "pending" { + t.Fatalf("latest status = 
%q, want pending", latest.Status) + } + if latest.AttemptCount != 1 { + t.Fatalf("latest attempt_count = %d, want completed attempt count 1", latest.AttemptCount) + } + if count := countSyncLogsForSyncWorkerTest(t, db, 13); count != 1 { + t.Fatalf("sync log count = %d, want reused failed row", count) + } +} + func TestEnqueueEpisode_RejectsInProgressEpisode(t *testing.T) { db := newTestSyncWorkerDB(t) w := &SyncWorker{ @@ -177,6 +217,33 @@ func TestEnqueueEpisode_RejectsInProgressEpisode(t *testing.T) { } } +func TestEnqueueEpisodeManual_PersistsPendingWhenMemoryQueueFull(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + enqueueCh: make(chan syncEnqueueRequest), + enqueuedEpisode: make(map[int64]struct{}), + } + w.running.Store(true) + + insertEpisodeForSyncWorkerTest(t, db, 14, "approved", false) + insertSyncLogForSyncWorkerTest(t, db, 14, "failed", 3) + + if err := w.EnqueueEpisodeManual(context.Background(), 14); err != nil { + t.Fatalf("manual enqueue failed despite durable pending: %v", err) + } + + latest := latestSyncLogForSyncWorkerTest(t, db, 14) + if latest.Status != "pending" { + t.Fatalf("latest status = %q, want pending", latest.Status) + } + if !w.tryMarkEnqueued(14) { + t.Fatal("episode marker remained set after enqueue channel was full") + } + w.unmarkEnqueued(14) +} + func TestEnqueueEpisodeManual_RejectsPendingEpisode(t *testing.T) { db := newTestSyncWorkerDB(t) w := &SyncWorker{ @@ -195,6 +262,147 @@ func TestEnqueueEpisodeManual_RejectsPendingEpisode(t *testing.T) { } } +func TestEnqueuePendingEpisodes_PersistsPendingWhenMemoryQueueFull(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + enqueueCh: make(chan syncEnqueueRequest), + enqueuedEpisode: make(map[int64]struct{}), + } + w.running.Store(true) + + insertEpisodeForSyncWorkerTest(t, db, 15, "approved", false) + + count, err := 
w.EnqueuePendingEpisodes(context.Background()) + if err != nil { + t.Fatalf("enqueue pending episodes failed: %v", err) + } + if count != 1 { + t.Fatalf("enqueued count = %d, want 1", count) + } + + latest := latestSyncLogForSyncWorkerTest(t, db, 15) + if latest.Status != "pending" { + t.Fatalf("latest status = %q, want pending", latest.Status) + } +} + +func TestDispatchPendingSyncLogs_DispatchesPersistedRows(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + jobCh: make(chan syncEnqueueRequest, 1), + enqueuedEpisode: make(map[int64]struct{}), + } + + insertEpisodeForSyncWorkerTest(t, db, 16, "approved", false) + insertSyncLogForSyncWorkerTest(t, db, 16, "pending", 0) + + w.dispatchPendingSyncLogs(context.Background()) + + select { + case got := <-w.jobCh: + if got.episodeID != 16 { + t.Fatalf("unexpected episode id: got %d want 16", got.episodeID) + } + if got.manual { + t.Fatal("unexpected manual mode for recovered pending row") + } + default: + t.Fatal("expected persisted pending row to be dispatched") + } +} + +func TestRetryFailedEpisodes_PromotesDueFailureToPendingBeforeDispatch(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + jobCh: make(chan syncEnqueueRequest, 1), + enqueuedEpisode: make(map[int64]struct{}), + } + + insertEpisodeForSyncWorkerTest(t, db, 17, "approved", false) + insertSyncLogForSyncWorkerTest(t, db, 17, "failed", 1) + + w.retryFailedEpisodes(context.Background()) + + latest := latestSyncLogForSyncWorkerTest(t, db, 17) + if latest.Status != "pending" { + t.Fatalf("latest status = %q, want pending", latest.Status) + } + if latest.AttemptCount != 1 { + t.Fatalf("latest attempt_count = %d, want completed attempt count 1", latest.AttemptCount) + } + select { + case got := <-w.jobCh: + if got.episodeID != 17 { + t.Fatalf("unexpected episode id: got %d want 17", got.episodeID) + } 
+ default: + t.Fatal("expected retryable failure to be dispatched") + } +} + +func TestAcquireSyncLogWithMode_ClaimsFreshPendingRow(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + } + + insertEpisodeForSyncWorkerTest(t, db, 18, "approved", false) + insertSyncLogForSyncWorkerTest(t, db, 18, "pending", 0) + + syncLogID, attemptCount, err := w.acquireSyncLogWithMode(context.Background(), 18, "local/episode.mcap", false) + if err != nil { + t.Fatalf("claim pending sync log failed: %v", err) + } + if syncLogID <= 0 { + t.Fatalf("syncLogID = %d, want positive id", syncLogID) + } + if attemptCount != 1 { + t.Fatalf("attemptCount = %d, want 1", attemptCount) + } + + latest := latestSyncLogForSyncWorkerTest(t, db, 18) + if latest.Status != "in_progress" { + t.Fatalf("latest status = %q, want in_progress", latest.Status) + } + if latest.AttemptCount != 1 { + t.Fatalf("latest attempt_count = %d, want 1", latest.AttemptCount) + } +} + +func TestAcquireSyncLogWithMode_ClaimsRetryPendingRow(t *testing.T) { + db := newTestSyncWorkerDB(t) + w := &SyncWorker{ + db: db, + cfg: SyncWorkerConfig{BatchSize: 10, MaxRetries: 3}, + } + + insertEpisodeForSyncWorkerTest(t, db, 19, "approved", false) + insertSyncLogForSyncWorkerTest(t, db, 19, "pending", 1) + + _, attemptCount, err := w.acquireSyncLogWithMode(context.Background(), 19, "local/episode.mcap", false) + if err != nil { + t.Fatalf("claim retry pending sync log failed: %v", err) + } + if attemptCount != 2 { + t.Fatalf("attemptCount = %d, want retry attempt 2", attemptCount) + } + + latest := latestSyncLogForSyncWorkerTest(t, db, 19) + if latest.Status != "in_progress" { + t.Fatalf("latest status = %q, want in_progress", latest.Status) + } + if latest.AttemptCount != 2 { + t.Fatalf("latest attempt_count = %d, want 2", latest.AttemptCount) + } +} + func TestProcessEnqueuedEpisode_HoldsMarkerUntilProcessingReturns(t *testing.T) { w := &SyncWorker{ 
enqueuedEpisode: map[int64]struct{}{77: {}}, @@ -323,12 +531,15 @@ func newTestSyncWorkerDB(t *testing.T) *sqlx.DB { created_at TIMESTAMP NOT NULL )`, `CREATE TABLE sync_logs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - episode_id INTEGER NOT NULL, - status TEXT NOT NULL, - attempt_count INTEGER NOT NULL DEFAULT 0, - next_retry_at TIMESTAMP NULL, - started_at TIMESTAMP NULL, + id INTEGER PRIMARY KEY AUTOINCREMENT, + episode_id INTEGER NOT NULL, + source_path TEXT, + status TEXT NOT NULL, + duration_sec INTEGER, + error_message TEXT, + attempt_count INTEGER NOT NULL DEFAULT 0, + next_retry_at TIMESTAMP NULL, + started_at TIMESTAMP NULL, completed_at TIMESTAMP NULL )`, } @@ -371,6 +582,37 @@ func insertSyncLogForSyncWorkerTest(t *testing.T, db *sqlx.DB, episodeID int64, } } +type syncLogForSyncWorkerTest struct { + Status string `db:"status"` + AttemptCount int `db:"attempt_count"` +} + +func latestSyncLogForSyncWorkerTest(t *testing.T, db *sqlx.DB, episodeID int64) syncLogForSyncWorkerTest { + t.Helper() + + var row syncLogForSyncWorkerTest + if err := db.Get(&row, ` + SELECT status, attempt_count + FROM sync_logs + WHERE episode_id = ? + ORDER BY id DESC + LIMIT 1 + `, episodeID); err != nil { + t.Fatalf("query latest sync log for episode %d: %v", episodeID, err) + } + return row +} + +func countSyncLogsForSyncWorkerTest(t *testing.T, db *sqlx.DB, episodeID int64) int { + t.Helper() + + var count int + if err := db.Get(&count, "SELECT COUNT(*) FROM sync_logs WHERE episode_id = ?", episodeID); err != nil { + t.Fatalf("count sync logs for episode %d: %v", episodeID, err) + } + return count +} + func assertEpisodeIDs(t *testing.T, got, want []int64) { t.Helper()