Add one-time startup task system (PP-3684) #3035
Conversation
This is very cool. Great idea. I love the Alembic-inspired design.
Codecov Report

@@ Coverage Diff @@
## main #3035 +/- ##
========================================
Coverage 93.04% 93.05%
========================================
Files 480 482 +2
Lines 43716 43869 +153
Branches 6027 6045 +18
========================================
+ Hits 40677 40823 +146
- Misses 1968 1972 +4
- Partials 1071 1074 +3
Codecov Report

@@ Coverage Diff @@
## main #3035 +/- ##
==========================================
+ Coverage 93.16% 93.18% +0.01%
==========================================
Files 487 489 +2
Lines 44787 44943 +156
Branches 6173 6191 +18
==========================================
+ Hits 41726 41880 +154
- Misses 1985 1986 +1
- Partials 1076 1077 +1
dbernstein left a comment
Looks good. A couple of minor comments to consider as you see fit.
```python
    dispatched_task_id = async_result.id
    _record_task(session, key, state=StartupTaskState.RUN)
except Exception:
    logger.exception("Failed to execute startup task %r.", key)
```
If this task fails to start (or at least to be queued in Celery), is it correct to say that the next time this routine runs, the tasks that failed would still be pending and would be run again?
Perhaps we'll need a new CloudWatch alarm or some other mechanism to let us know if pending task failures accumulate?
That's right. A task that fails to run won't stop the startup, but it will emit a log message.
That message goes to migrate.log, which shows up in Ansible in the Read migration log task. So if there is a failure, we should see it when we do the upgrade.
You're right, though: a CloudWatch alert is a good idea once we automate those upgrades.
Introduce a general-purpose registry for one-time startup tasks that are automatically discovered and dispatched to Celery on the first application start after deployment. This is useful for data backfills, re-imports, or reindexing jobs that are too long-running for database migrations.

- Add a StartupTask SQLAlchemy model and migration for tracking queued tasks
- Add an auto-discovery system that scans startup_tasks/ for Python files defining a create_signature() callable
- Add a StartupTaskRunner integrated into InstanceInitializationScript, protected by the existing advisory lock
- On fresh database installs, stamp tasks without dispatching them (there is no existing data to migrate)
- Add a bin/create_startup_task scaffolding command
- Include an example task that force-harvests OPDS for Distributors collections
- Revert bin/util/initialize_instance to pass config_file instead of repo_root, matching the InstanceInitializationScript constructor
- Resolve the tasks_dir=None default to STARTUP_TASKS_DIR in run_startup_tasks and stamp_startup_tasks
- Fix test_initialization.py tests to match the actual API signatures and return-value semantics of initialize_database and run_startup_tasks
- Remove tests for nonexistent error-handling behavior
- Update the run_startup_tasks docstring to accurately describe error handling
- Record task execution in the same transaction as the task itself to prevent duplicate Celery dispatches on crash
- Rename the queued_at column to recorded_at
- Replace the run boolean column with a state enum (RUN, MARKED)
- Fix the StartupTaskCallable type alias to include the logger parameter
- Fix test task module signatures to match the actual 3-arg call convention
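The model changes above (the recorded_at rename and the RUN/MARKED state enum) could be sketched roughly as follows. This is a hypothetical reconstruction: the class, column, and enum-value names are inferred from the commit messages, not copied from the PR.

```python
# Hypothetical sketch of the StartupTask model described above.
import enum
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Enum, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class StartupTaskState(enum.Enum):
    RUN = "run"        # the task was actually executed/dispatched
    MARKED = "marked"  # stamped without running (fresh database install)


class StartupTask(Base):
    __tablename__ = "startup_tasks"

    id = Column(Integer, primary_key=True)
    # One row per task file; the key is derived from the filename.
    key = Column(String, unique=True, nullable=False)
    state = Column(Enum(StartupTaskState), nullable=False)
    # Renamed from queued_at in a later commit.
    recorded_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(timezone.utc),
    )
```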
Move apply_async() inside the transaction so broker failures roll back the task record, allowing retry on next startup. Add enum type cleanup to the migration downgrade to match project conventions. Add migration tests and a Celery dispatch retry test.
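The rollback-on-broker-failure behavior can be illustrated with a self-contained sketch. FakeSession, dispatch_startup_task, and the signature classes below are stand-ins for illustration, not the PR's actual code:

```python
from contextlib import contextmanager


class FakeSession:
    """Toy stand-in for a database session with commit/rollback."""

    def __init__(self):
        self.records = []   # committed rows
        self._pending = []  # rows added in the open transaction

    @contextmanager
    def begin(self):
        self._pending = []
        try:
            yield self
            self.records.extend(self._pending)  # commit on success
        finally:
            self._pending = []  # rollback discards anything uncommitted

    def add(self, row):
        self._pending.append(row)


def dispatch_startup_task(session, key, signature, errors):
    """Record the task and dispatch it in one transaction.

    If apply_async() raises (e.g. the broker is down), the exception
    escapes the begin() block before commit, so the record is rolled
    back and the task stays pending for the next startup.
    """
    try:
        with session.begin():
            session.add(key)         # the PR's _record_task(...)
            signature.apply_async()  # may raise on broker failure
        return True
    except Exception:
        errors.append(f"Failed to execute startup task {key!r}.")
        return False


class OkSignature:
    def apply_async(self):
        pass


class BrokenSignature:
    def apply_async(self):
        raise ConnectionError("broker unreachable")


session, errors = FakeSession(), []
assert dispatch_startup_task(session, "backfill", OkSignature(), errors)
assert not dispatch_startup_task(session, "reindex", BrokenSignature(), errors)
print(session.records)  # → ['backfill']: only the dispatched task was recorded
```

The key design point is ordering: because the dispatch happens before the commit, a broker failure can never leave a recorded-but-never-dispatched row behind.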
Move the run-vs-stamp branching from initialization.py into startup.py behind a single run_startup_tasks() function with an already_initialized parameter. Trim the module docstring to a README pointer. Add slug length truncation to create_startup_task.
The glob results are already iterated in sorted order, so the dict is built in key order and the final sorted() call was unnecessary.
- Fix a template-rendering crash on descriptions containing braces by using str.replace instead of str.format
- Rename the ambiguous local variable db_initialized to already_initialized in the initialization script
- Document that startup tasks block application startup and should dispatch heavy work via Celery
- Fix a docstring to say "sorted by filename" instead of "sorted by key"
- Replace DatabaseTransactionFixture + Session patching with function_database for realistic engine-based test isolation
- Extract RunStartupTasksFixture to share mocked discover_startup_tasks and services across all tests
- Merge a duplicate failure test into test_run_handles_failure_gracefully
- Remove the _engine context manager and related imports
Replace str.replace with a Jinja2 template for the startup task scaffolding to avoid ambiguity with brace-style placeholders. Also add an else branch so non-Celery tasks log "Executed" while Celery tasks only log "dispatched", avoiding a redundant double log.
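The reason Jinja2 is safer here: str.format treats every brace pair as a placeholder, and str.replace is ambiguous if the description happens to contain the placeholder text, while Jinja2 only interprets its {{ ... }} delimiters. A minimal sketch, assuming a stub template (the template text below is illustrative, not the project's actual scaffolding template):

```python
from jinja2 import Template

# Illustrative stub template. Literal braces in the rendered
# description are safe because Jinja2 only expands {{ ... }}.
STUB = Template(
    '"""{{ description }}"""\n'
    "\n"
    "\n"
    "def run(services, session, log):\n"
    "    raise NotImplementedError\n"
)


def render_startup_task_stub(description: str) -> str:
    return STUB.render(description=description)


print(render_startup_task_stub("Backfill {weird} identifiers"))
```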
Replace per-task "already executed; skipping" log lines with a single summary count to avoid spamming logs on every startup as tasks accumulate.
Fix grammar, correct the example filename format, remove the code example with an incorrect function signature, and simplify the documentation.
…PP-3684) (#3055)

## Description

> Note: #3035 needs to get merged before this can go in.

Adds a startup task that dispatches a forced re-harvest of all OPDS for Distributors collections on the next deployment.

## Motivation and Context

After streaming media support landed (PR #3015), existing OPDS for Distributors collections need to be re-imported with the new parsing logic to pick up the changes.

Resolves PP-3684

## How Has This Been Tested?

The startup task follows the established pattern and delegates to the existing `import_all` Celery task with `force=True`. The `import_all` task is already covered by existing tests.

## Checklist

- [x] I have updated the documentation accordingly.
- [x] All new and existing tests passed.
Description
Add a one-time startup task system that auto-discovers and executes tasks on the first application start after deployment. This provides a clean mechanism for post-deployment work that doesn't belong in a database migration. The main purpose of this system is to queue Celery tasks, but it can also do other necessary work, such as cache invalidations.
Each task is a Python file in the top-level `startup_tasks/` directory that defines a `run(services, session, log)` callable. Tasks can perform inline work with the database/Redis/search or return a Celery `Signature` to dispatch heavy work asynchronously. On each container start, the initialization script discovers all task files, checks the `startup_tasks` database table for previously executed keys, and runs any new ones. The process is idempotent: each task runs only once.

Motivation and Context

After deploying new code (e.g. adding streaming media support), we sometimes need to run one-time tasks such as force-harvesting all collections. Alembic migrations handle schema changes but aren't suited for queuing long-running async Celery work or performing post-deployment operations that need access to the full services container. This system provides a clean, self-documenting mechanism for these post-deployment tasks.
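Under the convention described above, a task file might look like the following sketch. The filename, the `services.celery.force_reharvest_signature()` accessor, and the helper classes are hypothetical; only the `run(services, session, log)` signature and the return-a-Signature convention come from this description:

```python
# startup_tasks/force_reharvest_collections.py  (hypothetical example)


def run(services, session, log):
    """Runs once, on the first application start after deployment.

    Do quick inline work here, or return a Celery Signature so the
    heavy lifting happens asynchronously in a worker.
    """
    log.info("Dispatching forced re-harvest of all collections.")
    # Hypothetical accessor: delegate long-running work to an existing
    # Celery task instead of blocking application startup.
    return services.celery.force_reharvest_signature()
```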
PP-3684
How Has This Been Tested?
- Unit tests cover discovery (`TestDiscoverStartupTasks`), execution (`TestRunStartupTasks`), and scaffolding (`TestCreateStartupTask`)
- Cases include missing `run` attributes, import errors, underscore-prefixed file skipping, a nonexistent directory, executing new tasks, Celery signature dispatch, skipping already-executed tasks, failure handling, idempotency, stamp-only mode for fresh installs, and CLI edge cases
- Tests in `test_initialization.py` verify call ordering and `already_initialized` forwarding

Checklist