feat(scheduler): pluggable storage, status/history APIs, SQLite backend #26
- Update BUF_VERSION to v1.66.0 in .project-settings.env and Makefile
- Keeps GO_VERSION and other settings unchanged
- Aligns local dev and CI to the same Buf version to prevent tooling drift
…lerStopped
- Introduce ErrSchedulerStopped and have Schedule() return it when called after Stop().
- Add a stopped flag with mutex protection; make Stop() idempotent and guard Schedule/Stop paths.
- Implement cleanup of completed jobs (cleanupJob); ensure in-flight jobs are cleaned up on Stop().
- Update runJob to use entry + cleanup path for safer execution.
- Tests: add coverage for schedule-after-stop, completed-job cleanup, concurrency limits, retrier hooks (TestSchedulerScheduleAfterStopReturnsError, TestSchedulerCompletedJobCleanup, TestSchedulerConcurrencyLimit, TestRetrierHooksOnRetry); refine error assertions/messages.
- Docs: refresh README.md and PRD.md to document scheduler/retrier behavior, usage, and backlog.
- Build: refine Makefile run/bench targets and help/usage text.
- Introduce NewSchedulerWithError(opts...) returning (*Scheduler, error)
- NewScheduler now delegates to NewSchedulerWithError and logs a warning if
default URL-validator initialization fails
- Add default URL validator flow (newDefaultURLValidator) with
ErrURLValidatorInitialization; WithURLValidator marks validator as configured
(explicit nil skips default init)
- Add scheduler introspection helpers: JobCount() and JobIDs()
- Improve execution tracking with per-attempt state and a snapshot used in
callback payloads (Attempts, StatusCode, ResponseBody)
- Add tests for validator failure/skip paths, introspection, and logging on
callback failure
- Update PRD.md/README.md; adjust cspell.json
- Add JobsStorage interface for active jobs + status/history; default InMemoryJobsStorage.
- New options: WithJobsStorage(storage), WithHistoryLimit(n) (default retention: 20).
- New read APIs: JobStatus(id), JobStatuses(), JobHistory(id); JobCount/JobIDs now backed by storage.
- Persist lifecycle transitions:
  - Schedule: Save(job) + UpsertStatus(scheduled) with duplicate-ID guard.
  - Remove/Stop: Delete(job), MarkRemoved/MarkTerminal with best-effort logging on write failures.
  - Execution: MarkExecutionStart, RecordExecutionResult(payload, limit) with retention trimming.
- Implement SQLite storage (modernc.org/sqlite):
  - NewSQLiteJobsStorage(path) with WAL, busy timeout, durable state across restarts; Close() added.
- Logging improvements:
  - warn vs debug levels (context.Canceled => debug); explicit logs for callback send/request failures.
  - structured storage write-failure logging via logStorageWriteFailure(...).
- Error handling:
  - Migrate to github.com/hyp3rd/ewrap; introduce ErrStorageOperation; wrap errors with context.
- Docs: README/PRD describe storage options, new APIs, and behavior notes.
- Tests:
  - Storage contract tests (CRUD, status transitions, history retention, concurrency).
  - SQLite persistence (reopen), custom storage tracking/fail-path seams.
  - Scheduler tests for status/history, remove semantics, and logger coverage.
- Deps: add modernc.org/sqlite and related indirects; cspell: add “Upserts”.

No breaking API changes expected; NewScheduler remains backward-compatible with in-memory defaults.
Pull request overview
This PR expands pkg/scheduler with pluggable state storage (in-memory default + SQLite backend) and adds job status/history introspection APIs, along with updated docs, examples, and tests to cover new lifecycle semantics and logging.
Changes:
- Introduce `JobsStorage` (in-memory + SQLite) to persist active jobs plus status/history with retention trimming.
- Add scheduler read APIs (`JobStatus`, `JobStatuses`, `JobHistory`, and storage-backed `JobCount`/`JobIDs`) and options (`WithJobsStorage`, `WithHistoryLimit`).
- Add/extend tests, examples, and documentation to cover storage contracts, persistence, lifecycle transitions, and improved logging/error behavior.
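
The retention trimming mentioned in the list above could look roughly like the following. This is a minimal in-memory sketch, not the PR's `InMemoryJobsStorage`: the `runRecord` and `historyStore` names and their methods are hypothetical, and only the default limit of 20 comes from the PR description.

```go
package main

import (
	"fmt"
	"sync"
)

// runRecord is a hypothetical stand-in for the PR's JobRun type.
type runRecord struct {
	Attempt    int
	StatusCode int
}

// historyStore keeps at most limit runs per job ID (illustrative only).
type historyStore struct {
	mu      sync.Mutex
	limit   int
	history map[string][]runRecord
}

func newHistoryStore(limit int) *historyStore {
	if limit <= 0 {
		limit = 20 // default retention stated in the PR description
	}

	return &historyStore{limit: limit, history: make(map[string][]runRecord)}
}

// record appends a run and drops the oldest entries beyond the limit.
func (h *historyStore) record(jobID string, run runRecord) {
	h.mu.Lock()
	defer h.mu.Unlock()

	runs := append(h.history[jobID], run)
	if extra := len(runs) - h.limit; extra > 0 {
		// Copy the tail so the trimmed prefix can be garbage-collected.
		runs = append([]runRecord(nil), runs[extra:]...)
	}

	h.history[jobID] = runs
}

// runs returns a copy of the retained history for a job, oldest first.
func (h *historyStore) runs(jobID string) []runRecord {
	h.mu.Lock()
	defer h.mu.Unlock()

	return append([]runRecord(nil), h.history[jobID]...)
}

func main() {
	store := newHistoryStore(3)
	for i := 1; i <= 5; i++ {
		store.record("job-1", runRecord{Attempt: i, StatusCode: 200})
	}

	// Only the three most recent attempts remain.
	for _, r := range store.runs("job-1") {
		fmt.Println(r.Attempt)
	}
}
```

Trimming on write keeps reads cheap and bounds memory per job; a SQL backend would more likely trim with a `DELETE` keyed on run order.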
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `pkg/scheduler/storage.go` | Defines JobsStorage and implements InMemoryJobsStorage for jobs + status/history retention. |
| `pkg/scheduler/storage_sqlite.go` | Adds SQLite-backed JobsStorage implementation using modernc.org/sqlite. |
| `pkg/scheduler/types.go` | Introduces JobState, JobStatus, and JobRun types for status/history APIs. |
| `pkg/scheduler/scheduler.go` | Wires storage into scheduler lifecycle, adds status/history read APIs, improves logging/error handling, adds stop-guard. |
| `pkg/scheduler/options.go` | Adds WithJobsStorage and WithHistoryLimit; updates URL validator option to mark explicit configuration. |
| `pkg/scheduler/errors.go` | Adds storage/stop/validator-init errors and migrates scheduler errors to ewrap. |
| `pkg/scheduler/storage_contract_test.go` | Adds shared storage contract tests for in-memory and SQLite backends (including concurrency + reopen persistence). |
| `pkg/scheduler/scheduler_internal_test.go` | Adds internal tests for validator-init behavior and logger level semantics (warn vs debug). |
| `tests/scheduler_test.go` | Adds higher-level scheduler tests for stop semantics, introspection, status/history, custom storage, and log coverage. |
| `tests/retrier_test.go` | Minor refactor for error messages + adds retrier hook coverage. |
| `__examples/scheduler/scheduler.go` | New runnable example demonstrating scheduler + callback + status/history reads. |
| `__examples/scheduler_sqlite/scheduler_sqlite.go` | New runnable example demonstrating SQLite storage usage. |
| `README.md` | Major rewrite/expansion documenting retrier + scheduler features, options, and behavior notes. |
| `PRD.md` | Updated to a “current state audit” + gap backlog format, aligned with implemented functionality. |
| `go.mod` / `go.sum` | Adds modernc.org/sqlite and indirects for SQLite backend. |
| `Makefile` / `.project-settings.env` | Updates Buf version and fixes bench/run targets to match repo layout. |
| `cspell.json` | Adds new terms used by lint directives and docs/tests. |
     func (s *Scheduler) Schedule(job Job) (string, error) {
    +	s.mu.Lock()
    +	defer s.mu.Unlock()
    +
    +	if s.stopped {
    +		return "", ErrSchedulerStopped
    +	}
    +
     	normalized, err := s.normalizeJob(job)
     	if err != nil {
     		return "", err
     	}

    -	s.mu.Lock()
    -	defer s.mu.Unlock()
    -
     	if normalized.ID == "" {
     		normalized.ID = s.nextID()
     	}

     	if _, exists := s.jobs[normalized.ID]; exists {
    -		return "", fmt.Errorf("%w: job id already exists: %s", ErrInvalidJob, normalized.ID)
    +		return "", ewrap.Wrapf(ErrInvalidJob, "job id already exists: %s", normalized.ID)
     	}
    +
    +	err = s.jobsStorage.Save(normalized)
    +	if err != nil {
    +		if errors.Is(err, errJobAlreadyExists) {
    +			return "", ewrap.Wrapf(ErrInvalidJob, "job id already exists: %s", normalized.ID)
    +		}
    +
    +		return "", ewrap.Wrapf(ErrStorageOperation, "job storage save failed: %v", err)
    +	}
    +
    +	err = s.jobsStorage.UpsertStatus(normalized.ID, JobStateScheduled)
    +	if err != nil {
    +		_ = s.jobsStorage.Delete(normalized.ID)
    +
    +		return "", ewrap.Wrapf(ErrStorageOperation, "job status upsert failed: %v", err)
    +	}

Schedule() now holds s.mu across job normalization (URL validation, retrier setup) and storage writes (Save/UpsertStatus). Those steps can be relatively slow (especially with SQLite or custom storage) and will block Stop()/Remove()/other Schedule() calls for the duration. Consider narrowing the critical section: e.g., check s.stopped and in-memory duplicate IDs under lock, unlock to normalize/persist, then re-lock to commit (with a final re-check for stop/duplicate) to avoid long lock holds.
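
One way to read the suggestion above is a three-phase `schedule`: cheap checks under the lock, storage I/O without it, then a short re-locked commit. The sketch below is self-contained and uses hypothetical stand-ins (`scheduler`, `store`, `memStore`, `errStopped`, `errDuplicate`) rather than the PR's types; it also assumes the storage layer rejects duplicate IDs, so concurrent calls with the same ID are arbitrated by `Save`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errStopped   = errors.New("scheduler stopped")
	errDuplicate = errors.New("job id already exists")
)

// store is a hypothetical stand-in for a potentially slow storage backend.
type store interface {
	Save(id string) error
	Delete(id string) error
}

// memStore rejects duplicate IDs, like the duplicate-ID guard in the PR.
type memStore struct {
	mu   sync.Mutex
	jobs map[string]struct{}
}

func (m *memStore) Save(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.jobs[id]; exists {
		return errDuplicate
	}

	m.jobs[id] = struct{}{}

	return nil
}

func (m *memStore) Delete(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.jobs, id)

	return nil
}

type scheduler struct {
	mu      sync.Mutex
	stopped bool
	jobs    map[string]struct{}
	storage store
}

func newScheduler() *scheduler {
	return &scheduler{
		jobs:    make(map[string]struct{}),
		storage: &memStore{jobs: make(map[string]struct{})},
	}
}

func (s *scheduler) stop() {
	s.mu.Lock()
	s.stopped = true
	s.mu.Unlock()
}

func (s *scheduler) schedule(id string) error {
	// Phase 1: cheap in-memory checks under the lock.
	s.mu.Lock()
	_, exists := s.jobs[id]
	stopped := s.stopped
	s.mu.Unlock()

	if stopped {
		return errStopped
	}

	if exists {
		return errDuplicate
	}

	// Phase 2: slow storage write without holding s.mu.
	if err := s.storage.Save(id); err != nil {
		return fmt.Errorf("save job %s: %w", id, err)
	}

	// Phase 3: re-lock briefly, re-check the stop flag, and commit.
	s.mu.Lock()
	stopped = s.stopped
	if !stopped {
		s.jobs[id] = struct{}{}
	}
	s.mu.Unlock()

	if stopped {
		_ = s.storage.Delete(id) // best-effort rollback, outside the lock

		return errStopped
	}

	return nil
}

func main() {
	s := newScheduler()
	fmt.Println(s.schedule("a"))
	fmt.Println(s.schedule("a"))
	s.stop()
	fmt.Println(s.schedule("b"))
}
```

The trade-off is a window between phases 2 and 3 in which a job is persisted but not yet registered in memory, which is why the rollback on a concurrent stop matters.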
     func (s *Scheduler) Stop() {
    -	s.cancel()

     	s.mu.Lock()
    +	if s.stopped {
    +		s.mu.Unlock()
    +		s.wg.Wait()
    +
    +		return
    +	}
    +
    +	s.stopped = true
    +	s.cancel()
    +
     	for _, entry := range s.jobs {
     		entry.cancel()
    +		_ = s.jobsStorage.Delete(entry.job.ID)
    +
    +		err := s.jobsStorage.MarkTerminal(entry.job.ID, JobStateStopped)
    +		if err != nil {
    +			s.logStorageWriteFailure("mark stopped", entry.job.ID, err)
    +		}
     	}

Stop() holds s.mu while iterating jobs and performing storage I/O (Delete + MarkTerminal). With a durable backend (SQLite) or a custom storage implementation, those calls can block for non-trivial time and prevent concurrent operations (e.g., Remove, status reads that also take s.mu elsewhere). Consider copying the current job IDs/entries under lock, releasing the lock, then performing storage updates/cancels without holding s.mu.
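
The snapshot-then-release approach described above might be sketched as follows. All names here (`scheduler`, `entry`, `statusStore`, `recordingStore`, `MarkStopped`) are hypothetical stand-ins, and clearing the job map inside `stop` is an assumption of this sketch, not something the PR is known to do.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for the scheduler's per-job entry.
type entry struct {
	id     string
	cancel context.CancelFunc
}

// statusStore is a hypothetical stand-in for the status half of JobsStorage.
type statusStore interface {
	MarkStopped(id string) error
}

// recordingStore records which job IDs were marked stopped.
type recordingStore struct {
	mu      sync.Mutex
	stopped []string
}

func (r *recordingStore) MarkStopped(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stopped = append(r.stopped, id)

	return nil
}

func (r *recordingStore) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()

	return len(r.stopped)
}

type scheduler struct {
	mu      sync.Mutex
	stopped bool
	jobs    map[string]*entry
	storage statusStore
}

func newScheduler(storage statusStore) *scheduler {
	return &scheduler{jobs: make(map[string]*entry), storage: storage}
}

// add registers a job and returns the context that stop will cancel.
func (s *scheduler) add(id string) context.Context {
	ctx, cancel := context.WithCancel(context.Background())

	s.mu.Lock()
	s.jobs[id] = &entry{id: id, cancel: cancel}
	s.mu.Unlock()

	return ctx
}

func (s *scheduler) stop() {
	// Under the lock: flip the flag and snapshot the entries, nothing else.
	s.mu.Lock()
	if s.stopped {
		s.mu.Unlock()

		return
	}

	s.stopped = true

	entries := make([]*entry, 0, len(s.jobs))
	for _, e := range s.jobs {
		entries = append(entries, e)
	}

	s.jobs = make(map[string]*entry) // assumption: stop owns final cleanup
	s.mu.Unlock()

	// Outside the lock: cancel jobs and perform storage writes.
	for _, e := range entries {
		e.cancel()

		if err := s.storage.MarkStopped(e.id); err != nil {
			fmt.Printf("mark stopped failed for %s: %v\n", e.id, err)
		}
	}
}

func main() {
	store := &recordingStore{}
	s := newScheduler(store)
	ctx := s.add("job-1")
	s.add("job-2")

	s.stop()
	s.stop() // second call is a no-op

	<-ctx.Done() // returns immediately because the job context was cancelled
	fmt.Println("marked stopped:", store.count())
}
```

Because the stop flag is set before the lock is released, later scheduling attempts can be rejected while the storage writes are still in progress.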